How to push a datastream to a Kafka topic while retaining order, using strings, in Flink
I am trying to create a JSON dataset every 500 ms and want to push it to a Kafka topic so that I can set up windows downstream and perform computations. Below is my code:
package KafkaAsSource

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer}
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper

import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
import java.util.{Optional, Properties}

object PushingDataToKafka {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setMaxParallelism(256)
    env.enableCheckpointing(5000)

    val stream: DataStream[String] = env.fromElements(createData())
    stream.addSink(sendToTopic(stream))
  }

  def getProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    return properties
  }

  def createData(): String = {
    val minRange: Int = 0
    val maxRange: Int = 1000
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{\n \"id\":\"" + a + "\",\n \"Category\":\"Flink\",\n \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n \n}"
      println(jsonData)
      Thread.sleep(500)
    }
    return jsonData
  }

  def sendToTopic(): Properties = {
    val producer = new FlinkKafkaProducer[String](
      "topic",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      getProperties(),
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE
    )
    return producer
  }
}
It gives me the following error:
type mismatch;
found : Any
required: org.apache.flink.streaming.api.functions.sink.SinkFunction[String]
stream.addSink(sendToTopic())
Modified code:
object FlinkTest {

  def main(ars: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.setMaxParallelism(256)
    var stream = env.fromElements("")
    //env.enableCheckpointing(5000)
    //val stream: DataStream[String] = env.fromElements("hey mc", "1")

    val myProducer = new FlinkKafkaProducer[String](
      "maddy", // target topic
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
      getProperties(), // producer config
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    val minRange: Int = 0
    val maxRange: Int = 10
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{\n \"id\":\"" + a + "\",\n \"Category\":\"Flink\",\n \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n \n}"
      println(a)
      Thread.sleep(500)
      stream = env.fromElements(jsonData)
      println(jsonData)
      stream.addSink(myProducer)
    }
    env.execute("hey")
  }

  def getProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    return properties
  }

  /*
  def createData(): String = {
    val minRange: Int = 0
    val maxRange: Int = 10
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{\n \"id\":\"" + a + "\",\n \"Category\":\"Flink\",\n \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n \n}"
      Thread.sleep(500)
    }
    return jsonData
  }
  */
}
The modified code gives me data in the Kafka topic, but it does not retain the order. What am I doing wrong in the loop? Also, I had to change the Flink version from 1.13.5 to 1.12.2. I was initially using Flink 1.13.5, the connectors, and Scala 2.11. What exactly am I missing?
A few things about this loop:
for (a <- minRange to maxRange) {
  jsonData =
    "{\n \"id\":\"" + a + "\",\n \"Category\":\"Flink\",\n \"eventTime\":\"" +
      DateTimeFormatter
        .ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
        .format(LocalDateTime.now) + "\"\n \n}"
  println(a)
  Thread.sleep(500)
  stream = env.fromElements(jsonData)
  println(jsonData)
  stream.addSink(myProducer)
}
The sleep happens in the Flink client and only affects how long it takes the client to assemble the job graph before submitting it to the cluster. It has no effect on how the job runs.

This loop creates a separate, independent pipeline on each iteration. Those pipelines run independently and in parallel, all producing to the same Kafka topic, and they will race against one another.
To get the behavior you're looking for (global ordering across a single pipeline), you need to generate all of the events from a single source (in order, of course) and run the job with a parallelism of 1. Something like this will do it:
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

object FlinkTest {
  def main(ars: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.setParallelism(1)

    val myProducer = ...
    val jsonData = (i: Long) => ...

    env.fromSequence(0, 9)
      .map(i => jsonData(i))
      .addSink(myProducer)

    env.execute()
  }
}
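For completeness, here is a minimal sketch of how the elided pieces might be filled in. It reuses the producer construction, topic name ("maddy"), broker address, and JSON shape from the question; those values, the job/class names, and the transaction-timeout setting are assumptions for illustration, not a definitive implementation.

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper

object OrderedKafkaJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)          // a single pipeline, so the sink receives the events in order
    env.enableCheckpointing(5000)  // EXACTLY_ONCE needs checkpointing so transactions get committed

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // Keep the producer's transaction timeout within the broker's default 15-minute limit.
    properties.setProperty("transaction.timeout.ms", "900000")

    // Same producer construction as in the question; "maddy" is the example topic name.
    val myProducer = new FlinkKafkaProducer[String](
      "maddy",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      properties,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    // Build the same JSON payload as in the question for a given id.
    val jsonData = (i: Long) =>
      s"""{ "id":"$i", "Category":"Flink", "eventTime":"${
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now)
      }" }"""

    env.fromSequence(0, 9)   // one ordered source producing ids 0 through 9
      .map(i => jsonData(i))
      .addSink(myProducer)

    env.execute("ordered-kafka-producer")
  }
}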
You can leave maxParallelism at 256 (or at its default value of 128); it isn't particularly relevant here. The maxParallelism is the number of hash buckets into which keyBy will hash the keys, and it defines an upper limit on the scalability of the job.
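As a quick, hypothetical illustration of that relationship (the object name and input values below are made up): keyBy hashes each key into one of maxParallelism key groups, so the setting caps how far a keyed job can ever be scaled out, but it doesn't affect a simple parallelism-1 pipeline like the one above.

import org.apache.flink.streaming.api.scala._

object MaxParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setMaxParallelism(128) // number of key groups; upper bound for any future rescaling
    env.setParallelism(2)      // actual parallelism must stay <= maxParallelism

    env.fromElements("flink", "kafka", "flink", "scala")
      .keyBy(word => word)     // each key is hashed into one of the 128 key groups
      .map(word => s"saw: $word")
      .print()

    env.execute("maxParallelism sketch")
  }
}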