Apache Spark Scala API: ReduceByKeyAndWindow in Scala
Since I'm new to Spark's Scala API, I've run into the following problem: in my Java code I used a reduceByKeyAndWindow transformation, but now I see that there is only a reduceByWindow (and apparently no PairDStream in Scala either). Nevertheless, here is what I have in Scala so far:
import org.apache.hadoop.conf.Configuration
import [...]
val serverIp = "xxx.xxx.xxx.xxx"
val receiverInstances = 2
val batchIntervalSec = 2
val windowSize1hSek = 60 * 60
val slideDurationSek = batchIntervalSec
val streamingCtx = new StreamingContext(sc, Seconds(batchIntervalSec))
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3n.awsAccessKeyId", "xxx")
hadoopConf.set("fs.s3n.awsSecretAccessKey", "xxx")
// ReceiverInputDStream
val receiver1 = streamingCtx.socketTextStream(serverIp, 7777)
val receiver2 = streamingCtx.socketTextStream(serverIp, 7778)
// DStream
val inputDStream = receiver1.union(receiver2)
// h.hh.plug.ts.val
case class DebsEntry(house: Integer, household: Integer, plug: Integer, ts: Long, value: Float)
// h.hh.plug.val
case class DebsEntryWithoutTs(house: Integer, household: Integer, plug: Integer, value: Float)
// h.hh.plug.1
case class DebsEntryWithoutTsCount(house: Integer, household: Integer, plug: Integer, count: Long)
val debsPairDStream = inputDStream.map(s => s.split(",")).map(s => DebsEntry(s(6).toInt, s(5).toInt, s(4).toInt, s(1).toLong, s(2).toFloat)) //.foreachRDD(rdd => rdd.toDF().registerTempTable("test"))
val debsPairDStreamWithoutDuplicates = debsPairDStream.transform(s => s.distinct())
val debsPairDStreamConsumptionGreater0 = debsPairDStreamWithoutDuplicates.filter(s => s.value > 100.0)
debsPairDStreamConsumptionGreater0.foreachRDD(rdd => rdd.toDF().registerTempTable("test3"))
val debsPairDStreamConsumptionGreater0withoutTs = debsPairDStreamConsumptionGreater0.map(s => DebsEntryWithoutTs(s.house, s.household, s.plug, s.value))
// 5.) Average per Plug
// 5.1) Create a count-prepared PairDStream (house, household, plug, 1)
val countPreparedPerPlug1h = debsPairDStreamConsumptionGreater0withoutTs.map(s => DebsEntryWithoutTsCount(s.house, s.household, s.plug, 1))
// 5.2) ReduceByKeyAndWindow
val countPerPlug1h = countPreparedPerPlug1h.reduceByWindow(...???...)
Everything works fine up to step 5.1. In step 5.2 I now want to sum up the 1s of countPreparedPerPlug1h, but only where the other attributes (house, household, plug) are equal. The goal is the number of entries per (house, household, plug) combination. Can anyone help? Thanks!
Edit: First attempt
In step 5.2 I tried the following:
// 5.2)
val countPerPlug1h = countPreparedPerPlug1h.reduceByKeyAndWindow((a,b) => a+b, Seconds(windowSize1hSek), Seconds(slideDurationSek))
But I get the following error:
<console>:69: error: missing parameter type
val countPerPlug1h = countPreparedPerPlug1h.reduceByKeyAndWindow((a,b) => a+b, Seconds(windowSize1hSek), Seconds(slideDurationSek))
^
It seems I'm using the reduceByKeyAndWindow transformation incorrectly. Where is the mistake? The values to be summed are of type Int; see countPreparedPerPlug1h from step 5.1 above.
You can use reduceByKeyAndWindow in Scala even more simply than in the Java version. There is no PairDStream because the pairing is determined implicitly and you can call the pair methods directly; implicit resolution takes you to PairDStreamFunctions.
For example:
val myPairDStream: DStream[(KeyType, ValueType)] = ...
myPairDStream.reduceByKeyAndWindow(...)
Behind the scenes, this becomes:
new PairDStreamFunctions(myPairDStream).reduceByKeyAndWindow(...)
This PairDStreamFunctions wrapper is added implicitly to any DStream whose elements are Tuple2 pairs. Your "missing parameter type" error is a symptom of this: countPreparedPerPlug1h contains a case class rather than a Tuple2, so the implicit conversion does not apply and the compiler has no expected type from which to infer the parameters of (a, b) => a + b. Map the stream to key/value pairs first.
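Applied to the question's code, that means keying the stream by the (house, household, plug) triple before reducing. A minimal sketch, reusing the vals defined above (the name keyedCounts is illustrative):
// Key by (house, household, plug): the stream now consists of Tuple2
// elements, so the PairDStreamFunctions wrapper (and with it
// reduceByKeyAndWindow) applies and the parameter types are inferred.
val keyedCounts = debsPairDStreamConsumptionGreater0withoutTs
  .map(s => ((s.house, s.household, s.plug), 1L))
val countPerPlug1h = keyedCounts.reduceByKeyAndWindow(
  (a, b) => a + b,
  Seconds(windowSize1hSek),
  Seconds(slideDurationSek))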
I see; it now seems to work with the following code:
val countPerPlug1h = countPreparedPerPlug1h.reduceByKeyAndWindow({(x, y) => x + y}, {(x, y) => x - y}, Seconds(windowSize1hSek), Seconds(slideDurationSek))
Thanks for the pointer, @Justin Pihony!
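One caveat about this variant: because it passes an inverse reduce function ((x, y) => x - y) so that values sliding out of the window can be subtracted incrementally instead of recomputing the whole window, Spark Streaming requires a checkpoint directory to be set. A minimal sketch (the path is a placeholder):
// Required for the inverse-function variant of reduceByKeyAndWindow;
// replace the placeholder with a real HDFS or local directory.
streamingCtx.checkpoint("hdfs://...")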