leftjoin 两个 GlobalKTables
leftjoin on two GlobalKTables
我正在尝试将一个流与 2 个不同的 GlobalKTable 进行连接(join),把它们当作查找表使用,更具体地说,是设备(用户代理)和地理编码(IP 地址)。
我遇到了序列化的问题,但我不明白为什么。它卡在 DEFAULT_VALUE_SERDE_CLASS_CONFIG 上,但我要写入的主题已正确序列化。
//
// Set up serialization / de-serialization
private static Serde<String> stringSerde = Serdes.String();
private static Serde<PodcastData> podcastSerde = StreamsSerdes.PodCastSerde();
private static Serde<GeoCodedData> geocodedSerde = StreamsSerdes.GeoIPSerde();
private static Serde<DeviceData> deviceSerde = StreamsSerdes.DeviceSerde();
private static Serde<JoinedPodcastGeoDeviceData> podcastGeoDeviceSerde = StreamsSerdes.PodcastGeoDeviceSerde();
private static Serde<JoinedPodCastDeviceData> podcastDeviceSerde = StreamsSerdes.PodcastDeviceDataSerde()
...
// Lookup tables built from the device and geo-IP topics.
// Only the topic name is passed to globalTable(); no serdes are given here.
GlobalKTable<String, DeviceData> deviceIDTable = builder.globalTable(kafkaProperties.getProperty("deviceid-topic"));
GlobalKTable<String, GeoCodedData> geoIPTable = builder.globalTable(kafkaProperties.getProperty("geoip-topic"));
//
// Stream from source topic, read with an explicit String key serde and
// PodcastData value serde.
KStream<String, PodcastData> podcastStream = builder.stream(
kafkaProperties.getProperty("source-topic"),
Consumed.with(stringSerde, podcastSerde));
//
podcastStream
// left join the podcast stream to the device table, looking up the device
.leftJoin(deviceIDTable,
// get a DeviceData object from the user agent
(podcastID, podcastData) -> podcastData.getUser_agent(),
// join podcast and device and return a JoinedPodCastDeviceData object
(podcastData, deviceData) -> {
JoinedPodCastDeviceData data =
JoinedPodCastDeviceData.builder().build();
data.setPodcastObject(podcastData);
data.setDeviceData(deviceData);
return data;
})
// left join the podcast stream to the geo table, looking up the geo data
.leftJoin(geoIPTable,
// get a Geo object from the ip address
(podcastID, podcastDeviceData) -> podcastDeviceData.getPodcastObject().getIp_address(),
// join podcast and geo
(podcastDeviceData, geoCodedData) -> {
JoinedPodcastGeoDeviceData data=
JoinedPodcastGeoDeviceData.builder().build();
data.setGeoData(geoCodedData);
data.setDeviceData(podcastDeviceData.getDeviceData());
data.setPodcastData(podcastDeviceData.getPodcastObject());
return data;
})
//
.to(kafkaProperties.getProperty("sink-topic"),
Produced.with(stringSerde, podcastGeoDeviceSerde));
...
...
// Default serdes for the application: both the key and the value default
// are set to the class of the String serde.
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
错误:java.lang.String 无法转换为 DeviceData
// Sets the default value serde to the String serde's class.
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
由于上述配置,除非您在创建 KTable/KStream/GlobalKTable 时明确指定,否则应用程序将使用 String serde 作为默认的值 serde。
由于 deviceIDTable 的预期值类型是 DeviceData,您需要在 GlobalKTable 中定义值 serde,请按以下方式指定:
// Give each GlobalKTable explicit serdes so that the String default value
// serde is not applied to its values.
GlobalKTable<String, DeviceData> deviceIDTable = builder.globalTable(
    kafkaProperties.getProperty("deviceid-topic"),
    Materialized.<String, DeviceData, KeyValueStore<Bytes, byte[]>>as(DEVICE_STORE)
        .withKeySerde(stringSerde)
        .withValueSerde(deviceSerde));
// geoIPTable is declared with GeoCodedData values and, like deviceIDTable,
// was created without serdes, so it needs its value serde specified as well.
GlobalKTable<String, GeoCodedData> geoIPTable = builder.globalTable(
    kafkaProperties.getProperty("geoip-topic"),
    Consumed.with(stringSerde, geocodedSerde));
