Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;\nJoin Inner
I want to join two streams, but I get the following error and I don't know how to fix it:
Append output mode not supported when there are streaming aggregations
on streaming DataFrames/DataSets without watermark;;\nJoin Inner
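from pyspark.sql.functions import col, expr, max, unix_timestamp  # note: max shadows the Python builtin
from pyspark.sql.types import TimestampType
df_stream = spark.readStream.schema(schema_clicks).option("ignoreChanges", True).option("header", True).format("csv").load("s3://mybucket/*.csv")
display(df_stream.select("SendID", "EventType", "EventDate"))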
I want to join df1 and df2:
df1 = df_stream \
.withColumn('timestamp', unix_timestamp(col('EventDate'), "MM/dd/yyyy hh:mm:ss aa").cast(TimestampType())) \
.select(col("SendID"), col("timestamp"), col("EventType")) \
.withColumnRenamed("SendID", "SendID_update") \
.withColumnRenamed("timestamp", "timestamp_update") \
.withWatermark("timestamp_update", "1 minutes")
df2 = df_stream \
.withColumn('timestamp', unix_timestamp(col('EventDate'), "MM/dd/yyyy hh:mm:ss aa").cast(TimestampType())) \
.withWatermark("timestamp", "1 minutes") \
.groupBy(col("SendID")) \
.agg(max(col('timestamp')).alias("timestamp")) \
.orderBy('timestamp', ascending=False)
join = df2.alias("A").join(df1.alias("B"), expr(
"A.SendID = B.SendID_update" +
" AND " +
"B.timestamp_update >= A.timestamp " +
" AND " +
"B.timestamp_update <= A.timestamp + interval 1 hour"))
Finally, when I write the result in append mode:
join \
.writeStream \
.outputMode("Append") \
.option("checkpointLocation", "s3://checkpointjoin_delta") \
.format("delta") \
.table("test_join")
I get the error shown above.
AnalysisException                         Traceback (most recent call last)
in ()
----> 1 join.writeStream.outputMode("Append").option("checkpointLocation", "s3://checkpointjoin_delta").format("delta").table("test_join")

/databricks/spark/python/pyspark/sql/streaming.py in table(self, tableName)
   1137         """
   1138         if isinstance(tableName, basestring):
-> 1139             return self._sq(self._jwrite.table(tableName))
   1140         else:
   1141             raise TypeError("tableName can be only a single string")

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
The problem is in the .groupBy: the watermarked timestamp column has to be part of the grouping key. For example:
df2 = df_stream \
.withColumn('timestamp', unix_timestamp(col('EventDate'), "MM/dd/yyyy hh:mm:ss aa").cast(TimestampType())) \
.withWatermark("timestamp", "1 minutes") \
.groupBy(col("SendID"), "timestamp") \
.agg(max(col('timestamp')).alias("timestamp")) \
.orderBy('timestamp', ascending=False)
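To see why the grouping key matters, here is a toy, pure-Python model of how append mode decides when an aggregated row is final. It is only a sketch under simplifying assumptions, not Spark's implementation: the function name simulate_append, the per-event watermark update, and the sample events are all hypothetical. The idea it illustrates is that a group can only be emitted once the watermark has moved past an event time carried in its key; a key made of SendID alone never carries one, so nothing can ever be appended.

```python
def simulate_append(events, delay_s, key_has_event_time):
    """Toy model of append-mode aggregation under a watermark.

    events: (send_id, event_time_seconds) pairs in arrival order.
    delay_s: watermark delay in seconds.
    key_has_event_time: group by (send_id, event_time) if True, else by send_id only.
    Returns the rows that would be appended, as (send_id, max_event_time).
    """
    state = {}                      # group key -> running max(event_time)
    emitted = []
    watermark = float("-inf")
    for send_id, ts in events:
        if ts < watermark:
            continue                # too late: the event is dropped
        key = (send_id, ts) if key_has_event_time else (send_id,)
        state[key] = max(state.get(key, ts), ts)
        # The watermark trails the largest event time seen so far.
        watermark = max(watermark, ts - delay_s)
        if key_has_event_time:
            # Groups whose event time is behind the watermark can no longer
            # change, so they are final and may be appended to the sink.
            for k in sorted(k for k in state if k[1] < watermark):
                emitted.append((k[0], state.pop(k)))
        # With a key of send_id only, no group ever becomes final:
        # a later event for the same send_id could still change its max.
    return emitted


events = [("a", 0), ("a", 30), ("b", 100), ("a", 200)]
print(simulate_append(events, 60, key_has_event_time=True))
# [('a', 0), ('a', 30), ('b', 100)]
print(simulate_append(events, 60, key_has_event_time=False))
# []
```

Note that in this model, grouping on the raw timestamp makes every distinct timestamp its own group, so max(timestamp) simply returns that timestamp. If the goal is the latest event per SendID over some period, grouping on a time window over the watermarked column may be closer to the intent; that is a suggestion to check against the Structured Streaming documentation, not something the original answer states.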