java - Unable to delete the message from SQS after it has been processed through Camel
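After a message has been processed, I cannot delete it from SQS. I tried several of the approaches described on Stack Overflow, but none of them worked for me.
Here is the part that sends the message to SQS: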
@Override
public void sendTransaction(TransactionModel transactionToSend, String preText) throws EMException {
    try {
        Long companyID = transactionToSend.getCompanyID();
        validateRequest(companyID, transactionToSend.getCostCenterID());
        VendorTransaction vtr = ediManager.convertTransactionToVendorTransaction(transactionToSend);
        EDIOrderRequest ediOrderRequest = ediManager.createEDIOrderRequest(transactionToSend);
        String messageBody = getSQSMessageBody(vtr);
        SendMessageRequest sendMessageRequest = new SendMessageRequest(getQueueName(), messageBody);
        setMessageAttributes(sendMessageRequest, transactionToSend, ediOrderRequest);
        AmazonSQSAsync client = ServerConnector.getServerBean(AWSSQSService.ILocal.class).getSQSClient();
        SendMessageResult result = client.sendMessage(sendMessageRequest);
        if (result == null) {
            throw new EDIException("An error occurred while sending the message.");
        }
        log.info("Transaction: " + transactionToSend.getId() + " for a company: " + companyID + " sent. Message id is: " + result.getMessageId());
    } catch (Exception e) {
        e.printStackTrace();
        log.error(String.format("Error sending transaction: %s", e.getMessage()));
        throw new EMException(-1, String.format("Error sending transaction: %s", e.getMessage()), e);
    }
}

private void setMessageAttributes(SendMessageRequest sendMessageRequest, TransactionModel transactionToSend, EDIOrderRequest ediOrderRequest) {
    addMessageAttributesEntry(sendMessageRequest, "companyId", transactionToSend.getCompanyID());
    addMessageAttributesEntry(sendMessageRequest, "standard", ediConfig.getMapOfParameters().get("EDI_STD"));
    addMessageAttributesEntry(sendMessageRequest, "endpoint", ediConfig.getTargetAddress());
    addMessageAttributesEntry(sendMessageRequest, "prefix", ediConfig.getMapOfParameters().get("FILE_PREFIX"));
    addMessageAttributesEntry(sendMessageRequest, "requestId", ediOrderRequest != null ? ediOrderRequest.getId() : null);
}
Here is the route itself:
@Override
public void configure() {
    // on exception deal with EDIOrder stuff
    onException(Exception.class).handled(true).process(failedOrdersProcessor).end();
    String destination = from != null ? from : (sqsQueue);
    destination += "&attributeNames=All&messageAttributeNames=All";
    log.info(destination);
    from(destination)
        .to("direct:ediOrder");
    // order processing
    from("direct:ediOrder")
        .choice()
            .when(body().isInstanceOf(String.class))
                .process(exchange -> {
                    // filter out duplicates
                    String body = "undefined";
                    try {
                        body = exchange.getIn().getBody().toString();
                        VendorTransaction transaction = objectMapper.readValue(body, VendorTransaction.class);
                        if (!IDEMPOTENT_ORDERS_REPOSITORY.containsKey(transaction.getDeduplicatableIdentifier())) {
                            IDEMPOTENT_ORDERS_REPOSITORY.put(transaction.getDeduplicatableIdentifier(), transaction);
                        } else {
                            exchange.getIn().setHeader(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE);
                            if (transaction.getDeduplicatableIdentifier() != null) {
                                exchange.getIn().setHeader(ConfigKey.DEDUPLICATE_IDENTIFIER.getKey(), transaction.getDeduplicatableIdentifier());
                            }
                        }
                    } catch (IOException e) {
                        log.error("Unable to convert the body to VendorTransaction, body:\n" + body + "\n" + e.getMessage());
                    }
                })
        .end()
        .filter(header(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE))
            .bean(DuplicateOrderProcessor.class)
            .stop()
        .end()
        .bean(SQSOrderQueueBean.class)
        .choice()
            .when(body().isInstanceOf(EDIOrder.class))
                .recipientList(simple("${body.endpoint}"))
                .log("Uploaded ${body.genericFile.fileName} to ${body.endpointNoPassword}")
                .bean(OrderCleaner.class)
            .endChoice()
            .when(body().isInstanceOf(VendorTransactionContainer.class))
                .log("Send to direct:confirmation")
                .to(RouteConstants.DIRECT_CONFIRMATION)
            .endChoice()
        .end();
The file is processed and sent to the server, but I cannot delete the message after processing.
Here is the stack trace I get:
WARN o.a.c.component.aws.sqs.SqsConsumer.log - Error occurred during deleting message. This exception is ignored.. Exchange[ID-AD-MacBook-Pro-local-1353421234311-0-1]. Caused by: [com.amazonaws.services.sqs.model.AmazonSQSException - The request must contain the parameter MessageHandle. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: e3252adsaf-1242-2244-a5f6-1b51e2e47e9c)]com.amazonaws.services.sqs.model.AmazonSQSException: The request must contain the parameter MessageHandle. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: e3252adsaf-1242-2244-a5f6-1b51e2e47e9c)
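In case anyone thinks this is a duplicate: there are a few answers to similar questions on Stack Overflow, but none of them helped me solve this problem.
I solved the problem by passing the headers to the exchange.getOut() message as well, at the point where we set the body like this: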
exchange.getOut().setBody(body);
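Calling exchange.getOut() creates a new message instance, so the headers from exchange.getIn(), and everything else on the in message, are not present on the exchange.getOut() message. That would explain why the delete request sent by the SQS consumer was missing its handle parameter.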
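The effect described above can be sketched without Camel on the classpath. The `Message` class and the two helper methods below are hypothetical stand-ins written for illustration, not Camel types, and the header name `CamelAwsSqsReceiptHandle` is an assumption about which header the SQS consumer reads when it deletes a message. In a real processor, the equivalent of the fix would be something like `exchange.getOut().setHeaders(exchange.getIn().getHeaders())` placed before `exchange.getOut().setBody(body)`.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in types for illustration only; these are NOT Camel classes.
class HeaderCopySketch {

    /** Minimal model of a message: a header map plus a body. */
    static class Message {
        final Map<String, Object> headers = new HashMap<>();
        Object body;
    }

    /** Mirrors the problem: a freshly created out message starts with no headers. */
    static Message replyWithoutHeaders(Message in, Object newBody) {
        Message out = new Message();
        out.body = newBody;
        return out;
    }

    /** Mirrors the fix: copy the incoming headers onto the out message, then set the body. */
    static Message replyWithHeaders(Message in, Object newBody) {
        Message out = new Message();
        out.headers.putAll(in.headers);
        out.body = newBody;
        return out;
    }

    public static void main(String[] args) {
        // Assumed header name; the value is a made-up placeholder.
        String handleHeader = "CamelAwsSqsReceiptHandle";

        Message in = new Message();
        in.headers.put(handleHeader, "example-handle");
        in.body = "{}";

        // The receipt handle is lost when only the body is set on the new message.
        System.out.println(replyWithoutHeaders(in, "order").headers.containsKey(handleHeader)); // false

        // The receipt handle survives when the headers are copied first.
        System.out.println(replyWithHeaders(in, "order").headers.containsKey(handleHeader)); // true
    }
}
```

If the processor only needs to change the body, modifying the in message directly with `exchange.getIn().setBody(body)` is an alternative that keeps the existing headers without any copying.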