我们如何将 spark 结构化流连接到 redis?
How can we connect spark structured stream to redis?
我的目标是从 Redis 获取流数据并进行处理。如何通过 spark 结构化流连接和处理数据?
要在 Spark 中从 Redis Streams 中读取数据,我们需要建立如何连接到 Redis,以及 Redis Streams 中数据的模式结构。
要连接到 Redis,我们必须使用 Redis 的连接参数创建一个新的 SparkSession:
import com.redislabs.provider.redis._
import redis.clients.jedis.Jedis
object Samj45 {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("redis-example")
.master("local[*]")
.config("spark.redis.host", "localhost")
.config("spark.redis.port", "6379")
.getOrCreate()
val data_from_redis = spark
.readStream
.format("redis")
.option("stream.keys","data_clicks")
.schema(StructType(Array(
StructField("asset", StringType),
StructField("cost", LongType)
)))
.load()
对于写作,您可以使用 ForeachWriter。如果这有帮助,请告诉我。
我的目标是从 Redis 获取流数据并进行处理。如何通过 spark 结构化流连接和处理数据?
要在 Spark 中从 Redis Streams 中读取数据,我们需要建立如何连接到 Redis,以及 Redis Streams 中数据的模式结构。
要连接到 Redis,我们必须使用 Redis 的连接参数创建一个新的 SparkSession:
import com.redislabs.provider.redis._
import redis.clients.jedis.Jedis
object Samj45 {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("redis-example")
.master("local[*]")
.config("spark.redis.host", "localhost")
.config("spark.redis.port", "6379")
.getOrCreate()
val data_from_redis = spark
.readStream
.format("redis")
.option("stream.keys","data_clicks")
.schema(StructType(Array(
StructField("asset", StringType),
StructField("cost", LongType)
)))
.load()
对于写作,您可以使用 ForeachWriter。如果这有帮助,请告诉我。