使用 spark 和 kafka 进行 Twitter 流式传输:如何将数据存储在 MongoDB 中
Twitter streaming using spark and kafka: How store the data in MongoDB
我正在使用此 python 代码收集推特流数据
https://github.com/sridharswamy/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka/blob/master/app.py
之后,我 运行 使用此代码创建流上下文并将数据存储在 MongoDB 中。
def main():
conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)
ssc.checkpoint("checkpoint")
kstream = KafkaUtils.createDirectStream(
ssc, topics = ['topic1'], kafkaParams = {"metadata.broker.list":
'localhost:9092'})
tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))
#................insert in MonGODB.........................
db.mynewcollection.insert_one(tweets)
ssc.start()
ssc.awaitTerminationOrTimeout(100)
ssc.stop(stopGraceFully = True)
if __name__=="__main__":
urllib3.contrib.pyopenssl.inject_into_urllib3()
connection = pymongo.MongoClient('....',...)
db = connection['twitter1']
db.authenticate('..','...')
main()
但是我得到了这个错误:
TypeError: document must be an instance of dict, bson.son.SON, bson.raw_bson.RawBSONDocument, or a type that inherits from collections.MutableMapping
我也尝试使用 'foreachRDD' 并创建函数 'save'
tweets.foreachRDD(Save)
并且我将 'insert' 移动到此函数
def Save(rdd):
if not rdd.isEmpty():
db.mynewcollection.insert_one(rdd)
但是没用
TypeError: can't pickle _thread.lock objects
谁能帮我知道如何在 MongoDB
中存储流数据
第一个错误发生是因为您将分布式对象传递给 db.mynewcollection.insert_one
。
第二个错误是因为你在驱动上初始化了数据库连接,一般情况下,连接对象是不能序列化的。
虽然存在许多 Spark / MongoDB 连接器,但您应该查看 (),通用模式是使用 foreachPartition
。定义助手
def insert_partition(xs):
connection = pymongo.MongoClient('....',...)
db = connection['twitter1']
db.authenticate('..','...')
db.mynewcollection.insert_many(xs)
然后:
def to_dict(s):
return ... # Convert input to a format acceptable by `insert_many`, for example with json.loads
tweets.map(to_dict) \
.foreachRDD(lambda rdd: rdd.foreachPartition(insert_partition))
我正在使用此 python 代码收集推特流数据 https://github.com/sridharswamy/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka/blob/master/app.py
之后,我 运行 使用此代码创建流上下文并将数据存储在 MongoDB 中。
def main():
conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)
ssc.checkpoint("checkpoint")
kstream = KafkaUtils.createDirectStream(
ssc, topics = ['topic1'], kafkaParams = {"metadata.broker.list":
'localhost:9092'})
tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))
#................insert in MonGODB.........................
db.mynewcollection.insert_one(tweets)
ssc.start()
ssc.awaitTerminationOrTimeout(100)
ssc.stop(stopGraceFully = True)
if __name__=="__main__":
urllib3.contrib.pyopenssl.inject_into_urllib3()
connection = pymongo.MongoClient('....',...)
db = connection['twitter1']
db.authenticate('..','...')
main()
但是我得到了这个错误:
TypeError: document must be an instance of dict, bson.son.SON, bson.raw_bson.RawBSONDocument, or a type that inherits from collections.MutableMapping
我也尝试使用 'foreachRDD' 并创建函数 'save'
tweets.foreachRDD(Save)
并且我将 'insert' 移动到此函数
def Save(rdd):
if not rdd.isEmpty():
db.mynewcollection.insert_one(rdd)
但是没用
TypeError: can't pickle _thread.lock objects
谁能帮我知道如何在 MongoDB
中存储流数据第一个错误发生是因为您将分布式对象传递给
db.mynewcollection.insert_one
。第二个错误是因为你在驱动上初始化了数据库连接,一般情况下,连接对象是不能序列化的。
虽然存在许多 Spark / MongoDB 连接器,但您应该查看 (foreachPartition
。定义助手
def insert_partition(xs):
connection = pymongo.MongoClient('....',...)
db = connection['twitter1']
db.authenticate('..','...')
db.mynewcollection.insert_many(xs)
然后:
def to_dict(s):
return ... # Convert input to a format acceptable by `insert_many`, for example with json.loads
tweets.map(to_dict) \
.foreachRDD(lambda rdd: rdd.foreachPartition(insert_partition))