[vertx.redis.client]没有等待消息的处理程序

[vertx.redis.client]No handler waiting for message

版本

vert.x core:3.5.0

vert.xredisclient:3.5.0

上下文

2018-06-02 17:40:55.981 错误 4933 --- [ntloop-thread-2] io.vertx.redis.impl.RedisConnection:没有等待消息的处理程序:14751915

2018-06-02 17:41:10.937 错误 4933 --- [ntloop-thread-2] io.vertx.redis.impl.RedisConnection:没有等待消息的处理程序:false

2018-06-02 17:41:10.947 错误 4933 --- [ntloop-thread-2] io.vertx.redis.impl.RedisConnection:没有等待消息的处理程序:false

2018-06-02 17:41:20.937 错误 4933 --- [ntloop-thread-2] io.vertx.redis.impl.RedisConnection:没有等待消息的处理程序:true

2018-06-02 17:41:30.937 错误 4933 --- [ntloop-thread-2] io.vertx.redis.impl.RedisConnection:没有等待消息的处理程序:true

2018-06-02 17:41:35.927 错误 4933 --- [ntloop-thread-2] io.vertx.redis.impl.RedisConnection:没有等待消息的处理程序:false

2018-06-02 17:41:40.937 错误 4933 --- [ntloop-thread-2] io.vertx.redis.impl.RedisConnection:没有等待消息的处理程序:true

2018-06-02 17:41:50.948 错误 4933 --- [ntloop-thread-2] io.vertx.redis.impl.RedisConnection:没有等待消息的处理程序:true

查看 io.vertx.redis.impl.RedisConnection 的代码后,我找到了原因:

  1. 服务器启动时,创建redis连接,运行即可。

  2. 长时间(eg.days)后,连接状态为DISCONNECTED。 Vert.x 当向redis 服务器发送命令时,redis 客户端重新连接redis 服务器:


      void send(final Command command) {

    // start the handshake if not connected
    if (state.get() == State.DISCONNECTED) {
      connect();
    }

  1. connect() 调用 clearQueue()

  2. clearQueue(): 等待命令队列将为空。

  3. 当从具有新连接的 redis 服务器接收到消息时调用 handleReply()。

注意:此处出现错误日志(倒数第三行)。


      private void handleReply(Reply reply) {
        final Command cmd = waiting.poll();

        if (cmd != null) {
          switch (reply.type()) {
            case '-': // Error
              cmd.handle(Future.failedFuture(reply.asType(String.class)));
              return;
            case '+':   // Status
              switch (cmd.responseTransform()) {
                case ARRAY:
                  cmd.handle(Future.succeededFuture(new JsonArray().add(reply.asType(String.class))));
                  break;
                default:
                  cmd.handle(Future.succeededFuture(reply.asType(cmd.returnType())));
                  break;
              }
              return;
            case '$':  // Bulk
              switch (cmd.responseTransform()) {
                case ARRAY:
                  cmd.handle(Future.succeededFuture(new JsonArray().add(reply.asType(String.class, cmd.encoding()))));
                  break;
                case INFO:
                  String info = reply.asType(String.class, cmd.encoding());

                  if (info == null) {
                    cmd.handle(Future.succeededFuture(null));
                  } else {
                    String lines[] = info.split("\r?\n");
                    JsonObject value = new JsonObject();

                    JsonObject section = null;
                    for (String line : lines) {
                      if (line.length() == 0) {
                        // end of section
                        section = null;
                        continue;
                      }

                      if (line.charAt(0) == '#') {
                        // begin section
                        section = new JsonObject();
                        // create a sub key with the section name
                        value.put(line.substring(2).toLowerCase(), section);
                      } else {
                        // entry in section
                        int split = line.indexOf(':');
                        if (section == null) {
                          value.put(line.substring(0, split), line.substring(split + 1));
                        } else {
                          section.put(line.substring(0, split), line.substring(split + 1));
                        }
                      }
                    }
                    cmd.handle(Future.succeededFuture(value));
                  }
                  break;
                default:
                  cmd.handle(Future.succeededFuture(reply.asType(cmd.returnType(), cmd.encoding())));
                  break;
              }
              return;
            case '*': // Multi
              switch (cmd.responseTransform()) {
                case HASH:
                  cmd.handle(Future.succeededFuture(reply.asType(JsonObject.class, cmd.encoding())));
                  break;
                default:
                  cmd.handle(Future.succeededFuture(reply.asType(JsonArray.class, cmd.encoding())));
                  break;
              }
              return;
            case ':':   // Integer
              switch (cmd.responseTransform()) {
                case ARRAY:
                  cmd.handle(Future.succeededFuture(new JsonArray().add(reply.asType(Long.class))));
                  break;
                default:
                  cmd.handle(Future.succeededFuture(reply.asType(cmd.returnType())));
                  break;
              }
              return;
            default:
              cmd.handle(Future.failedFuture("Unknown message type"));
          }
        } else {
          // **An error log appears here**
          log.error("No handler waiting for message: " + reply.asType(String.class));
        }
      }

问题:

是不是bug? 如果不是错误,post 命令将在重新连接 redis 服务器时被丢弃。

处​​理这种情况有什么好的方法吗?

问题已解决。出现上述问题的原因是连接已经被重用,没有关闭。解决办法是: `

RedisClient redisClient = RedisClient.create(this.vertx, redisOptions);
//do some thing; 
redisClient.close(h-{})...

` 对于每个会话。