将具有字符串列的数据集写入 teradata 时获取 SQLException

Getting SQLException when writing dataset having string columns to teradata

在数据集中有一些字符串数据的情况下,尝试将数据集从 spark 写入 teradata 时出现以下错误:

2018-01-02 15:49:05 [pool-2-thread-2] ERROR c.i.i.t.spark2.algo.JDBCTableWriter:115 - Error in JDBC operation:
java.sql.SQLException: [Teradata Database] [TeraJDBC 15.00.00.20] [Error 3706] [SQLState 42000] Syntax error: Data Type "TEXT" does not match a Defined Type name.
      at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeDatabaseSQLException(ErrorFactory.java:308)
    at com.teradata.jdbc.jdbc_4.statemachine.ReceiveInitSubState.action(ReceiveInitSubState.java:109)
    at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.subStateMachine(StatementReceiveState.java:307)
    at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.action(StatementReceiveState.java:196)
    at com.teradata.jdbc.jdbc_4.statemachine.StatementController.runBody(StatementController.java:123)
    at com.teradata.jdbc.jdbc_4.statemachine.StatementController.run(StatementController.java:114)
    at com.teradata.jdbc.jdbc_4.TDStatement.executeStatement(TDStatement.java:385)
    at com.teradata.jdbc.jdbc_4.TDStatement.doNonPrepExecuteUpdate(TDStatement.java:602)
    at com.teradata.jdbc.jdbc_4.TDStatement.executeUpdate(TDStatement.java:1109)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:805)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:90)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)

如何确保数据正确写入 teradata。

我正在将 HDFS 中的 csv 文件读入数据集,然后尝试使用 DataFrameWriter 将其写入 Teradata。我为此使用下面给出的代码:

ds.write().mode("append")
            .jdbc(url, tableName, props);

我使用的是 spark 2.2.0,Teradata 是 15.00.00.07 当我在 DB2 中尝试写入 Nettezza 时,我遇到了一些类似的问题,我可以写入但字符串值被替换为 . 写入这些数据库时是否需要任何类型的选项..?

我能够通过为 Teradata 实施自定义 JDBCDialect 来解决这个问题。 同样的方法可用于解决其他数据源(如 Netezza、DB2、Hive 等)的类似问题。

为此,您需要扩展 'JdbcDialect' class 并注册它:

public class TDDialect extends JdbcDialect {

private static final Map<String, Option<JdbcType>> dataTypeMap = new HashMap<String, Option<JdbcType>>();

static {
    dataTypeMap
            .put("int", Option.apply(JdbcType.apply("INTEGER",
                    java.sql.Types.INTEGER)));
    dataTypeMap.put("long",
            Option.apply(JdbcType.apply("BIGINT", java.sql.Types.BIGINT)));
    dataTypeMap.put("double", Option.apply(JdbcType.apply(
            "DOUBLE PRECISION", java.sql.Types.DOUBLE)));
    dataTypeMap.put("float",
            Option.apply(JdbcType.apply("FLOAT", java.sql.Types.FLOAT)));
    dataTypeMap.put("short", Option.apply(JdbcType.apply("SMALLINT",
            java.sql.Types.SMALLINT)));
    dataTypeMap
            .put("byte", Option.apply(JdbcType.apply("BYTEINT",
                    java.sql.Types.TINYINT)));
    dataTypeMap.put("binary",
            Option.apply(JdbcType.apply("BLOB", java.sql.Types.BLOB)));
    dataTypeMap.put("timestamp", Option.apply(JdbcType.apply("TIMESTAMP",
            java.sql.Types.TIMESTAMP)));
    dataTypeMap.put("date",
            Option.apply(JdbcType.apply("DATE", java.sql.Types.DATE)));
    dataTypeMap.put("string", Option.apply(JdbcType.apply("VARCHAR(255)",
            java.sql.Types.VARCHAR)));
    dataTypeMap.put("boolean",
            Option.apply(JdbcType.apply("CHAR(1)", java.sql.Types.CHAR)));
    dataTypeMap.put("text", Option.apply(JdbcType.apply("VARCHAR(255)",
            java.sql.Types.VARCHAR)));
}

/***/
private static final long serialVersionUID = 1L;

@Override
public boolean canHandle(String url) {
    return url.startsWith("jdbc:teradata");
}

@Override
public Option<JdbcType> getJDBCType(DataType dt) {
    Option<JdbcType> option = dataTypeMap.get(dt.simpleString().toLowerCase());
    if(option == null){
        option = Option.empty();
    }
    return option;
}

}

现在您可以在对 spark 调用任何操作之前使用以下代码片段注册它:

JdbcDialects.registerDialect(new TDDialect());

对于某些数据源,例如 Hive,您可能需要重写一种方法以避免 NumberFormatExceptions 或一些类似的异常:

@Override
public String quoteIdentifier(String colName) {
    return colName;
}

希望这对遇到类似问题的人有所帮助。

它对我有用,你能试一下让我知道吗?

Point to be noted:
***Your hive table must be in Text format as storage. It should not be ORC.
Create the schema in Teradata before writing it from your pyspark notebook.***




df = spark.sql("select * from dbname.tableName")
properties = {
"driver": "com.teradata.jdbc.TeraDriver",
"user": "xxxx",
"password": "xxxxx"
}

df.write.jdbc(url='provide_url',table='dbName.tableName', properties=properties)