How to migrate MySQL data to Elasticsearch using Logstash

I need a brief explanation of how to get MySQL data into Elasticsearch using Logstash. Can anyone explain the step-by-step process for this?

You can use Logstash's jdbc input plugin to do this.

Here is a configuration example.
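
A minimal sketch, assuming MySQL Connector/J has been downloaded; the paths, hosts, credentials, table name, and index name below are placeholders to replace with your own:

input {
    jdbc {
        # Path to the MySQL JDBC driver (MySQL Connector/J, downloaded separately)
        jdbc_driver_library => "/path/to/mysql-connector-java.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
        jdbc_user => "user"
        jdbc_password => "pwd"
        # Pull every row of the table in one pass
        statement => "SELECT * FROM user"
    }
}

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "user"
    }
}

Each row returned by the statement becomes one JSON document in the user index, with the column names as field names.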

This is a broad question, and I don't know how much you already know about MySQL and ES. Suppose you have a table user. You can simply dump it to CSV, load it into your ES, and you are done. But if you have dynamic data, where MySQL acts like a pipeline, you need to write a script to keep the two in sync. In any case, before asking how, you can look at the links below to build up your basics.

How to dump mysql?

How to load data to ES

Also, you will probably want to know how to convert CSV to a JSON file, which is the format ES understands most easily; see the sketch after these links.

How to convert CSV to JSON
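
If you go the CSV route, Logstash can also do the CSV-to-JSON conversion itself with its csv filter, so a separate conversion script is not strictly needed. A minimal sketch, assuming the dump lives at /tmp/user.csv with placeholder columns id, name, and email:

input {
    file {
        path => "/tmp/user.csv"           # assumed dump location
        start_position => "beginning"     # read the file from the top
        sincedb_path => "/dev/null"       # do not remember the read position between runs
    }
}

filter {
    csv {
        separator => ","
        columns => ["id", "name", "email"]   # placeholder columns from the user table
    }
}

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "user"
    }
}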

Let me give you a high-level set of instructions.

  • Install Logstash and Elasticsearch.
  • Copy the driver jar ojdbc7.jar into the Logstash bin folder.
  • Create a configuration file for Logstash, e.g. config.yml (despite the extension, this is Logstash's own config syntax, not YAML):
input {
    # Get the data from database, configure fields to get data incrementally
    jdbc {
        jdbc_driver_library => "./ojdbc7.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@db:1521:instance"
        jdbc_user => "user"
        jdbc_password => "pwd"

        id => "some_id"

        jdbc_validate_connection => true
        jdbc_validation_timeout => 1800
        connection_retry_attempts => 10
        connection_retry_attempts_wait_time => 10

        #fetch the db logs using logid
        statement => "select * from customer.table where logid > :sql_last_value order by logid asc"

        #limit how many results are pre-fetched at a time from the cursor into the client’s cache before retrieving more results from the result-set
        jdbc_fetch_size => 500
        jdbc_default_timezone => "America/New_York"

        use_column_value => true
        tracking_column => "logid"
        tracking_column_type => "numeric"
        record_last_run => true

        schedule => "*/2 * * * *"

        type => "log.customer.table"
        add_field => {"source" => "customer.table"}
        add_field => {"tags" => "customer.table" } 
        add_field => {"logLevel" => "ERROR" }

        last_run_metadata_path => "last_run_metadata_path_table.txt"
    }

}

# Massage the data to store in index
filter {
    if [type] == 'log.customer.table' {
        #assign values from db column to custom fields of index
        ruby{
            code => "event.set( 'errorid', event.get('ssoerrorid') );
                    event.set( 'msg', event.get('errormessage') );
                    event.set( 'logTimeStamp', event.get('date_created'));
                    event.set( '@timestamp', event.get('date_created'));
                    "
        }
        #remove the db columns that were mapped to custom fields of index
        mutate {
            remove_field => ["ssoerrorid","errormessage","date_created" ]
        }
    }#end of [type] == 'log.customer.table' 
} #end of filter

# Insert into index
output {
    if [type] == 'log.customer.table' {
        amazon_es {
            hosts => ["vpc-xxx-es-yyyyyyyyyyyy.us-east-1.es.amazonaws.com"]
            region => "us-east-1"
            aws_access_key_id => '<access key>'
            aws_secret_access_key => '<secret password>'
            index => "production-logs-table-%{+YYYY.MM.dd}"
        }
    }
}
  • Go to the bin folder and run: logstash -f config.yml
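
Note that the configuration above happens to read from Oracle and write to Amazon Elasticsearch Service. Since the question is about MySQL, here is a sketch of the pieces you would swap in, assuming MySQL Connector/J and a self-managed Elasticsearch node; the host, database, table, and credentials are placeholders:

input {
    jdbc {
        # MySQL Connector/J instead of the Oracle driver
        jdbc_driver_library => "/path/to/mysql-connector-java.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://db:3306/mydb"
        jdbc_user => "user"
        jdbc_password => "pwd"

        # The incremental-load settings carry over unchanged
        statement => "SELECT * FROM customer_table WHERE logid > :sql_last_value ORDER BY logid ASC"
        use_column_value => true
        tracking_column => "logid"
        tracking_column_type => "numeric"
        schedule => "*/2 * * * *"
    }
}

output {
    # Stock elasticsearch output instead of amazon_es
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "production-logs-table-%{+YYYY.MM.dd}"
    }
}

The amazon_es output is a separate community plugin that signs requests with AWS credentials; against a self-managed cluster the stock elasticsearch output is sufficient.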