Chronicle Queue - 从上次读取的位置读取并删除所有消费者读取的文件
Chronicle Queue - Read from last read position and delete files if read by all consumers
我正在使用 Chronicle 4.5.27 编写和读取市场数据。
我有一个作家,但有多个 reader。开发 OS 是 Windows,然后是 Linux 用于 Prod 部署。
如何实现以下用例?
- 如何从上次读取位置开始读取队列?例如如果 reader 已经从一个有 100 条记录的文件中读取了 15 条记录,并且 crashed/stopped 如何从下次重新启动时从第 16 条记录开始读取? CQ 是否有内置的持久支持?
- 删除所有消费者读取的文件以节省磁盘space。
为此,我已经实施,但似乎由于某些未解决的问题,文件未在 windows 上删除。 CQ 中是否有任何内置支持,只有所有感兴趣的消费者都可以删除文件?
public static long readMarketData(String pathForMarketDataFile, long indexFrom) {
SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(pathForMarketDataFile).rollCycle(RollCycles.MINUTELY).storeFileListener(new StoreFileListener() {
@Override
public void onReleased(int i, File file) {
System.out.println("File is not in use and is ready for deletion: " + file.getName());
try {
file.delete();
} catch (IOException e) {
e.printStackTrace();
}
;
System.out.println("File deleted: " + file.getName() );
}
@Override
public void onAcquired(int cycle, File file) {
System.out.println("File is use for reading: " + file.getName());
}
}).build();
我读过一些关于这个主题的博客和帖子,例如
https://vanilla-java.github.io/2016/03/29/Microservices-in-the-Chronicle-world-Part-4.html
https://groups.google.com/forum/#!topic/java-chronicle/0Nz5P-nvLgM
但还是想知道是否有人实现了这个用例。
使用 MessageHistory
执行消费者高水位标记跟踪,但是,这要求您的消费者也将输出写入编年史队列(实质上是将消费者读取序列存储在输出队列中) .
或者,您需要实现自己的机制来记录每个消费者看到的最高序列(索引)。
在删除文件方面,可能有其他进程持有队列文件的打开文件句柄。如果 reader-A 不再使用队列文件 15.cq4,那么您的代码将尝试调用 file.delete(),但 reader-B 可能仍会引用该文件,阻止它被删除。
一个更稳健的策略是从每个 reader 到另一个 service/process 有某种事件负责在所有 reader 完成处理后删除文件.
1) 如果你看文档
https://github.com/OpenHFT/Chronicle-Queue#restartable-tailers ,命名您的裁缝将自动处理此功能。
你需要给你的每一个阅读tailer起一个唯一的名字,比如下面这行代码'a'
ExcerptTailer atailer = cq.createTailer("a");
tailer 将其索引存储在队列本身中并维护该索引。当您的阅读服务重新启动时,索引将被选取并从上次阅读的位置继续。
2) 当您调用 file.delete() 时,文件不会在 windows 上被删除并且功能在 Linux 上运行良好。 Windows 锁定程序当前正在使用的文件。您不能删除您的应用程序当前正在使用的文件。
其他线程中提到了其他解决方案
Java 'file.delete()' Is not Deleting Specified File
我正在使用 Chronicle 4.5.27 编写和读取市场数据。 我有一个作家,但有多个 reader。开发 OS 是 Windows,然后是 Linux 用于 Prod 部署。
如何实现以下用例?
- 如何从上次读取位置开始读取队列?例如如果 reader 已经从一个有 100 条记录的文件中读取了 15 条记录,并且 crashed/stopped 如何从下次重新启动时从第 16 条记录开始读取? CQ 是否有内置的持久支持?
- 删除所有消费者读取的文件以节省磁盘space。
为此,我已经实施,但似乎由于某些未解决的问题,文件未在 windows 上删除。 CQ 中是否有任何内置支持,只有所有感兴趣的消费者都可以删除文件?
public static long readMarketData(String pathForMarketDataFile, long indexFrom) {
SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(pathForMarketDataFile).rollCycle(RollCycles.MINUTELY).storeFileListener(new StoreFileListener() {
@Override
public void onReleased(int i, File file) {
System.out.println("File is not in use and is ready for deletion: " + file.getName());
try {
file.delete();
} catch (IOException e) {
e.printStackTrace();
}
;
System.out.println("File deleted: " + file.getName() );
}
@Override
public void onAcquired(int cycle, File file) {
System.out.println("File is use for reading: " + file.getName());
}
}).build();
我读过一些关于这个主题的博客和帖子,例如
https://vanilla-java.github.io/2016/03/29/Microservices-in-the-Chronicle-world-Part-4.html
https://groups.google.com/forum/#!topic/java-chronicle/0Nz5P-nvLgM
但还是想知道是否有人实现了这个用例。
使用 MessageHistory
执行消费者高水位标记跟踪,但是,这要求您的消费者也将输出写入编年史队列(实质上是将消费者读取序列存储在输出队列中) .
或者,您需要实现自己的机制来记录每个消费者看到的最高序列(索引)。
在删除文件方面,可能有其他进程持有队列文件的打开文件句柄。如果 reader-A 不再使用队列文件 15.cq4,那么您的代码将尝试调用 file.delete(),但 reader-B 可能仍会引用该文件,阻止它被删除。
一个更稳健的策略是从每个 reader 到另一个 service/process 有某种事件负责在所有 reader 完成处理后删除文件.
1) 如果你看文档 https://github.com/OpenHFT/Chronicle-Queue#restartable-tailers ,命名您的裁缝将自动处理此功能。
你需要给你的每一个阅读tailer起一个唯一的名字,比如下面这行代码'a'
ExcerptTailer atailer = cq.createTailer("a");
tailer 将其索引存储在队列本身中并维护该索引。当您的阅读服务重新启动时,索引将被选取并从上次阅读的位置继续。
2) 当您调用 file.delete() 时,文件不会在 windows 上被删除并且功能在 Linux 上运行良好。 Windows 锁定程序当前正在使用的文件。您不能删除您的应用程序当前正在使用的文件。
其他线程中提到了其他解决方案 Java 'file.delete()' Is not Deleting Specified File