如何使用 scala 向 amazon sqs 队列正确发送消息?
How to properly send a message to amazon sqs queue with scala?
我看过这个 common-aws on github 如何使用它,这是他们的示例(仅适用于发件人,因为这是我需要的):
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.pellucid.wrap.sqs.AmazonSQSScalaClient
import com.mfglabs.commons.aws.sqs._
val sqs = new AmazonSQSScalaClient(new AmazonSQSAsyncClient(), ec)
val builder = SQSStreamBuilder(sqs)
val sender: Flow[String, SendMessageResult, Unit] =
Flow[String].map { body =>
val req = new SendMessageRequest()
req.setMessageBody(body)
req.setQueueUrl(queueUrl)
req
}
.via(builder.sendMessageAsStream())
但是我遇到了一些错误,我不太理解这个例子,我需要的是创建一个函数,它接受一个案例 类 的列表,将每个列表序列化为 json并将其发送到 sqs 队列...就是这样,所以这就是我到目前为止所尝试的:
val queueUrl = "the url to my queue"
//the objects here are of case class type ObjectUploadRequest
val listOfObjects = List(Obj1, Obj2, Obj3, Obj4, Obj5)
def pushListToSQS(listOfObjectsRequests: List[ObjectUploadRequest]): Future[SendMessageRequest] = {
listOfObjectsRequests.map(objReq => {
val ser = swrite(objReq)
val sender: Flow[String, SendMessageResult, Unit] =
Flow[String].map { body =>
val req = new SendMessageRequest()
req.setMessageBody(body)
req.setQueueUrl(queueUrl)
req
}.via(builder.sendMessageAsStream())
})
}
我收到此错误:
如果有人能提供帮助,将不胜感激,谢谢
如果您不介意使用旧的 AWS Java SDK 及其同步 SQS 客户端,那么这对我有用:
import com.amazonaws.services.sqs.AmazonSQSClient
import com.amazonaws.services.sqs.model.SendMessageRequest
val sqs = new AmazonSQSClient()
listOfObjects.foreach { obj =>
val json = // convert obj to json
sqs.sendMessage(new SendMessageRequest()
.withQueueUrl("your queue url")
.withMessageBody(json))
}
我看过这个 common-aws on github 如何使用它,这是他们的示例(仅适用于发件人,因为这是我需要的):
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.pellucid.wrap.sqs.AmazonSQSScalaClient
import com.mfglabs.commons.aws.sqs._
val sqs = new AmazonSQSScalaClient(new AmazonSQSAsyncClient(), ec)
val builder = SQSStreamBuilder(sqs)
val sender: Flow[String, SendMessageResult, Unit] =
Flow[String].map { body =>
val req = new SendMessageRequest()
req.setMessageBody(body)
req.setQueueUrl(queueUrl)
req
}
.via(builder.sendMessageAsStream())
但是我遇到了一些错误,我不太理解这个例子,我需要的是创建一个函数,它接受一个案例 类 的列表,将每个列表序列化为 json并将其发送到 sqs 队列...就是这样,所以这就是我到目前为止所尝试的:
val queueUrl = "the url to my queue"
//the objects here are of case class type ObjectUploadRequest
val listOfObjects = List(Obj1, Obj2, Obj3, Obj4, Obj5)
def pushListToSQS(listOfObjectsRequests: List[ObjectUploadRequest]): Future[SendMessageRequest] = {
listOfObjectsRequests.map(objReq => {
val ser = swrite(objReq)
val sender: Flow[String, SendMessageResult, Unit] =
Flow[String].map { body =>
val req = new SendMessageRequest()
req.setMessageBody(body)
req.setQueueUrl(queueUrl)
req
}.via(builder.sendMessageAsStream())
})
}
我收到此错误:
如果有人能提供帮助,将不胜感激,谢谢
如果您不介意使用旧的 AWS Java SDK 及其同步 SQS 客户端,那么这对我有用:
import com.amazonaws.services.sqs.AmazonSQSClient
import com.amazonaws.services.sqs.model.SendMessageRequest
val sqs = new AmazonSQSClient()
listOfObjects.foreach { obj =>
val json = // convert obj to json
sqs.sendMessage(new SendMessageRequest()
.withQueueUrl("your queue url")
.withMessageBody(json))
}