特殊格式压缩的Spark Reading

Spark Reading Compressed with Special Format

我有一个 .gz 文件,我需要读取这个文件并将时间和文件名添加到这个文件中我遇到了一些问题,需要你的帮助来推荐解决这一点的方法。

  1. 因为文件被压缩了,第一行读取的格式不正确我认为是由于编码问题我尝试了下面的代码但没有工作

    implicit val codec = Codec("UTF-8")
    codec.onMalformedInput(CodingErrorAction.REPLACE)
    codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
    
  2. 文件有特殊格式,我需要使用 Regex 将其读入数据名 ==> 我发现的唯一方法是使用 RDD 读取它并将其映射到 regex 有没有办法直接读到 DF 并传递正则表达式?

    val Test_special_format_RawData = sc.textFile("file://"+filename.toString())
      .map(line ⇒ line.replace("||", "|NA|NA"))
      .map(line ⇒ if (line.takeRight(1) == "|") line+"NA" else line)
      .map { x ⇒ regex_var.findAllIn(x).toArray }
    
    import hiveSqlContext.implicits._
    
    val Test_special_format_DF = Test_special_format_RawData.filter { x⇒x.length==30 }
      .filter { x⇒x(0) !=header(0) }
      .map { x⇒ (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7),
                 x(8), x(9), x(10), x(11), x(12), x(13), x(14),
                 x(15),x(16), x(17), x(18), x(19))}.toDF()
    
    val Test_special_format_Tranformed_Data = Test_special_format_DF.withColumn("FileName", lit(filename.getName))
      .withColumn("rtm_insertion_date", lit(RTM_DATE_FORMAT.format(Cal.getInstance().getTime())))
    
  3. 我可以忽略任何特殊字符之间的任何定界符吗?例如,如果“|” pipe coming between ^~ ^~ 忽略它?

  4. 有时接收到的数据帧列类型是错误的数据类型。我们如何处理这个问题以应用数据质量检查?

  5. 当我尝试使用 Dataframe 从 Spark 插入配置单元时。我可以为未处理行错误指定拒绝目录吗?下面是我使用的代码?

    Test_special_format_Tranformed_Data.write.partitionBy("rtm_insertion_date")
      .mode(SaveMode.Append).insertInto("dpi_Test_special_format_source")
    

文件样本是here

我会回答我关于文件格式问题的问题。解决方案是覆盖 gzib 的默认扩展格式。

import org.apache.hadoop.io.compress.GzipCodec

class TmpGzipCodec extends GzipCodec {

  override def getDefaultExtension(): String = ".gz.tmp"

}

现在我们刚刚注册了这个编解码器,在 SparkConf 上设置 spark.hadoop.io.compression.codecs:

val conf = new SparkConf()

// Custom Codec that process .gz.tmp extensions as a common Gzip format
conf.set("spark.hadoop.io.compression.codecs", "smx.ananke.spark.util.codecs.TmpGzipCodec")

val sc = new SparkContext(conf)

val data = sc.textFile("s3n://my-data-bucket/2015/09/21/13/*")

我发现这个解决方案是这个 link

关于格式错误的记录,有以下两种解决方法:

  1. 个案class然后检查它的模式是否匹配这个案例class。
  2. 逐行解析 RDD,但它需要在 spark.csv 库中更新。

关于delimiter delimiter 问题,它需要使用带正则表达式的RDD。