来自 IBM MQ 的 Spark 流数据
Spark stream data from IBM MQ
我想从 IBM MQ 流式传输数据。我已经尝试了 this code 我在 Github.
上找到的
我能够从队列流式传输数据,但每次流式传输时,它都会从中获取所有数据。我只想获取当前推入队列的数据。我查了很多网站,但没有找到正确的解决方案。
在 Kafka 中,我们有类似 KafkaStreamUtils
的东西来流式传输近乎实时的数据。是否有类似 IBM MQ 中的东西,以便它只流式传输最新数据?
您提供的 link 中的示例显示它调用以下方法从 IBM MQ 接收:
CustomMQReciever(String host , int port, String qm, String channel, String qn)
如果您查看 CustomMQReciever here,您会发现它只是浏览队列中的消息。这意味着该消息仍将在队列中,下次连接时您将收到相同的消息:
MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
如果您想从队列中删除消息,您需要调用一个方法来从队列中使用它们,而不是从队列中浏览它们。以下是 CustomMQReciever.java
的更改示例,应该可以实现您想要的结果:
在 initConnection()
下,将上面的代码更改为以下代码,使其从队列中删除消息:
MQMessageConsumer consumer = (MQMessageConsumer) qSession.createConsumer(queue);
摆脱:
enumeration= browser.getEnumeration();
在 receive()
下更改以下内容:
while (!isStopped() && enumeration.hasMoreElements() )
{
receivedMessage= (JMSMessage) enumeration.nextElement();
String userInput = convertStreamToString(receivedMessage);
//System.out.println("Received data :'" + userInput + "'");
store(userInput);
}
像这样:
while (!isStopped() && (receivedMessage = consumer.receiveNoWait()) != null))
{
String userInput = convertStreamToString(receivedMessage);
//System.out.println("Received data :'" + userInput + "'");
store(userInput);
}
我想从 IBM MQ 流式传输数据。我已经尝试了 this code 我在 Github.
上找到的我能够从队列流式传输数据,但每次流式传输时,它都会从中获取所有数据。我只想获取当前推入队列的数据。我查了很多网站,但没有找到正确的解决方案。
在 Kafka 中,我们有类似 KafkaStreamUtils
的东西来流式传输近乎实时的数据。是否有类似 IBM MQ 中的东西,以便它只流式传输最新数据?
您提供的 link 中的示例显示它调用以下方法从 IBM MQ 接收:
CustomMQReciever(String host , int port, String qm, String channel, String qn)
如果您查看 CustomMQReciever here,您会发现它只是浏览队列中的消息。这意味着该消息仍将在队列中,下次连接时您将收到相同的消息:
MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
如果您想从队列中删除消息,您需要调用一个方法来从队列中使用它们,而不是从队列中浏览它们。以下是 CustomMQReciever.java
的更改示例,应该可以实现您想要的结果:
在 initConnection()
下,将上面的代码更改为以下代码,使其从队列中删除消息:
MQMessageConsumer consumer = (MQMessageConsumer) qSession.createConsumer(queue);
摆脱:
enumeration= browser.getEnumeration();
在 receive()
下更改以下内容:
while (!isStopped() && enumeration.hasMoreElements() )
{
receivedMessage= (JMSMessage) enumeration.nextElement();
String userInput = convertStreamToString(receivedMessage);
//System.out.println("Received data :'" + userInput + "'");
store(userInput);
}
像这样:
while (!isStopped() && (receivedMessage = consumer.receiveNoWait()) != null))
{
String userInput = convertStreamToString(receivedMessage);
//System.out.println("Received data :'" + userInput + "'");
store(userInput);
}