Can't find "window" function in Spark Structured Streaming
I am writing a small example in Spark Structured Streaming where I try to process the output of the netstat command, but I cannot figure out how to call the window function.
These are the relevant lines of my build.sbt:
scalaVersion := "2.11.4"
scalacOptions += "-target:jvm-1.8"

libraryDependencies ++= {
  val sparkVer = "2.3.0"
  Seq(
    "org.apache.spark" %% "spark-streaming" % sparkVer % "provided",
    "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVer % "provided",
    "org.apache.spark" %% "spark-core" % sparkVer % "provided" withSources(),
    "org.apache.spark" %% "spark-hive" % sparkVer % "provided"
  )
}
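(Side note: the window function lives in org.apache.spark.sql.functions, which ships in the spark-sql module; in the list above it is only available transitively through spark-hive. A sketch of how it could be declared explicitly, assuming the same provided scope:)

    // Hypothetical explicit dependency: spark-sql is the artifact that
    // provides org.apache.spark.sql.functions, including window
    "org.apache.spark" %% "spark-sql" % sparkVer % "provided",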
The code:
case class NetEntry(val timeStamp: java.sql.Timestamp, val sourceHost: String, val targetHost: String, val status: String)

def convertToNetEntry(x: String): NetEntry = {
  // tcp 0 0 eselivpi14:icl-twobase1 eselivpi149.int.e:48442 TIME_WAIT
  val array = x.replaceAll("\\s+", " ").split(" ").slice(3, 6)
  NetEntry(java.sql.Timestamp.valueOf(LocalDateTime.now()), array(0), array(1), array(2))
}
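(For reference, a quick sketch of what the parser yields for the sample line in the comment; the timestamp is simply whatever LocalDateTime.now() returns at parse time:)

convertToNetEntry("tcp  0  0 eselivpi14:icl-twobase1 eselivpi149.int.e:48442 TIME_WAIT")
// => NetEntry(<now>, eselivpi14:icl-twobase1, eselivpi149.int.e:48442, TIME_WAIT)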
def main(args: Array[String]) {
  // Initialize spark context
  val spark: SparkSession = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  val lines = spark.readStream
    .format("socket")
    .option("host", args(0))
    .option("port", args(1).toInt)
    .load()

  import spark.implicits._
  val df = lines.as[String].map(x => convertToNetEntry(x))
  val wordsArr: Dataset[NetEntry] = df.as[NetEntry]
  wordsArr.printSchema()

  // Never get past this point
  val windowColumn = window($"timestamp", "10 minutes", "5 minutes")
  val windowedCounts = wordsArr.groupBy(windowColumn, $"targetHost").count()

  val query = windowedCounts.writeStream.outputMode("complete").format("console").start()
  query.awaitTermination()
}
I get the same results with Spark 2.1, 2.2 and 2.3. And here is the really weird part: I have a Spark cluster, and if I log into the Spark shell and paste in all these lines... it works! Any idea what I am doing wrong?
The error at compile time:
[error] C:\code_legacy\edos-dp-mediation-spark-consumer\src\main\scala\com\ericsson\streaming\structured\StructuredStreamingMain.scala:39: not found: value window
[error] val windowColumn = window($"timestamp", "10 minutes", "5 minutes")
[error] ^
[warn] 5 warnings found
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 19 s, completed 16-mar-2018 20:13:40
UPDATE: To make things even weirder, I have checked the API docs and I cannot find a valid reference there either: https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.SparkSession$implicits$
You need to import the window function for your code to compile; in the spark-shell it is already imported by default.

Add this import statement:
import org.apache.spark.sql.functions.window
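For completeness, a minimal sketch of the file header the snippets above rely on (the object name matches the file name in the error output; the rest is just the imports the code actually uses):

import java.time.LocalDateTime

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.window  // the missing import

object StructuredStreamingMain {
  // case class NetEntry, convertToNetEntry and main as shown in the question
}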