如何将任意元数据分配给 pyarrow.Table / Parquet 列
How to assign arbitrary metadata to pyarrow.Table / Parquet columns
用例
我正在使用 Apache Parquet 文件作为我在 Python 中使用 GeoPandas 处理的大型空间数据的快速 IO 格式。我将要素几何存储为 WKB,并希望将坐标参考系统 (CRS) 记录为与 WKB 数据关联的元数据。
代码问题
我正在尝试将任意元数据分配给 pyarrow.Field
对象。
我试过的
假设 table
是从 df
实例化的 pyarrow.Table
,pandas.DataFrame
:
df = pd.DataFrame({
'foo' : [1, 3, 2],
'bar' : [6, 4, 5]
})
table = pa.Table.from_pandas(df)
根据 pyarrow
文档,列元数据包含在属于 schema
(source), and optional metadata may be added to a field
(source) 的 field
中。
如果我尝试为 metadata
属性赋值,它会引发错误:
>>> table.schema.field_by_name('foo').metadata = {'crs' : '4283'}
AttributeError: attribute 'metadata' of 'pyarrow.lib.Field' objects is not writable
>>> table.column(0).field.metadata = {'crs' : '4283'}
AttributeError: attribute 'metadata' of 'pyarrow.lib.Field' objects is not writable
如果我尝试将一个字段(具有通过 add_metadata
方法关联的元数据)分配给一个字段,它 returns 一个错误:
>>> table.schema.field_by_name('foo') = (
table.schema.field_by_name('foo').add_metadata({'crs' : '4283'})
)
SyntaxError: can't assign to function call
>>> table.column(0).field = table.column(0).field.add_metadata({'crs' : '4283'})
AttributeError: attribute 'field' of 'pyarrow.lib.Column' objects is not writable
我什至尝试过将元数据分配给 pandas.Series
对象,例如
df['foo']._metadata.append({'crs' : '4283'})
但是当在 table
对象的 schema
属性上调用 pandas_metadata
(docs) 方法时,元数据中不会返回。
研究
在 Whosebug 上, question remains unanswered, and related question concerns Scala, not Python and pyarrow
. Elsewhere 我看到了与 pyarrow.Field
对象关联的元数据,但只是通过从头开始实例化 pyarrow.Field
和 pyarrow.Table
对象。
PS
这是我第一次在 Whosebug 上发帖,所以提前致谢,如有任何错误,我们深表歉意。
Arrow 中的 "Everything" 是 immutable,因此正如您所经历的,您不能简单地 修改 任何字段或模式的元数据。执行此操作的唯一方法是使用添加的元数据创建 "new" table。我将 new 放在引号之间,因为这可以在不实际复制 table 的情况下完成,因为在幕后这只是移动指针。下面是一些代码,展示了如何在 Arrow 元数据中存储任意字典(只要它们是 json-serializable)以及如何检索它们:
def set_metadata(tbl, col_meta={}, tbl_meta={}):
"""Store table- and column-level metadata as json-encoded byte strings.
Table-level metadata is stored in the table's schema.
Column-level metadata is stored in the table columns' fields.
To update the metadata, first new fields are created for all columns.
Next a schema is created using the new fields and updated table metadata.
Finally a new table is created by replacing the old one's schema, but
without copying any data.
Args:
tbl (pyarrow.Table): The table to store metadata in
col_meta: A json-serializable dictionary with column metadata in the form
{
'column_1': {'some': 'data', 'value': 1},
'column_2': {'more': 'stuff', 'values': [1,2,3]}
}
tbl_meta: A json-serializable dictionary with table-level metadata.
"""
# Create updated column fields with new metadata
if col_meta or tbl_meta:
fields = []
for col in tbl.itercolumns():
if col.name in col_meta:
# Get updated column metadata
metadata = col.field.metadata or {}
for k, v in col_meta[col.name].items():
metadata[k] = json.dumps(v).encode('utf-8')
# Update field with updated metadata
fields.append(col.field.add_metadata(metadata))
else:
fields.append(col.field)
# Get updated table metadata
tbl_metadata = tbl.schema.metadata
for k, v in tbl_meta.items():
tbl_metadata[k] = json.dumps(v).encode('utf-8')
# Create new schema with updated field metadata and updated table metadata
schema = pa.schema(fields, metadata=tbl_metadata)
# With updated schema build new table (shouldn't copy data)
# tbl = pa.Table.from_batches(tbl.to_batches(), schema)
tbl = pa.Table.from_arrays(list(tbl.itercolumns()), schema=schema)
return tbl
def decode_metadata(metadata):
"""Arrow stores metadata keys and values as bytes.
We store "arbitrary" data as json-encoded strings (utf-8),
which are here decoded into normal dict.
"""
if not metadata:
# None or {} are not decoded
return metadata
decoded = {}
for k, v in metadata.items():
key = k.decode('utf-8')
val = json.loads(v.decode('utf-8'))
decoded[key] = val
return decoded
def table_metadata(tbl):
"""Get table metadata as dict."""
return decode_metadata(tbl.schema.metadata)
def column_metadata(tbl):
"""Get column metadata as dict."""
return {col.name: decode_metadata(col.field.metadata) for col in tbl.itercolumns()}
def get_metadata(tbl):
"""Get column and table metadata as dicts."""
return column_metadata(tbl), table_metadata(tbl)
简而言之,您使用添加的元数据创建新字段,将字段聚合到新架构中,然后从现有 table 和新架构创建新的 table。都有点long-winded。理想情况下,pyarrow 应该有方便的功能来用更少的代码行来做到这一点,但最后我检查这是唯一的方法。
唯一的复杂之处在于元数据在 Arrow 中存储为字节,因此在上面的代码中,我将元数据存储为 json-serializable 字典,我将其编码为 utf-8。
这里有一个不太复杂的方法来解决这个问题:
import pandas as pd
df = pd.DataFrame({
'foo' : [1, 3, 2],
'bar' : [6, 4, 5]
})
table = pa.Table.from_pandas(df)
your_schema = pa.schema([
pa.field("foo", "int64", False, metadata={"crs": "4283"}),
pa.field("bar", "int64", True)],
metadata={"diamond": "under_pressure"})
table2 = table.cast(your_schema)
table2.field('foo').metadata[b'crs'] # => b'4283'
我还添加了一个模式元数据字段来展示它是如何工作的。
table2.schema.metadata[b'diamond'] # => b'under_pressure'
请注意,元数据键/值是字节字符串 - 这就是为什么它是 b'under_pressure'
而不是 'under_pressure'
。需要字节字符串,因为 Parquet 是二进制文件格式。
from thomas 非常好,但 pyarrow 营地的情况发生了变化。以下是我需要进行的代码调整(包括从 table 元数据传递已编码字节的可疑更改):
def set_metadata(tbl, col_meta={}, tbl_meta={}):
"""Store table- and column-level metadata as json-encoded byte strings.
Table-level metadata is stored in the table's schema.
Column-level metadata is stored in the table columns' fields.
To update the metadata, first new fields are created for all columns.
Next a schema is created using the new fields and updated table metadata.
Finally a new table is created by replacing the old one's schema, but
without copying any data.
Args:
tbl (pyarrow.Table): The table to store metadata in
col_meta: A json-serializable dictionary with column metadata in the form
{
'column_1': {'some': 'data', 'value': 1},
'column_2': {'more': 'stuff', 'values': [1,2,3]}
}
tbl_meta: A json-serializable dictionary with table-level metadata.
"""
# Create updated column fields with new metadata
if col_meta or tbl_meta:
fields = []
for col in tbl.schema.names:
if col in col_meta:
# Get updated column metadata
metadata = tbl.field(col).metadata or {}
for k, v in col_meta[col].items():
metadata[k] = json.dumps(v).encode('utf-8')
# Update field with updated metadata
fields.append(tbl.field(col).with_metadata(metadata))
else:
fields.append(tbl.field(col))
# Get updated table metadata
tbl_metadata = tbl.schema.metadata or {}
for k, v in tbl_meta.items():
if type(v)==bytes:
tbl_metadata[k] = v
else:
tbl_metadata[k] = json.dumps(v).encode('utf-8')
# Create new schema with updated field metadata and updated table metadata
schema = pa.schema(fields, metadata=tbl_metadata)
# With updated schema build new table (shouldn't copy data)
# tbl = pa.Table.from_batches(tbl.to_batches(), schema)
tbl = tbl.cast(schema)
return tbl
用例
我正在使用 Apache Parquet 文件作为我在 Python 中使用 GeoPandas 处理的大型空间数据的快速 IO 格式。我将要素几何存储为 WKB,并希望将坐标参考系统 (CRS) 记录为与 WKB 数据关联的元数据。
代码问题
我正在尝试将任意元数据分配给 pyarrow.Field
对象。
我试过的
假设 table
是从 df
实例化的 pyarrow.Table
,pandas.DataFrame
:
df = pd.DataFrame({
'foo' : [1, 3, 2],
'bar' : [6, 4, 5]
})
table = pa.Table.from_pandas(df)
根据 pyarrow
文档,列元数据包含在属于 schema
(source), and optional metadata may be added to a field
(source) 的 field
中。
如果我尝试为 metadata
属性赋值,它会引发错误:
>>> table.schema.field_by_name('foo').metadata = {'crs' : '4283'}
AttributeError: attribute 'metadata' of 'pyarrow.lib.Field' objects is not writable
>>> table.column(0).field.metadata = {'crs' : '4283'}
AttributeError: attribute 'metadata' of 'pyarrow.lib.Field' objects is not writable
如果我尝试将一个字段(具有通过 add_metadata
方法关联的元数据)分配给一个字段,它 returns 一个错误:
>>> table.schema.field_by_name('foo') = (
table.schema.field_by_name('foo').add_metadata({'crs' : '4283'})
)
SyntaxError: can't assign to function call
>>> table.column(0).field = table.column(0).field.add_metadata({'crs' : '4283'})
AttributeError: attribute 'field' of 'pyarrow.lib.Column' objects is not writable
我什至尝试过将元数据分配给 pandas.Series
对象,例如
df['foo']._metadata.append({'crs' : '4283'})
但是当在 table
对象的 schema
属性上调用 pandas_metadata
(docs) 方法时,元数据中不会返回。
研究
在 Whosebug 上,pyarrow
. Elsewhere 我看到了与 pyarrow.Field
对象关联的元数据,但只是通过从头开始实例化 pyarrow.Field
和 pyarrow.Table
对象。
PS
这是我第一次在 Whosebug 上发帖,所以提前致谢,如有任何错误,我们深表歉意。
"Everything" 是 immutable,因此正如您所经历的,您不能简单地 修改 任何字段或模式的元数据。执行此操作的唯一方法是使用添加的元数据创建 "new" table。我将 new 放在引号之间,因为这可以在不实际复制 table 的情况下完成,因为在幕后这只是移动指针。下面是一些代码,展示了如何在 Arrow 元数据中存储任意字典(只要它们是 json-serializable)以及如何检索它们:
def set_metadata(tbl, col_meta={}, tbl_meta={}):
"""Store table- and column-level metadata as json-encoded byte strings.
Table-level metadata is stored in the table's schema.
Column-level metadata is stored in the table columns' fields.
To update the metadata, first new fields are created for all columns.
Next a schema is created using the new fields and updated table metadata.
Finally a new table is created by replacing the old one's schema, but
without copying any data.
Args:
tbl (pyarrow.Table): The table to store metadata in
col_meta: A json-serializable dictionary with column metadata in the form
{
'column_1': {'some': 'data', 'value': 1},
'column_2': {'more': 'stuff', 'values': [1,2,3]}
}
tbl_meta: A json-serializable dictionary with table-level metadata.
"""
# Create updated column fields with new metadata
if col_meta or tbl_meta:
fields = []
for col in tbl.itercolumns():
if col.name in col_meta:
# Get updated column metadata
metadata = col.field.metadata or {}
for k, v in col_meta[col.name].items():
metadata[k] = json.dumps(v).encode('utf-8')
# Update field with updated metadata
fields.append(col.field.add_metadata(metadata))
else:
fields.append(col.field)
# Get updated table metadata
tbl_metadata = tbl.schema.metadata
for k, v in tbl_meta.items():
tbl_metadata[k] = json.dumps(v).encode('utf-8')
# Create new schema with updated field metadata and updated table metadata
schema = pa.schema(fields, metadata=tbl_metadata)
# With updated schema build new table (shouldn't copy data)
# tbl = pa.Table.from_batches(tbl.to_batches(), schema)
tbl = pa.Table.from_arrays(list(tbl.itercolumns()), schema=schema)
return tbl
def decode_metadata(metadata):
"""Arrow stores metadata keys and values as bytes.
We store "arbitrary" data as json-encoded strings (utf-8),
which are here decoded into normal dict.
"""
if not metadata:
# None or {} are not decoded
return metadata
decoded = {}
for k, v in metadata.items():
key = k.decode('utf-8')
val = json.loads(v.decode('utf-8'))
decoded[key] = val
return decoded
def table_metadata(tbl):
"""Get table metadata as dict."""
return decode_metadata(tbl.schema.metadata)
def column_metadata(tbl):
"""Get column metadata as dict."""
return {col.name: decode_metadata(col.field.metadata) for col in tbl.itercolumns()}
def get_metadata(tbl):
"""Get column and table metadata as dicts."""
return column_metadata(tbl), table_metadata(tbl)
简而言之,您使用添加的元数据创建新字段,将字段聚合到新架构中,然后从现有 table 和新架构创建新的 table。都有点long-winded。理想情况下,pyarrow 应该有方便的功能来用更少的代码行来做到这一点,但最后我检查这是唯一的方法。
唯一的复杂之处在于元数据在 Arrow 中存储为字节,因此在上面的代码中,我将元数据存储为 json-serializable 字典,我将其编码为 utf-8。
这里有一个不太复杂的方法来解决这个问题:
import pandas as pd
df = pd.DataFrame({
'foo' : [1, 3, 2],
'bar' : [6, 4, 5]
})
table = pa.Table.from_pandas(df)
your_schema = pa.schema([
pa.field("foo", "int64", False, metadata={"crs": "4283"}),
pa.field("bar", "int64", True)],
metadata={"diamond": "under_pressure"})
table2 = table.cast(your_schema)
table2.field('foo').metadata[b'crs'] # => b'4283'
我还添加了一个模式元数据字段来展示它是如何工作的。
table2.schema.metadata[b'diamond'] # => b'under_pressure'
请注意,元数据键/值是字节字符串 - 这就是为什么它是 b'under_pressure'
而不是 'under_pressure'
。需要字节字符串,因为 Parquet 是二进制文件格式。
def set_metadata(tbl, col_meta={}, tbl_meta={}):
"""Store table- and column-level metadata as json-encoded byte strings.
Table-level metadata is stored in the table's schema.
Column-level metadata is stored in the table columns' fields.
To update the metadata, first new fields are created for all columns.
Next a schema is created using the new fields and updated table metadata.
Finally a new table is created by replacing the old one's schema, but
without copying any data.
Args:
tbl (pyarrow.Table): The table to store metadata in
col_meta: A json-serializable dictionary with column metadata in the form
{
'column_1': {'some': 'data', 'value': 1},
'column_2': {'more': 'stuff', 'values': [1,2,3]}
}
tbl_meta: A json-serializable dictionary with table-level metadata.
"""
# Create updated column fields with new metadata
if col_meta or tbl_meta:
fields = []
for col in tbl.schema.names:
if col in col_meta:
# Get updated column metadata
metadata = tbl.field(col).metadata or {}
for k, v in col_meta[col].items():
metadata[k] = json.dumps(v).encode('utf-8')
# Update field with updated metadata
fields.append(tbl.field(col).with_metadata(metadata))
else:
fields.append(tbl.field(col))
# Get updated table metadata
tbl_metadata = tbl.schema.metadata or {}
for k, v in tbl_meta.items():
if type(v)==bytes:
tbl_metadata[k] = v
else:
tbl_metadata[k] = json.dumps(v).encode('utf-8')
# Create new schema with updated field metadata and updated table metadata
schema = pa.schema(fields, metadata=tbl_metadata)
# With updated schema build new table (shouldn't copy data)
# tbl = pa.Table.from_batches(tbl.to_batches(), schema)
tbl = tbl.cast(schema)
return tbl