如何在 Flink 中连接两个 unkeyed 流并相互共享状态?
How to connect two unkeyed streams and share states with each other in Flink?
Flink 版本 1.6.1
在下面的示例中,我想连接两个未加密的流。但似乎这两个流无法正确共享状态。我不知道什么是正确的实现方式。
代码:
public class TransactionJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream1 = env.fromElements("1", "2");
DataStream<Integer> stream2 = env.fromElements(3, 4, 5);
ConnectedStreams<String, Integer> connectedStreams = stream1.connect(stream2);
DataStream<String> resultStream = connectedStreams.process(new StringIntegerCoProcessFunction());
resultStream.print().setParallelism(1);
env.execute();
}
private static class StringIntegerCoProcessFunction extends CoProcessFunction<String, Integer, String> implements CheckpointedFunction {
private transient ListState<String> state1;
private transient ListState<Integer> state2;
@Override
public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
state1.add(value);
print(value);
}
@Override
public void processElement2(Integer value, Context ctx, Collector<String> out) throws Exception {
state2.add(value);
print(value.toString());
}
private void print(String value) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("input value is " + value + ".");
builder.append("state1 has ");
for (String str : state1.get()) {
builder.append(str + ",");
}
builder.append("state2 has ");
for (Integer integer : state2.get()) {
builder.append(integer.toString() + ",");
}
System.out.println(builder.toString());
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<String> descriptor1 =
new ListStateDescriptor<>(
"state1",
TypeInformation.of(new TypeHint<String>() {
}));
ListStateDescriptor<Integer> descriptor2 =
new ListStateDescriptor<>(
"state2",
TypeInformation.of(new TypeHint<Integer>() {
}));
state1 = context.getOperatorStateStore().getListState(descriptor1);
state2 = context.getOperatorStateStore().getListState(descriptor2);
}
}
}
输出:
input value is 4.state1 has state2 has 4,
input value is 2.state1 has 2,state2 has 4,
input value is 3.state1 has state2 has 3,
input value is 1.state1 has 1,state2 has 3,
input value is 5.state1 has state2 has 5,
我希望最后的输出是
input value is XX .state1 has 1,2 state2 has 3,4,5
但实际上输出看起来像是输入项被分区了。 4和2在一个分区中,3和1在另一个分区中。我想访问 processElement1
和 processElement2
.
中存储在 state1 和 state2 中的所有数据
您应该修改作业的开头,如下所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
...
这将导致整个作业 运行 的并行度为 1。您确实有
resultStream.print().setParallelism(1);
其效果是将打印接收器的并行度设置为 1,但作业的其余部分是 运行 默认并行度,明显大于 1。
或者,您可以使用相同的常量键对两个流进行键控,然后使用键控状态。
Flink 版本 1.6.1
在下面的示例中,我想连接两个未加密的流。但似乎这两个流无法正确共享状态。我不知道什么是正确的实现方式。
代码:
public class TransactionJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream1 = env.fromElements("1", "2");
DataStream<Integer> stream2 = env.fromElements(3, 4, 5);
ConnectedStreams<String, Integer> connectedStreams = stream1.connect(stream2);
DataStream<String> resultStream = connectedStreams.process(new StringIntegerCoProcessFunction());
resultStream.print().setParallelism(1);
env.execute();
}
private static class StringIntegerCoProcessFunction extends CoProcessFunction<String, Integer, String> implements CheckpointedFunction {
private transient ListState<String> state1;
private transient ListState<Integer> state2;
@Override
public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
state1.add(value);
print(value);
}
@Override
public void processElement2(Integer value, Context ctx, Collector<String> out) throws Exception {
state2.add(value);
print(value.toString());
}
private void print(String value) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("input value is " + value + ".");
builder.append("state1 has ");
for (String str : state1.get()) {
builder.append(str + ",");
}
builder.append("state2 has ");
for (Integer integer : state2.get()) {
builder.append(integer.toString() + ",");
}
System.out.println(builder.toString());
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<String> descriptor1 =
new ListStateDescriptor<>(
"state1",
TypeInformation.of(new TypeHint<String>() {
}));
ListStateDescriptor<Integer> descriptor2 =
new ListStateDescriptor<>(
"state2",
TypeInformation.of(new TypeHint<Integer>() {
}));
state1 = context.getOperatorStateStore().getListState(descriptor1);
state2 = context.getOperatorStateStore().getListState(descriptor2);
}
}
}
输出:
input value is 4.state1 has state2 has 4,
input value is 2.state1 has 2,state2 has 4,
input value is 3.state1 has state2 has 3,
input value is 1.state1 has 1,state2 has 3,
input value is 5.state1 has state2 has 5,
我希望最后的输出是
input value is XX .state1 has 1,2 state2 has 3,4,5
但实际上输出看起来像是输入项被分区了。 4和2在一个分区中,3和1在另一个分区中。我想访问 processElement1
和 processElement2
.
您应该修改作业的开头,如下所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
...
这将导致整个作业 运行 的并行度为 1。您确实有
resultStream.print().setParallelism(1);
其效果是将打印接收器的并行度设置为 1,但作业的其余部分是 运行 默认并行度,明显大于 1。
或者,您可以使用相同的常量键对两个流进行键控,然后使用键控状态。