信任库和 Google 云数据流
Truststore and Google Cloud Dataflow
我需要使用信任库在 Google Cloud Dataflow 中建立 SSL Kafka 连接。我可以从桶中提供这个,还是有办法将它存储在 "local file system" 上?
您可以为此使用 KafkaIO.Read.withConsumerFactoryFn to supply a factory function that will be invoked to create the Kafka consumer. In that function, you're free to do anything you like, e.g. you can download the trust store file from a GCS bucket (I would recommend using GcsUtil)并将其保存到本地磁盘上的临时文件中 - AFAIK Kafka 本身仅支持将此文件保存在本地磁盘上。然后手动创建一个KafkaConsumer
指向文件
感谢@jkff 的解决方案,这里是一个实现示例:
ConsumerFactoryFn 实施示例:
private static class ConsumerFactoryFn
implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
{
public Consumer<byte[], byte[]> apply(Map<String, Object> config)
{
try
{
Storage storage = StorageOptions.newBuilder()
.setProjectId("prj-id-of-your-bucket")
.setCredentials(GoogleCredentials.getApplicationDefault())
.build()
.getService();
Blob blob = storage.get("your-bucket-name", "pth.to.your.kafka.client.truststore.jks");
ReadChannel readChannel = blob.reader();
FileOutputStream fileOuputStream;
fileOuputStream = new FileOutputStream("/tmp/kafka.client.truststore.jks"); //path where the jks file will be stored
fileOuputStream.getChannel().transferFrom(readChannel, 0, Long.MAX_VALUE);
fileOuputStream.close();
File f = new File("/tmp/kafka.client.truststore.jks"); //assuring the store file exists
if (f.exists())
{
LOG.debug("key exists");
}
else
{
LOG.error("key does not exist");
}
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
LOG.error( e.getMessage());
} catch (IOException e) {
// TODO Auto-generated catch block
LOG.error( e.getMessage());
}
config.put("ssl.truststore.location",(Object) "/tmp/kafka.client.truststore.jks" );
return new KafkaConsumer<byte[], byte[]>(config);
}
}
并且不要忘记在 KafkaIO.read() 调用中使用 .withConsumerFactoryFn,应该类似于:
Map<String, Object> configMap = new HashMap<String, Object>();
configMap.put("security.protocol", (Object) "SSL");
configMap.put("ssl.truststore.password", (Object) "clientpass");
p.apply("ReadFromKafka", KafkaIO.<String, String>read()
.withBootstrapServers("ip:9093")
.withTopic("pageviews")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(configMap)
.withConsumerFactoryFn(new ConsumerFactoryFn()) ... etc.
我需要使用信任库在 Google Cloud Dataflow 中建立 SSL Kafka 连接。我可以从桶中提供这个,还是有办法将它存储在 "local file system" 上?
您可以为此使用 KafkaIO.Read.withConsumerFactoryFn to supply a factory function that will be invoked to create the Kafka consumer. In that function, you're free to do anything you like, e.g. you can download the trust store file from a GCS bucket (I would recommend using GcsUtil)并将其保存到本地磁盘上的临时文件中 - AFAIK Kafka 本身仅支持将此文件保存在本地磁盘上。然后手动创建一个KafkaConsumer
指向文件
感谢@jkff 的解决方案,这里是一个实现示例:
ConsumerFactoryFn 实施示例:
private static class ConsumerFactoryFn
implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
{
public Consumer<byte[], byte[]> apply(Map<String, Object> config)
{
try
{
Storage storage = StorageOptions.newBuilder()
.setProjectId("prj-id-of-your-bucket")
.setCredentials(GoogleCredentials.getApplicationDefault())
.build()
.getService();
Blob blob = storage.get("your-bucket-name", "pth.to.your.kafka.client.truststore.jks");
ReadChannel readChannel = blob.reader();
FileOutputStream fileOuputStream;
fileOuputStream = new FileOutputStream("/tmp/kafka.client.truststore.jks"); //path where the jks file will be stored
fileOuputStream.getChannel().transferFrom(readChannel, 0, Long.MAX_VALUE);
fileOuputStream.close();
File f = new File("/tmp/kafka.client.truststore.jks"); //assuring the store file exists
if (f.exists())
{
LOG.debug("key exists");
}
else
{
LOG.error("key does not exist");
}
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
LOG.error( e.getMessage());
} catch (IOException e) {
// TODO Auto-generated catch block
LOG.error( e.getMessage());
}
config.put("ssl.truststore.location",(Object) "/tmp/kafka.client.truststore.jks" );
return new KafkaConsumer<byte[], byte[]>(config);
}
}
并且不要忘记在 KafkaIO.read() 调用中使用 .withConsumerFactoryFn,应该类似于:
Map<String, Object> configMap = new HashMap<String, Object>();
configMap.put("security.protocol", (Object) "SSL");
configMap.put("ssl.truststore.password", (Object) "clientpass");
p.apply("ReadFromKafka", KafkaIO.<String, String>read()
.withBootstrapServers("ip:9093")
.withTopic("pageviews")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(configMap)
.withConsumerFactoryFn(new ConsumerFactoryFn()) ... etc.