为什么从 Jupyter notebook 向 Snowflake 插入数据时,SQLAlchemy 可以正常工作,而 Python 连接器却不行?

Why does SQLAlchemy work for inserts from Jupyter notebooks into Snowflake when the Python connector does not?

现在我已经在 ipython3 笔记本中建立了连接,正在测试使用 python 连接器插入和查询数据。然而,在测试过程中,我遇到了如何通过 python 连接器正确插入数据帧(DataFrame)的问题,最终我发现使用 sqlalchemy 引擎可以正常工作:https://support.snowflake.net/s/question/0D50Z00009C6023SAB/how-can-i-insert-data-into-snowflake-table-from-a-panda-data-frame

但现在我很好奇是否也可以使用 python 连接器。我已经附上了笔记本中的代码。我看不懂这条错误消息;在查找资料时,我尝试了 .to_sql(https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_sql.html)中的几个参数。老实说,我被这个错误弄得有点迷茫,最后就直接使用了 sqlalchemy 方法。如果我需要让 python 连接器也能插入或追加数据,你以前见过这个错误吗?我确定这只是一个编码错误。

下面这种方法有效,但当我只通过 python 连接器尝试 df_data.to_sql 时,却不断出错

from sqlalchemy import create_engine

# Load a CSV file into a DataFrame and write it to the Snowflake table
# 'testtb2' through a SQLAlchemy engine.
# NOTE(review): `pd`, `URL`, ACCOUNT, USER and PASSWORD are not defined in this
# snippet; presumably they come from an earlier notebook cell (`URL` looks like
# snowflake.sqlalchemy.URL -- confirm).

#df_data
data = pd.read_csv("data/data.csv")
data.head()  # quick look at the loaded rows
# read_csv already returns a DataFrame, so it is renamed directly instead of
# being re-wrapped in pd.DataFrame(); both columns are renamed in one pass.
df_data = data.rename(
    columns={'Updated ?': 'updated', 'Article Id': 'article_id'}
)

engine = create_engine(URL(
    account=ACCOUNT,
    user=USER,
    password=PASSWORD,
    database='testdb',
    schema='public',
    warehouse='MYWH',
    role='ACCOUNTADMIN',
))

try:
    # The context manager closes the connection even if to_sql raises, and the
    # connection that is opened is the one actually used for the insert.
    with engine.connect() as connection:
        # index=False keeps the DataFrame index from being written as a column.
        df_data.to_sql('testtb2', con=connection, index=False)
finally:
    # Always release the engine's pooled connections.
    engine.dispose()

使用 python 连接器的第二种方法:

import snowflake.connector

# Second attempt: pass a raw Snowflake connector connection to
# DataFrame.to_sql instead of a SQLAlchemy engine.
# NOTE(review): USER, PASSWORD, ACCOUNT and df_data are not defined in this
# snippet; presumably they come from earlier notebook cells.
conn = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT)

# Create (or replace) the target table with three string columns.
conn.cursor().execute("create or replace table testtbl(article_id string, link string, status string)")
# Open an explicit transaction; no matching commit appears in this snippet.
conn.cursor().execute("begin")
# This is the call that raises the DatabaseError shown in the traceback:
# pandas runs a "SELECT name FROM sqlite_master ..." table-existence check
# with a `?` placeholder on this connection, and the connector's
# cursor.execute fails while %-formatting the command with the parameters.
df_data.to_sql('testtbl', con=conn, schema ='testdb.testschema', dtype='varchar', method=None, if_exists='append',index=False) #make sure index is False, Snowflake doesnt accept indexes


conn.close()

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/io/sql.py in execute(self, *args, **kwargs)
   1594             else:
-> 1595                 cur.execute(*args)
   1596             return cur

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/snowflake/connector/cursor.py in execute(self, command, params, timeout, _do_reset, _put_callback, _put_azure_callback, _put_callback_output_stream, _get_callback, _get_azure_callback, _get_callback_output_stream, _show_progress_bar, _statement_params, _is_internal, _no_results, _use_ijson, _is_put_get, _raise_put_get_error, _force_put_overwrite)
    489                 if len(processed_params) > 0:
