当我将它发送到 Spark Streaming 时如何保持 JSON 结构
How to keep a JSON structure when I send it to Spark Streaming
我收到一个 JSON 输入,该输入在 Spark Streaming 上有几行和属性,方法是使用 Java 中的命令:
JavaReceiverInputDStream <String>
lines = ssc.socketTextStream
(localhost, port)
我现在想过滤行流,使其每行只有两个特定属性,并转储其余部分。
这里的问题是我注意到行不保持 JSON 结构,也就是说,我不能只做
JavaDstream<String> line=lines[1];
line.print() ;
我的问题是,如何让我的 JavaDStream 保持 JSON 对象的结构,然后打印我想要的行?
希望我说清楚了,谢谢。
曼努埃尔,
所以基本上你问的是如何确保整个 JSON 有效负载是 RDD 中的单个记录,或者当你在套接字上发送消息时什么是记录边界。
基本上,您使用的 socketTextStream 读取套接字上的消息,如果它找到换行符,它将使用该换行符作为记录边界。用于监听套接字、读取消息并将其传递给 Spark 的实际代码是 SocketReceiver.bytesToLines() 方法的一部分,如果您查看评论,这就是它所说的
/**
* This methods translates the data from an inputstream (say, from a socket)
* to '\n' delimited strings and returns an iterator to access the strings.
*/
因此请务必从 JSON 消息中删除 \n 字符,您将其作为单个记录发送到 spark
苏尼尔
我收到一个 JSON 输入,该输入在 Spark Streaming 上有几行和属性,方法是使用 Java 中的命令:
JavaReceiverInputDStream <String>
lines = ssc.socketTextStream
(localhost, port)
我现在想过滤行流,使其每行只有两个特定属性,并转储其余部分。
这里的问题是我注意到行不保持 JSON 结构,也就是说,我不能只做
JavaDstream<String> line=lines[1];
line.print() ;
我的问题是,如何让我的 JavaDStream 保持 JSON 对象的结构,然后打印我想要的行?
希望我说清楚了,谢谢。
曼努埃尔,
所以基本上你问的是如何确保整个 JSON 有效负载是 RDD 中的单个记录,或者当你在套接字上发送消息时什么是记录边界。
基本上,您使用的 socketTextStream 读取套接字上的消息,如果它找到换行符,它将使用该换行符作为记录边界。用于监听套接字、读取消息并将其传递给 Spark 的实际代码是 SocketReceiver.bytesToLines() 方法的一部分,如果您查看评论,这就是它所说的
/**
* This methods translates the data from an inputstream (say, from a socket)
* to '\n' delimited strings and returns an iterator to access the strings.
*/
因此请务必从 JSON 消息中删除 \n 字符,您将其作为单个记录发送到 spark
苏尼尔