Read a file in a Spark job from Google Cloud Platform
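I'm using Spark on Google Cloud Platform.
I'm reading a file from the file system at gs://<bucket>/dir/file,
but the log output says:
FileNotFoundException: gs:/bucket/dir/file (No such file or dir exist)
The missing / is apparently the problem. How can I fix this?
Here is my code: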
import java.io.File
import javax.sound.sampled.{AudioFileFormat, AudioSystem}
import org.apache.spark.SparkContext

val files = Array(("call 1", "gs://<bucket>/google-cloud-dataproc-metainfo/test/123.wav"))
val splitAudioFiles = sc.parallelize(files.map(x => splitAudio(x, 5, sc)))

def splitAudio(path: (String, String), interval: Int, sc: SparkContext): (String, Seq[(String, Int)]) = {
  val stopWords = sc.broadcast(loadTxtAsSet("gs://<bucket>/google-cloud-dataproc-metainfo/test/stopword.txt")).value
  val keyWords = sc.broadcast(loadTxtAsSet("gs://<bucket>/google-cloud-dataproc-metainfo/test/KeywordList.txt")).value
  val file = new File(path._2)
  val audioTitle = path._1
  val fileFormat: AudioFileFormat = AudioSystem.getAudioFileFormat(file)
  val format = fileFormat.getFormat
  // ... rest of splitAudio not shown
}
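The missing slash can be reproduced with the JDK alone. This is a minimal sketch, assuming a Unix-like system (as on Dataproc nodes); the bucket name my-bucket and the object FileNormalizationDemo are hypothetical. java.io.File treats its argument as a local path and collapses repeated separators, so the "//" after the scheme becomes "/" before any file is opened:

```scala
import java.io.File

object FileNormalizationDemo {
  // java.io.File treats the string as a local path; on Unix-like systems it
  // collapses repeated separators, so "gs://..." is normalized to "gs:/...".
  def normalized(path: String): String = new File(path).getPath

  def main(args: Array[String]): Unit = {
    val p = normalized("gs://my-bucket/dir/file") // hypothetical bucket name
    println(p) // gs:/my-bucket/dir/file on Linux/macOS
    // No such local path exists, so opening it would raise FileNotFoundException.
    println(new File(p).exists())
  }
}
```

So the slash is only a symptom: the underlying problem is that a local-file API is being handed a Cloud Storage URI.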
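It looks like you are passing a java.io.File to AudioSystem.getAudioFileFormat(File), which does not support gs:// URIs: java.io.File only understands local file-system paths. It also collapses the double slash after the scheme, which is why the exception shows gs:/; the missing slash is a symptom rather than the cause. Instead, you need to use the Hadoop FileSystem interface to get an InputStream for the file and call AudioSystem.getAudioFileFormat(InputStream). I think something like this should work: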
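import java.io.BufferedInputStream
import javax.sound.sampled.{AudioFileFormat, AudioSystem}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

val sc: SparkContext = ...
// Plain gs:// path strings; java.net.URL may not accept the gs scheme
val paths: RDD[String] = ...
val formats: RDD[AudioFileFormat] = paths.map { p =>
  val hadoopPath = new Path(p)
  // The Cloud Storage connector preinstalled on Dataproc lets Hadoop resolve gs:// paths
  val hadoopFs = hadoopPath.getFileSystem(new Configuration())
  // Audio file readers may need mark/reset, which BufferedInputStream provides
  val inputStream = new BufferedInputStream(hadoopFs.open(hadoopPath))
  try AudioSystem.getAudioFileFormat(inputStream)
  finally inputStream.close()
}
// AudioFileFormat is not Serializable: map to the fields you need before collect()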
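The stream-based half of the fix can be tried without a cluster. This is a sketch under stated assumptions: an in-memory WAV file (one second of 8 kHz, 16-bit mono silence) stands in for the gs:// object, and StreamFormatDemo, silentWav and formatOf are hypothetical names. It shows that AudioSystem.getAudioFileFormat(InputStream) works on any stream, wrapped in a BufferedInputStream because the AudioSystem documentation notes that file readers may need to mark and reset the stream:

```scala
import java.io.{BufferedInputStream, ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import javax.sound.sampled.{AudioFileFormat, AudioFormat, AudioInputStream, AudioSystem}

object StreamFormatDemo {
  // Builds a small in-memory WAV file so no bucket or Hadoop setup is needed.
  def silentWav(): Array[Byte] = {
    val format = new AudioFormat(8000f, 16, 1, true, false) // 8 kHz, 16-bit, mono, signed, little-endian
    val frames = 8000
    val pcm = new Array[Byte](frames * format.getFrameSize) // all zeros = silence
    val ais = new AudioInputStream(new ByteArrayInputStream(pcm), format, frames)
    val out = new ByteArrayOutputStream()
    AudioSystem.write(ais, AudioFileFormat.Type.WAVE, out)
    out.toByteArray
  }

  // Reads the audio header from any InputStream; the buffered wrapper adds
  // the mark/reset support that AudioSystem's file readers rely on.
  def formatOf(raw: InputStream): AudioFileFormat = {
    val in = new BufferedInputStream(raw)
    try AudioSystem.getAudioFileFormat(in)
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val fmt = formatOf(new ByteArrayInputStream(silentWav()))
    println(fmt.getType)
    println(fmt.getFormat)
  }
}
```

In the Spark version, the raw stream would come from hadoopFs.open(hadoopPath) instead of a ByteArrayInputStream.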