如何使用 Apache Kafka、Amazon Glue 和 Amazon S3 创建 Datalake?
How to create a Datalake using Apache Kafka, Amazon Glue and Amazon S3?
我想将 Kafka 主题中的所有数据存储到 Amazon S3 中。我有一个 Kafka 集群,在一个主题中每秒接收 200.000 条消息,每条值消息有 50 个字段(字符串、时间戳、整数和浮点数)。
我的主要想法是使用Kafka Connector 将数据存储在存储桶s3 中,然后使用Amazon Glue 将数据转换并保存到另一个存储桶中。我有下一个问题:
1) 怎么做?该架构会运作良好吗?我尝试使用 Amazon EMR (Spark Streaming),但我有太多顾虑 How to decrease the processing time and failed tasks using Apache Spark for events streaming from Apache Kafka?
2) 我尝试使用 Confluent 中的 Kafka Connect,但我有几个问题:
我可以从其他 Kafka 实例连接到我的 Kafka 集群吗?
运行 以独立方式我的 Kafka 连接器 s3?
这个错误是什么意思“ERROR Task s3-sink-0 throw an uncaught an
不可恢复的异常”?
ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception
(org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.NullPointerException at
io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:290)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745) [2018-10-05 15:32:26,086]
ERROR Task is being killed and will not recover until manually
restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2018-10-05 15:32:27,980] WARN could not create Dir using directory
from url file:/targ. skipping. (org.reflections.Reflections:104)
java.lang.NullPointerException at
org.reflections.vfs.Vfs$DefaultUrlTypes.matches(Vfs.java:239) at
org.reflections.vfs.Vfs.fromURL(Vfs.java:98) at
org.reflections.vfs.Vfs.fromURL(Vfs.java:91) at
org.reflections.Reflections.scan(Reflections.java:237) at
org.reflections.Reflections.scan(Reflections.java:204) at
org.reflections.Reflections.(Reflections.java:129) at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:268)
at
org.apache.kafka.connect.runtime.AbstractHerder.run(AbstractHerder.java:377)
at java.lang.Thread.run(Thread.java:745) [2018-10-05 15:32:27,981]
WARN could not create Vfs.Dir from url. ignoring the exception and
continuing (org.reflections.Reflections:208)
org.reflections.ReflectionsException: could not create Vfs.Dir from
url, no matching UrlType was found [file:/targ] either use
fromURL(final URL url, final List urlTypes) or use the static
setDefaultURLTypes(final List urlTypes) or
addDefaultURLTypes(UrlType urlType) with your specialized UrlType. at
org.reflections.vfs.Vfs.fromURL(Vfs.java:109) at
org.reflections.vfs.Vfs.fromURL(Vfs.java:91) at
org.reflections.Reflections.scan(Reflections.java:237) at
org.reflections.Reflections.scan(Reflections.java:204) at
org.reflections.Reflections.(Reflections.java:129) at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:268)
at
org.apache.kafka.connect.runtime.AbstractHerder.run(AbstractHerder.java:377)
at java.lang.Thread.run(Thread.java:745) [2018-10-05 15:32:35,441]
INFO Reflections took 12393 ms to scan 429 urls, producing 13521 keys
and 95814 values (org.reflections.Reflections:229)
- 如果您可以从
恢复连接到 Kafka 的步骤并继续使用 s3
另一个 Kafka 实例,你会怎么做?
- 所有这些字段是什么意思 key.converter、value.converter、key.converter.schemas.enable、value.converter.schemas.enable、internal.key.converter internal.value.converter, internal.key.converter.schemas.enable, internal.value.converter.schemas.enable?
key.converter、value.converter 的可能值是多少?
3) 一旦我的原始数据在一个桶中,我想使用 Amazon Glue 来获取这些数据,反序列化 Protobuffer,更改一些字段的格式,最后将其存储在 Parquet 中的另一个桶中.如何在 Amazon Glue 中使用我自己的 java protobuffer 库?
4)如果我想用Amazon Athena查询,如何自动加载分区(年、月、日、时)?使用 Amazon Glue 的爬虫和调度程序?
我们将 S3 Connect 用于数百个主题,并使用 Hive、Athena、Spark、Presto 等处理数据。似乎工作正常,但我觉得实际数据库可能 return 结果更快。
无论如何,要回答有关连接的问题
Can I connect to my Kafka Cluster from other Kafka instance and run in a standalone way my Kafka Connector s3?
我不确定我是否理解问题,但是 Kafka Connect 需要连接到一个集群,你不需要两个 Kafka 集群来使用它。您通常 运行 Kafka Connect 进程作为他们自己的集群的一部分,而不是在代理上。
What means this error "ERROR Task s3-sink-0 threw an uncaught an unrecoverable exception"?
这意味着您需要查看日志以找出抛出的异常并阻止连接器读取数据。
WARN could not create Dir using directory from url file:/targ
...如果您使用的是 HDFS 连接器,我认为您不应该使用默认的 file:// URI
If you can resume the steps to connect to Kafka and keep on s3 from another Kafka instance, how will you do?
你不能"resume from another Kafka instance"。如前所述,Connect 只能从单个 Kafka 集群中消费,并且任何消费的偏移量和消费者组都存储在其中。
What means all these fields
这些字段已从最新的 Kafka 版本中删除,您可以忽略它们。你绝对不应该改变它们
internal.key.converter,internal.value.converter, internal.key.converter.schemas.enable, internal.value.converter.schemas.enable
这些是你的序列化器和反序列化器,就像普通的生产者消费者 API 有
key.converter, value.converter
我认为这些仅对 JSON 转换器很重要。参见 https://rmoff.net/2017/09/06/kafka-connect-jsondeserializer-with-schemas-enable-requires-schema-and-payload-fields
key.converter.schemas.enable, value.converter.schemas.enable
to deserialize Protobuf, to change the format of some fields, and finally to store it in another bucket in Parquet
Kafka Connect 需要加载 Protobuf 转换器,我不知道有没有(我认为 Blue Apron 写了一些东西...搜索 github)。
一般来说,Avro 转换为 Parquet 会容易得多,因为已经存在可以做到这一点的原生库。 Confluent 的 S3 Connect 目前不编写 Parquet 格式,但在一个开放的 PR 中。另一种方法是使用 Pinterest Secor 库。
我不知道 Glue,但如果它像 Hive,你会在查询期间使用 ADD JAR
来加载外部代码插件和函数
我对 Athena 的经验很少,但 Glue 将所有分区维护为 Hive 元存储。自动部分是爬虫,你可以在查询上放一个过滤器来做分区 p运行ing
补充@cricket_007的回答
Can I connect to my Kafka Cluster from other Kafka instance and run in a standalone way my Kafka Connector s3?
Kafka S3 Connector 是 Confluent 发行版的一部分,其中还包括 Kafka 以及其他相关服务,但它并不意味着 运行 直接在您的代理上,而是:
- 作为独立工作人员运行在启动服务时给定连接器配置
- 或作为额外的工人集群 运行在您的 Kafka Brokers 集群的一侧。在这种情况下,interaction/running 的连接器通过 Kafka Connect REST API 更好(搜索 "Managing Kafka Connectors" 以获取带有示例的文档)
If you can resume the steps to connect to Kafka and keep on s3 from
another Kafka instance, how will you do?
您是在谈论另一个 Kafka Connect 实例吗?
- 如果是这样,您可以简单地以分布式模式执行 Kafka Connect 服务,这旨在提供您似乎正在寻找的可靠性...
或者您的意思是另一个 Kafka(经纪人)集群?
- 在那种情况下,您可以尝试(但那将是实验性的,我自己还没有尝试过...)以独立模式 运行 Kafka Connect 并简单地更新
bootstrap.servers
连接器配置的参数指向新集群。为什么这可能有效:在独立模式下,您的接收器连接器的偏移量存储在您的工作人员本地(与偏移量直接存储在 Kafka 集群上的分布式模式相反......)。为什么这可能不起作用:它根本不适合此用途,我猜您可能需要您的主题和分区完全相同...?
What are the possible values for key.converter, value.converter?
检查 Confluent's documentation for kafka-connect-s3 ;)
How can I use my own java protobuffer library in Amazon Glue?
不确定实际的方法,但是 Glue 作业在幕后从 EMR 集群中产生,所以我不明白为什么它不可能...
If I want to query with Amazon Athena, how can I load the partitions automatically (year, month, day, hour)? With the crawlers and schedulers of Amazon Glue?
是的。
假设每日分区,您实际上可以安排 运行 爬虫在早上的第一件事,只要您预计新数据会在 S3 上创建当天的文件夹(所以在S3 上至少存在当天的一个对象)...爬虫将添加当天的分区,然后可用于查询任何新添加的对象。
我想将 Kafka 主题中的所有数据存储到 Amazon S3 中。我有一个 Kafka 集群,在一个主题中每秒接收 200.000 条消息,每条值消息有 50 个字段(字符串、时间戳、整数和浮点数)。
我的主要想法是使用Kafka Connector 将数据存储在存储桶s3 中,然后使用Amazon Glue 将数据转换并保存到另一个存储桶中。我有下一个问题:
1) 怎么做?该架构会运作良好吗?我尝试使用 Amazon EMR (Spark Streaming),但我有太多顾虑 How to decrease the processing time and failed tasks using Apache Spark for events streaming from Apache Kafka?
2) 我尝试使用 Confluent 中的 Kafka Connect,但我有几个问题:
我可以从其他 Kafka 实例连接到我的 Kafka 集群吗? 运行 以独立方式我的 Kafka 连接器 s3?
这个错误是什么意思“ERROR Task s3-sink-0 throw an uncaught an
不可恢复的异常”?
ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142) java.lang.NullPointerException at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122) at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:290) at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) [2018-10-05 15:32:26,086] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143) [2018-10-05 15:32:27,980] WARN could not create Dir using directory from url file:/targ. skipping. (org.reflections.Reflections:104) java.lang.NullPointerException at org.reflections.vfs.Vfs$DefaultUrlTypes.matches(Vfs.java:239) at org.reflections.vfs.Vfs.fromURL(Vfs.java:98) at org.reflections.vfs.Vfs.fromURL(Vfs.java:91) at org.reflections.Reflections.scan(Reflections.java:237) at org.reflections.Reflections.scan(Reflections.java:204) at org.reflections.Reflections.(Reflections.java:129) at org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:268) at org.apache.kafka.connect.runtime.AbstractHerder.run(AbstractHerder.java:377) at java.lang.Thread.run(Thread.java:745) [2018-10-05 15:32:27,981] WARN could not create Vfs.Dir from url. ignoring the exception and continuing (org.reflections.Reflections:208) org.reflections.ReflectionsException: could not create Vfs.Dir from url, no matching UrlType was found [file:/targ] either use fromURL(final URL url, final List urlTypes) or use the static setDefaultURLTypes(final List urlTypes) or addDefaultURLTypes(UrlType urlType) with your specialized UrlType. at org.reflections.vfs.Vfs.fromURL(Vfs.java:109) at org.reflections.vfs.Vfs.fromURL(Vfs.java:91) at org.reflections.Reflections.scan(Reflections.java:237) at org.reflections.Reflections.scan(Reflections.java:204) at org.reflections.Reflections.(Reflections.java:129) at org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:268) at org.apache.kafka.connect.runtime.AbstractHerder.run(AbstractHerder.java:377) at java.lang.Thread.run(Thread.java:745) [2018-10-05 15:32:35,441] INFO Reflections took 12393 ms to scan 429 urls, producing 13521 keys and 95814 values (org.reflections.Reflections:229)
- 如果您可以从
恢复连接到 Kafka 的步骤并继续使用 s3 另一个 Kafka 实例,你会怎么做? - 所有这些字段是什么意思 key.converter、value.converter、key.converter.schemas.enable、value.converter.schemas.enable、internal.key.converter internal.value.converter, internal.key.converter.schemas.enable, internal.value.converter.schemas.enable?
key.converter、value.converter 的可能值是多少?
3) 一旦我的原始数据在一个桶中,我想使用 Amazon Glue 来获取这些数据,反序列化 Protobuffer,更改一些字段的格式,最后将其存储在 Parquet 中的另一个桶中.如何在 Amazon Glue 中使用我自己的 java protobuffer 库?
4)如果我想用Amazon Athena查询,如何自动加载分区(年、月、日、时)?使用 Amazon Glue 的爬虫和调度程序?
我们将 S3 Connect 用于数百个主题,并使用 Hive、Athena、Spark、Presto 等处理数据。似乎工作正常,但我觉得实际数据库可能 return 结果更快。
无论如何,要回答有关连接的问题
Can I connect to my Kafka Cluster from other Kafka instance and run in a standalone way my Kafka Connector s3?
我不确定我是否理解问题,但是 Kafka Connect 需要连接到一个集群,你不需要两个 Kafka 集群来使用它。您通常 运行 Kafka Connect 进程作为他们自己的集群的一部分,而不是在代理上。
What means this error "ERROR Task s3-sink-0 threw an uncaught an unrecoverable exception"?
这意味着您需要查看日志以找出抛出的异常并阻止连接器读取数据。
WARN could not create Dir using directory from url file:/targ
...如果您使用的是 HDFS 连接器,我认为您不应该使用默认的 file:// URI
If you can resume the steps to connect to Kafka and keep on s3 from another Kafka instance, how will you do?
你不能"resume from another Kafka instance"。如前所述,Connect 只能从单个 Kafka 集群中消费,并且任何消费的偏移量和消费者组都存储在其中。
What means all these fields
这些字段已从最新的 Kafka 版本中删除,您可以忽略它们。你绝对不应该改变它们
internal.key.converter,internal.value.converter, internal.key.converter.schemas.enable, internal.value.converter.schemas.enable
这些是你的序列化器和反序列化器,就像普通的生产者消费者 API 有
key.converter, value.converter
我认为这些仅对 JSON 转换器很重要。参见 https://rmoff.net/2017/09/06/kafka-connect-jsondeserializer-with-schemas-enable-requires-schema-and-payload-fields
key.converter.schemas.enable, value.converter.schemas.enable
to deserialize Protobuf, to change the format of some fields, and finally to store it in another bucket in Parquet
Kafka Connect 需要加载 Protobuf 转换器,我不知道有没有(我认为 Blue Apron 写了一些东西...搜索 github)。
一般来说,Avro 转换为 Parquet 会容易得多,因为已经存在可以做到这一点的原生库。 Confluent 的 S3 Connect 目前不编写 Parquet 格式,但在一个开放的 PR 中。另一种方法是使用 Pinterest Secor 库。
我不知道 Glue,但如果它像 Hive,你会在查询期间使用 ADD JAR
来加载外部代码插件和函数
我对 Athena 的经验很少,但 Glue 将所有分区维护为 Hive 元存储。自动部分是爬虫,你可以在查询上放一个过滤器来做分区 p运行ing
补充@cricket_007的回答
Can I connect to my Kafka Cluster from other Kafka instance and run in a standalone way my Kafka Connector s3?
Kafka S3 Connector 是 Confluent 发行版的一部分,其中还包括 Kafka 以及其他相关服务,但它并不意味着 运行 直接在您的代理上,而是:
- 作为独立工作人员运行在启动服务时给定连接器配置
- 或作为额外的工人集群 运行在您的 Kafka Brokers 集群的一侧。在这种情况下,interaction/running 的连接器通过 Kafka Connect REST API 更好(搜索 "Managing Kafka Connectors" 以获取带有示例的文档)
If you can resume the steps to connect to Kafka and keep on s3 from another Kafka instance, how will you do?
您是在谈论另一个 Kafka Connect 实例吗?
- 如果是这样,您可以简单地以分布式模式执行 Kafka Connect 服务,这旨在提供您似乎正在寻找的可靠性...
或者您的意思是另一个 Kafka(经纪人)集群?
- 在那种情况下,您可以尝试(但那将是实验性的,我自己还没有尝试过...)以独立模式 运行 Kafka Connect 并简单地更新
bootstrap.servers
连接器配置的参数指向新集群。为什么这可能有效:在独立模式下,您的接收器连接器的偏移量存储在您的工作人员本地(与偏移量直接存储在 Kafka 集群上的分布式模式相反......)。为什么这可能不起作用:它根本不适合此用途,我猜您可能需要您的主题和分区完全相同...?
What are the possible values for key.converter, value.converter?
检查 Confluent's documentation for kafka-connect-s3 ;)
How can I use my own java protobuffer library in Amazon Glue?
不确定实际的方法,但是 Glue 作业在幕后从 EMR 集群中产生,所以我不明白为什么它不可能...
If I want to query with Amazon Athena, how can I load the partitions automatically (year, month, day, hour)? With the crawlers and schedulers of Amazon Glue?
是的。
假设每日分区,您实际上可以安排 运行 爬虫在早上的第一件事,只要您预计新数据会在 S3 上创建当天的文件夹(所以在S3 上至少存在当天的一个对象)...爬虫将添加当天的分区,然后可用于查询任何新添加的对象。