CSV 每天上传到同一个 table
CSV every day upload in to the same table
我需要每天将一个 csv 文件上传到 teradata 数据库。
我每天早上都会收到这个文件到 ftp 服务器,我想提取数据并将其上传到 teradata,
到目前为止,我已经设法从 ftp 中提取数据,保存它,添加一个日期列并用一个虚拟对象填充空值。
df2['WM_ID']=df2['WM_ID'].fillna('999999')
确实有效,但使用另一种方法也让我很伤心。(不管怎样)
现在我到了必须将数据上传到现有 table 的部分。我设法建立连接:
create_engine('teradata://'+ user +':' + pasw + '@'+ host +'/' + '?authentication=LDAP')
这连接到主数据库,但 table 在子数据库中(不确定这是不是一个词)让我们称它为 user_dev
。让我们调用 table test
我每天有一个包含 10 列和大约 4000 行的 csv。
还有一些空值(不在主键中)。
现在我不太确定声明性基础和其余部分,我尝试使用它们但失败了。
td_engine = create_engine('teradata://'+ user +':' + pasw + '@'+ host +'/' + '?authentication=LDAP')
print (td_engine)
connection = td_engine.raw_connection()
print ('logged in to teradata')
def Load_Data(file_name):
data = genfromtxt(file_name, delimiter=',', skip_header=1, converters={0: lambda s: str(s)})
return data.tolist()
Base = declarative_base()
class pb_test(Base):
__Tablename__= "test"
entry_date = Column(Date)
WID = Column(VARCHAR(50),primary_key=True)
User_IP_co = Column(VARCHAR(50))
User_S = Column(VARCHAR(50))
Visitors = Column(Integer)
Reg = Column(Integer)
Real = Column(Integer)
Regi = Column(Integer)
First = Column(Integer)
First_D = Column(Integer)
def __repr__(self):
return "(entry_date='%s', WID='%s', User_IP_co='%s', User_S='%s', Visitors='%s', Reg='%s', Real='%s', Regi='%s', First='%s', First_D='%s')" % (self.entry_date, self.WID, self.User_IP_Co, self.User_S, self.Visitors, self.Reg, self.Real, self.Regi, self.First, self.First_D)
yesterday = date.today() - timedelta(1)
FileToRead = pd.read_csv('Report20'+str(yesterday.strftime('%y%m%d'))+'.csv')
tableToWriteTo = 'USER_DEVELOPMENT.test'
#data.head()
df = pd.DataFrame(FileToRead)
listToWrite = df.to_dict(orient='records')
metadata = sqlalchemy.schema.MetaData(bind=td_engine,reflect=True)
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)
Session = sessionmaker(bind=td_engine)
session = Session()
connection.execute(table.insert(), listToWrite)
session.commit()
session.close()
不确定出了什么问题,但我收到此错误消息:
sqlalchemy.exc.InvalidRequestError: Class <class '__main__.test'> does not have a __table__ or __tablename__ specified and does not inherit from an existing table-mapped class.
我觉得你的做法有点糊涂。您定义了一个声明性基础,然后无法使用它。如果您想走那条路,请参阅此 link。 (并且,如评论所述,您想要的参数是 tablename。此外,您可能需要使用 table_args 来指定模式)。
或者,您可以在底部尝试定义 table 及其列。有关此方法的详细信息,请参阅此 link.
鉴于您的任务很简单,我可能会选择选项 #2。
#table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)
table = Table(
'test',
metadata,
Column('WID', VARCHAR(50),primary_key=True),
Column('entry_date', Date),
Column('User_IP_co', VARCHAR(50)),
Column('User_S', VARCHAR(50)),
...,
schema='USER_DEVELOPMENT'
)
我需要每天将一个 csv 文件上传到 teradata 数据库。 我每天早上都会收到这个文件到 ftp 服务器,我想提取数据并将其上传到 teradata, 到目前为止,我已经设法从 ftp 中提取数据,保存它,添加一个日期列并用一个虚拟对象填充空值。
df2['WM_ID']=df2['WM_ID'].fillna('999999')
确实有效,但使用另一种方法也让我很伤心。(不管怎样)
现在我到了必须将数据上传到现有 table 的部分。我设法建立连接:
create_engine('teradata://'+ user +':' + pasw + '@'+ host +'/' + '?authentication=LDAP')
这连接到主数据库,但 table 在子数据库中(不确定这是不是一个词)让我们称它为 user_dev
。让我们调用 table test
我每天有一个包含 10 列和大约 4000 行的 csv。 还有一些空值(不在主键中)。
现在我不太确定声明性基础和其余部分,我尝试使用它们但失败了。
td_engine = create_engine('teradata://'+ user +':' + pasw + '@'+ host +'/' + '?authentication=LDAP')
print (td_engine)
connection = td_engine.raw_connection()
print ('logged in to teradata')
def Load_Data(file_name):
data = genfromtxt(file_name, delimiter=',', skip_header=1, converters={0: lambda s: str(s)})
return data.tolist()
Base = declarative_base()
class pb_test(Base):
__Tablename__= "test"
entry_date = Column(Date)
WID = Column(VARCHAR(50),primary_key=True)
User_IP_co = Column(VARCHAR(50))
User_S = Column(VARCHAR(50))
Visitors = Column(Integer)
Reg = Column(Integer)
Real = Column(Integer)
Regi = Column(Integer)
First = Column(Integer)
First_D = Column(Integer)
def __repr__(self):
return "(entry_date='%s', WID='%s', User_IP_co='%s', User_S='%s', Visitors='%s', Reg='%s', Real='%s', Regi='%s', First='%s', First_D='%s')" % (self.entry_date, self.WID, self.User_IP_Co, self.User_S, self.Visitors, self.Reg, self.Real, self.Regi, self.First, self.First_D)
yesterday = date.today() - timedelta(1)
FileToRead = pd.read_csv('Report20'+str(yesterday.strftime('%y%m%d'))+'.csv')
tableToWriteTo = 'USER_DEVELOPMENT.test'
#data.head()
df = pd.DataFrame(FileToRead)
listToWrite = df.to_dict(orient='records')
metadata = sqlalchemy.schema.MetaData(bind=td_engine,reflect=True)
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)
Session = sessionmaker(bind=td_engine)
session = Session()
connection.execute(table.insert(), listToWrite)
session.commit()
session.close()
不确定出了什么问题,但我收到此错误消息:
sqlalchemy.exc.InvalidRequestError: Class <class '__main__.test'> does not have a __table__ or __tablename__ specified and does not inherit from an existing table-mapped class.
我觉得你的做法有点糊涂。您定义了一个声明性基础,然后无法使用它。如果您想走那条路,请参阅此 link。 (并且,如评论所述,您想要的参数是 tablename。此外,您可能需要使用 table_args 来指定模式)。
或者,您可以在底部尝试定义 table 及其列。有关此方法的详细信息,请参阅此 link.
鉴于您的任务很简单,我可能会选择选项 #2。
#table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)
table = Table(
'test',
metadata,
Column('WID', VARCHAR(50),primary_key=True),
Column('entry_date', Date),
Column('User_IP_co', VARCHAR(50)),
Column('User_S', VARCHAR(50)),
...,
schema='USER_DEVELOPMENT'
)