Apache Camel 创建消费者组件
Apache Camel creating Consumer component
我是 Apache Camel 的新手。在 hp nonstop 中有一个接收器接收事件管理器生成的事件,假设像流一样。我的目标是设置一个接收传入消息并通过 Camel 处理它的消费者端点。
另一个终点我只需要将它写在日志中。从我的研究中我了解到,对于消费者端点,我需要创建自己的组件和配置就像
from("myComp:receive").to("log:net.javaforge.blog.camel?level=INFO")
这是我的代码片段,它从事件系统接收消息。
Receive receive = com.tandem.ext.guardian.Receive.getInstance();
byte[] maxMsg = new byte[500]; // holds largest possible request
short errorReturn = 0;
do { // read messages from $receive until last close
try {
countRead = receive.read(maxMsg, maxMsg.length);
String receivedMessage=new String(maxMsg, "UTF-8");
//Here I need to handover receivedMessage to camel
} catch (ReceiveNoOpeners ex) {
moreOpeners = false;
} catch(Exception e) {
moreOpeners = false;
}
} while (moreOpeners);
有人可以提供一些提示作为消费者如何实现这一点。
10'000 英尺的景色是这样的:
您需要从实现一个组件开始。最简单的入门方法是扩展 org.apache.camel.impl.DefaultComponent
。您唯一需要做的就是覆盖 DefaultComponent::createEndpoint(..)
。很明显,它的作用是创建您的端点。
因此,接下来您需要实现端点。为此扩展 org.apache.camel.impl.DefaultEndpoint
。至少重写 DefaultEndpoint::createConsumer(Processor)
以创建您自己的使用者。
最后但同样重要的是,您需要实施消费者。同样,最好扩展 org.apache.camel.impl.DefaultConsumer
。消费者是您的代码必须去的地方,它会生成您的消息。通过构造函数,您会收到对端点的引用。使用端点引用创建一个新的 Exchange,填充它并沿着路由发送它。类似于
Exchange ex = endpoint.createExchange(ExchangePattern.InOnly);
setMyMessageHeaders(ex.getIn(), myMessagemetaData);
setMyMessageBody(ex.getIn(), myMessage);
getAsyncProcessor().process(ex, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
LOG.debug("Mssage was processed " + (doneSync ? "synchronously" : "asynchronously"));
}
});
我建议您选择一个简单的组件 (DirectComponent
?) 作为示例。
特此添加我自己的消费者组件可能会对某些人有所帮助。
public class MessageConsumer extends DefaultConsumer {
private final MessageEndpoint endpoint;
private boolean moreOpeners = true;
public MessageConsumer(MessageEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
}
@Override
protected void doStart() throws Exception {
int countRead=0; // number of bytes read
do {
countRead++;
String msg = String.valueOf(countRead)+" "+System.currentTimeMillis();
Exchange ex = endpoint.createExchange(ExchangePattern.InOnly);
ex.getIn().setBody(msg);
getAsyncProcessor().process(ex, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
log.info("Mssage was processed " + (doneSync ? "synchronously" : "asynchronously"));
}
});
// This is an echo server so echo request back to requester
} while (moreOpeners);
}
@Override
protected void doStop() throws Exception {
moreOpeners = false;
log.debug("Message processor is shutdown");
}
}
我是 Apache Camel 的新手。在 hp nonstop 中有一个接收器接收事件管理器生成的事件,假设像流一样。我的目标是设置一个接收传入消息并通过 Camel 处理它的消费者端点。
另一个终点我只需要将它写在日志中。从我的研究中我了解到,对于消费者端点,我需要创建自己的组件和配置就像
from("myComp:receive").to("log:net.javaforge.blog.camel?level=INFO")
这是我的代码片段,它从事件系统接收消息。
Receive receive = com.tandem.ext.guardian.Receive.getInstance();
byte[] maxMsg = new byte[500]; // holds largest possible request
short errorReturn = 0;
do { // read messages from $receive until last close
try {
countRead = receive.read(maxMsg, maxMsg.length);
String receivedMessage=new String(maxMsg, "UTF-8");
//Here I need to handover receivedMessage to camel
} catch (ReceiveNoOpeners ex) {
moreOpeners = false;
} catch(Exception e) {
moreOpeners = false;
}
} while (moreOpeners);
有人可以提供一些提示作为消费者如何实现这一点。
10'000 英尺的景色是这样的:
您需要从实现一个组件开始。最简单的入门方法是扩展 org.apache.camel.impl.DefaultComponent
。您唯一需要做的就是覆盖 DefaultComponent::createEndpoint(..)
。很明显,它的作用是创建您的端点。
因此,接下来您需要实现端点。为此扩展 org.apache.camel.impl.DefaultEndpoint
。至少重写 DefaultEndpoint::createConsumer(Processor)
以创建您自己的使用者。
最后但同样重要的是,您需要实施消费者。同样,最好扩展 org.apache.camel.impl.DefaultConsumer
。消费者是您的代码必须去的地方,它会生成您的消息。通过构造函数,您会收到对端点的引用。使用端点引用创建一个新的 Exchange,填充它并沿着路由发送它。类似于
Exchange ex = endpoint.createExchange(ExchangePattern.InOnly);
setMyMessageHeaders(ex.getIn(), myMessagemetaData);
setMyMessageBody(ex.getIn(), myMessage);
getAsyncProcessor().process(ex, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
LOG.debug("Mssage was processed " + (doneSync ? "synchronously" : "asynchronously"));
}
});
我建议您选择一个简单的组件 (DirectComponent
?) 作为示例。
特此添加我自己的消费者组件可能会对某些人有所帮助。
public class MessageConsumer extends DefaultConsumer {
private final MessageEndpoint endpoint;
private boolean moreOpeners = true;
public MessageConsumer(MessageEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
}
@Override
protected void doStart() throws Exception {
int countRead=0; // number of bytes read
do {
countRead++;
String msg = String.valueOf(countRead)+" "+System.currentTimeMillis();
Exchange ex = endpoint.createExchange(ExchangePattern.InOnly);
ex.getIn().setBody(msg);
getAsyncProcessor().process(ex, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
log.info("Mssage was processed " + (doneSync ? "synchronously" : "asynchronously"));
}
});
// This is an echo server so echo request back to requester
} while (moreOpeners);
}
@Override
protected void doStop() throws Exception {
moreOpeners = false;
log.debug("Message processor is shutdown");
}
}