Apache Flink: LEFT JOIN with a TableFunction does not return expected result
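Flink version: 1.3.1
I created two tables, one from in-memory data and one from a UDTF (user-defined table function). When I tested JOIN and LEFT JOIN, both returned the same result. I expected the LEFT JOIN to return more rows than the JOIN.
My test code is as follows: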
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.TableFunction;

// Note: the WCUpper POJO used below was not included in the post.
public class ExerciseUDF {

    public static void main(String[] args) throws Exception {
        test_3();
    }

    public static void test_3() throws Exception {
        // 1. set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        DataSet<WC> input = env.fromElements(
            new WC("Hello", 1),
            new WC("Ciao", 1),
            new WC("Hello", 1));

        // 2. register the DataSet as table "WordCount"
        tEnv.registerDataSet("WordCount", input, "word, frequency");
        Table table;
        DataSet<WC> result;
        DataSet<WCUpper> resultUpper;
        table = tEnv.scan("WordCount");

        // 3. table left join user-defined table function
        System.out.println("table left join user defined table");
        tEnv.registerFunction("myTableUpperFunc", new MyTableFunc_2());
        table = tEnv.sql("SELECT S.word as word, S.frequency as frequency, S.word as myupper FROM WordCount as S left join LATERAL TABLE(myTableUpperFunc(S.word)) as T(word,myupper) on S.word = T.word");
        resultUpper = tEnv.toDataSet(table, WCUpper.class);
        resultUpper.print(); // output: WCUpper Ciao 1 CIAO, but the rows for Hello are missing

        // 4. table join user-defined table function
        System.out.println("table join user defined table");
        tEnv.registerFunction("myTableUpperFunc", new MyTableFunc_2());
        table = tEnv.scan("WordCount");
        table = tEnv.sql("SELECT S.word as word, S.frequency as frequency, T.myupper as myupper FROM WordCount as S join LATERAL TABLE(myTableUpperFunc(S.word)) as T(word,myupper) on S.word = T.word");
        resultUpper = tEnv.toDataSet(table, WCUpper.class);
        resultUpper.print();
    }

    public static class WC {
        public String word;
        public long frequency;

        // public no-argument constructor to make it a Flink POJO
        public WC() {
        }

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }

    // user-defined table function
    public static class MyTableFunc_2 extends TableFunction<Tuple2<String, String>> {
        // emits (str, STR) for every input except "Hello", for which no row is emitted
        public void eval(String str) {
            System.out.println("upper func executed for " + str);
            if (str.equals("Hello")) {
                return;
            }
            collect(new Tuple2<String, String>(str, str.toUpperCase()));
            // collect(new Tuple2<String,String>(str,str.toUpperCase()));
        }
    }
}
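The LEFT JOIN and JOIN queries produce the same output. In both cases only one row is returned:
WCUpper Ciao 1 CIAO
However, I think the LEFT JOIN query should preserve the 'Hello' rows.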
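To make the expectation concrete, here is a minimal plain-Java sketch of the semantics I would expect from the two queries. It does not use Flink at all: the class LateralJoinSketch and its methods are hypothetical names for illustration, tableFunction stands in for MyTableFunc_2.eval, and the frequency is hard-coded to 1 as in the sample data. The sketch projects T.myupper in both cases, which is an assumption; with S.word as myupper, as written in the LEFT JOIN query above, the last column of the padded rows would be the word itself rather than null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the two queries; no Flink classes are involved.
class LateralJoinSketch {

    // Stand-in for MyTableFunc_2.eval: emits nothing for "Hello",
    // otherwise one (word, upper-cased word) pair.
    static List<String[]> tableFunction(String word) {
        List<String[]> out = new ArrayList<>();
        if (!word.equals("Hello")) {
            out.add(new String[] {word, word.toUpperCase()});
        }
        return out;
    }

    // Joins every left row with the rows the function emits for it,
    // keeping only pairs that satisfy the predicate S.word = T.word.
    // With leftOuter = true, a left row without any match is still kept
    // and its right-hand column is padded with null.
    static List<String> join(List<String> words, boolean leftOuter) {
        List<String> result = new ArrayList<>();
        for (String word : words) {
            boolean matched = false;
            for (String[] t : tableFunction(word)) {
                if (word.equals(t[0])) { // ON S.word = T.word
                    result.add(word + " 1 " + t[1]);
                    matched = true;
                }
            }
            if (leftOuter && !matched) {
                result.add(word + " 1 null");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "Ciao", "Hello");
        // prints: inner join: [Ciao 1 CIAO]
        System.out.println("inner join: " + join(words, false));
        // prints: left join:  [Hello 1 null, Ciao 1 CIAO, Hello 1 null]
        System.out.println("left join:  " + join(words, true));
    }
}
```

Under this model the inner join yields one row and the left join yields three, which is the difference I expected to see from Flink.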
Yes, you are right.
This is a bug in the translation of TableFunction outer joins with predicates, and it needs to be fixed.
Thanks, Fabian