Not able to call a UDF in BeamSql
I have a BeamSql query:
PCollectionTuple query0 = PCollectionTuple.of(
    new TupleTag<BeamRecord>("temp2"), temp2).and(new TupleTag<BeamRecord>("temp3"), temp3);
PCollection<BeamRecord> rec_3 = query0.apply(
    BeamSql.queryMulti("SELECT a.*, \r\n" +
        "(case \r\n" +
        "when a.grp > 5 then 1 \r\n" +
        "when b.grp > 5 then 1 \r\n" +
        "else 0 end) as flag \r\n" +
        "from temp2 a left join \r\n" +
        "temp3 b on a.eventid = b.eventid and b.Weekint = c1(a.Weekint)").withUdf("c1", AddS.class));
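In the query above, I do a left join between tables temp2 and temp3, and in the ON condition I call the UDF named 'AddS' (registered as c1).
In this UDF, Weekint is treated as a BIGINT. The UDF takes Weekint as input, parses it as a date, adds 7 days, and returns the result as a BIGINT. Here is the UDF: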
public static class AddS implements BeamSqlUdf {
    private static final long serialVersionUID = 1L;

    public static BigInteger eval(BigInteger input) throws ParseException {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
        String strdate = input.toString();
        Date date1 = dateFormat.parse(strdate);
        Calendar c = Calendar.getInstance();
        c.setTime(date1);
        c.add(Calendar.DATE, 7);
        String f = c.getTime().toString();
        BigInteger x = new BigInteger(f);
        return (x);
    }
}
I get the following error:
Exception in thread "main" java.lang.AssertionError: No assign rules for OTHER defined
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.type.SqlTypeAssignmentRules.canCastFrom(SqlTypeAssignmentRules.java:326)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.type.SqlTypeUtil.canCastFrom(SqlTypeUtil.java:863)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlUtil.test(SqlUtil.java:567)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlUtil.test(SqlUtil.java:527)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.runtime.PredicateImpl.apply(PredicateImpl.java:36)
at org.apache.beam.sdks.java.extensions.sql.repackaged.com.google.common.collect.Iterators.computeNext(Iterators.java:617)
at org.apache.beam.sdks.java.extensions.sql.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
at org.apache.beam.sdks.java.extensions.sql.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
at org.apache.beam.sdks.java.extensions.sql.repackaged.com.google.common.collect.Iterators.addAll(Iterators.java:366)
at org.apache.beam.sdks.java.extensions.sql.repackaged.com.google.common.collect.Lists.newArrayList(Lists.java:163)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlUtil.lookupSubjectRoutines(SqlUtil.java:438)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlUtil.lookupRoutine(SqlUtil.java:371)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:245)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:223)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5053)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5040)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlCall.accept(SqlCall.java:137)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1588)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1573)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:225)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:4764)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlCall.validate(SqlCall.java:114)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:224)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:4764)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.SqlCall.validate(SqlCall.java:114)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:3636)
at org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:2988)
at
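I can't figure out what is causing this error. Is the UDF not created correctly, or is it not called correctly?
Or could someone explain the cause of this error?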
Your UDF is not created correctly. Beam SQL does not support the Java BigInteger type internally. If your SQL data type is BIGINT, you should use the Java Long type instead.
(I have opened an issue with Beam to make this error easier to understand.)
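A minimal sketch of the UDF rewritten along those lines, assuming Weekint holds dates encoded as yyyyMMdd numbers (e.g. 20180101) and is declared as BIGINT in the record type. Besides switching BigInteger to Long, it also avoids a second problem in the original: `c.getTime().toString()` returns text such as "Mon Jan 08 00:00:00 UTC 2018", which `new BigInteger(...)` cannot parse, so the shifted date has to be re-encoded as a yyyyMMdd number explicitly. This sketch swaps SimpleDateFormat/Calendar for `java.time.LocalDate`, and it leaves out the `implements BeamSqlUdf` clause only so that it compiles without Beam on the classpath.

```java
import java.time.LocalDate;

// Sketch of AddS using Long instead of BigInteger.
// In the real pipeline, declare "implements BeamSqlUdf"
// (org.apache.beam.sdk.extensions.sql.BeamSqlUdf); it is omitted here
// only so this compiles without Beam on the classpath.
class AddS {

    // BIGINT in Beam SQL corresponds to java.lang.Long, so both the
    // parameter and the return type are Long rather than BigInteger.
    public static Long eval(Long input) {
        if (input == null) {
            return null; // propagate SQL NULL instead of throwing
        }
        long v = input;
        // Split a yyyyMMdd number such as 20180101 into its parts.
        int year = (int) (v / 10000);
        int month = (int) ((v / 100) % 100);
        int day = (int) (v % 100);
        // LocalDate.of rejects impossible dates (e.g. month 13) with a DateTimeException.
        LocalDate shifted = LocalDate.of(year, month, day).plusDays(7);
        // Re-encode the shifted date as a yyyyMMdd number.
        return shifted.getYear() * 10000L
                + shifted.getMonthValue() * 100L
                + shifted.getDayOfMonth();
    }

    public static void main(String[] args) {
        System.out.println(eval(20180101L)); // 20180108
        System.out.println(eval(20181228L)); // 20190104 (crosses the year boundary)
    }
}
```

The registration in the query would stay the same, `.withUdf("c1", AddS.class)`; only the Java types inside the UDF change.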