配置 Logstash 以从套接字接收数据并将其插入到 java 中的 Elasticsearch

Configure Logstash to receive data from socket and insert it into Elasticsearch in java

我想直接向ElasticSearch 注入数据来进行一些性能测试。我的第一个想法是为每个文档创建 JSON 文件并将文件导入 ElasticSearch,但这会花费太长时间。我测试了 110K 个文件,创建这些文件只花了 18 分钟,我需要 55M 个文档——这是我测试的 500 倍。快速计算:需要 150 小时,或 6.25 天,太长了。 第二个选项是当我有 JSON 时停止并使用 Logstash 将字符串注入 ElasticSearch。但是,我得到一个例外:

2019-12-16 13:49:27,240 | Timer-0 | ERROR | search-injector | c.n.es.injector.output.SocketOutput | SocketOutput::output: 
java.net.SocketException: Software caused connection abort: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:134)
at java.io.DataOutputStream.writeBytes(DataOutputStream.java:276)
at com.beniregev.es.injector.output.SocketOutput.output(SocketOutput.java:39)
at com.beniregev.es.injector.policies.UpdateOutputHandlers.run(UpdateOutputHandlers.java:60)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)

ElasticSearch 运行 在 localhost 端口 9200 上,Logstash 运行 在 localhost 端口 9600 上。 我的 SocketOutput.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

public class SocketOutput implements OutputHandler {

    private static final Logger log = LoggerFactory.getLogger(SocketOutput.class);

    public static final String CLI_OPTION = "socket";

    @Value("${socket.hostname}")
    private String hostname;
    @Value("${socket.port}")
    private int port;
    Socket clientSocket;

    public boolean open() {
        try {
            clientSocket = new Socket(hostname, port);
        } catch (IOException ioe) {
            log.error("", ioe);
            return false;
        }
        return true;
    }

    @Override
    public void output(String data) {

        DataOutputStream outToServer = null;
        try {
            outToServer = new DataOutputStream(clientSocket.getOutputStream());
            outToServer.writeBytes( data );
        } catch (IOException ioe) {
            log.error("", ioe);
        }
    }

}

logstash-simple.conf

# Simple Logstash configuration for creating a simple
# Stdin -> Logstash -> Elasticsearch pipeline.
input { stdin { } }

output {
  elasticsearch { 
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}

我是 运行 Logstash 使用此命令:bin/logstash.bat -f config/logstash-simple.conf

JSON 字符串已创建且有效,Socket 获得正确的参数值(主机名="localhost" 和端口=9600)。 我将不胜感激任何帮助。

解决方案有多个层次,首先是使用 Kibana 管理器在 Elastic Search 中创建和使用正确的索引,然后是正确配置 Logstash,最后是使用 JSON字符串而不是文件。

创建索引不是本题的问题,就不展开了

Logstash.conf(配置文件):

#################################################
# Stdin -> Logstash -> Elastic Search pipeline.
#################################################
input {
    stdin{}
    tcp{
        host => "localhost"
        port => 9600
        codec => json
    }
}

filter 
{
   mutate 
    {
        remove_field => ["host", "@version", "@timestamp", "port", "tags", "level", "logger_name", "themessage", "mensage", "spring.application.name", "level_value", "thread_name"]
    }
}

output {
    stdout{ codec => rubydebug }

     elasticsearch{
        hosts => ["localhost:9200"]
        index => ["my-index"]
     }
}

注意:index 中输入您创建并将使用的索引的名称。

运行 Logstash 加上配置文件:bin/logstash.bat -f config/logstash-simple.conf

将字符串输出到 Java 中的 Logstash:

package com.beniregev.injector.output;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

public class SocketOutput implements OutputHandler {

    private static final Logger log = LoggerFactory.getLogger(SocketOutput.class);

    public static final String CLI_OPTION = "socket";
    private int outputIndex = 0;

    @Value("${socket.hostname}")
    private String hostname;
    @Value("${socket.port}")
    private int port;
    Socket clientSocket;

    public boolean open() {
        try {
            clientSocket = new Socket(hostname, port);
        } catch (IOException ioe) {
            log.error("", ioe);
            return false;
        }
        return true;
    }

    @Override
    public void output(String data) {
        DataOutputStream outToServer = null;
        try {
            outToServer = new DataOutputStream(clientSocket.getOutputStream());
            outToServer.writeBytes( data );
            outputIndex++;
        } catch (IOException ioe) {
            log.error("", ioe);
        }
        System.out.println("Wrote segment " + outputIndex + " to socket");

    }

}

hostport是到Logstash,默认port=9600.

这解决了我的问题。