Can't read from and write to a Kafka topic using Spark Scala

I am writing a sample Spark streaming program to read messages from an input Kafka topic and write them to both the console and another output Kafka topic. I don't get any errors or exceptions, but I also don't see the messages on the console or in the output Kafka topic. Can anyone tell me where/what I am missing?

Here is my code.


    import org.apache.log4j.Level
    import org.apache.spark.sql.SparkSession
    import org.slf4j.{Logger, LoggerFactory}

    object Test extends App {
      val logger: Logger = LoggerFactory.getLogger(Test.getClass)
      val kafkalogger = org.apache.log4j.Logger.getLogger("kafka")
      kafkalogger.setLevel(Level.INFO)
      kafkalogger.info("Running Pipeline")

      val spark = SparkSession.builder().getOrCreate()

      // Read from the input Kafka topic, starting from the earliest offsets
      val dfStream = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "*.*.*.*:9092")
        .option("subscribe", "input-topic")
        .option("startingOffsets", "earliest")
        .load()

      dfStream.printSchema()

      // Kafka values are binary, so cast them to strings for the console sink
      val messageDF = dfStream.selectExpr("CAST(value AS STRING)")
      messageDF.printSchema()

      kafkalogger.info("Before writing messages to console")
      messageDF.writeStream.outputMode("append").format("console").start()
      kafkalogger.info("After writing to console")

      // Write the same values to the output Kafka topic
      val writeToKafka = dfStream
        //.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .selectExpr("CAST(value AS STRING)")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "*.*.*.*:9092")
        .option("topic", "output-topic")
        .option("checkpointLocation", "/tmp/jsp_checkpointdir")
        .start()
      kafkalogger.info("After writing to topic")

      writeToKafka.awaitTermination()
    }

I am able to see the schema of the dataframes, but I cannot see the actual messages.

21/11/08 07:02:00 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2b99c937{/static/sql,null,AVAILABLE,@Spark}
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- value: string (nullable = true)

21/11/08 07:02:03 INFO kafka: Before writing messages to console
21/11/08 07:02:03 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(memoryOverhead -> name: memoryOverhead, amount: 512, script: , vendor: , cores -> name: cores, amount: 2, script: , vendor: , memory -> name: memory, amount: 4096, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/11/08 07:02:03 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@40fd1a78{/StreamingQuery,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@32430075{/StreamingQuery/json,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery/statistics: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@60f1f95b{/StreamingQuery/statistics,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery/statistics/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6f926d01{/StreamingQuery/statistics/json,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /static/sql: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@10e4ce98{/static/sql,null,AVAILABLE,@Spark}
21/11/08 07:02:03 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
21/11/08 07:02:03 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/11/08 07:02:03 INFO MicroBatchExecution: Checkpoint root /mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f resolved to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f.
21/11/08 07:02:03 INFO CheckpointFileManager: Writing atomically to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/metadata using temp file hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/.metadata.2a9256a4-a174-4ebb-b671-a1f011b78158.tmp
21/11/08 07:02:03 INFO CheckpointFileManager: Renamed temp file hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/.metadata.2a9256a4-a174-4ebb-b671-a1f011b78158.tmp to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/metadata
21/11/08 07:02:03 INFO MicroBatchExecution: Starting [id = 3393cbc3-9899-4053-8cb6-8c7654d8db28, runId = 23729550-4fc3-4500-8fcf-fa76da0ed3cf]. Use hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f to store the query checkpoint.
21/11/08 07:02:03 INFO kafka: After writing to console
21/11/08 07:02:03 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/11/08 07:02:03 INFO MicroBatchExecution: Checkpoint root /tmp/jsp_checkpointdir resolved to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/tmp/jsp_checkpointdir.
21/11/08 07:02:03 INFO MicroBatchExecution: Reading table [org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@43309170] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@d19db03]
21/11/08 07:02:03 INFO MicroBatchExecution: Starting new streaming query.
21/11/08 07:02:03 INFO MicroBatchExecution: Stream started from {}
21/11/08 07:02:03 INFO MicroBatchExecution: Starting [id = 3ae88b1d-c4bb-4a7c-be5c-4a75fda4903d, runId = 0d089f20-c110-465b-add9-080e213dd72d]. Use hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/tmp/jsp_checkpointdir to store the query checkpoint.
21/11/08 07:02:03 INFO kafka: After writing to topic
21/11/08 07:02:03 INFO MicroBatchExecution: Reading table [org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@43309170] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@d19db03]
21/11/08 07:02:03 INFO MicroBatchExecution: Starting new streaming query.
21/11/08 07:02:03 INFO MicroBatchExecution: Stream started from {}
21/11/08 07:02:03 INFO ConsumerConfig: ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest

These are the messages in the input Kafka topic:

[root@ip-*.*.*.* kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input-topic --from-beginning
test message
message2
message3
message4
message4
message5
message6

There are no messages in the output topic either.

[root@ip-*.*.*.* kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output-topic --from-beginning

I'm guessing you have some networking issue to sort out, but the following works for me locally (tested with Spark 2.4.8).

    import org.apache.kafka.clients.consumer.OffsetResetStrategy
    import org.apache.spark.sql
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.OutputMode
    import org.slf4j.LoggerFactory

    object KafkaTest extends App {

      val logger = LoggerFactory.getLogger(getClass)

      /**
       * For testing output to a console.
       *
       * @param df A Streaming DataFrame
       * @return A DataStreamWriter
       */
      private def streamToConsole(df: sql.DataFrame) = {
        df.writeStream.outputMode(OutputMode.Append()).format("console")
      }

      private def getKafkaDf(spark: SparkSession, bootstrap: String, topicPattern: String, offsetResetStrategy: OffsetResetStrategy = OffsetResetStrategy.EARLIEST) = {
        spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrap)
          .option("subscribe", topicPattern)
          .option("startingOffsets", offsetResetStrategy.toString.toLowerCase())
          .load()
      }

      private def streamToKafka(df: sql.DataFrame, bootstrap: String, checkpointLocation: String) = {
        val cols = df.columns
        if (!cols.contains("topic")) {
          val e = new IllegalArgumentException(s"""Dataframe columns must contain 'topic'. Existing cols=[${cols.mkString(", ")}]""")
          logger.error("Unable to stream dataframe to Kafka", e)
          throw e
        }
        // output topic comes from dataframe, not from options
        df.writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrap)
          .option("checkpointLocation", checkpointLocation) // required option
      }

      val spark = SparkSession.builder()
        .appName("Kafka Test")
        .master("local[*]")
        .getOrCreate()
      import spark.implicits._

      val kafkaBootstrap = "localhost:9092"

      val df = getKafkaDf(spark, kafkaBootstrap, "input-topic")
      streamToKafka(
        // Stream topic values into the same partition from input-topic to output-topic
        df.select($"partition", $"value", regexp_replace($"topic", "^input", "output").as("topic")),
        kafkaBootstrap,
        checkpointLocation = "/tmp/spark-sql-kafka"
      ).start().awaitTermination()
    }
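
For completeness: in this version the destination topic comes from the `topic` column built with `regexp_replace`, so a single query could fan rows out to multiple topics. If you prefer one fixed destination, as in the original code, the sink's `topic` option works as well. A minimal sketch, reusing the `df` and `kafkaBootstrap` from the snippet above (the checkpoint path is just a placeholder):

    // Alternative sketch: write every row to one fixed topic via the sink option
    // instead of a "topic" column. The checkpoint path below is a placeholder.
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrap)
      .option("topic", "output-topic")
      .option("checkpointLocation", "/tmp/spark-sql-kafka-fixed-topic")
      .start()
      .awaitTermination()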

This issue got resolved after reverting the configuration changes in the config/server.properties file and leaving it with the default configuration. If messages arrive on the topic before the Spark streaming job is started, I am not able to consume them from the beginning of the topic; that is a different issue.
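
On that last point, one note (a general observation about Structured Streaming checkpoints, not a diagnosis of this particular setup): `startingOffsets` is only honored the first time a query starts against a given checkpoint location; on restart, the offsets stored in the checkpoint take over. So reading a topic from the beginning generally needs `startingOffsets` set to `earliest` together with a fresh, previously unused checkpoint directory. A minimal sketch with placeholder broker, topic, and path names:

    import org.apache.spark.sql.SparkSession

    // Sketch: read an existing topic from the beginning.
    // startingOffsets is applied only when the checkpoint directory is new;
    // otherwise the checkpointed offsets win. All names below are placeholders.
    object ReadFromEarliest extends App {
      val spark = SparkSession.builder().appName("Read From Earliest").getOrCreate()

      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "input-topic")
        .option("startingOffsets", "earliest") // honored only on the first run
        .load()
        .selectExpr("CAST(value AS STRING)")
        .writeStream
        .outputMode("append")
        .format("console")
        .option("checkpointLocation", "/tmp/fresh-checkpoint-dir") // must not exist from a previous run
        .start()
        .awaitTermination()
    }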