使用 Spark 从 Elasticsearch 中获取最近的 N 条记录
Fetch the most recent N records from Elasticsearch using Spark
我想检索插入到 Elasticsearch
中的最后 50 条记录,以找出它们在 异常检测 项目中的平均值。
这就是我从 ES 中检索数据的方式。但是,它正在获取整个数据,而不是最后 50 条记录。有什么办法吗?
edf = spark \
.read \
.format("org.elasticsearch.spark.sql") \
.option("es.read.metadata", "false") \
.option("es.nodes.wan.only","true") \
.option("es.port","9200")\
.option("es.net.ssl","false")\
.option("es.nodes", "http://localhost") \
.load("anomaly_detection/data")
# GroupBy based on the `sender` column
df3 = edf.groupBy("sender") \
.agg(expr("avg(amount)").alias("avg_amount"))
这里sender
列是取整行数据,如何只取最后50DataFrame
行数据?
输入数据架构格式:
|sender|receiver|amount|
您还可以在读取数据时添加查询
query='{"query": {"match_all": {}}, "size": 50, "sort": [{"_timestamp": {"order": "desc"}}]}'
并将其作为
传递
edf = spark \
.read \
.format("org.elasticsearch.spark.sql") \
.option("es.read.metadata", "false") \
.option("es.nodes.wan.only","true") \
.option("es.port","9200")\
.option("es.net.ssl","false")\
.option("es.nodes", "http://localhost") \
.option("query", query)
.load("anomaly_detection/data")
我想检索插入到 Elasticsearch
中的最后 50 条记录,以找出它们在 异常检测 项目中的平均值。
这就是我从 ES 中检索数据的方式。但是,它正在获取整个数据,而不是最后 50 条记录。有什么办法吗?
edf = spark \
.read \
.format("org.elasticsearch.spark.sql") \
.option("es.read.metadata", "false") \
.option("es.nodes.wan.only","true") \
.option("es.port","9200")\
.option("es.net.ssl","false")\
.option("es.nodes", "http://localhost") \
.load("anomaly_detection/data")
# GroupBy based on the `sender` column
df3 = edf.groupBy("sender") \
.agg(expr("avg(amount)").alias("avg_amount"))
这里sender
列是取整行数据,如何只取最后50DataFrame
行数据?
输入数据架构格式:
|sender|receiver|amount|
您还可以在读取数据时添加查询
query='{"query": {"match_all": {}}, "size": 50, "sort": [{"_timestamp": {"order": "desc"}}]}'
并将其作为
传递edf = spark \
.read \
.format("org.elasticsearch.spark.sql") \
.option("es.read.metadata", "false") \
.option("es.nodes.wan.only","true") \
.option("es.port","9200")\
.option("es.net.ssl","false")\
.option("es.nodes", "http://localhost") \
.option("query", query)
.load("anomaly_detection/data")