将 .txt 文件拆分为元素
Splitting .txt file into elements
由于我们无法直接从 Json 文件中读取,我使用的是 .txt 文件。
它看起来像用“,”分隔的更多元素。
[
{
"Item_Identifier": "FDW58",
"Outlet_Size": "Medium"
},
{
"Item_Identifier": "FDW14",
"Outlet_Size": "Small"
},
]
我想统计元素的个数,这里我会得到2。
问题是我无法将文本分隔成用逗号“,”分隔的元素。
即使我将其转换为 json 格式,我也会单独获取每一行。
lines = p | 'receive_data' >> beam.io.ReadFromText(
known_args.input)\
| 'jsondumps' >> beam.Map(lambda x: json.dumps(x))\
| 'jsonloads' >> beam.Map(lambda x: json.loads(x))\
| 'print' >> beam.ParDo(PrintFn()) \
.json
文件只是一个文本文件,其内容采用 JSON 可解析格式。
您的 JSON 无效,因为它有尾随逗号。这应该有效:
import json
j = r"""
[
{
"Item_Identifier": "FDW58",
"Outlet_Size": "Medium"
},
{
"Item_Identifier": "FDW14",
"Outlet_Size": "Small"
}
]
"""
print(json.loads(j))
我不认为这是一种安全的方法。我没有使用 python sdk(我使用 java)但是 java 端的 io.TextIO
很清楚它将发出一个 PCollection,其中每个元素都是一行来自源文件的输入。分层数据格式(json、xml 等)不能修改为以这种方式拆分。
如果您的文件与您所包含的 json 一样格式良好且没有嵌套,您可以:
- 逐行读取文件(我相信你正在做)
- 仅过滤包含
}
的行
- 计算生成的 pcollection 大小
不过,为了更普遍地与 json 集成,我们采用了不同的方法:
- 从字符串的 PCollection 开始,其中每个值都是文件的路径
- 使用本机库访问文件并以流式方式解析它(我们使用 scala,它有一些可用的流式 json 解析库)
- 或者,使用 Beam 的 API 从
MatchResult
获取 ReadableFile
实例并通过该实例访问文件
我的理解是并非所有文件格式都适合分布式处理器。例如,Gzip 不能 'split' 或容易分块。与 JSON 相同。 CSV 有一个问题,即值是无意义的,除非你也有方便的开场白。
由于我们无法直接从 Json 文件中读取,我使用的是 .txt 文件。 它看起来像用“,”分隔的更多元素。
[
{
"Item_Identifier": "FDW58",
"Outlet_Size": "Medium"
},
{
"Item_Identifier": "FDW14",
"Outlet_Size": "Small"
},
]
我想统计元素的个数,这里我会得到2。 问题是我无法将文本分隔成用逗号“,”分隔的元素。 即使我将其转换为 json 格式,我也会单独获取每一行。
lines = p | 'receive_data' >> beam.io.ReadFromText(
known_args.input)\
| 'jsondumps' >> beam.Map(lambda x: json.dumps(x))\
| 'jsonloads' >> beam.Map(lambda x: json.loads(x))\
| 'print' >> beam.ParDo(PrintFn()) \
.json
文件只是一个文本文件,其内容采用 JSON 可解析格式。
您的 JSON 无效,因为它有尾随逗号。这应该有效:
import json
j = r"""
[
{
"Item_Identifier": "FDW58",
"Outlet_Size": "Medium"
},
{
"Item_Identifier": "FDW14",
"Outlet_Size": "Small"
}
]
"""
print(json.loads(j))
我不认为这是一种安全的方法。我没有使用 python sdk(我使用 java)但是 java 端的 io.TextIO
很清楚它将发出一个 PCollection,其中每个元素都是一行来自源文件的输入。分层数据格式(json、xml 等)不能修改为以这种方式拆分。
如果您的文件与您所包含的 json 一样格式良好且没有嵌套,您可以:
- 逐行读取文件(我相信你正在做)
- 仅过滤包含
}
的行
- 计算生成的 pcollection 大小
不过,为了更普遍地与 json 集成,我们采用了不同的方法:
- 从字符串的 PCollection 开始,其中每个值都是文件的路径
- 使用本机库访问文件并以流式方式解析它(我们使用 scala,它有一些可用的流式 json 解析库)
- 或者,使用 Beam 的 API 从
MatchResult
获取ReadableFile
实例并通过该实例访问文件
- 或者,使用 Beam 的 API 从
我的理解是并非所有文件格式都适合分布式处理器。例如,Gzip 不能 'split' 或容易分块。与 JSON 相同。 CSV 有一个问题,即值是无意义的,除非你也有方便的开场白。