How to use consumer groups with Spring Data Redis for Redis Streams (keep getting NOGROUP)?
I'm trying to use Spring Data Redis to consume a Redis stream with a consumer group, but I keep getting the following exception:
Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'event-stream' or consumer group 'my-group' in XREADGROUP with GROUP option
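The message seems to imply that I need to create the consumer group first? But the documentation makes no reference to this: https://github.com/spring-projects/spring-data-redis/blob/master/src/main/asciidoc/reference/redis-streams.adoc
Framework versions:
- Spring Boot 2.2.6
- Lettuce 5.2.2
- Redis 5.0.8
Here is the code I'm using to consume the stream: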
    @Bean
    @Autowired
    public StreamMessageListenerContainer eventStreamPersistenceListenerContainerTwo(RedisConnectionFactory streamRedisConnectionFactory, RedisTemplate streamRedisTemplate) {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder().pollTimeout(Duration.ofMillis(100)).build();
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(streamRedisConnectionFactory, containerOptions);
        container.receive(Consumer.from("my-group", "my-consumer"),
                StreamOffset.create("event-stream", ReadOffset.latest()),
                message -> {
                    System.out.println("MessageId: " + message.getId());
                    System.out.println("Stream: " + message.getStream());
                    System.out.println("Body: " + message.getValue());
                    streamRedisTemplate.opsForStream().acknowledge("my-group", message);
                });
        /*Subscription subscription = container.receive(StreamOffset.fromStart("event-stream"), message -> {
            System.out.println("MessageId: " + message.getId());
            System.out.println("Stream: " + message.getStream());
            System.out.println("Body: " + message.getValue());
        });*/
        container.start();
        return container;
    }
Full stack trace:
org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'event-stream' or consumer group 'my-group' in XREADGROUP with GROUP option
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:54) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:52) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:270) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.convertLettuceAccessException(LettuceStreamCommands.java:471) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:361) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.connection.DefaultedRedisConnection.xReadGroup(DefaultedRedisConnection.java:529) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.core.DefaultStreamOperations.inRedis(DefaultStreamOperations.java:239) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:305) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:300) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.core.DefaultStreamOperations.read(DefaultStreamOperations.java:234) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction(DefaultStreamMessageListenerContainer.java:236) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:138) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:123) ~[spring-data-redis-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'event-stream' or consumer group 'my-group' in XREADGROUP with GROUP option
at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:135) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:108) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:120) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:111) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:59) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565) ~[lettuce-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.48.Final.jar:4.1.48.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.48.Final.jar:4.1.48.Final]
... 1 common frames omitted
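Answering my own question. It seems you do need to explicitly create the stream and the group first, even though this isn't mentioned anywhere in the documentation. That said, there really should be a better way to initialize an empty stream than publishing a message to it.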
    private void createConsumerGroup(String key, String group, RedisTemplate redisTemplate) {
        try {
            //redisTemplate.opsForStream().createGroup(key, ReadOffset.from("0-0"), group);
            redisTemplate.opsForStream().createGroup(key, group);
        } catch (RedisSystemException e) {
            // getRootCause() may return null, so guard before inspecting the class.
            Throwable cause = e.getRootCause();
            if (cause != null && cause.getClass().equals(RedisBusyException.class)) {
                log.info("STREAM - Redis group already exists, skipping Redis group creation: {}", group);
            } else if (cause != null && cause.getClass().equals(RedisCommandExecutionException.class)) {
                log.info("STREAM - Stream does not yet exist, creating empty stream: {}", key);
                // TODO: There has to be a better way to create a stream than this!?
                redisTemplate.opsForStream().add(key, Collections.singletonMap("", ""));
                redisTemplate.opsForStream().createGroup(key, group);
            } else throw e;
        }
    }
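Edit: As @anstue mentioned in the comments below, spring-data-redis 2.3.1+ now automatically creates the stream if it doesn't exist when createGroup is called. However, if the group already exists, it throws a RedisSystemException whose root cause is a RedisBusyException. So I'm updating the answer with the solution I currently use, which makes sure to catch this exception.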
    public class EventStreamUtils {
        // Note: `log` is assumed to be a logger field on this class (for example an SLF4J logger).
        public static void createConsumerGroup(String key, String group, RedisTemplate redisTemplate) {
            try {
                // ReadOffset.from("0-0") will start reading stream from the very beginning. Otherwise,
                // it will pick up at the point in the stream where the new group was created.
                //redisTemplate.opsForStream().createGroup(key, ReadOffset.from("0-0"), group);
                redisTemplate.opsForStream().createGroup(key, group);
            } catch (RedisSystemException e) {
                var cause = e.getRootCause();
                if (cause != null && RedisBusyException.class.equals(cause.getClass())) {
                    log.info("STREAM - Redis group already exists, skipping Redis group creation: {}", group);
                } else throw e;
            }
        }
    }
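For what it's worth, Redis error replies begin with an error code such as BUSYGROUP or NOGROUP (you can see the latter at the start of the exception message in the stack trace above), so another way to tell the cases apart is to walk the cause chain and look at the message prefix instead of comparing exception classes. Below is a minimal, stdlib-only sketch of that idea. `RedisErrors`, `Kind`, and `classify` are hypothetical names of mine, not part of Spring Data Redis or Lettuce, and the sketch assumes the driver keeps the server's error text as the exception message.

```java
/** Hypothetical helper: classifies a failure by the Redis error code found in its cause chain. */
final class RedisErrors {

    enum Kind { GROUP_EXISTS, NO_GROUP_OR_STREAM, OTHER }

    private RedisErrors() {
    }

    static Kind classify(Throwable error) {
        // Walk from the outermost wrapper down to the root cause; the depth limit
        // guards against a pathological cyclic cause chain.
        Throwable current = error;
        for (int depth = 0; current != null && depth < 32; depth++) {
            String message = current.getMessage();
            if (message != null) {
                if (message.startsWith("BUSYGROUP")) {
                    return Kind.GROUP_EXISTS;        // the group was already created
                }
                if (message.startsWith("NOGROUP")) {
                    return Kind.NO_GROUP_OR_STREAM;  // stream key or group is missing
                }
            }
            current = current.getCause();
        }
        return Kind.OTHER;
    }
}
```

In the catch block above you could then call `RedisErrors.classify(e)` and skip group creation only for `GROUP_EXISTS`, rethrowing everything else. Whether this is nicer than the class comparison is a matter of taste; it mainly avoids a compile-time dependency on the driver's exception types.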
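I upgraded to spring-data-redis 2.4.6 (which I believe is the latest stable version at the time of writing), but I still got an exception when using opsForStream().createGroup() to create a group on an empty stream.
The solution I came up with is to use RedisStreamCommands directly, as follows (continuing from the try-catch in the OP's original answer):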
    try {
        redisTemplate.getConnectionFactory().getConnection().xGroupCreate(
                "key".getBytes(),
                "group",
                ReadOffset.from("0-0"),
                true // This is important: it adds the MKSTREAM option to XGROUP CREATE.
                     // Only available from 2.3.0.RELEASE and above.
        );
    } catch (RedisSystemException e) {
        // your exception handling
        // getConnectionFactory() can return null, so this chain can also throw NullPointerException
    }
I originally used Kotlin in my Spring Boot project, so the code may need some tweaking. I don't know how safe this approach is, but for now it's the only way I know.
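To make the failure modes discussed in this thread concrete, here is a toy in-memory model of how a stream key, its consumer groups, and the MKSTREAM option interact. It is only a sketch of the semantics as I understand them, not a Redis client: `ToyStreamServer` and its methods are hypothetical names, and the error strings are shortened imitations of the server's replies.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of stream keys and their consumer groups; it stores no messages. */
final class ToyStreamServer {

    // stream key -> names of the consumer groups created on that stream
    private final Map<String, Set<String>> groupsByStream = new HashMap<>();

    /** Models XADD: adding an entry creates the stream key if it is missing. */
    void xAdd(String key) {
        groupsByStream.computeIfAbsent(key, k -> new HashSet<>());
    }

    /** Models XGROUP CREATE key group id [MKSTREAM]. */
    void xGroupCreate(String key, String group, boolean mkStream) {
        if (!groupsByStream.containsKey(key)) {
            if (!mkStream) {
                throw new IllegalStateException("ERR The XGROUP subcommand requires the key to exist");
            }
            groupsByStream.put(key, new HashSet<>()); // MKSTREAM: create an empty stream
        }
        if (!groupsByStream.get(key).add(group)) {
            throw new IllegalStateException("BUSYGROUP Consumer Group name already exists");
        }
    }

    /** Models only the existence check that XREADGROUP performs before reading. */
    void xReadGroup(String key, String group) {
        Set<String> groups = groupsByStream.get(key);
        if (groups == null || !groups.contains(group)) {
            throw new IllegalStateException(
                    "NOGROUP No such key '" + key + "' or consumer group '" + group + "'");
        }
    }
}
```

In this model, calling `xReadGroup("event-stream", "my-group")` on a fresh server fails with NOGROUP, which is the situation in the question. Calling `xGroupCreate` with `mkStream` set to `false` fails until `xAdd` has created the key (the workaround in the first answer), while `mkStream` set to `true` creates the empty stream and the group in one step. A second `xGroupCreate` for the same group then fails with BUSYGROUP, which is the case the updated answer catches.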