将 Kafka Streams 代码迁移到 Spring Cloud Stream?
Migrate Kafka Streams code to Spring Cloud Stream?
Spring 云流是否支持以下 Kafka Streams 应用程序。
以下是 Kafka 示例应用程序摘录中的代码。感谢任何反馈或支持。
...
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, Purchase> purchaseKStream = streamsBuilder.stream.....
KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues...
patternKStream.print(Printed.<String, PurchasePattern>toSysOut().withLabel("patterns"));
patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
purchaseKStream.print(Printed.<String, Purchase>toSysOut().withLabel("purchases"));
purchaseKStream.to("purchases", Produced.with(stringSerde, purchaseSerde));
// adding State to processor
String rewardsStateStoreName = "rewardsPointsStore";
RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier.....
streamsBuilder.addStateStore(storeBuilder);
KStream<String, Purchase> transByCustomerStream = purchaseKStream.through("customer_transactions",....
KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream
.transformValues(() -> new PurchaseRewardTransformer(rewardsStateStoreName), rewardsStateStoreName);
statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("stateful-rewards"));
statefulRewardAccumulator.to("rewards", Produced.with(stringSerde, rewardAccumulatorSerde));
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), getProperties());
假设您使用的是 Spring Cloud Stream Horsham (3.0.0)
版本,下面的伪代码应该可以工作。我没有测试此代码,但这应该适用于目标等的正确配置。请为此查看 docs。
@SpringBootApplicaiton
public class SampleApp {
public static void main(String[] args) {
SpringApplication.run(SampleApp.class, args);
}
@Bean
public Function<KStream<String, Purchase>, KStream<String, RewardAccumulator>> process() {
return purchaseKStream -> {
purchaseKStream.print(Printed.<String, Purchase>toSysOut().withLabel("purchases"));
KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues...
patternKStream.print(Printed.<String, PurchasePattern>toSysOut().withLabel("patterns"));
patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
purchaseKStream.to("purchases", Produced.with(stringSerde, purchaseSerde));
KStream<String, Purchase> transByCustomerStream = purchaseKStream.through("customer_transactions",....
KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream.transformValues(()
statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("stateful-rewards"));
return statefulRewardAccumulator;
}
}
@Bean
public StoreBuilder storeBuilder() {
String rewardsStateStoreName = "rewardsPointsStore";
RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier.....
return storeBuilder;
}
}
Spring 云流是否支持以下 Kafka Streams 应用程序。 以下是 Kafka 示例应用程序摘录中的代码。感谢任何反馈或支持。
...
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, Purchase> purchaseKStream = streamsBuilder.stream.....
KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues...
patternKStream.print(Printed.<String, PurchasePattern>toSysOut().withLabel("patterns"));
patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
purchaseKStream.print(Printed.<String, Purchase>toSysOut().withLabel("purchases"));
purchaseKStream.to("purchases", Produced.with(stringSerde, purchaseSerde));
// adding State to processor
String rewardsStateStoreName = "rewardsPointsStore";
RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier.....
streamsBuilder.addStateStore(storeBuilder);
KStream<String, Purchase> transByCustomerStream = purchaseKStream.through("customer_transactions",....
KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream
.transformValues(() -> new PurchaseRewardTransformer(rewardsStateStoreName), rewardsStateStoreName);
statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("stateful-rewards"));
statefulRewardAccumulator.to("rewards", Produced.with(stringSerde, rewardAccumulatorSerde));
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), getProperties());
假设您使用的是 Spring Cloud Stream Horsham (3.0.0)
版本,下面的伪代码应该可以工作。我没有测试此代码,但这应该适用于目标等的正确配置。请为此查看 docs。
@SpringBootApplicaiton
public class SampleApp {
public static void main(String[] args) {
SpringApplication.run(SampleApp.class, args);
}
@Bean
public Function<KStream<String, Purchase>, KStream<String, RewardAccumulator>> process() {
return purchaseKStream -> {
purchaseKStream.print(Printed.<String, Purchase>toSysOut().withLabel("purchases"));
KStream<String, PurchasePattern> patternKStream = purchaseKStream.mapValues...
patternKStream.print(Printed.<String, PurchasePattern>toSysOut().withLabel("patterns"));
patternKStream.to("patterns", Produced.with(stringSerde, purchasePatternSerde));
purchaseKStream.to("purchases", Produced.with(stringSerde, purchaseSerde));
KStream<String, Purchase> transByCustomerStream = purchaseKStream.through("customer_transactions",....
KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream.transformValues(()
statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel("stateful-rewards"));
return statefulRewardAccumulator;
}
}
@Bean
public StoreBuilder storeBuilder() {
String rewardsStateStoreName = "rewardsPointsStore";
RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier.....
return storeBuilder;
}
}