Row.key and row.value not working in Spark Structured Streaming code
My code below throws errors on row.key and row.value when reading AVRO-format data from the producer in Spark Structured Streaming. Please help me resolve the issue: I get the error that the symbols row.key and row.value cannot be found. I want to read the data in Spark and write it to Cassandra on our Hadoop system. This is the only way I have seen to read AVRO source data in Spark Structured Streaming; please let me know if there is any other way to read Kafka data in AVRO format from the producer.
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.functions._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

import scala.collection.JavaConverters._
import scala.reflect.runtime.universe._

object ReadKafkaAvro {

  case class DeserializedFromKafkaRecord(key: String, value: String)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("ReadKafkaAvro")
      .config("spark.master", "local")
      .getOrCreate()

    import spark.implicits._

    val schemaRegistryURL = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = "b24_tx_financial_formatted_clean"
    val subjectValueName = topics + "-value"

    spark.sparkContext.setLogLevel("ERROR")

    val restService = new RestService(schemaRegistryURL)
    val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

    // Use Avro parsing classes to get the Avro schema
    val parser = new Schema.Parser
    val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

    // The key schema is typically just a string, but you can do the same process for the key as for the value
    val keySchemaString = "\"string\""
    val keySchema = parser.parse(keySchemaString)

    // Create a map with the schema registry URL.
    // This is the only required configuration for Confluent's KafkaAvroDeserializer.
    val props = Map("schema.registry.url" -> schemaRegistryURL)
    val client = new CachedSchemaRegistryClient(schemaRegistryURL, 20)

    // Declare SerDe vars before using the Structured Streaming map. Avoids a non-serializable class exception.
    var keyDeserializer: KafkaAvroDeserializer = null
    var valueDeserializer: KafkaAvroDeserializer = null

    // Create a Structured Streaming DataFrame to read from the topic.
    val rawTopicMessageDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "vtorppsdv01.corp.moneris.com:9093,vtorppsdv02.corp.moneris.com:9093,vtorppsdv03.corp.moneris.com:9093")
      .option("subscribe", topics)
      .option("specific.avro.reader", "true")
      .option("startingOffsets", "earliest")
      .option("group_id", "b24_ptlf_eim_processing")
      .option("security.protocol", "SSL")
      .option("ssl.keystore.location", "C:\Users\pawan.likhi\Desktop\spark code\SimpleKafkaConsumer\kafka-eim-dev.jks")
      .option("ssl.keystore.password", "BW^1=|sY$j")
      .option("ssl.key.password", "BW^1=|sY$j")
      .option("ssl.truststore.location", "C:\Users\pawan.likhi\Desktop\spark code\SimpleKafkaConsumer\cpbp-ca-dev.jks")
      .option("ssl.truststore.password", "iB>3vm@9") // remove for prod
      .load()

    // Instantiate the SerDe classes if not already done, then deserialize!
    val deserializedTopicMessageDS = rawTopicMessageDF.map {
      row =>
        if (keyDeserializer == null) {
          keyDeserializer = new KafkaAvroDeserializer
          keyDeserializer.configure(props.asJava, true) // isKey = true
        }
        if (valueDeserializer == null) {
          valueDeserializer = new KafkaAvroDeserializer
          valueDeserializer.configure(props.asJava, false) // isKey = false
        }

        // Pass the Avro schema.
        val deserializedKeyString = keyDeserializer.deserialize(topics, row.key, keySchema).toString // the topic name is actually unused in the source code, just required by the signature. Weird right?
        val deserializedValueJsonString = valueDeserializer.deserialize(topics, row.value, topicValueAvroSchema).toString

        DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueJsonString)
    }

    val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}
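I suspect the row.key / row.value compile error happens because rawTopicMessageDF is an untyped DataFrame (Dataset[Row]), so inside .map the parameter row is just a Row and has no key or value members. One possible workaround (untested sketch, reusing the props, schemas and deserializer vars declared above) is to read the columns with row.getAs[Array[Byte]]("key"), or to first select key and value as binary and map over a typed Dataset:

// Sketch only: select the Kafka key/value columns as byte arrays and work with a typed Dataset,
// so the map lambda receives (Array[Byte], Array[Byte]) instead of an untyped Row.
// This would replace the .map over rawTopicMessageDF above.
val typedTopicMessageDS = rawTopicMessageDF
  .selectExpr("CAST(key AS BINARY) AS key", "CAST(value AS BINARY) AS value")
  .as[(Array[Byte], Array[Byte])]

val deserializedTopicMessageDS = typedTopicMessageDS.map { case (key, value) =>
  if (keyDeserializer == null) {
    keyDeserializer = new KafkaAvroDeserializer
    keyDeserializer.configure(props.asJava, true)
  }
  if (valueDeserializer == null) {
    valueDeserializer = new KafkaAvroDeserializer
    valueDeserializer.configure(props.asJava, false)
  }
  DeserializedFromKafkaRecord(
    keyDeserializer.deserialize(topics, key, keySchema).toString,
    valueDeserializer.deserialize(topics, value, topicValueAvroSchema).toString)
}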
I found the Kafka AVRO-format data hard to read. I also developed code with Spark Streaming (DStreams) using Twitter Bijection, but I get an error when inverting the bytes. Any suggestions?
Error : Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): com.twitter.bijection.InversionFailure: Failed to invert: [B@5335860
The new code I used:
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.{KafkaAvroDecoder, KafkaAvroDeserializer}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.avro.generic.GenericRecord
import org.apache.avro.Schema
import org.apache.avro.hadoop.io.AvroDeserializer
import org.apache.commons.codec.StringDecoder
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object ReadKafkaAvro1 {

  object Injection {
    val schemaRegistryURL = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = "b24_tx_financial_formatted_clean"
    val subjectValueName = topics + "-value"

    val restService = new RestService(schemaRegistryURL)
    val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

    val parser = new Schema.Parser()
    // val schema = parser.parse(getClass.getResourceAsStream("src\main\resources\b24_tx_financial_formatted_clean.avsc"))
    val schema = parser.parse(valueRestResponseSchema.getSchema)
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("ReadKafkaAvro").setMaster("local[*]")
    val streamingCtx = new StreamingContext(conf, Seconds(30))

    val schemaRegistryURL1 = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = Array("b24_tx_financial_formatted_clean")

    streamingCtx.sparkContext.setLogLevel("ERROR")

    val kafkaParms = Map[String, Object](
      "bootstrap.servers" -> "vtorppsdv01.corp.moneris.com:9093,vtorppsdv02.corp.moneris.com:9093,vtorppsdv03.corp.moneris.com:9093",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "group.id" -> "b24_ptlf_eim_processing",
      "auto.offset.reset" -> "earliest",
      "auto.commit.interval.ms" -> "2000",
      "schema.registry.url" -> schemaRegistryURL1,
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "security.protocol" -> "SSL",
      "ssl.keystore.location" -> "C:\Users\pawan.likhi\Desktop\spark code\SimpleKafkaConsumer\kafka-eim-dev.jks",
      "ssl.keystore.password" -> "BW^1=|sY$j",
      "ssl.key.password" -> "BW^1=|sY$j",
      "ssl.truststore.location" -> "C:\Users\pawan.likhi\Desktop\spark code\SimpleKafkaConsumer\cpbp-ca-dev.jks",
      "ssl.truststore.password" -> "iB>3vm@9",
      "ssl.keystore.type" -> "JCEKS",
      "ssl.truststore.type" -> "JCEKS",
      "specific.avro.reader" -> "True"
    )

    val inputStream = KafkaUtils.createDirectStream[String, Array[Byte]](streamingCtx, PreferConsistent, Subscribe[String, Array[Byte]](topics, kafkaParms))

    val recordStream = inputStream.map(msg => Injection.injection.invert(msg.value()).get)
    //  .map(record => (record.get("AuthorizationTransactionSource"), record.get("AuthorizationTransactionSourceID")))

    inputStream.map(x => (x.key, x.value)).print()

    // recordStream.print()
    recordStream.print()

    streamingCtx.start()
    streamingCtx.awaitTermination()
  }
}
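My guess for the InversionFailure is that the producer uses Confluent's KafkaAvroSerializer (the schema-registry setup suggests it), so each message is not plain Avro binary: it starts with one magic byte and a 4-byte schema id, followed by the Avro payload. An Injection built with GenericAvroCodecs.toBinary(schema) therefore cannot invert msg.value() as-is. A rough sketch of a workaround, assuming the writer schema is the latest registered version fetched above, is to drop the 5-byte header before inverting (or simply use KafkaAvroDeserializer as in the first program):

// Sketch only: strip the Confluent wire-format header (magic byte + 4-byte schema id)
// before handing the remaining bytes to the Avro binary Injection.
// This would replace the recordStream definition above.
val recordStream = inputStream.map { msg =>
  val payload = msg.value()
  val avroBytes = java.util.Arrays.copyOfRange(payload, 5, payload.length)
  Injection.injection.invert(avroBytes).get
}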