Problem producing data from a file to Kafka and consuming it in Spark Structured Streaming
I am trying to produce data from a file to Kafka and then consume it in Spark Structured Streaming.
historical_data_level_1.json -> producer.py -> consumer.py
The problem is that the messages arrive in Spark looking strange, so I understand the problem is in producer.py.
When I inspect the DataFrame with
kafka_df.selectExpr("CAST(value AS STRING)").show(30, False)
the result is:
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|"{\"localSymbol\": \"EUR.USD\", \"time\": \"2021-05-07 10:10:50.873031+00:00\", \"precio_actual\": 1.2065, \"bid\": NaN, \"ask\": NaN, \"high\": 1.20905, \"low\": 1.2053, \"close\": 1.2065}\n" |
|"{\"localSymbol\": \"EUR.USD\", \"time\": \"2021-05-07 10:10:50.873720+00:00\", \"precio_actual\": 1.208235, \"bid\": 1.20823, \"ask\": 1.20824, \"high\": 1.20905, \"low\": 1.2053, \"close\": 1.2065}\n" |
|"{\"localSymbol\": \"USD.JPY\", \"time\": \"2021-05-07 10:10:50.914310+00:00\", \"precio_actual\": 109.09, \"bid\": NaN, \"ask\": NaN, \"high\": 109.2, \"low\": 108.935, \"close\": 109.09}\n" |
|"{\"localSymbol\": \"USD.JPY\", \"time\": \"2021-05-07 10:10:50.914867+00:00\", \"precio_actual\": 109.10249999999999, \"bid\": 109.102, \"ask\": 109.103, \"high\": 109.2, \"low\": 108.935, \"close\": 109.09}\n"|
|"{\"localSymbol\": \"USD.JPY\", \"time\": \"2021-05-07 10:10:50.975038+00:00\", \"precio_actual\": 109.102, \"bid\": 109.101, \"ask\": 109.103, \"high\": 109.2, \"low\": 108.935, \"close\": 109.09}\n" |
|"{\"localSymbol\": \"USD.JPY\", \"time\": \"2021-05-07 10:10:51.059851+00:00\", \"precio_actual\": 109.1015, \"bid\": 109.101, \"ask\": 109.102, \"high\": 109.2, \"low\": 108.935, \"close\": 109.09}\n" |
|"{\"localSymbol\": \"EUR.USD\", \"time\": \"2021-05-07 10:10:51.101304+00:00\", \"precio_actual\": 1.208235, \"bid\": 1.20823, \"ask\": 1.20824, \"high\": 1.20905, \"low\": 1.2053, \"close\": 1.2065}\n" |
If I copy and paste the data into a console producer by hand, it works fine, so the problem is in how I am sending the messages to Kafka.
Can you help me?
I have the following file historical_data_level_1.json with this data:
{"localSymbol": "EUR.USD", "time": "2021-05-07 10:10:50.873031+00:00", "precio_actual": 1.2065, "bid": NaN, "ask": NaN, "high": 1.20905, "low": 1.2053, "close": 1.2065}
{"localSymbol": "EUR.USD", "time": "2021-05-07 10:10:50.873720+00:00", "precio_actual": 1.208235, "bid": 1.20823, "ask": 1.20824, "high": 1.20905, "low": 1.2053, "close": 1.2065}
{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:50.914310+00:00", "precio_actual": 109.09, "bid": NaN, "ask": NaN, "high": 109.2, "low": 108.935, "close": 109.09}
{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:50.914867+00:00", "precio_actual": 109.10249999999999, "bid": 109.102, "ask": 109.103, "high": 109.2, "low": 108.935, "close": 109.09}
I have the following producer, producer.py:
import json
from kafka import KafkaProducer
import settings


def producer():
    kafka_producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    with open(settings.HISTORICAL_DATA_FOLDER + "historical_data_level_1.json") as historical_data_level_1:
        for line in historical_data_level_1:
            kafka_producer.send("test", json.dumps(line).encode('utf-8'))


if __name__ == '__main__':
    producer()
and the following consumer, consumer.py:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("File Streaming Demo3") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1') \
        .getOrCreate()

    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test") \
        .option("startingOffsets", "earliest") \
        .load()

    kafka_df.printSchema()

    schema = StructType([
        StructField('localSymbol', StringType()),
        StructField('time', StringType()),
        StructField('precio_actual', StringType()),
        StructField('bid', StringType()),
        StructField('ask', StringType()),
        StructField('high', StringType()),
        StructField('low', StringType()),
        StructField('close', StringType()),
    ])

    value_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("value"))
    value_df.printSchema()

    level_1_df_process = value_df \
        .select(
            'value.*'
        )

    invoice_writer_query = level_1_df_process.writeStream \
        .format("json") \
        .queryName("Flattened Invoice Writer") \
        .outputMode("append") \
        .option("path", "output") \
        .option("checkpointLocation", "chk-point-dir") \
        .trigger(processingTime="1 minute") \
        .start()

    invoice_writer_query.awaitTermination()
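Before blaming the Spark side, it can help to check what bytes are actually on the topic, independent of Spark. A minimal sketch using kafka-python's KafkaConsumer (assuming the same broker and topic as above; consumer_timeout_ms is only there so the loop exits once the topic is drained):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "test",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",  # read the topic from the beginning
    consumer_timeout_ms=5000,      # stop iterating after 5s without messages
)
for message in consumer:
    print(message.value)           # the raw bytes exactly as the producer sent them

With the producer above, each value prints as one quoted, escaped JSON string rather than a bare JSON object, which matches what Spark shows.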
Solved: producer.py was JSON-encoding each line a second time (each line read from the file is already a serialized JSON string, and json.dumps wrapped it in quotes and escaped it). Parsing each line into a Python dict and serializing it once fixes the problem, leaving the code as follows:
import json
from kafka import KafkaProducer
import settings


def producer():
    kafka_producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    with open(settings.HISTORICAL_DATA_FOLDER + "historical_data_level_1.json") as historical_data_level_1:
        for line in historical_data_level_1:
            # Parse the already-serialized line into a dict, then serialize it
            # once: this avoids json.dumps() quoting and escaping the raw string.
            line_python_dict = json.loads(line)
            kafka_producer.send("test", json.dumps(line_python_dict).encode('utf-8'))


if __name__ == '__main__':
    producer()
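For reference, a minimal sketch of what was going wrong, in plain Python with no Kafka involved: json.dumps applied to a str treats it as a value to serialize, so a line that is already JSON gets wrapped in an extra layer of quotes and escapes.

import json

line = '{"localSymbol": "EUR.USD", "precio_actual": 1.2065}\n'

# Double encoding: the whole line becomes a single quoted JSON string.
print(json.dumps(line))
# "{\"localSymbol\": \"EUR.USD\", \"precio_actual\": 1.2065}\n"

# Parse first, then serialize once: clean JSON (the trailing newline disappears too).
print(json.dumps(json.loads(line)))
# {"localSymbol": "EUR.USD", "precio_actual": 1.2065}

Since each line in the file is already serialized JSON, sending line.strip().encode('utf-8') directly would also work and skips the parse/serialize round trip entirely.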