有没有办法在 hadoop 中为字符串添加定界符?

Is there any way to add delimiters to a string in hadoop?

我正在考虑编辑文件中的每一行并在 hadoop 中为其添加定界符。因为我有非常大的文件,所以在 hadoop 中这样做是否有利?

示例:

输入文件:

001012489MAR01856400004400
001012489FEB01856400004400

输出文件将是

0010|12489|MAR|018564|0000|44|00
0010|12489|FEB|018564|0000|44|00

我怎样才能做到这一点?我搜索了很多博客,但没有找到方法。

这可以通过 map-reduce 或 spark 作业来实现。 (substring()):

Map-reduce (JAVA): 在这种情况下你只需要映射器。只需将您的输入字符串行映射为带分隔符的字符串行:

public class Delimeters {

    public static class DelimetersMapper extends Mapper<Object, Text, Text, Text> {

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            //001012489FEB01856400004400
            String lineWithDelimeter = value.toString().substring(0, 4) + "|" + value.toString().substring(4, 9)
                    + "|" + value.toString().substring(9, 12) + "|" + value.toString().substring(12, 18)
                    + "|" + value.toString().substring(18, 22) + "|" + value.toString().substring(22, 24)
                    + "|" + value.toString().substring(24,26);

            System.out.println(lineWithDelimeter); //0010|12489|MAR|018564|0000|44|00

            context.write(new Text(lineWithDelimeter),new Text(""));    
        }   
    }
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Add-Delimeters-to-flat-file");

        job.setJarByClass(Delimeters.class);
        job.setMapperClass(DelimetersMapper.class); 
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileSystem fs = null;
        Path dstFilePath = new Path(args[1]);
        try {
            fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }

        job.waitForCompletion(true);
    } 

}

Spark (Scala):

object delimeter {
    def main(args: Array[String]) {

            val inputFile = args(0)

                val conf = new SparkConf().setAppName("Add-Delimeters-to-flat-file").setMaster("local")
                val sc = new SparkContext(conf)

                val txtFileLines = sc.textFile(inputFile)

                val fields = txtFileLines.map(line => line.substring(0, 4) + "|" + line.substring(4, 9)
                    + "|" + line.substring(9, 12) + "|" + line.substring(12, 18)
                    + "|" + line.substring(18, 22) + "|" + line.substring(22, 24)
                    + "|" + line.substring(24,26))

        fields.foreach(x => println(x))

            fields.saveAsTextFile(args(1))    
        }
}

更新:

  1. 你可以使用 file:/// uri 让 hadoop 知道寻找本地文件系统作为源(同样的规则适用于 spark):

    hadoop jar <app.jar> <package.classname> <file:///path/to/local/dir> </path/to/hdfs/>
    

示例:

    [cloudera@quickstart Desktop]$ hadoop jar hadoop-stack.jar so.Delimeters file:///home/cloudera/Desktop/test.txt /user/cloudera/delim
    [cloudera@quickstart Desktop]$ hadoop fs -cat /user/cloudera/delim/*
    0010|12489|FEB|018564|0000|44|00    
    0010|12489|MAR|018564|0000|44|00    
  1. 您可以在hdfs中拥有源文件,并在成功处理后删除应用程序本身的源文件:

    int exitcode = job.waitForCompletion(true)? 0: -1;
    
    if (exitcode == 0){
    try {
        Path sourcePath = new Path(args[0]);
        fs = sourcePath.getFileSystem(conf);
        if (fs.exists(sourcePath))
            fs.delete(sourcePath, true);
    } catch (IOException e1) {
        e1.printStackTrace();
    }
    }
    
  2. 创建 oozie 工作流程,该工作流程运行将分隔符添加到目标目的地的应用程序和 shell 删除源 file/dir 末尾的脚本

另一种替代方法是使用 Hive(负责编程部分)

1) 创建一个配置单元 tmp table 指向您的 HDFS 原始数据文件位置,

CREATE EXTERNAL TABLE tmp (raw String) 
LOCATION '<hdfs_path>'

2) 创建 formatted_data table 以 pipe delimeter 作为分隔符

CREATE TABLE formatted_data(
col1 string,col2 string,
col3 string,col4 string, 
col5 string,col6 string,col7 string) 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '|';

3) 插入数据从tmp table到formatted_data table:

INSERT INTO formatted_data 
select substr(raw,0, 4),substr(raw,4, 9),substr(raw,9, 12),
substr(raw,12, 18),substr(raw,18, 22),substr(raw,22, 24),
substr(raw,24,26) from TMP ;

4) 验证'formatted_data'的hdfs文件table

hadoop fs -cat /hive/warehouse/formatted_data/000000_0
16/08/30 10:47:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

 0010|012489MAR|9MAR01856400|R01856400004400|400004400|04400|400
 0010|012489FEB|9FEB01856400|B01856400004400|400004400|04400|400