How to convert any delimited text file to parquet/avro - dynamically changing column number/structure into avro/parquet using Spark SQL?

We need to convert text data to parquet/avro daily, where the inputs from multiple sources have different structures. We would like to have Scala code based on Spark SQL that achieves this regardless of the delimiter and the number of columns or the structure.

After analyzing your problem statement, I'm making the following assumptions:

1. data source can be anything, primarily HDFS
2. delimiter can be anything
3. you're maintaining the structure for each source
4. the file does not contain a header

Suggestion: the problem here is that if your data does not contain a header, you have to generate the StructType yourself. Come up with some structure, perhaps a JSON structure, to define your data source. Then load and parse the JSON with Jackson in Scala, or simply pass a column_map to your program.

Example: 
{
    "inputLocation": "",
    "delimiter" : ",",
    "column_map" : "col1, datatype; col12, datatype;col1, datatype; col12, datatype"
    "outputLocation": ""
}
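
For completeness, a minimal sketch of loading such a config with Jackson (assuming the jackson-module-scala dependency is available; the JobConfig case class and ConfigLoader names are hypothetical and simply mirror the JSON above):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import scala.io.Source

// Hypothetical case class mirroring the JSON config shown above
case class JobConfig(inputLocation: String,
                     delimiter: String,
                     column_map: String,
                     outputLocation: String)

object ConfigLoader {
  def load(path: String): JobConfig = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule) // enables case class (de)serialization
    val json = Source.fromFile(path).mkString
    mapper.readValue(json, classOf[JobConfig])
  }
}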

Now, generate the StructType dynamically using the column_map.

object GenerateStructType {

  import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}

  // Builds a StructType from entries of the form "columnName, columnType"
  def generateStructType(columnsList: Seq[String]): StructType = {

    val res = columnsList.map(columnDetail => {
      val columnName = columnDetail.split(",")(0).trim
      val columnType = columnDetail.split(",")(1).trim

      columnType match {
        case "String" => StructField(columnName, StringType, true)
        case "Bool"   => StructField(columnName, BooleanType, true)
        case _        => StructField(columnName, StringType, true) // default unknown types to StringType
      }
    })
    StructType(res)
  }

  def main(args: Array[String]): Unit = {
    val columnMap = "col1, datatype; col12, datatype;col1, datatype; col12, datatype"

    val result = GenerateStructType.generateStructType(columnMap.split(";"))
    println(result)
  }

}

Dynamically generated StructType:

StructType(StructField(col1,StringType,true), StructField(col12,StringType,true), StructField(col1,StringType,true), StructField(col12,StringType,true))

Use this StructType when loading the data.
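
A minimal sketch of tying the pieces together (the paths, delimiter, and column_map values here are hypothetical; spark.read.csv with an explicit schema is one way to load the delimited text):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Delimited to Parquet").master("local[*]").getOrCreate()

// Hypothetical values, as they would come from the JSON config above
val columnMap = "col1, String; col12, Bool"
val schema = GenerateStructType.generateStructType(columnMap.split(";"))

// Load the delimited text with the generated schema, then write parquet
val df = spark.read
  .schema(schema)
  .option("sep", ",")        // delimiter from the config
  .csv("/path/to/input")     // hypothetical inputLocation
df.write.parquet("/path/to/output") // hypothetical outputLocation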

Hope this helps....

I wrote this code in Spark 2.1.0 - Spark SQL.

Input used:

1238769|Michael|Hoffman|50000|New York
1238769|Michael1|Hoffman1|50000|New York1
1238770|Michael2|Hoffman2|50000|New York2
1238771|Michael3|Hoffman3|50000|New York3
1238772|Michael4|Hoffman4|50000|New York4
1238773|Michael5|Hoffman5|50000|New York5
1238774|Michael6|Hoffman6|50000|New York6
1238775|Michael7|Hoffman7|50000|New York7
1238776|Michael8|Hoffman8|50000|New York8
1238777|Michael9|Hoffman9|50000|New York9

In this example, I am going to convert a pipe ("|") delimited text file into parquet.

Step 1: Read the input variables

//Required imports
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

//creating spark session
val spark = SparkSession.builder().appName("Text to Parquet").master("local[*]").getOrCreate()
import spark.implicits._

//Assigning values to the variables
val input_location = args(0).trim
val delimiter = "\\|" //You can make it dynamic by passing it as an argument; the pipe must be escaped because split() takes a regex
val selectColString_location = args(1).trim
val output_location = args(2).trim

Step 2: Read the input text data and split it based on the delimiter

//Reading data from text file
val input_rdd = spark.sparkContext.textFile(input_location)

//Split the input data using the delimiter (we are using pipe(\|) as the delimiter for this example)
val input_array_rdd:RDD[Array[String]] = input_rdd.map(x => x.split(delimiter, -1))

Step 3: Convert the RDD created in step 2 into a DataFrame using toDF, with only one column - col, which will be an array column

//Converting input_array_rdd into dataframe with only one column - col
val input_df:DataFrame = input_array_rdd.toDF("col")

//Creating temp table on top of input_df with the name TABLE1
input_df.createOrReplaceTempView("TABLE1")

Step 4: Prepare a select statement based on the input structure, using the temp table - TABLE1 and the array column - col, and save it as a single row in a text file

select cast(col[0] as bigint) as cust_id, col[1] as first_name, col[2] as last_name, cast(col[3] as decimal(18,6)) as amount, col[4] as city from table1

Step 5: Read the select statement from the file and execute it to generate the output

//Reading the selectColString, remember we are reading only the first row from the file
//Select SQL should be only one row in the selectColString.txt file
val sqlColString = spark.sparkContext.textFile(selectColString_location).first().toString()
//Generating the output using the colString
val output_df = spark.sql(sqlColString)

Step 6: Write the output as parquet

output_df.write.mode(SaveMode.Overwrite).parquet(output_location)

Output parquet schema:

root
 |-- cust_id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- amount: decimal(18,6) (nullable = true)
 |-- city: string (nullable = true)
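
The question also asks for avro; avro output is not built into Spark 2.1, so here is a sketch assuming the external Databricks spark-avro package (com.databricks:spark-avro) is on the classpath:

//Writing the same output_df as avro (assumes the spark-avro dependency is available)
output_df.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save(output_location)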

Using this program, we can convert any text file to parquet by simply modifying the selectColString file based on the input text.

GitHub code link: https://github.com/sangamgavini/ReusableCodes/tree/master/src/main/scala/com/sangam/TexttoParquet