flink 中没有文件写入 HDFS
No File writen down to HDFS in flink
我正在尝试通过 flink 使用 kafka 并将结果保存到 hdfs 但一直没有生成文件.. 也没有出现错误消息..
顺便说一句,保存到本地文件是可以的,但是当我将路径更改为 hdfs 时,我什么也没得到。
object kafka2Hdfs {
private val ZOOKEEPER_HOST = "ip1:2181,ip2:2181,ip3:2181"
private val KAFKA_BROKER = "ip1:9092,ip2:9092,ip3:9092"
private val TRANSACTION_GROUP = "transaction"
val topic = "tgt3"
def main(args : Array[String]){
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(1000L)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// configure Kafka consumer
val kafkaProps = new Properties()
.... //topic infos
kafkaProps.setProperty("fs.default-scheme", "hdfs://ip:8020")
val consumer = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), kafkaProps)
val source = env.addSource(consumer)
val path = new Path("/user/jay/data")
// sink
val rollingPolicy : RollingPolicy[String,String] = DefaultRollingPolicy.create()
.withRolloverInterval(15000)
.build()
val sink: StreamingFileSink[String] = StreamingFileSink
.forRowFormat(path, new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(rollingPolicy)
.build()
source.addSink(sink)
env.execute("test")
}
}
我很困惑..
在我的脑海中,可能有两件事需要研究:
- 是否正确配置了 HDFS namenode,以便 Flink 知道它尝试写入 HDFS 而不是本地磁盘?
- nodemanger 和 taskmanager 日志说了什么?由于 HDFS 上的权限问题,它可能会失败。
我正在尝试通过 flink 使用 kafka 并将结果保存到 hdfs 但一直没有生成文件.. 也没有出现错误消息..
顺便说一句,保存到本地文件是可以的,但是当我将路径更改为 hdfs 时,我什么也没得到。
object kafka2Hdfs {
private val ZOOKEEPER_HOST = "ip1:2181,ip2:2181,ip3:2181"
private val KAFKA_BROKER = "ip1:9092,ip2:9092,ip3:9092"
private val TRANSACTION_GROUP = "transaction"
val topic = "tgt3"
def main(args : Array[String]){
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(1000L)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// configure Kafka consumer
val kafkaProps = new Properties()
.... //topic infos
kafkaProps.setProperty("fs.default-scheme", "hdfs://ip:8020")
val consumer = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), kafkaProps)
val source = env.addSource(consumer)
val path = new Path("/user/jay/data")
// sink
val rollingPolicy : RollingPolicy[String,String] = DefaultRollingPolicy.create()
.withRolloverInterval(15000)
.build()
val sink: StreamingFileSink[String] = StreamingFileSink
.forRowFormat(path, new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(rollingPolicy)
.build()
source.addSink(sink)
env.execute("test")
}
}
我很困惑..
在我的脑海中,可能有两件事需要研究:
- 是否正确配置了 HDFS namenode,以便 Flink 知道它尝试写入 HDFS 而不是本地磁盘?
- nodemanger 和 taskmanager 日志说了什么?由于 HDFS 上的权限问题,它可能会失败。