Spark compatibility

According to the spark-redis documentation, Spark is able to read from and write to any Redis data type. I’m thinking this may not include RedisJSON? We’ve used this module to standardize our storage of data in Redis, and I’m starting to think this is problematic, since we need to connect with Spark bi-directionally. Any comments on whether this is possible? If so, sample code would be great. Thanks!

Hello RMK,

Today it is not supported; out of the box, the Spark connector uses the core Redis data structures (String, Hash, List, Set, Sorted Set).

That said, this is a very interesting request and would be a great contribution. (Start by creating an issue in the GitHub project with information about your use case: the JSON document structure and the type of operations you would like to perform.)

As a workaround, you may be able to use Redis Streams to send messages and then process them into RedisJSON.
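To make that workaround concrete, here is a rough, untested sketch. It assumes a hypothetical stream key `people` populated via `XADD`, and uses the Redis Streams structured-streaming source that spark-redis provides (`format("redis")` with the `stream.keys` option); the write side is left as a placeholder, since writing JSON would still require custom code such as the example further down in this thread:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

object StreamToJsonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stream-to-rejson")
      .master("local[*]")
      .config("spark.redis.host", "localhost")
      .config("spark.redis.port", "6379")
      .getOrCreate()

    // Read entries from a Redis Stream as a structured stream.
    // "people" is a hypothetical stream key for this sketch.
    val people = spark.readStream
      .format("redis")
      .option("stream.keys", "people")
      .schema(StructType(Array(
        StructField("id", StringType),
        StructField("name", StringType),
        StructField("age", IntegerType)
      )))
      .load()

    // Process each micro-batch; a real implementation would open a
    // Redis connection here and issue JSON.SET per row (custom code).
    val query = people.writeStream
      .foreachBatch { (batch: DataFrame, _: Long) =>
        batch.foreach(row => println(row)) // placeholder for JSON.SET logic
      }
      .start()

    query.awaitTermination()
  }
}
```

This only covers moving data into Spark via a stream; the per-batch RedisJSON write would look much like the `toRedisJson` function shown later in this thread.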

Regards
Tug


Hello rmk,

As tgrall pointed out, the JSON data type is not currently supported by spark-redis. As a workaround, it is possible to create a custom function to persist an RDD, for example.
In addition to the spark-redis dependency, it requires the JRedisJSON (jrejson) library:

	<dependency>
		<groupId>com.redislabs</groupId>
		<artifactId>jrejson</artifactId>
		<version>1.2.0</version>
	</dependency>

Sample code:

import com.redislabs.modules.rejson.JReJSON
import com.redislabs.provider.redis.RedisConfig
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object MainTest {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("redis-json-example")
      .master("local[*]")
      .config("spark.redis.host", "localhost")
      .config("spark.redis.port", "6379")
      .getOrCreate()

    val rdd = spark.sparkContext.parallelize(Seq(
      Person("1", "Joe", 50),
      Person("2", "John", 60)
    ))

    // Build the Redis connection details from the Spark configuration
    val redisConfig = RedisConfig.fromSparkConf(spark.sparkContext.getConf)

    toRedisJson(rdd, redisConfig)
  }

  case class Person(id: String, name: String, age: Int)

  def toRedisJson(rdd: RDD[Person], redisConfig: RedisConfig): Unit = {
    rdd.foreachPartition { partition =>
      // Group records by the Redis node that owns each key, so that one
      // connection is opened per node rather than per record
      val groupedByHost = partition.map { person =>
        (redisConfig.getHost(person.id), person)
      }.toList.groupBy(_._1)

      groupedByHost.foreach { case (host, hostAndPersonList) =>
        val jedis = host.connect()
        hostAndPersonList.foreach { case (_, person) =>
          // Equivalent to: JSON.SET <id> . <serialized person>
          JReJSON.set(jedis, person.id, person)
        }
        jedis.close()
      }
    }
  }
}

The RDD is written to Redis:

127.0.0.1:6379> JSON.get 1 .
"{\"id\":\"1\",\"name\":\"Joe\",\"age\":50}"
127.0.0.1:6379> JSON.get 2 .
"{\"id\":\"2\",\"name\":\"John\",\"age\":60}"
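The reverse direction (RedisJSON into Spark) can be sketched in a similar custom way. The code below is untested and makes assumptions: the set of keys to fetch is hard-coded (in practice it could come from a SCAN, a Redis Set acting as an index, or an external catalog), and it opens a connection per key for brevity, which a real implementation would batch per node as in `toRedisJson` above:

```scala
import com.redislabs.modules.rejson.JReJSON
import com.redislabs.provider.redis.RedisConfig
import org.apache.spark.sql.SparkSession

object ReadJsonTest {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("redis-json-read-example")
      .master("local[*]")
      .config("spark.redis.host", "localhost")
      .config("spark.redis.port", "6379")
      .getOrCreate()

    val redisConfig = RedisConfig.fromSparkConf(spark.sparkContext.getConf)

    // Hypothetical key list; replace with however your keys are discovered
    val keys = Seq("1", "2")

    val jsonRdd = spark.sparkContext.parallelize(keys).mapPartitions { partition =>
      partition.map { key =>
        // Connect to the node that owns this key and issue JSON.GET
        val jedis = redisConfig.getHost(key).connect()
        val value = try JReJSON.get[Object](jedis, key) finally jedis.close()
        (key, String.valueOf(value))
      }
    }

    jsonRdd.collect().foreach(println)
  }
}
```

From there the pairs could be parsed into a case class or a DataFrame with a JSON reader.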

Thanks for the response. I had a feeling this was the case. I realize Redis Labs is looking to give Redis more ML capabilities, so I don’t know whether this is needed in support of that direction, but I will contribute, as you suggested. I decided to switch from RedisJSON to HASH for this reason, even though I was very pleased with RedisJSON. I’m surprised this hasn’t come up before; after all, JSON seems to be the standard format for ML and REST.

Thanks for the reply. I didn’t know this was possible, but I still have to move data from Redis to Spark :slight_smile: I’m thinking that means more custom code and too much heavy lifting for me.