函数在生产中成功执行但在 Flink 测试中未成功执行

Function executes succesfully in production but not in test in Flink

我在Flink 1.12.3写了一个集成测试,测试了StreamingJobclass中的execute方法。奇怪的是,这个方法在生产环境中输出记录sink成功,但是在本地测试中却没有任何输出。我该如何解决这个问题并启用测试?

This may be related

    private static final DeviceIdSink deviceIdSink = new DeviceIdSink();
    
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(1)
                            .setNumberTaskManagers(2)
                            .build());
    
    @Test
    public void testingAStreamingJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        
        List<JsonNode> events = getListFromResource("events.json");
        DataStream<JsonNode> testStream = env.fromCollection(events);

        StreamingJob job = new StreamingJob(env, Time.seconds(60),
                 testStream, deviceIdSink);


        job.execute();

        System.out.println(deviceIdSink.values);


    ```

一旦 testStream 源耗尽,作业将终止。因此,如果您有任何基于时间的窗口发生,您将有永远不会发出的未决结果。

我使用的 MockSource 在调用 cancel() 方法之前不会终止,例如

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A very simple, non-parallel source based on a list of elements. You can specify a delay
 * for between each element that is emitted.
 * 
 * @param <T>
 */
@SuppressWarnings("serial")
public class MockSource<T> implements SourceFunction<T>, ResultTypeQueryable<T>, Serializable {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(MockSource.class);
    
    private int listSize;
    private byte[] elementsSerialized;
    private TypeInformation<T> typeInfo;
    private TypeSerializer<T> serializer;
    private Time delay = null;
    
    private transient volatile boolean running;

    // Constructor for cases where you want an empty list as the source.
    public MockSource(TypeInformation<T> typeInfo) throws IOException {
        this(Collections.emptyList(), typeInfo);
    }

    @SuppressWarnings("unchecked")
    public MockSource(T... elements) throws IOException {
        this((List<T>) Arrays.asList(elements));
    }

    /**
     * Create a source from <data>, which cannot be empty (if so, use the other constructor that takes a typeInfo
     * argument.
     * 
     * @param data
     * @throws IOException
     */
    public MockSource(List<T> data) throws IOException {
        this(data, TypeExtractor.getForObject(data.get(0)));
    }

    public MockSource(List<T> data, TypeInformation<T> typeInfo) throws IOException {
        this.typeInfo = typeInfo;
        this.serializer = typeInfo.createSerializer(new ExecutionConfig());

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);

        listSize = 0;
        try {
            for (T element : data) {
                serializer.serialize(element, wrapper);
                listSize++;
            }
        } catch (Exception e) {
            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
        }

        this.elementsSerialized = baos.toByteArray();
    }

    public MockSource<T> setDelay(Time delay) {
        this.delay = delay;
        return this;
    }
    
    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        running = true;
        Object lock = ctx.getCheckpointLock();

        ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
        final DataInputView input = new DataInputViewStreamWrapper(bais);

        int i = 0;
        while (running && (i < this.listSize)) {
            T next;
            try {
                next = serializer.deserialize(input);
            } catch (Exception e) {
                throw new IOException("Failed to deserialize an element from the source. "
                        + "If you are using user-defined serialization (Value and Writable types), check the "
                        + "serialization functions.\nSerializer is " + serializer, e);
            }

            synchronized (lock) {
                ctx.collect(next);
                i++;
                
                if (delay != null) {
                    LOGGER.debug("MockSource delaying for {}ms", delay.toMilliseconds());
                    
                    Thread.sleep(delay.toMilliseconds());
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return typeInfo;
    }
}