
Logstash Codec - Avro Schema Registry: avro_schema_registry fails due to undefined local variable or method `esponse'

I have a Logstash conf that reads JSON from a Kafka topic and uses avro_schema_registry to serialize the output as Avro. Here is the config file:

input {
  kafka{
    group_id => "test_group"
    topics => ["logs_json"]
    bootstrap_servers => "server2:9094, server1:9094, server3:9094"
    codec => "json"
    consumer_threads => 1
  }
}

output {
  kafka {
    codec => avro_schema_registry {
      endpoint => "http://host_schema_registry:port"
      schema_id  => 1
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
    bootstrap_servers => "server1:9094, server1:9094, server1:9094"
    topic_id => "logs_avro"
  }
}

But I am getting this error:

org.jruby.exceptions.NameError: (NameError) undefined local variable or method `esponse' for #<SchemaRegistry::Client:0x3c5ad39>
        at usr.share.logstash.vendor.bundle.jruby._dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_0.lib.schema_registry.client.request(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:127) ~[?:?]
        at uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.net.http.start(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:914) ~[?:?]
        at uri_3a_classloader_3a_.META_minus_INF.jruby_dot_home.lib.ruby.stdlib.net.http.start(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:609) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby._dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_0.lib.schema_registry.client.request(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:101) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby._dot_5_dot_0.gems.schema_registry_minus_0_dot_1_dot_0.lib.schema_registry.client.schema(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:40) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby._dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_1_dot_1.lib.logstash.codecs.avro_schema_registry.get_schema(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:158) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby._dot_5_dot_0.gems.logstash_minus_codec_minus_avro_schema_registry_minus_1_dot_1_dot_1.lib.logstash.codecs.avro_schema_registry.encode(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-codec-avro_schema_registry-1.1.1/lib/logstash/codecs/avro_schema_registry.rb:246) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby._dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_0_dot_0_minus_java.lib.logstash.outputs.kafka.multi_receive(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/outputs/kafka.rb:219) ~[?:?]
        at org.jruby.RubyArray.each(org/jruby/RubyArray.java:1800) ~[jruby-complete-9.2.8.0.jar:?]
        at usr.share.logstash.vendor.bundle.jruby._dot_5_dot_0.gems.logstash_minus_integration_minus_kafka_minus_10_dot_0_dot_0_minus_java.lib.logstash.outputs.kafka.multi_receive(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-integration-kafka-10.0.0-java/lib/logstash/outputs/kafka.rb:217) ~[?:?]
        at org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.multi_receive(org/logstash/config/ir/compiler/OutputStrategyExt.java:118) ~[logstash-core.jar:?]
        at org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.multi_receive(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:101) ~[logstash-core.jar:?]
        at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:262) ~[?:?]
[2020-02-11T13:11:41,720][ERROR][org.logstash.execution.WorkerLoop][main] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.

That codec is broken.

Reference issue - https://github.com/wvanbergen/schema_registry/issues/5
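The root cause reported in that issue is a variable-name typo inside the gem's error-handling path. The sketch below is a minimal illustration of that class of bug, not the gem's actual code: the typo'd branch only executes when the registry returns a non-success status, so everything works until the first bad response, which then dies with a NameError instead of a useful registry error.

```ruby
require "net/http"

# Hypothetical illustration (not schema_registry's real source) of the
# bug class behind the NameError above.
def check_status(response)
  unless response.is_a?(Net::HTTPSuccess)
    # The buggy gem referenced `esponse` here instead of `response`,
    # so a failed registry call raised
    #   NameError: undefined local variable or method `esponse'
    # rather than a descriptive error. Corrected spelling:
    raise "schema registry request failed: #{response.code}"
  end
  response
end
```

Note that the typo masks the real problem: the registry had already answered with an error status (a 500, as it turns out below), and the broken error branch hid that.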


There's no reason to serialize JSON to Avro if it's going to be inserted into Elasticsearch anyway, since Elasticsearch stores JSON. But if you really do want to do that, I'd suggest using Confluent's Elasticsearch Kafka Connector instead.

If you're not even using Elasticsearch, I don't think you should really be using Logstash at all.

KSQL supports what you're trying to do - https://www.confluent.io/stream-processing-cookbook/ksql-recipes/changing-data-serialization-format-json-avro/

After debugging the code further, I found that the internal server error was caused by the line in client.rb that sets the 'Accept' header on the GET request:

/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/schema_registry-0.1.0/lib/schema_registry/client.rb:112
        request['Accept'] = "application/vnd.schemaregistry.v1+json"

By commenting that line out, or changing it to request['Accept'] = "application/json", the GET request goes through successfully.
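A minimal sketch of the patched request construction, assuming the standard Confluent Schema Registry lookup route /schemas/ids/{id} (the endpoint host is a placeholder); the only change from the gem's original line is the Accept value:

```ruby
require "net/http"
require "uri"

# Builds the schema-lookup GET request with the patched Accept header.
def build_schema_request(endpoint, schema_id)
  uri = URI("#{endpoint}/schemas/ids/#{schema_id}")
  request = Net::HTTP::Get.new(uri)
  # Original gem line (client.rb:112):
  #   request['Accept'] = "application/vnd.schemaregistry.v1+json"
  # Plain JSON is what made the request succeed here:
  request['Accept'] = "application/json"
  request
end
```

Sending the request with Net::HTTP.start(uri.hostname, uri.port) { |http| http.request(req) } and reading the "schema" field from the JSON body is roughly what the codec does with the result.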