如何以编程方式将文本写入 Flink 套接字?

How to programmatically write text to Flink socket?

我的目标是将字符串 "SUCCESS" 发送到套接字,让 Apache Flink 的 DataStream 获取该字符串并使用单词 "SUCCESS" 更新本地文本文件 我已经设置了一个“ Consumer DataStream”,它监视从终端发送到我本地主机上的端口 9999 的文本。请参阅下面的代码以获取有效的 Consumer DataStream 代码:​​

package p1;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class TestClientStreaming {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> textStream = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<String> filteredStream = textStream.filter(new FilterFunction<String>() {
            public boolean filter(String value) throws Exception {
                return value.equals("SUCCESS");
            }
        });

        filteredStream.writeAsText(<FILE-PATH>, WriteMode.OVERWRITE);

        env.execute("Reading Flink Stream");
    }
}

这对我有用;当我在我的终端上 运行 "nc -l 9999" 时,如果我在终端,然后按 Enter 键。 到目前为止太棒了!

现在,我想避免使用终端,但仍将文本写入此套接字,以便我的 "Consumer DataStream" 可以接收它。此文本可以是任何文本,因为我的 "Consumer DataStream" 会过滤掉它想要的文本,也就是 "SUCCESS".

我知道的唯一两种写入套接字的方法是使用终端(如前所述)或使用 Producer DataStream 不断从“Producer Text File" 并调用 DataStream.writeToSocket() 将其读取的文本写入套接字。然后我的 "Consumer Datastream" 会选择它,并按预期运行。

是否有任何其他选项可以将文本写入套接字,以便 DataStream 可以获取该文本?我尝试使用套接字库将 "SUCCESS" 写入 localhost:9999 但无济于事。

这张图片可能有助于可视化我要解决的问题以及我已经解决的问题:https://ibb.co/41GzNWM

感谢您的宝贵时间!

您需要创建一个等价于 nc -l 又名 socket server.

示例代码为:

public class SocketWriter {

    public static void main(String[] args) {
        int port = 9999;
        boolean stop = false;
        ServerSocket serverSocket = null;


        try {

            serverSocket = new ServerSocket(port);

            while (!stop) {
                Socket echoSocket = serverSocket.accept();
                PrintWriter out =
                        new PrintWriter(echoSocket.getOutputStream(), true);
                BufferedReader in =
                        new BufferedReader(
                                new InputStreamReader(echoSocket.getInputStream()));

                out.println("Hello Amazing");
                // do whatever logic you want.

                in.close();
                out.close();
                echoSocket.close();

            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
            try {
                serverSocket.close();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
}