通过流功能流时间戳

Flow time stamp through streaming functions

How/is 每次使用 Spark Streaming 时批处理 运行 是否可以生成随机数或获取系统时间?

我有两个处理一批消息的函数: 1 - 首先处理密钥,创建一个文件 (csv) 并写入 headers 2 - 第二个处理每条消息并将数据添加到 csv

我希望将每个批次的文件存储在单独的文件夹中:

/output/folderBatch1/file1.csv, file2.csv, etc.csv
/output/folderBatch2/file1.csv, file2.csv, etc.csv
/output/folderBatch3/file1.csv, file2.csv, etc.csv

如何创建一个变量,即使只是一个 Spark Streaming 可以使用的简单计数器?

下面的代码获取系统时间,但因为它是 'plain Java',它只执行一次,并且在批处理的每个 运行 上都是相同的值。

JavaPairInputDStream<String, byte[]> messages;
messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        byte[].class,
        StringDecoder.class,
        DefaultDecoder.class,
        kafkaParams,
        topicsSet
);

/**
 * Declare what computation needs to be done
 */
JavaPairDStream<String, Iterable<byte[]>> groupedMessages = messages.groupByKey();

String time = Long.toString(System.currentTimeMillis());        //this is only ever run once and is the same value for each batch!

groupedMessages.map(new WriteHeaders(time)).print();

groupedMessages.map(new ProcessMessages(time)).print();

谢谢, KA.

您可以通过额外的 map 调用添加时间戳并将其传递。这意味着您将拥有 Tuple2<Long, Iterable<byte[]>):

类型的值,而不是 Iterable<byte[]> 类型的值
JavaDStream<Tuple2<String, Tuple2<Long, Iterable<byte[]>>>> groupedWithTimeStamp = 
  groupedMessages
    .map((Function<Tuple2<String, Iterable<byte[]>>, 
      Tuple2<String, Tuple2<Long, Iterable<byte[]>>>>) kvp -> 
        new Tuple2<>(kvp._1, new Tuple2<>(System.currentTimeMillis(), kvp._2)));

现在您在每个 map 中都有时间戳,您可以通过以下方式访问它:

groupedWithTimeStamp.map(value -> value._2._1); // This will access the timestamp.