--> 490                     query = command % processed_params
    491                 else:

TypeError: not all arguments converted during string formatting

During handling of the above exception, another exception occurred:

DatabaseError                             Traceback (most recent call last)
<ipython-input-49-60b60380b42a> in <module>
     34 conn.cursor().execute("create or replace table testtbl(article_id string, link string, status string)")
     35 conn.cursor().execute("begin")
---> 36 df_data.to_sql('testtbl', con=conn, schema ='testdb.testschema', dtype='varchar', method=None, if_exists='append',index=False) #make sure index is False, Snowflake doesnt accept indexes
     37 
     38 ##use sqlalchemy instead

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/core/generic.py in to_sql(self, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
   2710             chunksize=chunksize,
   2711             dtype=dtype,
-> 2712             method=method,
   2713         )
   2714 

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/io/sql.py in to_sql(frame, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
    516         chunksize=chunksize,
    517         dtype=dtype,
--> 518         method=method,
    519     )
    520 

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/io/sql.py in to_sql(self, frame, name, if_exists, index, index_label, schema, chunksize, dtype, method)
   1747             dtype=dtype,
   1748         )
-> 1749         table.create()
   1750         table.insert(chunksize, method)
   1751 

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/io/sql.py in create(self)
    639 
    640     def create(self):
--> 641         if self.exists():
    642             if self.if_exists == "fail":
    643                 raise ValueError(

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/io/sql.py in exists(self)
    626 
    627     def exists(self):
--> 628         return self.pd_sql.has_table(self.name, self.schema)
    629 
    630     def sql_schema(self):

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/io/sql.py in has_table(self, name, schema)
   1760         ).format(wld=wld)
   1761 
-> 1762         return len(self.execute(query, [name]).fetchall()) > 0
   1763 
   1764     def get_table(self, table_name, schema=None):

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/io/sql.py in execute(self, *args, **kwargs)
   1608                 "Execution failed on sql '{sql}': {exc}".format(sql=args[0], exc=exc)
   1609             )
-> 1610             raise_with_traceback(ex)
   1611 
   1612     @staticmethod

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/compat/__init__.py in raise_with_traceback(exc, traceback)
     45     if traceback == Ellipsis:
     46         _, _, traceback = sys.exc_info()
---> 47     raise exc.with_traceback(traceback)
     48 
     49 

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/io/sql.py in execute(self, *args, **kwargs)
   1593                 cur.execute(*args, **kwargs)
   1594             else:
-> 1595                 cur.execute(*args)
   1596             return cur
   1597         except Exception as exc:

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/snowflake/connector/cursor.py in execute(self, command, params, timeout, _do_reset, _put_callback, _put_azure_callback, _put_callback_output_stream, _get_callback, _get_azure_callback, _get_callback_output_stream, _show_progress_bar, _statement_params, _is_internal, _no_results, _use_ijson, _is_put_get, _raise_put_get_error, _force_put_overwrite)
    488                                  params, processed_params)
    489                 if len(processed_params) > 0:
--> 490                     query = command % processed_params
    491                 else:
    492                     query = command

DatabaseError: Execution failed on sql 'SELECT name FROM sqlite_master WHERE type='table' AND name=?;': not all arguments converted during string formatting

据我所知,Pandas 的 to_sql 函数接受 "sqlalchemy.engine.Engine" 和 "sqlite3.Connection" 对象作为连接。当您传入 Snowflake 连接器的连接对象时,pandas 会把它当作 sqlite3.Connection 对象来处理,因此您会收到以下错误:

pandas.io.sql.DatabaseError: Execution failed on sql 'SELECT name FROM sqlite_master WHERE type='table' AND name=?;': not all arguments converted during string formatting

如果要使用 Pandas to_sql 函数,您需要使用 SQLAlchemy 引擎。