Could not read Spark streaming data
I am trying to read streaming data with Spark Python and change the format of the streamed data, but it seems I cannot even read the stream in the first place...
Here are my steps:
I open one terminal, cd into the input data folder, and run this command:
ls part-* | xargs -I % sh -c '{ cat %; sleep 5;}' | nc -lk 9999
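As an optional sanity check (not part of my original steps), the listener can be verified from a third terminal with a plain netcat client:
nc localhost 9999
Note that any lines this client reads are consumed from the pipe, so the listener command above has to be restarted before pointing Spark at it.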
Then I open another terminal and run
setenv SPARK_HOME /user/abc/Downloads/spark-1.5.2-bin-hadoop2.6/
so that I can run Spark locally. Then I run the command ${SPARK_HOME}/bin/spark-submit --master local /user/abc/test.py localhost 9999
to execute my code.
The code is below. It is just a simple test of whether the streaming data can be read before changing its format... but it always fails with:
16/01/28 22:41:37 INFO ReceiverSupervisorImpl: Starting receiver
16/01/28 22:41:37 INFO ReceiverSupervisorImpl: Called receiver onStart
16/01/28 22:41:37 INFO ReceiverSupervisorImpl: Receiver started again
16/01/28 22:41:37 INFO SocketReceiver: Connecting to localhost:9999
16/01/28 22:41:37 INFO SocketReceiver: Connected to localhost:9999
16/01/28 22:41:37 INFO SocketReceiver: Closed socket to localhost:9999
16/01/28 22:41:37 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Socket data stream had no more data
If I rerun ls part-* | xargs -I % sh -c '{ cat %; sleep 5;}' | nc -lk 9999
I still get the same error... Do you know how to fix this?
import sys
import re
from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)          # 5-second batch interval
sqlContext = SQLContext(sc)

def get_tuple(r):
    # Extract the comma-separated fields between the first pair of brackets.
    m = re.search(r'\[(.*?)\]', r)
    s = m.group(1)
    fs = s.split(',')
    for i in range(len(fs)):
        if i > 1:
            fs[i] = float(fs[i])       # first two fields stay strings, the rest become floats
    return fs

def main():
    indata = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('feature_vec')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=10)
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
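For reference, here is a small standalone illustration of what get_tuple returns; the sample record below is only my assumption about the input format, inferred from the regex (a bracketed, comma-separated line whose first two fields remain strings):
import re

def get_tuple(r):
    m = re.search(r'\[(.*?)\]', r)   # text between the first [ and ]
    fs = m.group(1).split(',')
    for i in range(len(fs)):
        if i > 1:                    # keep the first two fields as strings
            fs[i] = float(fs[i])     # convert the remaining fields to floats
    return fs

print(get_tuple('[id1,2016-01-28,0.5,1.25]'))
# -> ['id1', '2016-01-28', 0.5, 1.25]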
Problem solved. The spark-submit master needs more than one local thread for Spark Streaming, e.g. [*], like this:
${SPARK_HOME}/bin/spark-submit --master local[*] /user/abc/test.py localhost 9999
With that, the output shows up. (With --master local there is only a single core, which the socket receiver occupies, so no core is left to process the batches.)
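For completeness, the same fix can also be made inside the script instead of on the command line; this is just a sketch using the standard SparkConf API, not something from the original post:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# local[2] gives Spark Streaming at least one core for the socket receiver
# and one for processing the batches; plain local starves the job.
conf = SparkConf().setAppName("test").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)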