将包含多个表的 250GB JSON 文件拆分为镶木地板
Splitting 250GB JSON file containing multiple tables into parquet
我有一个 JSON 文件,格式如下,
{
"Table1": {
"Records": [
{
"Key1Tab1": "SomeVal",
"Key2Tab1": "AnotherVal"
},
{
"Key1Tab1": "SomeVal2",
"Key2Tab1": "AnotherVal2"
}
]
},
"Table2": {
"Records": [
{
"Key1Tab1": "SomeVal",
"Key2Tab1": "AnotherVal"
},
{
"Key1Tab1": "SomeVal2",
"Key2Tab1": "AnotherVal2"
}
]
}
}
根键是来自 SQL 数据库的 table 名称,其对应的值是行。
我想将 JSON 文件拆分为单独的镶木地板文件,每个文件代表一个 table。
IE。 Table1.parquet
和 Table2.parquet
.
最大的问题是文件的大小使我无法将其加载到内存中。
因此,我尝试使用 dask.bag 来适应文件的嵌套结构。
import dask.bag as db
from dask.distributed import Client
client = Client(n_workers=4)
lines = db.read_text("filename.json")
但是使用 lines.take(4)
评估输出显示 dask 无法正确读取新行。
('{\n', ' "Table1": {\n', ' "Records": [\n', ' {\n')
我尝试寻找特定问题的解决方案,但没有成功。
是否有可能使用 dask 解决拆分问题,或者是否有其他工具可以完成这项工作?
按照建议here try the dask.dataframe.read_json()
方法
这可能就足够了,但我不确定如果您没有足够的内存来将整个结果数据帧存储在内存中,它会如何表现..
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
df = dd.read_json("filename.json")
df.to_parquet("filename.parquet", engine='pyarrow')
文档
- https://distributed.dask.org/en/latest/manage-computation.html#dask-collections-to-futures
- https://examples.dask.org/dataframes/01-data-access.html#Write-to-Parquet
如果 Dask 在单个系统上不分块处理文件(它可能不会愉快地这样做,因为 JSON 以这种方式解析显然不友好..虽然不幸的是我不这样做可以访问我的测试系统来验证这一点)并且系统内存无法处理这个巨大的文件,您可以通过创建一个大的交换文件 space 来使用磁盘扩展系统内存。
请注意,这将创建一个约 300G 的文件(增加 count
字段以获得更多)并且与内存相比可能 难以置信地 慢(但也许仍然足够快满足您的需求,特别是如果它是 1-off)。
# create and configure swapfile
dd if=/dev/zero of=swapfile.img bs=10M count=30000 status=progress
chmod 600 swapfile.img
mkswap swapfile.img
swapon swapfile.img
#
# run memory-greedy task
# ...
# ensure processes have exited
#
# disable and remove swapfile to reclaim disk space
swapoff swapfile.img # may hang for a long time
rm swapfile.img
问题是,默认情况下,dask 会根据换行符拆分文件,您不能保证这不会出现在您的某个表格的中间。事实上,即使你做对了,你仍然需要操作结果文本来为每个分区制作完整的 JSON 个对象。
例如:
def myfunc(x):
x = "".join(x)
if not x.endswith("}"):
x = x[:-2] + "}"
if not x.startswith("{"):
x = "{" + x
return [json.loads(x)]
db.read_text('temp.json',
linedelimiter="\n },\n",
blocksize=100).map_partitions(myfunc)
在这种情况下,我故意使块大小小于每个部分以演示:您将得到一个 JSON 对象或每个分区什么都没有。
_.compute()
[{'Table1': {'Records': [{'Key1Tab1': 'SomeVal', 'Key2Tab1': 'AnotherVal'},
{'Key1Tab1': 'SomeVal2', 'Key2Tab1': 'AnotherVal2'}]}},
{},
{'Table2': {'Records': [{'Key1Tab1': 'SomeVal', 'Key2Tab1': 'AnotherVal'},
{'Key1Tab1': 'SomeVal2', 'Key2Tab1': 'AnotherVal2'}]}},
{},
{},
{}]
当然,在您的情况下,您可以立即使用 JSON 而不是 return 它,或者您可以 map
链中的下一个写入函数。
处理大文件时,成功的关键是将数据作为流处理,即在类似过滤器的程序中。
JSON 格式易于解析。以下程序逐个读取输入字符(I/O 应该被缓冲)并将顶级 JSON 对象切割为单独的对象。它正确地遵循数据结构而不是格式。
演示程序只是打印“--NEXT OUTPUT FILE--”,真正的输出文件切换应该在这里实现。空白剥离是作为奖励实施的。
import collections
OBJ = 'object'
LST = 'list'
def out(ch):
print(ch, end='')
with open('json.data') as f:
stack = collections.deque(); push = stack.append; pop = stack.pop
esc = string = False
while (ch := f.read(1)):
if esc:
esc = False
elif ch == '\':
esc = True
elif ch == '"':
string = not string
if not string:
if ch in {' ', '\t', '\r', '\n'}:
continue
if ch == ',':
if len(stack) == 1 and stack[0] == OBJ:
out('}\n')
print("--- NEXT OUTPUT FILE ---")
out('{')
continue
elif ch == '{':
push(OBJ)
elif ch == '}':
if pop() is not OBJ:
raise ValueError("unmatched { }")
elif ch == '[':
push(LST)
elif ch == ']':
if pop() is not LST:
raise ValueError("unmatched [ ]")
out(ch)
这是我的测试文件的示例输出:
{"key1":{"name":"John","surname":"Doe"}}
--- NEXT OUTPUT FILE ---
{"key2":"string \" ] }"}
--- NEXT OUTPUT FILE ---
{"key3":13}
--- NEXT OUTPUT FILE ---
{"key4":{"sub1":[null,{"l3":true},null]}}
原始文件是:
{
"key1": {
"name": "John",
"surname": "Doe"
},
"key2": "string \" ] }", "key3": 13,
"key4": {
"sub1": [null, {"l3": true}, null]
}
}
我有一个 JSON 文件,格式如下,
{
"Table1": {
"Records": [
{
"Key1Tab1": "SomeVal",
"Key2Tab1": "AnotherVal"
},
{
"Key1Tab1": "SomeVal2",
"Key2Tab1": "AnotherVal2"
}
]
},
"Table2": {
"Records": [
{
"Key1Tab1": "SomeVal",
"Key2Tab1": "AnotherVal"
},
{
"Key1Tab1": "SomeVal2",
"Key2Tab1": "AnotherVal2"
}
]
}
}
根键是来自 SQL 数据库的 table 名称,其对应的值是行。
我想将 JSON 文件拆分为单独的镶木地板文件,每个文件代表一个 table。
IE。 Table1.parquet
和 Table2.parquet
.
最大的问题是文件的大小使我无法将其加载到内存中。 因此,我尝试使用 dask.bag 来适应文件的嵌套结构。
import dask.bag as db
from dask.distributed import Client
client = Client(n_workers=4)
lines = db.read_text("filename.json")
但是使用 lines.take(4)
评估输出显示 dask 无法正确读取新行。
('{\n', ' "Table1": {\n', ' "Records": [\n', ' {\n')
我尝试寻找特定问题的解决方案,但没有成功。
是否有可能使用 dask 解决拆分问题,或者是否有其他工具可以完成这项工作?
按照建议here try the dask.dataframe.read_json()
方法
这可能就足够了,但我不确定如果您没有足够的内存来将整个结果数据帧存储在内存中,它会如何表现..
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
df = dd.read_json("filename.json")
df.to_parquet("filename.parquet", engine='pyarrow')
文档
- https://distributed.dask.org/en/latest/manage-computation.html#dask-collections-to-futures
- https://examples.dask.org/dataframes/01-data-access.html#Write-to-Parquet
如果 Dask 在单个系统上不分块处理文件(它可能不会愉快地这样做,因为 JSON 以这种方式解析显然不友好..虽然不幸的是我不这样做可以访问我的测试系统来验证这一点)并且系统内存无法处理这个巨大的文件,您可以通过创建一个大的交换文件 space 来使用磁盘扩展系统内存。
请注意,这将创建一个约 300G 的文件(增加 count
字段以获得更多)并且与内存相比可能 难以置信地 慢(但也许仍然足够快满足您的需求,特别是如果它是 1-off)。
# create and configure swapfile
dd if=/dev/zero of=swapfile.img bs=10M count=30000 status=progress
chmod 600 swapfile.img
mkswap swapfile.img
swapon swapfile.img
#
# run memory-greedy task
# ...
# ensure processes have exited
#
# disable and remove swapfile to reclaim disk space
swapoff swapfile.img # may hang for a long time
rm swapfile.img
问题是,默认情况下,dask 会根据换行符拆分文件,您不能保证这不会出现在您的某个表格的中间。事实上,即使你做对了,你仍然需要操作结果文本来为每个分区制作完整的 JSON 个对象。
例如:
def myfunc(x):
x = "".join(x)
if not x.endswith("}"):
x = x[:-2] + "}"
if not x.startswith("{"):
x = "{" + x
return [json.loads(x)]
db.read_text('temp.json',
linedelimiter="\n },\n",
blocksize=100).map_partitions(myfunc)
在这种情况下,我故意使块大小小于每个部分以演示:您将得到一个 JSON 对象或每个分区什么都没有。
_.compute()
[{'Table1': {'Records': [{'Key1Tab1': 'SomeVal', 'Key2Tab1': 'AnotherVal'},
{'Key1Tab1': 'SomeVal2', 'Key2Tab1': 'AnotherVal2'}]}},
{},
{'Table2': {'Records': [{'Key1Tab1': 'SomeVal', 'Key2Tab1': 'AnotherVal'},
{'Key1Tab1': 'SomeVal2', 'Key2Tab1': 'AnotherVal2'}]}},
{},
{},
{}]
当然,在您的情况下,您可以立即使用 JSON 而不是 return 它,或者您可以 map
链中的下一个写入函数。
处理大文件时,成功的关键是将数据作为流处理,即在类似过滤器的程序中。
JSON 格式易于解析。以下程序逐个读取输入字符(I/O 应该被缓冲)并将顶级 JSON 对象切割为单独的对象。它正确地遵循数据结构而不是格式。
演示程序只是打印“--NEXT OUTPUT FILE--”,真正的输出文件切换应该在这里实现。空白剥离是作为奖励实施的。
import collections
OBJ = 'object'
LST = 'list'
def out(ch):
print(ch, end='')
with open('json.data') as f:
stack = collections.deque(); push = stack.append; pop = stack.pop
esc = string = False
while (ch := f.read(1)):
if esc:
esc = False
elif ch == '\':
esc = True
elif ch == '"':
string = not string
if not string:
if ch in {' ', '\t', '\r', '\n'}:
continue
if ch == ',':
if len(stack) == 1 and stack[0] == OBJ:
out('}\n')
print("--- NEXT OUTPUT FILE ---")
out('{')
continue
elif ch == '{':
push(OBJ)
elif ch == '}':
if pop() is not OBJ:
raise ValueError("unmatched { }")
elif ch == '[':
push(LST)
elif ch == ']':
if pop() is not LST:
raise ValueError("unmatched [ ]")
out(ch)
这是我的测试文件的示例输出:
{"key1":{"name":"John","surname":"Doe"}}
--- NEXT OUTPUT FILE ---
{"key2":"string \" ] }"}
--- NEXT OUTPUT FILE ---
{"key3":13}
--- NEXT OUTPUT FILE ---
{"key4":{"sub1":[null,{"l3":true},null]}}
原始文件是:
{
"key1": {
"name": "John",
"surname": "Doe"
},
"key2": "string \" ] }", "key3": 13,
"key4": {
"sub1": [null, {"l3": true}, null]
}
}