Impala - Error produced when querying data produced by a Java UDF
First of all, my objective is not that you understand my UDF code or whether it achieves my goal (I already know it does), but to understand why querying the string it generates fails in a later query.
I built a custom UDF with this code:
import java.util.HashMap;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
public class Calculate_states extends UDF
{
// Per-unit (Ut) state kept between calls to evaluate()
HashMap<String, Long> last_ts = new HashMap<String, Long>();
HashMap<String, Integer> last_val = new HashMap<String, Integer>();
HashMap<String, Long> ts_last_start = new HashMap<String, Long>();
HashMap<String, String> start_type = new HashMap<String, String>();
public String evaluate( Integer bit, Long ts, Long next_ts, Integer next_bit, Integer time, String Ut)
{
Object[] result = new Object[4];
String estado = new String();
if(bit==null)
{
result[0]=new Text("no state");
}
else
{
if(bit==1 && (
(
( next_ts == null || ((next_ts-ts)/1000) > time )
&&
( last_ts.get(Ut) == null || ((ts-last_ts.get(Ut))/1000) > time )
)
||
(
(last_val.get(Ut)!=null) &&
last_val.get(Ut)==0 && ((ts-last_ts.get(Ut))/1000) <=time &&
(next_ts == null ||
(next_ts-ts)/1000 > time)
)
||
(
(next_bit!=null) && // Necessary condition to avoid running into problems with nulls
( next_bit==0 && ((next_ts-ts)/1000) <= time &&
( (last_ts.get(Ut) == null ||
((ts-last_ts.get(Ut))/1000) > time )
)
)
)
)
)
{ estado= "isolated point";
result[0]=new Text("isolated point");}
else if
(
bit==1 &&
(
last_val.get(Ut) != null && // To avoid problems with nulls
last_val.get(Ut)==0 && ((ts-last_ts.get(Ut))/1000 ) <=time)
){ estado= "start";
result[0]=new Text("start");}
else if
(
bit==0 &&
( last_val.get(Ut) != null && // To avoid problems with nulls
last_val.get(Ut)==1 && ((ts-last_ts.get(Ut))/1000 ) <=time )
){estado= "stop";
result[0]=new Text("stop");}
else if
(
bit==1 && (last_ts.get(Ut)==null || ((ts-last_ts.get(Ut))/1000 ) > time )
){estado= "no info start";
result[0]=new Text("no info start");}
else if
(
bit==1 && (next_bit==null || ((next_ts-ts)/1000 ) > time )
){estado= "no info stop";
result[0]=new Text("no info stop");}
else if
(bit==1 ){
result[0]=new Text("working");}
else if
(bit==0 ){
result[0]=new Text("stopped");}
// Update values
last_val.put(Ut,bit);
last_ts.put(Ut,ts);
}
if (estado.equals("isolated point"))
{ result[1]= new LongWritable(1);
// Could be the sampling frequency, a new parameter
result[2]= new Text("isolated point");
result[3]= new LongWritable(ts);
}
else if (
estado.equals("start") ||
estado.equals("no info start")
){
ts_last_start.put(Ut,ts);
start_type.put(Ut,estado);
//result[2]=null;
result[3]=new LongWritable(ts);
}
else if (
estado.equals("stop") ||
estado.equals("no info stop")
){
result[3]=new LongWritable(ts_last_start.get(Ut));
result[1]= new LongWritable((ts-ts_last_start.get(Ut))/1000);
result[2]= new Text(start_type.get(Ut)+"-"+estado);
ts_last_start.put(Ut,null);
}
else
//result[2]=null;
if (ts_last_start.get(Ut) == null)
{
result[3] =null;
}
else
result[3]=new LongWritable(ts_last_start.get(Ut));
String resultado="";
for (int i=0;i<4;i++)
{
if (i==3)
resultado=resultado+String.valueOf(result[i]);
else
resultado=resultado+String.valueOf(result[i])+";";
}
return resultado;
}
}
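For reference, a Java UDF like this would be registered in Impala roughly as follows. This is only a sketch: the jar file name is an assumption, since only the hdfs://mypath prefix and the class UDFpackImpala.Calculate_states appear in the logs below.

create function calculate_states(int, bigint, bigint, int, int, string)
returns string
location 'hdfs://mypath/calculate_states.jar'  -- hypothetical jar name
symbol='UDFpackImpala.Calculate_states';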
The objective is to compute the state of a component (where it starts working and where it stops working) and to add an identifier to all the rows between a start and a stop. A bit of 1/0 means the component is working/not working.
For example, this query:
select
ut,ts, bit,
calculate_states(bit,ts,if (bit is null, null,next_ts),next_bit,1,ut) as states
from
(
select
ut,
ts,
bit, -- Means component bit
last_value(bit ignore nulls) over (partition by ut order by ts desc rows between 1 preceding and 1 preceding) as next_bit,
min(if (bit is not null, ts, null)) over (partition by ut order by ts desc rows between unbounded preceding and 1 preceding) as next_ts
from my_table
order by 1,2
)b
order by 1,2;
would return (on this table):
UT | ts | bit | States
a 1000 0 stopped;null;null;null
a 2000 0 stopped;null;null;null
a 3000 0 stopped;null;null;null
a 4000 1 start;null;null;4000
a 5000 1 no info stop;2;start-no info stop;4000
a 6000 null no state;null;null;null
a 7000 1 no info start;null;null;7000
a 8000 1 working;null;null;7000
a 9000 0 stop;3;no info start-stop;7000
a 10000 1 start;null;null;10000
a 11000 1 working;null;null;10000
a 12000 1 no info stop;3;start-no info stop;10000
Everything is correct up to this point. Now, as soon as I simply add

select * from QUERY order by ut, ts

or

create table new_table as QUERY

and

select * from new_table order by ut, ts

I get this error in my logs:
UDF WARNING: Hive UDF path=hdfs://mypath class=UDFpackImpala.Calculate_states failed due to: ImpalaRuntimeException: UDF::evaluate() ran into a problem.
CAUSED BY: ImpalaRuntimeException: UDF failed to evaluate
CAUSED BY: InvocationTargetException: null
CAUSED BY: NullPointerException: null
and, instead of what I showed before, my results turn into:
UT | ts | bit | States
a 1000 0 stopped;null;null;null
a 2000 0 stopped;null;null;null
a 3000 0 NULL
a 4000 1 stop;null;null;4000
a 5000 1 working;null;null;null
a 6000 null start;null;null;null
a 7000 1 working;null;null;null
a 8000 1 working;null;null;null
a 9000 0 stop;-1;no info start-stop;10000
a 10000 1 start;null;null;10000
a 11000 1 working;null;null;10000
a 12000 1 isolated point;1;null;12000
Something completely random. My question is: why?
The Impala version is 2.9.0-cdh5.12.2.
It all happens because Impala does not honor the ORDER BY clause of the inner select if you do not also include a LIMIT. Since the UDF keeps per-unit state in its HashMaps between calls, it only produces correct results when the rows reach evaluate() in ut, ts order. If you set LIMIT 99999999999 after the first ORDER BY 1,2, the problem is solved.
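As a minimal sketch of that workaround, the full query would look like this; it is the same query as above, with only the LIMIT in the inner select added (99999999999 is just an arbitrarily large limit):

select
  ut, ts, bit,
  calculate_states(bit, ts, if(bit is null, null, next_ts), next_bit, 1, ut) as states
from
(
  select
    ut,
    ts,
    bit, -- Means component bit
    last_value(bit ignore nulls) over (partition by ut order by ts desc rows between 1 preceding and 1 preceding) as next_bit,
    min(if(bit is not null, ts, null)) over (partition by ut order by ts desc rows between unbounded preceding and 1 preceding) as next_ts
  from my_table
  order by 1,2
  limit 99999999999
) b
order by 1,2;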