Structured Streaming error py4j.protocol.Py4JNetworkError: Answer from Java side is empty
I'm trying to do a left outer join between two Kafka streams using PySpark and Structured Streaming (Spark 2.3).
import os
import time
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, struct, explode, get_json_object
from ast import literal_eval
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'
spark = SparkSession \
    .builder \
    .appName("Spark Kafka Structured Streaming") \
    .getOrCreate()

schema_impressions = StructType() \
    .add("id_req", StringType()) \
    .add("ts_imp_request", TimestampType()) \
    .add("country", StringType()) \
    .add("TS_IMPRESSION", TimestampType())

schema_requests = StructType() \
    .add("id_req", StringType()) \
    .add("page", StringType()) \
    .add("conntype", StringType()) \
    .add("TS_REQUEST", TimestampType())

impressions = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "ip-ec2.internal:9092") \
    .option("subscribe", "ssp.datascience_impressions") \
    .load()

requests = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "ip-ec2.internal:9092") \
    .option("subscribe", "ssp.datascience_requests") \
    .option("startingOffsets", "latest") \
    .load()

query_requests = requests \
    .select(col("timestamp"), col("key").cast("string"), from_json(col("value").cast("string"), schema_requests).alias("parsed")) \
    .select(col("timestamp").alias("timestamp_req"), "parsed.id_req", "parsed.page", "parsed.conntype", "parsed.TS_REQUEST") \
    .withWatermark("timestamp_req", "120 seconds")

query_impressions = impressions \
    .select(col("timestamp"), col("key").cast("string"), from_json(col("value").cast("string"), schema_impressions).alias("parsed")) \
    .select(col("timestamp").alias("timestamp_imp"), col("parsed.id_req").alias("id_imp"), "parsed.ts_imp_request", "parsed.country", "parsed.TS_IMPRESSION") \
    .withWatermark("timestamp_imp", "120 seconds")
query_requests.printSchema()
query_impressions.printSchema()
> root
>  |-- timestamp_req: timestamp (nullable = true)
>  |-- id_req: string (nullable = true)
>  |-- page: string (nullable = true)
>  |-- conntype: string (nullable = true)
>  |-- TS_REQUEST: timestamp (nullable = true)
>
> root
>  |-- timestamp_imp: timestamp (nullable = true)
>  |-- id_imp: string (nullable = true)
>  |-- ts_imp_request: timestamp (nullable = true)
>  |-- country: string (nullable = true)
>  |-- TS_IMPRESSION: timestamp (nullable = true)
In summary, I read the data from both Kafka streams, and in the next lines I try to join the two streams by their ID.
rawQuery = query_requests.join(query_impressions, expr("""
(id_req = id_imp AND
timestamp_imp >= timestamp_req AND
timestamp_imp <= timestamp_req + interval 5 minutes)
"""),
"leftOuter")
rawQuery = rawQuery \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "/home/jovyan/streaming/applicationHistory") \
    .option("path", "/home/jovyan/streaming") \
    .start()
print(rawQuery.status)
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33968)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2910, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "", line 3, in <module>
    print(rawQuery.status)
  File "/opt/conda/lib/python3.6/site-packages/pyspark/sql/streaming.py", line 114, in status
    return json.loads(self._jsq.status().json())
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/conda/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/conda/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o92.status

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 1828, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'Py4JError' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 852, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque
I'm running Spark locally with a Jupyter Notebook. In spark/conf/spark-defaults.conf I have:
# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 15g
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
If I try to use Spark again after the previous error, I get this error:
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
I solved the problem! Basically, for some reason the problem was related to the Jupyter Notebook. I removed the following line from the code above:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'
and ran the code from the console instead:
> spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 spark_structured.py
That way, I was able to run all the code without any problem.
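If you would rather keep using the notebook, an alternative I have not verified here (just a sketch) is to pin the Kafka connector in spark/conf/spark-defaults.conf with spark.jars.packages, so the driver JVM resolves the dependency at startup instead of relying on PYSPARK_SUBMIT_ARGS being set inside the Python process:

# spark/conf/spark-defaults.conf
spark.jars.packages    org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0

The coordinates must match your Spark and Scala versions (2.3.0 and 2.11 in my case).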
If you run into the same problem, you can also edit spark-defaults.conf and increase spark.driver.memory and spark.executor.memory.
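A minimal sketch of what that change could look like, reusing the 15g driver value from my spark-defaults.conf above (the 4g executor value is just an example, not from my setup):

# spark/conf/spark-defaults.conf
spark.driver.memory      15g
spark.executor.memory    4g

Or equivalently on the command line:

> spark-submit --driver-memory 15g --executor-memory 4g --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 spark_structured.py

Note that spark.driver.memory only takes effect if it is set before the driver JVM starts, which is why it belongs in spark-defaults.conf or in the spark-submit arguments rather than in the SparkSession builder.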