SPARK:如何使用日期差异动态生成 s3 文件路径
SPARK : How to generate s3 file path dynamically using date diff
我正在尝试获取 startDate 和 endDate 之间的文件列表,并从这些文件夹中读取文件:
例如我的文件结构是这样的:BucketName/year/month/day/files
s3://testBucket/2016/10/16/part00000
这些文件都是 JSON。问题是我需要加载 startDate 和 endDate 之间的所有路径:
开始日期为 09/16/2016,结束日期为 10/16/2016;我想读取从 09/16/2016(含)到 10/16/2016(含)的所有文件。
import org.joda.time.Days
import org.joda.time.DurationFieldType
import org.joda.time.LocalDate
import org.joda.time.format.DateTimeFormat
import org.joda.time.format.DateTimeFormatter
// Builds one glob per calendar day between two timestamps (both ends included) under a
// <bucket>/<year>/<month>/<day>/ layout and reads every matching JSON file.
// NOTE(review): confirm that the upper-case "S3" scheme resolves in this environment;
// the sample path in the description uses lower-case "s3://".
val s3Bucket: String = "S3://myTestBucket/"
val startTimestamp: String = "2016-09-16T00:00:00Z"
val endTimestamp: String = "2016-10-16T00:00:00Z"
// The pattern has to describe the whole timestamp: a numeric month (MM, not the textual MMM)
// plus the literal 'T'...'Z' time part; otherwise parsing "2016-09-16T00:00:00Z" fails.
val dtf: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")
val startDate: LocalDate = dtf.parseLocalDate(startTimestamp)
val endDate: LocalDate = dtf.parseLocalDate(endTimestamp)
// Fail early on a reversed range instead of silently producing an empty path list.
require(!endDate.isBefore(startDate), s"endDate $endDate is before startDate $startDate")
val days: Int = Days.daysBetween(startDate, endDate).getDays
System.out.print(days)
// `0 to days` includes both the start day and the end day.
// NOTE(review): month and day are written without zero padding (e.g. 2016/9/16), exactly as
// before; verify against the real bucket layout and switch to a "yyyy/MM/dd" formatter if
// the folders are zero-padded.
val dates: Seq[String] = (0 to days).map { offset =>
  val d: LocalDate = startDate.plusDays(offset)
  s"$s3Bucket${d.getYear}/${d.getMonthOfYear}/${d.getDayOfMonth}/*"
}
val dateList = dates.toList
// Comma-separated with no space: a ", " separator would put a leading blank in front of
// every path after the first.
val files = dateList.mkString(",")
// Load the raw lines of all listed paths into one RDD and let the JSON reader parse that RDD,
// rather than passing the joined string to read.json as if it were a single path.
sqlContext.read.json(sqlContext.sparkContext.textFile(files))
这样做正确吗?还有其他有效的方法吗?
根据给定的解决方案,我得到了这个错误:
org.apache.spark.SparkException: Job 2 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop.apply(DAGScheduler.scala:806)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop.apply(DAGScheduler.scala:804)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581)
at org.apache.spark.SparkContext$$anonfun$stop.apply$mcV$sp(SparkContext.scala:1731)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1730)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:147)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:927)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
at org.apache.spark.sql.sources.HadoopFsRelation$.listLeafFilesInParallel(interfaces.scala:904)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:445)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:477)
at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:489)
at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:487)
at org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:494)
at org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun.apply(JSONRelation.scala:110)
at org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun.apply(JSONRelation.scala:109)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.execution.datasources.json.JSONRelation.dataSchema$lzycompute(JSONRelation.scala:109)
at org.apache.spark.sql.execution.datasources.json.JSONRelation.dataSchema(JSONRelation.scala:108)
at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)
at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:37)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:136)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:263)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:65)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:69)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:71)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:73)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:75)
at $iwC$$iwC$$iwC.<init>(<console>:77)
at $iwC$$iwC.<init>(<console>:79)
at $iwC.<init>(<console>:81)
at <init>(<console>:83)
at .<init>(<console>:87)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:664)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:629)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:622)
at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276)
at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
at org.apache.zeppelin.scheduler.FIFOScheduler.run(FIFOScheduler.java:118)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
我不确定它是否还能更高效,但它绝对不是惯用的写法(使用了 while 和 var),并且可以写得更短、更简洁:
// Produces one "<bucket>yyyy/MM/dd/*" glob for every day from startDate to endDate
// (both included) and loads them all as JSON.
val s3Bucket: String = "S3://myTestBucket/"
val startDate: LocalDate = new LocalDate(2016, 9, 16)
val endDate: LocalDate = new LocalDate(2016, 10, 16)
// Formatter for the zero-padded year/month/day part of each path.
val pathDTF = DateTimeFormat.forPattern("yyyy/MM/dd")
val days: Int = Days.daysBetween(startDate, endDate).getDays
// The inclusive range 0..days yields one offset per day, end date included.
val files: Seq[String] =
  for (offset <- 0 to days) yield {
    val day = startDate.plusDays(offset)
    s3Bucket + pathDTF.print(day) + "/*"
  }
