Java 8 个流和并发写入
Java 8 streams and concurrent writes
我有这样的代码
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
List<String> matches = new Vector<>(); // Race condition for ArrayList??
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("AHugeFile.txt")));
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("output.txt")));
reader.lines().parallel()
.filter(s -> s.matches("someFancyRegEx"))
.forEach(s -> {
matches.add(s);
try {
writer.write(s);
writer.newLine();
} catch (Exception e) {
System.out.println("error");
}
}
);
out.println("Processing took " + (System.currentTimeMillis() - start) / 1000 + " seconds and matches " + matches.size());
reader.close();
writer.flush();
writer.close();
}
我注意到,如果我在第 3 行用 ArrayList 替换 Vector,每次匹配都会得到不同的结果。我只是想在 Streams 上动手,但假设 forEach 同时执行并尝试写入 ArrayList,而 ArrayList 错过了一些写入!使用 Vector,结果是一致的。
我有两个问题:
- 我关于 ArrayList 导致 RACE 的推理是否正确?
- 鉴于 'write' 也在同一个终端操作中写入文件,'write' 是否可能遗漏某些行?在我的测试中,运行程序几次,结果似乎与正确写出的行数一致。
ArrayList 不是同步集合,所以它会导致 RACE 条件。所有改变向量状态的方法都是同步的,所以你在那里没有发现任何问题。
BufferedWriter的write方法是synchronized的,所以所有的写在线程间都是一致的。因此文件中的单个写操作将是线程安全的。但是您将需要显式处理同步以使其在线程之间保持一致。
这里是Java6中write方法的代码片段。
public void write(String s, int off, int len) throws IOException {
synchronized (lock) {
ensureOpen();
int b = off, t = off + len;
while (b < t) {
int d = min(nChars - nextChar, t - b);
s.getChars(b, b + d, cb, nextChar);
b += d;
nextChar += d;
if (nextChar >= nChars)
flushBuffer();
}
}
}
}
要事第一:定义你是否关心行的书写顺序; .
第二:使用什么工具Java8提供的;它有两个非常方便的方法,分别是 Files.lines()
和 Files.write()
.
第三:正确处理你的资源!您的代码无法保证文件描述符将正确关闭。
第四:.matches()
每次都会重新创建一个 Pattern
并且您总是使用相同的正则表达式进行过滤...您在浪费资源。
第五:考虑到BufferedWriter
的write方法是同步的,你不会从并行中获得太多。
这是我的做法:
public static void writeFiltered(final Path srcFile, final Path dstFile,
final String regex)
throws IOException
{
final Pattern pattern = Pattern.compile(regex);
final List<String> filteredLines;
try (
// UTF-8 by default
final Stream<String> srcLines = Files.lines(srcFile);
) {
filteredLines = srcLines.map(pattern::matcher)
.filter(Matcher::matches)
.collect(Collectors.toList());
}
// UTF-8 by default
Files.write(dstFile, filteredLines);
}
我有这样的代码
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
List<String> matches = new Vector<>(); // Race condition for ArrayList??
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("AHugeFile.txt")));
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("output.txt")));
reader.lines().parallel()
.filter(s -> s.matches("someFancyRegEx"))
.forEach(s -> {
matches.add(s);
try {
writer.write(s);
writer.newLine();
} catch (Exception e) {
System.out.println("error");
}
}
);
out.println("Processing took " + (System.currentTimeMillis() - start) / 1000 + " seconds and matches " + matches.size());
reader.close();
writer.flush();
writer.close();
}
我注意到,如果我在第 3 行用 ArrayList 替换 Vector,每次匹配都会得到不同的结果。我只是想在 Streams 上动手,但假设 forEach 同时执行并尝试写入 ArrayList,而 ArrayList 错过了一些写入!使用 Vector,结果是一致的。
我有两个问题:
- 我关于 ArrayList 导致 RACE 的推理是否正确?
- 鉴于 'write' 也在同一个终端操作中写入文件,'write' 是否可能遗漏某些行?在我的测试中,运行程序几次,结果似乎与正确写出的行数一致。
ArrayList 不是同步集合,所以它会导致 RACE 条件。所有改变向量状态的方法都是同步的,所以你在那里没有发现任何问题。
BufferedWriter的write方法是synchronized的,所以所有的写在线程间都是一致的。因此文件中的单个写操作将是线程安全的。但是您将需要显式处理同步以使其在线程之间保持一致。
这里是Java6中write方法的代码片段。
public void write(String s, int off, int len) throws IOException {
synchronized (lock) {
ensureOpen();
int b = off, t = off + len;
while (b < t) {
int d = min(nChars - nextChar, t - b);
s.getChars(b, b + d, cb, nextChar);
b += d;
nextChar += d;
if (nextChar >= nChars)
flushBuffer();
}
}
}
}
要事第一:定义你是否关心行的书写顺序;
第二:使用什么工具Java8提供的;它有两个非常方便的方法,分别是 Files.lines()
和 Files.write()
.
第三:正确处理你的资源!您的代码无法保证文件描述符将正确关闭。
第四:.matches()
每次都会重新创建一个 Pattern
并且您总是使用相同的正则表达式进行过滤...您在浪费资源。
第五:考虑到BufferedWriter
的write方法是同步的,你不会从并行中获得太多。
这是我的做法:
public static void writeFiltered(final Path srcFile, final Path dstFile,
final String regex)
throws IOException
{
final Pattern pattern = Pattern.compile(regex);
final List<String> filteredLines;
try (
// UTF-8 by default
final Stream<String> srcLines = Files.lines(srcFile);
) {
filteredLines = srcLines.map(pattern::matcher)
.filter(Matcher::matches)
.collect(Collectors.toList());
}
// UTF-8 by default
Files.write(dstFile, filteredLines);
}