Flink 到 NiFi 连接器
Flink to NiFi connector
我需要一些帮助:如何用 Scala 代码把数据从 NiFi 的输出端口传输到 Flink。
我卡在 .addSource() 这个函数上了。它要求额外的类型参数([OUT]),但是当我提供该参数时,总是报错。Scala 代码和错误信息如下。
package flinkTest
import java.nio.charset.{Charset, StandardCharsets}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.nifi.NiFiSource
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket
import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig}
/**
 * Minimal reproduction from the question: pulls data packets from a NiFi
 * output port through a site-to-site client and prints each packet's
 * content, decoded as UTF-8, from a Flink streaming job.
 *
 * NOTE(review): this snippet is intentionally left in its failing state.
 * With the file's imports, `StreamExecutionEnvironment` is the Java-API class
 * while `DataStream` is the Scala-API class, which appears to be why the
 * `addSource` line is rejected by the compiler (see the errors quoted after
 * this snippet) -- confirm against the Flink version in use.
 */
object NifiFlow {

  /**
   * Builds the NiFi source, attaches it to the execution environment and
   * starts the job.
   *
   * NOTE(review): a JVM entry point normally has the signature
   * `main(args: Array[String])`; the parameterless form is kept here because
   * it is what the question shows.
   */
  def main(): Unit = {
    // get the execution environment
    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to NiFi
    val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
      .url("http://localhost:8080/nifi")
      .portName("Data to flink")
      .requestBatchCount(2)
      .buildConfig()

    val nifiSource: SourceFunction[NiFiDataPacket] = new NiFiSource(clientConfig)

    // Here is the problem: this is the statement the compiler errors point at.
    val streamSource: DataStream[NiFiDataPacket] =
      env.addSource(nifiSource).setParallelism(2)

    // Some more code: decode every packet's raw content as UTF-8 and print it.
    val dataStream = streamSource.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8))
    dataStream.print()
    env.execute()
  }
}
1) 带 [OUT]
Error:(28, 76) value nifiSource of type org.apache.flink.streaming.api.functions.source.SourceFunction[org.apache.flink.streaming.connectors.nifi.NiFiDataPacket] does not take type parameters.
val streamSource: DataStream[NiFiDataPacket] = env.addSource(nifiSource[NiFiDataPacket]).setParallelism(2)
2) 没有 [OUT]
Error:(28, 66) type mismatch;
found : org.apache.flink.streaming.api.functions.source.SourceFunction[org.apache.flink.streaming.connectors.nifi.NiFiDataPacket]
required: org.apache.flink.streaming.api.function.source.SourceFunction[?]
val streamSource: DataStream[NiFiDataPacket] = env.addSource(nifiSource).setParallelism(2)
示例取自此处(原文链接),并已改写为 Scala。
如有任何建议,我将不胜感激。
UPD2
package flinkTest
import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.nifi._
/**
 * Second attempt (UPD2): reads data packets from a NiFi output port and maps
 * each packet to the string form of its attributes.
 *
 * With the wildcard import of `org.apache.flink.streaming.api.scala._`,
 * `StreamExecutionEnvironment` and `DataStream` both come from the Scala API.
 *
 * NOTE(review): the stack trace quoted after this snippet reports an
 * InvalidTypesException raised from inside `main` -- presumably while type
 * information for `NiFiDataPacket` is created at the `addSource` call; confirm.
 */
object NifiFlow {
// NOTE(review): a JVM entry point normally takes `args: Array[String]` -- confirm how this is launched.
def main(): Unit = {
// get the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to NiFi
val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data to flink")
.requestBatchCount(2)
.buildConfig()
// The source's static type is left to inference here (no SourceFunction annotation).
val nifiSource = new NiFiSource(clientConfig)
// Turn every incoming packet into the string representation of its attributes.
val streamSource: DataStream[String] = env
.addSource(nifiSource)
.map(x => x.getAttributes().toString)
// No sink is attached to streamSource before the job is executed.
env.execute()
}
}
错误
Connected to the target VM, address: '127.0.0.1:41218', transport: 'socket'
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Interfaces and abstract classes are not valid types: interface org.apache.flink.streaming.connectors.nifi.NiFiDataPacket
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:871)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:863)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:406)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:197)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:184)
at flinkTest.NifiFlow$.main(NiFiFlow.scala:23)
Scala 有一个专门的执行环境实现:
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
请用它代替 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment。
env.addSource(nifiSource)
只有在事先调用 env.getJavaEnv.getConfig.disableClosureCleaner() 之后才能正常工作。
也许这个开源项目中的 Scala 源码(位于 flink-scala_2.11...jar 中)需要更新一下。
我需要一些帮助:如何用 Scala 代码把数据从 NiFi 的输出端口传输到 Flink。
我卡在 .addSource() 这个函数上了。它要求额外的类型参数([OUT]),但是当我提供该参数时,总是报错。Scala 代码和错误信息如下。
package flinkTest
import java.nio.charset.{Charset, StandardCharsets}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.nifi.NiFiSource
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket
import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig}
/**
 * Minimal reproduction from the question: pulls data packets from a NiFi
 * output port through a site-to-site client and prints each packet's
 * content, decoded as UTF-8, from a Flink streaming job.
 *
 * NOTE(review): this snippet is intentionally left in its failing state.
 * With the file's imports, `StreamExecutionEnvironment` is the Java-API class
 * while `DataStream` is the Scala-API class, which appears to be why the
 * `addSource` line is rejected by the compiler (see the errors quoted after
 * this snippet) -- confirm against the Flink version in use.
 */
