如何从 Amazon SQS 加载流式数据?

How to load streaming data from Amazon SQS?

我使用的是 Spark 2.2.0.

如何使用 pyspark 将 Amazon SQS 流提供给 spark 结构化流?

This 问题试图通过创建自定义接收器来回答非结构化流和 Scala 的问题。
pyspark 中可能有类似的东西吗?

spark.readStream \
   .format("s3-sqs") \
   .option("fileFormat", "json") \
   .option("queueUrl", ...) \
   .schema(...) \
   .load()

根据上述Databricks,接收器可用于S3-SQS文件源。但是,对于仅SQS 怎么可能一种方法。

我尝试从 AWS-SQS-Receive_Message 理解来接收消息。但是,如何直接发送stream到spark streaming就不清楚了。

我对 Amazon SQS 一无所知,但是 "how can I feed Amazon SQS stream to spark structured stream using pyspark." 无法使用任何外部消息传递系统或使用 Spark 结构化流的数据源(又名 Spark "Streams").

在 Spark Structured Streaming 中,情况正好相反,Spark 会定期拉取数据(类似于 Kafka Consumer API 在不提供数据的情况下拉取数据的工作方式)。

换句话说,Spark "Streams" 只是 Amazon SQS 中 "queue" 消息的另一个消费者。

每当我被要求将外部系统与 Spark "Streams" 集成时,我就开始使用 client/consumer API.

为系统编写客户端

一旦我有了它,下一步就是为外部系统开发自定义流 Source,例如Amazon SQS,使用上面的示例客户端代码。

开发自定义流式传输时 Source 您必须执行以下步骤:

  1. 编写一个 Scala class 实现 Source 特征

  2. 使用带有 fully-qualified META-INF/services/org.apache.spark.sql.sources.DataSourceRegister 文件的 Spark SQL 注册 Scala class(自定义 Source)class 名称或在 format

  3. 中使用 fully-qualified class 名称

拥有自定义流源是一项 two-part 开发,包括开发源(并可选择将其注册到 Spark SQL)并在 Spark Structured Streaming 应用程序中使用它(在 Python) 通过 format 方法。