Spark Streaming with MongoDB in Java
In my application I want to stream data from MongoDB into Spark Streaming in Java. To do that I used a queue stream, because I thought I could keep the MongoDB data in an RDD. But either this approach does not work, or I am doing something wrong.
Has anyone streamed from MongoDB into Spark Streaming? Is my approach wrong, and if so, what is the right way to do it?
Here is my code:
package com.mongodb.spark.stream;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.bson.BSONObject;
import com.mongodb.hadoop.MongoInputFormat;
import scala.Tuple2;
public class MongoStream {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
        conf.set("mongo.input.uri", "mongodb://192.168.1.149:27017/test.observations");

        SparkConf spc = new SparkConf().setMaster("local[2]").setAppName("mongo");
        JavaStreamingContext sc = new JavaStreamingContext(spc, Durations.seconds(1));

        final Queue q = new LinkedList<JavaRDD<String>>();
        final JavaPairRDD<Object, BSONObject> rdd = sc.sparkContext().newAPIHadoopRDD(conf, MongoInputFormat.class,
                Object.class, BSONObject.class);

        JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, String>() {

            private static final long serialVersionUID = -5974149698144908239L;

            @Override
            public Iterable<String> call(Tuple2<Object, BSONObject> arg0) throws Exception {
                Object o = arg0._2.get("SensorId").toString();
                if (o instanceof String) {
                    String str = (String) o;
                    str = str.replace("[.,!?|\n]", " ");
                    System.out.println(str);
                    q.add(str.split(""));
                    System.out.println("dsdssd : " + q);
                    return Arrays.asList(str.split(" "));
                } else
                    return Collections.emptyList();
            }
        });

        @SuppressWarnings("unchecked")
        JavaReceiverInputDStream<String> rec = (JavaReceiverInputDStream<String>) sc.queueStream(q);
    }
}
That approach is not going to work. A QueueDStream consumes one RDD from the given queue per batch interval, so as written the process will only run once.
You can picture it like this:
QueueDStream(Queue(RDD-time1, RDD-time2, ..., RDD-time-n))
At each streaming interval, the RDD at the head of the queue is processed. If you use a mutable, concurrent queue instead, a producer can keep appending RDDs at the tail while Spark Streaming consumes the head at every interval.
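Purely as an illustration of that pattern, here is a minimal runnable sketch (not from the original answer): a producer thread appends one RDD per second to a concurrent queue that queueStream polls. In a real job the producer would build each RDD from MongoDB (for example via newAPIHadoopRDD) rather than from the dummy strings used here, and the class and variable names are mine.

import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ConcurrentQueueStream {
    public static void main(String[] args) throws InterruptedException {
        JavaStreamingContext ssc =
                new JavaStreamingContext("local[2]", "queue-demo", Durations.seconds(1));

        // Thread-safe queue: Spark Streaming takes the RDD at the head every batch
        // interval, while the producer thread keeps appending new RDDs at the tail.
        Queue<JavaRDD<String>> queue = new ConcurrentLinkedQueue<JavaRDD<String>>();

        JavaDStream<String> stream = ssc.queueStream(queue);
        stream.print();

        // Producer thread: stands in for whatever loads the next batch of data.
        new Thread(() -> {
            int batch = 0;
            while (true) {
                queue.add(ssc.sparkContext().parallelize(
                        Arrays.asList("batch-" + batch++)));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }).start();

        ssc.start();
        ssc.awaitTermination();
    }
}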
I am not aware of streaming support for Mongo, so you may need to rethink this part of the process from another angle, for example by creating your own custom Receiver.
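Purely to illustrate that suggestion, below is a minimal sketch of such a receiver; it is not from the original answer. It assumes the 3.x mongo-java-driver, it only picks up newly inserted documents (by tracking the largest _id seen so far), and the database, collection, and poll interval are placeholders.

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;

// Hypothetical receiver that polls a MongoDB collection and hands new documents to Spark.
public class MongoPollingReceiver extends Receiver<String> {

    private final String uri;

    public MongoPollingReceiver(String uri) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        this.uri = uri;
    }

    @Override
    public void onStart() {
        // Poll on a separate thread so onStart() returns immediately.
        new Thread(this::receive).start();
    }

    @Override
    public void onStop() {
        // Nothing to do: the polling loop checks isStopped() and exits on its own.
    }

    private void receive() {
        MongoClient client = null;
        try {
            client = new MongoClient(new MongoClientURI(uri));
            MongoCollection<Document> coll =
                    client.getDatabase("test").getCollection("observations");
            ObjectId lastSeen = null;
            while (!isStopped()) {
                // Fetch only documents inserted since the last poll, oldest first.
                Bson filter = (lastSeen == null) ? new Document() : Filters.gt("_id", lastSeen);
                for (Document doc : coll.find(filter).sort(Sorts.ascending("_id"))) {
                    lastSeen = doc.getObjectId("_id");
                    store(doc.toJson()); // hand the document to Spark Streaming
                }
                Thread.sleep(1000); // crude poll interval
            }
        } catch (InterruptedException e) {
            // receiver is shutting down
        } catch (Exception e) {
            restart("Error while polling MongoDB", e);
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }
}

It would be wired in with sc.receiverStream(new MongoPollingReceiver("mongodb://192.168.1.149:27017")), which returns a JavaReceiverInputDStream<String>. It is only meant to show the Receiver API, not to be a robust change-data-capture mechanism.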
// Example: consume messages from a Kafka topic with Spark Streaming and write each
// JSON record into a MongoDB collection (legacy 2.x Mongo driver, Kafka 0.8 receiver API).
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.util.JSON;
import scala.Tuple2;

// Class wrapper added only so the snippet compiles on its own.
public class KafkaToMongo {
    public static void main(String[] args) throws InterruptedException {
        String zkQuorum = "localhost:2181"; // ZooKeeper quorum used by the 0.8 receiver API
        String group = "spark";
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put("twitter-topic", 1);

        JavaStreamingContext jssc =
                new JavaStreamingContext("local[4]", "SparkStream", new Duration(1200));
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);

        // Keep only the message value; the Kafka key is ignored.
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                return message._2();
            }
        });

        data.foreachRDD(new Function<JavaRDD<String>, Void>() {
            Mongo mongo = new Mongo("localhost", 27017);
            DB db = mongo.getDB("mongodb");
            DBCollection collection = db.getCollection("fb");

            public Void call(JavaRDD<String> rdd) throws Exception {
                if (rdd != null) {
                    // Collect the micro-batch to the driver and insert each JSON document.
                    List<String> result = rdd.collect();
                    for (String temp : result) {
                        System.out.println(temp);
                        DBObject dbObject = (DBObject) JSON.parse(temp);
                        collection.insert(dbObject);
                    }
                    System.out.println("Inserted Data Done");
                } else {
                    System.out.println("Got no data in this window");
                }
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
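One thing to note about the snippet above: rdd.collect() pulls every micro-batch back to the driver and inserts from there. As a sketch of the alternative design, the same insert can be done on the executors with foreachPartition, one connection per partition. This variant is mine, not part of the original answer; it is a drop-in replacement for the data.foreachRDD(...) block above and additionally needs java.util.Iterator and org.apache.spark.api.java.function.VoidFunction.

data.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
            public void call(Iterator<String> docs) throws Exception {
                // Each partition opens and closes its own connection on the executor.
                Mongo mongo = new Mongo("localhost", 27017);
                DBCollection collection = mongo.getDB("mongodb").getCollection("fb");
                while (docs.hasNext()) {
                    collection.insert((DBObject) JSON.parse(docs.next()));
                }
                mongo.close();
            }
        });
        return null;
    }
});

Whether that is worth it depends on batch sizes; for small batches the collect-on-driver version above is simpler.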
If you do use that kind of setup, i.e.
QueueDStream(Queue(RDD-time1, RDD-time2, ..., RDD-time-n))
then it looks like this (essentially Spark's own JavaQueueStream example):
// Spark's JavaQueueStream example (imports, output action, and context start
// included so it runs end to end).
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import com.google.common.collect.Lists;
import scala.Tuple2;

public class JavaQueueStream {
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("JavaQueueStream");
        // Create the context
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));

        // Create the queue through which RDDs can be pushed to a QueueInputDStream
        Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();

        // Create and push some RDDs into the queue
        List<Integer> list = Lists.newArrayList();
        for (int i = 0; i < 1000; i++) {
            list.add(i);
        }
        for (int i = 0; i < 30; i++) {
            rddQueue.add(ssc.sparkContext().parallelize(list));
        }

        // Create the QueueInputDStream and use it to do some processing
        JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
        JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
                new PairFunction<Integer, Integer, Integer>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Integer i) {
                        return new Tuple2<Integer, Integer>(i % 10, 1);
                    }
                });

        // Output action and context start, needed for the job to actually run.
        mappedStream.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
Alternatively, you can use the Debezium MongoDB connector to push any changes made to the MongoDB database into Kafka; Spark can then consume that stream from Kafka in near real time and process it.
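Sketching what the Spark side of that pipeline could look like, assuming a Debezium MongoDB connector is already publishing change events to Kafka, and using the spark-streaming-kafka-0-10 integration. The broker address, the topic name dbserver1.test.observations (Debezium's <server-name>.<database>.<collection> convention), and the consumer group are all assumptions:

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class DebeziumMongoStream {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("debezium-mongo");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "spark-mongo-cdc");
        kafkaParams.put("auto.offset.reset", "latest");

        // Debezium publishes one topic per collection: <server-name>.<database>.<collection>
        Collection<String> topics = Arrays.asList("dbserver1.test.observations");

        JavaInputDStream<ConsumerRecord<String, String>> changes = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // Each record value is a Debezium change-event envelope in JSON; a real job
        // would parse it, here it is just printed.
        changes.map(record -> record.value()).print();

        jssc.start();
        jssc.awaitTermination();
    }
}

From there the change events can be processed much like the Kafka messages in the earlier snippet.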