java.lang.IllegalArgumentException: 'path' 未指定 // Spark 消费者问题
java.lang.IllegalArgumentException: 'path' is not specified // Spark Consumer Issue
我正在尝试创建 SparkConsumer,因此我可以在这种情况下通过 Spark Streaming 向 Kafka 发送一个 csv 文件。但是我有一个错误,未指定 'path'。
请参阅下面的代码
我的代码如下:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.OutputMode
object sparkConsumer extends App {
val conf = new SparkConf().setMaster("local").setAppName("Name")
val sc = new SparkContext(conf)
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.appName("Spark-Kafka-Integration")
.master("local")
.getOrCreate()
val schema = StructType(Array(
StructField("InvoiceNo", StringType, nullable = true),
StructField("StockCode", StringType, nullable = true),
StructField("Description", StringType, nullable = true),
StructField("Quantity", StringType, nullable = true)
))
val streamingDataFrame = spark.readStream.schema(schema).csv("C:/Users/me/Desktop/Tasks/Tasks1/test.csv")
streamingDataFrame.selectExpr("CAST(InvoiceNo AS STRING) AS key", "to_json(struct(*)) AS value").
writeStream
.format("csv")
.option("topic", "topic_test")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "C:/Users/me/IdeaProjects/SparkStreaming/checkpointLocation/")
.start()
import spark.implicits._
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic_test")
.load()
val df1 = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
.select(from_json($"value", schema).as("data"), $"timestamp")
.select("data.*", "timestamp")
df1.writeStream
.format("console")
.option("truncate","false")
.outputMode(OutputMode.Append)
.start()
.awaitTermination()
}
我变成如下错误:
Exception in thread "main" java.lang.IllegalArgumentException: 'path' is not specified
有人知道我错过了什么吗?
您的这部分代码似乎有问题:
streamingDataFrame.selectExpr("CAST(InvoiceNo AS STRING) AS key", "to_json(struct(*)) AS value").
writeStream
.format("csv")
.option("topic", "topic_test")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "C:/Users/me/IdeaProjects/SparkStreaming/checkpointLocation/")
.start()
因为您使用 "csv" 格式,但没有设置它需要的文件位置。相反,您将 Kafka 属性配置为使用 kafka 主题作为您的接收器。因此,如果您将格式更改为 "kafka",它应该可以工作。
您可以尝试使用 csv 作为源的另一个问题是您的路径应该是目录而不是文件。在您的情况下,如果您创建一个目录并移动您的 csv 文件,它将起作用。
只是为了测试,创建一个名为 C:/Users/me/Desktop/Tasks/Tasks1/test.csv 的目录,并在里面创建一个名为 part-0000.csv 的文件。然后将您的 csv 内容包含在这个新文件中并重新开始该过程。
我正在尝试创建 SparkConsumer,因此我可以在这种情况下通过 Spark Streaming 向 Kafka 发送一个 csv 文件。但是我有一个错误,未指定 'path'。 请参阅下面的代码
我的代码如下:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.OutputMode
object sparkConsumer extends App {
val conf = new SparkConf().setMaster("local").setAppName("Name")
val sc = new SparkContext(conf)
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.appName("Spark-Kafka-Integration")
.master("local")
.getOrCreate()
val schema = StructType(Array(
StructField("InvoiceNo", StringType, nullable = true),
StructField("StockCode", StringType, nullable = true),
StructField("Description", StringType, nullable = true),
StructField("Quantity", StringType, nullable = true)
))
val streamingDataFrame = spark.readStream.schema(schema).csv("C:/Users/me/Desktop/Tasks/Tasks1/test.csv")
streamingDataFrame.selectExpr("CAST(InvoiceNo AS STRING) AS key", "to_json(struct(*)) AS value").
writeStream
.format("csv")
.option("topic", "topic_test")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "C:/Users/me/IdeaProjects/SparkStreaming/checkpointLocation/")
.start()
import spark.implicits._
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic_test")
.load()
val df1 = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
.select(from_json($"value", schema).as("data"), $"timestamp")
.select("data.*", "timestamp")
df1.writeStream
.format("console")
.option("truncate","false")
.outputMode(OutputMode.Append)
.start()
.awaitTermination()
}
我变成如下错误:
Exception in thread "main" java.lang.IllegalArgumentException: 'path' is not specified
有人知道我错过了什么吗?
您的这部分代码似乎有问题:
streamingDataFrame.selectExpr("CAST(InvoiceNo AS STRING) AS key", "to_json(struct(*)) AS value").
writeStream
.format("csv")
.option("topic", "topic_test")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "C:/Users/me/IdeaProjects/SparkStreaming/checkpointLocation/")
.start()
因为您使用 "csv" 格式,但没有设置它需要的文件位置。相反,您将 Kafka 属性配置为使用 kafka 主题作为您的接收器。因此,如果您将格式更改为 "kafka",它应该可以工作。
您可以尝试使用 csv 作为源的另一个问题是您的路径应该是目录而不是文件。在您的情况下,如果您创建一个目录并移动您的 csv 文件,它将起作用。
只是为了测试,创建一个名为 C:/Users/me/Desktop/Tasks/Tasks1/test.csv 的目录,并在里面创建一个名为 part-0000.csv 的文件。然后将您的 csv 内容包含在这个新文件中并重新开始该过程。