Apache Spark 可以使用 TCP 侦听器作为输入吗?

Can Apache Spark use TCP listener as input?

Apache Spark 可以使用 TCP 侦听器作为输入吗?如果是,也许有人有执行该操作的 java 代码示例。

我试图找到关于此的示例,但所有教程都展示了如何通过 TCP 定义到数据服务器的输入连接,而不是使用等待传入数据的 TCP 侦听器。

是的,可以使用 Spark 侦听 TCP 端口并处理任何传入数据。您要找的是 Spark Streaming.

documentation and on github 中有一个侦听 TCP 源的小指南。为了方便起见:

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Split each line into words
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();

jssc.start();              // Start the computation
jssc.awaitTermination();   // Wait for the computation to terminate

Spark 没有内置的 TCP 服务器来等待生产者和缓冲数据。 Spark 通过其 API 库在 TCP、Kafka 等的轮询机制上工作。要使用传入的 TCP 数据,您需要有一个 Spark 可以连接到的外部 TCP 服务器,如 Shaido 在示例中所解释的那样。