Pyspark 2.4.0,使用读取流从 kafka 读取 avro - Python
Pyspark 2.4.0, read avro from kafka with read stream - Python
我正在尝试使用 PySpark 2.4.0 从 Kafka 读取 avro 消息。
spark-avro外部模块可以提供读取avro的这个方案
文件:
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
但是,我需要阅读流式传输的 avro 消息。库文档建议使用 from_avro() 函数,该函数仅适用于 Scala 和 Java.
是否有任何其他模块支持读取从 Kafka 流式传输的 avro 消息?
您可以包含 spark-avro 包,例如使用 --packages
(调整版本以匹配 spark 安装):
bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0
并提供您自己的包装器:
from pyspark.sql.column import Column, _to_java_column
def from_avro(col, jsonFormatSchema):
sc = SparkContext._active_spark_context
avro = sc._jvm.org.apache.spark.sql.avro
f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
return Column(f(_to_java_column(col), jsonFormatSchema))
def to_avro(col):
sc = SparkContext._active_spark_context
avro = sc._jvm.org.apache.spark.sql.avro
f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
return Column(f(_to_java_column(col)))
用法示例(摘自the official test suite):
from pyspark.sql.functions import col, struct
avro_type_struct = """
{
"type": "record",
"name": "struct",
"fields": [
{"name": "col1", "type": "long"},
{"name": "col2", "type": "string"}
]
}"""
df = spark.range(10).select(struct(
col("id"),
col("id").cast("string").alias("id2")
).alias("struct"))
avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))
avro_struct_df.show(3)
+----------+
| avro|
+----------+
|[00 02 30]|
|[02 02 31]|
|[04 02 32]|
+----------+
only showing top 3 rows
avro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)
+------------------------------------------------+
|from_avro(avro, struct<col1:bigint,col2:string>)|
+------------------------------------------------+
| [0, 0]|
| [1, 1]|
| [2, 2]|
+------------------------------------------------+
only showing top 3 rows
我正在尝试使用 PySpark 2.4.0 从 Kafka 读取 avro 消息。
spark-avro外部模块可以提供读取avro的这个方案 文件:
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
但是,我需要阅读流式传输的 avro 消息。库文档建议使用 from_avro() 函数,该函数仅适用于 Scala 和 Java.
是否有任何其他模块支持读取从 Kafka 流式传输的 avro 消息?
您可以包含 spark-avro 包,例如使用 --packages
(调整版本以匹配 spark 安装):
bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0
并提供您自己的包装器:
from pyspark.sql.column import Column, _to_java_column
def from_avro(col, jsonFormatSchema):
sc = SparkContext._active_spark_context
avro = sc._jvm.org.apache.spark.sql.avro
f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
return Column(f(_to_java_column(col), jsonFormatSchema))
def to_avro(col):
sc = SparkContext._active_spark_context
avro = sc._jvm.org.apache.spark.sql.avro
f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
return Column(f(_to_java_column(col)))
用法示例(摘自the official test suite):
from pyspark.sql.functions import col, struct
avro_type_struct = """
{
"type": "record",
"name": "struct",
"fields": [
{"name": "col1", "type": "long"},
{"name": "col2", "type": "string"}
]
}"""
df = spark.range(10).select(struct(
col("id"),
col("id").cast("string").alias("id2")
).alias("struct"))
avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))
avro_struct_df.show(3)
+----------+
| avro|
+----------+
|[00 02 30]|
|[02 02 31]|
|[04 02 32]|
+----------+
only showing top 3 rows
avro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)
+------------------------------------------------+
|from_avro(avro, struct<col1:bigint,col2:string>)|
+------------------------------------------------+
| [0, 0]|
| [1, 1]|
| [2, 2]|
+------------------------------------------------+
only showing top 3 rows