如何通过在Flink Kafka问题中使用字符串方法保留顺序将数据流推送到kafka主题

How to push datastream to kafka topic by retaining the order using string method in Flink Kafka Problem

我正在尝试每隔 500 ms 创建一个 JSON 数据集,并想将其推送到 Kafka 主题,以便我可以在 windows下游并执行计算。下面是我的代码:

package KafkaAsSource

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer}
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper


import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
import java.util.{Optional, Properties}

object PushingDataToKafka {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setMaxParallelism(256)
    env.enableCheckpointing(5000)
    val stream: DataStream[String] = env.fromElements(createData())

    stream.addSink(sendToTopic(stream))
  }

  def getProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")

    return properties
  }

  def createData(): String = {
    val minRange: Int = 0
    val maxRange: Int = 1000
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{\n  \"id\":\"" + a + "\",\n  \"Category\":\"Flink\",\n  \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n  \n}"
      println(jsonData)
      Thread.sleep(500)
    }
    return jsonData
  }

  def sendToTopic(): Properties = {
    val producer = new FlinkKafkaProducer[String](
      "topic"
      ,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema())
      ,
      getProperties(),
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE
    )
    return producer
  }
}

它给我以下错误:

type mismatch;
 found   : Any
 required: org.apache.flink.streaming.api.functions.sink.SinkFunction[String]
    stream.addSink(sendToTopic())

修改后的代码:

object FlinkTest {

  def main(ars: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.setMaxParallelism(256)
    var stream = env.fromElements("")
    //env.enableCheckpointing(5000)
    //val stream: DataStream[String] = env.fromElements("hey mc", "1")

    val myProducer = new FlinkKafkaProducer[String](
      "maddy", // target topic
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
      getProperties(), // producer config
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    val minRange: Int = 0
    val maxRange: Int = 10
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{\n  \"id\":\"" + a + "\",\n  \"Category\":\"Flink\",\n  \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n  \n}"
      println(a)
      Thread.sleep(500)
      stream = env.fromElements(jsonData)
      println(jsonData)
      stream.addSink(myProducer)
    }

    env.execute("hey")
  }

  def getProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    return properties
  }
  /*
  def createData(): String = {
    val minRange: Int = 0
    val maxRange: Int = 10
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{\n  \"id\":\"" + a + "\",\n  \"Category\":\"Flink\",\n  \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n  \n}"
      Thread.sleep(500)
    }
    return jsonData
  }
  */

}

Modified Code 给了我 Kafka 主题中的数据,但它不保留顺序。我在循环中做错了什么?另外,必须将 Flink 的版本从 1.13.5.

更改为 1.12.2

我最初使用 Flink 1.13.5ConnectorsScala2.11。我到底错过了什么?

关于这个循环的一些事情:

for (a <- minRange to maxRange) {
    jsonData = 
      "{\n  \"id\":\"" + a + "\",\n  \"Category\":\"Flink\",\n  \"eventTime\":\""
      + DateTimeFormatter
        .ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
        .format(LocalDateTime.now) + "\"\n  \n}"
    println(a)
    Thread.sleep(500)
    stream = env.fromElements(jsonData)
    println(jsonData)
    stream.addSink(myProducer)
}
  • 睡眠发生在 Flink 客户端中,只会影响客户端在将作业图提交到集群之前 assemble 花费的时间。它对作业 运行s.

    没有影响
  • 此循环正在创建 10 个独立的管道,这些管道将 运行 独立地并行地生成同一个 Kafka 主题。这些管道将相互竞争。


要获得您正在寻找的行为(跨单个管道的全局排序),您需要从一个来源(当然是按顺序)生成所有事件,并且 运行 并行度为 1 的作业。像这样的事情就可以做到:

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

object FlinkTest {

  def main(ars: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.setParallelism(1)

    val myProducer = ...
    val jsonData = (i: Long) => ...

    env.fromSequence(0, 9)
      .map(i => jsonData(i))
      .addSink(myProducer)

      env.execute()
  }
}

您可以将 maxParallelism 保留为 256(或其默认值 128);它在这里不是特别相关。 maxParallelism 是 keyBy 将把键哈希到的哈希桶的数量,它定义了作业可扩展性的上限。