保存后点燃缓存为空?
Ignite cache is empty after save?
我的数据管道如下:Kafka
=> 执行一些计算 => 将结果对加载到 Ignite cache
=> 打印出来
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("MainApplication");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(10));
JavaIgniteContext<String, Float> igniteContext = new JavaIgniteContext<>(sc, PATH, false);
JavaDStream<Message> dStream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, Message>
Subscribe(Collections.singletonList(TOPIC), kafkaParams)
)
.map(ConsumerRecord::value);
JavaPairDStream<String, Message> pairDStream =
dStream.mapToPair(message -> new Tuple2<>(message.getName(), message));
JavaPairDStream<String, Float> pairs = pairDStream
.combineByKey(new CreateCombiner(), new MergeValue(), new MergeCombiners(), new HashPartitioner(10))
.mapToPair(new ToPairTransformer());
JavaIgniteRDD<String, Float> myCache = igniteContext.fromCache(new CacheConfiguration<>());
// I know that we put something here:
pairDStream.foreachRDD((VoidFunction<JavaPairRDD<String, Float>>) myCache::savePairs);
// But I can't see anything here:
myCache.foreach(tuple2 -> System.out.println("In cache: " + tuple2._1() + " = " + tuple2._2()));
streamingContext.start();
streamingContext.awaitTermination();
streamingContext.stop();
sc.stop();
但是这段代码没有打印任何内容.. 为什么?
为什么 Ignite cache
即使在 savePairs
之后也是空的?
这里有什么问题吗?
提前致谢!
对我来说,pairDStream.foreachRDD(...)
似乎是一个惰性操作,至少在您开始流式传输上下文 streamingContext.start()
之前没有任何影响。
另一方面,myCache.foreach(...)
是急切操作,您在实际空缓存上执行它。
因此,尝试在流上下文启动后放置 myCache.foreach(...)
。甚至在终止之后。
我的数据管道如下:Kafka
=> 执行一些计算 => 将结果对加载到 Ignite cache
=> 打印出来
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("MainApplication");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(10));
JavaIgniteContext<String, Float> igniteContext = new JavaIgniteContext<>(sc, PATH, false);
JavaDStream<Message> dStream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, Message>
Subscribe(Collections.singletonList(TOPIC), kafkaParams)
)
.map(ConsumerRecord::value);
JavaPairDStream<String, Message> pairDStream =
dStream.mapToPair(message -> new Tuple2<>(message.getName(), message));
JavaPairDStream<String, Float> pairs = pairDStream
.combineByKey(new CreateCombiner(), new MergeValue(), new MergeCombiners(), new HashPartitioner(10))
.mapToPair(new ToPairTransformer());
JavaIgniteRDD<String, Float> myCache = igniteContext.fromCache(new CacheConfiguration<>());
// I know that we put something here:
pairDStream.foreachRDD((VoidFunction<JavaPairRDD<String, Float>>) myCache::savePairs);
// But I can't see anything here:
myCache.foreach(tuple2 -> System.out.println("In cache: " + tuple2._1() + " = " + tuple2._2()));
streamingContext.start();
streamingContext.awaitTermination();
streamingContext.stop();
sc.stop();
但是这段代码没有打印任何内容.. 为什么?
为什么 Ignite cache
即使在 savePairs
之后也是空的?
这里有什么问题吗?
提前致谢!
对我来说,pairDStream.foreachRDD(...)
似乎是一个惰性操作,至少在您开始流式传输上下文 streamingContext.start()
之前没有任何影响。
另一方面,myCache.foreach(...)
是急切操作,您在实际空缓存上执行它。
因此,尝试在流上下文启动后放置 myCache.foreach(...)
。甚至在终止之后。