[Structured Streaming]:Structured Streaming into Redshift sink
[Structured Streaming]: Structured Streaming into Redshift sink
是否可以将 Kafka Streaming 源支持的 Dataframe 写入 AWS Redshift,我们过去曾使用 spark-redshift 写入 Redshift,但我认为它不适用于 DataFrame##writeStream
.鉴于 Redshift 的工作方式,使用 JDBC 连接器和 ForeachWriter
编写也可能不是一个好主意。
我从 Yelp blog is to write the files into S3 and then invoke Redshift COPY
中遇到的一种可能的方法是使用具有 S3 对象路径的清单文件,在结构化流的情况下,我如何控制写入 S3 的文件?在将 5 个文件写入 S3 后,还有一个单独的触发器来创建清单文件。
也欢迎任何其他可能的解决方案。提前致谢。
Spark 能够非常有效地将普通数据帧加载到 Redshift,但我还没有在 Spark 中使用流。
如果您可以连续将流输出写入标准 df,那么在指定的时间间隔内,您可以将该 df 加载到 Redshift 并清空它。
另一种选择是将流发送到 Kinesis 并使用 Kinesis Firehose 将其加载到 Redshift。不过向堆栈添加另一个流层似乎过多。
有一种方法可以在结构化流中使用 spark-redshift,但您必须在自己的分支中实施一些额外的 类。首先你需要一个应该实现 org.apache.spark.sql.execution.streaming.Sink
接口的 RedshiftSink:
private[redshift] class RedshiftSink(
sqlContext: SQLContext,
parameters: MergedParameters,
redshiftWriter: RedshiftWriter) extends Sink {
private val log = LoggerFactory.getLogger(getClass)
@volatile private var latestBatchId = -1L
override def toString(): String = "RedshiftSink"
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= latestBatchId) {
log.info(s"Skipping already committed batch $batchId")
} else {
val mode = if (parameters.overwrite) SaveMode.Overwrite else SaveMode.Append
redshiftWriter.saveToRedshift(sqlContext, data, mode, parameters)
latestBatchId = batchId
}
}
}
然后 com.databricks.spark.redshift.DefaultSource
应该通过实施 org.apache.spark.sql.sources.StreamSinkProvider
来扩展:
/**
* Creates a Sink instance
*/
override def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new RedshiftSink(sqlContext, Parameters.mergeParameters(parameters), new RedshiftWriter(jdbcWrapper, s3ClientFactory))
}
现在您应该可以在结构化流中使用它了:
dataset.writeStream()
.trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
.format("com.databricks.spark.redshift")
.outputMode(OutputMode.Append())
.queryName("redshift-stream")
.start()
更新
要解决向 StreamExecution RedshiftWriter.unloadData()
报告指标的问题,必须更改为使用 data.queryExecution.toRdd.mapPartitions
而不是 data.rdd.mapPartitions
,因为 data.rdd
创建了一个不可见的新计划到 StreamExecution(它使用现有计划来收集指标)。它还需要将转换函数更改为:
val conversionFunctions: Array[(InternalRow, Int) => Any] = data.schema.fields.map { field =>
field.dataType match {
case DateType =>
val dateFormat = Conversions.createRedshiftDateFormat()
(row: InternalRow, ordinal: Int) => {
if (row.isNullAt(ordinal)) null else dateFormat.format(
DateTimeUtils.toJavaDate(row.getInt(ordinal)))
}
case TimestampType =>
val timestampFormat = Conversions.createRedshiftTimestampFormat()
(row: InternalRow, ordinal: Int) => {
if (row.isNullAt(ordinal)) null else timestampFormat.format(
DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
}
case StringType =>
(row: InternalRow, ordinal: Int) => {
if (row.isNullAt(ordinal)) null else row.getString(ordinal)
}
case dt: DataType =>
(row: InternalRow, ordinal: Int) => {
if (row.isNullAt(ordinal)) null else row.get(ordinal, dt)
}
}
}
是否可以将 Kafka Streaming 源支持的 Dataframe 写入 AWS Redshift,我们过去曾使用 spark-redshift 写入 Redshift,但我认为它不适用于 DataFrame##writeStream
.鉴于 Redshift 的工作方式,使用 JDBC 连接器和 ForeachWriter
编写也可能不是一个好主意。
我从 Yelp blog is to write the files into S3 and then invoke Redshift COPY
中遇到的一种可能的方法是使用具有 S3 对象路径的清单文件,在结构化流的情况下,我如何控制写入 S3 的文件?在将 5 个文件写入 S3 后,还有一个单独的触发器来创建清单文件。
也欢迎任何其他可能的解决方案。提前致谢。
Spark 能够非常有效地将普通数据帧加载到 Redshift,但我还没有在 Spark 中使用流。
如果您可以连续将流输出写入标准 df,那么在指定的时间间隔内,您可以将该 df 加载到 Redshift 并清空它。
另一种选择是将流发送到 Kinesis 并使用 Kinesis Firehose 将其加载到 Redshift。不过向堆栈添加另一个流层似乎过多。
有一种方法可以在结构化流中使用 spark-redshift,但您必须在自己的分支中实施一些额外的 类。首先你需要一个应该实现 org.apache.spark.sql.execution.streaming.Sink
接口的 RedshiftSink:
private[redshift] class RedshiftSink(
sqlContext: SQLContext,
parameters: MergedParameters,
redshiftWriter: RedshiftWriter) extends Sink {
private val log = LoggerFactory.getLogger(getClass)
@volatile private var latestBatchId = -1L
override def toString(): String = "RedshiftSink"
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= latestBatchId) {
log.info(s"Skipping already committed batch $batchId")
} else {
val mode = if (parameters.overwrite) SaveMode.Overwrite else SaveMode.Append
redshiftWriter.saveToRedshift(sqlContext, data, mode, parameters)
latestBatchId = batchId
}
}
}
然后 com.databricks.spark.redshift.DefaultSource
应该通过实施 org.apache.spark.sql.sources.StreamSinkProvider
来扩展:
/**
* Creates a Sink instance
*/
override def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new RedshiftSink(sqlContext, Parameters.mergeParameters(parameters), new RedshiftWriter(jdbcWrapper, s3ClientFactory))
}
现在您应该可以在结构化流中使用它了:
dataset.writeStream()
.trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
.format("com.databricks.spark.redshift")
.outputMode(OutputMode.Append())
.queryName("redshift-stream")
.start()
更新
要解决向 StreamExecution RedshiftWriter.unloadData()
报告指标的问题,必须更改为使用 data.queryExecution.toRdd.mapPartitions
而不是 data.rdd.mapPartitions
,因为 data.rdd
创建了一个不可见的新计划到 StreamExecution(它使用现有计划来收集指标)。它还需要将转换函数更改为:
val conversionFunctions: Array[(InternalRow, Int) => Any] = data.schema.fields.map { field =>
field.dataType match {
case DateType =>
val dateFormat = Conversions.createRedshiftDateFormat()
(row: InternalRow, ordinal: Int) => {
if (row.isNullAt(ordinal)) null else dateFormat.format(
DateTimeUtils.toJavaDate(row.getInt(ordinal)))
}
case TimestampType =>
val timestampFormat = Conversions.createRedshiftTimestampFormat()
(row: InternalRow, ordinal: Int) => {
if (row.isNullAt(ordinal)) null else timestampFormat.format(
DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
}
case StringType =>
(row: InternalRow, ordinal: Int) => {
if (row.isNullAt(ordinal)) null else row.getString(ordinal)
}
case dt: DataType =>
(row: InternalRow, ordinal: Int) => {
if (row.isNullAt(ordinal)) null else row.get(ordinal, dt)
}
}
}