Kafka Schema registry - Advertise dynamic port in container

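As I understand it, when using the Confluent Schema Registry Docker image with Kafka-based master election (not ZooKeeper-based), we can advertise the container's hostname to Kafka with the SCHEMA_REGISTRY_HOST_NAME environment variable.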

If I try to use SCHEMA_REGISTRY_PORT, I get the following error:

PORT is deprecated. Please use SCHEMA_REGISTRY_LISTENERS instead.

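Why can't I set the associated port? I can obtain the dynamic port (the host dynamically maps a port to my container), but how am I supposed to share it with Kafka?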

Edit 1:

To add more detail, here is an example of an assignment made by the Schema Registry coordinator:

[2019-10-25 11:55:47,813] INFO Finished rebalance with master election result: Assignment{version=1, error=0, master='sr-1-7a9a403a-63cc-4fed-b548-10ea440863d5', masterIdentity=version=1,host=10.135.124.179,port=29932,scheme=http,masterEligibility=true} (io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector)

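As you can see, the identity contains a hostname and a port (host=10.135.124.179 and port=29932). The hostname comes from the SCHEMA_REGISTRY_HOST_NAME variable, but according to the code, the port comes from here: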

/**
   * A Schema Registry instance's identity is in part the port it listens on. Currently the port can
   * either be configured via the deprecated `port` configuration, or via the `listeners`
   * configuration.
   *
   * <p>This method uses `Application.parseListeners()` from `rest-utils` to get a list of
   * listeners, and returns the port of the first listener to be used for the instance's identity.
   *
   * <p></p>In theory, any port from any listener would be sufficient. Choosing the first, instead
   * of say the last, is arbitrary.
   */
  // TODO: once RestConfig.PORT_CONFIG is deprecated, remove the port parameter.
  static SchemeAndPort getSchemeAndPortForIdentity(int port, List<String> configuredListeners,
                                                   String requestedScheme)

( https://github.com/confluentinc/schema-registry/blob/5af0ca3be1138fe483d0f90f4ccfd4f02f158334/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java#L211-L223 )

So the only way to advertise the port is to set it through the listeners, which can be annoying (but still works as a workaround).

listeners takes a bind address and a port, and, as you found, the port that gets advertised is "the port of the first listener to be used for the instance's identity". From the documentation:

Comma-separated list of listeners that listen for API requests over either HTTP or HTTPS. If a listener uses HTTPS, the appropriate SSL configuration parameters need to be set as well.

Schema Registry identities are stored in ZooKeeper and are made up of a hostname and port. If multiple listeners are configured, the first listener's port is used for its identity.

Type: list
Default: "http://0.0.0.0:8081"

https://docs.confluent.io/current/schema-registry/installation/config.html#listeners
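To make the "first listener wins" rule concrete, here is a minimal Java sketch, assuming listeners are plain URLs such as `http://0.0.0.0:29932`. `IdentitySketch` and `schemeAndPortForIdentity` are hypothetical names; this is not Schema Registry's code, which parses listeners with `Application.parseListeners()` from rest-utils and also takes the deprecated `port` setting and a requested scheme into account.

```java
import java.net.URI;
import java.util.List;

// Hypothetical, simplified model of how the identity port is picked.
// NOT the real KafkaSchemaRegistry implementation.
final class IdentitySketch {

    // Used when no listener is configured, matching the documented default "http://0.0.0.0:8081".
    static final int DEFAULT_PORT = 8081;

    /** Returns "scheme:port" taken from the first configured listener. */
    static String schemeAndPortForIdentity(List<String> listeners) {
        if (listeners.isEmpty()) {
            return "http:" + DEFAULT_PORT;
        }
        // Only the first listener matters; later listeners are ignored for the identity.
        URI first = URI.create(listeners.get(0).trim());
        int port = first.getPort();
        if (port == -1) {
            throw new IllegalArgumentException("Listener has no explicit port: " + first);
        }
        return first.getScheme() + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(schemeAndPortForIdentity(
                List.of("http://0.0.0.0:29932", "https://0.0.0.0:8443")));
    }
}
```

With `http://0.0.0.0:29932` listed first, the identity port is 29932 regardless of the later HTTPS listener; with no listeners, the sketch falls back to the documented default of 8081.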

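OK, I put together a working Nomad job file; maybe it will help. The workaround is to use, inside the container, the same dynamic port that Nomad uses for the port mapping, like this: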

job "schema-registry" {
  datacenters = ["YOURDC"]
  type = "service"

  # Update strategy
  update {
    # Max instances (task groups) to be updated in parallel.
    max_parallel = 1

    # An allocation must stay healthy for min_healthy_time before the next one is updated.
    min_healthy_time = "10s"

    # If allocation not healthy after healthy_deadline, mark as unhealthy.
    healthy_deadline = "3m"

    # If allocation unhealthy after progress_deadline, fail the deployment.
    progress_deadline = "10m"

    # Should auto revert to previous version if the deployment fails?
    auto_revert = false

    # Create n canaries.
    canary = 0
  }

  spread {
    attribute = "${attr.unique.hostname}"
    weight    = 100
  }

  migrate {
    # As in update stanza.
    max_parallel = 1

    # "checks" for health checks or "task_state" for task state.
    health_check = "checks"

    # As in update stanza.
    min_healthy_time = "10s"

    # As in update stanza.
    healthy_deadline = "5m"
  }

  # The "group" stanza defines a series of tasks that should be co-located on
  # the same Nomad client.
  group "schema-registry-group" {

    # count of instances of the "schema-registry" task group
    count = 3

    restart {
      # The number of attempts to run the job within the specified interval.
      attempts = 2
      interval = "30m"

      # The "delay" parameter specifies the duration to wait before restarting
      # a task after it has failed.
      delay = "5s"

      # What happens when the `attempts` limit is reached within `interval`?
      # - "delay" mode delays the next restart until the next interval,
      # - "fail" mode does not restart the task.
      mode = "fail"
    }

    # Ephemeral disk (size in MB) shared between the group's tasks.
    ephemeral_disk {
      size = 300
    }

    task "schema-registry" {
      driver = "docker"

      # Driver (docker) settings.
      config {
        image = "confluentinc/cp-schema-registry"

        # We cannot use this: Schema Registry advertises the port from its LISTENERS config, so it would advertise 8081 instead of the host port.
        #port_map {
        #  schema_registry_port = 8081
        #}
      }

      # Time to shut down after SIGINT.
      # Caution! If you want to set it higher than 30s,
      # make sure the client's max_kill_timeout allows it.
      kill_timeout = "30s"
      kill_signal = "SIGINT"
      shutdown_delay = "2s"

      env {
        SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="YOURBOOTSTRAPSERVER"
        SCHEMA_REGISTRY_HOST_NAME="${NOMAD_IP_schema_registry_port}"
        SCHEMA_REGISTRY_LISTENERS="http://0.0.0.0:${NOMAD_HOST_PORT_schema_registry_port}"
      }

      # Max required storage = (max_files * 2) * max_file_size
      # *2 because there's a log file for stderr and stdout
      logs {
        max_files     = 10
        max_file_size = 15
      }

      # Required resources.
      resources {
        cpu    = 500 # 500 MHz
        memory = 512
        network {
          port "schema_registry_port" {} # dynamic host port; with no port_map, the container uses the same port
        }
      }

      service {
        name = "schema-registry"
        tags = ["schema-registry"]
        port = "schema_registry_port"
        check {
          name     = "alive"
          type     = "tcp"
          interval = "60s"
          timeout  = "4s"
        }
      }
    }
  }
}
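The env stanza above can be mirrored in a small Java sketch, assuming the values Nomad would interpolate from `NOMAD_IP_schema_registry_port` and `NOMAD_HOST_PORT_schema_registry_port`. `ListenerWorkaround` and `schemaRegistryEnv` are hypothetical names for illustration only. The point is that the listener binds on the host port, so the first listener's port, which becomes the identity port, is the one other hosts can actually reach.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper mirroring the Nomad env stanza; not part of Nomad or Schema Registry.
final class ListenerWorkaround {

    static Map<String, String> schemaRegistryEnv(String bootstrapServers, String hostIp, int hostPort) {
        if (hostPort <= 0 || hostPort > 65535) {
            throw new IllegalArgumentException("Invalid port: " + hostPort);
        }
        Map<String, String> env = new LinkedHashMap<>();
        env.put("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", bootstrapServers);
        // Host part of the advertised identity.
        env.put("SCHEMA_REGISTRY_HOST_NAME", hostIp);
        // Bind on all interfaces using the *host* port; this first listener's port
        // becomes the port part of the advertised identity.
        env.put("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + hostPort);
        return env;
    }

    public static void main(String[] args) {
        // Inside a Nomad task these variables are set by Nomad; example values are used when unset.
        Map<String, String> sys = System.getenv();
        String ip = sys.getOrDefault("NOMAD_IP_schema_registry_port", "10.135.124.179");
        int port = Integer.parseInt(sys.getOrDefault("NOMAD_HOST_PORT_schema_registry_port", "29932"));
        schemaRegistryEnv("YOURBOOTSTRAPSERVER", ip, port)
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With the commented-out `port_map` instead, the listener would be `http://0.0.0.0:8081` and 8081 would be advertised, even though other hosts can only reach the container on the dynamic host port.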