为文件输入定义手动拆分算法
Defining a manual Split algorithm for File Input
我是 Spark 和 Hadoop 生态系统的新手,并且已经爱上了它。
现在,我正在尝试将现有的 Java 应用程序移植到 Spark。
此 Java 应用程序的结构如下:
- 使用自定义解析器 Class 使用
BufferedReader
一个接一个地读取文件,该解析器对输入数据进行一些繁重的计算。每个输入文件的大小为 1 到最大 2.5 GB。
- 在内存中存储数据(在
HashMap<String, TreeMap<DateTime, List<DataObjectInterface>>>
中)
- 将内存数据存储写成 JSON。这些 JSON 个文件较小。
我编写了一个 Scala 应用程序,它确实由一个工作人员处理我的文件,但这显然不是我可以从 Spark 中获得的最大性能优势。
现在解决我将其移植到 Spark 的问题:
输入文件是基于行的。我通常每行一条消息。但是,某些消息依赖于前面的行来在解析器中形成实际有效的消息。例如,我可能会在输入文件中按以下顺序获取数据:
- {时间戳}#0x033#{data_bytes}\n
- {时间戳}#0x034#{data_bytes}\n
- {时间戳}#0x035#{data_bytes}\n
- {时间戳}#0x0FE#{data_bytes}\n
- {时间戳}#0x036#{data_bytes}\n
要从 "composition message" 0x036 中形成一条实际消息,解析器还需要来自消息 0x033、0x034 和 0x035 的行。其他消息也可能介于这些需要的消息之间。大多数消息可以通过读取一行来解析。
现在终于是我的问题了:
如何让 Spark 为我的目的正确拆分我的文件?文件不能分割"randomly";它们必须以确保我的所有消息都可以被解析的方式拆分,并且解析器不会等待他永远不会得到的输入。这意味着每个组合消息(依赖于前面几行的消息)需要在一个拆分中。
我想有几种方法可以实现正确的输出,但我也会将我的一些想法投入其中 post:
- 为文件输入定义一个手动拆分算法?这将检查拆分的最后几行不包含 "big" 消息 [0x033、0x034、0x035] 的开头。
- 根据 spark 的需要拆分文件,但也从上一个拆分到下一个拆分添加固定数量的行(比如 50,这肯定会完成工作)。多个数据将由解析器 class 正确处理,不会引入任何问题。
第二种方法可能更简单,但是我不知道如何在 Spark 中实现它。有人能指出我正确的方向吗?
提前致谢!
我在 http://blog.ae.be/ingesting-data-spark-using-custom-hadoop-fileinputformat/ 上看到了你对我的博文的评论,并决定在这里发表我的意见。
首先,我不完全确定您要做什么。在这里帮助我:您的文件包含包含 0x033、0x034、0x035 和 0x036 的行,因此 Spark 将分别处理它们?虽然实际上这些行需要一起处理?
如果是这种情况,您不应将其解释为 "corrupt split"。正如您在博文中看到的那样,Spark 将文件拆分为可以单独处理的记录。默认情况下,它通过在换行符上拆分记录来做到这一点。但是,在您的情况下,您的 "record" 实际上分布在多行中。所以是的,您可以使用自定义文件输入格式。不过,我不确定这是否是最简单的解决方案。
您可以尝试使用执行以下操作的自定义文件输入格式来解决此问题:不是像默认文件输入格式那样逐行给出,而是解析文件并跟踪遇到的记录(0x033、0x034 等)。同时你可以过滤掉像 0x0FE 这样的记录(不确定你是否想在其他地方使用它们)。这样做的结果将是 Spark 将所有这些物理记录作为一个逻辑记录。
另一方面,逐行读取文件并使用功能键映射记录可能更容易(例如 [object 33, 0x033]、[object 33, 0x034]、...)。这样您就可以使用您选择的密钥组合这些行。
当然还有其他选择。您选择哪个取决于您的用例。
我是 Spark 和 Hadoop 生态系统的新手,并且已经爱上了它。 现在,我正在尝试将现有的 Java 应用程序移植到 Spark。
此 Java 应用程序的结构如下:
- 使用自定义解析器 Class 使用
BufferedReader
一个接一个地读取文件,该解析器对输入数据进行一些繁重的计算。每个输入文件的大小为 1 到最大 2.5 GB。 - 在内存中存储数据(在
HashMap<String, TreeMap<DateTime, List<DataObjectInterface>>>
中) - 将内存数据存储写成 JSON。这些 JSON 个文件较小。
我编写了一个 Scala 应用程序,它确实由一个工作人员处理我的文件,但这显然不是我可以从 Spark 中获得的最大性能优势。
现在解决我将其移植到 Spark 的问题: 输入文件是基于行的。我通常每行一条消息。但是,某些消息依赖于前面的行来在解析器中形成实际有效的消息。例如,我可能会在输入文件中按以下顺序获取数据:
- {时间戳}#0x033#{data_bytes}\n
- {时间戳}#0x034#{data_bytes}\n
- {时间戳}#0x035#{data_bytes}\n
- {时间戳}#0x0FE#{data_bytes}\n
- {时间戳}#0x036#{data_bytes}\n
要从 "composition message" 0x036 中形成一条实际消息,解析器还需要来自消息 0x033、0x034 和 0x035 的行。其他消息也可能介于这些需要的消息之间。大多数消息可以通过读取一行来解析。
现在终于是我的问题了: 如何让 Spark 为我的目的正确拆分我的文件?文件不能分割"randomly";它们必须以确保我的所有消息都可以被解析的方式拆分,并且解析器不会等待他永远不会得到的输入。这意味着每个组合消息(依赖于前面几行的消息)需要在一个拆分中。
我想有几种方法可以实现正确的输出,但我也会将我的一些想法投入其中 post:
- 为文件输入定义一个手动拆分算法?这将检查拆分的最后几行不包含 "big" 消息 [0x033、0x034、0x035] 的开头。
- 根据 spark 的需要拆分文件,但也从上一个拆分到下一个拆分添加固定数量的行(比如 50,这肯定会完成工作)。多个数据将由解析器 class 正确处理,不会引入任何问题。
第二种方法可能更简单,但是我不知道如何在 Spark 中实现它。有人能指出我正确的方向吗?
提前致谢!
我在 http://blog.ae.be/ingesting-data-spark-using-custom-hadoop-fileinputformat/ 上看到了你对我的博文的评论,并决定在这里发表我的意见。
首先,我不完全确定您要做什么。在这里帮助我:您的文件包含包含 0x033、0x034、0x035 和 0x036 的行,因此 Spark 将分别处理它们?虽然实际上这些行需要一起处理?
如果是这种情况,您不应将其解释为 "corrupt split"。正如您在博文中看到的那样,Spark 将文件拆分为可以单独处理的记录。默认情况下,它通过在换行符上拆分记录来做到这一点。但是,在您的情况下,您的 "record" 实际上分布在多行中。所以是的,您可以使用自定义文件输入格式。不过,我不确定这是否是最简单的解决方案。
您可以尝试使用执行以下操作的自定义文件输入格式来解决此问题:不是像默认文件输入格式那样逐行给出,而是解析文件并跟踪遇到的记录(0x033、0x034 等)。同时你可以过滤掉像 0x0FE 这样的记录(不确定你是否想在其他地方使用它们)。这样做的结果将是 Spark 将所有这些物理记录作为一个逻辑记录。
另一方面,逐行读取文件并使用功能键映射记录可能更容易(例如 [object 33, 0x033]、[object 33, 0x034]、...)。这样您就可以使用您选择的密钥组合这些行。
当然还有其他选择。您选择哪个取决于您的用例。