spark中不同Kafka Direct Stream函数之间的透明

Transparency between different Kafka Direct Stream functions in spark

我正在使用 Kafka 直接流编写一个简单的 spark-streaming 作业。可以使用两种方法创建Kafka直接流。

  1. 从 Kafka 参数中获取偏移量 -

    createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](jssc: JavaStreamingContext, keyClass: Class[K], valueClass: Class[V], keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], kafkaParams: Map[String, String], topics: Set[String]): JavaPairInputDStream[K, V]

  2. 补给偏移 -

    createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](jssc: JavaStreamingContext, keyClass: Class[K], valueClass: Class[V], keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], recordClass: Class[R], kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long], messageHandler: Function[MessageAndMetadata[K, V], R]): JavaInputDStream[R]

从定义可以看出,两个函数的return类型是不同的。我想基于映射在上述函数之一上创建一个方法,如果该映射为空,则应恢复为第二种方法,否则使用第一种方法创建流。但是,由于这两个函数具有不同的 return 类型,因此我的其余代码无法对 directStream 的创建方式这一事实保持透明。有没有更优雅的方法来实现这个?

好吧,我真的很蠢。只是张贴这个以防其他人被卡住。可以通过函数指定偏移量:

createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](jssc: JavaStreamingContext, keyClass: Class[K], valueClass: Class[V], keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], recordClass: Class[R], kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long], messageHandler: Function[MessageAndMetadata[K, V], R]): JavaInputDStream[R]

输出流包含数据类型JavaInputDStream[R]。所需要的只是提供一个有效地从 MessageAndMetadata[K,V] 转换为 R 的函数。这可以通过构造函数中的函数 messageHandler 来实现。我所要做的就是添加以下行作为 messageHandler。

(mmd: MessageAndMetadata[String, String]) => (mmd.key() -> mmd.message())

我需要 Tuple2。您可以将其转换为您想要的任何数据类型。