使用 Spark Streaming 读取脚本的输出输出
Read output output of a script with spark streaming
我有一个类似于这个的脚本
import json
def line_generator():
d = dict({1:1, 2:2, 3:3})
while True:
yield json.dumps(d)
it = line_generator()
for l in it:
print(l)
哪些输出值到标准输出。我想 "catch" 使用 spark Streaming api 将这些值存储在镶木地板文件中,并应用一些由 HiveQL 编写的推理代码。我不是 Scala 人:/ 所以,如果可能的话,我更愿意在 PySpark 中找到解决方案,但我对任何建议都很满意。
我知道可以读取来自 Kafka 流的数据流,是否有类似的方法来读取发送到标准输出的数据或连续写入文件的数据?
提前感谢您的帮助
我不会使用 stdout,因为 spark 通常用于具有多个节点的集群。更好的方法是 kafka(它还允许您临时存储数据并且更可靠)或套接字。下面一个socket例子(在Daniel Hynk的基础上):
#send your data
import socket
import json
def line_generator():
d = dict({1:1, 2:2, 3:3})
while True:
yield json.dumps(d)
hostip = '127.0.0.1'
portno = 56789
#listener need to be started before!
#try: netcat -lkp 56789
#before you start with spark streaming
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
soc.connect((hostip, portno))
it = line_generator()
for l in it:
soc.send(l.encode('utf8'))
Spark 结构化流示例:
hostip = '127.0.0.1'
portno = '56789'
received = spark.readStream.format("socket").option("host", hostip).option("port", portno).load()
#the value column of a structured stream contains the content
values = received.select(received.value)
###
#do your stuff
###
#will listen to the specified port and write the results to memory!!! until you call query.stop()
#this allows you to see the data with select * from mystream
query = values.writeStream.queryName("mystream").outputMode("complete").format("memory").start()
当然你不会想最后把它写到内存中但是它会大大加快开发速度。完成程序后,只需将最后一行更改为 guide:
中提到的类似下面的内容
writeStream.format("parquet").option("path", "path/to/destination/dir").start()
我有一个类似于这个的脚本
import json
def line_generator():
d = dict({1:1, 2:2, 3:3})
while True:
yield json.dumps(d)
it = line_generator()
for l in it:
print(l)
哪些输出值到标准输出。我想 "catch" 使用 spark Streaming api 将这些值存储在镶木地板文件中,并应用一些由 HiveQL 编写的推理代码。我不是 Scala 人:/ 所以,如果可能的话,我更愿意在 PySpark 中找到解决方案,但我对任何建议都很满意。
我知道可以读取来自 Kafka 流的数据流,是否有类似的方法来读取发送到标准输出的数据或连续写入文件的数据?
提前感谢您的帮助
我不会使用 stdout,因为 spark 通常用于具有多个节点的集群。更好的方法是 kafka(它还允许您临时存储数据并且更可靠)或套接字。下面一个socket例子(在Daniel Hynk的基础上):
#send your data
import socket
import json
def line_generator():
d = dict({1:1, 2:2, 3:3})
while True:
yield json.dumps(d)
hostip = '127.0.0.1'
portno = 56789
#listener need to be started before!
#try: netcat -lkp 56789
#before you start with spark streaming
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
soc.connect((hostip, portno))
it = line_generator()
for l in it:
soc.send(l.encode('utf8'))
Spark 结构化流示例:
hostip = '127.0.0.1'
portno = '56789'
received = spark.readStream.format("socket").option("host", hostip).option("port", portno).load()
#the value column of a structured stream contains the content
values = received.select(received.value)
###
#do your stuff
###
#will listen to the specified port and write the results to memory!!! until you call query.stop()
#this allows you to see the data with select * from mystream
query = values.writeStream.queryName("mystream").outputMode("complete").format("memory").start()
当然你不会想最后把它写到内存中但是它会大大加快开发速度。完成程序后,只需将最后一行更改为 guide:
中提到的类似下面的内容writeStream.format("parquet").option("path", "path/to/destination/dir").start()