Apache Pig:将一个字段数据集作为新列添加到另一个字段数据集

Apache Pig: Add one field dataset to another one as a new column

假设我们有这种情况:

dataset1.csv :

datefield
field11, field12, field13
field21, field22, field23
field31, field32, field33

得到这个的最好方法是什么?:

field11, field12, field13, datefield
field21, field22, field23, datefield
field31, field32, field33, datefield

我尝试仅使用此列(加载和生成后)生成一个数据集 (relation1):

field11, field12, field13
field21, field22, field23
field31, field32, field33

只有这一列的另一个(关系 2)(在加载和生成之后):

datefield

然后这样做:

finalResult = FOREACH dataset1 GENERATE UDFFunction1(relation1::f1) 作为 firstFields,UDFFunction2(relation2::f2) 作为 lastField

但我得到“需要从关系中投影列才能将其用作标量

问题出在第二个字段(带有日期字段的那个)。

我想避免加入,因为这会有点混乱。

有什么建议吗?

请忘记我的 UDF 函数。他们只是相应地格式化输入元组。

添加猪脚本:

register 's3://bucketName/lib/MyJar.jar';

define ParseOutFilesUDF packageName.ParseOutFiles;

define FormatTimestartedUDF packageName.FormatTimestarted;

outFile = LOAD 's3://bucketName/input/' USING PigStorage ('|');

--This UDF just reformat each tuple, adding a String to each Tuple and returning a new one. 
resultAll = FOREACH outFile GENERATE ParseOutFilesUDF(*) as initial;

--load the same csv again to get the TIMESTARTED field
timestarted = LOAD 's3://bucketName/input/' USING PigStorage ('|') as f1;

--filter to get only one record, which is somth like TIMESTARTED=20160101
filetered = FILTER timestarted BY (f1 matches '.*TIMESTARTED.*');

timestarted = foreach filetered GENERATE [=14=] as fechaStarted;

-- the FormatTimestartedUDF just gets ride of 'TIMESTARTED=' in order to get the date '20160101'
in this FOREACH sentence is where it fails with the 'A column needs to be projected...'
finalResult = FOREACH outFile GENERATE f1, FormatTimestartedUDF(timestarted) as f2;
STORE finalResult INTO 's3://bucketName/output/';

您收到错误是因为您引用的 f1 在输出文件中不存在,并且 timestarted 是一个关系而不是 field.Also 您应该在 resultALL 中使用该字段并进行过滤。

outFile = LOAD 's3://bucketName/input/' USING PigStorage ('|');
resultAll = FOREACH outFile GENERATE ParseOutFilesUDF(*) as initial;

timestarted = LOAD 's3://bucketName/input/' USING PigStorage ('|') as f1;
filtered = FILTER timestarted BY (f1 matches '.*TIMESTARTED.*');

finalResult = FOREACH resultALL GENERATE resultAll.initial, FormatTimestartedUDF(filtered.[=10=]);
STORE finalResult INTO 's3://bucketName/output/';