Apache Camel:发布请求正文时出现问题

Apache Camel: Problem POSTing request body

我有一个服务器和客户端应用程序,它们都使用 Apache Camel 来配置和管理它们的路由。我正在尝试使用 REST 组件从客户端向服务器发出 POST 请求,并在请求正文中发送消息。似乎正在调用 REST 端点,但正文没有通过。我想我要么遗漏了一些步骤,要么我做错了。

另一件要提及的事情是,我使用的是通用消息 class,它基本上可以包含任何对象(包括字节或字符数组)。也许这是我遇到的问题背后的部分(或全部)原因。

GenericMessage.java:

public class GenericMessage<T extends Object> {
  private T payload;

  public GenericMessage(T payload) {
    this.payload = payload;
  }

  public T getPayload() {
    return payload;
  }

  public void setPayload(T payload) {
    this.payload = payload;
  }

  @Override
  public String toString() {
    return "GenericMessage [payload=" + payload + "]";
  }
}

服务器

驼色-context.xml:

  <camelContext id="camel-context" xmlns="http://camel.apache.org/schema/spring">
    <restConfiguration component="jetty" bindingMode="json"
      contextPath="/" host="localhost" port="9100">
      <dataFormatProperty key="prettyPrint" value="true" />
    </restConfiguration>

    <rest>
      <get uri="/data?size={size}" produces="JSON">
        <route id="get-data-by-size">
          <log loggingLevel="INFO" message="start - get-data-by-size" />
          <process ref="get-request-processor" />
          <log loggingLevel="INFO" message="end - get-data-by-size" />
        </route>
      </get>

      <post uri="post-data" consumes="json">
        <route id="post-data">
          <log loggingLevel="INFO" message="start - post-data" />
          <process ref="post-request-processor" />
          <log loggingLevel="INFO" message="end - post-data" />
        </route>
      </post>
    </rest>
  </camelContext>

GetRequestProcessor.java:

@Component("get-request-processor")
public class GetRequestProcessor implements Processor {
  @Autowired
  private DataProvider provider;

  public void process(Exchange exchange) throws Exception {
    int size = exchange.getIn().getHeader("size", Integer.class);
    // TODO currently only sends char[] data
    GenericMessage<?> data = provider.getCharData(size);
    exchange.getMessage().setBody(data, GenericMessage.class);
  }
}

PostRequestProcessor.java:

@Component("post-request-processor")
public class PostRequestProcessor implements Processor {
  private final Logger logger = LoggerFactory.getLogger(PostRequestProcessor.class);

  public void process(Exchange exchange) throws Exception {
    GenericMessage<?> data = exchange.getIn().getBody(GenericMessage.class);

    if (data != null) {
      logger.info("process: object received: {}", data);
    } else {
      logger.warn("process: null object");
    }
  }
}

DataProvider.java:

@Component
public class DataProvider {
  private final Logger logger = LoggerFactory.getLogger(DataProvider.class);

  public GenericMessage<?> getByteData(final int size) {
    logger.info("getByteData: size={}", size);
    return new GenericMessage<byte[]>(DataGenerator.generateByteArray(size));
  }

  public GenericMessage<?> getCharData(final int size) {
    logger.info("getCharData: size={}", size);
    return new GenericMessage<char[]>(DataGenerator.generateCharArray(size));
  }
}

客户

驼色-context.xml:

  <camelContext id="camel-context" xmlns="http://camel.apache.org/schema/spring">
    <restConfiguration component="jetty" bindingMode="json"
      contextPath="/" host="{{http-client.server.host}}"
      port="{{http-client.server.port}}">
      <dataFormatProperty key="prettyPrint" value="true" />
    </restConfiguration>

    <route id="rest-get">
      <from
        uri="timer:{{http-client.timers.http-get.name}}?delay={{http-client.timers.http-get.start-delay}}&amp;fixedRate=true&amp;period={{http-client.timers.http-get.period}}&amp;repeatCount={{http-client.timers.http-get.repeat-count}}" />
      <log loggingLevel="INFO" message="start - rest-get" />
      <to uri="rest:get:{{http-client.endpoints.http-get}}" />
      <process ref="process-get-response" />
      <log loggingLevel="INFO" message="end - rest-get" />
    </route>

    <route id="rest-post">
      <from
        uri="timer:{{http-client.timers.http-post.name}}?delay={{http-client.timers.http-post.start-delay}}&amp;fixedRate=true&amp;period={{http-client.timers.http-post.period}}&amp;repeatCount={{http-client.timers.http-post.repeat-count}}" />
      <log loggingLevel="INFO" message="start - rest-post" />
      <process ref="add-post-body" />
      <to uri="rest:post:{{http-client.endpoints.http-post}}" />
      <log loggingLevel="INFO" message="end - rest-post" />
    </route>
  </camelContext>

AddPostRequestBody.java:

@Component("add-post-body")
public class AddPostRequestBody implements Processor {
  private final Logger logger = LoggerFactory.getLogger(AddPostRequestBody.class);

  @Autowired
  private DataProvider provider;

  @Override
  public void process(Exchange exchange) throws Exception {
    GenericMessage<?> data = null;
    int intValue = RandomUtil.generateInt(1);

    switch (intValue) {
      case 0:
        data = provider.produceByteArrayData();
        break;
      case 1:
        data = provider.produceCharArrayData();
        break;
      default:
        break;
    }

    logger.info("adding POST request body:\n{}", data);

    exchange.getMessage().setHeader(Exchange.HTTP_METHOD, "POST");
    exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, "application/json");
    exchange.getMessage().setBody(data, GenericMessage.class);
  }
}

GetResponseProcessor.java:

