Spark Streaming with MongoDB in Java

In my application, I want to stream data from MongoDB into Spark Streaming in Java. For that I used a queue stream, because I thought I could keep the MongoDB data in an RDD. But either this approach doesn't work, or I'm doing something wrong.

Has anyone streamed from MongoDB into Spark Streaming? Is my approach wrong, and if so, what is the right way to do it?

Here is my code:

package com.mongodb.spark.stream;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.bson.BSONObject;

import com.mongodb.hadoop.MongoInputFormat;

import scala.Tuple2;

public class MongoStream {

public static void main(String[] args) {

    Configuration conf = new Configuration();
    conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
    conf.set("mongo.input.uri", "mongodb://192.168.1.149:27017/test.observations");

    SparkConf spc = new SparkConf().setMaster("local[2]").setAppName("mongo");

    JavaStreamingContext sc = new JavaStreamingContext(spc, Durations.seconds(1));

    final Queue q = new LinkedList<JavaRDD<String>>();

    final JavaPairRDD<Object, BSONObject> rdd = sc.sparkContext().newAPIHadoopRDD(conf, MongoInputFormat.class,
            Object.class, BSONObject.class);

    JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<Tuple2<Object, BSONObject>, String>() {

        private static final long serialVersionUID = -5974149698144908239L;

        @Override
        public Iterable<String> call(Tuple2<Object, BSONObject> arg0) throws Exception {

            Object o = arg0._2.get("SensorId").toString();
            if (o instanceof String) {
                String str = (String) o;
                str = str.replace("[.,!?|\n]", " ");
                System.out.println(str);

                q.add(str.split(""));
                System.out.println("dsdssd : " + q);
                return Arrays.asList(str.split(" "));

            } else
                return Collections.emptyList();

        }
    });

    @SuppressWarnings("unchecked")
    JavaReceiverInputDStream<String> rec = (JavaReceiverInputDStream<String>) sc.queueStream(q);

}

}

This approach won't work. A QueueDStream consumes one RDD from the given queue per batch interval, so this process would only run once.

You can picture it like this:

QueueDStream(Queue(RDD-time1, RDD-time2, ..., RDD-time-n))

Then, at each streaming interval, the RDD at the head of the queue is processed. If you use a mutable, concurrent queue, some other process can keep adding RDDs at the tail of the queue while Spark Streaming processes the head at each interval.
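
To make that idea concrete, here is a minimal, untested sketch of the concurrent-queue approach, reusing the connection settings from the question. The one-second polling loop and the SensorId extraction are only placeholders (a real job would load just the new documents rather than re-reading the whole collection). It assumes the question's imports plus java.util.concurrent.ConcurrentLinkedQueue and org.apache.spark.api.java.function.Function, inside a main(String[] args) throws Exception:

SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("mongo-queue-stream");
final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

final Configuration mongoConf = new Configuration();
mongoConf.set("mongo.input.uri", "mongodb://192.168.1.149:27017/test.observations");

// Concurrent queue: a producer thread appends at the tail, queueStream drains the head each batch.
final Queue<JavaRDD<String>> queue = new ConcurrentLinkedQueue<JavaRDD<String>>();
ssc.queueStream(queue).print();

// Declared in a static context so the anonymous class has no enclosing instance and stays serializable.
final Function<Tuple2<Object, BSONObject>, String> extractSensorId =
        new Function<Tuple2<Object, BSONObject>, String>() {
            public String call(Tuple2<Object, BSONObject> doc) {
                return String.valueOf(doc._2().get("SensorId"));
            }
        };

// Producer: periodically load the collection into an RDD and append it to the queue tail.
new Thread(new Runnable() {
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            JavaPairRDD<Object, BSONObject> mongoRdd = ssc.sparkContext().newAPIHadoopRDD(
                    mongoConf, MongoInputFormat.class, Object.class, BSONObject.class);
            queue.add(mongoRdd.map(extractSensorId));
            try {
                Thread.sleep(1000);   // placeholder polling interval
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}).start();

ssc.start();
ssc.awaitTermination();

With queueStream's default behaviour, one RDD is taken from the head of the queue per batch, which matches the picture above.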

I'm not aware of streaming support for Mongo, so you will probably need to rethink this process from another angle, e.g. by creating your own custom Receiver.
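
For reference, a bare-bones, untested sketch of what such a custom Receiver could look like with the plain MongoDB Java driver. The host, database, and collection are taken from the question; the "re-read everything every second" loop is only a placeholder for real change tracking:

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;

public class MongoPollingReceiver extends Receiver<String> {

    private static final long serialVersionUID = 1L;

    public MongoPollingReceiver() {
        super(StorageLevel.MEMORY_ONLY());
    }

    @Override
    public void onStart() {
        // Receive on a separate thread so onStart() returns immediately, as the Receiver contract requires.
        new Thread(new Runnable() {
            public void run() {
                receive();
            }
        }).start();
    }

    @Override
    public void onStop() {
        // The polling loop below checks isStopped(), so there is nothing extra to clean up here.
    }

    private void receive() {
        MongoClient mongo = new MongoClient("192.168.1.149", 27017);
        DBCollection observations = mongo.getDB("test").getCollection("observations");
        try {
            while (!isStopped()) {
                DBCursor cursor = observations.find();   // placeholder: a real receiver would fetch only new documents
                while (cursor.hasNext() && !isStopped()) {
                    store(cursor.next().toString());     // hand each document to Spark Streaming
                }
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            restart("Error polling MongoDB", e);
        } finally {
            mongo.close();
        }
    }
}

It would then be wired in with something like:

JavaReceiverInputDStream<String> docs = sc.receiverStream(new MongoPollingReceiver());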

// Another approach: consume records from a Kafka topic with the old receiver-based
// Kafka API and write each batch into MongoDB.
String zkQuorum = "localhost:2181";   // ZooKeeper quorum (not a Kafka broker list)
String group = "spark";

Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put("twitter-topic", 1);

JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1200));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);

// Keep only the value of each Kafka (key, value) pair.
JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> message) {
        return message._2();
    }
});

// For every batch, collect the records to the driver and insert them into MongoDB.
data.foreachRDD(new Function<JavaRDD<String>, Void>() {
    Mongo mongo = new Mongo("localhost", 27017);
    DB db = mongo.getDB("mongodb");
    DBCollection collection = db.getCollection("fb");

    public Void call(JavaRDD<String> data) throws Exception {
        if (data != null) {
            List<String> result = data.collect();
            for (String temp : result) {
                System.out.println(temp);
                DBObject dbObject = (DBObject) JSON.parse(temp);
                collection.insert(dbObject);
            }
            System.out.println("Inserted Data Done");
        } else {
            System.out.println("Got no data in this window");
        }
        return null;
    }
});

jssc.start();
jssc.awaitTermination();
}

The way you are using it amounts to this:

QueueDStream(Queue(RDD-time1, RDD-time2, ...,  RDD-time-n))

This answer shows the intended usage; the code below is essentially Spark's own JavaQueueStream example:

SparkConf sparkConf = new SparkConf().setAppName("JavaQueueStream");

// Create the context
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));

// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();

// Create and push some RDDs into the queue
List<Integer> list = Lists.newArrayList();
for (int i = 0; i < 1000; i++) {
  list.add(i);
}

for (int i = 0; i < 30; i++) {
  rddQueue.add(ssc.sparkContext().parallelize(list));
}

// Create the QueueInputDStream and use it to do some processing
JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
    new PairFunction<Integer, Integer, Integer>() {
      @Override
      public Tuple2<Integer, Integer> call(Integer i) {
        return new Tuple2<Integer, Integer>(i % 10, 1);
      }
    });

You can use a Debezium connector to push any changes made to the MongoDB database into Kafka, and Spark can then consume that stream from Kafka in real time and process it.
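
As a rough, hedged sketch only: the consumer side could then look much like the Kafka snippet above. The logical server name "dbserver1" and the consumer group are made up for illustration; Debezium names change topics per collection (here assumed to follow the <server>.<database>.<collection> pattern):

Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("dbserver1.test.observations", 1);   // assumed Debezium change-event topic for test.observations

JavaStreamingContext jssc = new JavaStreamingContext("local[2]", "MongoCdc", new Duration(1000));
JavaPairReceiverInputDStream<String, String> changeEvents =
        KafkaUtils.createStream(jssc, "localhost:2181", "spark-cdc", topics);

// Each value is a JSON change event produced by Debezium; parse or transform it as needed.
changeEvents.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> event) {
        return event._2();
    }
}).print();

jssc.start();
jssc.awaitTermination();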