TCP send/retrieve headers 按字节

TCP send/retrieve headers by bytes

我有客户端配置:

<beans:bean id="itemSerializerDeserializer"
    class="org.mbracero.integration.ItemSerializerDeserializer" />

<beans:bean id="resultSerializerDeserializer"
    class="org.mbracero.integration.ResultSerializerDeserializer" />

<int-ip:tcp-connection-factory id="clientRequestData"
    type="client" host="${clientRequestData.host}" port="${clientRequestData.port}" single-use="true"
    serializer="itemSerializerDeserializer" deserializer="resultSerializerDeserializer" />

<int-ip:tcp-outbound-gateway id="requestDataOutGateway"
    request-channel="requestData" connection-factory="clientRequestData"
    request-timeout="10000" reply-timeout="10000" remote-timeout="10000" />

简单网关:

public interface SimpleGateway {
     @Gateway(requestChannel="requestData")
     Result sendData(Item item);
}

以及服务器配置:

<int:channel id="channelServerRequestData" />
<int:channel id="channelServerResponseData" />

<beans:bean id="requestService" class="org.mbracero.integration.RequestService" />

<beans:bean id="itemSerializerDeserializer"
    class="org.mbracero.integration.ItemSerializerDeserializer" />

<beans:bean id="resultSerializerDeserializer"
    class="org.mbracero.integration.ResultSerializerDeserializer" />

<int-ip:tcp-connection-factory id="requestDataServer"
    type="server" port="${requestDataServer.port}" single-use="true" deserializer="itemSerializerDeserializer"
    serializer="resultSerializerDeserializer" />

<int-ip:tcp-inbound-gateway id="TCPInboundGateway"
    connection-factory="requestDataServer" request-channel="channelServerRequestData"
    reply-channel="channelServerResponseData" error-channel="errorChannel" />

请求服务:

@Service
public class RequestService {
@ServiceActivator(inputChannel="channelServerRequestData", outputChannel="channelServerResponseData")
public Result requestData(Item input) {
    System.out.println("Input :::: " + input);
    Result ret = new Result("AAA", "DDDD");
    System.out.println("Ret :::: " + ret);
    return ret;
  }
}

ItemSerializerDeserializer:

public class ItemSerializerDeserializer implements Serializer<Item>, Deserializer<Item> {
(...)
}

ResultSerializerDeserializer:

public class ResultSerializerDeserializer implements Serializer<Result>, Deserializer<Result> {
(...)
}

现在我必须添加一些 headers(发送和检索),但我需要像上面写的那样自定义 serializers/deserializers。

比如我需要发送下一个headers:

我已经按字节和位置发送和检索这些 headers(不像 Map 那样按 name/value)。

根据我的习惯 serializers/deserializers,我可以使用有效载荷,但我不知道如何使用 headers。

我已经阅读了一些关于属性映射器的内容,但我不知道我的方法是否正确。

在客户端配置中类似于:

(...)
<int-ip:tcp-connection-factory id="clientRequestData"
    type="client" host="${clientRequestData.host}" port="${clientRequestData.port}" single-use="true"
    serializer="itemSerializerDeserializer" deserializer="resultSerializerDeserializer"
    mapper="mapper" />

<beans:bean id="mapper"
      class="org.springframework.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <beans:constructor-arg name="messageConverter">
        <beans:bean class="??????????"/>
    </beans:constructor-arg>
</beans:bean>
(...)

有什么帮助吗?

提前致谢。

使用 MessageConvertingTcpMessageMapperMapMessageConverter

一般的想法是,出站转换器将 Message 转换为包含所有 headerspayloadMap,序列化程序将映射序列化为一个byte[]。您可以告诉 MapMessageConverter 您希望将哪个 headers 包含在地图中,以便您可以在序列化程序中访问它们。

在入站端,反序列化器从 byte[] 创建一个 Map,然后 MessageConverter.toMessage() 将映射转换为 Message

请参阅 MapJsonSerializer 示例。

另见 this test case

编辑

这是一个简单的实现(没有对输出进行错误检查或长度检查)...

private volatile Deserializer<byte[]> packetDeserializer = new ByteArrayLfSerializer();

private volatile Serializer<byte[]> packetSerializer = new ByteArrayLfSerializer();


@Override
public Map<?, ?> deserialize(InputStream inputStream) throws IOException {
    byte[] bytes = readToEndOfMessage(inputStream);
    String asString = new String(bytes, "UTF-8");
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("issuer", asString.substring(0, 5));
    headers.put("client", asString.substring(4, 9));
    headers.put("product", asString.substring(9, 11));
    headers.put("type", asString.substring(11, 14));
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("headers", headers);
    map.put("payload", createPayloadFromRemainingBytes(bytes));
    return map;
}

@Override
public void serialize(Map<?, ?> object, OutputStream outputStream) throws IOException {
    Map<String, String> headers = (Map<String, String>) object.get("headers");
    outputStream.write(headers.get("issuer").getBytes("UTF-8"));
    outputStream.write(headers.get("client").getBytes("UTF-8"));
    outputStream.write(headers.get("product").getBytes("UTF-8"));
    outputStream.write(headers.get("type").getBytes("UTF-8"));
    outputStream.write(convertPayloadToBytes(object.get("payload")));
}

MapMessageConverter一起使用,Spring集成headers将与数据一起传送到另一方,他将解码数据并创建入站消息那些 headers.