使用 ADFV2 创建具有计划触发器的管道

Creating a pipeline with a scheduled trigger with ADFV2

我尝试将 ADFV1 中已有的管道迁移到 ADFV2,但对触发器的概念有一些疑问。我的管道有两个活动:第一个是 Azure Data Lake Analytics 活动,第二个是复制活动(Copy activity)。第一个活动运行一个 U-SQL 脚本,该脚本从分区文件夹 /{yyyy}/{MM}/{dd}/ 读取数据,进行处理后写入文件夹 /{yyyy}-{MM}-{dd}/。下面是我的数据工厂中的一些 JSON 文件(管道、触发器和数据集)。

管道:

{
"name": "StreamCompressionBlob2SQL",
"properties": {
    "activities": [
        {
            "name": "compress",
            "type": "DataLakeAnalyticsU-SQL",
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "typeProperties": {
                "scriptPath": "d00044653/azure-configurations/usql-scripts/stream/compression.usql",
                "scriptLinkedService": {
                    "referenceName": "AzureBlobStorage",
                    "type": "LinkedServiceReference"
                },
                "parameters": {
                    "Year": {
                        "value": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')",
                        "type": "Expression"
                    },
                    "Month": {
                        "value": "@formatDateTime(pipeline().parameters.windowStartTime,'MM')",
                        "type": "Expression"
                    },
                    "Day": {
                        "value": "@formatDateTime(pipeline().parameters.windowStartTime,'dd')",
                        "type": "Expression"
                    }
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureDataLakeAnalytics1",
                "type": "LinkedServiceReference"
            }
        },
        {
            "name": "Blob2SQL",
            "type": "Copy",
            "dependsOn": [
                {
                    "activity": "compress",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "typeProperties": {
                "source": {
                    "type": "BlobSource",
                    "recursive": true
                },
                "sink": {
                    "type": "SqlSink",
                    "writeBatchSize": 10000
                },
                "enableStaging": false,
                "dataIntegrationUnits": 0,
                "translator": {
                    "type": "TabularTranslator",
                    "columnMappings": {
                        "tag": "TAG",
                        "device_id": "DEVICE_ID",
                        "system_id": "SYSTEM_ID",
                        "utc": "UTC",
                        "ts": "TS",
                        "median": "MEDIAN",
                        "min": "MIN",
                        "max": "MAX",
                        "avg": "AVG",
                        "stdev": "STDEV",
                        "first_value": "FIRST_VALUE",
                        "last_value": "LAST_VALUE",
                        "message_count": "MESSAGE_COUNT"
                    }
                }
            },
            "inputs": [
                {
                    "referenceName": "AzureBlobDataset_COMPRESSED_ASA_v1",
                    "type": "DatasetReference"
                }
            ],
            "outputs": [
                {
                    "referenceName": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
                    "type": "DatasetReference"
                }
            ]
        }
    ],
    "parameters": {
        "windowStartTime": {
            "type": "String"
        }
    }
}

}

触发器:

{
"name": "trigger1",
"properties": {
    "runtimeState": "Started",
    "pipelines": [
        {
            "pipelineReference": {
                "referenceName": "StreamCompressionBlob2SQL",
                "type": "PipelineReference"
            },
            "parameters": {
                "windowStartTime": "@trigger().scheduledTime"
            }
        }
    ],
    "type": "ScheduleTrigger",
    "typeProperties": {
        "recurrence": {
            "frequency": "Day",
            "interval": 1,
            "startTime": "2018-08-17T10:46:00.000Z",
            "endTime": "2018-11-04T10:46:00.000Z",
            "timeZone": "UTC"
        }
    }
}

}

复制活动(Copy activity)的输入数据集:

{
"name": "AzureBlobDataset_COMPRESSED_ASA_v1",
"properties": {
    "linkedServiceName": {
        "referenceName": "AzureBlobStorage",
        "type": "LinkedServiceReference"
    },
    "parameters": {
        "Year": {
            "type": "String",
            "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
        },
        "Month": {
            "type": "String",
            "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'MM')"
        },
        "Day": {
            "type": "String",
            "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'dd')"
        }
    },
    "type": "AzureBlob",
    "structure": [
        {
            "name": "tag",
            "type": "String"
        },
        {
            "name": "device_id",
            "type": "String"
        },
        {
            "name": "system_id",
            "type": "String"
        },
        {
            "name": "utc",
            "type": "DateTime"
        },
        {
            "name": "ts",
            "type": "DateTime"
        },
        {
            "name": "median",
            "type": "Double"
        },
        {
            "name": "min",
            "type": "Double"
        },
        {
            "name": "max",
            "type": "Double"
        },
        {
            "name": "avg",
            "type": "Double"
        },
        {
            "name": "stdev",
            "type": "Double"
        },
        {
            "name": "first_value",
            "type": "Double"
        },
        {
            "name": "last_value",
            "type": "Double"
        },
        {
            "name": "message_count",
            "type": "Int16"
        }
    ],
    "typeProperties": {
        "format": {
            "type": "TextFormat",
            "columnDelimiter": ";",
            "nullValue": "\\N",
            "treatEmptyAsNull": true,
            "skipLineCount": 0,
            "firstRowAsHeader": true
        },
        "fileName": "",
        "folderPath": {
            "value": "@concat('d00044653/processed/stream/compressed',dataset().Year,'-',dataset().Month,'-',dataset().Day)",
            "type": "Expression"
        }
    }
},
"type": "Microsoft.DataFactory/factories/datasets"

}

复制活动(Copy activity)的输出数据集:

{
"name": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
"properties": {
    "linkedServiceName": {
        "referenceName": "AzureSqlDatabase1",
        "type": "LinkedServiceReference"
    },
    "type": "AzureSqlTable",
    "structure": [
        {
            "name": "TAG",
            "type": "String"
        },
        {
            "name": "DEVICE_ID",
            "type": "String"
        },
        {
            "name": "SYSTEM_ID",
            "type": "String"
        },
        {
            "name": "UTC",
            "type": "DateTime"
        },
        {
            "name": "TS",
            "type": "DateTime"
        },
        {
            "name": "MEDIAN",
            "type": "Decimal"
        },
        {
            "name": "MIN",
            "type": "Decimal"
        },
        {
            "name": "MAX",
            "type": "Decimal"
        },
        {
            "name": "AVG",
            "type": "Decimal"
        },
        {
            "name": "STDEV",
            "type": "Decimal"
        },
        {
            "name": "FIRST_VALUE",
            "type": "Decimal"
        },
        {
            "name": "LAST_VALUE",
            "type": "Decimal"
        },
        {
            "name": "MESSAGE_COUNT",
            "type": "Int32"
        }
    ],
    "typeProperties": {
        "tableName": "[dbo].[T_ASSET_MONITORING_WARM]"
    }
},
"type": "Microsoft.DataFactory/factories/datasets"

}

我的问题是:发布之后没有任何反应(管道没有运行)。有什么建议吗?

计划触发器(Schedule trigger)不支持回填场景(根据您的触发器定义,开始时间是 2018 年 8 月 17 日)。使用计划触发器时,管道运行只能在从当前时间起及未来的时间段内执行,不会为过去的时间段补跑。

对于您这种需要回填的场景,请改用翻转窗口(Tumbling window)触发器。