如何使用 python 将 pandas 数据帧数据存储到 azure blob?
How to store pandas dataframe data to azure blobs using python?
我想将 pandas 数据框中处理过的数据存储到 parquet 文件格式的 azure blob 中。但在上传到 blob 之前,我必须将它作为镶木地板文件存储在本地磁盘中,然后再上传。我想把pyarrow.table写成pyarrow.parquet.NativeFile直接上传。谁能帮我这个。下面的代码工作正常:
import pyarrow as pa
import pyarrow.parquet as pq
battery_pq = pd.read_csv('test.csv')
######## 一些数据处理
battery_pq = pa.Table.from_pandas(battery_pq)
pq.write_table(battery_pq,'example.parquet')
block_blob_service.create_blob_from_path(container_name,'example.parquet','example.parquet')
需要在内存中创建文件(I/O文件类型对象),然后上传到blob。
您可以为此使用 io.BytesIO,或者 Apache Arrow 也提供其本机实现 BufferOutputStream
。这样做的好处是写入流时没有通过 Python 的开销。因此减少了副本并释放了GIL。
import pyarrow as pa
import pyarrow.parquet as pq
df = some pandas.DataFrame
table = pa.Table.from_pandas(df)
buf = pa.BufferOutputStream()
pq.write_table(table, buf)
block_blob_service.create_blob_from_bytes(
container,
"example.parquet",
buf.getvalue().to_pybytes()
)
有一个新的 python SDK 版本。 create_blob_from_bytes
现已成为旧版本
import pandas as pd
from azure.storage.blob import BlobServiceClient
from io import BytesIO
blob_service_client = BlobServiceClient.from_connection_string(blob_store_conn_str)
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_path)
parquet_file = BytesIO()
df.to_parquet(parquet_file, engine='pyarrow')
parquet_file.seek(0) # change the stream position back to the beginning after writing
blob_client.upload_blob(
data=parquet_file
)
我想将 pandas 数据框中处理过的数据存储到 parquet 文件格式的 azure blob 中。但在上传到 blob 之前,我必须将它作为镶木地板文件存储在本地磁盘中,然后再上传。我想把pyarrow.table写成pyarrow.parquet.NativeFile直接上传。谁能帮我这个。下面的代码工作正常:
import pyarrow as pa
import pyarrow.parquet as pq
battery_pq = pd.read_csv('test.csv')
######## 一些数据处理
battery_pq = pa.Table.from_pandas(battery_pq)
pq.write_table(battery_pq,'example.parquet')
block_blob_service.create_blob_from_path(container_name,'example.parquet','example.parquet')
需要在内存中创建文件(I/O文件类型对象),然后上传到blob。
您可以为此使用 io.BytesIO,或者 Apache Arrow 也提供其本机实现 BufferOutputStream
。这样做的好处是写入流时没有通过 Python 的开销。因此减少了副本并释放了GIL。
import pyarrow as pa
import pyarrow.parquet as pq
df = some pandas.DataFrame
table = pa.Table.from_pandas(df)
buf = pa.BufferOutputStream()
pq.write_table(table, buf)
block_blob_service.create_blob_from_bytes(
container,
"example.parquet",
buf.getvalue().to_pybytes()
)
有一个新的 python SDK 版本。 create_blob_from_bytes
现已成为旧版本
import pandas as pd
from azure.storage.blob import BlobServiceClient
from io import BytesIO
blob_service_client = BlobServiceClient.from_connection_string(blob_store_conn_str)
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_path)
parquet_file = BytesIO()
df.to_parquet(parquet_file, engine='pyarrow')
parquet_file.seek(0) # change the stream position back to the beginning after writing
blob_client.upload_blob(
data=parquet_file
)