Spring batch Remote partitioning : 在分区期间将大量数据推送到kafka
Spring batch Remote partitioning : Pushing Huge data in kafka during partition
我已经实现了 spring 远程批处理 partitioning.Now 我必须推送分区 100 亿个 ID 分成 partitions.The 将从弹性中提取 ID 并推送到分区中推入卡夫卡
@Override
public Map partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
AtomicInteger partitionNumber = new AtomicInteger(1);
try {
for(int i=0;i<n;i++){
List<Integer> ids = //fetch id from elastic
map.put("partition" + partitionNumber.getAndIncrement(), context);
}
System.out.println("Partitions Created");
} catch (IOException e) {
e.printStackTrace();
}
return map;
}
我无法一次获取并推送地图中的所有 ID,否则,我将退出 memory.I 希望将 ID 推送到队列中,然后获取下一个 ID。
这可以通过 spring 批处理完成吗?
如果你想使用分区,你必须找到一种方法来使用给定的键对输入数据集进行分区。没有分区键,您就无法真正使用分区(有或没有 Spring 批处理)。
如果你的ID是由一个可以划分分区的序列定义的,你就不必获取100亿个ID,将它们分区并将每个分区(即每个分区的所有ID)放在执行上下文中工作人员。您可以做的是找到最大 ID,创建 ID 范围并将它们分配给不同的工人。例如:
- 分区 1:0 - 10000
- 分区 2:10001 - 20000
- 等等
如果您的 ID 不是按序列定义的并且不能按范围分区,那么您需要找到另一个键(或复合键),允许您根据另一个条件对数据进行分区。否则,(远程)分区不适合您。
我已经实现了 spring 远程批处理 partitioning.Now 我必须推送分区 100 亿个 ID 分成 partitions.The 将从弹性中提取 ID 并推送到分区中推入卡夫卡
@Override
public Map
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
AtomicInteger partitionNumber = new AtomicInteger(1);
try {
for(int i=0;i<n;i++){
List<Integer> ids = //fetch id from elastic
map.put("partition" + partitionNumber.getAndIncrement(), context);
}
System.out.println("Partitions Created");
} catch (IOException e) {
e.printStackTrace();
}
return map;
}
我无法一次获取并推送地图中的所有 ID,否则,我将退出 memory.I 希望将 ID 推送到队列中,然后获取下一个 ID。
这可以通过 spring 批处理完成吗?
如果你想使用分区,你必须找到一种方法来使用给定的键对输入数据集进行分区。没有分区键,您就无法真正使用分区(有或没有 Spring 批处理)。
如果你的ID是由一个可以划分分区的序列定义的,你就不必获取100亿个ID,将它们分区并将每个分区(即每个分区的所有ID)放在执行上下文中工作人员。您可以做的是找到最大 ID,创建 ID 范围并将它们分配给不同的工人。例如:
- 分区 1:0 - 10000
- 分区 2:10001 - 20000
- 等等
如果您的 ID 不是按序列定义的并且不能按范围分区,那么您需要找到另一个键(或复合键),允许您根据另一个条件对数据进行分区。否则,(远程)分区不适合您。