google pub sub 在不同时间获取消息
google pub sub fetches messages in different times
我有一个 java 客户端 google-pubsub api。
我同时向pubsub发送了4条消息。
我的订阅者每小时触发一次并在 pubsub 上循环直到它为空。
尽管如此,我看到消息是在不同的时间提取的,即使它们都是在同一时间发送的,并且在下一个自动订阅者同步提取之前有很长的延迟。
// A hotfix as cofman is too slow for ack
while (pubSubIsFull) {
// Fetch messages by order
log("before fetched messages by order");
List<ReceivedMessage> messages = mySyncSubscriber.fetch(100000, true);
messages = messages.stream().sorted(Comparator.comparingLong(message2 -> message2.getMessage().getPublishTime().getSeconds())).collect(Collectors.toList());
log("fetched " + messages.size() + " messages by order");
pubSubIsFull = messages.size() > 0;
log("before processMessages");
List<String> ackIds = processMessages(messages);
log("after processMessages");
mySyncSubscriber.sendAck(ackIds);
}
import com.google.pubsub.v1.PullRequest.Builder;
public List<ReceivedMessage> fetch(int maxMessages, boolean returnImmediately) {
String subscriptionName = this.getSubscriptionName(this.getSubscriptionId()).toString();
Builder pullRequestBuilder = PullRequest.newBuilder().setSubscription(subscriptionName).setReturnImmediately(returnImmediately);
if (maxMessages != 0) {
pullRequestBuilder.setMaxMessages(maxMessages);
}
PullRequest pullRequest = pullRequestBuilder.build();
PullResponse pullResponse = (PullResponse)this.subscriber.pullCallable().call(pullRequest);
return pullResponse.getReceivedMessagesList();
}
这是 pub sub 中的错误吗?
我可以做些什么来改变它吗?
Cloud Pub/Sub 不提供订购保证。如果您发布多条消息——即使相隔一段时间——然后稍后获取所有消息,则无法保证订阅者首先收到之前发布的消息。
此外,在单个响应中收到 0 条消息并不是确定没有可用消息的好方法。有时即使有消息可用,响应也将包含 0 条消息,特别是如果 returnImmediately
设置为 true(如 ). At the very least, you'd want to make sure that several requests over a period of time all return 0 messages before deciding that there are no message available. Even better would be to query the Stackdriver metric for subscription/num_undelivered_messages
中所述,这表示您的订阅尚未确认的消息数。
我有一个 java 客户端 google-pubsub api。
我同时向pubsub发送了4条消息。
我的订阅者每小时触发一次并在 pubsub 上循环直到它为空。
尽管如此,我看到消息是在不同的时间提取的,即使它们都是在同一时间发送的,并且在下一个自动订阅者同步提取之前有很长的延迟。
// A hotfix as cofman is too slow for ack
while (pubSubIsFull) {
// Fetch messages by order
log("before fetched messages by order");
List<ReceivedMessage> messages = mySyncSubscriber.fetch(100000, true);
messages = messages.stream().sorted(Comparator.comparingLong(message2 -> message2.getMessage().getPublishTime().getSeconds())).collect(Collectors.toList());
log("fetched " + messages.size() + " messages by order");
pubSubIsFull = messages.size() > 0;
log("before processMessages");
List<String> ackIds = processMessages(messages);
log("after processMessages");
mySyncSubscriber.sendAck(ackIds);
}
import com.google.pubsub.v1.PullRequest.Builder;
public List<ReceivedMessage> fetch(int maxMessages, boolean returnImmediately) {
String subscriptionName = this.getSubscriptionName(this.getSubscriptionId()).toString();
Builder pullRequestBuilder = PullRequest.newBuilder().setSubscription(subscriptionName).setReturnImmediately(returnImmediately);
if (maxMessages != 0) {
pullRequestBuilder.setMaxMessages(maxMessages);
}
PullRequest pullRequest = pullRequestBuilder.build();
PullResponse pullResponse = (PullResponse)this.subscriber.pullCallable().call(pullRequest);
return pullResponse.getReceivedMessagesList();
}
这是 pub sub 中的错误吗?
我可以做些什么来改变它吗?
Cloud Pub/Sub 不提供订购保证。如果您发布多条消息——即使相隔一段时间——然后稍后获取所有消息,则无法保证订阅者首先收到之前发布的消息。
此外,在单个响应中收到 0 条消息并不是确定没有可用消息的好方法。有时即使有消息可用,响应也将包含 0 条消息,特别是如果 returnImmediately
设置为 true(如 subscription/num_undelivered_messages
中所述,这表示您的订阅尚未确认的消息数。