Stuck at: Could not find a suitable table factory

While playing around with Flink, I have been trying to upsert data into Elasticsearch. This error shows up in my STDOUT:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector.hosts=http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200
connector.index=transfers-sum
connector.key-null-literal=n/a
connector.property-version=1
connector.type=elasticsearch
connector.version=6
format.json-schema={      \"curr_careUnit\": {\"type\": \"text\"},      \"sum\": {\"type\": \"float\"}    }
format.property-version=1
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=curr_careUnit
schema.1.data-type=FLOAT
schema.1.name=sum
update-mode=upsert

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
...

Here is my Scala Flink code:

  def main(args: Array[String]) {
    // Create streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Set properties per KafkaConsumer API
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
    properties.setProperty("group.id", "test")

    // Add Kafka source to environment
    val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data4", new SimpleStringSchema(), properties)
    // Read from beginning of topic
    myKConsumer.setStartFromEarliest()

    val streamSource = env
      .addSource(myKConsumer)

    // Transform each CSV record (with a header row) per Kafka event into a Transfers object
    val streamTransfers = streamSource.map(new TransfersMapper())

    // create a TableEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // register a Table
    val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
    tEnv.createTemporaryView("transfers", tblTransfers)

    tEnv.connect(
      new Elasticsearch()
        .version("6")
        .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")
        .index("transfers-sum")
        .keyNullLiteral("n/a"))
      .withFormat(new Json().jsonSchema("{      \"curr_careUnit\": {\"type\": \"text\"},      \"sum\": {\"type\": \"float\"}    }"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.FLOAT()))
      .inUpsertMode()
      .createTemporaryTable("transfersSum")

    val result = tEnv.sqlQuery(
      """
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit
        |""".stripMargin)

    result.insertInto("transfersSum")

    env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}

I am building a fat jar and uploading it to my remote Flink instance. These are my build.gradle dependencies:

compile 'org.scala-lang:scala-library:2.11.12'
compile 'org.apache.flink:flink-scala_2.11:1.10.0'
compile 'org.apache.flink:flink-streaming-scala_2.11:1.10.0'
compile 'org.apache.flink:flink-connector-kafka-0.10_2.11:1.10.0'
compile 'org.apache.flink:flink-table-api-scala-bridge_2.11:1.10.0'
compile 'org.apache.flink:flink-connector-elasticsearch6_2.11:1.10.0'
compile 'org.apache.flink:flink-json:1.10.0'
compile 'com.fasterxml.jackson.core:jackson-core:2.10.1'
compile 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1'
compile 'org.json4s:json4s-jackson_2.11:3.7.0-M1'

Here is how the fatJar task is defined for Gradle:

jar {
    from {
        (configurations.compile).collect {
            it.isDirectory() ? it : zipTree(it)
        }
    }
    manifest {
        attributes("Main-Class": "main" )
    }
}
task fatJar(type: Jar) {
    zip64 true
    manifest {
        attributes 'Main-Class': "flinkNamePull.Demo"
    }
    baseName = "${rootProject.name}"
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    with jar
}

Can anyone help me spot what I am missing? I am still quite new to Flink and data streaming in general.

Thanks in advance!

Is the list under `The following factories have been considered:` complete? Does it contain `Elasticsearch6UpsertTableSinkFactory`? If it doesn't, then as far as I can tell there is a problem with the service discovery of the dependencies.

How do you submit your job? Can you check whether your uber jar contains the file `META-INF/services/org.apache.flink.table.factories.TableFactory` with an entry for `Elasticsearch6UpsertTableSinkFactory`?

With Maven, you have to add a transformer to merge the service files correctly:

<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>

I am not sure how you would do that in Gradle.


Edit: thanks to Arvid Heise. In Gradle, when using the shadowJar plugin, you can merge service files like this:

// Merging Service Files
shadowJar {
  mergeServiceFiles()
}

You should use the shadow plugin to create the fat jar instead of assembling it manually.

In particular, you want to merge the service descriptors.
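Putting that together, a minimal build.gradle sketch using the shadow plugin might look like the following (the plugin version is an assumption for the Gradle/Flink versions in use here; adjust it to your setup):

```groovy
plugins {
    id 'scala'
    // The shadow plugin builds the fat jar and knows how to merge service files.
    id 'com.github.johnrengelman.shadow' version '5.2.0'
}

shadowJar {
    zip64 true
    // Merge all META-INF/services entries from the dependency jars instead of
    // keeping only the first one; this preserves the Elasticsearch factory entry.
    mergeServiceFiles()
    manifest {
        attributes 'Main-Class': 'flinkNamePull.Demo'
    }
}
```

Building with `./gradlew shadowJar` then produces a fat jar whose `META-INF/services/org.apache.flink.table.factories.TableFactory` contains the entries from every bundled connector, so the table factory discovery can find the Elasticsearch upsert sink.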