Twilio API 到 SSIS 数据流
Twilio API to SSIS data flow
尝试从 Twilio API 中读取记录,然后在 SSIS 数据流中使用它,并将记录保存在 SQL 服务器数据库中。使用 https://www.twilio.com/docs/sms/api/message-resource#read-multiple-message-resources 中的 API 文档,我已经能够阅读这些消息,但有点不知道下一步该做什么,我只想将其用作数据流的来源,所以不想尝试在 .net 中做任何太花哨的事情,更多的是 SQL 对 C# 经验不多的人。
我是不是觉得这太直截了当了?假设我能够阅读消息(看起来不像 JSON,各个字段已经使用 Twilio.dll 拆分出来)将其放入一个变量,然后我遍历每一行并传递到输出缓冲区。
本质上,是在尝试这样的事情:
public override void CreateNewOutputRows()
{
string accountSid = "AAAA";
string authToken = "1111";
TwilioClient.Init(accountSid, authToken);
var response = MessageResource.Read();
foreach (var msg in response)
{
Output0Buffer.AddRow();
Output0Buffer.ID = msg.Sid.ToString();
Output0Buffer.Message = msg.Body.ToString();
}
}
```
下面是一个不使用 Twilio 库且不使用数据流的示例。
我们基本上是在看类似的东西:
- 截断我们正在加载的table
- 我们将使用 For 循环容器,因为我们必须考虑分页。
- 进行 REST 调用的 C# 脚本任务
- 用于解析 JSON 的存储过程,插入到 table 并评估下一个 url 值和 return 返回它以查看我们是否需要再绕一圈。
Twilio 默认有 50 条记录,最多 1000 条,return在响应中编辑。如果您想要超过默认值,您可以使用 URL 参数“?PageSize=60”来更改它。如果有更多记录,那么响应将包含下一个特定页面大小 url 以获取下一组记录。
两个变量:
NextUrl 设为
https://api.twilio.com/2010-04-01/Accounts/{AccountSid}/Messages.json
For 循环容器
一旦 NextUrl = STOP,循环就会停止。我们在存储过程中处理它。插入数据后,我们检查该值,如果它为 NULL,我们将其设置为 STOP。
脚本任务
传入 URL,在脚本中我们设置了 apiResponse 所以它会返回。
您需要包括这些::
Main() 代码
try
{
string apiUrl = Dts.Variables["NextUrl"].Value.ToString();
string userName = ""; //accountSid found in Twilio account console
string passwd = ""; //authToken found in Twilio account console
ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;
HttpClient client = new HttpClient();
client.DefaultRequestHeaders.Accept.Clear();
byte[] authToken = Encoding.ASCII.GetBytes($"{userName}:{passwd}");
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(authToken));
HttpResponseMessage response;
// Execute the REST GET
response = client.GetAsync(apiUrl).GetAwaiter().GetResult();
// Get the JSON response.
string contentString = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
Dts.Variables["apiResponse"].Value = contentString; //Passing the response back out.
Dts.TaskResult = (int)ScriptResults.Success;
}
catch (Exception ex)
{
//This code will cause the ssis to fail and bubble back out the error from this code
Dts.Events.FireError(-1, "Error", ex.Message, String.Empty, 0);
Dts.TaskResult = (int)ScriptResults.Failure;
}
创建一个带有 NVARCHAR(MAX) 参数的存储过程以接受 apiResponse 并 return 输出 NextUrl 值。任务类似于:
参数映射:
结果集:
那么存储过程本身就是这样的:
CREATE PROCEDURE [dbo].[InsertTwilioMessage]
@apiResponse NVARCHAR(MAX)
AS
DECLARE @NextUrl NVARCHAR(2000);
INSERT INTO [dbo].[TwilioMessage] (
[body]
, [num_segments]
, [direction]
, [from]
, [date_updated]
, [price]
, [error_message]
, [uri]
, [account_sid]
, [num_media]
, [to]
, [date_created]
, [status]
, [sid]
, [date_sent]
, [messaging_service_sid]
, [error_code]
, [price_unit]
, [api_version]
)
SELECT [rsp].[body]
, [rsp].[num_segments]
, [rsp].[direction]
, [rsp].[from]
, [rsp].[date_updated]
, [rsp].[price]
, [rsp].[error_message]
, [rsp].[uri]
, [rsp].[account_sid]
, [rsp].[num_media]
, [rsp].[to]
, [rsp].[date_created]
, [rsp].[status]
, [rsp].[sid]
, [rsp].[date_sent]
, [rsp].[messaging_service_sid]
, [rsp].[error_code]
, [rsp].[price_unit]
, [rsp].[api_version]
FROM OPENJSON(@apiResponse, '$.messages') --data is in the messages array
WITH (
[body] NVARCHAR(255) '$.body'
, [num_segments] NVARCHAR(255) '$.num_segments'
, [direction] NVARCHAR(255) '$.direction'
, [from] NVARCHAR(255) '$.from'
, [date_updated] NVARCHAR(255) '$.date_updated'
, [price] NVARCHAR(255) '$.price'
, [error_message] NVARCHAR(255) '$.error_message'
, [uri] NVARCHAR(255) '$.uri'
, [account_sid] NVARCHAR(255) '$.account_sid'
, [num_media] NVARCHAR(255) '$.num_media'
, [to] NVARCHAR(255) '$.to'
, [date_created] NVARCHAR(255) '$.date_created'
, [status] NVARCHAR(255) '$.status'
, [sid] NVARCHAR(255) '$.sid'
, [date_sent] NVARCHAR(255) '$.date_sent'
, [messaging_service_sid] NVARCHAR(255) '$.messing_serivce_sid'
, [error_code] NVARCHAR(255) '$.error_code'
, [price_unit] NVARCHAR(255) '$.price_unit'
, [api_version] NVARCHAR(255) '$.api_version'
) AS [rsp];
SELECT @NextUrl = [next_page_uri]
FROM
OPENJSON(@apiResponse)
WITH (
[next_page_uri] NVARCHAR(2000) '$.next_page_uri'
);
-- we check for null of NextUrl and return STOP to stop the for loop
--if not we return out the Url to go back around and get more data
SELECT CASE WHEN @NextUrl IS NOT NULL THEN CONCAT('https://api.twilio.com', @NextUrl)
ELSE 'STOP'
END AS [NextUrl];
SSIS 变量 NextUrl 被存储过程 return 回退的内容覆盖。如果这不等于继续 for 循环的 STOP,现在将 URL 传递到脚本任务中以获取下一组数据,依此类推...
最终能够在 C# 中运行它。问题在于所使用的 dll 版本在最新版本无法相互协作的情况下。
尝试从 Twilio API 中读取记录,然后在 SSIS 数据流中使用它,并将记录保存在 SQL 服务器数据库中。使用 https://www.twilio.com/docs/sms/api/message-resource#read-multiple-message-resources 中的 API 文档,我已经能够阅读这些消息,但有点不知道下一步该做什么,我只想将其用作数据流的来源,所以不想尝试在 .net 中做任何太花哨的事情,更多的是 SQL 对 C# 经验不多的人。
我是不是觉得这太直截了当了?假设我能够阅读消息(看起来不像 JSON,各个字段已经使用 Twilio.dll 拆分出来)将其放入一个变量,然后我遍历每一行并传递到输出缓冲区。
本质上,是在尝试这样的事情:
public override void CreateNewOutputRows()
{
string accountSid = "AAAA";
string authToken = "1111";
TwilioClient.Init(accountSid, authToken);
var response = MessageResource.Read();
foreach (var msg in response)
{
Output0Buffer.AddRow();
Output0Buffer.ID = msg.Sid.ToString();
Output0Buffer.Message = msg.Body.ToString();
}
}
```
下面是一个不使用 Twilio 库且不使用数据流的示例。
我们基本上是在看类似的东西:
- 截断我们正在加载的table
- 我们将使用 For 循环容器,因为我们必须考虑分页。
- 进行 REST 调用的 C# 脚本任务
- 用于解析 JSON 的存储过程,插入到 table 并评估下一个 url 值和 return 返回它以查看我们是否需要再绕一圈。
Twilio 默认有 50 条记录,最多 1000 条,return在响应中编辑。如果您想要超过默认值,您可以使用 URL 参数“?PageSize=60”来更改它。如果有更多记录,那么响应将包含下一个特定页面大小 url 以获取下一组记录。
两个变量:
NextUrl 设为
https://api.twilio.com/2010-04-01/Accounts/{AccountSid}/Messages.json
For 循环容器
一旦 NextUrl = STOP,循环就会停止。我们在存储过程中处理它。插入数据后,我们检查该值,如果它为 NULL,我们将其设置为 STOP。
脚本任务
传入 URL,在脚本中我们设置了 apiResponse 所以它会返回。
您需要包括这些::
Main() 代码
try
{
string apiUrl = Dts.Variables["NextUrl"].Value.ToString();
string userName = ""; //accountSid found in Twilio account console
string passwd = ""; //authToken found in Twilio account console
ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;
HttpClient client = new HttpClient();
client.DefaultRequestHeaders.Accept.Clear();
byte[] authToken = Encoding.ASCII.GetBytes($"{userName}:{passwd}");
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(authToken));
HttpResponseMessage response;
// Execute the REST GET
response = client.GetAsync(apiUrl).GetAwaiter().GetResult();
// Get the JSON response.
string contentString = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
Dts.Variables["apiResponse"].Value = contentString; //Passing the response back out.
Dts.TaskResult = (int)ScriptResults.Success;
}
catch (Exception ex)
{
//This code will cause the ssis to fail and bubble back out the error from this code
Dts.Events.FireError(-1, "Error", ex.Message, String.Empty, 0);
Dts.TaskResult = (int)ScriptResults.Failure;
}
创建一个带有 NVARCHAR(MAX) 参数的存储过程以接受 apiResponse 并 return 输出 NextUrl 值。任务类似于:
参数映射:
结果集:
那么存储过程本身就是这样的:
CREATE PROCEDURE [dbo].[InsertTwilioMessage]
@apiResponse NVARCHAR(MAX)
AS
DECLARE @NextUrl NVARCHAR(2000);
INSERT INTO [dbo].[TwilioMessage] (
[body]
, [num_segments]
, [direction]
, [from]
, [date_updated]
, [price]
, [error_message]
, [uri]
, [account_sid]
, [num_media]
, [to]
, [date_created]
, [status]
, [sid]
, [date_sent]
, [messaging_service_sid]
, [error_code]
, [price_unit]
, [api_version]
)
SELECT [rsp].[body]
, [rsp].[num_segments]
, [rsp].[direction]
, [rsp].[from]
, [rsp].[date_updated]
, [rsp].[price]
, [rsp].[error_message]
, [rsp].[uri]
, [rsp].[account_sid]
, [rsp].[num_media]
, [rsp].[to]
, [rsp].[date_created]
, [rsp].[status]
, [rsp].[sid]
, [rsp].[date_sent]
, [rsp].[messaging_service_sid]
, [rsp].[error_code]
, [rsp].[price_unit]
, [rsp].[api_version]
FROM OPENJSON(@apiResponse, '$.messages') --data is in the messages array
WITH (
[body] NVARCHAR(255) '$.body'
, [num_segments] NVARCHAR(255) '$.num_segments'
, [direction] NVARCHAR(255) '$.direction'
, [from] NVARCHAR(255) '$.from'
, [date_updated] NVARCHAR(255) '$.date_updated'
, [price] NVARCHAR(255) '$.price'
, [error_message] NVARCHAR(255) '$.error_message'
, [uri] NVARCHAR(255) '$.uri'
, [account_sid] NVARCHAR(255) '$.account_sid'
, [num_media] NVARCHAR(255) '$.num_media'
, [to] NVARCHAR(255) '$.to'
, [date_created] NVARCHAR(255) '$.date_created'
, [status] NVARCHAR(255) '$.status'
, [sid] NVARCHAR(255) '$.sid'
, [date_sent] NVARCHAR(255) '$.date_sent'
, [messaging_service_sid] NVARCHAR(255) '$.messing_serivce_sid'
, [error_code] NVARCHAR(255) '$.error_code'
, [price_unit] NVARCHAR(255) '$.price_unit'
, [api_version] NVARCHAR(255) '$.api_version'
) AS [rsp];
SELECT @NextUrl = [next_page_uri]
FROM
OPENJSON(@apiResponse)
WITH (
[next_page_uri] NVARCHAR(2000) '$.next_page_uri'
);
-- we check for null of NextUrl and return STOP to stop the for loop
--if not we return out the Url to go back around and get more data
SELECT CASE WHEN @NextUrl IS NOT NULL THEN CONCAT('https://api.twilio.com', @NextUrl)
ELSE 'STOP'
END AS [NextUrl];
SSIS 变量 NextUrl 被存储过程 return 回退的内容覆盖。如果这不等于继续 for 循环的 STOP,现在将 URL 传递到脚本任务中以获取下一组数据,依此类推...
最终能够在 C# 中运行它。问题在于所使用的 dll 版本在最新版本无法相互协作的情况下。