Flink中BroadcastProcessFunction如何使用ListState
How to use ListState for BroadcastProcessFunction in Flink
我们有一个包含交易的非键控数据流和一个包含规则的广播流。事实上,我们希望根据最后一次看到的规则来处理交易。如果我们最后看到的规则是 daily
,我们必须将当前交易添加到 dailyTrnsList
。此外,如果 dailyTrnsList
大小大于阈值,我们必须清除列表并将事务写入数据库。如果最后看到的规则是 temp
.
,我们做同样的事情
代码如下:
public class TransactionProcess extends BroadcastProcessFunction<String, String, String>{
private List<String> dailyTrnsList = new ArrayList<>();
private List<String> tempTrnsList = new ArrayList<>();
private final static int threshold = 100;
private final MapStateDescriptor<String, String> ruleStateDesc =
new MapStateDescriptor<>(
"ControlMapState",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
@Override
public void processElement(String s,
ReadOnlyContext readOnlyContext,
Collector<Transaction> collector) throws Exception
{
String ruleName = readOnlyContext.getBroadcastState(ruleStateDesc).get("rule");
if(ruleName.equals("daily"))
{
dailyTrnsList.add(s);
if(dailyTrnsList.size()>=threshold)
{
List<String> buffer = dailyTrnsList;
dailyTrnsList = new ArrayList<>();
insert_to_db(buffer,"daily");
}
}
else if(ruleName.equals("temp"))
{
tempTrnsList.add(s);
if(tempTrnsList.size()>=threshold)
{
List<String> buffer = tempTrnsList;
tempTrnsList = new ArrayList<>();
insert_to_db(buffer,"temp");
}
}
collector.collect(s);
}
@Override
public void processBroadcastElement(String s,
Context context,
Collector<CardTransaction> collector) throws Exception
{
if (s.equals("temp"))
{
context.getBroadcastState(ruleStateDesc).put("rule", "temp");
List<String> buffer = dailyTrnsList;
dailyTrnsList = new ArrayList<>();
insert_to_db(buffer,"daily");
}
else if (s.equals("daily"))
{
context.getBroadcastState(ruleStateDesc).put("rule", "daily");
List<String> buffer = tempTrnsList;
tempTrnsList = new ArrayList<>();
insert_to_db(buffer,"temp");
}
}
}
我们的问题是编写容错方法。我们不知道如何使用 ListState
来解决我们的问题。到目前为止,我们找到的唯一解决方案是 CheckpointedFunction
接口的实现,该接口位于 Flink 文档的 Working with State 部分下。
private ListState<String> dailyTrns;
private ListState<String> tempTrns;
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
dailyTrns.clear();
tempTrns.clear();
for (String[] element : dailyTrnsList)
dailyTrns.add(element);
for (String[] element : tempTrnsList)
tempTrns.add(element);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
dailyTrns = context.getOperatorStateStore().getListState(dailyDescriptor);
tempTrns = context.getOperatorStateStore().getListState(tempDescriptor);
if (context.isRestored()) {
for (String[] element : dailyTrns.get())
dailyTrnsList.add(element);
for (String[] element : tempTrns.get())
tempTrnsList.add(element);
}
}
请您指导我们,如果这种方法不是正确的解决方案,我们还能做什么?如果解决方案是正确的,那么对于未从 dailyTrnsList
和 tempTrnsList
转移到 dailyTrns
和 tempTrns
的元素会发生什么?
如有任何帮助,我们将不胜感激。
提前谢谢你。
- 您不需要
dailyTrnsList
和 tempTrnsList
,因为您已经有了 dailyTrns
和 tempTrns
。只需使用那些 ListState
变量来记录交易。
- 我不确定你为什么要
MapState<String, String>
。看起来你的状态基于最新的规则,它是 daily
或 temp
,所以只需将其存储在 ValueState<String>
. 中
你可以简化你的实现,这样就不用担心这个了。您可以执行以下操作:
(1) 简化 BroadcastProcessFunction,使其所做的只是将传入流分成两个流:一个日常事务流和一个临时事务流。它通过根据最新规则选择两侧输出之一来实现。
(2) 跟随 BroadcastProcessFunction 计数 windows 创建批处理并将它们写入数据库。
或者不使用辅助输出,BroadcastProcessFunction 可以写出(规则,事务)的元组,然后您可以按规则对流进行键控。无论哪种方式,想法都是让 window API 为您管理容错列表。
我们有一个包含交易的非键控数据流和一个包含规则的广播流。事实上,我们希望根据最后一次看到的规则来处理交易。如果我们最后看到的规则是 daily
,我们必须将当前交易添加到 dailyTrnsList
。此外,如果 dailyTrnsList
大小大于阈值,我们必须清除列表并将事务写入数据库。如果最后看到的规则是 temp
.
代码如下:
public class TransactionProcess extends BroadcastProcessFunction<String, String, String>{
private List<String> dailyTrnsList = new ArrayList<>();
private List<String> tempTrnsList = new ArrayList<>();
private final static int threshold = 100;
private final MapStateDescriptor<String, String> ruleStateDesc =
new MapStateDescriptor<>(
"ControlMapState",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
@Override
public void processElement(String s,
ReadOnlyContext readOnlyContext,
Collector<Transaction> collector) throws Exception
{
String ruleName = readOnlyContext.getBroadcastState(ruleStateDesc).get("rule");
if(ruleName.equals("daily"))
{
dailyTrnsList.add(s);
if(dailyTrnsList.size()>=threshold)
{
List<String> buffer = dailyTrnsList;
dailyTrnsList = new ArrayList<>();
insert_to_db(buffer,"daily");
}
}
else if(ruleName.equals("temp"))
{
tempTrnsList.add(s);
if(tempTrnsList.size()>=threshold)
{
List<String> buffer = tempTrnsList;
tempTrnsList = new ArrayList<>();
insert_to_db(buffer,"temp");
}
}
collector.collect(s);
}
@Override
public void processBroadcastElement(String s,
Context context,
Collector<CardTransaction> collector) throws Exception
{
if (s.equals("temp"))
{
context.getBroadcastState(ruleStateDesc).put("rule", "temp");
List<String> buffer = dailyTrnsList;
dailyTrnsList = new ArrayList<>();
insert_to_db(buffer,"daily");
}
else if (s.equals("daily"))
{
context.getBroadcastState(ruleStateDesc).put("rule", "daily");
List<String> buffer = tempTrnsList;
tempTrnsList = new ArrayList<>();
insert_to_db(buffer,"temp");
}
}
}
我们的问题是编写容错方法。我们不知道如何使用 ListState
来解决我们的问题。到目前为止,我们找到的唯一解决方案是 CheckpointedFunction
接口的实现,该接口位于 Flink 文档的 Working with State 部分下。
private ListState<String> dailyTrns;
private ListState<String> tempTrns;
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
dailyTrns.clear();
tempTrns.clear();
for (String[] element : dailyTrnsList)
dailyTrns.add(element);
for (String[] element : tempTrnsList)
tempTrns.add(element);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
dailyTrns = context.getOperatorStateStore().getListState(dailyDescriptor);
tempTrns = context.getOperatorStateStore().getListState(tempDescriptor);
if (context.isRestored()) {
for (String[] element : dailyTrns.get())
dailyTrnsList.add(element);
for (String[] element : tempTrns.get())
tempTrnsList.add(element);
}
}
请您指导我们,如果这种方法不是正确的解决方案,我们还能做什么?如果解决方案是正确的,那么对于未从 dailyTrnsList
和 tempTrnsList
转移到 dailyTrns
和 tempTrns
的元素会发生什么?
如有任何帮助,我们将不胜感激。
提前谢谢你。
- 您不需要
dailyTrnsList
和tempTrnsList
,因为您已经有了dailyTrns
和tempTrns
。只需使用那些ListState
变量来记录交易。 - 我不确定你为什么要
MapState<String, String>
。看起来你的状态基于最新的规则,它是daily
或temp
,所以只需将其存储在ValueState<String>
. 中
你可以简化你的实现,这样就不用担心这个了。您可以执行以下操作:
(1) 简化 BroadcastProcessFunction,使其所做的只是将传入流分成两个流:一个日常事务流和一个临时事务流。它通过根据最新规则选择两侧输出之一来实现。
(2) 跟随 BroadcastProcessFunction 计数 windows 创建批处理并将它们写入数据库。
或者不使用辅助输出,BroadcastProcessFunction 可以写出(规则,事务)的元组,然后您可以按规则对流进行键控。无论哪种方式,想法都是让 window API 为您管理容错列表。