Azure Python SDK 数据表

Azure Python SDK data tables

我需要帮助才能完成此工作流程。 我有 2 个存储帐户,我将其命名为 storage1storage2

storage1 包含一个包含一些数据的 table 列表,我想遍历所有这些 table,将它们的内容复制到 storage2 .我尝试使用 azCopy,但运气不佳,因为此功能仅在 azCopy v7.3 中可用,而且我找不到适用于 MacOs M1 的此版本。另一种选择是数据工厂,但它对于我想要实现的目标来说太复杂了。所以我决定使用 azure Python sdk.

作为图书馆,我正在使用 azure.data.tables import TableServiceClient

我写的代码是这样的:

from azure.data.tables import TableServiceClient
my_conn_str_out = 'storage1-Conn-Str'

table_service_client_out = TableServiceClient.from_connection_string(my_conn_str_out)
list_table = []
for table in table_service_client_out.list_tables():
    list_table.append(table.table_name)

my_conn_str_in = 'Storage2-Conn-str'

table_service_client_in = TableServiceClient.from_connection_string(my_conn_str_in)
for new_tables in table_service_client_out.list_tables():
    table_service_client_in.create_table_if_not_exists(new_tables.table_name)
    print(f'tables created successfully {new_tables.table_name}')

这就是我构建代码的方式。

for table in table_service_client_out.list_tables():
    list_table.append(table.table_name)

我遍历存储帐户中的所有 table 并将它们附加到列表中。

然后:

for new_tables in table_service_client_out.list_tables():
    table_service_client_in.create_table_if_not_exists(new_tables.table_name)
    print(f'tables created successfully {new_tables.table_name}')

我在 storage2

中创建了相同的 table

到目前为止一切正常。

我现在想实现的,就是查询allstorage1中每个table中的数据,并传递给[=]中各自的table 18=]

根据 Microsoft 文档,我可以使用以下方法实现查询 table:

query = table_service_client_out.query_tables(filter=table)

所以我将它集成到我的循环中,如下所示:

for table in table_service_client_out.list_tables():
    query = table_service_client_out.query_tables(filter=table)
    list_table.append(table.table_name)
    print(query)

当我 运行 我的 python 代码时,我取回了查询的内存分配,而不是 tables:

中的数据
<iterator object azure.core.paging.ItemPaged at 0x7fcd90c8fbb0>
<iterator object azure.core.paging.ItemPaged at 0x7fcd90c8f7f0>
<iterator object azure.core.paging.ItemPaged at 0x7fcd90c8fd60>

我想知道是否有办法查询我的 table 中的所有数据并将它们传递给我的 storage2

试试这个:

from azure.cosmosdb.table.tableservice import TableService,ListGenerator

table_service_out = TableService(account_name='', account_key='')
table_service_in = TableService(account_name='', account_key='')

#query 100 items per request, in case of consuming too much menory load all data in one time
query_size = 100

#save data to storage2 and check if there is lefted data in current table,if yes recurrence
def queryAndSaveAllDataBySize(tb_name,resp_data:ListGenerator ,table_out:TableService,table_in:TableService,query_size:int):
    for item in resp_data:
        #remove etag and Timestamp appended by table service
        del item.etag
        del item.Timestamp
        print("instet data:" + str(item) + "into table:"+ tb_name)
        table_in.insert_entity(tb_name,item)
    if resp_data.next_marker:
        data = table_out.query_entities(table_name=tb_name,num_results=query_size,marker=resp_data.next_marker)
        queryAndSaveAllDataBySize(tb_name,data,table_out,table_in,query_size)


tbs_out = table_service_out.list_tables()

for tb in tbs_out:
    #create table with same name in storage2
    table_service_in.create_table(tb.name)
    #first query 
    data = table_service_out.query_entities(tb.name,num_results=query_size)
    queryAndSaveAllDataBySize(tb.name,data,table_service_out,table_service_in,query_size)

当然,这是一个简单的演示,可以让您requirement.For提高效率,您也可以通过分区键查询table数据并提交它们by batch

如果您还有其他问题,请告诉我。