TCP send/retrieve headers by bytes
I have this client configuration:
<beans:bean id="itemSerializerDeserializer"
    class="org.mbracero.integration.ItemSerializerDeserializer" />
<beans:bean id="resultSerializerDeserializer"
    class="org.mbracero.integration.ResultSerializerDeserializer" />

<int-ip:tcp-connection-factory id="clientRequestData"
    type="client" host="${clientRequestData.host}" port="${clientRequestData.port}" single-use="true"
    serializer="itemSerializerDeserializer" deserializer="resultSerializerDeserializer" />

<int-ip:tcp-outbound-gateway id="requestDataOutGateway"
    request-channel="requestData" connection-factory="clientRequestData"
    request-timeout="10000" reply-timeout="10000" remote-timeout="10000" />
A simple gateway:
public interface SimpleGateway {
    @Gateway(requestChannel="requestData")
    Result sendData(Item item);
}
And the server configuration:
<int:channel id="channelServerRequestData" />
<int:channel id="channelServerResponseData" />

<beans:bean id="requestService" class="org.mbracero.integration.RequestService" />
<beans:bean id="itemSerializerDeserializer"
    class="org.mbracero.integration.ItemSerializerDeserializer" />
<beans:bean id="resultSerializerDeserializer"
    class="org.mbracero.integration.ResultSerializerDeserializer" />

<int-ip:tcp-connection-factory id="requestDataServer"
    type="server" port="${requestDataServer.port}" single-use="true" deserializer="itemSerializerDeserializer"
    serializer="resultSerializerDeserializer" />

<int-ip:tcp-inbound-gateway id="TCPInboundGateway"
    connection-factory="requestDataServer" request-channel="channelServerRequestData"
    reply-channel="channelServerResponseData" error-channel="errorChannel" />
The request service:
@Service
public class RequestService {
    @ServiceActivator(inputChannel="channelServerRequestData", outputChannel="channelServerResponseData")
    public Result requestData(Item input) {
        System.out.println("Input :::: " + input);
        Result ret = new Result("AAA", "DDDD");
        System.out.println("Ret :::: " + ret);
        return ret;
    }
}
ItemSerializerDeserializer:
public class ItemSerializerDeserializer implements Serializer<Item>, Deserializer<Item> {
    (...)
}
ResultSerializerDeserializer:
public class ResultSerializerDeserializer implements Serializer<Result>, Deserializer<Result> {
    (...)
}
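Now I have to add some headers (to send and to retrieve), but I still need custom serializers/deserializers like the ones above.
For example, I need to send the following headers:
- issuer: 4 bytes. Start position 1 - end position 4
- client: 4 bytes. Start position 5 - end position 8
- product: 2 bytes. Start position 9 - end position 10
- type: 3 bytes. Start position 11 - end position 13
- (...)
I have to send and retrieve these headers by bytes and positions (not by name/value, as in a Map).
With my custom serializers/deserializers I can work with the payload, but I don't know how to work with the headers.
I have read a little about attribute mappers, but I don't know whether that is the right approach.
Something like this in the client configuration: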
(...)
<int-ip:tcp-connection-factory id="clientRequestData"
    type="client" host="${clientRequestData.host}" port="${clientRequestData.port}" single-use="true"
    serializer="itemSerializerDeserializer" deserializer="resultSerializerDeserializer"
    mapper="mapper" />

<beans:bean id="mapper"
    class="org.springframework.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <beans:constructor-arg name="messageConverter">
        <beans:bean class="??????????"/>
    </beans:constructor-arg>
</beans:bean>
(...)
Any help?
Thanks in advance.
Use a MessageConvertingTcpMessageMapper with a MapMessageConverter.
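The general idea is that the outbound converter turns the Message into a Map containing the headers and the payload, and the serializer then serializes that map into a byte[]. You can tell the MapMessageConverter which headers you want included in the map, so that you can access them in the serializer.
On the inbound side, the deserializer creates a Map from the byte[], and MessageConverter.toMessage() then converts the map into a Message.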
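To make the shape of that intermediate Map concrete, here is a minimal plain-Java sketch. It is not Spring Integration code: MapShapeSketch and toMap are hypothetical stand-ins that only mimic the idea of copying a chosen set of headers, plus the payload, into a map with "headers" and "payload" entries (the same two keys the serializer in this answer reads).

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the Message -> Map step; not the framework's implementation.
final class MapShapeSketch {

    // Builds a map with "headers" and "payload" entries, copying only the named headers.
    static Map<String, Object> toMap(Map<String, Object> messageHeaders, Object payload,
            String... headerNames) {
        Map<String, Object> selected = new LinkedHashMap<>();
        for (String name : headerNames) {
            if (messageHeaders.containsKey(name)) {
                selected.put(name, messageHeaders.get(name));
            }
        }
        Map<String, Object> map = new HashMap<>();
        map.put("headers", selected);
        map.put("payload", payload);
        return map;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("issuer", "ISSR");
        headers.put("client", "CLNT");
        headers.put("internalId", "abc"); // not selected below, so it is left out of the map
        System.out.println(toMap(headers, "body", "issuer", "client"));
    }
}
```

The custom serializer then only has to deal with this map: it looks up each header by name and writes it at its fixed position.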
See the MapJsonSerializer for an example.
Also see this test case.
EDIT
Here is a simple implementation (with no error checking or length checking on the output)...
// Packet-level (de)serializers; ByteArrayLfSerializer delimits messages with a line feed.
private volatile Deserializer<byte[]> packetDeserializer = new ByteArrayLfSerializer();
private volatile Serializer<byte[]> packetSerializer = new ByteArrayLfSerializer();

@Override
public Map<?, ?> deserialize(InputStream inputStream) throws IOException {
    byte[] bytes = readToEndOfMessage(inputStream); // helper not shown
    String asString = new String(bytes, "UTF-8");
    // Fixed-width fields; substring() takes 0-based offsets with an exclusive end index.
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("issuer", asString.substring(0, 4));    // positions 1-4
    headers.put("client", asString.substring(4, 8));    // positions 5-8
    headers.put("product", asString.substring(8, 10));  // positions 9-10
    headers.put("type", asString.substring(10, 13));    // positions 11-13
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("headers", headers);
    map.put("payload", createPayloadFromRemainingBytes(bytes)); // helper not shown
    return map;
}

@Override
@SuppressWarnings("unchecked")
public void serialize(Map<?, ?> object, OutputStream outputStream) throws IOException {
    Map<String, String> headers = (Map<String, String>) object.get("headers");
    // Write the headers in wire order; each value is assumed to already have its fixed width.
    outputStream.write(headers.get("issuer").getBytes("UTF-8"));
    outputStream.write(headers.get("client").getBytes("UTF-8"));
    outputStream.write(headers.get("product").getBytes("UTF-8"));
    outputStream.write(headers.get("type").getBytes("UTF-8"));
    outputStream.write(convertPayloadToBytes(object.get("payload"))); // helper not shown
}
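Used together with the MapMessageConverter, the Spring Integration headers are conveyed with the data to the other side, which decodes the data and creates an inbound message with those headers.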
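Since the implementation above skips length checks, here is a hedged, framework-free sketch of how the fixed-width layout from the question (issuer 4, client 4, product 2, type 3 bytes) could be encoded and decoded with validation. FixedWidthHeaderCodec and its methods are hypothetical names, the header values are assumed to be plain ASCII, and the code works on byte offsets rather than on String.substring() so that one character always corresponds to one byte.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; the field names and widths follow the layout given in the question.
final class FixedWidthHeaderCodec {

    private static final String[] NAMES = {"issuer", "client", "product", "type"};
    private static final int[] WIDTHS = {4, 4, 2, 3};
    static final int HEADER_LENGTH = 13; // 4 + 4 + 2 + 3

    // Writes each header at its fixed position, followed by the payload bytes.
    static byte[] encode(Map<String, String> headers, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < NAMES.length; i++) {
            String value = headers.get(NAMES[i]);
            if (value == null) {
                throw new IllegalArgumentException("Missing header: " + NAMES[i]);
            }
            byte[] raw = value.getBytes(StandardCharsets.US_ASCII);
            if (raw.length != WIDTHS[i]) {
                throw new IllegalArgumentException(
                        NAMES[i] + " must be exactly " + WIDTHS[i] + " bytes");
            }
            out.write(raw, 0, raw.length);
        }
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Reads the headers back from the first HEADER_LENGTH bytes of the frame.
    static Map<String, String> decodeHeaders(byte[] frame) {
        requireHeader(frame);
        Map<String, String> headers = new LinkedHashMap<>();
        int offset = 0;
        for (int i = 0; i < NAMES.length; i++) {
            headers.put(NAMES[i],
                    new String(frame, offset, WIDTHS[i], StandardCharsets.US_ASCII));
            offset += WIDTHS[i];
        }
        return headers;
    }

    // Returns everything after the header block.
    static byte[] decodePayload(byte[] frame) {
        requireHeader(frame);
        return Arrays.copyOfRange(frame, HEADER_LENGTH, frame.length);
    }

    private static void requireHeader(byte[] frame) {
        if (frame.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Frame shorter than the header block");
        }
    }
}
```

In a serializer like the one above, encode() could replace the sequence of outputStream.write() calls, and decodeHeaders()/decodePayload() could replace the substring() parsing. Padding short values (for example with spaces) instead of rejecting them would be a reasonable alternative if the protocol allows it.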