Flink 1.11 FlinkKafkaConsumer fails to propagate watermarks while Flink 1.12 succeeds

I'm seeing some strange behavior. I wrote some Flink processors against Flink 1.12 and am trying to get them running on Amazon EMR, but Amazon EMR currently only supports Flink 1.11.2. When I downgraded, watermarks inexplicably stopped being propagated.

The topic has only one partition and the parallelism is set to 1. Am I missing something? I feel like I'm going a little crazy.

Here is the output with Flink 1.12:

Topic:input partitions=1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
Topic:input partitions=1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=86400000 watermark=-9223372036854775808] test message
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=864000000 watermark=0] test message
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=8640000000 watermark=777600000] test message
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=86400000000 watermark=8553600000] test message
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=9223372036854775807 watermark=86313600000] test message
Emitting watermark 9223372036768375807
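
(For reference: the record timestamps are 1, 10, 100, and 1,000 days in milliseconds, plus Long.MAX_VALUE, and each emitted watermark trails its event by exactly one day, 86400000 ms.)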

Here is the output with Flink 1.11:

Topic:input partitions=1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Process",
    "pact" : "Operator",
    "contents" : "Process",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
Topic:input partitions=1
(name=input, internal=false, partitions=(partition=0, leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092 (id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)), authorizedOperations=null)
Assigning timestamp 86400000
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 0
Assigning timestamp 864000000
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 777600000
Assigning timestamp 8640000000
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 8553600000
Assigning timestamp 86400000000
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 86313600000
Assigning timestamp 9223372036854775807
Source [timestamp=0 watermark=-9223372036854775808] test message
Emitting watermark 9223372036768375807
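
Note the difference: under 1.12 the downstream ProcessFunction sees the assigned timestamps and an advancing watermark, while under 1.11 it always sees timestamp=0 (a null timestamp, printed as 0) and watermark=-9223372036854775808 (Long.MIN_VALUE), even though the TimestampAssigner and WatermarkGenerator demonstrably run in both cases.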

Here is the integration test that exposes it:

package mytest;

import java.nio.file.Files;
import java.nio.file.Paths;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;

import kafka.utils.MockTime;
import kafka.utils.TestUtils;

import kafka.zk.EmbeddedZookeeper;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;

import org.junit.ClassRule;
import org.junit.Test;

public class FailTest {
    public Properties getKafkaConsumerProperties() {
        Properties result = new Properties();
        result.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
        result.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        result.put("group.id", "0");
        result.put("enable.auto.commit", "true");
        result.put("auto.commit.interval.ms", "1000");
        result.put("session.timeout.ms", "30000");
        return result;
    }

    public Properties getProducerProperties() {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("bootstrap.servers", "localhost:9092");
        result.put("compression.type", "none");
        return result;
    }

    public Properties getServerProperties(int port) {
        // Use Kafka provided properties
        Properties result = new Properties();
        result.put("broker.id", "0");
        result.put("num.network.threads", "3");
        result.put("num.io.threads", "8");
        result.put("socket.send.buffer.bytes", "102400");
        result.put("socket.recv.buffer.bytes", "102400");
        result.put("num.partitions", "1");
        result.put("offset.topic.replication.factor", "1");
        result.put("transaction.state.log.replication.factor", "1");
        result.put("transaction.state.log.min.isr", "1");
        result.put("log.retention.hours", "168");
        result.put("log.segment.bytes", "1073741824");
        result.put("log.retention.check.interval.ms", "300000");
        result.put("zookeeper.connect", "localhost:" + port);
        result.put("zookeeper.connection.timeout.ms", "18000");
        result.put("group.initial.rebalance.delay.ms", "0");

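        // Pick a fresh log directory for each run so stale broker state from a
        // previous test run cannot interfere with this one.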
        String path = "target/kafka-logs/run.";
        int index = 0;
        while (Files.exists(Paths.get(path + String.valueOf(index)))) {
            index += 1;
        }
        result.put("log.dirs", path + String.valueOf(index));
        return result;
    }

    public void printTopics(AdminClient admin, String inputTopic) throws Exception {
        Map<String, TopicDescription> topics = admin.describeTopics(Arrays.asList(inputTopic)).all().get();
        for (Map.Entry<String, TopicDescription> topic : topics.entrySet()) {
            System.out.printf("Topic:%s partitions=%d\n", topic.getValue().name(), topic.getValue().partitions().size());
            System.out.println(topic.getValue().toString());
        }
    }

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(1)
                .setNumberTaskManagers(1)
                .build());

    @Test
    public void testFail() throws Exception {
        StringSerializer stringSerializer = new StringSerializer();
        StringDeserializer stringDeserializer = new StringDeserializer();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        EmbeddedZookeeper zooKeeper = new EmbeddedZookeeper();
        KafkaServer server = TestUtils.createServer(new KafkaConfig(getServerProperties(zooKeeper.port())), new MockTime());
        AdminClient admin = AdminClient.create(getProducerProperties());

        String inputTopic = "input";

        Map<String, String> configs = new HashMap<>();
        int partitions = 1;
        short replication = 1;

        CreateTopicsResult result = admin.createTopics(Arrays.asList(
            new NewTopic(inputTopic, partitions, replication).configs(configs)
        ));
        result.all().get();

        printTopics(admin, inputTopic);

        // Produce five records with explicit event timestamps: 1, 10, 100, and 1,000 days, plus Long.MAX_VALUE
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(getProducerProperties(), stringSerializer, stringSerializer);
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(1).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(10).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(100).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Time.days(1000).toMilliseconds(), "0", "test message"));
        producer.send(new ProducerRecord<String, String>(inputTopic, 0, Long.MAX_VALUE, "0", "test message"));
        producer.flush();
        producer.close();

        FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), getKafkaConsumerProperties());
        source.setStartFromEarliest();
        source.assignTimestampsAndWatermarks(
            new WatermarkStrategy<String>() {
                @Override
                public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                    return new TimestampAssigner<String>() {
                        @Override
                        public long extractTimestamp(String event, long recordTimestamp) {
                            System.out.printf("Assigning timestamp %d\n", recordTimestamp);
                            return recordTimestamp;
                        }
                    };
                }

                @Override
                public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                    return new WatermarkGenerator<String>() {
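                        // Punctuated watermarking: emit a watermark one day behind each
                        // event, but only when it advances past the last one emitted.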
                        public long latestWatermark = Long.MIN_VALUE;

                        @Override
                        public void onEvent(String event, long timestamp, WatermarkOutput output) {
                            long eventWatermark = timestamp - Time.days(1).toMilliseconds();
                            if (eventWatermark > latestWatermark) {
                                System.out.printf("Emitting watermark %d\n", eventWatermark);
                                output.emitWatermark(new Watermark(eventWatermark));
                                latestWatermark = eventWatermark;
                            }
                        }

                        @Override
                        public void onPeriodicEmit(WatermarkOutput output) {
                        }
                    };
                }
            });

        env.addSource(source)
            .process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) {
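                    // Print the event timestamp and current watermark as observed
                    // downstream of the source (the timestamp prints as 0 when
                    // ctx.timestamp() is null; the watermark is Long.MIN_VALUE
                    // until one actually arrives).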
                    System.out.printf("Source ");
                    if (ctx != null) {
                        TimerService srv = ctx.timerService();
                        Long timestampLong = ctx.timestamp();
                        long timestamp = 0;
                        if (timestampLong != null) {
                            timestamp = timestampLong;
                        }
                        long watermark = 0;
                        if (srv != null) {
                            watermark = srv.currentWatermark();
                        }
                        System.out.printf("[timestamp=%d watermark=%d] ", timestamp, watermark);
                    }

                    System.out.println(value);
                    out.collect(value);
                }
            });

        System.out.println(env.getExecutionPlan());
        JobClient client = null;
        try {
            client = env.executeAsync("Fail Test");
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }

        printTopics(admin, inputTopic);

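        // Let the job consume for a few seconds, then cancel it and shut down the broker.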
        TimeUnit.SECONDS.sleep(5);
        client.cancel().get(5, TimeUnit.SECONDS);

        try {
            server.shutdown();
            zooKeeper.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

It turns out that Flink 1.12 defaults the TimeCharacteristic to EventTime and deprecates the whole TimeCharacteristic mechanism, which is why the 1.12 run works without any extra configuration. So to downgrade to Flink 1.11, you must configure the StreamExecutionEnvironment with the following statement:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
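
For reference, a minimal sketch of where that call goes in the test above. It is only needed on 1.11; from 1.12 onward setStreamTimeCharacteristic is deprecated and event time is already the default.

import org.apache.flink.streaming.api.TimeCharacteristic;

// Flink 1.11: enable event time explicitly before building the topology;
// without this, the watermarks from assignTimestampsAndWatermarks on the
// Kafka source never reach downstream operators.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);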