JavaInputDStream 中的 "Class<R> recordClass" 是什么?
What is "Class<R> recordClass" in JavaInputDStream?
我正在尝试使用下面的 API 通过 Kafka 进行 Spark 流式传输。我必须使用 spark 流式传输 avro 序列化数据,数据位于 Kafka 中。
static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R>
JavaInputDStream<R> createDirectStream(JavaStreamingContext jssc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, Class<R> recordClass, java.util.Map<String,String> kafkaParams, java.util.Map<kafka.common.TopicAndPartition,Long> fromOffsets, Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler)
:: Experimental :: Create an input stream that directly pulls messages from Kafka Brokers without using any receiver.
我能知道我需要为 API 中的参数 Class 记录 Class 提供什么吗?我使用了如下所示的 API,但它给出了编译错误。
我只想从 kafka 接收字节流数据到 spark streaming。
JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc, String.class, byte[].class,
StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, topicMap,
(Function<MessageAndMetadata<String, String>, String>) MessageAndMetadata::message);
Exception in thread "main" java.lang.Error: Unresolved compilation
problems: The method createDirectStream(JavaStreamingContext,
Class, Class, Class, Class, Class,
Map, Map,
Function,R>) in the type KafkaUtils is not
applicable for the arguments (JavaStreamingContext, Class,
Class, Class, Class,
Class, Map, Map,
Function,String>)
试试这个。
JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jssc, String.class, byte[].class,
StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, fromOffset,
(Function<MessageAndMetadata<String, byte[]>, byte[]>) MessageAndMetadata::message);
Here 是一篇针对 Kafka、Avro 和 Spark 的文章。
我正在尝试使用下面的 API 通过 Kafka 进行 Spark 流式传输。我必须使用 spark 流式传输 avro 序列化数据,数据位于 Kafka 中。
static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R>
JavaInputDStream<R> createDirectStream(JavaStreamingContext jssc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, Class<R> recordClass, java.util.Map<String,String> kafkaParams, java.util.Map<kafka.common.TopicAndPartition,Long> fromOffsets, Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler)
:: Experimental :: Create an input stream that directly pulls messages from Kafka Brokers without using any receiver.
我能知道我需要为 API 中的参数 Class 记录 Class 提供什么吗?我使用了如下所示的 API,但它给出了编译错误。
我只想从 kafka 接收字节流数据到 spark streaming。
JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc, String.class, byte[].class,
StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, topicMap,
(Function<MessageAndMetadata<String, String>, String>) MessageAndMetadata::message);
Exception in thread "main" java.lang.Error: Unresolved compilation problems: The method createDirectStream(JavaStreamingContext, Class, Class, Class, Class, Class, Map, Map, Function,R>) in the type KafkaUtils is not applicable for the arguments (JavaStreamingContext, Class, Class, Class, Class, Class, Map, Map, Function,String>)
试试这个。
JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jssc, String.class, byte[].class,
StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, fromOffset,
(Function<MessageAndMetadata<String, byte[]>, byte[]>) MessageAndMetadata::message);
Here 是一篇针对 Kafka、Avro 和 Spark 的文章。