通过流功能流时间戳
Flow time stamp through streaming functions
How/is 每次使用 Spark Streaming 时批处理 运行 是否可以生成随机数或获取系统时间?
我有两个处理一批消息的函数:
1 - 首先处理密钥,创建一个文件 (csv) 并写入 headers
2 - 第二个处理每条消息并将数据添加到 csv
我希望将每个批次的文件存储在单独的文件夹中:
/output/folderBatch1/file1.csv, file2.csv, etc.csv
/output/folderBatch2/file1.csv, file2.csv, etc.csv
/output/folderBatch3/file1.csv, file2.csv, etc.csv
如何创建一个变量,即使只是一个 Spark Streaming 可以使用的简单计数器?
下面的代码获取系统时间,但因为它是 'plain Java',它只执行一次,并且在批处理的每个 运行 上都是相同的值。
JavaPairInputDStream<String, byte[]> messages;
messages = KafkaUtils.createDirectStream(
jssc,
String.class,
byte[].class,
StringDecoder.class,
DefaultDecoder.class,
kafkaParams,
topicsSet
);
/**
* Declare what computation needs to be done
*/
JavaPairDStream<String, Iterable<byte[]>> groupedMessages = messages.groupByKey();
String time = Long.toString(System.currentTimeMillis()); //this is only ever run once and is the same value for each batch!
groupedMessages.map(new WriteHeaders(time)).print();
groupedMessages.map(new ProcessMessages(time)).print();
谢谢,
KA.
您可以通过额外的 map
调用添加时间戳并将其传递。这意味着您将拥有 Tuple2<Long, Iterable<byte[]>)
:
类型的值,而不是 Iterable<byte[]>
类型的值
JavaDStream<Tuple2<String, Tuple2<Long, Iterable<byte[]>>>> groupedWithTimeStamp =
groupedMessages
.map((Function<Tuple2<String, Iterable<byte[]>>,
Tuple2<String, Tuple2<Long, Iterable<byte[]>>>>) kvp ->
new Tuple2<>(kvp._1, new Tuple2<>(System.currentTimeMillis(), kvp._2)));
现在您在每个 map
中都有时间戳,您可以通过以下方式访问它:
groupedWithTimeStamp.map(value -> value._2._1); // This will access the timestamp.
How/is 每次使用 Spark Streaming 时批处理 运行 是否可以生成随机数或获取系统时间?
我有两个处理一批消息的函数: 1 - 首先处理密钥,创建一个文件 (csv) 并写入 headers 2 - 第二个处理每条消息并将数据添加到 csv
我希望将每个批次的文件存储在单独的文件夹中:
/output/folderBatch1/file1.csv, file2.csv, etc.csv
/output/folderBatch2/file1.csv, file2.csv, etc.csv
/output/folderBatch3/file1.csv, file2.csv, etc.csv
如何创建一个变量,即使只是一个 Spark Streaming 可以使用的简单计数器?
下面的代码获取系统时间,但因为它是 'plain Java',它只执行一次,并且在批处理的每个 运行 上都是相同的值。
JavaPairInputDStream<String, byte[]> messages;
messages = KafkaUtils.createDirectStream(
jssc,
String.class,
byte[].class,
StringDecoder.class,
DefaultDecoder.class,
kafkaParams,
topicsSet
);
/**
* Declare what computation needs to be done
*/
JavaPairDStream<String, Iterable<byte[]>> groupedMessages = messages.groupByKey();
String time = Long.toString(System.currentTimeMillis()); //this is only ever run once and is the same value for each batch!
groupedMessages.map(new WriteHeaders(time)).print();
groupedMessages.map(new ProcessMessages(time)).print();
谢谢, KA.
您可以通过额外的 map
调用添加时间戳并将其传递。这意味着您将拥有 Tuple2<Long, Iterable<byte[]>)
:
Iterable<byte[]>
类型的值
JavaDStream<Tuple2<String, Tuple2<Long, Iterable<byte[]>>>> groupedWithTimeStamp =
groupedMessages
.map((Function<Tuple2<String, Iterable<byte[]>>,
Tuple2<String, Tuple2<Long, Iterable<byte[]>>>>) kvp ->
new Tuple2<>(kvp._1, new Tuple2<>(System.currentTimeMillis(), kvp._2)));
现在您在每个 map
中都有时间戳,您可以通过以下方式访问它:
groupedWithTimeStamp.map(value -> value._2._1); // This will access the timestamp.