为文件输入定义手动拆分算法

Defining a manual Split algorithm for File Input

我是 Spark 和 Hadoop 生态系统的新手,并且已经爱上了它。 现在,我正在尝试将现有的 Java 应用程序移植到 Spark。

此 Java 应用程序的结构如下:

  1. 使用自定义解析器 Class 使用 BufferedReader 一个接一个地读取文件,该解析器对输入数据进行一些繁重的计算。每个输入文件的大小为 1 到最大 2.5 GB。
  2. 在内存中存储数据(在HashMap<String, TreeMap<DateTime, List<DataObjectInterface>>>中)
  3. 将内存数据存储写成 JSON。这些 JSON 个文件较小。

我编写了一个 Scala 应用程序,它确实由一个工作人员处理我的文件,但这显然不是我可以从 Spark 中获得的最大性能优势。

现在解决我将其移植到 Spark 的问题: 输入文件是基于行的。我通常每行一条消息。但是,某些消息依赖于前面的行来在解析器中形成实际有效的消息。例如,我可能会在输入文件中按以下顺序获取数据:

  1. {时间戳}#0x033#{data_bytes}\n
  2. {时间戳}#0x034#{data_bytes}\n
  3. {时间戳}#0x035#{data_bytes}\n
  4. {时间戳}#0x0FE#{data_bytes}\n
  5. {时间戳}#0x036#{data_bytes}\n

要从 "composition message" 0x036 中形成一条实际消息,解析器还需要来自消息 0x033、0x034 和 0x035 的行。其他消息也可能介于这些需要的消息之间。大多数消息可以通过读取一行来解析。

现在终于是我的问题了: 如何让 Spark 为我的目的正确拆分我的文件?文件不能分割"randomly";它们必须以确保我的所有消息都可以被解析的方式拆分,并且解析器不会等待他永远不会得到的输入。这意味着每个组合消息(依赖于前面几行的消息)需要在一个拆分中。

我想有几种方法可以实现正确的输出,但我也会将我的一些想法投入其中 post:

第二种方法可能更简单,但是我不知道如何在 Spark 中实现它。有人能指出我正确的方向吗?

提前致谢!

我在 http://blog.ae.be/ingesting-data-spark-using-custom-hadoop-fileinputformat/ 上看到了你对我的博文的评论,并决定在这里发表我的意见。

首先,我不完全确定您要做什么。在这里帮助我:您的文件包含包含 0x033、0x034、0x035 和 0x036 的行,因此 Spark 将分别处理它们?虽然实际上这些行需要一起处理?

如果是这种情况,您不应将其解释为 "corrupt split"。正如您在博文中看到的那样,Spark 将文件拆分为可以单独处理的记录。默认情况下,它通过在换行符上拆分记录来做到这一点。但是,在您的情况下,您的 "record" 实际上分布在多行中。所以是的,您可以使用自定义文件输入格式。不过,我不确定这是否是最简单的解决方案。

您可以尝试使用执行以下操作的自定义文件输入格式来解决此问题:不是像默认文件输入格式那样逐行给出,而是解析文件并跟踪遇到的记录(0x033、0x034 等)。同时你可以过滤掉像 0x0FE 这样的记录(不确定你是否想在其他地方使用它们)。这样做的结果将是 Spark 将所有这些物理记录作为一个逻辑记录。

另一方面,逐行读取文件并使用功能键映射记录可能更容易(例如 [object 33, 0x033]、[object 33, 0x034]、...)。这样您就可以使用您选择的密钥组合这些行。

当然还有其他选择。您选择哪个取决于您的用例。