在 Spark Streaming Context 的 map 中使用 Spark Context,在收到 Kafka 事件后检索文档

Using Spark Context in map of Spark Streaming Context to retrieve documents after Kafka Event

我是 Spark 的新手。我想做的是:根据 Spark Kafka Streaming 消息中给出的 Id,从 Couchbase 视图中检索所有相关文档。

当我尝试通过 Spark 上下文获取这些文档时,总是遇到 "Task not serializable" 错误。

由此我了解到,不能使用嵌套 RDD,也不能在同一个 JVM 中使用多个 Spark 上下文,但我想找到一个解决方法。

这是我目前的做法:

package xxx.xxx.xxx

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.view.ViewQuery
import com.couchbase.spark._

import org.apache.spark.broadcast.Broadcast
import _root_.kafka.serializer.StringDecoder
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

/** Streaming job that reads JSON messages from Kafka and, for every message,
  * prints the rows of a Couchbase view keyed by the message's "id" field.
  */
object Streaming {
  /** Builds a Couchbase JSON document from a (key, value) pair.
    *
    * @param s pair whose first element is used as the document id and whose
    *          second element is the JSON text parsed into the document content
    * @return the document produced by `JsonDocument.create`
    */
  def CreateJsonDocument(s: (String, String)): JsonDocument =
    JsonDocument.create(s._1, JsonObject.fromJson(s._2))

  /** Entry point.
    *
    * @param args alternating `--name value` tokens; `--brokers` and `--topics`
    *             are required
    */
  def main(args: Array[String]): Unit = {
    // Turn "--name value" pairs into a name -> value map; an unpaired trailing token is ignored
    val arguments = args.grouped(2).collect { case Array(k, v) => k.replaceAll("--", "") -> v }.toMap

    println("----------------------------")
    println("Arguments passed to class")
    println("----------------------------")
    println("- Arguments")
    println(arguments)
    println("----------------------------")

    // Map.get returns an Option and is never null, so check for presence explicitly
    // and stop instead of starting the job with an empty broker/topic list
    if (!arguments.contains("brokers") || !arguments.contains("topics")) {
      System.err.println("Usage: --brokers <broker1:9092> --topics <topic1,topic2,topic3>")
      System.exit(1)
    }

    // Create the Spark configuration with app name
    val conf = new SparkConf().setAppName("Streaming")
    // Create the Spark context
    val sc = new SparkContext(conf)
    // Create the Spark Streaming Context with a 2 second batch interval
    val ssc = new StreamingContext(sc, Seconds(2))

    // Setup the broker list
    val kafkaParams = Map("metadata.broker.list" -> arguments.getOrElse("brokers", ""))
    // Setup the topic list
    val topics = arguments.getOrElse("topics", "").split(",").toSet
    // Get the message stream from kafka
    val docs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    docs
      // Parse the (key, content) pair into a JSON document
      .map(s => CreateJsonDocument(s))
      // Only the id is needed to key the view lookup
      .map(doc => doc.content.getString("id"))
      .foreachRDD { rdd =>
        // A SparkContext cannot be used inside map/foreach closures that run on
        // executors (that is what raises "Task not serializable"). The function
        // passed to foreachRDD runs on the driver, so the view is queried from here.
        // NOTE(review): collect() brings every id of the batch to the driver;
        // assumes batches are small - confirm against expected message volume.
        val ids = rdd.collect()
        ids.foreach { id =>
          rdd.sparkContext
            .couchbaseView(ViewQuery.from("my-design-document", "stats").key(id))
            .collect()
            .foreach(println)
        }
      }

    // Start the streaming context
    ssc.start()
    // Wait for termination and catch error if there is a problem in the process
    ssc.awaitTermination()
  }
}

我找到了一个解决方案:使用 Couchbase Client,而不是 Couchbase Spark Context。

我不知道这是否是性能方面的最佳方式,但我可以检索计算所需的文档。

package xxx.xxx.xxx

import com.couchbase.client.java.{Bucket, Cluster, CouchbaseCluster}
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.view.{ViewResult, ViewQuery}

