Flink RichParallelSourceFunction - close() vs cancel()

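I am implementing a RichParallelSourceFunction that reads files over SFTP. RichParallelSourceFunction inherits cancel() from SourceFunction and close() from RichFunction. As far as I understand, both cancel() and close() are invoked before the source is torn down, so in both of them I have to add logic that stops the endless loop which reads files.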

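When I set the parallelism of the source to 1 and run the Flink job from the IDE, the Flink runtime invokes close() right after it invokes start( ), and the entire job stops. I did not expect this.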

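When I set the parallelism of the source to 1 and run the Flink job in a cluster, the job runs as usual. If I leave the parallelism of the source at the default (4 in my case), the job also runs as usual.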

Using Flink 1.7.


public class SftpSource<TYPE_OF_RECORD>
    extends RichParallelSourceFunction<TYPE_OF_RECORD>
{
    private final SftpConnection mConnection;
    private boolean mSourceIsRunning;

    @Override 
    public void open(Configuration parameters) throws Exception
    {
        mConnection.open();
    }

    @Override 
    public void close()
    {
        mSourceIsRunning = false;
    }


    @Override
    public void run(SourceContext<TYPE_OF_RECORD> aContext)
    {
        while (mSourceIsRunning)
        {
            synchronized ( aContext.getCheckpointLock() )
            {
                // use mConnection
                // aContext.collect() ...
            }

            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException ie)
            {
                mLogger.warn("Thread error: {}", ie.getMessage() );
            }
        }

        mConnection.close();
    }


    @Override
    public void cancel()
    {
        mSourceIsRunning = false;
    }
}

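So I have workarounds; the question is more about the theory. Why is close() invoked when the parallelism is 1 and the job is run from the IDE (i.e. from the command line)? Also, do close() and cancel() do the same thing in a RichParallelSourceFunction?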

I think the javadoc is more than self-explanatory:

Gracefully Stopping Functions
Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction} interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the state and the emitted elements in a consistent state.

-- SourceFunction.cancel

Cancels the source. Most sources will have a while loop inside the run(SourceContext) method. The implementation needs to ensure that the source will break out of that loop after this method is called.
A typical pattern is to have an "volatile boolean isRunning" flag that is set to false in this method. That flag is checked in the loop condition.

When a source is canceled, the executing thread will also be interrupted (via Thread.interrupt()). The interruption happens strictly after this method has been called, so any interruption handler can rely on the fact that this method has completed. It is good practice to make any flags altered by this method "volatile", in order to guarantee the visibility of the effects of this method to any interruption handler.
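The flag-plus-interrupt pattern described in that javadoc can be tried out without Flink. The following is only a minimal sketch using a plain thread: PollingSourceSketch and its polls counter are hypothetical names, and the main method merely imitates the documented order (cancel() first, interrupt afterwards); it is not Flink's actual task code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java stand-in for a source function; no Flink classes are involved.
class PollingSourceSketch implements Runnable {
    // volatile so that the write in cancel() is visible to the thread executing run()
    private volatile boolean isRunning = true;
    // Hypothetical counter, only here so the loop does something observable.
    final AtomicInteger polls = new AtomicInteger();

    @Override
    public void run() {
        while (isRunning) {
            polls.incrementAndGet(); // placeholder for "read files and emit records"
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
                // In this sketch an interrupt is treated as a request to leave the loop:
                // restore the interrupt status and stop polling.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    public void cancel() {
        isRunning = false;
    }

    public static void main(String[] args) throws InterruptedException {
        PollingSourceSketch source = new PollingSourceSketch();
        Thread worker = new Thread(source);
        worker.start();
        Thread.sleep(100);
        source.cancel();     // first flip the flag ...
        worker.interrupt();  // ... then interrupt, imitating the order the javadoc describes
        worker.join(5000);
        System.out.println("worker alive after cancel: " + worker.isAlive());
    }
}
```

Compared with only logging the InterruptedException, this lets the loop end promptly instead of finishing another full sleep interval.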

-- SourceContext.close

This method is called by the system to shut down the context.

Note that you can cancel a SourceFunction, but you stop a SourceContext.

Why is close() invoked if parallelism is 1 and the job is run from the IDE?

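close() is called after the last call to the main working methods (e.g. map or join). This method can be used for clean-up work. It is called regardless of the parallelism you configure.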

Also, do close() and cancel() do the same in a RichParallelSourceFunction?

They are not the same thing; take a look at how cancel() is described:

Cancels the source. Most sources will have a while loop inside the run(SourceContext) method. The implementation needs to ensure that the source will break out of that loop after this method is called.

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html#cancel--

The following link can help you understand the task lifecycle: https://ci.apache.org/projects/flink/flink-docs-stable/internals/task_lifecycle.html#operator-lifecycle-in-a-nutshell

I found a bug in my code. Here is the fix:

public void open(Configuration parameters) throws Exception
{
    mConnection.open();
    mSourceIsRunning = true;
}

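Now close() is not invoked until I decide to stop the workflow, in which case cancel() is invoked first and then close(). I am still wondering how parallelism affected the behavior.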
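For what it's worth, the effect of the missing assignment can be reproduced without Flink. The sketch below is a plain-Java simulation: LifecycleSketch and its simulate helper are made up for illustration, and it assumes the runtime calls open(), then run(), then close() once run() has returned. Because a Java boolean field defaults to false, the loop body never executes when open() does not set the flag, so run() returns immediately and close() follows. It does not say anything about why the parallelism made a difference.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of the call order; this is not Flink code.
class LifecycleSketch {
    private volatile boolean running;   // Java initialises boolean fields to false
    private final boolean setFlagInOpen;
    final List<String> events = new ArrayList<>();

    LifecycleSketch(boolean setFlagInOpen) {
        this.setFlagInOpen = setFlagInOpen;
    }

    void open() {
        events.add("open");
        if (setFlagInOpen) {
            running = true;             // the one-line fix from the post above
        }
    }

    void run() {
        int polls = 0;
        while (running) {
            events.add("poll");
            if (++polls == 2) {
                cancel();               // stand-in for the user cancelling the job
            }
        }
    }

    void cancel() {
        events.add("cancel");
        running = false;
    }

    void close() {
        events.add("close");
        running = false;
    }

    // Assumed order: open(), then run(), then close() after run() returns.
    static List<String> simulate(boolean setFlagInOpen) {
        LifecycleSketch source = new LifecycleSketch(setFlagInOpen);
        source.open();
        source.run();
        source.close();
        return source.events;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // prints [open, close]
        System.out.println(simulate(true));  // prints [open, poll, poll, cancel, close]
    }
}
```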