
Writing a dask dataframe to parquet: 'TypeError'

I'm trying to write a parquet file with Dask. The goal is to use its repartition feature, but I can't seem to write even a simple parquet file, before ever reaching the repartition step...

Here is the code I use: it creates a parquet file with pyarrow, reads it back with Dask, and then writes it out again.

import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import dask.dataframe as dd

file = 'example.parquet'
file_res = 'example_res.parquet'

# Generate a random df
df = pd.DataFrame(
    np.random.randint(100, size=(100000, 20)),
    columns=list('ABCDEFGHIJKLMNOPQRST'),
)

# Write 1st parquet file with pyarrow
table = pa.Table.from_pandas(df)
pq.write_table(table, file, version='1.0')

# Read it back with Dask, and write it again
dd_df = dd.read_parquet(file)
dd_df.to_parquet(file_res)

The final write step fails with TypeError: expected list of bytes. The full traceback follows:


  File "C:/Users/me/Documents/code/_draft/pyarrow_parquet_store.py", line 31, in <module>
    dd_df.to_parquet(file_res)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\core.py", line 4075, in to_parquet
    return to_parquet(self, path, *args, **kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\io\parquet\core.py", line 665, in to_parquet
    out = out.compute(**compute_kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\base.py", line 279, in compute
    (result,) = compute(self, traverse=False, **kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\threaded.py", line 84, in get
    **kwargs

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 486, in get_async
    raise_exception(exc, tb)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 316, in reraise
    raise exc

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\local.py", line 222, in execute_task
    result = _execute_task(task, data)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\utils.py", line 30, in apply
    return func(*args, **kwargs)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py", line 625, in write_partition
    fil, df, fmd.schema, compression=compression, fmd=fmd

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 631, in make_part_file
    rg = make_row_group(f, data, schema, compression=compression)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 619, in make_row_group
    compression=comp)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 513, in write_column
    data, selement)

  File "C:\Users\me\Documents\Programs\Anaconda\lib\site-packages\fastparquet\writer.py", line 254, in encode_plain
    return pack_byte_array(list(out))

  File "fastparquet\speedups.pyx", line 112, in fastparquet.speedups.pack_byte_array

TypeError: expected list of bytes

Thanks for your help. Best.

The problem seems to be the index: it is stored as pure metadata, RangeIndex(start=0, stop=100000, step=1), but Dask infers its dtype as "object" (i.e., strings or something more complex), and so fastparquet ends up trying to write the list of integers as strings.
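
A quick way to see the mismatch (a minimal check, using the example.parquet written above; the exact dtypes reported depend on the pandas/Dask versions involved):

import pandas as pd
import dask.dataframe as dd

# pandas restores the RangeIndex from the parquet metadata
print(pd.read_parquet('example.parquet').index.dtype)   # int64

# Dask infers an "object" dtype for the same index, which is what
# later trips up fastparquet's byte-array encoder
print(dd.read_parquet('example.parquet').index.dtype)   # object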

While this is a bug, here are some workarounds (sketched in code after the list):

  • Don't write the index: dd_df.to_parquet(file_res, write_index=False)
  • For a single partition like this one, the fastparquet API without Dask works fine
  • Drop the index, or set a new one
  • Set the index dtype explicitly
  • Use pyarrow: engine="pyarrow"
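
A minimal sketch of those workarounds, continuing from the code above. Each call is an independent alternative (pick one), and the fix_index helper is a hypothetical way to cast the index dtype per partition, not a library API:

from fastparquet import write

# 1. Skip the problematic index entirely
dd_df.to_parquet(file_res, write_index=False)

# 2. Single partition: materialize to pandas and use fastparquet directly
write(file_res, dd_df.compute())

# 3. Drop the index (or set a fresh one) before writing
dd_df.reset_index(drop=True).to_parquet(file_res)

# 4. Cast the index dtype on each partition (hypothetical helper)
def fix_index(pdf):
    pdf.index = pdf.index.astype('int64')
    return pdf

dd_df.map_partitions(fix_index).to_parquet(file_res)

# 5. Use the pyarrow engine instead of fastparquet
dd_df.to_parquet(file_res, engine='pyarrow')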