Python clickhouse-driver: ValueError: Parameters are expected in dict form
I have an ETL job that saves data to ClickHouse using clickhouse-driver.
The save function looks exactly like this:
def insert_data(data: Iterable[Dict], table: str, client: Client = None):
    columns = get_table_cols(table)
    client = client or get_ch_client(0)
    query = f"insert into {table} ({', '.join(columns)}) values"
    data = map(lambda row: {key: row[key] for key in columns}, data)
    client.execute(query, data)
The function that calls insert_data looks like this:
def save_data(data: DataFrame, client: Client):
    mapper = get_mapper(my_table_map)
    data = map(lambda x: {col_new: getattr(x, col_old)
                          for col_old, col_new in map_dataframe_to_ch.items()},
               data.collect())
    data = map(mapper, data)
    insert_data(data, 'my_table_name', client)
get_mapper returns a mapping function that looks like this:
def map_row(row: Dict[str, Any]) -> Dict[str, Any]:
    nonlocal map_
    return {key: map_[key](val) for key, val in row.items()}
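So in the end I basically have some nested generators that produce dicts. To confirm this, if I put print(next(data)) right before client.execute, I get exactly the dict I expect. Here is an example with sensitive information hidden: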
{'account_currency': '***',
'instrument': '***',
'operation': 'open',
'event_time': datetime.datetime(2020, 7, 7, 19, 11, 49),
'country': 'CN',
'region': 'Asia and Pacific',
'registration_source': '***',
'account_type': '***',
'platform': '***',
'server_key': '***'}
The table schema is as follows:
"account_currency": "String",
"instrument": "String",
"operation": "String",
"event_time": "DateTime",
"country": "String",
"region": "String",
"registration_source": "String",
"account_type": "String",
"platform": "String",
"server_key": "String"
But for whatever reason I get this error:
  File "src/etl/usd_volume/prepare_users.py", line 356, in <module>
    main()
  File "src/etl/usd_volume/prepare_users.py", line 348, in main
    save_data(data, client)
  File "src/etl/usd_volume/prepare_users.py", line 302, in save_data
    insert_data(data, 'report_traded_volume_usd', client)
  File "/root/data/src/common/clickhouse_helper.py", line 14, in insert_data
    client.execute(query, data)
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 224, in execute
    columnar=columnar
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 341, in process_ordinary_query
    query = self.substitute_params(query, params)
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 422, in substitute_params
    raise ValueError('Parameters are expected in dict form')
According to the documentation:
:param params: substitution parameters for SELECT queries and data for INSERT queries. Data for INSERT can be list, tuple or :data:~types.GeneratorType. Defaults to None (no parameters or data).
So clearly my data meets these requirements.
However, in the source code there is only this check:
def substitute_params(self, query, params):
    if not isinstance(params, dict):
        raise ValueError('Parameters are expected in dict form')
    escaped = escape_params(params)
    return query % escaped
I could not find where they check whether the data is a generator.
The clickhouse-driver version is 0.1.4.
Any help with this problem is greatly appreciated.
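OK, digging further into the source code revealed the root cause.
The function that raises the error, substitute_params, is called in the process_ordinary_query method of the Client class. That method is called for basically any query other than an INSERT.
This part of the execute method decides whether the query is treated as an INSERT or as anything else: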
is_insert = isinstance(params, (list, tuple, types.GeneratorType))
if is_insert:
    rv = self.process_insert_query(
        query, params, external_tables=external_tables,
        query_id=query_id, types_check=types_check,
        columnar=columnar
    )
else:
    rv = self.process_ordinary_query(
        query, params=params, with_column_types=with_column_types,
        external_tables=external_tables,
        query_id=query_id, types_check=types_check,
        columnar=columnar
    )
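The key is isinstance(params, (list, tuple, types.GeneratorType)): the branch is chosen by the type of params, not by the text of the query.
types.GeneratorType is defined as follows: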
def _g():
    yield 1
GeneratorType = type(_g())
Which gives:
>>> GeneratorType
<class 'generator'>
My data, however, is a map object:
>>> type(map(...))
<class 'map'>
>>> isinstance(map(...), GeneratorType)
False
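The branch quoted from execute can be reproduced without a ClickHouse server. The sketch below is only an illustration: route is a hypothetical helper that copies the isinstance check shown above and reports which path the driver would take for a given params value; it is not part of clickhouse-driver.

```python
import types


def route(params):
    """Hypothetical helper: mirrors the isinstance check quoted from
    Client.execute and names the branch that would be taken."""
    if isinstance(params, (list, tuple, types.GeneratorType)):
        return "insert"
    return "ordinary"


rows = [{"country": "CN"}, {"country": "DE"}]


def gen_rows():
    # A generator function: calling it returns a real generator object.
    yield from rows


print(route(rows))               # insert
print(route(tuple(rows)))        # insert
print(route(gen_rows()))         # insert
print(route(map(dict, rows)))    # ordinary: a map object is not a generator
print(route({"country": "CN"}))  # ordinary: dict parameters for substitution
```

A map object therefore lands in the ordinary branch, where substitute_params rejects it because it is not a dict.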
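So the simplest way to avoid this problem is to wrap data in a generator expression, which turns it into a real generator. This completely solves the problem.
>>> data = (i for i in data)
>>> isinstance(data, GeneratorType)
True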
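The same mismatch applies to other lazy built-in iterators, not only map. As a minimal sketch, the as_generator helper below (a name made up for this example) wraps any iterable in a generator function, so the result passes the GeneratorType check while still being consumed lazily.

```python
import itertools
import types


def as_generator(iterable):
    """Hypothetical helper: wrap any iterable in a real generator
    without materializing it into a list first."""
    yield from iterable


lazy_inputs = {
    "map": map(str.upper, ["a", "b"]),
    "filter": filter(None, ["a", "", "b"]),
    "zip": zip("ab", "cd"),
    "chain": itertools.chain(["a"], ["b"]),
    "list_iterator": iter(["a", "b"]),
}

for name, obj in lazy_inputs.items():
    before = isinstance(obj, types.GeneratorType)
    after = isinstance(as_generator(obj), types.GeneratorType)
    # Each line reports False before wrapping and True after.
    print(f"{name}: {before} -> {after}")
```

Wrapping keeps memory use flat for large inputs; converting to a list with list(data) would also pass the check, at the cost of holding every row in memory.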
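Alternatively, if you only ever run INSERT queries, you can call process_insert_query directly, which removes the need to convert the data to a generator.
I think the type checking in clickhouse-driver is a bit vague, but this is what we have.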
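Applied to the insert_data function from the question, the fix could look roughly like the sketch below. Several things here are assumptions made so the example runs on its own: StubClient is a stand-in that only imitates the type check quoted above and never talks to a server, and the column list is passed in explicitly instead of coming from get_table_cols.

```python
import types
from typing import Any, Dict, Iterable, List


class StubClient:
    """Stand-in for clickhouse_driver.Client (assumption for this demo).
    It only imitates the params type check and records inserted rows."""

    def __init__(self):
        self.inserted: List[Dict[str, Any]] = []

    def execute(self, query, params=None):
        if isinstance(params, (list, tuple, types.GeneratorType)):
            rows = list(params)
            self.inserted.extend(rows)
            return len(rows)
        if params is not None and not isinstance(params, dict):
            raise ValueError('Parameters are expected in dict form')
        return None


def insert_data(client, table: str, columns: List[str],
                data: Iterable[Dict[str, Any]]):
    query = f"insert into {table} ({', '.join(columns)}) values"
    # A generator expression instead of map(): the result is a real
    # generator, so the client takes the INSERT branch.
    rows = ({key: row[key] for key in columns} for row in data)
    return client.execute(query, rows)


client = StubClient()
# The upstream data may still be a map object; only the final value
# handed to execute has to be a list, tuple, or generator.
source = map(lambda r: {**r, "extra": 1},
             [{"country": "CN", "operation": "open"}])
insert_data(client, "my_table_name", ["country", "operation"], source)
print(client.inserted)  # [{'country': 'CN', 'operation': 'open'}]
```

Only the last step before execute needs to change; the map calls further up the pipeline can stay as they are.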