如何使用 KCL 确定特定分区键的分片 ID?
How to determine shard id for a specific partition key with KCL?
PutRequest
API 使用分区键来确定记录的分片 ID。即使 PutRequest
的响应包含分片 ID,它也不可靠,因为分片是可拆分的,因此记录可能会移动到新的分片。我找不到确定消费者端特定分区键的分片 ID 的方法。
AWS 似乎将分区键映射到 128 位整数键,但文档中没有解释哈希算法。我想要做的是处理具有特定分区键的 Kinesis 流中的记录,这意味着它们将位于特定的分片中,这样我就可以在特定的分片中获取数据,但我找不到合适的 API 在文档中。
根据文档,使用的散列算法是 MD5。
An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards using the hash key ranges of the shards.
见http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
在您的情况下,如果您知道要为其识别适当分片的分区键,则需要执行以下两件事:
- 计算分区键的 MD5 哈希值
- 遍历分片列表,找到散列键范围包含第一步计算的散列值的分片。
这里有一些代码片段可以帮助您上手:
MD5 哈希为 BigInteger
String partitionKey = "YourKnownKey";
byte[] partitionBytes = partitionKey.getBytes("UTF-8");
byte[] hashBytes = MessageDigest.getInstance("MD5").digest(partitionBytes);
BigInteger biPartitionKey = new BigInteger(1, hashBytes);
为分区键查找分片
Shard shardYouAreAfter = null;
String streamName = "YourStreamName";
StreamDescription streamDesc = client.describeStream(streamName).getStreamDescription();
List<Shard> shards = streamDesc.getShards();
for(Shard shard : shards){
BigInteger startingHashKey = new BigInteger(shard.getHashKeyRange().getStartingHashKey());
BigInteger endingHashKey = new BigInteger(shard.getHashKeyRange().getEndingHashKey());
if(startingHashKey.compareTo(biPartKey) <= 0 &&
endingHashKey.compareTo(biPartKey) >=0) {
shardYouAreAfter = shard;
break;
}
}
如果您一直在拆分 and/or 合并碎片,事情会变得有点复杂。以上假设您只有存在的活动分片。
PutRequest
API 使用分区键来确定记录的分片 ID。即使 PutRequest
的响应包含分片 ID,它也不可靠,因为分片是可拆分的,因此记录可能会移动到新的分片。我找不到确定消费者端特定分区键的分片 ID 的方法。
AWS 似乎将分区键映射到 128 位整数键,但文档中没有解释哈希算法。我想要做的是处理具有特定分区键的 Kinesis 流中的记录,这意味着它们将位于特定的分片中,这样我就可以在特定的分片中获取数据,但我找不到合适的 API 在文档中。
根据文档,使用的散列算法是 MD5。
An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards using the hash key ranges of the shards.
见http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
在您的情况下,如果您知道要为其识别适当分片的分区键,则需要执行以下两件事:
- 计算分区键的 MD5 哈希值
- 遍历分片列表,找到散列键范围包含第一步计算的散列值的分片。
这里有一些代码片段可以帮助您上手:
MD5 哈希为 BigInteger
String partitionKey = "YourKnownKey";
byte[] partitionBytes = partitionKey.getBytes("UTF-8");
byte[] hashBytes = MessageDigest.getInstance("MD5").digest(partitionBytes);
BigInteger biPartitionKey = new BigInteger(1, hashBytes);
为分区键查找分片
Shard shardYouAreAfter = null;
String streamName = "YourStreamName";
StreamDescription streamDesc = client.describeStream(streamName).getStreamDescription();
List<Shard> shards = streamDesc.getShards();
for(Shard shard : shards){
BigInteger startingHashKey = new BigInteger(shard.getHashKeyRange().getStartingHashKey());
BigInteger endingHashKey = new BigInteger(shard.getHashKeyRange().getEndingHashKey());
if(startingHashKey.compareTo(biPartKey) <= 0 &&
endingHashKey.compareTo(biPartKey) >=0) {
shardYouAreAfter = shard;
break;
}
}
如果您一直在拆分 and/or 合并碎片,事情会变得有点复杂。以上假设您只有存在的活动分片。