在kafka集群节点之间分发数据套接字
Distributing data socket among kafka cluster nodes
我想从socket中获取数据放到kafka topic中,让我的flink程序可以从topic中读取数据并进行处理。我可以在一个节点上做到这一点。但是我想要一个至少有三个不同节点(不同 IP 地址)的 kafka 集群,并从套接字轮询数据以将其分发给 nodes.I 不知道如何执行此操作并更改此代码。我的简单程序如下:
public class WordCount {
public static void main(String[] args) throws Exception {
kafka_test objKafka=new kafka_test();
// Checking input parameters
final ParameterTool params = ParameterTool.fromArgs(args);
int myport = 9999;
String hostname = "localhost";
// set up the execution environment
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
DataStream<String> stream = env.socketTextStream(hostname,myport);
stream.addSink(objKafka.createStringProducer("testFlink",
"localhost:9092"));
DataStream<String> text =
env.addSource(objKafka.createStringConsumerForTopic("testFlink",
"localhost:9092", "test"));
DataStream<Tuple2<String, Long>> counts = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out)
{
// normalize and split the line
String[] words = value.toLowerCase().split("\W+");
// emit the pairs
for (String word : words) {
if (!word.isEmpty()) {
out.collect(new Tuple2<String, Long>(word, 1L));
}
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
// emit result
if (params.has("output")) {
counts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output
to specify output path.");
counts.print();
}
// execute program
env.execute("Streaming WordCount");
}//main
}
public class kafka_test {
public FlinkKafkaConsumer<String> createStringConsumerForTopic(
String topic, String kafkaAddress, String kafkaGroup) {
// ************************** KAFKA Properties ******
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id", kafkaGroup);
FlinkKafkaConsumer<String> myconsumer = new FlinkKafkaConsumer<>(
topic, new SimpleStringSchema(), props);
myconsumer.setStartFromLatest();
return myconsumer;
}
public FlinkKafkaProducer<String> createStringProducer(
String topic, String kafkaAddress) {
return new FlinkKafkaProducer<>(kafkaAddress,
topic, new SimpleStringSchema());
}
}
能否指导我如何在不同的kafka节点之间广播socket流数据?
如有任何帮助,我们将不胜感激。
我认为你的代码是正确的。 Kafka 会处理数据的"distribution"。数据将如何在 Kafka 代理之间分发将取决于主题配置。
检查答案 以更好地理解 Kafka 主题和分区。
假设您有 3 个 Kafka 代理。然后,如果您使用 3 个副本和 3 个分区创建主题
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-topic
这将导致您的主题将有 3 个分区,每个分区将在您的集群中存储 3 次。使用 3 个代理,您将在每个代理上存储 1 个分区和 2 个副本。
然后你只需要创建你的 Kafka Sink
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
"broker1:9092,broker2:9092,broker3:9092",
"my-topic",
new SimpleStringSchema());
stream.addSink(myProducer);
我想从socket中获取数据放到kafka topic中,让我的flink程序可以从topic中读取数据并进行处理。我可以在一个节点上做到这一点。但是我想要一个至少有三个不同节点(不同 IP 地址)的 kafka 集群,并从套接字轮询数据以将其分发给 nodes.I 不知道如何执行此操作并更改此代码。我的简单程序如下:
public class WordCount {
public static void main(String[] args) throws Exception {
kafka_test objKafka=new kafka_test();
// Checking input parameters
final ParameterTool params = ParameterTool.fromArgs(args);
int myport = 9999;
String hostname = "localhost";
// set up the execution environment
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
DataStream<String> stream = env.socketTextStream(hostname,myport);
stream.addSink(objKafka.createStringProducer("testFlink",
"localhost:9092"));
DataStream<String> text =
env.addSource(objKafka.createStringConsumerForTopic("testFlink",
"localhost:9092", "test"));
DataStream<Tuple2<String, Long>> counts = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out)
{
// normalize and split the line
String[] words = value.toLowerCase().split("\W+");
// emit the pairs
for (String word : words) {
if (!word.isEmpty()) {
out.collect(new Tuple2<String, Long>(word, 1L));
}
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
// emit result
if (params.has("output")) {
counts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output
to specify output path.");
counts.print();
}
// execute program
env.execute("Streaming WordCount");
}//main
}
public class kafka_test {
public FlinkKafkaConsumer<String> createStringConsumerForTopic(
String topic, String kafkaAddress, String kafkaGroup) {
// ************************** KAFKA Properties ******
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id", kafkaGroup);
FlinkKafkaConsumer<String> myconsumer = new FlinkKafkaConsumer<>(
topic, new SimpleStringSchema(), props);
myconsumer.setStartFromLatest();
return myconsumer;
}
public FlinkKafkaProducer<String> createStringProducer(
String topic, String kafkaAddress) {
return new FlinkKafkaProducer<>(kafkaAddress,
topic, new SimpleStringSchema());
}
}
能否指导我如何在不同的kafka节点之间广播socket流数据?
如有任何帮助,我们将不胜感激。
我认为你的代码是正确的。 Kafka 会处理数据的"distribution"。数据将如何在 Kafka 代理之间分发将取决于主题配置。
检查答案
假设您有 3 个 Kafka 代理。然后,如果您使用 3 个副本和 3 个分区创建主题
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-topic
这将导致您的主题将有 3 个分区,每个分区将在您的集群中存储 3 次。使用 3 个代理,您将在每个代理上存储 1 个分区和 2 个副本。
然后你只需要创建你的 Kafka Sink
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
"broker1:9092,broker2:9092,broker3:9092",
"my-topic",
new SimpleStringSchema());
stream.addSink(myProducer);