在 Mule 服务器之间使用来自 S3 平衡的对象
Consume objects from S3 balancing between Mule servers
场景是这样的:
- S3 Bucket 充满了 csv 文件,每个文件有数百条格式化行。
- N 个 Mule 服务器。集群与否。两种选择都可用。
- 在所有 mule 服务器中安装了一个独特的 Mule 流。
- Mule 流行为很简单。轮询 S3 以延迟获取可用文件,检索每个文件内容,将 csv 行转换为 sql 语句并插入 DB。
问题:
- 来自不同 Mule 服务器的所有流成功轮询 s3、检索文件、处理它们并插入数据库。所以文件和注册表被处理了几次。
愿望清单:
- 在所有活动服务器之间完成负载平衡。
- 安装在不同的mule服务器上的flows是相等的(我们不修改flow来获取不同的文件)
- 其中的文件和注册表不会被处理两次
失败的方法:
- 我们在集群模式下尝试了所有 mule 服务器通用的 processed/non 处理机制。我们使用 Mule 的 3.5 对象存储来保存已处理文件的列表,对所有服务器可见。这里的问题是,我们没有平衡,所有工作负载都在一台服务器上,其余几乎一直处于空闲状态。
问题:
- 我们想要负载平衡,哪种架构设计可能是最好的?
- 也许我们需要一个特定的 mule 应用程序来下载 s3 文件,让这个
应用程序平均分配 Mule 服务器之间的工作负载?
这是场景的架构:
配置您的 S3 存储桶以将事件推送到 SQS 队列(请参阅 here),并让您的 mule 服务器从该队列中拉取事件,而不是轮询 S3。这样,每个事件将仅由一名工作人员拉取。
其工作原理如下:在每个worker中,需要反复调用ReceiveMessage()
获取队列中的下一条消息。一旦一个工作人员收到一条消息,该消息将在一定时间内对其他工作人员不可见(您可以通过 setVisibilityTimeout()
控制)。工作人员处理一条消息后,它应该调用 deleteMessage()
将其从队列中完全删除。如果 worker 发生故障,deleteMessage()
不会被调用,因此在可见性超时期限后,另一个 worker 将接收该消息。
换句话说,SQS 中的队列不处理分配工作。工作人员在准备就绪时从队列中拉取消息,这就是创建负载平衡的原因。
场景是这样的:
- S3 Bucket 充满了 csv 文件,每个文件有数百条格式化行。
- N 个 Mule 服务器。集群与否。两种选择都可用。
- 在所有 mule 服务器中安装了一个独特的 Mule 流。
- Mule 流行为很简单。轮询 S3 以延迟获取可用文件,检索每个文件内容,将 csv 行转换为 sql 语句并插入 DB。
问题:
- 来自不同 Mule 服务器的所有流成功轮询 s3、检索文件、处理它们并插入数据库。所以文件和注册表被处理了几次。
愿望清单:
- 在所有活动服务器之间完成负载平衡。
- 安装在不同的mule服务器上的flows是相等的(我们不修改flow来获取不同的文件)
- 其中的文件和注册表不会被处理两次
失败的方法:
- 我们在集群模式下尝试了所有 mule 服务器通用的 processed/non 处理机制。我们使用 Mule 的 3.5 对象存储来保存已处理文件的列表,对所有服务器可见。这里的问题是,我们没有平衡,所有工作负载都在一台服务器上,其余几乎一直处于空闲状态。
问题:
- 我们想要负载平衡,哪种架构设计可能是最好的?
- 也许我们需要一个特定的 mule 应用程序来下载 s3 文件,让这个 应用程序平均分配 Mule 服务器之间的工作负载?
这是场景的架构:
配置您的 S3 存储桶以将事件推送到 SQS 队列(请参阅 here),并让您的 mule 服务器从该队列中拉取事件,而不是轮询 S3。这样,每个事件将仅由一名工作人员拉取。
其工作原理如下:在每个worker中,需要反复调用ReceiveMessage()
获取队列中的下一条消息。一旦一个工作人员收到一条消息,该消息将在一定时间内对其他工作人员不可见(您可以通过 setVisibilityTimeout()
控制)。工作人员处理一条消息后,它应该调用 deleteMessage()
将其从队列中完全删除。如果 worker 发生故障,deleteMessage()
不会被调用,因此在可见性超时期限后,另一个 worker 将接收该消息。
换句话说,SQS 中的队列不处理分配工作。工作人员在准备就绪时从队列中拉取消息,这就是创建负载平衡的原因。