CSV 每天上传到同一个 table

CSV every day upload in to the same table

我需要每天将一个 csv 文件上传到 teradata 数据库。 我每天早上都会收到这个文件到 ftp 服务器,我想提取数据并将其上传到 teradata, 到目前为止,我已经设法从 ftp 中提取数据,保存它,添加一个日期列并用一个虚拟对象填充空值。

df2['WM_ID']=df2['WM_ID'].fillna('999999')

确实有效,但使用另一种方法也让我很伤心。(不管怎样)

现在我到了必须将数据上传到现有 table 的部分。我设法建立连接:

create_engine('teradata://'+ user +':' + pasw + '@'+ host +'/' + '?authentication=LDAP')

这连接到主数据库,但 table 在子数据库中(不确定这是不是一个词)让我们称它为 user_dev。让我们调用 table test

我每天有一个包含 10 列和大约 4000 行的 csv。 还有一些空值(不在主键中)。

现在我不太确定声明性基础和其余部分,我尝试使用它们但失败了。

td_engine = create_engine('teradata://'+ user +':' + pasw + '@'+ host +'/' + '?authentication=LDAP')
print (td_engine)

connection = td_engine.raw_connection()

print ('logged in to teradata')
def Load_Data(file_name):
    data = genfromtxt(file_name, delimiter=',', skip_header=1, converters={0: lambda s: str(s)})
    return data.tolist()

Base = declarative_base()

class pb_test(Base):
  __Tablename__= "test"

  entry_date = Column(Date)
  WID = Column(VARCHAR(50),primary_key=True)
  User_IP_co = Column(VARCHAR(50))
  User_S = Column(VARCHAR(50))
  Visitors = Column(Integer)
  Reg = Column(Integer)
  Real = Column(Integer)
  Regi = Column(Integer)
  First = Column(Integer)
  First_D = Column(Integer)



  def __repr__(self):
    return "(entry_date='%s', WID='%s', User_IP_co='%s', User_S='%s', Visitors='%s', Reg='%s', Real='%s', Regi='%s', First='%s', First_D='%s')" % (self.entry_date, self.WID, self.User_IP_Co, self.User_S, self.Visitors, self.Reg, self.Real, self.Regi, self.First, self.First_D)


yesterday = date.today() - timedelta(1)
FileToRead = pd.read_csv('Report20'+str(yesterday.strftime('%y%m%d'))+'.csv')
tableToWriteTo = 'USER_DEVELOPMENT.test'
#data.head()
df = pd.DataFrame(FileToRead)


listToWrite = df.to_dict(orient='records')

metadata = sqlalchemy.schema.MetaData(bind=td_engine,reflect=True)
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)

Session = sessionmaker(bind=td_engine)
session = Session()

connection.execute(table.insert(), listToWrite)

session.commit()

session.close()

不确定出了什么问题,但我收到此错误消息:

sqlalchemy.exc.InvalidRequestError: Class <class '__main__.test'> does not have a __table__ or __tablename__ specified and does not inherit from an existing table-mapped class.

我觉得你的做法有点糊涂。您定义了一个声明性基础,然后无法使用它。如果您想走那条路,请参阅此 link。 (并且,如评论所述,您想要的参数是 tablename。此外,您可能需要使用 table_args 来指定模式)。

或者,您可以在底部尝试定义 table 及其列。有关此方法的详细信息,请参阅此 link.

鉴于您的任务很简单,我可能会选择选项 #2。

#table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)
table = Table(
    'test',
    metadata,
    Column('WID', VARCHAR(50),primary_key=True),
    Column('entry_date', Date),
    Column('User_IP_co', VARCHAR(50)),
    Column('User_S', VARCHAR(50)),
    ...,
    schema='USER_DEVELOPMENT'
)