Apache spark 将文件读取为正则表达式
Apache spark to read files as regex
我正在将流发送到 HDFS 并尝试使用 spark 读取文本文件。
JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
Duration(1000));
JavaPairInputDStream<LongWritable, Text> textStream =
jssc.fileStream("hdfs://myip:9000/travel/FlumeData.[0-9]*",
LongWritable.class, Text.class, TextInputFormat.class);
在将流发送到 hdfs 时,会创建一些 FlumeData。1234.tmp 文件,一旦收到完整数据,该文件就会转换为正确的文件,例如。 FlumeData.1234
我想忽略此 .tmp 文件成为 read.from spark。我尝试使用正则表达式
hdfs://myip:9000/travel/FlumeData.[0-9]*
hdfs://myip:9000/travel/FlumeData.//d*
但它们不起作用。我正在寻找这样的东西
jssc.fileStream("hdfs://myip:9000/travel/FlumeData.[0-9]*",
LongWritable.class, Text.class, TextInputFormat.class);
fileStream 不应从文件扩展名中读取 .tmp。
我还尝试按照 Hadoop 代码检索苍蝇列表
private String pathValue(String PathVariable) throws IOException{
Configuration conf = new Configuration();
Path path = new Path(PathVariable);
FileSystem fs = FileSystem.get(path.toUri(), conf);
System.out.println("PathVariable" + fs.getWorkingDirectory());
return fs.getName();
}
但是它的 FileSystem 对象 fs 没有 filename()。由于新文件是在 运行 时间创建的。我需要阅读他们创建的内容。
您需要使用 () selector to select 您可以从匹配中保留的部分。如果您没有指定任何部分,则返回整个匹配项。
在你的例子中,如果我没有误解你想要 select 在你的例子中:
FlumeData.1234 from FlumeData.1234.tmp
为此,您需要的简单正则表达式是:
(.*).tmp
如果您想 select .tmp 扩展名之前的所有内容。
JavaPairInputDStream重载的fileStream方法带有过滤函数,我们可以写一个过滤函数过滤掉目录下的文件
fileStream(directory, kClass, vClass, fClass, filter, newFilesOnly)
JavaPairInputDStream<LongWritable, Text> lines = jssc.fileStream("hdfs://myip:9000/travel/", LongWritable.class, Text.class, TextInputFormat.class, new Function<Path,Boolean> () {
public Boolean call(Path path) throws Exception {
System.out.println("Is path :"+path.getName());
Pattern pattern = Pattern.compile("FlumeData.[0-9]*");
Matcher m = pattern.matcher(path.getName());
System.out.println("Is path : " + path.getName().toString() + " matching "
+ " ? , " + m.matches());
return m.matches();
}}, true);
请运行使用上面的代码,我希望这能解决问题。
我正在将流发送到 HDFS 并尝试使用 spark 读取文本文件。
JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
Duration(1000));
JavaPairInputDStream<LongWritable, Text> textStream =
jssc.fileStream("hdfs://myip:9000/travel/FlumeData.[0-9]*",
LongWritable.class, Text.class, TextInputFormat.class);
在将流发送到 hdfs 时,会创建一些 FlumeData。1234.tmp 文件,一旦收到完整数据,该文件就会转换为正确的文件,例如。 FlumeData.1234
我想忽略此 .tmp 文件成为 read.from spark。我尝试使用正则表达式
hdfs://myip:9000/travel/FlumeData.[0-9]* hdfs://myip:9000/travel/FlumeData.//d*
但它们不起作用。我正在寻找这样的东西 jssc.fileStream("hdfs://myip:9000/travel/FlumeData.[0-9]*", LongWritable.class, Text.class, TextInputFormat.class);
fileStream 不应从文件扩展名中读取 .tmp。
我还尝试按照 Hadoop 代码检索苍蝇列表
private String pathValue(String PathVariable) throws IOException{
Configuration conf = new Configuration();
Path path = new Path(PathVariable);
FileSystem fs = FileSystem.get(path.toUri(), conf);
System.out.println("PathVariable" + fs.getWorkingDirectory());
return fs.getName();
}
但是它的 FileSystem 对象 fs 没有 filename()。由于新文件是在 运行 时间创建的。我需要阅读他们创建的内容。
您需要使用 () selector to select 您可以从匹配中保留的部分。如果您没有指定任何部分,则返回整个匹配项。
在你的例子中,如果我没有误解你想要 select 在你的例子中:
FlumeData.1234 from FlumeData.1234.tmp
为此,您需要的简单正则表达式是:
(.*).tmp
如果您想 select .tmp 扩展名之前的所有内容。
JavaPairInputDStream重载的fileStream方法带有过滤函数,我们可以写一个过滤函数过滤掉目录下的文件
fileStream(directory, kClass, vClass, fClass, filter, newFilesOnly)
JavaPairInputDStream<LongWritable, Text> lines = jssc.fileStream("hdfs://myip:9000/travel/", LongWritable.class, Text.class, TextInputFormat.class, new Function<Path,Boolean> () {
public Boolean call(Path path) throws Exception {
System.out.println("Is path :"+path.getName());
Pattern pattern = Pattern.compile("FlumeData.[0-9]*");
Matcher m = pattern.matcher(path.getName());
System.out.println("Is path : " + path.getName().toString() + " matching "
+ " ? , " + m.matches());
return m.matches();
}}, true);
请运行使用上面的代码,我希望这能解决问题。