How to determine when a session is finished in Kafka Streams
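I'm stuck on a Kafka Streams DSL scenario. Can someone help?
Scenario:
I have a topic timeOff whose key is a timeOffId and whose value is an object. The object also contains the employee the time-off belongs to, so one employee can have multiple time-offs.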
TimeOffs
timeoff1 {status:PENDING, employee: 1}
timeoff2 {status:PENDING, employee: 2}
timeoff3 {status:PENDING, employee: 3}
timeoff1 {status:APPROVED, employee: 1}
timeoff5 {status:PENDING, employee: 2}
timeoff3 {status:APPROVED, employee: 3}
timeoff6 {status:PENDING, employee: 1}
timeoff7 {status:PENDING, employee: 1}
timeoff8 {status:PENDING, employee: 2}
I want a result like the one below, where each employee has only their pending time-offs:
employee1: [timeoff6, timeoff7] // timeoff1 is already approved, so it is no longer needed
employee2: [timeoff2, timeoff5, timeoff8] // all time-offs for employee2 are pending
employee3: [] // no pending time-offs
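How should I do this? I started with something like the code below, but I don't know whether I'm on the right track.
I don't need code; just suggest a correct/good way to do this with the Kafka Streams DSL. Thank you.
In the example below, I stream the topic and group the time-offs by employeeId. But then how do I get the updated status of a time-off? I'm confused. Can anyone help?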
KStream<String, TimeOff> source = builder.stream(topic);
KTable<String, ArrayList<TimeOff>> newStore = source.groupBy((k, v) -> v.getEmployeeId())
    .aggregate(ArrayList::new,
        (key, value, aggregate) -> {
            aggregate.add(value);
            return aggregate;
        },
        Materialized.<String, ArrayList<TimeOff>, KeyValueStore<Bytes, byte[]>>as("NewStore")
            .withValueSerde(new TimeOffListSerde(new TimeOffSerde())));
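One way to see why the attempt above does not yet answer the "updated status" question is to replay employee 1's records through the same append-only rule. The sketch below is plain Java with no Kafka dependency; `AppendOnlyAggregation` and its `"id:status"` encoding are hypothetical names chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java replay of the append-only aggregation, restricted to employee 1's records.
// No Kafka classes are used; a List plays the role of the aggregate value.
class AppendOnlyAggregation {

    // Each record as {timeOffId (the topic key), status}, in topic order.
    static final List<String[]> EMPLOYEE_1_RECORDS = List.of(
        new String[] {"timeoff1", "PENDING"},
        new String[] {"timeoff1", "APPROVED"},
        new String[] {"timeoff6", "PENDING"},
        new String[] {"timeoff7", "PENDING"});

    // Same rule as the aggregator in the question: every incoming value is appended.
    static List<String> aggregate(List<String[]> records) {
        List<String> aggregate = new ArrayList<>();
        for (String[] record : records) {
            aggregate.add(record[0] + ":" + record[1]);
        }
        return aggregate;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(EMPLOYEE_1_RECORDS));
        // [timeoff1:PENDING, timeoff1:APPROVED, timeoff6:PENDING, timeoff7:PENDING]
    }
}
```

Both the PENDING and the APPROVED record for timeoff1 stay in the list because the aggregator only ever appends; an APPROVED record has to remove the earlier entry instead.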
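I think the best approach is to use the Processor API.
You should implement your own custom org.apache.kafka.streams.processor.Processor. The Processor keeps a state store with each employee's PENDING time-offs; when a time-off record with APPROVED status arrives, its entry is removed from the state store. Because the topic is keyed by timeOffId, the processor reads the employee ID from the record value.
It would look something like this: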
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.ArrayList;
import java.util.List;

// Keeps, per employee, the IDs of the time-offs that are still PENDING.
// Input records are keyed by timeOffId, as in the source topic.
public class CustomProcessor implements Processor<String, TimeOff> {
    protected KeyValueStore<String, List<String>> stateStore;
    private final String storeName;

    public CustomProcessor(String storeName) {
        this.storeName = storeName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        stateStore = (KeyValueStore<String, List<String>>) context.getStateStore(storeName);
    }

    @Override
    public void process(String timeOffId, TimeOff timeOff) {
        String employeeId = timeOff.getEmployeeId();
        List<String> stored = stateStore.get(employeeId);
        // Always work on a mutable copy (Collections.singletonList would reject later add() calls)
        List<String> pending = stored == null ? new ArrayList<>() : new ArrayList<>(stored);
        if ("APPROVED".equals(timeOff.getStatus())) {
            pending.remove(timeOffId);   // remove the approved time-off, not the employee ID
        } else if (!pending.contains(timeOffId)) {
            pending.add(timeOffId);      // track each pending time-off once
        }
        stateStore.put(employeeId, pending);
    }

    @Override
    public void close() {
    }
}
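The intended processor behaviour (track a time-off while it is PENDING, drop it when its APPROVED record arrives) can be checked without a broker. In this sketch a `HashMap` stands in for the `KeyValueStore`, and `PendingProcessorSimulation` is a hypothetical class; it replays the sample records from the question in topic order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Broker-free check of the pending-only rule. A HashMap stands in for the processor's
// KeyValueStore (illustrative assumption): key = employeeId, value = pending timeOffIds.
class PendingProcessorSimulation {

    private final Map<String, List<String>> store = new HashMap<>();

    // One record from the topic: key = timeOffId, value fields = status and employeeId.
    void process(String timeOffId, String status, String employeeId) {
        List<String> stored = store.get(employeeId);
        // Copy into a mutable list so earlier store values are never modified in place.
        List<String> pending = stored == null ? new ArrayList<>() : new ArrayList<>(stored);
        if ("APPROVED".equals(status)) {
            pending.remove(timeOffId);      // approved: no longer pending
        } else if (!pending.contains(timeOffId)) {
            pending.add(timeOffId);         // pending: track it once
        }
        store.put(employeeId, pending);
    }

    List<String> pendingFor(String employeeId) {
        return store.getOrDefault(employeeId, List.of());
    }

    // Replays the sample records from the question, in topic order.
    static PendingProcessorSimulation replaySample() {
        String[][] records = {
            {"timeoff1", "PENDING", "1"},
            {"timeoff2", "PENDING", "2"},
            {"timeoff3", "PENDING", "3"},
            {"timeoff1", "APPROVED", "1"},
            {"timeoff5", "PENDING", "2"},
            {"timeoff3", "APPROVED", "3"},
            {"timeoff6", "PENDING", "1"},
            {"timeoff7", "PENDING", "1"},
            {"timeoff8", "PENDING", "2"},
        };
        PendingProcessorSimulation sim = new PendingProcessorSimulation();
        for (String[] record : records) {
            sim.process(record[0], record[1], record[2]);
        }
        return sim;
    }

    public static void main(String[] args) {
        PendingProcessorSimulation sim = replaySample();
        for (String employee : List.of("1", "2", "3")) {
            System.out.println("employee" + employee + ": " + sim.pendingFor(employee));
        }
    }
}
```

Running `main` prints `employee1: [timeoff6, timeoff7]`, `employee2: [timeoff2, timeoff5, timeoff8]` and `employee3: []`, matching the desired result in the question.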
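This is entirely doable in the DSL, and you've almost got it; you just need to remove approved time-offs from your state object.
Below is a rough Scala example (I haven't written Java in years, sorry):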
// These are just type aliases, e.g. TimeOffId is just a
// String, but has clearer semantics when reading type
// definitions throughout the code
type TimeOffId = String
type EmployeeId = String

// Enums can get quite involved in Scala, so this is just illustrative (Scala 3 syntax)
enum Status { case Pending, Approved }
import Status.*

// This represents the value (stream event) object
case class TimeOff(status: Status, employeeId: EmployeeId)

// An alias for the state object followed by a factory for the initial state
// I use `Set` for easy membership manipulation
type PendingTimeOff = Set[TimeOffId]
object PendingTimeOff {
  def apply(): PendingTimeOff = Set.empty[TimeOffId]
}

// This is the aggregator function: add pending IDs, remove approved ones
def trackTimeOff: (EmployeeId, (TimeOffId, TimeOff), PendingTimeOff) => PendingTimeOff = {
  case (_, (timeOffId, TimeOff(Pending, _)), pending)  => pending + timeOffId
  case (_, (timeOffId, TimeOff(Approved, _)), pending) => pending - timeOffId
}
...
source
  // Map to retain the timeOffId
  .map { case (timeOffId, timeOff) => (timeOff.employeeId, (timeOffId, timeOff)) }
  // Now group by the new key, i.e. employeeId
  .groupByKey
  // The Scala DSL's aggregate is curried: initializer, then aggregator, then materialized
  .aggregate(PendingTimeOff())(trackTimeOff)(Mat...)
...
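Since the question uses Java, here is a rough Java counterpart of the Scala `trackTimeOff` function. It is a sketch under stated assumptions: the re-keyed value is modelled as a `Map.Entry` of (timeOffId, status) rather than the real `(TimeOffId, TimeOff)` pair, and `TrackTimeOff`/`fold` are hypothetical names. The `apply` method uses the same parameter order as Kafka Streams' `Aggregator#apply(key, value, aggregate)`; wiring it into `aggregate(...)` is not shown here.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Java counterpart of the Scala trackTimeOff aggregator, as a plain static method.
// Assumption: after re-keying by employeeId, each value is a (timeOffId, status) pair,
// modelled with Map.Entry; any status other than APPROVED is treated as pending.
final class TrackTimeOff {

    // Parameter order mirrors Aggregator#apply(key, value, aggregate).
    static Set<String> apply(String employeeId, Map.Entry<String, String> idAndStatus, Set<String> pending) {
        // Work on a copy, mirroring the immutable Scala Set in the answer above.
        Set<String> next = new LinkedHashSet<>(pending);
        if ("APPROVED".equals(idAndStatus.getValue())) {
            next.remove(idAndStatus.getKey());
        } else {
            next.add(idAndStatus.getKey());
        }
        return next;
    }

    // Folds one employee's records in order, starting from the empty initial state.
    static Set<String> fold(String employeeId, List<Map.Entry<String, String>> records) {
        Set<String> pending = new LinkedHashSet<>();
        for (Map.Entry<String, String> record : records) {
            pending = apply(employeeId, record, pending);
        }
        return pending;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> employee1 = List.of(
            Map.entry("timeoff1", "PENDING"),
            Map.entry("timeoff1", "APPROVED"),
            Map.entry("timeoff6", "PENDING"),
            Map.entry("timeoff7", "PENDING"));
        System.out.println(fold("1", employee1)); // [timeoff6, timeoff7]
    }
}
```

Using a `LinkedHashSet` (a choice made for this sketch, not part of the answer) keeps arrival order, which is one alternative to sorting the IDs.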
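You could probably reuse your TimeOffListSerde; if order matters you could use a SortedSet, but be careful to order timeOff1, timeOff11 and timeOff2 properly (plain string ordering puts timeOff11 before timeOff2).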
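The ordering caveat can be made concrete: a `TreeSet<String>` compares IDs character by character, so `timeoff11` sorts before `timeoff2`. The hypothetical comparator below assumes IDs end in a number that fits in a `long`; it compares that numeric suffix first and falls back to plain string order as a tie-breaker, which keeps distinct IDs from being treated as duplicates by the `TreeSet`.

```java
import java.util.Comparator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Compares lexicographic and numeric-suffix ordering of time-off IDs.
final class TimeOffOrdering {

    // Hypothetical helper: parses the trailing digits of an ID such as "timeoff11" (-1 if none).
    static long suffix(String id) {
        int i = id.length();
        while (i > 0 && Character.isDigit(id.charAt(i - 1))) {
            i--;
        }
        return i == id.length() ? -1 : Long.parseLong(id.substring(i));
    }

    // Numeric suffix first, then plain String order so distinct IDs never compare as equal.
    static final Comparator<String> BY_NUMERIC_SUFFIX = (a, b) -> {
        int bySuffix = Long.compare(suffix(a), suffix(b));
        return bySuffix != 0 ? bySuffix : a.compareTo(b);
    };

    static SortedSet<String> lexicographic(List<String> ids) {
        return new TreeSet<>(ids);
    }

    static SortedSet<String> numeric(List<String> ids) {
        SortedSet<String> sorted = new TreeSet<>(BY_NUMERIC_SUFFIX);
        sorted.addAll(ids);
        return sorted;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("timeoff2", "timeoff11", "timeoff1");
        System.out.println(lexicographic(ids)); // [timeoff1, timeoff11, timeoff2]
        System.out.println(numeric(ids));       // [timeoff1, timeoff2, timeoff11]
    }
}
```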