使用 Airflow JDBC 连接连接到 Teradata

Connect to Teradata Using Airflow JDBC Connection

我正在尝试使用与 Teradata 数据库的连接在 Airflow 中执行 SqlSensor 任务。连接配置如下:

我特别提供了 2 个由“,”分隔的驱动程序路径,但我不确定这样做是否正确?

DAG执行时,触发错误信息

[2017-08-02 02:32:45,162] {models.py:1342} INFO - Executing <Task(SqlSensor): check_running_batch> on 2017-08-02 02:32:12
[2017-08-02 02:32:45,179] {base_hook.py:67} INFO - Using connection to: jdbc:teradata://myservername.mycompanyname.org/database=MYDBNAME,TMODE=ANSI,CHARSET=UTF8
[2017-08-02 02:32:45,313] {sensors.py:109} INFO - Poking: SELECT BATCH_KEY FROM MYDBNAME.AUDIT_BATCH WHERE BATCH_OWNER='ARO_TEST' AND AUDIT_STATUS_KEY=1;
[2017-08-02 02:32:45,316] {base_hook.py:67} INFO - Using connection to: jdbc:teradata://myservername.mycompanyname.org/database=MYDBNAME,TMODE=ANSI,CHARSET=UTF8
[2017-08-02 02:32:45,497] {models.py:1417} ERROR - java.lang.RuntimeException: Class com.teradata.jdbc.TeraDriver not found

我做错了什么?

我的团队成员提供的解决方案是将两个 jar 合并为一个 jar 文件。这样做并在驱动程序路径中指示新的 jar 文件后,它按预期工作。

这是 JAR 文件的 link:https://github.com/alexisrolland/linux-setup/blob/master/teradataDriverJdbc.jar

这是在 SQLSensor 任务中使用连接的代码片段示例:

CheckRunningBatch = SqlSensor(
  task_id='check_running_batch',
  conn_id='ed_data_quality_edw_dev',
  sql="SELECT CASE WHEN MAX(BATCH_KEY) IS NOT NULL THEN 0 ELSE 1 END FROM DATABASE.AUDIT_BATCH WHERE STATUS_KEY=1;",
  poke_interval=300,
  dag=dag)

在连接页面中输入多个 jar 的适当方法是用逗号分隔两个完全限定的路径,就像您在上面所做的那样。

我可以确认这是我采用的方法并且有效(Airflow 10.1.1 和 10.1.2)。

参见:https://github.com/apache/airflow/blob/master/airflow/hooks/jdbc_hook.py#L51

奖励:如果您在数据分析中使用 Ad Hoc Query 对其进行测试,您会注意到在发送 SELECT 语句时会收到错误,因为 Airflow 将其包装在 LIMIT 子句中TD不支持