如何在 Pig 中生成重复组?
How to generate repetitive groups in pig?
我在用 Pig 做数据准备,遇到了一个看似简单却无法解决的问题。
例如,我有一列 id 和一列姓名:
输入:
id | name
-------------
1 | Alicia
2 | Ana
3 | Benita
4 | Berta
5 | Bertha
期望的输出如下(能否用类似 for 循环的功能来实现?):
id | name
--------------------------
1_XX_1 | Alicia_id_1
2_XX_1 | Ana_id_1
3_XX_1 | Benita_id_1
4_XX_1 | Berta_id_1
5_XX_1 | Bertha_id_1
1_XX_2 | Alicia_id_2
2_XX_2 | Ana_id_2
3_XX_2 | Benita_id_2
4_XX_2 | Berta_id_2
5_XX_2 | Bertha_id_2
1_XX_3 | Alicia_id_3
2_XX_3 | Ana_id_3
3_XX_3 | Benita_id_3
4_XX_3 | Berta_id_3
5_XX_3 | Bertha_id_3
可以用一个 UDF 来实现:把复制次数作为参数传入,这样同一个 UDF 就能复用于不同的复制次数。下面的 UDF 实现了这一功能。
package pigexerciseudf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
public class replicateinput extends EvalFunc<DataBag>
{
public replicateinput()
{
}
int rep_factor=0;
public replicateinput(String a)
{
rep_factor=Integer.parseInt(a);
}
public DataBag exec(Tuple input) throws IOException
{
BagFactory bf=BagFactory.getInstance();
DataBag output=bf.newDefaultBag();
try
{
for(int i=1;i<=rep_factor;i++)
{
TupleFactory tp=TupleFactory.getInstance();
Tuple t1=tp.newTuple(2);
String key=(String)input.get(0);
System.out.println("key="+key);
String value=(String)input.get(1);
String key_out=key+"_XX_"+i;
String value_out=value+"_id_"+i;
t1.set(0,key_out);
t1.set(1,value_out);
output.add(t1);
}
return output;
}
catch(Exception e)
{
throw new IOException(e);
}
}
public Schema outputschema(Schema input)
{
try
{
List<Schema.FieldSchema> mylist=new ArrayList<Schema.FieldSchema>();
mylist.add(new Schema.FieldSchema("key_out",DataType.CHARARRAY));
mylist.add(new Schema.FieldSchema("value_out",DataType.CHARARRAY));
Schema tupleschema=new Schema(mylist);
Schema bagschema=new Schema(new Schema.FieldSchema("pair",tupleschema,DataType.TUPLE));
Schema returnbagsc=new Schema(new Schema.FieldSchema("pairs",bagschema,DataType.BAG));
return returnbagsc;
}
catch(FrontendException e)
{
throw new RuntimeException("not able to defime the schema");
}
}
}
输入文件:
1,Alicia
2,Ana
3,Benita
4,Berta
5,Bertha
-- Register the jar containing the UDF and bind it with a replication factor of 3.
REGISTER '/path/to/pigexerciseudf.jar';
define replicat pigexerciseudf.replicateinput('3');
-- Each input line is "id,name".
A = LOAD '/home/hduser/exer.dat' using PigStorage(',') as (a:chararray,b:chararray);
-- The UDF returns a bag of two-field (key, value) tuples; FLATTEN turns each tuple into a row,
-- so the AS clause must name both fields.
B = FOREACH A GENERATE FLATTEN(replicat(a,b)) as (id:chararray, name:chararray);
dump B;
输出(注意:这里的行按 id 分组,而不是像问题中那样按副本编号分组;Pig 不保证关系中行的顺序,如需特定顺序请使用 ORDER BY):
(1_XX_1,Alicia_id_1)
(1_XX_2,Alicia_id_2)
(1_XX_3,Alicia_id_3)
(2_XX_1,Ana_id_1)
(2_XX_2,Ana_id_2)
(2_XX_3,Ana_id_3)
(3_XX_1,Benita_id_1)
(3_XX_2,Benita_id_2)
(3_XX_3,Benita_id_3)
(4_XX_1,Berta_id_1)
(4_XX_2,Berta_id_2)
(4_XX_3,Berta_id_3)
(5_XX_1,Bertha_id_1)
(5_XX_2,Bertha_id_2)
(5_XX_3,Bertha_id_3)
我在用 Pig 做数据准备,遇到了一个看似简单却无法解决的问题。例如,我有一列 id 和一列姓名。输入:
id | name
-------------
1 | Alicia
2 | Ana
3 | Benita
4 | Berta
5 | Bertha
期望的输出如下(能否用类似 for 循环的功能来实现?):
id | name
--------------------------
1_XX_1 | Alicia_id_1
2_XX_1 | Ana_id_1
3_XX_1 | Benita_id_1
4_XX_1 | Berta_id_1
5_XX_1 | Bertha_id_1
1_XX_2 | Alicia_id_2
2_XX_2 | Ana_id_2
3_XX_2 | Benita_id_2
4_XX_2 | Berta_id_2
5_XX_2 | Bertha_id_2
1_XX_3 | Alicia_id_3
2_XX_3 | Ana_id_3
3_XX_3 | Benita_id_3
4_XX_3 | Berta_id_3
5_XX_3 | Bertha_id_3
可以用一个 UDF 来实现:把复制次数作为参数传入,这样同一个 UDF 就能复用于不同的复制次数。下面的 UDF 实现了这一功能。
package pigexerciseudf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
public class replicateinput extends EvalFunc<DataBag>
{
public replicateinput()
{
}
int rep_factor=0;
public replicateinput(String a)
{
rep_factor=Integer.parseInt(a);
}
public DataBag exec(Tuple input) throws IOException
{
BagFactory bf=BagFactory.getInstance();
DataBag output=bf.newDefaultBag();
try
{
for(int i=1;i<=rep_factor;i++)
{
TupleFactory tp=TupleFactory.getInstance();
Tuple t1=tp.newTuple(2);
String key=(String)input.get(0);
System.out.println("key="+key);
String value=(String)input.get(1);
String key_out=key+"_XX_"+i;
String value_out=value+"_id_"+i;
t1.set(0,key_out);
t1.set(1,value_out);
output.add(t1);
}
return output;
}
catch(Exception e)
{
throw new IOException(e);
}
}
public Schema outputschema(Schema input)
{
try
{
List<Schema.FieldSchema> mylist=new ArrayList<Schema.FieldSchema>();
mylist.add(new Schema.FieldSchema("key_out",DataType.CHARARRAY));
mylist.add(new Schema.FieldSchema("value_out",DataType.CHARARRAY));
Schema tupleschema=new Schema(mylist);
Schema bagschema=new Schema(new Schema.FieldSchema("pair",tupleschema,DataType.TUPLE));
Schema returnbagsc=new Schema(new Schema.FieldSchema("pairs",bagschema,DataType.BAG));
return returnbagsc;
}
catch(FrontendException e)
{
throw new RuntimeException("not able to defime the schema");
}
}
}
输入文件:
1,Alicia
2,Ana
3,Benita
4,Berta
5,Bertha
-- Register the jar containing the UDF and bind it with a replication factor of 3.
REGISTER '/path/to/pigexerciseudf.jar';
define replicat pigexerciseudf.replicateinput('3');
-- Each input line is "id,name".
A = LOAD '/home/hduser/exer.dat' using PigStorage(',') as (a:chararray,b:chararray);
-- The UDF returns a bag of two-field (key, value) tuples; FLATTEN turns each tuple into a row,
-- so the AS clause must name both fields.
B = FOREACH A GENERATE FLATTEN(replicat(a,b)) as (id:chararray, name:chararray);
dump B;
输出(注意:这里的行按 id 分组,而不是像问题中那样按副本编号分组;Pig 不保证关系中行的顺序,如需特定顺序请使用 ORDER BY):
(1_XX_1,Alicia_id_1)
(1_XX_2,Alicia_id_2)
(1_XX_3,Alicia_id_3)
(2_XX_1,Ana_id_1)
(2_XX_2,Ana_id_2)
(2_XX_3,Ana_id_3)
(3_XX_1,Benita_id_1)
(3_XX_2,Benita_id_2)
(3_XX_3,Benita_id_3)
(4_XX_1,Berta_id_1)
(4_XX_2,Berta_id_2)
(4_XX_3,Berta_id_3)
(5_XX_1,Bertha_id_1)
(5_XX_2,Bertha_id_2)
(5_XX_3,Bertha_id_3)