
spark-redis's Introduction


Spark-Redis

A library for reading and writing data in Redis using Apache Spark.

Spark-Redis provides access to all of Redis' data structures - String, Hash, List, Set and Sorted Set - from Spark as RDDs. It also supports reading and writing with DataFrames and Spark SQL syntax.

The library can be used with both standalone Redis and clustered databases. When used with a Redis cluster, Spark-Redis is aware of its partitioning scheme and adjusts in response to resharding and node failure events.

Spark-Redis also supports Spark Streaming (DStreams) and Structured Streaming.

Version compatibility and branching

The library has several branches, each corresponding to a supported Spark version. For example, 'branch-2.3' works with any Spark 2.3.x version. The master branch contains the most recent development for the next release.

Spark-Redis     | Spark | Redis   | Supported Scala Versions
master          | 3.2.x | >=2.9.0 | 2.12
3.0             | 3.0.x | >=2.9.0 | 2.12
2.4, 2.5, 2.6   | 2.4.x | >=2.9.0 | 2.11, 2.12
2.3             | 2.3.x | >=2.9.0 | 2.11
1.4             | 1.4.x |         | 2.10

Known limitations

  • Java, Python and R API bindings are not provided at this time

Additional considerations

This library is a work in progress so the API may change before the official release.

Documentation

Please make sure you use the documentation from the correct branch (2.4, 2.3, etc.).

Contributing

You're encouraged to contribute to the Spark-Redis project.

There are two ways you can do so:

Submit Issues

If you encounter an issue while using the library, please report it via the project's issues tracker.

Author Pull Requests

Code contributions to the Spark-Redis project can be made using pull requests. To submit a pull request:

  1. Fork this project.
  2. Make and commit your changes.
  3. Submit your changes as a pull request.

spark-redis's People

Contributors

alexott, ankiiitraj, chayim, cyq89051127, dvirsky, fe2s, gkorland, itamarhaber, justinrmiller, mayankasthana, ntviet18, phyok, rylanhalteman, shaynativ, sunheehnus, tgrall, varunwachaspati, xianwill


spark-redis's Issues

How to push Spark Streaming data into Redis with spark-redis?

Hi,
Thanks for providing the spark-redis code. Now I have a problem: when I use KafkaUtils to create a stream, I don't know how to push the Kafka stream data to Redis.
My code is below:
val streams = KafkaUtils.createStream(ssc, params.ZK_URL, params.GROUP_ID, topicMap).map(_._2)
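
A minimal sketch of one way to do this, assuming redis.host and redis.port are set on the SparkConf and the stream values are plain strings (the key extraction below is hypothetical):

    // Sketch only: write each micro-batch to Redis inside foreachRDD.
    import com.redislabs.provider.redis._

    streams.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // hypothetical key extraction - adapt to your record format
        val kvs = rdd.map(line => (line.split(",")(0), line))
        rdd.sparkContext.toRedisKV(kvs)
      }
    }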

How to set the expire time of Redis keys

Hi there,

I checked the code and docs and could not find how to set the expire time of keys. Am I missing something, or is this feature not provided?

Thanks.

Data from Redis all ends up in one partition

My environment:
only one Redis instance, not master/slave, not a cluster.
I found that RedisConfig.scala, at line 204, fixes the end slot to 16383.
Maybe that causes my problem. When I use spark-redis like this:

    val rdd = spark.sparkContext.fromRedisList(key,8)
   // import spark.implicits._
    logWarning("RDD Partitions Num is : " + rdd.partitions.length)
    rdd.saveAsTextFile(parquetFile+date)

but the result is:

[hadoop@hadoop-01 job]$ hadoop fs -ls 1007/20161018
Found 9 items
-rw-r--r--   3 hadoop supergroup          0 2016-10-19 14:21 1007/20161018/_SUCCESS
-rw-r--r--   3 hadoop supergroup 1513178095 2016-10-19 14:21 1007/20161018/part-00000
-rw-r--r--   3 hadoop supergroup          0 2016-10-19 14:20 1007/20161018/part-00001
-rw-r--r--   3 hadoop supergroup          0 2016-10-19 14:20 1007/20161018/part-00002
-rw-r--r--   3 hadoop supergroup          0 2016-10-19 14:20 1007/20161018/part-00003
-rw-r--r--   3 hadoop supergroup          0 2016-10-19 14:20 1007/20161018/part-00004
-rw-r--r--   3 hadoop supergroup          0 2016-10-19 14:20 1007/20161018/part-00005
-rw-r--r--   3 hadoop supergroup          0 2016-10-19 14:20 1007/20161018/part-00006
-rw-r--r--   3 hadoop supergroup          0 2016-10-19 14:20 1007/20161018/part-00007
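
A possible workaround with a single standalone Redis (where the whole key space maps to one slot range and hence one input partition) is to repartition after reading; a minimal sketch:

    // Sketch only: spread the single input partition across 8 partitions before writing.
    val rdd = spark.sparkContext.fromRedisList(key, 8)
    val spread = rdd.repartition(8)
    spread.saveAsTextFile(parquetFile + date)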

The value of poolConfig.setMaxTotal is too small

In ConnectionPool.scala I found:
poolConfig.setMaxTotal(250)

But when using Spark Streaming and calling this function:
def onStart() {
  val executorPool = ThreadUtils.newFixedThreadPool(keys.length, "BlockLists Streaming")
  try {
    /* start an executor for each interested List */
    keys.foreach { key =>
      executorPool.submit(new MessageHandler(redisConfig.connectionForKey(key), key))
    }
  } finally {
    executorPool.shutdown()
  }
}

If keys.length > 250, Redis rejects the remaining connections, so I cannot put all the data into the stream (I have more than 1000 K/Vs in Redis). After increasing maxTotal to 2048 the issue was resolved.

sbt build with spark 1.6.1 not working

Hi!

I'm trying to build the package with sbt package for spark version 1.6.1 and the following error occurs:

16:56 $ sbt package
[info] Loading project definition from /Users/manuelmiranda/Downloads/spark-1.6.1-bin-without-hadoop/lib/spark-redis/project
[info] Set current project to spark-redis (in build file:/Users/manuelmiranda/Downloads/spark-1.6.1-bin-without-hadoop/lib/spark-redis/)
[warn] Credentials file /Users/manuelmiranda/.ivy2/.sbtcredentials does not exist
[info] Compiling 9 Scala sources to /Users/manuelmiranda/Downloads/spark-1.6.1-bin-without-hadoop/lib/spark-redis/target/scala-2.11/classes...
[error] /Users/manuelmiranda/Downloads/spark-1.6.1-bin-without-hadoop/lib/spark-redis/src/main/scala/com/redislabs/provider/redis/sql/DefaultSource.scala:10: object Row is not a member of package org.apache.spark.sql.catalyst.expressions
[error] import org.apache.spark.sql.catalyst.expressions.Row
[error]        ^
[error] /Users/manuelmiranda/Downloads/spark-1.6.1-bin-without-hadoop/lib/spark-redis/src/main/scala/com/redislabs/provider/redis/sql/DefaultSource.scala:53: not found: type Row
[error]         val m: Map[String, Row] = partition.map {
[error]                            ^
[error] /Users/manuelmiranda/Downloads/spark-1.6.1-bin-without-hadoop/lib/spark-redis/src/main/scala/com/redislabs/provider/redis/sql/DefaultSource.scala:78: not found: type Row
[error]   def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
[error]                                                                              ^
[error] /Users/manuelmiranda/Downloads/spark-1.6.1-bin-without-hadoop/lib/spark-redis/src/main/scala/com/redislabs/provider/redis/sql/DefaultSource.scala:113: not found: value Row
[error] Error occurred in an application involving default arguments.
[error]           }.toIterator.map(Row.fromSeq(_))
[error]                            ^
[error] four errors found
[error] (compile:compile) Compilation failed
[error] Total time: 15 s, completed Jun 15, 2016 4:56:33 PM

The same happens with version 1.5.2, which is supposed to work according to this post.

If I set the version to 1.4.1 (the same as in pom.xml) it works correctly.

sc.toRedis throws NotSerializableException for the stateful streaming job

I have a stateful streaming Spark job like the one below. I'd like to save some calculated values to Redis. Redis seems to work fine without ssc.checkpoint. When I add a checkpoint it starts to fail with the error java.io.NotSerializableException: org.apache.spark.SparkContext. But I need the checkpoint to persist the state. How can I make spark-redis work for me? Am I missing something?

object StreamingWordCount {

  case class WordItem(item: String, count: Long)

  case class WordGroup(key: String, count: Long)

  def updateUserEvents(key: String, value: Option[WordItem], state: State[WordGroup]): Option[WordGroup] = {

    val existing = {
      state.getOption().map(_.count).getOrElse(0L)
    }

    val updated = {
      value.map(s => WordGroup(key, s.count + existing)).getOrElse(WordGroup(key, existing))
    }

    state.update(updated)
    Some(updated)
  }

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        s"""
           |Usage: DirectKafkaWordCount <brokers> <topics>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |
        """.stripMargin) // TODO redis stuff as parameters
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("StreamingWordCount")
      .set("redis.host", "redis")
      .set("redis.port", "6379")
      .set("redis.auth", "bunchofmonkeys")

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("/tmp/checkpoint")

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines: DStream[String] = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _).map(s => (s._1, s._2))

    // Merges the current batch with the state
    val stateSpec = StateSpec.function(updateUserEvents _)
    val states = wordCounts.map(s => (s._1, WordItem(s._1, s._2))).mapWithState(stateSpec)
    states.print()

    // import com.redislabs.provider.redis._

    val prepared = states.flatMap(s => s).map(s => (s.key, s.count.toString))

    prepared.foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
        // sc.toRedisZSET(rdd, "word_count", 0) // doesn't work here :: java.io.NotSerializableException: org.apache.spark.SparkContext
          rdd.foreach(wordGroup => {
            // sc.toRedisZSET(rdd, "word_count", 0) // doesn't fit here
            println(s"### $wordGroup")
          })
      }
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
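
A common way around this is to avoid capturing the driver-side sc in the checkpointed closure and instead obtain the context from the RDD inside foreachRDD; a minimal sketch, assuming the implicit spark-redis import is in scope:

    // Sketch only: use the RDD's own SparkContext rather than the outer `sc`,
    // so the checkpointed closure does not capture anything non-serializable.
    import com.redislabs.provider.redis._

    prepared.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        rdd.sparkContext.toRedisZSET(rdd, "word_count", 0)
      }
    }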

Why does loading 4000 images into Redis using spark-submit take longer (9 minutes) than loading the same images into HBase (2.5 minutes)?

Loading images into Redis should be much faster than doing the same thing with HBase, since Redis works in RAM while HBase uses HDFS to store the data. I was surprised that loading 4000 images into Redis took 9 minutes, while the same process with HBase took only 2.5 minutes. Is there an explanation for this? Any suggestions to improve my code? Here is my code:
// The code for loading the images into HBase (adopted from NIST)
val conf = new SparkConf().setAppName("Fingerprint.LoadData")
val sc = new SparkContext(conf)
Image.dropHBaseTable()
Image.createHBaseTable()
val checksum_path = args(0)
println("Reading paths from: %s".format(checksum_path.toString))
val imagepaths = loadImageList(checksum_path)
println("Got %s images".format(imagepaths.length))
imagepaths.foreach(println)
println("Reading files into RDD")
val images = sc.parallelize(imagepaths).map(paths => Image.fromFiles(paths._1, paths._2))
println(s"Saving ${images.count} images to HBase")
Image.toHBase(images)
println("Done")

def toHBase(rdd: RDD[T]): Unit = {
  val cfg = HBaseConfiguration.create()
  cfg.set(TableOutputFormat.OUTPUT_TABLE, tableName)
  val job = Job.getInstance(cfg)
  job.setOutputFormatClass(classOf[TableOutputFormat[String]])
  rdd.map(Put).saveAsNewAPIHadoopDataset(job.getConfiguration)
}

// The code for loading images into Redis
val images = sc.parallelize(imagepaths).map(paths => Image.fromFiles(paths._1, paths._2)).collect
for (i <- images) {
  val stringRdd = sc.parallelize(Seq((i.uuid, new String(i.Png, StandardCharsets.UTF_8))))
  sc.toRedisKV(stringRdd)(redisConfig)
  stringRdd.collect
}
println("Done")
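
One likely cause is that the Redis path runs one tiny job per image (a parallelize, a toRedisKV call and a collect for every element), while the HBase path writes the whole RDD in a single pass. A minimal sketch of a batched alternative, assuming Image.fromFiles, i.uuid and i.Png as in the snippet above:

    // Sketch only: build one RDD of (key, value) pairs and write it in a single call,
    // instead of one single-element job per image.
    import java.nio.charset.StandardCharsets

    val images = sc.parallelize(imagepaths).map(paths => Image.fromFiles(paths._1, paths._2))
    val kvs = images.map(i => (i.uuid, new String(i.Png, StandardCharsets.UTF_8)))
    sc.toRedisKV(kvs)(redisConfig)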

Using fromRedisZSetWithScore in Zeppelin Fails

I am trying to use Zeppelin to visualize a table in Redis.

I have the following AWS EMR setup to run Zeppelin:
Release label:emr-5.4.0
Hadoop distribution:Amazon 2.7.3
Applications:Ganglia 3.7.2, Spark 2.1.0, Zeppelin 0.7.0

I am running with the following Redis-Spark JARs:
% ls /usr/share/dal/lib
commons-pool2-2.3.jar jedis-2.7.2.jar spark-redis-0.3.2.jar

I have set up the Zeppelin CLASSPATH environment to pull the proper JARs for Spark-Redis:
$ cat /etc/zeppelin/conf/zeppelin-env.sh
....
export CLASSPATH=":/usr/lib/hadoop-lzo/lib/:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/:/usr/share/aws/emr/emrfs/auxlib/:/usr/share/dal/lib/*"
....

I can ssh to the node and run this code from the spark-shell CLI on the cluster.

Here is the code that I run in the Zeppelin Spark interpreter, and the error that is returned:

import com.redislabs.provider.redis._
import org.apache.spark.sql.SparkSession

def redisSpark(host: String, port: String): SparkSession = SparkSession.builder().master("local").appName(this.getClass.getSimpleName).config("redis.host", host).config("redis.port", port).getOrCreate()

sc.stop()
val spark = redisSpark("172.28.4.177", "6379")

val rdd = spark.sparkContext.fromRedisZSetWithScore("portal.300107.month:net:user:device:pid:dsrc.hits")
rdd.foreach(println)

rdd: org.apache.spark.rdd.RDD[(String, Double)] = RedisZSetRDD[11] at RDD at RedisRDD.scala:117
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.io.IOException: Failed to create local dir in /mnt/tmp/blockmgr-97b1f7a1-ca02-4d7a-a362-7f08db74109b/0c.
java.io.IOException: Failed to create local dir in /mnt/tmp/blockmgr-97b1f7a1-ca02-4d7a-a362-7f08db74109b/0c.
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:70)
at org.apache.spark.storage.DiskStore.remove(DiskStore.scala:111)
at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1339)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:726)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1233)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:122)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1411)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:996)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:918)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:862)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1613)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:918)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:862)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1613)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)
... 56 elided
Caused by: java.io.IOException: Failed to create local dir in /mnt/tmp/blockmgr-97b1f7a1-ca02-4d7a-a362-7f08db74109b/0c.
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:70)
at org.apache.spark.storage.DiskStore.remove(DiskStore.scala:111)
at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1339)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:726)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1233)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:122)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1411)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:996)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:918)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:862)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1613)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Support write/read spark Dataframe or Dataset?

Hi, I'm glad I found this project for storing RDDs in Redis. But the code has not been updated for a long time, and I can't find the corresponding documentation for #32. Is DataFrame or Dataset supported now?
Thanks!
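
For reference, the later branches (2.3 and up) added a DataFrame data source; a minimal sketch of how that API is typically used (spark, df and the "person" table name are assumptions):

    // Sketch only: DataFrame write and read through the spark-redis data source.
    df.write
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .save()

    val loaded = spark.read
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .load()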

Support expiration

Proposed API change:

Right now the API for saving RDDs to Redis looks like:

def toRedisKV(kvs: RDD[(String, String)])

We should just change it to:

def toRedisKV(kvs: RDD[(String, String)], ttl: Int = 0)

Where 0 means no expiration.

The same applies to all other save functions.
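
A minimal usage sketch of the proposed signature (TTL in seconds, 0 meaning no expiration):

    // Sketch only: keys written here would expire after 60 seconds,
    // assuming the proposed ttl parameter is adopted.
    val kvs = sc.parallelize(Seq(("session:1", "alice"), ("session:2", "bob")))
    sc.toRedisKV(kvs, ttl = 60)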

Improve unit tests

  1. RedisConfig can be tested with mock data without starting redis instances - validate partitioning functions, node selection, etc.
  2. RedisEndpoint can be tested with a single working redis instance.
  3. Any other improvements and lower-level tests are welcome.

Can't run with AWS ElastiCache

Hello, after setting up spark-redis I can't get it to work with AWS ElastiCache, but everything runs fine using a local Redis. I get the following exception:

Exception in thread "main" redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
        at redis.clients.util.Pool.getResource(Pool.java:50)
        at redis.clients.jedis.JedisPool.getResource(JedisPool.java:86)
        at com.redislabs.provider.redis.ConnectionPool$.connect(ConnectionPool.scala:33)
        at com.redislabs.provider.redis.RedisEndpoint.connect(RedisConfig.scala:67)
        at com.redislabs.provider.redis.RedisConfig.clusterEnabled(RedisConfig.scala:139)
        at com.redislabs.provider.redis.RedisConfig.getNodes(RedisConfig.scala:250)
        at com.redislabs.provider.redis.RedisConfig.getHosts(RedisConfig.scala:166)
        at com.redislabs.provider.redis.RedisConfig.<init>(RedisConfig.scala:89)
        at com.redislabs.provider.redis.RedisContext.toRedisKV$default$3(redisFunctions.scala:225)

About Support for Python?

Hi, I use Spark with Python, but I noticed that Java, Python and R API bindings are not provided.
Are there any plans to include a Python API?

toRedisKV can't handle large data

I hit a NegativeArraySizeException because of memory shortage while handling a large dataset,
i.e. the data size is over 20 GB; the cluster has about 60 machines (each with 128 GB of memory).

I think the groupByKey function is the cause of this problem, because groupByKey risks consuming the memory of just one machine.

see:

http://mail-archives.us.apache.org/mod_mbox/spark-user/201409.mbox/<[email protected]>

Fails silently when redis instance is full

When writing to an AWS ElastiCache cluster, my Spark job was completing successfully, but the data was not showing up in Redis. After a while, I realized it was because my Redis database had exceeded the ElastiCache size limit, which explains why the writes were failing. However, it would be nice to get an error from spark-redis when it fails to write.

I don't know if this is a general Redis problem or a general ElastiCache problem, but from what I can tell it looks like a spark-redis problem.

How to use Spark to access Redis running in sentinel mode?

Well, I'm a rookie. I want to access Redis in my Spark project.
Here are the Redis processes running on my server:

$ ps -ef|grep redis
kam 7237 4438 0 14:01 pts/0 00:00:00 grep --color=auto redis
kam 7469 1 0 Aug07 ? 00:10:23 src/redis-server bogon:6379
kam 7541 1 0 Aug07 ? 00:11:56 src/redis-server bogon:6380
kam 7547 1 0 Aug07 ? 00:11:57 src/redis-server bogon:6381
kam 7835 1 0 Aug07 ? 00:10:27 src/redis-sentinel *:26379 [sentinel]

Maybe I will bring up another sentinel on port 26380.
So how should I configure spark-redis?
I would highly appreciate your help.

Support ZSET RDDs from sorted sets

We want to support sorted sets as a source for Redis RDDs.

Proposed API:

def fromRedisZRange(key: String,
                    start: Int,
                    stop: Int,
                    withScores: Boolean): RedisZSetRDD

and of course add the ZSET RDD to match.
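
A minimal usage sketch of the proposal, assuming it is exposed on SparkContext like the existing fromRedis* methods (the "leaderboard" key is hypothetical):

    // Sketch only: read the top 10 members of a sorted set together with their scores.
    val top10 = sc.fromRedisZRange("leaderboard", 0, 9, withScores = true)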

Other functions to support for ZSET RDDs:

  • ZRANGE
  • ZREVRANGE
  • ZRANGEBYSCORE
  • ZREVRANGEBYSCORE

Cannot load data from a Redis cluster, but can load data from standalone

I use spark-redis in a Java project and found it can load data from standalone Redis, but cannot do so from a Redis cluster.

Why? Thanks.

Code as below:

  SparkConf sparkConf = new SparkConf().setAppName(appname)
            .set("redis.host", "10.11.44.99") 
            .set("redis.port", "7777");
  JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  Dataset<Row> ds = snapshot_redis_data(jsc.sc(), sparkConf, new Time(0));


public static Dataset<Row> snapshot_redis_data(SparkContext sc, SparkConf conf, Time ts) {
    println("snapshot_redis_data: "
     + " redis.host=" + conf.get("redis.host")
     + " redis.port=" + conf.get("redis.port"));

    RedisConfig rCfg = new RedisConfig(new RedisEndpoint(conf));
    RedisContext rCtx = new RedisContext(sc);

    SparkSession spark = JavaSparkSessionSingleton.getInstance(conf);

    RDD<Tuple2<String, String>> kvRdd = rCtx.fromRedisKV("track.*", 5, rCfg);

    JavaRDD<Row> userRDD = kvRdd.toJavaRDD().map(new Function<Tuple2<String, String>, Row>() {
        public Row call(Tuple2<String, String> tuple2) throws Exception {
            logger.info(tuple2._1 + ":" + tuple2._2);
            String key = tuple2._1;
            String value = tuple2._2;

            String[] ss = key.split("\\.");

            return RowFactory.create(new Timestamp(ts.milliseconds()),
                    Long.parseLong(ss[2]),
                    Long.parseLong(ss[3]),
                    Long.parseLong(value));
        }
    });

    List<StructField> structFields = new ArrayList<StructField>();
    structFields.add(DataTypes.createStructField("ts", DataTypes.TimestampType, true));
    structFields.add(DataTypes.createStructField("rt", DataTypes.LongType, true));
    structFields.add(DataTypes.createStructField("item_id", DataTypes.LongType, true));
    structFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
    StructType structType = DataTypes.createStructType(structFields);

    Dataset<Row> ds = spark.createDataFrame(userRDD, structType);
    return ds;
}

17/10/10 00:21:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
at redis.clients.util.Pool.getResource(Pool.java:50)
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:86)
at com.redislabs.provider.redis.ConnectionPool$.connect(ConnectionPool.scala:33)
at com.redislabs.provider.redis.RedisEndpoint.connect(RedisConfig.scala:67)
at com.redislabs.provider.redis.rdd.Keys$$anonfun$getKeys$1.apply(RedisRDD.scala:417)
at com.redislabs.provider.redis.rdd.Keys$$anonfun$getKeys$1.apply(RedisRDD.scala:416)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at com.redislabs.provider.redis.rdd.Keys$class.getKeys(RedisRDD.scala:416)
at com.redislabs.provider.redis.rdd.RedisKeysRDD.getKeys(RedisRDD.scala:189)
at com.redislabs.provider.redis.rdd.RedisKeysRDD.compute(RedisRDD.scala:271)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at com.redislabs.provider.redis.rdd.RedisKVRDD.compute(RedisRDD.scala:30)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused (Connection refused)
at redis.clients.jedis.Connection.connect(Connection.java:155)
at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:83)
at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1643)
at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:85)
at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
at redis.clients.util.Pool.getResource(Pool.java:48)
... 37 more

spark-redis support for spark 1.6 and 2.0 - as well as dataframes

Is it just me, or does the spark-redis connector not work with Spark 1.6?

I'm assuming it won't work with Spark 2.0, but I haven't tested.

Lastly, I was under the impression that this connector supports the Spark DataFrame API.

I've been trying to integrate Redis into my end-to-end recommendation pipeline GitHub repo: https://github.com/fluxcapacitor/pipeline/wiki

but it's crashing all over the place.

Any assistance would be appreciated. For now, I'm just using Jedis directly, but that's not ideal.

Please reach out to me directly @ [email protected] if you'd prefer. We can circle back on this issue once we figure things out.

Maven build failed for Oozie 4.3.0

I want to use Oozie for automating Spark jobs. I'm trying to build Oozie using Maven 3.3.9. I have modified the pom.xml file for Hadoop 2.7.1, Java 1.7, HBase 1.2.0 and Oozie 4.3.0, and I got the error described below during the ./mkdistro.sh Maven build. It seems to be related to the HBase credentials class. What modifications do I have to make to get past this error?

[INFO] --- maven-compiler-plugin:2.3.2:compile (default-compile) @ oozie-core ---
[INFO] Compiling 517 source files to /usr/lib/oozie/oozie-4.3.0/core/target/classes
[INFO] -------------------------------------------------------------
[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /usr/lib/oozie/oozie-4.3.0/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java:[28,45] error: package org.apache.hadoop.hbase.security.token does not exist
[ERROR] /usr/lib/oozie/oozie-4.3.0/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java:[29,45] error: package org.apache.hadoop.hbase.security.token does not exist
[ERROR] /usr/lib/oozie/oozie-4.3.0/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java:[83,14] error: cannot find symbol
[ERROR] class HbaseCredentials
/usr/lib/oozie/oozie-4.3.0/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java:[84,48] error: cannot find symbol
[ERROR] class HbaseCredentials
/usr/lib/oozie/oozie-4.3.0/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java:[85,29] error: cannot find symbol
[ERROR] class AuthenticationTokenIdentifier
/usr/lib/oozie/oozie-4.3.0/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java:[86,27] error: cannot find symbol
[INFO] 6 errors
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Apache Oozie Main .................................. SUCCESS [ 2.452 s]
[INFO] Apache Oozie Hadoop Utils hadoop-2-4.3.0 ........... SUCCESS [ 4.780 s]
[INFO] Apache Oozie Hadoop Distcp hadoop-2-4.3.0 .......... SUCCESS [ 0.151 s]
[INFO] Apache Oozie Hadoop Auth hadoop-2-4.3.0 Test ....... SUCCESS [ 0.698 s]
[INFO] Apache Oozie Hadoop Libs ........................... SUCCESS [ 0.082 s]
[INFO] Apache Oozie Client ................................ SUCCESS [ 19.330 s]
[INFO] Apache Oozie Share Lib Oozie ....................... SUCCESS [ 4.273 s]
[INFO] Apache Oozie Share Lib HCatalog .................... SUCCESS [ 4.933 s]
[INFO] Apache Oozie Share Lib Distcp ...................... SUCCESS [ 1.104 s]
[INFO] Apache Oozie Core .................................. FAILURE [ 11.598 s]
[INFO] Apache Oozie Share Lib Streaming ................... SKIPPED
[INFO] Apache Oozie Share Lib Pig ......................... SKIPPED
[INFO] Apache Oozie Share Lib Hive ........................ SKIPPED
[INFO] Apache Oozie Share Lib Hive 2 ...................... SKIPPED
[INFO] Apache Oozie Share Lib Sqoop ....................... SKIPPED
[INFO] Apache Oozie Examples .............................. SKIPPED
[INFO] Apache Oozie Share Lib Spark ....................... SKIPPED
[INFO] Apache Oozie Share Lib ............................. SKIPPED
[INFO] Apache Oozie Docs .................................. SKIPPED
[INFO] Apache Oozie WebApp ................................ SKIPPED
[INFO] Apache Oozie Tools ................................. SKIPPED
[INFO] Apache Oozie MiniOozie ............................. SKIPPED
[INFO] Apache Oozie Distro ................................ SKIPPED
[INFO] Apache Oozie ZooKeeper Security Tests .............. SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 52.026 s
[INFO] Finished at: 2018-07-25T11:42:47+03:00
[INFO] Final Memory: 127M/797M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:2.3.2:compile (default-compile) on project oozie-core: Compilation failure: Compilation failure:
[ERROR] /usr/lib/oozie/oozie-4.3.0/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java:[28,45] error: package org.apache.hadoop.hbase.security.token does not exist
[ERROR] /usr/lib/oozie/oozie-4.3.0/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java:[29,45] error: package org.apache.hadoop.hbase.security.token does not exist
[ERROR] /usr/lib/oozie/oozie-4.3.0/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java:[83,14] error: cannot find symbol
[ERROR] class HbaseCredentials
[ERROR] /usr/lib/oozie/oozie-4.3.0/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java:[84,48] error: cannot find symbol
[ERROR] class HbaseCredentials
[ERROR] /usr/lib/oozie/oozie-4.3.0/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java:[85,29] error: cannot find symbol
[ERROR] class AuthenticationTokenIdentifier
[ERROR] /usr/lib/oozie/oozie-4.3.0/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java:[86,27] error: cannot find symbol
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.

duplicate tests

There are two versions of each integration test: standalone and cluster (e.g. KeysClusterSuite and KeysStandaloneSuite). The code is identical, with the only differences being the Redis port number and authentication.

It should be refactored to remove duplicate code.

Can someone please add the latest changes to the Maven repo?

Looking at the commit history, Shay Nativ added compatibility with Spark 2.1 in Oct 2017. We are planning on using this connector for our Spark/Redis combination. It would be highly beneficial to everyone if someone could build and publish this connector's latest jar file to the Maven repository, so that we don't have to build and maintain a private repo for the latest changes.

Redis Spark JedisDataException: ERR wrong number of arguments

When trying to connect to Redis from Spark I am getting the exception shown below. I am not sure what I am missing here.

Code:

val sc = new SparkContext(new SparkConf()
  .setMaster("local").setAppName(getClass.getName)
  .set("redis.host", "127.0.0.1")
  .set("redis.port", "6379")
  .set("redis.auth", "xxxx"))
val content = fromInputStream(getClass.getClassLoader.getResourceAsStream("blog")).
  getLines.toArray.mkString("\n")

val redisConfigStandalone = new RedisConfig(new RedisEndpoint("127.0.0.1", 6379))

val redisConfigCluster = new RedisConfig(new RedisEndpoint("127.0.0.1", 6379))
val wcnts = sc.parallelize(content.split("\\W+").filter(!_.isEmpty)).map((_, 1)).reduceByKey(_ + _).map(x => (x._1, x._2.toString))

val wds = sc.parallelize(content.split("\\W+").filter(!_.isEmpty))
println(wds.count())

Exception:

Exception in thread "main" redis.clients.jedis.exceptions.JedisDataException: ERR wrong number of arguments for 'info' command
at redis.clients.jedis.Protocol.processError(Protocol.java:117)
at redis.clients.jedis.Protocol.process(Protocol.java:142)
at redis.clients.jedis.Protocol.read(Protocol.java:196)
at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:288)
at redis.clients.jedis.Connection.getBinaryBulkReply(Connection.java:207)
at redis.clients.jedis.Connection.getBulkReply(Connection.java:196)
at redis.clients.jedis.BinaryJedis.info(BinaryJedis.java:2671)
at com.redislabs.provider.redis.RedisConfig.clusterEnabled(RedisConfig.scala:127)
at com.redislabs.provider.redis.RedisConfig.getSlots(RedisConfig.scala:204)
at com.redislabs.provider.redis.RedisConfig.getHosts(RedisConfig.scala:159)
at com.redislabs.provider.redis.RedisConfig.<init>(RedisConfig.scala:89)
at com.aline.cloudscm.util.SimpleApp2$.main(SimpleApp2.scala:21)
at com.aline.cloudscm.util.SimpleApp2.main(SimpleApp2.scala)
16/02/28 15:33:27 INFO SparkContext: Invoking stop() from shutdown hook

Support for HINCRBY, LTRIM and LRANGE

Hi @sunheehnus,

Thanks for providing the library to connect from Spark. I am wondering if you support HINCRBY on hashes and the LTRIM and LRANGE operations on lists.

I can't find any code related to these operations, so I thought I'd check with you. Thanks for your time.

Sano

Update Redis Version in Spark Package Repo

I am using Spark 1.6.2 and although I can write to Redis, reading does not seem to be working. I am using 0.1.1 as that is the only version available in the Spark Packages repo. Please advise.

sc.toRedisKV(sc.parallelize(("testing", "3") :: Nil), ("localhost", 6379))

127.0.0.1:6379> get "testing"
"3"

val valuesRDD = sc.fromRedisKV("testing", ("localhost", 6379))
:51: error: value fromRedisKV is not a member of org.apache.spark.SparkContext
val valuesRDD = sc.fromRedisKV("testing", ("localhost", 6379))
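
For reference, the Redis read methods are added to SparkContext through an implicit conversion, so this error usually means the implicit import is missing; a minimal sketch (keeping the same call shape as above):

    // Sketch only: without this import, fromRedisKV does not resolve on SparkContext.
    import com.redislabs.provider.redis._

    val valuesRDD = sc.fromRedisKV("testing", ("localhost", 6379))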

Is there a tuning guide when using spark-redis?

I am trying to use spark-redis to access data from Redis 3.2.0.
I need a tuning guide to make my app as fast as possible.

BTW, since my app needs to do some key-based data deduplication and simple filtering, I chose hash as my data type. Do you have any comments or experience with this?

Remove/Update elements of an existing list in Redis

Hi,

Are there methods to update an existing list in Redis? Methods like toRedisLIST and toRedisFixedLIST just push elements onto an existing list, i.e. they use RPUSH and LPUSH. I would like something like LSET.

Thanks,
Cristian

Use Connection Pooling

Proposed solution:

Add a lazy pool to RedisEndpoint objects.

Make sure they are not created on the fly when not needed.
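
A minimal sketch of what a lazy per-endpoint pool could look like (class shape, field names and defaults are assumptions, not the actual implementation):

    // Sketch only: each endpoint builds its JedisPool lazily on first use,
    // so no pool is created for endpoints that are never connected to.
    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig, Protocol}

    case class RedisEndpoint(host: String, port: Int, auth: String = null) {
      @transient private lazy val pool: JedisPool = {
        val cfg = new JedisPoolConfig()
        cfg.setMaxTotal(250)
        new JedisPool(cfg, host, port, Protocol.DEFAULT_TIMEOUT, auth)
      }
      def connect(): Jedis = pool.getResource
    }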

Redis INFO can't be retrieved

My company uses SSDB instead of Redis. SSDB does not support the Redis INFO command, so when I write data to SSDB I get this error:

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
at com.redislabs.provider.redis.RedisConfig.clusterEnabled(RedisConfig.scala:141)
at com.redislabs.provider.redis.RedisConfig.getNodes(RedisConfig.scala:250)
at com.redislabs.provider.redis.RedisConfig.getHosts(RedisConfig.scala:166)
at com.redislabs.provider.redis.RedisConfig.<init>(RedisConfig.scala:89)
at com.jj.sparkredis.MulRedis$.writeToRedis$1(MulRedis.scala:23)
at com.jj.sparkredis.MulRedis$.main(MulRedis.scala:26)
at com.jj.sparkredis.MulRedis.main(MulRedis.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

So I patched it at https://github.com/shouwangv6/spark-redis/blob/patch-3/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala

Strange issue when trying to connect to RedisLabs managed cluster

I encountered a strange issue. When I was developing/testing with a local Redis server everything worked, but it failed when I tried to connect to a cluster managed by RedisLabs.

Code snippets:

val sc = new SparkContext(new SparkConf()
  .setMaster("local")
  .setAppName("PreProcess")
  .set("redis.host", "10.10.10.10")
  .set("redis.port", "12345")
  .set("redis.auth", "xxxxxxxx"))

valueTuple.foreach(each => sc.toRedisHASH(sc.parallelize((each._2).toList), "h_follow" + each._1))

I encountered the following error:
============================ error log ==============================
Exception in thread "main" java.lang.NegativeArraySizeException
at redis.clients.jedis.Protocol.processBulkReply(Protocol.java:159)
at redis.clients.jedis.Protocol.process(Protocol.java:136)
at redis.clients.jedis.Protocol.read(Protocol.java:196)
at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:288)
at redis.clients.jedis.Connection.getBinaryBulkReply(Connection.java:207)
at redis.clients.jedis.Connection.getBulkReply(Connection.java:196)
at redis.clients.jedis.BinaryJedis.info(BinaryJedis.java:2671)
at com.redislabs.provider.redis.RedisConfig.clusterEnabled(RedisConfig.scala:146)
at com.redislabs.provider.redis.RedisConfig.getNodes(RedisConfig.scala:252)
at com.redislabs.provider.redis.RedisConfig.getHosts(RedisConfig.scala:168)
at com.redislabs.provider.redis.RedisConfig.<init>(RedisConfig.scala:96)
at com.redislabs.provider.redis.RedisContext.toRedisHASH$default$4(redisFunctions.scala:58)
at PreProcess$$anonfun$main$6.apply(preprocess.scala:115)
at PreProcess$$anonfun$main$6.apply(preprocess.scala:115)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at PreProcess$.main(preprocess.scala:115)
at PreProcess.main(preprocess.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Running the spark cluster with yarn

My code:

    val sc = new SparkContext(new SparkConf()
        .setMaster(sparkMode)
        .setAppName("recommendation")
        .set("redis.host", redisHost)
        .set("redis.port", redisPort)
        .set("redis.auth", redisPassword))

where sparkMode = 'yarn-cluster'

I am getting the following error:

diagnostics: User class threw exception: redis.clients.jedis.exceptions.JedisDataException: ERR unknown command 'CLUSTER'

How to load 4000 images into Redis using Spark?

I don't have much experience with Spark and Redis; here is what I wrote trying to load 4000 images into Redis using Spark:
def main(args: Array[String]) = {
  val conf = new SparkConf()
    .setMaster("local")
    .setAppName("LoadImagesIntoRedis")
    // initial redis host - can be any node in cluster mode
    .set("redis.host", "")
    // initial redis port
    .set("redis.port", "6379")
    // optional redis AUTH password
    .set("redis.auth", "")
  val sc = new SparkContext(conf)
  val redisServerDnsAddress = ""
  val redisPortNumber = 6379
  val redisPassword = ""
  val redisConfig = new RedisConfig(new RedisEndpoint(redisServerDnsAddress, redisPortNumber, redisPassword))
  val checksum_path = "/tmp/sd04_md5.lst"
  println("Reading paths from: %s".format(checksum_path.toString))
  val imagepaths = LoadData.loadImageList(checksum_path)
  println("Got %s images".format(imagepaths.length))
  imagepaths.foreach(println)
  println("Reading files into RDD")
  val images = sc.parallelize(imagepaths).map(paths => Image.fromFiles(paths._1, paths._2))
  println(s"Saving ${images.count} images to Redis")
  sc.toRedisSET(images, "ImagesSet")(redisConfig)
  println("Done")
  images.collect()
}
I'm getting a type-mismatch error at compile time on the line:
sc.toRedisSET(images, "ImagesSet")(redisConfig)
Is there any way to load the images using Spark? Note that I need to use Spark to do image processing after loading them into Redis.
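
The mismatch is likely because toRedisSET expects an RDD[String] rather than an RDD of Image objects; a minimal sketch of one way around it, assuming Image exposes uuid and Png as in the snippet above:

    // Sketch only: serialize each image to a (key, value) pair and write with toRedisKV.
    import java.nio.charset.StandardCharsets

    val kvs = images.map(i => (i.uuid, new String(i.Png, StandardCharsets.UTF_8)))
    sc.toRedisKV(kvs)(redisConfig)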

How does the KEYS command perform?

In Redis, the time complexity of the KEYS command is O(n).
If the Redis server has a great many keys, will the keys RDD block the Redis server for a long time?

authentication

Is there a way to pass in authentication credential for the connection?
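
For reference, credentials can be passed through the Spark configuration; a minimal sketch (the password value is hypothetical):

    // Sketch only: the password is picked up from the "redis.auth" setting.
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("redis-auth-example")
      .set("redis.host", "localhost")
      .set("redis.port", "6379")
      .set("redis.auth", "mypassword") // hypothetical password
    val sc = new SparkContext(conf)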

state of the connector with Spark 2.1 or later

I am trying to use the spark-redis connector with Spark 2.1, but it seems it's not compatible with the latest Spark.

The documentation mentions that the connector is tested with Apache Spark v1.4.0.
Could you please help me understand the state of the connector with Spark 2.1 or later?

Library dependencies

Hi,

I tried two ways to add the spark-redis dependency:

  • Gradle
  • SBT
    and this is the error message:
    screenshot from 2017-08-23 11-01-00
    If I add the library via a JAR file, it works. Please help to check this issue.

Thanks,
TrietNV
