pyspark.sql.utils.AnalysisException: 'Event time must be defined on a window or a timestamp, but timestamp is of type string

I wrote a Structured Streaming demo with PySpark, but it fails with an error. I'm using Kafka as the streaming data source; the producer code is as follows:

import datetime
import json
import random
import time
from kafka import KafkaProducer

def produce():
    p = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=lambda x: json.dumps(x, ensure_ascii=False).encode("utf-8"))

    for i in range(100):
        p.send(topic="test", value={"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "word": "spark" + str(random.randint(1, 3))})
        time.sleep(0.5)
    p.flush()

Spark Structured Streaming code:

import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .option("deserializer", lambda x: json.loads(x.decode("utf-8"))) \
    .load()
df = df.select(F.get_json_object(df.value.cast("string"), "$.timestamp").alias("timestamp"),
               F.get_json_object(df.value.cast("string"), "$.word").alias("word"))

df = df \
    .withWatermark("timestamp", "5 seconds") \
    .groupBy(F.window("timestamp", "10 seconds")) \
    .count()
df = df.select(df.window.start.cast("string").alias("start"),
               df.window.end.cast("string").alias("end"),
               "count")
q = df.writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .option("checkpointLocation", "hdfs://127.0.0.1:9000/offsets_d") \
    .start()
q.awaitTermination()

Running it raises:

pyspark.sql.utils.AnalysisException: 'Event time must be defined on a window or a timestamp, but timestamp is of type string;;\nEventTimeWatermark timestamp#21: string, interval 5 seconds\n+- Project [get_json_object(cast(value#8 as string), $.timestamp) AS timestamp#21, get_json_object(cast(value#8 as string), $.word) AS word#22]\n +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@435b0386, kafka, Map(deserializer -> <function <lambda> at 0x7fca0be971e0>, subscribe -> test, kafka.bootstrap.servers -> localhost:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3546fa34,kafka,List(),None,List(),None,Map(deserializer -> <function <lambda> at 0x7fca0be971e0>, subscribe -> test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\n

I followed the official documentation, so I can't figure out where I went wrong.

This part looks like what causes the exception:

df = df \
    .withWatermark("timestamp", "5 seconds") \
    .groupBy(
    F.window("timestamp", "10 seconds")
).count()

Try following the example given in the official documentation:

windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()

and make sure that your timestamp column is of timestamp type rather than string.
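
For the code in the question, that means parsing the JSON field into a real timestamp (for example with F.to_timestamp) before applying the watermark. A minimal sketch, assuming the "yyyy-MM-dd HH:mm:ss" format used by the producer above:

from pyspark.sql import functions as F

# Parse the string field into a TimestampType column; the format matches
# the producer's "%Y-%m-%d %H:%M:%S".
df = df.select(
    F.to_timestamp(F.get_json_object(df.value.cast("string"), "$.timestamp"),
                   "yyyy-MM-dd HH:mm:ss").alias("timestamp"),
    F.get_json_object(df.value.cast("string"), "$.word").alias("word"))

# "timestamp" is now a real timestamp, so the watermark and window are valid.
windowed = df \
    .withWatermark("timestamp", "5 seconds") \
    .groupBy(F.window("timestamp", "10 seconds")) \
    .count()

Another option is to parse the whole value with from_json and an explicit schema, which avoids the repeated get_json_object calls.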