批量上传日志消息到本地Elasticsearch

Bulk upload log messages to local Elasticsearch

我们在云 (IBM Bluemix) 中有一些外部应用程序,它们将其应用程序系统日志记录在内部使用 ELK 堆栈的 bluemix logmet 服务中。

现在,我们想定期从云端下载日志并将其上传到本地 Elastic/Kibana 实例。这是因为如果我们想通过 Kibana 搜索相同的内容,将日志存储在云服务中会产生成本和额外成本。本地弹性实例可以 delete/flush 我们不需要的旧日志。

下载的日志将如下所示

{"instance_id_str":"0","source_id_str":"APP/PROC/WEB","app_name_str":"ABC","message":"Hello","type":"syslog","event_uuid":"474b78aa-6012-44f3-8692-09bd667c5822","origin_str":"rep","ALCH_TENANT_ID":"3213cd20-63cc-4592-b3ee-6a204769ce16","logmet_cluster":"topic3-elasticsearch_3","org_name_str":"123","@timestamp":"2017-09-29T02:30:15.598Z","message_type_str":"OUT","@version":"1","space_name_str":"prod","application_id_str":"3104b522-aba8-48e0-aef6-6291fc6f9250","ALCH_ACCOUNT_ID_str":"","org_id_str":"d728d5da-5346-4614-b092-e17be0f9b820","timestamp":"2017-09-29T02:30:15.598Z"}

{"instance_id_str":"0","source_id_str":"APP/PROC/WEB","app_name_str":"ABC","message":"EFG","type":"syslog","event_uuid":"d902dddb-afb7-4f55-b472-211f1d370837","origin_str":"rep","ALCH_TENANT_ID":"3213cd20-63cc-4592-b3ee-6a204769ce16","logmet_cluster":"topic3-elasticsearch_3","org_name_str":"123","@timestamp":"2017-09-29T02:30:28.636Z","message_type_str":"OUT","@version":"1","space_name_str":"prod","application_id_str":"dcd9f975-3be3-4451-a9db-6bed1d906ae8","ALCH_ACCOUNT_ID_str":"","org_id_str":"d728d5da-5346-4614-b092-e17be0f9b820","timestamp":"2017-09-29T02:30:28.636Z"}

我已经在我们本地的 elasticsearch 中创建了一个索引

curl -XPUT 'localhost:9200/commslog?pretty' -H 'Content-Type: application/json' -d'
{
    "settings" : {
        "number_of_shards" : 1
    },
    "mappings" : {
        "logs" : {
            "properties" : {
                "instance_id_str" : { "type" : "text" },
                "source_id_str" : { "type" : "text" },
                "app_name_str" : { "type" : "text" },
                "message" : { "type" : "text" },
                "type" : { "type" : "text" },
                "event_uuid" : { "type" : "text" },
                "ALCH_TENANT_ID" : { "type" : "text" },
                "logmet_cluster" : { "type" : "text" },
                "org_name_str" : { "type" : "text" },
                "@timestamp" : { "type" : "date" },
                "message_type_str" : { "type" : "text" },
                "@version" : { "type" : "text" },
                "space_name_str" : { "type" : "text" },
                "application_id_str" : { "type" : "text" },
                "ALCH_ACCOUNT_ID_str" : { "type" : "text" },
                "org_id_str" : { "type" : "text" },
                "timestamp" : { "type" : "date" }
            }
        }
    }
}'

现在批量上传文件,使用命令

curl -XPOST -H 'Content-Type: application/x-ndjson' http://localhost:9200/commslog/logs/_bulk --data-binary '@commslogs.json'

以上命令抛出错误

格式错误的 action/metadata 行 [1],预计 START_OBJECT 或 END_OBJECT 但发现 [VALUE_STRING]

解决方法是按照

的批量上传规则

https://discuss.elastic.co/t/bulk-insert-file-having-many-json-entries-into-elasticsearch/46470/2

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

所以我通过在每一行之前添加操作手动更改了一些日志语句

{ "index" : { "_index" : "commslog", "_type" : "logs" } } 

这有效!!

另一种选择是调用 curl 命令,在路径中提供 _idex 和 _type

curl -XPOST -H 'Content-Type: application/x-ndjson' http://localhost:9200/commslog/logs/_bulk --data-binary '@commslogs.json'

但是如果没有操作,这也会引发同样的错误

问题是我们无法对获得的数千条日志记录执行此操作。是否有一个选项,一旦我们从 Bluemix 下载日志文件并上传文件而不添加操作。

注意 我们目前没有使用 logstash,但是

谢谢

正如@Alain Collins所说,你应该可以直接使用filebeat。

对于 logstash:

  • 应该可以使用 logstash,但与其使用 grok,不如使用 json codec/filter,这会 容易得多。
  • 您可以使用带有 logstash 的文件输入来处理许多文件并等待它完成(要知道它何时完成,请使用 file/stdout,可能使用点编解码器,并等待它停止写作)。
  • 您应该直接上传到 elasticsearch(使用 elasticsearch 输出),而不是仅仅使用 logstash 转换文件。

至于你的问题,我认为只使用一个小程序来添加缺少的操作行或使用 filebeat 会容易得多,除非你对 logstash 配置进行了足够的实验,以比程序更快地编写和配置 logstash在文档中各处添加一行。