如何以编程方式将文本写入 Flink 套接字?
How to programmatically write text to Flink socket?
我的目标是将字符串 "SUCCESS" 发送到套接字,让 Apache Flink 的 DataStream 获取该字符串并使用单词 "SUCCESS" 更新本地文本文件 我已经设置了一个“ Consumer DataStream”,它监视从终端发送到我本地主机上的端口 9999 的文本。请参阅下面的代码以获取有效的 Consumer DataStream 代码:
package p1;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;
public class TestClientStreaming {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<String> filteredStream = textStream.filter(new FilterFunction<String>() {
public boolean filter(String value) throws Exception {
return value.equals("SUCCESS");
}
});
filteredStream.writeAsText(<FILE-PATH>, WriteMode.OVERWRITE);
env.execute("Reading Flink Stream");
}
}
这对我有用;当我在我的终端上 运行 "nc -l 9999" 时,如果我在终端,然后按 Enter 键。 到目前为止太棒了!
现在,我想避免使用终端,但仍将文本写入此套接字,以便我的 "Consumer DataStream" 可以接收它。此文本可以是任何文本,因为我的 "Consumer DataStream" 会过滤掉它想要的文本,也就是 "SUCCESS".
我知道的唯一两种写入套接字的方法是使用终端(如前所述)或使用 Producer DataStream 不断从“Producer Text File" 并调用 DataStream.writeToSocket() 将其读取的文本写入套接字。然后我的 "Consumer Datastream" 会选择它,并按预期运行。
是否有任何其他选项可以将文本写入套接字,以便 DataStream 可以获取该文本?我尝试使用套接字库将 "SUCCESS" 写入 localhost:9999 但无济于事。
这张图片可能有助于可视化我要解决的问题以及我已经解决的问题:https://ibb.co/41GzNWM
感谢您的宝贵时间!
您需要创建一个等价于 nc -l
又名 socket server.
示例代码为:
public class SocketWriter {
public static void main(String[] args) {
int port = 9999;
boolean stop = false;
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
while (!stop) {
Socket echoSocket = serverSocket.accept();
PrintWriter out =
new PrintWriter(echoSocket.getOutputStream(), true);
BufferedReader in =
new BufferedReader(
new InputStreamReader(echoSocket.getInputStream()));
out.println("Hello Amazing");
// do whatever logic you want.
in.close();
out.close();
echoSocket.close();
}
}
catch (IOException e) {
e.printStackTrace();
}
finally {
try {
serverSocket.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
}
我的目标是将字符串 "SUCCESS" 发送到套接字,让 Apache Flink 的 DataStream 获取该字符串并使用单词 "SUCCESS" 更新本地文本文件 我已经设置了一个“ Consumer DataStream”,它监视从终端发送到我本地主机上的端口 9999 的文本。请参阅下面的代码以获取有效的 Consumer DataStream 代码:
package p1;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;
public class TestClientStreaming {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<String> filteredStream = textStream.filter(new FilterFunction<String>() {
public boolean filter(String value) throws Exception {
return value.equals("SUCCESS");
}
});
filteredStream.writeAsText(<FILE-PATH>, WriteMode.OVERWRITE);
env.execute("Reading Flink Stream");
}
}
这对我有用;当我在我的终端上 运行 "nc -l 9999" 时,如果我在终端,然后按 Enter 键。 到目前为止太棒了!
现在,我想避免使用终端,但仍将文本写入此套接字,以便我的 "Consumer DataStream" 可以接收它。此文本可以是任何文本,因为我的 "Consumer DataStream" 会过滤掉它想要的文本,也就是 "SUCCESS".
我知道的唯一两种写入套接字的方法是使用终端(如前所述)或使用 Producer DataStream 不断从“Producer Text File" 并调用 DataStream.writeToSocket() 将其读取的文本写入套接字。然后我的 "Consumer Datastream" 会选择它,并按预期运行。
是否有任何其他选项可以将文本写入套接字,以便 DataStream 可以获取该文本?我尝试使用套接字库将 "SUCCESS" 写入 localhost:9999 但无济于事。
这张图片可能有助于可视化我要解决的问题以及我已经解决的问题:https://ibb.co/41GzNWM
感谢您的宝贵时间!
您需要创建一个等价于 nc -l
又名 socket server.
示例代码为:
public class SocketWriter {
public static void main(String[] args) {
int port = 9999;
boolean stop = false;
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
while (!stop) {
Socket echoSocket = serverSocket.accept();
PrintWriter out =
new PrintWriter(echoSocket.getOutputStream(), true);
BufferedReader in =
new BufferedReader(
new InputStreamReader(echoSocket.getInputStream()));
out.println("Hello Amazing");
// do whatever logic you want.
in.close();
out.close();
echoSocket.close();
}
}
catch (IOException e) {
e.printStackTrace();
}
finally {
try {
serverSocket.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
}