Apache Flink - 多条输出线

Apache Flink - multiple output lines

我正在尝试 运行 如下所示的 flink 作业以从 Apache Kafka 读取数据并打印:

Java 计划

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "test.net:9092");
    properties.setProperty("group.id", "flink_consumer");
    properties.setProperty("zookeeper.connect", "dev.com:2181,dev2.com:2181,dev.com:2181/dev2");
    properties.setProperty("topic", "topic_name");

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>("topic_name", new SimpleStringSchema(), properties));

            messageStream.rebalance().map(new MapFunction<String, String>() {
                private static final long serialVersionUID = -6867736771747690202L;

                public String map(String value) throws Exception {
                    return "Kafka and Flink says: " + value;
                }
            }).print();

            env.execute();

Scala 代码

  var properties = new Properties();
  properties.setProperty("bootstrap.servers", "msg01.staging.bigdata.sv2.247-inc.net:9092");
  properties.setProperty("group.id", "flink_consumer");
  properties.setProperty("zookeeper.connect", "host33.dev.swamp.sv2.tellme.com:2181,host37.dev.swamp.sv2.tellme.com:2181,host38.dev.swamp.sv2.tellme.com:2181/staging_sv2");
  properties.setProperty("topic", "sv2.staging.rtdp.idm.events.omnichannel");
  var env = StreamExecutionEnvironment.getExecutionEnvironment();
  var stream:DataStream[(String)] = env
.addSource(new FlinkKafkaConsumer082[String]("sv2.staging.rtdp.idm.events.omnichannel", new SimpleStringSchema(), properties));
  stream.print();
  env.execute();

每当我在 eclipse 的应用程序中 运行 这个时,我会看到下面的开头:

03/27/2017 20:06:19 作业执行切换到状态 运行。

03/27/2017 20:06:19 来源:自定义来源 -> 接收器:未命名(1/4) 切换到 SCHEDULED 03/27/2017 20:06:19 来源:自定义来源 -> 接收器:未命名(1/4)切换到部署 03/27/2017 20:06:19 来源:自定义来源 -> 接收器:未命名(2/4)切换到预定 03/27/2017 20:06:19 来源:自定义来源 -> 接收器:未命名(2/4)切换到部署 03/27/2017 20:06:19 来源:自定义来源 -> 接收器:未命名(3/4)切换到预定 03/27/2017 20:06:19 来源:自定义来源 -> 接收器:未命名(3/4)切换到部署 03/27/2017 20:06:19 来源:自定义来源 -> 接收器:未命名(4/4)切换到预定 03/27/2017 20:06:19 来源:自定义来源 -> 接收器:未命名(4/4)切换到部署 03/27/2017 20:06:19 来源:自定义来源 -> 接收器:未命名(4/4)切换到 运行 03/27/2017 20:06:19 来源:自定义来源 -> 接收器:未命名(2/4)切换到 运行 03/27/2017 20:06:19 来源:自定义来源 -> 接收器:未命名(1/4)切换到 运行 03/27/2017 20:06:19 来源:自定义来源 -> 接收器:未命名(3/4)切换到 运行

我的问题是:

1) 为什么我在所有情况下(计划、部署和 运行ning)看到 4 个接收器实例。

2) 对于在 Apache Kafka 中收到的每一行,我看到这里被打印了多次,大部分是 4 次。什么原因?

理想情况下,我只想读取每一行一次并对其进行进一步处理。任何 input/help 都将是可观的!

如果您 运行 LocalStreamEnvironment 中的程序(当您在 IDE 中调用 StreamExecutionEnvironment.getExecutionEnvironment() 时得到)所有运算符的默认并行度等于CPU 核心的数量。

因此在您的示例中,每个运算符都并行化为四个子任务。在日志中,您会看到这四个子任务中每一个的消息(3/4 表示这是总共四个任务中的第三个)。

您可以通过调用 StreamExecutionEnvironment.setParallelism(int) 或对每个单独的操作员调用 setParallelism(int) 来控制子任务的数量。

鉴于您的程序,不应复制 Kafka 记录。每条记录只能打印一次。但是,由于记录是并行写入的,因此输出行以 x> 为前缀,其中 x 表示发出该行的并行子任务的 ID。