How to merge two documents into one document by inserting sub-documents on pyspark?

I have a problem and I hope I can explain clearly what I am trying to do. I am building a stream-stream setup in pyspark (Spark Structured Streaming), and I want to update the same document every time new data from my scraping arrives through Kafka.

These are examples of the JSON documents as they are stored on localhost, viewed in MongoDB Compass:

{
    _id: ObjectId("28276465847392747"),
    id: reply,
    Company: reply,
    Value: {
        Date: 20-05-2020,
        Last_Hour_Contract: 09.12.25,
        Last_Hour: 09.14.30,
        Price: 16.08,
        Quantity: 8000,
        Medium_Price: 8.98,
        Min_Price: 8.98,
        Max_Price: 20.33,
        News: {
            id_news: Reply_20-05-20,
            title_news: "titolo news",
            text: "text",
            date: 20-05-2020,
            hour: 09:13:00,
            subject: Reply
        }
    }
}
{
    _id: ObjectId("28276465847392747"),
    id: reply,
    Company: reply,
    Value: {
        Date: 20-05-2020,
        Last_Hour_Contract: 09.12.25,
        Last_Hour: 09.14.30,
        Price: 17.78,
        Quantity: 9000,
        Medium_Price: 67.98,
        Min_Price: 8.98,
        Max_Price: 20.33,
        News: {
            id_news: Reply_20-05-20,
            title_news: "title_news",
            text: "text",
            date: 20-05-2020,
            hour: 09:13:00,
            subject: Reply
        }
    }
}

What I want to achieve is to merge the various documents (matched on Company_Name = "Name_Company") into a single document as new data arrives.

The layout of the JSON document I would like is the following:

{
    _id: ObjectId("3333884747656565"),
    id: reply,
    Date: 21-05-2020,
    Company: Reply,
    Value: {
        Date: 20-05-2020,
        Last_Hour_Contract: 09.12.25,
        Last_Hour: 09.14.30,
        Price: 16.08,
        Quantity: 8000,
        Medium_Price: 8.98,
        Min_Price: 8.98,
        Max_Price: 20.33,
        News: {
            id_news: Reply_20-05-20,
            title_news: "title news...",
            text: "text...",
            date: 20-05-2020,
            hour: 09:13:00,
            subject: Reply
        },
        Date: 21-05-2020,
        Last_Hour_Contract: 09.12.25,
        Last_Hour: 09.16.50,
        Price: 16.68,
        Quantity: 7000,
        Medium_Price: 8.98,
        Min_Price: 8.98,
        Max_Price: 20.33,
        News: {
            id_news: Reply_20-05-20,
            title_news: "title news...",
            text: "text...",
            date: 21-05-2020,
            hour: 09:14:00,
            subject: Reply
        }
    }
}

I am also including a picture so you can understand better (I hope the two arrows are clear):

How can I do this with PySpark? Thanks.

Here is my code:

import pyspark.sql.functions as F
from pyspark.sql.functions import expr


def writeStreamer(sdf):
    # Deduplicate the joined stream and hand every micro-batch to foreachBatch;
    # return the StreamingQuery so it can be awaited later.
    return sdf.select("id_Borsa","NomeAzienda","Valori_Di_Borsa") \
        .dropDuplicates(["id_Borsa","NomeAzienda","Valori_Di_Borsa"]) \
        .writeStream \
        .outputMode("append") \
        .foreachBatch(foreach_batch_function) \
        .start()


def foreach_batch_function(sdf, epoch_id):
    # Write each micro-batch to MongoDB ("mongo" resolves to com.mongodb.spark.sql.DefaultSource);
    # append mode inserts the rows as new documents.
    sdf.write \
        .format("mongo") \
        .mode("append") \
        .option("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/DataManagement.Data") \
        .save()


df_borsa = spark.readStream.format("kafka") \
          .option("kafka.bootstrap.servers", kafka_broker) \
          .option("startingOffsets", "latest") \
          .option("subscribe","Reply_borsa") \
          .load() \
          .selectExpr("CAST(value AS STRING)") 

df_news = spark.readStream.format("kafka") \
          .option("kafka.bootstrap.servers", kafka_broker) \
          .option("startingOffsets", "latest") \
          .option("subscribe","Reply_news") \
          .load() \
          .selectExpr("CAST(value AS STRING)") 


# Market fields collected into a single struct column (the Kafka JSON value is
# assumed to have already been parsed into these columns)
df_borsa = df_borsa.withColumn("Valori_Di_Borsa", F.struct(
    F.col("Data"), F.col("PrezzoUltimoContratto"), F.col("Var%"), F.col("VarAssoluta"),
    F.col("OraUltimoContratto"), F.col("QuantitaUltimo"), F.col("QuantitaAcquisto"),
    F.col("QuantitaVendita"), F.col("QuantitaTotale"), F.col("NumeroContratti"),
    F.col("MaxOggi"), F.col("MinOggi")))

# News fields collected into a struct, to be nested inside Valori_Di_Borsa after the join
df_news = df_news.withColumn("News", F.struct(
    F.col("id_News"), F.col("TitoloNews"), F.col("TestoNews"), F.col("DataNews"), F.col("OraNews")))

# Apply watermarks on event-time columns
dfWithWatermark = df_borsa.select("id_Borsa","NomeAzienda","StartTime","Valori_Di_Borsa").withWatermark("StartTime", "2 hours") # maximal delay

df1WithWatermark = df_news.select("SoggettoNews","News","EndTime").withWatermark("EndTime", "3 hours") # maximal delay; "News" must be kept for the struct built after the join

# Join with event-time constraints
sdf = dfWithWatermark.join(df1WithWatermark,expr(""" 
      SoggettoNews = NomeAzienda AND
      EndTime >= StartTime AND
      EndTime <= StartTime + interval 1 hours
      """),
       "leftOuter").withColumn("Valori_Di_Borsa",F.struct(F.col("Valori_Di_Borsa.*"),F.col("News"))) 


query = writeStreamer(sdf)

spark.streams.awaitAnyTermination()

sdf.printSchema() confirms that, after the join, the News struct is nested inside Valori_Di_Borsa.
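To make the goal clearer: I do not want foreachBatch to keep inserting a new document per micro-batch; I would like every new Valori_Di_Borsa to be appended into the single existing document of the matching company. A rough sketch of the kind of per-batch upsert I have in mind (assuming pymongo; this is only the idea, not working code):

from pymongo import MongoClient

def upsert_batch(batch_df, epoch_id):
    # Idea only: push every new Valori_Di_Borsa struct into the one document
    # of the matching company instead of inserting a brand new document.
    client = MongoClient("mongodb://127.0.0.1:27017")
    collection = client["DataManagement"]["Data"]
    for row in batch_df.collect():  # acceptable only for small micro-batches; just a sketch
        doc = row.asDict(recursive=True)
        collection.update_one(
            {"Company": doc["NomeAzienda"]},                         # one document per company
            {"$push": {"Valori_Di_Borsa": doc["Valori_Di_Borsa"]}},  # append the new values
            upsert=True,
        )
    client.close()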

All you have to do is group the documents by the Company field with the $group stage and, using the $push operator, add the Value object of each grouped document to a newly formed array field called values.

So the MongoDB implementation of the above would look like this:

db.collection.aggregate([{
    $group: {
        _id: '$Company',
        id: {$first: '$id'},
        date: {$first: '$Date'},
        values: {$push: '$Value'}
    }
}])
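Just for illustration, with the two sample documents above this grouping produces a single document per company, shaped roughly like this (fields abbreviated):

{
    _id: 'reply',
    id: 'reply',
    values: [
        { Date: 20-05-2020, Price: 16.08, Quantity: 8000, ..., News: { ... } },
        { Date: 20-05-2020, Price: 17.78, Quantity: 9000, ..., News: { ... } }
    ]
}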

You can easily convert the above aggregation into a PySpark implementation.

You need to do something like the following:

pipeline = "{'$group': {'_id': '$Company', 'id': {'$first': '$id'}, 'date': {'$first': '$Date'}, 'values': {'$push': '$Value'}}}"
df = spark.read.format("mongo") \
    .option("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/DataManagement.Data") \
    .option("pipeline", pipeline) \
    .load()
df.show()

Note: I am not a PySpark expert, but you should be able to adapt this easily to the implementation you need.
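If you prefer to do the grouping in Spark itself rather than pushing the pipeline down to MongoDB, a minimal sketch with groupBy and collect_list would look something like this (assuming a spark session is already available, the collection has top-level Company, id and Value columns, and using the connection uri from your question):

import pyspark.sql.functions as F

# Read the collection and do the grouping in Spark instead of in MongoDB.
df = spark.read.format("mongo") \
    .option("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/DataManagement.Data") \
    .load()

# One row per company, with every Value struct collected into an array column,
# which mirrors the $group/$push pipeline above.
grouped = df.groupBy("Company").agg(
    F.first("id").alias("id"),
    F.collect_list("Value").alias("values"))

grouped.show(truncate=False)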