将 .txt 文件拆分为元素

Splitting .txt file into elements

由于我们无法直接从 Json 文件中读取,我使用的是 .txt 文件。 它看起来像用“,”分隔的更多元素。

[
  {
    "Item_Identifier": "FDW58", 
    "Outlet_Size": "Medium"

  },
  {
    "Item_Identifier": "FDW14",
    "Outlet_Size": "Small"
  },
]

我想统计元素的个数,这里我会得到2。 问题是我无法将文本分隔成用逗号“,”分隔的元素。 即使我将其转换为 json 格式,我也会单独获取每一行。

lines = p | 'receive_data' >> beam.io.ReadFromText(
    known_args.input)\
    | 'jsondumps' >> beam.Map(lambda x: json.dumps(x))\
    | 'jsonloads' >> beam.Map(lambda x: json.loads(x))\
    | 'print' >> beam.ParDo(PrintFn()) \

.json 文件只是一个文本文件,其内容采用 JSON 可解析格式。

您的 JSON 无效,因为它有尾随逗号。这应该有效:

import json

j = r"""
[
    {
        "Item_Identifier": "FDW58",
        "Outlet_Size": "Medium"
    },
    {
        "Item_Identifier": "FDW14",
        "Outlet_Size": "Small"
    }
]
"""

print(json.loads(j))

我不认为这是一种安全的方法。我没有使用 python sdk(我使用 java)但是 java 端的 io.TextIO 很清楚它将发出一个 PCollection,其中每个元素都是一行来自源文件的输入。分层数据格式(json、xml 等)不能修改为以这种方式拆分。

如果您的文件与您所包含的 json 一样格式良好且没有嵌套,您可以:

  • 逐行读取文件(我相信你正在做)
  • 仅过滤包含 }
  • 的行
  • 计算生成的 pcollection 大小

不过,为了更普遍地与 json 集成,我们采用了不同的方法:

  • 从字符串的 PCollection 开始,其中每个值都是文件的路径
  • 使用本机库访问文件并以流式方式解析它(我们使用 scala,它有一些可用的流式 json 解析库)
    • 或者,使用 Beam 的 API 从 MatchResult 获取 ReadableFile 实例并通过该实例访问文件

我的理解是并非所有文件格式都适合分布式处理器。例如,Gzip 不能 'split' 或容易分块。与 JSON 相同。 CSV 有一个问题,即值是无意义的,除非你也有方便的开场白。