Apache Flink (v1.6.0): authenticating the Elasticsearch Sink (v6.4)
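I am using Apache Flink v1.6.0 and I am trying to write to Elasticsearch v6.4.0, which is hosted in Elastic Cloud. I am having trouble authenticating to the Elastic Cloud cluster.
I have been able to get Flink to write to a local, unencrypted Elasticsearch v6.4.0 node using the following code: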
/*
Elasticsearch Configuration
*/
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
// use an ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<ObjectNode> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<ObjectNode>() {
        private IndexRequest createIndexRequest(ObjectNode payload) {
            // take only the "value" node so its fields sit at the top level of the indexed JSON document
            JsonNode jsonOutput = payload.get("value");
            return Requests.indexRequest()
                .index("raw-payload")
                .type("payload")
                .source(jsonOutput.toString(), XContentType.JSON);
        }
        @Override
        public void process(ObjectNode payload, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(payload));
        }
    }
);
// flush after every single event (number of buffered actions before a bulk write to Elasticsearch)
esSinkBuilder.setBulkFlushMaxActions(1);
// finally, build and add the sink to the job's pipeline
stream.addSink(esSinkBuilder.build());
However, I run into trouble when I add authentication as described here in the Flink documentation and here in the corresponding Elasticsearch Java documentation. It looks like this:
// provide a RestClientFactory for custom configuration on the internally created REST client
Header[] defaultHeaders = new Header[]{new BasicHeader("username", "password")};
esSinkBuilder.setRestClientFactory(
    restClientBuilder -> {
        restClientBuilder.setDefaultHeaders(defaultHeaders);
    }
);
When I execute the job, I get the following error:
14:49:54,700 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: org.elasticsearch.ElasticsearchStatusException: method [HEAD], host [https://XXXXXXXXXXXXXX.europe-west1.gcp.cloud.es.io:9243], URI [/], status line [HTTP/1.1 401 Unauthorized]
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at com.downuk.AverageStockSalePrice.main(AverageStockSalePrice.java:146)
Caused by: org.elasticsearch.ElasticsearchStatusException: method [HEAD], host [https://XXXXXXXXXXXXXX.europe-west1.gcp.cloud.es.io:9243], URI [/], status line [HTTP/1.1 401 Unauthorized]
at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:625)
Can anyone point out where I'm going wrong?
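The failing `HEAD /` request in the trace appears to be the connectivity check the Elasticsearch 6 sink makes when it opens, so the 401 means the cluster rejected the very first call. To check a set of credentials outside Flink, here is a minimal sketch using the JDK 11+ `java.net.http` client; the class and method names are hypothetical. It assumes the cluster answers an unauthenticated request with a `401` and a Basic `WWW-Authenticate` challenge, which is what the JDK `Authenticator` responds to.

```java
import java.net.Authenticator;
import java.net.PasswordAuthentication;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical standalone credential check against an Elasticsearch endpoint (Java 11+).
class EsPing {

    // Builds the same kind of request that failed in the trace: HEAD on the root path "/".
    static HttpRequest headRoot(URI endpoint) {
        return HttpRequest.newBuilder(endpoint.resolve("/"))
                .method("HEAD", HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Client that answers a Basic auth challenge (401 + WWW-Authenticate) with the given credentials.
    static HttpClient clientWithCredentials(String user, String password) {
        return HttpClient.newBuilder()
                .authenticator(new Authenticator() {
                    @Override
                    protected PasswordAuthentication getPasswordAuthentication() {
                        return new PasswordAuthentication(user, password.toCharArray());
                    }
                })
                .build();
    }

    // Usage: java EsPing.java https://<your-cluster-endpoint>:9243 <user> <password>
    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("usage: EsPing <endpoint> <user> <password>");
            return;
        }
        HttpResponse<Void> response = clientWithCredentials(args[1], args[2])
                .send(headRoot(URI.create(args[0])), HttpResponse.BodyHandlers.discarding());
        // 200 means the credentials were accepted; rejected credentials may surface
        // as a 401 or as an IOException after repeated authentication attempts
        System.out.println("HEAD / -> " + response.statusCode());
    }
}
```

This is broadly the same challenge/response flow that a `CredentialsProvider` on the Apache HTTP client relies on.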
esSinkBuilder.setRestClientFactory(new RestClientFactory {
  override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
    // TODO Additional rest client args go here - authentication headers for secure connections etc...
  }
})
Hope this helps.
I was able to work this out after looking at the Flink example here and the Elasticsearch documentation here.
It turns out I was setting the wrong configuration above:
restClientBuilder.setDefaultHeaders(...);
when what actually needs to be set is:
restClientBuilder.setHttpClientConfigCallback(...);
Once you use the correct custom configuration, the rest is straightforward. So the piece I was missing is:
// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
    restClientBuilder -> {
        restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                // elasticsearch username and password
                CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("$USERNAME", "$PASSWORD"));
                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
        });
    }
);
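The original attempt sent an HTTP header literally named `username` with the value `password`, which Elasticsearch does not read as credentials. If you would rather keep `setDefaultHeaders`, an alternative sketch is a preemptive HTTP Basic `Authorization` header whose value is `Basic ` followed by the Base64 encoding of `username:password`. The hypothetical helper below computes that value with the JDK only; wiring it into `new BasicHeader("Authorization", ...)` as shown in the comment is an untested assumption, not the fix confirmed above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: builds the value of an HTTP Basic "Authorization" header.
class BasicAuthHeader {

    static String value(String username, String password) {
        String userPass = username + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(userPass.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // In the Flink job this value would (as an assumption) be used as
        //   new Header[]{new BasicHeader("Authorization", BasicAuthHeader.value(user, password))}
        // and passed to restClientBuilder.setDefaultHeaders(...)
        System.out.println(value("user", "pass")); // prints "Basic dXNlcjpwYXNz"
    }
}
```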
Finally, here is the complete snippet for the Elasticsearch sink:
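// imports for the Flink 1.6 elasticsearch6 connector; ObjectNode/JsonNode are assumed to be
// Flink's shaded Jackson (use com.fasterxml.jackson.databind.* if your stream uses plain Jackson)
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.xcontent.XContentType;
/*
Elasticsearch Configuration
*/
List<HttpHost> httpHosts = new ArrayList<>();
// for Elastic Cloud, use the cluster endpoint instead, e.g. new HttpHost("<your-cluster-endpoint>", 9243, "https")
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
// use an ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<ObjectNode> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<ObjectNode>() {
        private IndexRequest createIndexRequest(ObjectNode payload) {
            // take only the "value" node so its fields sit at the top level of the indexed JSON document
            JsonNode jsonOutput = payload.get("value");
            return Requests.indexRequest()
                .index("raw-payload")
                .type("payload")
                .source(jsonOutput.toString(), XContentType.JSON);
        }
        @Override
        public void process(ObjectNode payload, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(payload));
        }
    }
);
// flush after every single event (number of buffered actions before a bulk write to Elasticsearch)
esSinkBuilder.setBulkFlushMaxActions(1);
// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
    restClientBuilder -> {
        restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                // elasticsearch username and password: replace the placeholders, Java does not expand "$USERNAME"
                CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("$USERNAME", "$PASSWORD"));
                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
        });
    }
);
// finally, build and add the sink to the job's pipeline
stream.addSink(esSinkBuilder.build());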
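The `"$USERNAME"` and `"$PASSWORD"` strings are placeholders; Java passes them through literally. One way to keep real credentials out of the source is to read them from the environment in the job's `main` method. Below is a minimal sketch; the variable names `ES_USERNAME` and `ES_PASSWORD` and the `EsCredentials` class are hypothetical.

```java
import java.util.Map;

// Hypothetical helper: resolves Elasticsearch credentials from environment variables
// (ES_USERNAME / ES_PASSWORD are illustrative names, not a Flink or Elastic convention).
class EsCredentials {

    static String[] fromEnv(Map<String, String> env) {
        String user = env.get("ES_USERNAME");
        String password = env.get("ES_PASSWORD");
        if (user == null || user.isEmpty() || password == null || password.isEmpty()) {
            // fail fast on the client instead of getting a 401 from the sink at runtime
            throw new IllegalStateException("ES_USERNAME and ES_PASSWORD must be set");
        }
        return new String[] {user, password};
    }

    public static void main(String[] args) {
        String[] creds = fromEnv(System.getenv());
        // creds[0] / creds[1] would replace "$USERNAME" / "$PASSWORD" in UsernamePasswordCredentials
        System.out.println("resolved user: " + creds[0]);
    }
}
```

Resolving the values in `main` and capturing the two `String`s in the `RestClientFactory` lambda means they are serialized along with the sink. Calling `System.getenv` inside the lambda instead would read the TaskManager's environment, which may differ from the client's.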
I hope this helps anyone else who gets stuck in the same place!