Get empty rows when sending logs from Kafka to ClickHouse

I am trying to get data into ClickHouse from Kafka using Filebeat, and my configuration looks like this.

Filebeat configuration file:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log

output.kafka:
  # tell Filebeat to publish only the timestamp and message fields;
  # otherwise it publishes the whole event as JSON to Kafka
  codec.format:
    string: '%{[@timestamp]} %{[message]}'

  # kafka
  # publishing to 'log' topic
  hosts: ["kafka:9092"]
  topic: 'myfirst'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

In Kafka I can see my log topic and everything looks fine. The data landing in the Kafka topic looks like this:

2021-01-01T21:51:25.225Z {"remote_addr": "192.168.222.1","remote_user": "-","time_local":  "01/Jan/2021:21:51:17 +0000","request":     "GET / HTTP/1.1","status":      "304","body_bytes_sent": "0","http_referer": "-","http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"}

Then I created the ClickHouse table and the MATERIALIZED VIEW:

CREATE TABLE accesslog (
...
    ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka:9092',


But the query output in ClickHouse looks like this, with no data! Why?

┌─remote_addr─┬─remote_user─┬─time_local─┬───────date─┬─request─┬─status─┬─body_bytes_sent─┬─http_referer─┬─http_user_agent─┐
│             │             │            │ 0000-00-00 │         │      0 │               0 │              │                 │
│             │             │            │ 0000-00-00 │         │      0 │               0 │              │                 │
│             │             │            │ 0000-00-00 │         │      0 │               0 │              │                 │
└─────────────┴─────────────┴────────────┴────────────┴─────────┴────────┴─────────────────┴──────────────┴─────────────────┘

It looks like the problem is a wrong Kafka broker address. The internal address kafka:19092 should be used instead of the external address kafka:9092:

CREATE TABLE accesslog (
..
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka:19092', ..
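Since the question elides the column list and the materialized view, here is a hypothetical end-to-end sketch of the three pieces (Kafka engine table, storage table, materialized view). The column names and types are inferred from the nginx log sample above, and the table/view/group names other than `accesslog` are assumptions:

-- Hypothetical sketch; adjust column names/types to your actual schema.

-- 1. Kafka engine table: consumes JSON messages from the 'myfirst'
--    topic via the *internal* broker address kafka:19092.
CREATE TABLE accesslog_queue (
    remote_addr     String,
    remote_user     String,
    time_local      String,
    request         String,
    status          UInt16,
    body_bytes_sent UInt64,
    http_referer    String,
    http_user_agent String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:19092',
         kafka_topic_list  = 'myfirst',
         kafka_group_name  = 'accesslog_group',
         kafka_format      = 'JSONEachRow';

-- 2. Storage table: where the rows are actually kept.
CREATE TABLE accesslog (
    remote_addr     String,
    remote_user     String,
    time_local      String,
    request         String,
    status          UInt16,
    body_bytes_sent UInt64,
    http_referer    String,
    http_user_agent String
) ENGINE = MergeTree
ORDER BY time_local;

-- 3. Materialized view: moves rows from the queue into storage.
CREATE MATERIALIZED VIEW accesslog_mv TO accesslog
AS SELECT * FROM accesslog_queue;

Reading directly from the Kafka engine table consumes the messages, so the materialized view is what actually persists them; queries should go against `accesslog`.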


Steps to reproduce:

Kafka side:

# run shell in Kafka container
docker exec -it kafka bash

# create topic
kafka-topics --create --topic myfirst --partitions 1 --replication-factor 1 --bootstrap-server kafka:19092

# check topic
# kafka-topics --describe --topic myfirst  --bootstrap-server kafka:19092

# add events to the topic
kafka-console-producer --topic myfirst --broker-list kafka:19092
# event body: {"remote_addr": "192.168.222.1","remote_user": "-","time_local":  "01/Jan/2021:21:51:17 +0000","request":     "GET / HTTP/1.1","status":      "304","body_bytes_sent": "0","http_referer": "-","http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"}
..
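To confirm the events actually landed in the topic before involving ClickHouse, they can be read back with the console consumer from inside the same container, again using the internal listener:

# read the topic from the beginning to verify the produced events
kafka-console-consumer --topic myfirst --from-beginning --bootstrap-server kafka:19092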

ClickHouse side:

SELECT *
FROM accesslog

/*
┌─remote_addr───┬─remote_user─┬─time_local─────────────────┬─request────────┬─status─┬─body_bytes_sent─┬─http_referer─┬─http_user_agent────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ 192.168.222.1 │ -           │ 01/Jan/2021:21:51:17 +0000 │ GET / HTTP/1.1 │    304 │               0 │ -            │ Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36 │
..
*/

Excerpt from docker-compose.yml:

..
  kafka:
    image: confluentinc/cp-kafka:5.2.2
    container_name: kafka
    restart: unless-stopped
    hostname: kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-x.x.x.x}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092
    networks:
      - net1
..
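The two advertised listeners are the key: containers on the same Docker network must use the internal listener kafka:19092, while clients on the host connect through the published port 9092. So the ClickHouse container also needs to sit on that network for 'kafka:19092' to resolve. A hypothetical compose entry (the service name and image are assumptions, not taken from the original file):

..
  clickhouse:
    image: yandex/clickhouse-server
    container_name: clickhouse
    depends_on:
      - kafka
    networks:
      - net1   # same network as kafka, so 'kafka:19092' resolves
..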