@Component("process-get-response")
public class GetResponseProcessor implements Processor {
  private final Logger logger = LoggerFactory.getLogger(GetResponseProcessor.class);

  @Override
  public void process(Exchange exchange) throws Exception {
    GenericMessage<?> body = (GenericMessage<?>) exchange.getIn().getBody();
    logger.info("body: {}", body);
  }
}

DataProvider.java:

@Component
public class DataProvider {
  private final Logger logger = LoggerFactory.getLogger(DataProvider.class);

  public GenericMessage<byte[]> produceByteArrayData() {
    int size = RandomUtil.generateInt(1000, 20000000);
    byte[] bytes = DataGenerator.generateByteArray(size);
    logger.info("produceByteArrayData: generated {} bytes", size);
    GenericMessage<byte[]> data = new GenericMessage<>(bytes);
    return data;
  }

  public GenericMessage<char[]> produceCharArrayData() {
    int size = RandomUtil.generateInt(1000, 20000000);
    char[] chars = DataGenerator.generateCharArray(size);
    logger.info("produceCharArrayData: generated {} characters", size);
    GenericMessage<char[]> data = new GenericMessage<>(chars);
    return data;
  }
}

我已将其配置为执行来自客户端的一个 POST 请求。散布着日志记录语句,以了解发生了什么。

客户端控制台:

2020-04-13 13:23:39.805  INFO 17464 --- [           main] o.a.c.i.e.AbstractCamelContext           : Route: rest-get started and consuming from: timer://http-get
2020-04-13 13:23:39.806  INFO 17464 --- [           main] o.a.c.i.e.AbstractCamelContext           : Route: rest-post started and consuming from: timer://http-post
2020-04-13 13:23:39.812  INFO 17464 --- [           main] o.a.c.i.e.AbstractCamelContext           : Total 2 routes, of which 2 are started
2020-04-13 13:23:39.812  INFO 17464 --- [           main] o.a.c.i.e.AbstractCamelContext           : Apache Camel 3.1.0 (CamelContext: RESTClient) started in 1.679 seconds
2020-04-13 13:23:42.308  INFO 17464 --- [mer://http-post] rest-post                                : start - rest-post
2020-04-13 13:23:42.312  INFO 17464 --- [mer://http-post] e.m.l.m.s.DataProvider                   : produceCharArrayData: generated 1039 characters
2020-04-13 13:23:42.313  INFO 17464 --- [mer://http-post] e.m.l.m.p.AddPostRequestBody             : CLIENT: Adding POST request body:
GenericMessage [payload=[C@2124c204]
2020-04-13 13:23:42.408  INFO 17464 --- [mer://http-post] rest-post                                : end - rest-post

服务器控制台:

2020-04-13 13:23:10.846  INFO 23260 --- [           main] o.a.c.i.e.AbstractCamelContext           : Route: get-data-by-size started and consuming from: jetty:http://localhost:9100/data
2020-04-13 13:23:10.847  INFO 23260 --- [           main] o.a.c.i.e.AbstractCamelContext           : Route: post-data started and consuming from: jetty:http://localhost:9100/post-data
2020-04-13 13:23:10.852  INFO 23260 --- [           main] o.a.c.i.e.AbstractCamelContext           : Total 2 routes, of which 2 are started
2020-04-13 13:23:10.853  INFO 23260 --- [           main] o.a.c.i.e.AbstractCamelContext           : Apache Camel 3.1.0 (CamelContext: RESTServer) started in 0.314 seconds
2020-04-13 13:23:42.395  INFO 23260 --- [tp1276761134-37] post-data                                : start - post-data
2020-04-13 13:23:42.396  INFO 23260 --- [tp1276761134-37] e.m.l.m.p.PostRequestProcessor           : SERVER: Processing POST request
2020-04-13 13:23:42.400  WARN 23260 --- [tp1276761134-37] e.m.l.m.p.PostRequestProcessor           : null object
2020-04-13 13:23:42.400  INFO 23260 --- [tp1276761134-37] post-data                                : end - post-data

Camel 不认识你的 GenericMessage class,所以它不知道如何转换 to/from 它。

Camel 使用可插入的 type converters 来告诉它如何在不同类型的对象之间转换消息(正文)。

内置类型转换器涵盖了很多常见情况(例如,字符串到整数、迭代器到 ArrayList 等),但它们不知道如何与任何自定义类型相互转换 classes你写。

这就是为什么服务器处理器中的这一行 returns null:

GenericMessage<?> data = exchange.getIn().getBody(GenericMessage.class);

因为当时 Body 是 Stream,但 Camel 不知道如何将其转换为您要求的 GenericMessage

(如果您希望抛出异常,则 getBody(Class<T>) is to return null if it can't convert. Try using getMandatoryBody(Class<T>) 的默认行为。)

在客户端:如何获得 GenericMessage 的响应?您需要使用 unmarshalling。您首先将 JSON 解组为 Java 对象。正是在这一点上,您需要向 Camel 提示要解组到哪个 class。

<dataFormats>
    <json id="myGeneric" .... unmarshalTypeName="....GenericMessage"/>
</dataFormats>

...
<unmarshal><custom ref="myGeneric"/></unmarshal>

然后 Body 将是您的 GenericMessage 类型。

请注意,在 Camel 中,类型转换和 marshalling/unmarshalling 是两个不同的东西。

在服务器端: Camel 将尝试将来自 JSON 的请求解组为简单的 Java 对象(例如 HashMap 等)自动地。要告诉它解组为特定的 POJO,请在 REST 动词定义上设置 type,例如:

<post uri="post-data" consumes="json" type="com.example.demo.GenericMessage">

您也可以使用相同的方法来定义 outType(用于将响应编组回 JSON)