有没有办法在 Kiba 作业结束时 return 一些数据?
Is there a way to return some data at the end of a Kiba job?
如果有一种方法可以从 Kiba ETL 运行 中获取某种 return 对象,这样我就可以使用其中的数据来 return a报告管道 运行.
的情况
我们有一项工作 运行 每 10 分钟平均处理 20 - 50k 条记录,并将它们压缩成摘要记录,其中一些是创建的,一些是更新的。问题是,如果不浏览大量日志文件就很难知道发生了什么,显然,日志对最终用户也很有用。
有没有办法像管道 运行 那样用任意数据填充某种结果对象?例如
- 在源中找到 25.7k 行
- 782 条记录被此 t运行sformer
丢弃
- 已插入 100 条记录
- 150 条记录已更新
- 20 条记录有错误(现在是)
- 这条记录的统计数据最高
- 1200条记录属于该VIP客户
- 等等
最后,使用该数据发送电子邮件摘要、填充网页、呈现一些控制台输出等。
目前,我现在能看到这个工作的唯一方法是在设置期间发送一个对象,并在它流经源、t运行sformers 和目的地时改变它。 运行 完成后,随后检查变量并对其中的数据执行一些操作。
是否应该这样做,或者有更好的方法吗?
编辑
只是想补充一点,我不想在 post_process
块中处理这个,因为管道通过许多不同的媒介使用,我希望每个用例都能处理自己的反馈机制。它也更清洁 (imo) ETL 管道,不必担心它的使用位置,以及该使用场景的反馈期望是什么......
答案在很大程度上取决于上下文,但这里有一些准则。
如果结果对象不是太大,实际上我建议您传递一个空的结果对象(通常是 Hash
),然后在运行期间填充它(您也可以使用某种形式的中间件来甚至跟踪异常本身)。
如何填充它取决于上下文和您的实际需要,但这可以以与工作无关的方式完成(也许使用 DSL 扩展 https://github.com/thbar/kiba/wiki/How-to-extend-the-Kiba-DSL,您可以实现一些相当高级的将注册所需的转换或块以实现您需要的扩展。
该对象可以按原样使用,也可以序列化为 JSON 或类似的对象,如果您以后需要提供一些丰富的输出(或者您可以使用它来准备一些东西),甚至可以存储到数据库中否则)。
如果需要,您甚至可以为此目的在特定数据库中构建结构合理的内容(例如,如果您需要一种简单的方法将其公开给客户)。
请注意,您可以通过编程方式定义一个 post_process
,而作业并没有意识到这一点(没有耦合)。这是一个非常简单的例子:
module ETL
module DSLExtensions
module EmailReport
def setup_email_report
pre_process do
@email_report_stats = Hash.new(0)
end
post_process do
# Do the actual email sending
end
end
def track_event!(event:)
@email_report_stats[event] += 1
end
end
end
end
Kiba.parse do
extend ETL::DSLExtensions::EmailReport
# this will register the pre/post process
setup_email_report
source ...
track_event!(event: 'row_read')
transform
transform
transform
track_event!(event: 'row_written')
destination ...
end
如果这样做,请确保使用非常好的命名空间变量,以避免任何冲突。
请注意,如前所述,这不包括失败的情况,但您明白了!
如果有一种方法可以从 Kiba ETL 运行 中获取某种 return 对象,这样我就可以使用其中的数据来 return a报告管道 运行.
的情况我们有一项工作 运行 每 10 分钟平均处理 20 - 50k 条记录,并将它们压缩成摘要记录,其中一些是创建的,一些是更新的。问题是,如果不浏览大量日志文件就很难知道发生了什么,显然,日志对最终用户也很有用。
有没有办法像管道 运行 那样用任意数据填充某种结果对象?例如
- 在源中找到 25.7k 行
- 782 条记录被此 t运行sformer 丢弃
- 已插入 100 条记录
- 150 条记录已更新
- 20 条记录有错误(现在是)
- 这条记录的统计数据最高
- 1200条记录属于该VIP客户
- 等等
最后,使用该数据发送电子邮件摘要、填充网页、呈现一些控制台输出等。
目前,我现在能看到这个工作的唯一方法是在设置期间发送一个对象,并在它流经源、t运行sformers 和目的地时改变它。 运行 完成后,随后检查变量并对其中的数据执行一些操作。
是否应该这样做,或者有更好的方法吗?
编辑
只是想补充一点,我不想在 post_process
块中处理这个,因为管道通过许多不同的媒介使用,我希望每个用例都能处理自己的反馈机制。它也更清洁 (imo) ETL 管道,不必担心它的使用位置,以及该使用场景的反馈期望是什么......
答案在很大程度上取决于上下文,但这里有一些准则。
如果结果对象不是太大,实际上我建议您传递一个空的结果对象(通常是 Hash
),然后在运行期间填充它(您也可以使用某种形式的中间件来甚至跟踪异常本身)。
如何填充它取决于上下文和您的实际需要,但这可以以与工作无关的方式完成(也许使用 DSL 扩展 https://github.com/thbar/kiba/wiki/How-to-extend-the-Kiba-DSL,您可以实现一些相当高级的将注册所需的转换或块以实现您需要的扩展。
该对象可以按原样使用,也可以序列化为 JSON 或类似的对象,如果您以后需要提供一些丰富的输出(或者您可以使用它来准备一些东西),甚至可以存储到数据库中否则)。
如果需要,您甚至可以为此目的在特定数据库中构建结构合理的内容(例如,如果您需要一种简单的方法将其公开给客户)。
请注意,您可以通过编程方式定义一个 post_process
,而作业并没有意识到这一点(没有耦合)。这是一个非常简单的例子:
module ETL
module DSLExtensions
module EmailReport
def setup_email_report
pre_process do
@email_report_stats = Hash.new(0)
end
post_process do
# Do the actual email sending
end
end
def track_event!(event:)
@email_report_stats[event] += 1
end
end
end
end
Kiba.parse do
extend ETL::DSLExtensions::EmailReport
# this will register the pre/post process
setup_email_report
source ...
track_event!(event: 'row_read')
transform
transform
transform
track_event!(event: 'row_written')
destination ...
end
如果这样做,请确保使用非常好的命名空间变量,以避免任何冲突。
请注意,如前所述,这不包括失败的情况,但您明白了!