Flink is not adding any data to Elasticsearch but no errors

Folks, I'm new to all of this stream processing, but I was able to build and submit a Flink job that reads some CSV data from Kafka, aggregates it, and then writes it to Elasticsearch.

I got the first two parts working and could print my aggregations to STDOUT. But when I added the code to write them to Elasticsearch, nothing seems to happen there (no data is added). I checked the Flink job manager log and it looks fine (no errors), saying:

2020-03-03 16:18:03,877 INFO 
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge
- Created Elasticsearch RestHighLevelClient connected to [http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200]

My code at this point looks like this:

/*
 * This Scala source file was generated by the Gradle 'init' task.
 */
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}

object Demo {
  /**
   * MapFunction to generate Transfers POJOs from parsed CSV data.
   */
  class TransfersMapper extends RichMapFunction[String, Transfers] {
    private var formatter = null

    @throws[Exception]
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    }

    @throws[Exception]
    override def map(csvLine: String): Transfers = {
      //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
      var splitCsv = csvLine.stripLineEnd.split(",")

      // Pad short rows out to 13 columns so the field assignments below do not
      // go out of bounds; the last column (los) defaults to "0.0".
      val arrLength = splitCsv.length
      if (arrLength != 13) {
        for (i <- arrLength + 1 to 13) {
          if (i == 13) {
            splitCsv = splitCsv :+ "0.0"
          } else {
            splitCsv = splitCsv :+ ""
          }
        }
      }
      var trans = new Transfers()
      trans.rowId = splitCsv(0)
      trans.subjectId = splitCsv(1)
      trans.hadmId = splitCsv(2)
      trans.icuStayId = splitCsv(3)
      trans.dbSource = splitCsv(4)
      trans.eventType = splitCsv(5)
      trans.prev_careUnit = splitCsv(6)
      trans.curr_careUnit = splitCsv(7)
      trans.prev_wardId = splitCsv(8)
      trans.curr_wardId = splitCsv(9)
      trans.inTime = splitCsv(10)
      trans.outTime = splitCsv(11)
      trans.los = splitCsv(12).toDouble

      return trans
    }
  }

  def main(args: Array[String]) {
    // Create streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Set properties per KafkaConsumer API
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
    properties.setProperty("group.id", "test")

    // Add Kafka source to environment
    val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
    // Read from beginning of topic
    myKConsumer.setStartFromEarliest()

    val streamSource = env
      .addSource(myKConsumer)

    // Transform CSV (with a header row) per Kafka event into a Transfers object
    val streamTransfers = streamSource.map(new TransfersMapper())

    // create a TableEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    println("***** NEW EXECUTION STARTED AT " + LocalDateTime.now() + " *****")

    // register a Table
    val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
    tEnv.createTemporaryView("transfers", tblTransfers)

    tEnv.connect(
      new Elasticsearch()
        .version("7")
        .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
        .index("transfers-sum")
        .documentType("_doc")
        .keyNullLiteral("n/a")
    )
      .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.DOUBLE())
      )
      .inUpsertMode()
      .createTemporaryTable("transfersSum")

    val result = tEnv.sqlQuery(
      """
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit
        |""".stripMargin)

    result.insertInto("transfersSum")

    // Elasticsearch elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200

    env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}

I'm not sure how to debug this beast... Wondering if someone could help me figure out why the Flink job isn't adding any data to Elasticsearch :( From my Flink cluster I'm able to query Elasticsearch just fine (manually) and add records to my index:

curl -XPOST "http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200/transfers-sum/_doc" -H 'Content-Type: application/json' -d'{"curr_careUnit":"TEST123","sum":"123"}'
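One way to narrow this down is to check whether the aggregation itself still emits updates once the sink is attached. Here is a minimal sketch, assuming the same tEnv and result table from the code above: convert the grouped query into a retract stream and print it, while leaving the insert into the Elasticsearch table in place.

import org.apache.flink.types.Row

// A GROUP BY query produces a retract stream: (true, row) marks a new or
// updated aggregate, (false, row) retracts a previously emitted one.
// Printing it to TaskManager stdout shows whether updates reach the sink stage.
tEnv.toRetractStream[Row](result).print()

// The existing result.insertInto("transfersSum") and env.execute(...) stay as they are.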

A kind soul on the Flink mailing list pointed out that Elasticsearch might be buffering my records... and indeed, it was. ;)

I have added the following options to the Elasticsearch connector:

.bulkFlushMaxActions(2)
.bulkFlushInterval(1000L)
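For reference, this is roughly how those two options slot into the connector definition shown earlier (a sketch of just the descriptor part, with everything else unchanged):

tEnv.connect(
  new Elasticsearch()
    .version("7")
    .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")
    .index("transfers-sum")
    .documentType("_doc")
    .keyNullLiteral("n/a")
    .bulkFlushMaxActions(2)   // flush after every 2 buffered index actions ...
    .bulkFlushInterval(1000L) // ... or at least once per second, whichever comes first
)
  .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
  .withSchema(new Schema()
    .field("curr_careUnit", DataTypes.STRING())
    .field("sum", DataTypes.DOUBLE())
  )
  .inUpsertMode()
  .createTemporaryTable("transfersSum")

With these set, a handful of test records gets flushed to the index almost immediately instead of sitting in the sink's bulk buffer until a default threshold is reached.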

Flink Elasticsearch Connector 7 using Scala

Please find the working and detailed answer I have provided here.