// Passes every per-day glob to the JSON reader as varargs.
// NOTE(review): not every Spark release has a multi-path `json` overload; confirm for the
// target version.
val result = sqlContext.read.json(files: _*)
编辑:感谢 @Newbie 的提醒——确实不能将文件列表传递给 read.json(...),所以最后一行应该是:
// Join the per-day globs into one comma-separated path string, load the raw lines of all of
// them into a single RDD, then let the JSON reader parse that RDD.
val jsonLines = sc.textFile(files.mkString(","))
val result = sqlContext.read.json(jsonLines)
我正在尝试获取 startDate 和 endDate 之间的文件列表,并从这些文件夹中读取文件:
例如我的文件结构是这样的:BucketName/year/month/day/files
s3://testBucket/2016/10/16/part00000
这些文件都是 JSON。问题是我需要加载 startDate 和 endDate 之间的所有路径:
开始日期为 09/16/2016,结束日期为 10/16/2016;我想读取从 09/16/2016(含)到 10/16/2016(含)的所有文件。
import org.joda.time.Days
import org.joda.time.DurationFieldType
import org.joda.time.LocalDate
import org.joda.time.format.DateTimeFormat
import org.joda.time.format.DateTimeFormatter
// Builds one glob per calendar day between two timestamps (both ends included) under a
// <bucket>/<year>/<month>/<day>/ layout and reads every matching JSON file.
// NOTE(review): confirm that the upper-case "S3" scheme resolves in this environment;
// the sample path in the description uses lower-case "s3://".
val s3Bucket: String = "S3://myTestBucket/"
val startTimestamp: String = "2016-09-16T00:00:00Z"
val endTimestamp: String = "2016-10-16T00:00:00Z"
// The pattern has to describe the whole timestamp: a numeric month (MM, not the textual MMM)
// plus the literal 'T'...'Z' time part; otherwise parsing "2016-09-16T00:00:00Z" fails.
val dtf: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")
val startDate: LocalDate = dtf.parseLocalDate(startTimestamp)
val endDate: LocalDate = dtf.parseLocalDate(endTimestamp)
// Fail early on a reversed range instead of silently producing an empty path list.
require(!endDate.isBefore(startDate), s"endDate $endDate is before startDate $startDate")
val days: Int = Days.daysBetween(startDate, endDate).getDays
System.out.print(days)
// `0 to days` includes both the start day and the end day.
// NOTE(review): month and day are written without zero padding (e.g. 2016/9/16), exactly as
// before; verify against the real bucket layout and switch to a "yyyy/MM/dd" formatter if
// the folders are zero-padded.
val dates: Seq[String] = (0 to days).map { offset =>
  val d: LocalDate = startDate.plusDays(offset)
  s"$s3Bucket${d.getYear}/${d.getMonthOfYear}/${d.getDayOfMonth}/*"
}
val dateList = dates.toList
// Comma-separated with no space: a ", " separator would put a leading blank in front of
// every path after the first.
val files = dateList.mkString(",")
// Load the raw lines of all listed paths into one RDD and let the JSON reader parse that RDD,
// rather than passing the joined string to read.json as if it were a single path.
sqlContext.read.json(sqlContext.sparkContext.textFile(files))
这样做正确吗?还有其他有效的方法吗?
根据给定的解决方案,我得到了这个错误:
org.apache.spark.SparkException: Job 2 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop.apply(DAGScheduler.scala:806)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop.apply(DAGScheduler.scala:804)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581)
at org.apache.spark.SparkContext$$anonfun$stop.apply$mcV$sp(SparkContext.scala:1731)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1730)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:147)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:927)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
at org.apache.spark.sql.sources.HadoopFsRelation$.listLeafFilesInParallel(interfaces.scala:904)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:445)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:477)
at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:489)
at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:487)
at org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:494)
at org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun.apply(JSONRelation.scala:110)
at org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun.apply(JSONRelation.scala:109)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.execution.datasources.json.JSONRelation.dataSchema$lzycompute(JSONRelation.scala:109)
at org.apache.spark.sql.execution.datasources.json.JSONRelation.dataSchema(JSONRelation.scala:108)
at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)
at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:37)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:136)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:263)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:65)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:69)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:71)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:73)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:75)
at $iwC$$iwC$$iwC.<init>(<console>:77)
at $iwC$$iwC.<init>(<console>:79)
at $iwC.<init>(<console>:81)
at <init>(<console>:83)
at .<init>(<console>:87)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:664)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:629)
at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:622)
at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276)
at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
at org.apache.zeppelin.scheduler.FIFOScheduler.run(FIFOScheduler.java:118)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
我不确定它是否还能更高效,但它绝对不是惯用的写法(使用了 while 和 var),并且可以写得更短、更简洁:
// Produces one "<bucket>yyyy/MM/dd/*" glob for every day from startDate to endDate
// (both included) and loads them all as JSON.
val s3Bucket: String = "S3://myTestBucket/"
val startDate: LocalDate = new LocalDate(2016, 9, 16)
val endDate: LocalDate = new LocalDate(2016, 10, 16)
// Formatter for the zero-padded year/month/day part of each path.
val pathDTF = DateTimeFormat.forPattern("yyyy/MM/dd")
val days: Int = Days.daysBetween(startDate, endDate).getDays
// The inclusive range 0..days yields one offset per day, end date included.
val files: Seq[String] =
  for (offset <- 0 to days) yield {
    val day = startDate.plusDays(offset)
    s3Bucket + pathDTF.print(day) + "/*"
  }
// Passes every per-day glob to the JSON reader as varargs.
// NOTE(review): not every Spark release has a multi-path `json` overload; confirm for the
// target version.
val result = sqlContext.read.json(files: _*)
编辑:感谢 @Newbie 的提醒——确实不能将文件列表传递给 read.json(...),所以最后一行应该是:
// Join the per-day globs into one comma-separated path string, load the raw lines of all of
// them into a single RDD, then let the JSON reader parse that RDD.
val jsonLines = sc.textFile(files.mkString(","))
val result = sqlContext.read.json(jsonLines)