在 scala spark 流中使用 foreach 时不想将字符串作为类型?

do not want string as type when using foreach in scala spark streaming?

代码片段:

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)
write2hdfs.foreachRDD(rdd => {

rdd.foreach(avroRecord => {
println(avroRecord)
//val rawByte = avroRecord.getBytes("UTF-8")

面临的问题>

avroRecord 保存从 kafka 流接收到的 avro 编码消息。 当使用上述代码时,默认情况下 avroRecord 是一个字符串。 并且字符串在 scala 中默认使用 UTF-16 编码。

由于此反序列化不正确且面临问题。 消息在发送到 kafka 流时使用 utf-8 编码为 avro。

我需要 avroRecord 是纯字节 而不是 获取字符串然后转换为字节(内部字符串将进行 utf-16 编码)。

或者在 utf-8 中获取 avroRecord 本身的方法。卡在这里死锁。

需要解决此问题的方法。

提前致谢。

更新:

代码段已更改 >

val ssc = new StreamingContext(sparkConf, Seconds(5))
//val ssc = new JavaStreamingContext(sparkConf, Seconds(5))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val kafkaParams = Map[String, String]("zookeeper.connect" -> 
zkQuorum,"group.id" -> group,"zookeeper.connection.timeout.ms" -> "10000")                    

//val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val lines = 
KafkaUtils.createStream[String,Message,StringDecoder,DefaultDecoder]
(ssc,kafkaParams,topics,StorageLevel.NONE)

进口完成:

import org.apache.spark.streaming._
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.avro
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord, 
GenericDatumWriter, GenericData}
import org.apache.avro.io.{DecoderFactory, DatumReader, DatumWriter, 
BinaryDecoder}
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import java.io.{File, IOException}
//import java.io.*
import org.apache.commons.io.IOUtils;
import _root_.kafka.serializer.{StringDecoder, DefaultDecoder}
import _root_.kafka.message.Message
import scala.reflect._

编译错误:

正在将 1 个 Scala 源代码编译为 /home/spark_scala/spark_stream_project/target/scala-2.10/类... [错误] /home/spark_scala/spark_stream_project/src/main/scala/sparkStreaming.scala:34: 重载方法值 createStream 与备选方案: [错误] (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,keyTypeClass: Class[String],valueTypeClass: Class[kafka.message.Message],keyDecoderClass: Class[kafka.serializer.StringDecoder],valueDecoderClass: Class[kafka.serializer.DefaultDecoder],kafkaParams: java.util.Map[String,String] ,主题:java.util.Map[字符串,整数],存储级别:org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[字符串,kafka.message.Message] [error] (ssc: org.apache.spark.streaming.StreamingContext,kafkaParams: scala.collection.immutable.Map[String,String],topics: scala.collection.immutable.Map[String,Int],storageLevel: org.apache.spark.storage.StorageLevel)(隐含证据$1 :scala.reflect.ClassTag[String],隐含证据 $2:scala.reflect.ClassTag[kafka.message.Message],隐含证据 $3:scala.reflect.ClassTag[kafka.serializer.StringDecoder],隐含证据 $4:scala.reflect.ClassTag[kafka.serializer.DefaultDecoder])org.apache.spark.streaming.dstream.ReceiverInputDStream[(字符串, kafka.message.Message)] [错误] 不能应用于 (org.apache.spark.streaming.StreamingContext, scala.collection.immutable.Map[String,String], String, org.apache.spark.storage.StorageLevel) [错误] val lines = KafkaUtils.createStreamString,Message,StringDecoder,DefaultDecoder [错误] ^

[错误] 发现一处错误

这里有什么问题。 此外,我没有看到 kafkaUtils API 文档中建议的正确构造函数。 API 参考文档我指的是: https://spark.apache.org/docs/1.3.0/api/java/index.html? org/apache/spark/streaming/kafka/KafkaUtils.html

期待支持。

谢谢。

更新 2:

已尝试并建议更正!

代码片段>

val lines = 
KafkaUtils.createStream[String,Message,StringDecoder,DefaultDecoder]
(ssc,kafkaParams,topicMap,StorageLevel.MEMORY_AND_DISK_2)
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)

面临运行时异常>

java.lang.ClassCastException: [B cannot be cast to kafka.message.Message

On line :
KafkaUtils.createStream[String,Message,StringDecoder,DefaultDecoder]
(ssc,kafkaParams,topicMap,StorageLevel.MEMORY_AND_DISK_2)
val write2hdfs = lines.filter(x => x._1 == "lineitem").map(_._2)

理想情况下过滤这个 Dstream(String,Message) 应该也能正常工作吧? 我需要在进行映射之前从消息中提取有效负载吗?

需要输入请。 谢谢

你可以这样做:

import kafka.serializer.{StringDecoder, DefaultDecoder}
import kafka.message.Message

val kafkaParams = Map[String, String](
    "zookeeper.connect" -> zkQuorum, "group.id" -> group,
    "zookeeper.connection.timeout.ms" -> "10000")
val lines = KafkaUtils.createStream[String, Message, StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topics, storageLevel)

这应该会给你一个 DStream[(String, kafka.message.Message)],你应该能够检索原始字节并从那里转换为 Avro。

这对我有用:

val lines = 
KafkaUtils.createStream[String,Array[Byte],StringDecoder,DefaultDecoder]
(ssc,kafkaParams,topicMap,StorageLevel.MEMORY_AND_DISK_2)

我的要求是获取字节数组,所以改为数组[字节]而不是kafka.message.Message