为什么 Azure 数据工厂似乎坚持将 DateTimes 作为字符串插入?

Why does Azure Data Factory seemingly insist on inserting DateTimes as string?

我正在尝试设置一个 Azure 数据工厂来将我的数据从一个 AzureSQL 数据库复制并反规范化到另一个 AzureSQL 数据库,用于 reporting/BI 目的流,但我 运行 遇到了插入日期的问题。

这是我的数据流的定义。

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable2",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "script": "\n\nsource(output(\n\t\tBucketId as string,\n\t\tStreamId as string,\n\t\tStreamIdOriginal as string,\n\t\tStreamRevision as integer,\n\t\tItems as integer,\n\t\tCommitId as string,\n\t\tCommitSequence as integer,\n\t\tCommitStamp as timestamp,\n\t\tCheckpointNumber as long,\n\t\tDispatched as boolean,\n\t\tHeaders as binary,\n\t\tPayload as binary\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tisolationLevel: 'READ_UNCOMMITTED',\n\tformat: 'table') ~> source1\nsource1 sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false,\n\tmapColumn(\n\t\tBucketId,\n\t\tCommitStamp\n\t)) ~> sink1"
        }
    }
}

这些是我的来源的定义

{
    "name": "AzureSqlTable1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Source_Test",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "BucketId",
                "type": "varchar"
            },
            {
                "name": "StreamId",
                "type": "char"
            },
            {
                "name": "StreamIdOriginal",
                "type": "nvarchar"
            },
            {
                "name": "StreamRevision",
                "type": "int",
                "precision": 10
            },
            {
                "name": "Items",
                "type": "tinyint",
                "precision": 3
            },
            {
                "name": "CommitId",
                "type": "uniqueidentifier"
            },
            {
                "name": "CommitSequence",
                "type": "int",
                "precision": 10
            },
            {
                "name": "CommitStamp",
                "type": "datetime2",
                "scale": 7
            },
            {
                "name": "CheckpointNumber",
                "type": "bigint",
                "precision": 19
            },
            {
                "name": "Dispatched",
                "type": "bit"
            },
            {
                "name": "Headers",
                "type": "varbinary"
            },
            {
                "name": "Payload",
                "type": "varbinary"
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[Commits]"
        }
    }
}

和接收器数据集

{
    "name": "AzureSqlTable2",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Dest_Test",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [],
        "typeProperties": {
            "tableName": "dbo.Test2"
        }
    }
}

当 运行 我的数据流管道出现以下错误时:

Activity dataflow1 failed: DF-EXEC-1 Conversion failed when converting date and/or time from character string.
com.microsoft.sqlserver.jdbc.SQLServerException: Conversion failed when converting date and/or time from character string.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
    at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onEOF(tdsparser.java:256)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:108)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:28)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.doInsertBulk(SQLServerBulkCopy.java:1611)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.access0(SQLServerBulkCopy.java:58)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopyInsertBulk.doExecute(SQLServerBulkCopy.java:709)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.sendBulkLoadBCP(SQLServerBulkCopy.java:739)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1684)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:669)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.com$microsoft$azure$sqldb$spark$connect$DataFrameFunctions$$bulkCopy(DataFrameFunctions.scala:127)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB.apply(DataFrameFunctions.scala:72)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB.apply(DataFrameFunctions.scala:72)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:948)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:948)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2226)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2226)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:124)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:459)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1401)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

