Java 以 Closeable 资源作为累加器的收集器
Java Collector with Closeable resource as accumulator
假设我正在尝试创建一个收集器,将数据聚合到一个资源中,该资源在使用后必须关闭。有什么方法可以在 Collector
中实现类似于 finally
块的东西吗?在成功的情况下,这可以在 finisher
方法中完成,但似乎没有在异常情况下调用任何方法。
我们的目标是以干净的方式实现如下所示的操作,而不必先将流收集到内存列表中。
stream.collect(groupingBy(this::extractFileName, collectToFile()));
好的,我已经查看了 Collectors
实现,您需要 CollectorImpl
来创建自定义收集器,但它不需要 public。所以我使用它的副本实现了新的(你可能对最后两种方法感兴趣):
public class CollectorUtils<T, A, R> implements Collector<T, A, R> {
static final Set<Collector.Characteristics> CH_ID = Collections
.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
private final Supplier<A> supplier;
private final BiConsumer<A, T> accumulator;
private final BinaryOperator<A> combiner;
private final Function<A, R> finisher;
private final Set<Characteristics> characteristics;
CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
Function<A, R> finisher, Set<Characteristics> characteristics) {
this.supplier = supplier;
this.accumulator = accumulator;
this.combiner = combiner;
this.finisher = finisher;
this.characteristics = characteristics;
}
CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
Set<Characteristics> characteristics) {
this(supplier, accumulator, combiner, castingIdentity(), characteristics);
}
@Override
public BiConsumer<A, T> accumulator() {
return accumulator;
}
@Override
public Supplier<A> supplier() {
return supplier;
}
@Override
public BinaryOperator<A> combiner() {
return combiner;
}
@Override
public Function<A, R> finisher() {
return finisher;
}
@Override
public Set<Characteristics> characteristics() {
return characteristics;
}
@SuppressWarnings("unchecked")
private static <I, R> Function<I, R> castingIdentity() {
return i -> (R) i;
}
public static <C extends Collection<File>> Collector<String, ?, C> toFile() {
return new CollectorUtils<>((Supplier<List<File>>) ArrayList::new, (c, t) -> {
c.add(toFile(t));
}, (r1, r2) -> {
r1.addAll(r2);
return r1;
}, CH_ID);
}
private static File toFile(String fileName) {
try (Closeable type = () -> System.out.println("Complete! closing file " + fileName);) {
// stuff
System.out.println("Converting " + fileName);
return new File(fileName);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
throw new RuntimeException("Failed to create file");
}
}
然后我调用流如下:
public static void main(String[] args) {
Stream.of("x.txt", "y.txt","z.txt").collect(CollectorUtils.toFile());
}
输出:
Convertingx.txt
closing filex.txt
Convertingy.txt
closing filey.txt
Convertingz.txt
closing filez.txt
我认为您可以满足您的要求的唯一方法是通过提供给 Stream.onClose
方法的关闭处理程序。假设你有以下 class:
class CloseHandler implements Runnable {
List<Runnable> children = new ArrayList<>();
void add(Runnable ch) { children.add(ch); }
@Override
public void run() { children.forEach(Runnable::run); }
}
现在,您需要按如下方式使用您的流:
CloseHandler closeAll = new CloseHandler();
try (Stream<Something> stream = list.stream().onClose(closeAll)) {
// Now collect
stream.collect(Collectors.groupingBy(
this::extractFileName,
toFile(closeAll)));
}
这使用了 try-with-resources
构造,以便在使用时或发生错误时自动关闭流。请注意,我们将 closeAll
关闭处理程序传递给 Stream.onClose
方法。
这是下游收集器的草图,它将 collect/write/send 元素发送到 Closeable
资源(请注意,我们还将 closeAll
关闭处理程序传递给它):
static Collector<Something, ?, Void> toFile(CloseHandler closeAll) {
class Acc {
SomeResource resource; // this is your closeable resource
Acc() {
try {
resource = new SomeResource(...); // create closeable resource
closeAll.add(this::close); // this::close is a Runnable
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
void add(Something elem) {
try {
// TODO write/send to closeable resource here
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
Acc merge(Acc another) {
// TODO left as an exercise
}
// This is the close handler for this particular closeable resource
private void close() {
try {
// Here we close our closeable resource
if (resource != null) resource.close();
} catch (IOException ignored) {
}
}
}
return Collector.of(Acc::new, Acc::add, Acc::merge, a -> null);
}
因此,这使用本地 class(名为 Acc
)来包装可关闭资源,并声明方法 add
流的元素到可关闭资源,并且还 merge
两个 Acc
实例,以防流是平行的(留作练习,以防值得付出努力)。
Collector.of
用于创建基于 Acc
class' 方法的收集器,具有 returns null
的整理器,因为我们没有'想在 Collectors.groupingBy
.
创建的地图中放置任何内容
最后,还有 close
方法,它会关闭已创建的已包装可关闭资源。
当通过 try-with-resources
构造隐式关闭流时,将自动执行 CloseHandler.run
方法,这将依次执行先前添加的所有子关闭处理程序 Acc
实例已创建。
假设我正在尝试创建一个收集器,将数据聚合到一个资源中,该资源在使用后必须关闭。有什么方法可以在 Collector
中实现类似于 finally
块的东西吗?在成功的情况下,这可以在 finisher
方法中完成,但似乎没有在异常情况下调用任何方法。
我们的目标是以干净的方式实现如下所示的操作,而不必先将流收集到内存列表中。
stream.collect(groupingBy(this::extractFileName, collectToFile()));
好的,我已经查看了 Collectors
实现,您需要 CollectorImpl
来创建自定义收集器,但它不需要 public。所以我使用它的副本实现了新的(你可能对最后两种方法感兴趣):
public class CollectorUtils<T, A, R> implements Collector<T, A, R> {
static final Set<Collector.Characteristics> CH_ID = Collections
.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
private final Supplier<A> supplier;
private final BiConsumer<A, T> accumulator;
private final BinaryOperator<A> combiner;
private final Function<A, R> finisher;
private final Set<Characteristics> characteristics;
CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
Function<A, R> finisher, Set<Characteristics> characteristics) {
this.supplier = supplier;
this.accumulator = accumulator;
this.combiner = combiner;
this.finisher = finisher;
this.characteristics = characteristics;
}
CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
Set<Characteristics> characteristics) {
this(supplier, accumulator, combiner, castingIdentity(), characteristics);
}
@Override
public BiConsumer<A, T> accumulator() {
return accumulator;
}
@Override
public Supplier<A> supplier() {
return supplier;
}
@Override
public BinaryOperator<A> combiner() {
return combiner;
}
@Override
public Function<A, R> finisher() {
return finisher;
}
@Override
public Set<Characteristics> characteristics() {
return characteristics;
}
@SuppressWarnings("unchecked")
private static <I, R> Function<I, R> castingIdentity() {
return i -> (R) i;
}
public static <C extends Collection<File>> Collector<String, ?, C> toFile() {
return new CollectorUtils<>((Supplier<List<File>>) ArrayList::new, (c, t) -> {
c.add(toFile(t));
}, (r1, r2) -> {
r1.addAll(r2);
return r1;
}, CH_ID);
}
private static File toFile(String fileName) {
try (Closeable type = () -> System.out.println("Complete! closing file " + fileName);) {
// stuff
System.out.println("Converting " + fileName);
return new File(fileName);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
throw new RuntimeException("Failed to create file");
}
}
然后我调用流如下:
public static void main(String[] args) {
Stream.of("x.txt", "y.txt","z.txt").collect(CollectorUtils.toFile());
}
输出:
Convertingx.txt
closing filex.txt
Convertingy.txt
closing filey.txt
Convertingz.txt
closing filez.txt
我认为您可以满足您的要求的唯一方法是通过提供给 Stream.onClose
方法的关闭处理程序。假设你有以下 class:
class CloseHandler implements Runnable {
List<Runnable> children = new ArrayList<>();
void add(Runnable ch) { children.add(ch); }
@Override
public void run() { children.forEach(Runnable::run); }
}
现在,您需要按如下方式使用您的流:
CloseHandler closeAll = new CloseHandler();
try (Stream<Something> stream = list.stream().onClose(closeAll)) {
// Now collect
stream.collect(Collectors.groupingBy(
this::extractFileName,
toFile(closeAll)));
}
这使用了 try-with-resources
构造,以便在使用时或发生错误时自动关闭流。请注意,我们将 closeAll
关闭处理程序传递给 Stream.onClose
方法。
这是下游收集器的草图,它将 collect/write/send 元素发送到 Closeable
资源(请注意,我们还将 closeAll
关闭处理程序传递给它):
static Collector<Something, ?, Void> toFile(CloseHandler closeAll) {
class Acc {
SomeResource resource; // this is your closeable resource
Acc() {
try {
resource = new SomeResource(...); // create closeable resource
closeAll.add(this::close); // this::close is a Runnable
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
void add(Something elem) {
try {
// TODO write/send to closeable resource here
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
Acc merge(Acc another) {
// TODO left as an exercise
}
// This is the close handler for this particular closeable resource
private void close() {
try {
// Here we close our closeable resource
if (resource != null) resource.close();
} catch (IOException ignored) {
}
}
}
return Collector.of(Acc::new, Acc::add, Acc::merge, a -> null);
}
因此,这使用本地 class(名为 Acc
)来包装可关闭资源,并声明方法 add
流的元素到可关闭资源,并且还 merge
两个 Acc
实例,以防流是平行的(留作练习,以防值得付出努力)。
Collector.of
用于创建基于 Acc
class' 方法的收集器,具有 returns null
的整理器,因为我们没有'想在 Collectors.groupingBy
.
最后,还有 close
方法,它会关闭已创建的已包装可关闭资源。
当通过 try-with-resources
构造隐式关闭流时,将自动执行 CloseHandler.run
方法,这将依次执行先前添加的所有子关闭处理程序 Acc
实例已创建。