spark 结构化流动态字符串过滤器
spark structured streaming dynamic string filter
我们正在尝试为结构化流应用程序使用动态过滤器。
假设我们有以下 Spark 结构化流应用程序的伪实现:
spark.readStream()
.format("kafka")
.option(...)
...
.load()
.filter(getFilter()) <-- dynamic staff - def filter(conditionExpr: String):
.writeStream()
.format("kafka")
.option(.....)
.start();
和getFilter returns string
String getFilter() {
// dynamic staff to create expression
return expression; // eg. "column = true";
}
当前版本的Spark是否可以实现动态过滤条件?我的意思是 getFilter()
方法应该动态地 return 过滤条件(假设它每 10 分钟刷新一次)。我们试图研究广播变量,但不确定结构化流是否支持这样的事情。
提交作业后似乎无法更新作业配置。作为部署,我们使用 yarn
。
每个 suggestion/option 都受到高度赞赏。
编辑:
假设 getFilter()
returns:
(columnA = 1 AND columnB = true) OR customHiveUDF(columnC, 'input') != 'required' OR columnD > 8
10 分钟后我们可以进行小的更改(在第一个 OR 之前没有第一个表达式)并且可能我们可以有一个新表达式(columnA = 2
)例如:
customHiveUDF(columnC, 'input') != 'required' OR columnD > 10 OR columnA = 2
目标是为一个 spark 应用程序设置多个过滤器,而不是提交多个作业。
广播变量在这里应该没问题。您可以像这样编写类型化过滤器:
query.filter(x => x > bv.value).writeStream(...)
其中 bv 是一个 Broadcast
变量。您可以按照此处所述更新它:How can I update a broadcast variable in spark streaming?
其他解决方案是提供 RCP 或 RESTful 端点,并每 10 分钟询问一次该端点。例如(Java,因为这里比较简单):
class EndpointProxy {
Configuration lastValue;
long lastUpdated
public static Configuration getConfiguration (){
if (lastUpdated + refreshRate > System.currentTimeMillis()){
lastUpdated = System.currentTimeMillis();
lastValue = askMyAPI();
}
return lastValue;
}
}
query.filter (x => x > EndpointProxy.getConfiguration().getX()).writeStream()
编辑:用户问题的 hacky 解决方法:
您可以创建单行视图:
// confsDF 应该在一些驱动端单例中
var confsDF = Seq(一些内容).toDF("someColumn")
and then use:
query.crossJoin(confsDF.as("conf")) // cross join as we have only 1 value
.filter("hiveUDF(conf.someColumn)")
.writeStream()...
new Thread() {
confsDF = Seq(some new data).toDF("someColumn)
}.start();
此 hack 依赖于 Spark 默认执行模型 - 微批处理。在每个触发器中,正在重建查询,因此应考虑新数据。
你也可以在线程中做:
Seq(some new data).toDF("someColumn).createOrReplaceTempView("conf")
然后在查询中:
.crossJoin(spark.table("conf"))
两者都应该有效。请记住,它不适用于连续处理模式
这是一个简单的例子,我在其中动态过滤来自套接字的记录。您可以使用任何可以动态更新过滤器或轻量级 zookeeper 实例的休息 API 而不是 Date。
注意:- 如果您打算使用任何 rest API 或 zookeeper 或任何其他选项,请使用 mapPartition 而不是 filter,因为在那种情况下您需要调用 API/Connection一次一个分区。
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].filter(_ == new java.util.Date().getMinutes.toString)
// Generate running word count
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
我们正在尝试为结构化流应用程序使用动态过滤器。
假设我们有以下 Spark 结构化流应用程序的伪实现:
spark.readStream()
.format("kafka")
.option(...)
...
.load()
.filter(getFilter()) <-- dynamic staff - def filter(conditionExpr: String):
.writeStream()
.format("kafka")
.option(.....)
.start();
和getFilter returns string
String getFilter() {
// dynamic staff to create expression
return expression; // eg. "column = true";
}
当前版本的Spark是否可以实现动态过滤条件?我的意思是 getFilter()
方法应该动态地 return 过滤条件(假设它每 10 分钟刷新一次)。我们试图研究广播变量,但不确定结构化流是否支持这样的事情。
提交作业后似乎无法更新作业配置。作为部署,我们使用 yarn
。
每个 suggestion/option 都受到高度赞赏。
编辑:
假设 getFilter()
returns:
(columnA = 1 AND columnB = true) OR customHiveUDF(columnC, 'input') != 'required' OR columnD > 8
10 分钟后我们可以进行小的更改(在第一个 OR 之前没有第一个表达式)并且可能我们可以有一个新表达式(columnA = 2
)例如:
customHiveUDF(columnC, 'input') != 'required' OR columnD > 10 OR columnA = 2
目标是为一个 spark 应用程序设置多个过滤器,而不是提交多个作业。
广播变量在这里应该没问题。您可以像这样编写类型化过滤器:
query.filter(x => x > bv.value).writeStream(...)
其中 bv 是一个 Broadcast
变量。您可以按照此处所述更新它:How can I update a broadcast variable in spark streaming?
其他解决方案是提供 RCP 或 RESTful 端点,并每 10 分钟询问一次该端点。例如(Java,因为这里比较简单):
class EndpointProxy {
Configuration lastValue;
long lastUpdated
public static Configuration getConfiguration (){
if (lastUpdated + refreshRate > System.currentTimeMillis()){
lastUpdated = System.currentTimeMillis();
lastValue = askMyAPI();
}
return lastValue;
}
}
query.filter (x => x > EndpointProxy.getConfiguration().getX()).writeStream()
编辑:用户问题的 hacky 解决方法:
您可以创建单行视图: // confsDF 应该在一些驱动端单例中 var confsDF = Seq(一些内容).toDF("someColumn")
and then use:
query.crossJoin(confsDF.as("conf")) // cross join as we have only 1 value
.filter("hiveUDF(conf.someColumn)")
.writeStream()...
new Thread() {
confsDF = Seq(some new data).toDF("someColumn)
}.start();
此 hack 依赖于 Spark 默认执行模型 - 微批处理。在每个触发器中,正在重建查询,因此应考虑新数据。
你也可以在线程中做:
Seq(some new data).toDF("someColumn).createOrReplaceTempView("conf")
然后在查询中:
.crossJoin(spark.table("conf"))
两者都应该有效。请记住,它不适用于连续处理模式
这是一个简单的例子,我在其中动态过滤来自套接字的记录。您可以使用任何可以动态更新过滤器或轻量级 zookeeper 实例的休息 API 而不是 Date。
注意:- 如果您打算使用任何 rest API 或 zookeeper 或任何其他选项,请使用 mapPartition 而不是 filter,因为在那种情况下您需要调用 API/Connection一次一个分区。
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].filter(_ == new java.util.Date().getMinutes.toString)
// Generate running word count
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()