Test a RichCoFlatMapFunction in Apache Flink
I am trying to test a RichCoFlatMapFunction that I use to left-join two streams. It looks like this:
private ValueState<Card> currentValueState;
private ListState<Card> historicListState;

@Override
public void open(Configuration parameters) throws Exception {
    currentValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("Current State", Card.class));
    historicListState = getRuntimeContext().getListState(new ListStateDescriptor<>("historic state", Card.class));
}

@Override
public void flatMap1(Card currentCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
    Iterable<Card> historicCardList = historicListState.get();
    if (Iterables.size(historicCardList) > 0) {
        out.collect(new Tuple2<>(currentCard, Lists.newArrayList(historicCardList)));
    } else {
        currentValueState.update(currentCard);
        out.collect(new Tuple2<>(currentCard, null));
    }
}

@Override
public void flatMap2(Card historicCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
    historicListState.add(historicCard);
}
In flatMap1, when no historic cards are found, I emit null as the second field of the tuple:
out.collect(new Tuple2<>(currentCard, null));
The problem is that when I try to test the whole function, I get this error:
Automatic type extraction is not possible on candidates with null values. Please specify the types directly.
This is how I am trying to test the RichCoFlatMapFunction:
@Test
public void testFlatMap() throws Exception {
    final Card current = currentCard(2L);
    final Card historic = historicCard(2L);
    final List<Card> historicList = new ArrayList<>();
    historicList.add(historic);

    CoStreamFlatMap<Card, Card, Tuple2<Card, List<Card>>> operator = new CoStreamFlatMap<>(new LeftJoin());
    KeyedTwoInputStreamOperatorTestHarness<Long, Card, Card, Tuple2<Card, List<Card>>> testHarness =
        new KeyedTwoInputStreamOperatorTestHarness<>(
            operator,
            (Card c) -> c.getCardHash(),
            (Card h) -> h.getCardHash(),
            BasicTypeInfo.LONG_TYPE_INFO);

    testHarness.setup();
    testHarness.open();

    testHarness.processElement1(new StreamRecord<>(current));
    testHarness.processElement2(new StreamRecord<>(historic));

    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    expectedOutput.add(new StreamRecord<>(new Tuple2<>(current, historicList)));

    // Check that the result is correct
    ConcurrentLinkedQueue<Object> actualOutput = testHarness.getOutput();
    TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, actualOutput);
}
Any help would be much appreciated. I am fairly new to Apache Flink and unit testing.
Thanks
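The problem is that KeyedTwoInputStreamOperatorTestHarness does not know how to serialize the output of the LeftJoin operator. You can specify the output serializer via KeyedTwoInputStreamOperatorTestHarness.setup(TypeSerializer<OUT> outputSerializer).
In your case that would be: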
testHarness.setup(TypeInformation.of(new TypeHint<Tuple2<Card, List<Card>>>() {}).createSerializer(new ExecutionConfig()));
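Once the serializer is set, it may also be worth checking what the operator actually emits for the input order used in the test. The sketch below is a Flink-free stand-in for the flatMap1/flatMap2 logic from the question, for a single key. Everything in it is an assumption made for illustration: the class name LeftJoinSketch is hypothetical, a plain ArrayList replaces the keyed ListState, another list replaces the Collector, and String replaces Card. It is not the real operator, only a way to trace the order of events.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free stand-in for the LeftJoin function, for one key only.
final class LeftJoinSketch {
    // Stands in for historicListState (keyed ListState in the real function).
    private final List<String> historic = new ArrayList<>();
    // Stands in for the Collector: records everything that would be emitted.
    private final List<String> emitted = new ArrayList<>();

    // Mirrors flatMap1: emit the current card with the history seen so far,
    // or with null when no historic card has arrived yet.
    void flatMap1(String current) {
        if (!historic.isEmpty()) {
            emitted.add("(" + current + ", " + historic + ")");
        } else {
            emitted.add("(" + current + ", null)");
        }
    }

    // Mirrors flatMap2: only buffers the historic card, never emits.
    void flatMap2(String historicCard) {
        historic.add(historicCard);
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        LeftJoinSketch join = new LeftJoinSketch();
        join.flatMap1("current");   // same order as the test: current card first
        join.flatMap2("historic");  // historic card arrives afterwards
        System.out.println(join.emitted()); // prints [(current, null)]
    }
}
```

If this trace matches the real operator, then with processElement1 called before processElement2 the only record emitted is (current, null), so an expected output of (current, historicList) would only match if the historic card were processed first.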