import _root_.kafka.serializer.StringDecoder
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

/** Streaming job that reads JSON messages from Kafka and, for every message,
  * queries a Couchbase view through the plain Couchbase Java client.
  */
object Streaming {
  /** Builds a Couchbase JSON document from a (key, value) pair.
    *
    * @param s pair whose first element is used as the document id and whose
    *          second element is the JSON text parsed into the document content
    * @return the document produced by `JsonDocument.create`
    */
  def CreateJsonDocument(s: (String, String)): JsonDocument =
    JsonDocument.create(s._1, JsonObject.fromJson(s._2))

  /** Queries the "my-view" view of "my-design-document" with the document's
    * "id" field as key, prints the result and returns it.
    *
    * A new cluster connection is opened and disconnected on every call.
    *
    * @param doc       document whose content supplies the "id" used as view key
    * @param arguments settings map; reads "couchbase-hosts", "couchbase-bucket"
    *                  and "couchbase-password", each defaulting to ""
    * @return the view result returned by `bucket.query`
    */
  def RetrieveDocs (doc: JsonDocument, arguments: Map[String, String]): ViewResult = {
    val cbHosts = arguments.getOrElse("couchbase-hosts", "")
    val cbBucket = arguments.getOrElse("couchbase-bucket", "")
    val cbPassword = arguments.getOrElse("couchbase-password", "")

    val cluster: Cluster = CouchbaseCluster.create(cbHosts)

    // Disconnect even when opening the bucket or running the query throws,
    // otherwise every failed record would leak a cluster connection
    val docs: ViewResult =
      try {
        val bucket: Bucket = cluster.openBucket(cbBucket, cbPassword)
        bucket.query(ViewQuery.from("my-design-document", "my-view").key(doc.content().getString("id")))
      } finally {
        cluster.disconnect()
      }
    // NOTE(review): the result is read after the cluster has been disconnected,
    // as in the original code - confirm its rows are still available at that point.
    println(docs)

    docs
  }

  /** Entry point.
    *
    * @param args alternating `--name value` tokens; `--brokers` and `--topics`
    *             are required
    */
  def main(args: Array[String]): Unit = {
    // Turn "--name value" pairs into a name -> value map; an unpaired trailing token is ignored
    val arguments = args.grouped(2).collect { case Array(k, v) => k.replaceAll("--", "") -> v }.toMap

    println("----------------------------")
    println("Arguments passed to class")
    println("----------------------------")
    println("- Arguments")
    println(arguments)
    println("----------------------------")

    // Map.get returns an Option and is never null, so check for presence explicitly
    // and stop instead of starting the job with an empty broker/topic list
    if (!arguments.contains("brokers") || !arguments.contains("topics")) {
      System.err.println("Usage: --brokers <broker1:9092> --topics <topic1,topic2,topic3>")
      System.exit(1)
    }

    // Create the Spark configuration with app name
    val conf = new SparkConf().setAppName("Streaming")
    // Create the Spark context
    val sc = new SparkContext(conf)
    // Create the Spark Streaming Context with a 2 second batch interval
    val ssc = new StreamingContext(sc, Seconds(2))

    // Setup the broker list
    val kafkaParams = Map("metadata.broker.list" -> arguments.getOrElse("brokers", ""))
    // Setup the topic list
    val topics = arguments.getOrElse("topics", "").split(",").toSet
    // Get the message stream from kafka
    val docs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    // Broadcast the arguments so the executors can read the Couchbase settings
    val argsBC = sc.broadcast(arguments)

    docs
      // Parse the (key, content) pair into a JSON document
      .map(s => CreateJsonDocument(s))
      // Query the view for each document; RetrieveDocs takes the map itself,
      // so unwrap the broadcast handle with .value
      .map(doc => RetrieveDocs(doc, argsBC.value))
      .foreachRDD(
          rdd => {
             //Create a report of my documents and store it in Couchbase
             rdd.foreach( println )
          }
        )

    // Start the streaming context
    ssc.start()
    // Wait for termination and catch error if there is a problem in the process
    ssc.awaitTermination()
  }
}