Apache Flink integration with Elasticsearch
I am trying to integrate Flink with Elasticsearch 2.1.1, using the following Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>
Here is my Java code. It reads events from a Kafka queue (which works fine), but somehow the events are never published to Elasticsearch, and no error is reported. If I change any setting related to the Elasticsearch port, hostname, cluster name, or index name, I immediately see an error; as it stands, though, it shows no errors and no new documents are created in Elasticsearch.
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // parse user parameters
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    // read events from the Kafka topic given on the command line
    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(
            parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
    messageStream.print();

    // flush after every element so documents should appear immediately
    Map<String, String> config = new HashMap<>();
    config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
    config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1");
    config.put("cluster.name", "FlinkDemo");

    // Elasticsearch transport addresses (9300 is the TransportClient port)
    List<InetSocketAddress> transports = new ArrayList<>();
    transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));

    messageStream.addSink(new ElasticsearchSink<String>(config, transports, new TestElasticsearchSinkFunction()));
    env.execute();
}
private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<String> {
    private static final long serialVersionUID = 1L;

    public IndexRequest createIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest()
                .index("flink").id("hash" + element).source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}
I did run it on my local machine and debugged it as well. The one thing I had been missing was properly configuring the logging, since most Elasticsearch problems are only reported in "log.warn" statements. The actual problem was an exception raised inside "BulkRequestHandler.java" in the elasticsearch-2.2.1 client API: "org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: type is missing;". I had created the index but not the type, which I found odd, since the client should mainly be concerned with the index and create the type by default.
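Setting an explicit type on the IndexRequest resolves the validation error. Here is a minimal sketch of the corrected createIndexRequest from my code above, assuming a hypothetical type name "flink-log" (any non-empty type works):

public IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);
    return Requests.indexRequest()
            .index("flink")
            .type("flink-log") // hypothetical type name; the explicit type is what fixes the validation error
            .id("hash" + element)
            .source(json);
}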
I also found a very good example of a Flink & Elasticsearch connector.
First, the Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>
Second, the example Java code:
public static void writeElastic(DataStream<String> input) {
    Map<String, String> config = new HashMap<>();

    // This instructs the sink to emit after every element, otherwise they would be buffered
    config.put("bulk.flush.max.actions", "1");
    config.put("cluster.name", "es_keira");

    try {
        // Add elasticsearch hosts on startup
        List<InetSocketAddress> transports = new ArrayList<>();
        transports.add(new InetSocketAddress("127.0.0.1", 9300)); // port is 9300, not 9200, for the ES TransportClient

        ElasticsearchSinkFunction<String> indexLog = new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String element) {
                // expects tab-separated log lines: "<IP>\t<info>"
                String[] logContent = element.trim().split("\t");
                Map<String, String> esJson = new HashMap<>();
                esJson.put("IP", logContent[0]);
                esJson.put("info", logContent[1]);
                return Requests
                        .indexRequest()
                        .index("viper-test")
                        .type("viper-log")
                        .source(esJson);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        };

        ElasticsearchSink<String> esSink = new ElasticsearchSink<>(config, transports, indexLog);
        input.addSink(esSink);
    } catch (Exception e) {
        System.out.println(e);
    }
}
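Note the key difference from the code in the question: this example calls .type("viper-log") explicitly on the IndexRequest, which is exactly what avoids the "type is missing" ActionRequestValidationException on Elasticsearch 2.x. It also sets "bulk.flush.max.actions" to "1" so each element is flushed immediately rather than buffered, which is convenient for testing but worth raising in production.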