我正在尝试将一个流与 2 个不同的 GlobalKTable 进行连接(join),把它们当作查找表使用,更具体地说,是设备(用户代理)和地理编码(IP 地址)。
我遇到了序列化的问题,但我不明白为什么。它卡在 DEFAULT_VALUE_SERDE_CLASS_CONFIG 上,但我要写入的主题已正确序列化。
//
// Set up serialization / de-serialization
private static Serde<String> stringSerde = Serdes.String();
private static Serde<PodcastData> podcastSerde = StreamsSerdes.PodCastSerde();
private static Serde<GeoCodedData> geocodedSerde = StreamsSerdes.GeoIPSerde();
private static Serde<DeviceData> deviceSerde = StreamsSerdes.DeviceSerde();
private static Serde<JoinedPodcastGeoDeviceData> podcastGeoDeviceSerde = StreamsSerdes.PodcastGeoDeviceSerde();
private static Serde<JoinedPodCastDeviceData> podcastDeviceSerde = StreamsSerdes.PodcastDeviceDataSerde()
...
// Lookup tables built from the device and geo-IP topics.
// Only the topic name is passed to globalTable(); no serdes are given here.
GlobalKTable<String, DeviceData> deviceIDTable = builder.globalTable(kafkaProperties.getProperty("deviceid-topic"));
GlobalKTable<String, GeoCodedData> geoIPTable = builder.globalTable(kafkaProperties.getProperty("geoip-topic"));
//
// Stream from source topic, read with an explicit String key serde and
// PodcastData value serde.
KStream<String, PodcastData> podcastStream = builder.stream(
kafkaProperties.getProperty("source-topic"),
Consumed.with(stringSerde, podcastSerde));
//
podcastStream
// left join the podcast stream to the device table, looking up the device
.leftJoin(deviceIDTable,
// get a DeviceData object from the user agent
(podcastID, podcastData) -> podcastData.getUser_agent(),
// join podcast and device and return a JoinedPodCastDeviceData object
(podcastData, deviceData) -> {
JoinedPodCastDeviceData data =
JoinedPodCastDeviceData.builder().build();
data.setPodcastObject(podcastData);
data.setDeviceData(deviceData);
return data;
})
// left join the podcast stream to the geo table, looking up the geo data
.leftJoin(geoIPTable,
// get a Geo object from the ip address
(podcastID, podcastDeviceData) -> podcastDeviceData.getPodcastObject().getIp_address(),
// join podcast and geo
(podcastDeviceData, geoCodedData) -> {
JoinedPodcastGeoDeviceData data=
JoinedPodcastGeoDeviceData.builder().build();
data.setGeoData(geoCodedData);
data.setDeviceData(podcastDeviceData.getDeviceData());
data.setPodcastData(podcastDeviceData.getPodcastObject());
return data;
})
//
.to(kafkaProperties.getProperty("sink-topic"),
Produced.with(stringSerde, podcastGeoDeviceSerde));
...
...
// Default serdes for the application: both the key and the value default
// are set to the class of the String serde.
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
错误:java.lang.String 无法转换为 DeviceData
// Sets the default value serde to the String serde's class.
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
由于上述配置,除非您在创建 KTable/KStream/GlobalKTable 时明确指定,否则应用程序将使用 String serde 作为默认的值 serde。
由于 deviceIDTable 的预期值类型是 DeviceData,您需要在 GlobalKTable 中定义值 serde,请按以下方式指定:
// Give each GlobalKTable explicit serdes so that the String default value
// serde is not applied to its values.
GlobalKTable<String, DeviceData> deviceIDTable = builder.globalTable(
    kafkaProperties.getProperty("deviceid-topic"),
    Materialized.<String, DeviceData, KeyValueStore<Bytes, byte[]>>as(DEVICE_STORE)
        .withKeySerde(stringSerde)
        .withValueSerde(deviceSerde));
// geoIPTable is declared with GeoCodedData values and, like deviceIDTable,
// was created without serdes, so it needs its value serde specified as well.
GlobalKTable<String, GeoCodedData> geoIPTable = builder.globalTable(
    kafkaProperties.getProperty("geoip-topic"),
    Consumed.with(stringSerde, geocodedSerde));