如何在 Pig 中生成重复的数据组?

How to generate repetitive groups in pig?

我在用 Pig 做数据准备,遇到了一个看似简单却解决不了的问题。例如,我有一列姓名,输入如下:

 id |  name  
-------------
 1  |  Alicia
 2  |  Ana   
 3  |  Benita
 4  |  Berta 
 5  |  Bertha

期望的输出如下(能否用类似 for 循环的功能来实现?):

  id     |  name  
--------------------------
 1_XX_1  |  Alicia_id_1
 2_XX_1  |  Ana_id_1   
 3_XX_1  |  Benita_id_1
 4_XX_1  |  Berta_id_1
 5_XX_1  |  Bertha_id_1

 1_XX_2  |  Alicia_id_2
 2_XX_2  |  Ana_id_2   
 3_XX_2  |  Benita_id_2
 4_XX_2  |  Berta_id_2
 5_XX_2  |  Bertha_id_2

 1_XX_3  |  Alicia_id_3
 2_XX_3  |  Ana_id_3   
 3_XX_3  |  Benita_id_3
 4_XX_3  |  Berta_id_3
 5_XX_3  |  Bertha_id_3

可以用 UDF 来实现,并把复制次数作为参数传入,这样同一个 UDF 就能复用于不同的复制次数。下面的 UDF 实现了这一功能。

package pigexerciseudf;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class replicateinput extends EvalFunc<DataBag>
{
    public replicateinput()
    {
        
    }
    int rep_factor=0;
    public replicateinput(String a)
    {
        rep_factor=Integer.parseInt(a);
    }
    
    public DataBag exec(Tuple input) throws IOException
    {
        BagFactory bf=BagFactory.getInstance();
        DataBag output=bf.newDefaultBag();
            try
            {
            for(int i=1;i<=rep_factor;i++)
            {
                TupleFactory tp=TupleFactory.getInstance();
                Tuple t1=tp.newTuple(2);
                String key=(String)input.get(0);
                System.out.println("key="+key);
                String value=(String)input.get(1);
                String key_out=key+"_XX_"+i;
                String value_out=value+"_id_"+i;
                t1.set(0,key_out);
                t1.set(1,value_out);
                output.add(t1);
            }
            return output;
            }   
            catch(Exception e)
            {
                throw new IOException(e);
            }
    }


    public Schema outputschema(Schema input)
    {
        try
        {
        List<Schema.FieldSchema> mylist=new ArrayList<Schema.FieldSchema>();
        mylist.add(new Schema.FieldSchema("key_out",DataType.CHARARRAY));
        mylist.add(new Schema.FieldSchema("value_out",DataType.CHARARRAY));
        Schema tupleschema=new Schema(mylist);
        Schema bagschema=new Schema(new Schema.FieldSchema("pair",tupleschema,DataType.TUPLE));
        Schema returnbagsc=new Schema(new Schema.FieldSchema("pairs",bagschema,DataType.BAG));
        return returnbagsc;
        }
        catch(FrontendException e)
        {
            throw new RuntimeException("not able to defime the schema");
        }
    }
}

输入文件:

1,Alicia

2,Ana

3,Benita

4,Berta

5,Bertha

REGISTER '/path/to/pigexerciseudf.jar';
-- Replicate every input row 3 times; the factor is passed to the UDF constructor as a string.
define replicat pigexerciseudf.replicateinput('3');
A = LOAD '/home/hduser/exer.dat' using PigStorage(',') as (a:chararray, b:chararray);
-- The UDF returns a bag of two-field tuples; FLATTEN turns each tuple into a row with two
-- columns, so the AS clause names both of them.
B = FOREACH A GENERATE FLATTEN(replicat(a, b)) as (id:chararray, name:chararray);
dump B;

输出:

(1_XX_1,Alicia_id_1)

(1_XX_2,Alicia_id_2)

(1_XX_3,Alicia_id_3)

(2_XX_1,Ana_id_1)

(2_XX_2,Ana_id_2)

(2_XX_3,Ana_id_3)

(3_XX_1,Benita_id_1)

(3_XX_2,Benita_id_2)

(3_XX_3,Benita_id_3)

(4_XX_1,Berta _id_1)

(4_XX_2,Berta _id_2)

(4_XX_3,Berta _id_3)

(5_XX_1,Bertha_id_1)

(5_XX_2,Bertha_id_2)

(5_XX_3,Bertha_id_3)