我的 Azure SQL 审计日志显示以下语句失败(考虑到它使用 VARCHAR(50) 作为 [CommitStamp] 的类型,这并不奇怪:

INSERT BULK dbo.T_301fcb5e4a4148d4a48f2943011b2f04 (
  [BucketId] NVARCHAR(MAX), 
  [CommitStamp] VARCHAR(50), 
  [StreamId] NVARCHAR(MAX), 
  [StreamIdOriginal] NVARCHAR(MAX),
  [StreamRevision] INT,
  [Items] INT,
  [CommitId] NVARCHAR(MAX),
  [CommitSequence] INT, 
  [CheckpointNumber] BIGINT, 
  [Dispatched] BIT,
  [Headers] VARBINARY(MAX),
  [Payload] VARBINARY(MAX),
  [r8e440f7252bb401b9ead107597de6293] INT) 
with (ROWS_PER_BATCH = 4096, TABLOCK)

我完全不知道为什么会这样。看起来架构信息是正确的,但不知何故,数据 factory/data 流似乎想要将 CommitStamp 作为字符串类型插入。

根据要求,数据flow/code/plan视图的输出:



source(output(
        BucketId as string,
        StreamId as string,
        StreamIdOriginal as string,
        StreamRevision as integer,
        Items as integer,
        CommitId as string,
        CommitSequence as integer,
        CommitStamp as timestamp,
        CheckpointNumber as long,
        Dispatched as boolean,
        Headers as binary,
        Payload as binary
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    isolationLevel: 'READ_UNCOMMITTED',
    format: 'table',
    schemaName: '[dbo]',
    tableName: '[Commits]',
    store: 'sqlserver',
    server: 'sign2025-sqldata.database.windows.net',
    database: 'SignPath.Application',
    user: 'Sign2025Admin',
    password: '**********') ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false,
    format: 'table',
    deletable:false,
    insertable:true,
    updateable:false,
    upsertable:false,
    mapColumn(
        BucketId,
        CommitStamp
    ),
    schemaName: 'dbo',
    tableName: 'Test2',
    store: 'sqlserver',
    server: 'sign2025-sqldata.database.windows.net',
    database: 'SignPath.Reporting',
    user: 'Sign2025Admin',
    password: '**********') ~> sink1

我创建了一个数据流来将数据从 Azure SQL 数据库复制到另一个 Azure SQL 数据库。它成功地将datatime2转换为VARCHAR(50)

这是我的数据流的定义:

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DestinationDataset_sto",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "DestinationDataset_mex",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "script": "\n\nsource(output(\n\t\tID as integer,\n\t\ttName as string,\n\t\tmyTime as timestamp\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tisolationLevel: 'READ_UNCOMMITTED',\n\tformat: 'table') ~> source1\nsource1 sink(input(\n\t\tID as integer,\n\t\ttName as string,\n\t\tmyTime as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false) ~> sink1"
        }
    }
}

我源的定义:

{
    "name": "DestinationDataset_sto",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase1",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "ID",
                "type": "int",
                "precision": 10
            },
            {
                "name": "tName",
                "type": "varchar"
            },
            {
                "name": "myTime",
                "type": "datetime2",
                "scale": 7
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[demo]"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

我的接收器设置:

{
    "name": "DestinationDataset_mex",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase1",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "ID",
                "type": "int",
                "precision": 10
            },
            {
                "name": "tName",
                "type": "varchar"
            },
            {
                "name": "myTime",
                "type": "varchar"
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[demo1]"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

这是我的数据流步骤。

第 1 步:源设置:

第 2 步:接收器设置:

运行 成功:

table demo 和 demo1 除了 myTime 外几乎具有相同的架构。

我的来源 table 及其数据:

我的接收器table和从demo复制的数据:

数据流计划:

source(output(
        ID as integer,
        tName as string,
        myTime as timestamp
    ),
    allowSchemaDrift: true,
    validateSchema: true,
    isolationLevel: 'SERIALIZABLE',
    format: 'table',
    schemaName: '[dbo]',
    tableName: '[demo]',
    store: 'sqlserver',
    server: '****.database.windows.net',
    database: '****',
    user: 'ServerAdmin',
    password: '**********') ~> source1
source1 sink(input(
        ID as integer,
        tName as string,
        myTime as string
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    format: 'table',
    deletable:false,
    insertable:true,
    updateable:false,
    upsertable:false,
    schemaName: '[dbo]',
    tableName: '[demo1]',
    store: 'sqlserver',
    server: '****.database.windows.net',
    database: '****',
    user: 'ServerAdmin',
    password: '**********') ~> sink1

更新1:

我手动创建接收器 table 并发现:

Data Flow can convert datatime2 to VARCHAR()(maybe NVARCHAR()) , date ,datetimeoffset.

当我尝试日期类型 timedatetimedatetime2smalldatetime 时,数据流总是给出错误:

"message": "DF-EXEC-1 Conversion failed when converting date and/or time from character 

2019-7-11更新:

我向Azure Support求助,他们回复我:这是Data Flow的一个bug,暂时没有解决办法。

2019-7-12更新:

我通过 Azure 支持进行了测试,他们认为这是一个错误。这是新的电子邮件:

他们还告诉我 修复已经完成,将在下一个部署序列中部署。这可能是下周末.

希望对您有所帮助。

看起来您的 Sink 数据集将 myTime 定义为字符串:

接收器(输入( ID为整数, tName 作为字符串, myTime 作为字符串 )

你能把它改成时间戳或日期吗?

或者,您可以通过在 Sink 上设置 "Recreate table" 并让 ADF 在 table 上生成新的 table 定义,从而将数据放置在 SQL 中的临时暂存区 table苍蝇使用数据流中映射字段的数据类型。