Flink 在 Java 中使用 Custom Class 设置基本的 Kafka 生产者消费者
Flink setup basic Kafka producer consumer with Custom Class in Java
我想在 Kafka 上使用 Flink 设置一个基本的生产者-消费者,但是我 无法通过 Java 向现有消费者生成数据.
CLI解决方案
我使用 https://kafka.apache.org/downloads
中的 kafka_2.11-2.4.0
zip 和命令
设置了 Kafka broker
bin/zookeeper-server-start.sh config/zookeeper.properties
和bin/kafka-server-start.sh config/server.properties
我使用
创建了一个名为 transactions1 的主题
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions1
现在我可以在命令行上使用生产者和消费者来查看主题是否已创建并有效。
设置消费者 I 运行
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic transactions1 --from-beginning
现在,如果任何生产者向主题发送数据transactions1
,我将在消费者控制台中看到它。
我测试消费者正在运行ning
工作
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic transactions1
并在 cli 的生产者中输入以下数据行,这些数据行也会显示在消费者 cli 中。
{"txnID":1,"amt":100.0,"account":"AC1"}
{"txnID":2,"amt":10.0,"account":"AC2"}
{"txnID":3,"amt":20.0,"account":"AC3"}
现在我想在Java代码中复制第3步,即生产者和消费者,这是本题的核心问题。
- 所以我用 build.gradle
设置了一个 gradle java8 项目
...
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
compile group: 'com.twitter', name: 'chill-thrift', version: '0.7.6'
compile group: 'org.apache.thrift', name: 'libthrift', version: '0.11.0'
compile group: 'com.twitter', name: 'chill-protobuf', version: '0.7.6'
compile group: 'org.apache.thrift', name: 'protobuf-java', version: '3.7.0'
}
...
- 我设置了一个自定义 Class
Transactions.class
,您可以在其中通过扩展与 Flink类 相关的 Kryo、Protobuf 或 TbaseSerializer 来建议对序列化逻辑的更改.
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
public class Transaction {
public final int txnID;
public final float amt;
public final String account;
public Transaction(int txnID, float amt, String account) {
this.txnID = txnID;
this.amt = amt;
this.account = account;
}
public String toJSONString() {
Gson gson = new Gson();
return gson.toJson(this);
}
public static Transaction fromJSONString(String some) {
Gson gson = new Gson();
return gson.fromJson(some, Transaction.class);
}
public static MapFunction<String, String> mapTransactions() {
MapFunction<String, String> map = new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if (value != null || value.trim().length() > 0) {
try {
return fromJSONString(value).toJSONString();
} catch (Exception e) {
return "";
}
}
return "";
}
};
return map;
}
@Override
public String toString() {
return "Transaction{" +
"txnID=" + txnID +
", amt=" + amt +
", account='" + account + '\'' +
'}';
}
}
- 现在是时候使用 Flink 来生产和消费关于主题
transactions1
的流了。
public class SetupSpike {
public static void main(String[] args) throws Exception {
System.out.println("begin");
List<Transaction> txns = new ArrayList<Transaction>(){{
add(new Transaction(1, 100, "AC1"));
add(new Transaction(2, 10, "AC2"));
add(new Transaction(3, 20, "AC3"));
}};
// This list txns needs to be serialized in Flink as Transaction.class->String->ByteArray
//via producer and then to the topic in Kafka broker
//and deserialized as ByteArray->String->Transaction.class from the Consumer in Flink reading Kafka broker.
String topic = "transactions1";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", topic);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//env.getConfig().addDefaultKryoSerializer(Transaction.class, TBaseSerializer.class);
// working Consumer logic below which needs edit if you change serialization
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
DataStream<String> stream = env.addSource(myConsumer).map(Transaction::toJSONString);
//working Producer logic below which works if you are sinking a pre-existing DataStream
//but needs editing to work with Java List<Transaction> datatype.
System.out.println("sinking expanded stream");
MapFunction<String, String> etl = new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if (value != null || value.trim().length() > 0) {
try {
return fromJSONString(value).toJSONString();
} catch (Exception e) {
return "";
}
}
return "";
}
};
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(topic,
new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
try {
System.out.println(element);
return new ProducerRecord<byte[], byte[]>(topic, stringToBytes(etl.map(element)));
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}, properties, Semantic.EXACTLY_ONCE);
// stream.timeWindowAll(Time.minutes(1));
stream.addSink(myProducer);
JobExecutionResult execute = env.execute();
}
}
如您所见,我无法使用提供的列表 txns
执行此操作。以上是我可以从 Flink 文档中收集到的工作代码,用于重定向主题流数据并通过 Cli 生产者手动发送数据。问题是在 java 中编写 KafkaProducer 代码,将数据发送到主题,这与
等问题进一步复杂化
- 添加时间戳、水印
- KeyBy 操作
- GroupBy/WindowBy 操作
- 在下沉之前添加自定义 ETL 逻辑。
- Serialization/Deserialization Flink 中的逻辑
用过 Flink 的人能帮我看看如何在 Flink 中生成 txns
列表到 transactions1
主题,然后验证它是否适用于消费者吗?
此外,在下沉之前添加时间戳或一些处理的问题上的任何帮助都会有很大帮助。 您可以在 https://github.com/devssh/kafkaFlinkSpike 上找到源代码,目的是生成 Flink 样板以添加“AC1”的详细信息来自内存存储并将其与实时到来的交易事件结合起来,以将扩展输出发送给用户。
几点,排名不分先后:
最好不要将 Flink 1.9.2 版本与 1.9.0 版本混合使用,就像您在此处所做的那样:
compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'
有关如何使用时间戳、水印、keyBy、windows 等的教程,请参阅 online training materials from Ververica。
要使用 List<Transaction> txns
作为输入流,您可以这样做 (docs):
DataStream<Transaction> transactions = env.fromCollection(txns);
有关在使用 Flink 和 Kafka 时如何处理序列化/反序列化的示例,请参阅 Flink Operations Playground, in particular look at ClickEventDeserializationSchema
and ClickEventStatisticsSerializationSchema
, which are used in ClickEventCount.java and defined here。 (注意:此 playground 尚未针对 Flink 1.10 进行更新。)
我想在 Kafka 上使用 Flink 设置一个基本的生产者-消费者,但是我 无法通过 Java 向现有消费者生成数据.
CLI解决方案
我使用
设置了https://kafka.apache.org/downloads
中的kafka_2.11-2.4.0
zip 和命令Kafka broker
bin/zookeeper-server-start.sh config/zookeeper.properties
和
bin/kafka-server-start.sh config/server.properties
我使用
创建了一个名为 transactions1 的主题bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions1
现在我可以在命令行上使用生产者和消费者来查看主题是否已创建并有效。
设置消费者 I 运行
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic transactions1 --from-beginning
现在,如果任何生产者向主题发送数据
transactions1
,我将在消费者控制台中看到它。我测试消费者正在运行ning
工作bin/kafka-console-producer.sh --broker-list localhost:9092 --topic transactions1
并在 cli 的生产者中输入以下数据行,这些数据行也会显示在消费者 cli 中。
{"txnID":1,"amt":100.0,"account":"AC1"}
{"txnID":2,"amt":10.0,"account":"AC2"}
{"txnID":3,"amt":20.0,"account":"AC3"}
现在我想在Java代码中复制第3步,即生产者和消费者,这是本题的核心问题。
- 所以我用 build.gradle 设置了一个 gradle java8 项目
...
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
compile group: 'com.twitter', name: 'chill-thrift', version: '0.7.6'
compile group: 'org.apache.thrift', name: 'libthrift', version: '0.11.0'
compile group: 'com.twitter', name: 'chill-protobuf', version: '0.7.6'
compile group: 'org.apache.thrift', name: 'protobuf-java', version: '3.7.0'
}
...
- 我设置了一个自定义 Class
Transactions.class
,您可以在其中通过扩展与 Flink类 相关的 Kryo、Protobuf 或 TbaseSerializer 来建议对序列化逻辑的更改.
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
public class Transaction {
public final int txnID;
public final float amt;
public final String account;
public Transaction(int txnID, float amt, String account) {
this.txnID = txnID;
this.amt = amt;
this.account = account;
}
public String toJSONString() {
Gson gson = new Gson();
return gson.toJson(this);
}
public static Transaction fromJSONString(String some) {
Gson gson = new Gson();
return gson.fromJson(some, Transaction.class);
}
public static MapFunction<String, String> mapTransactions() {
MapFunction<String, String> map = new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if (value != null || value.trim().length() > 0) {
try {
return fromJSONString(value).toJSONString();
} catch (Exception e) {
return "";
}
}
return "";
}
};
return map;
}
@Override
public String toString() {
return "Transaction{" +
"txnID=" + txnID +
", amt=" + amt +
", account='" + account + '\'' +
'}';
}
}
- 现在是时候使用 Flink 来生产和消费关于主题
transactions1
的流了。
public class SetupSpike {
public static void main(String[] args) throws Exception {
System.out.println("begin");
List<Transaction> txns = new ArrayList<Transaction>(){{
add(new Transaction(1, 100, "AC1"));
add(new Transaction(2, 10, "AC2"));
add(new Transaction(3, 20, "AC3"));
}};
// This list txns needs to be serialized in Flink as Transaction.class->String->ByteArray
//via producer and then to the topic in Kafka broker
//and deserialized as ByteArray->String->Transaction.class from the Consumer in Flink reading Kafka broker.
String topic = "transactions1";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", topic);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//env.getConfig().addDefaultKryoSerializer(Transaction.class, TBaseSerializer.class);
// working Consumer logic below which needs edit if you change serialization
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
DataStream<String> stream = env.addSource(myConsumer).map(Transaction::toJSONString);
//working Producer logic below which works if you are sinking a pre-existing DataStream
//but needs editing to work with Java List<Transaction> datatype.
System.out.println("sinking expanded stream");
MapFunction<String, String> etl = new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if (value != null || value.trim().length() > 0) {
try {
return fromJSONString(value).toJSONString();
} catch (Exception e) {
return "";
}
}
return "";
}
};
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(topic,
new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
try {
System.out.println(element);
return new ProducerRecord<byte[], byte[]>(topic, stringToBytes(etl.map(element)));
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}, properties, Semantic.EXACTLY_ONCE);
// stream.timeWindowAll(Time.minutes(1));
stream.addSink(myProducer);
JobExecutionResult execute = env.execute();
}
}
如您所见,我无法使用提供的列表 txns
执行此操作。以上是我可以从 Flink 文档中收集到的工作代码,用于重定向主题流数据并通过 Cli 生产者手动发送数据。问题是在 java 中编写 KafkaProducer 代码,将数据发送到主题,这与
- 添加时间戳、水印
- KeyBy 操作
- GroupBy/WindowBy 操作
- 在下沉之前添加自定义 ETL 逻辑。
- Serialization/Deserialization Flink 中的逻辑
用过 Flink 的人能帮我看看如何在 Flink 中生成 txns
列表到 transactions1
主题,然后验证它是否适用于消费者吗?
此外,在下沉之前添加时间戳或一些处理的问题上的任何帮助都会有很大帮助。 您可以在 https://github.com/devssh/kafkaFlinkSpike 上找到源代码,目的是生成 Flink 样板以添加“AC1”的详细信息来自内存存储并将其与实时到来的交易事件结合起来,以将扩展输出发送给用户。
几点,排名不分先后:
最好不要将 Flink 1.9.2 版本与 1.9.0 版本混合使用,就像您在此处所做的那样:
compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'
有关如何使用时间戳、水印、keyBy、windows 等的教程,请参阅 online training materials from Ververica。
要使用 List<Transaction> txns
作为输入流,您可以这样做 (docs):
DataStream<Transaction> transactions = env.fromCollection(txns);
有关在使用 Flink 和 Kafka 时如何处理序列化/反序列化的示例,请参阅 Flink Operations Playground, in particular look at ClickEventDeserializationSchema
and ClickEventStatisticsSerializationSchema
, which are used in ClickEventCount.java and defined here。 (注意:此 playground 尚未针对 Flink 1.10 进行更新。)