How to add Kafka connector to flink through sbt of IntelliJ? I want to add the universal connector to support Kafka 2.3

Below is my SBT configuration:

// https://mvnrepository.com/artifact/org.apache.flink/flink-scala
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.9.1"

// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.9.1"

// https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-scala-bridge
libraryDependencies += "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.9.1"

// https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner
libraryDependencies += "org.apache.flink" %% "flink-table-planner" % "1.9.1"

// https://mvnrepository.com/artifact/org.apache.flink/flink-table-common
libraryDependencies += "org.apache.flink" % "flink-table-common" % "1.9.1"

// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-base
//libraryDependencies += "org.apache.flink" %% "flink-connector-kafka_2.11" % "1.9.1" -- this is not working and throws an "unable to connect" error.

Since I was not able to add flink-connector-kafka through sbt, I downloaded the jar and put it into a lib folder that I created inside my sbt project. The sbt project was created through IntelliJ; only the lib folder was added by hand.
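For completeness, the only build setting involved in that workaround is the unmanaged-jar location; lib at the project root is already sbt's default, so the line below is only needed if a different folder is used (a minimal sketch):

// sketch: sbt picks up jars in <project root>/lib as unmanaged dependencies by default;
// this only needs to be set explicitly for a non-default folder
unmanagedBase := baseDirectory.value / "lib"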

Now the import of the Kafka connector package, i.e. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer, works fine.

Below is the code I use to consume from Kafka:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._

object KafkaFlink {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
//    properties.setProperty("bootstrap.servers", "localhost:9092")
//    // only required for Kafka 0.8
//    properties.setProperty("zookeeper.connect", "localhost:2181")
//    properties.setProperty("group.id", "test")
    val properties1 = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")
    val topic = "flink-fault-testing"
    val flinkKafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties1)

    val value: DataStream[String] = env.addSource(flinkKafkaConsumer)
  }

}

It does not compile, because I am getting the error: cannot resolve overloaded method "addSource".

Please point out where I am going wrong.

Also, it would help if there is a way to get the universal flink-kafka connector directly through IntelliJ's build.sbt.

What you want to specify in your SBT configuration is this:

libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.1"

If you drop the "_2.11", it should work. That suffix indicates which Scala version to use; with %%, SBT takes care of the Scala versioning itself.
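For example, a minimal build.sbt sketch that pulls the universal connector in alongside the dependencies from the question (the Scala version here is just an assumption; with %%, sbt resolves flink-connector-kafka_2.11 or _2.12 to match scalaVersion):

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // streaming API dependencies from the question
  "org.apache.flink" %% "flink-scala"           % "1.9.1",
  "org.apache.flink" %% "flink-streaming-scala" % "1.9.1",
  // universal Kafka connector; %% appends the Scala suffix automatically
  "org.apache.flink" %% "flink-connector-kafka" % "1.9.1"
)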

I'm not sure why your code doesn't compile; it looks fine to me.
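For comparison, here is a minimal sketch that compiles against the 1.9.1 universal connector once the dependency above is in place (object name, topic, and broker address are placeholders); it imports org.apache.flink.streaming.api.scala._ for the implicit TypeInformation and passes the populated Properties object to the consumer:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaFlinkSketch {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka connection settings (placeholder broker and consumer group)
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")

    // universal consumer from flink-connector-kafka; works against Kafka 2.3 brokers
    val consumer = new FlinkKafkaConsumer[String]("flink-fault-testing", new SimpleStringSchema(), properties)

    // the Scala package import above provides the implicit TypeInformation[String]
    // needed to resolve the Scala addSource overload
    val stream: DataStream[String] = env.addSource(consumer)
    stream.print()

    env.execute("kafka-flink-sketch")
  }
}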