object NifiFlow {

  /**
   * Builds the NiFi source, attaches it to the execution environment and
   * starts the job.
   *
   * NOTE(review): a JVM entry point normally has the signature
   * `main(args: Array[String])`; the parameterless form is kept here because
   * it is what the question shows.
   */
  def main(): Unit = {
    // get the execution environment
    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to NiFi
    val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
      .url("http://localhost:8080/nifi")
      .portName("Data to flink")
      .requestBatchCount(2)
      .buildConfig()

    val nifiSource: SourceFunction[NiFiDataPacket] = new NiFiSource(clientConfig)

    // Here is the problem: this is the statement the compiler errors point at.
    val streamSource: DataStream[NiFiDataPacket] =
      env.addSource(nifiSource).setParallelism(2)

    // Some more code: decode every packet's raw content as UTF-8 and print it.
    val dataStream = streamSource.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8))
    dataStream.print()
    env.execute()
  }
}
1) 带 [OUT]
Error:(28, 76) value nifiSource of type org.apache.flink.streaming.api.functions.source.SourceFunction[org.apache.flink.streaming.connectors.nifi.NiFiDataPacket] does not take type parameters.
val streamSource: DataStream[NiFiDataPacket] = env.addSource(nifiSource[NiFiDataPacket]).setParallelism(2)
2) 没有 [OUT]
Error:(28, 66) type mismatch;
found : org.apache.flink.streaming.api.functions.source.SourceFunction[org.apache.flink.streaming.connectors.nifi.NiFiDataPacket]
required: org.apache.flink.streaming.api.function.source.SourceFunction[?]
val streamSource: DataStream[NiFiDataPacket] = env.addSource(nifiSource).setParallelism(2)
示例取自此处(原文链接),并已改写为 Scala。
如有任何建议,我将不胜感激。
UPD2
package flinkTest
import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.nifi._
/**
 * Second attempt (UPD2): reads data packets from a NiFi output port and maps
 * each packet to the string form of its attributes.
 *
 * With the wildcard import of `org.apache.flink.streaming.api.scala._`,
 * `StreamExecutionEnvironment` and `DataStream` both come from the Scala API.
 *
 * NOTE(review): the stack trace quoted after this snippet reports an
 * InvalidTypesException raised from inside `main` -- presumably while type
 * information for `NiFiDataPacket` is created at the `addSource` call; confirm.
 */
object NifiFlow {
// NOTE(review): a JVM entry point normally takes `args: Array[String]` -- confirm how this is launched.
def main(): Unit = {
// get the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to NiFi
val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data to flink")
.requestBatchCount(2)
.buildConfig()
// The source's static type is left to inference here (no SourceFunction annotation).
val nifiSource = new NiFiSource(clientConfig)
// Turn every incoming packet into the string representation of its attributes.
val streamSource: DataStream[String] = env
.addSource(nifiSource)
.map(x => x.getAttributes().toString)
// No sink is attached to streamSource before the job is executed.
env.execute()
}
}
错误
Connected to the target VM, address: '127.0.0.1:41218', transport: 'socket'
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Interfaces and abstract classes are not valid types: interface org.apache.flink.streaming.connectors.nifi.NiFiDataPacket
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:871)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:863)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:406)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:197)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:184)
at flinkTest.NifiFlow$.main(NiFiFlow.scala:23)
Scala 有一个专门的执行环境实现:
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
请用它代替 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment。
env.addSource(nifiSource)
只有在事先调用 env.getJavaEnv.getConfig.disableClosureCleaner() 之后才能正常工作。
也许这个开源项目中的 Scala 源码(位于 flink-scala_2.11...jar 中)需要更新一下。