具有 Chronicle-Queue 内存的应用程序不断增长
application with Chronicle-Queue memory keeps on growing
我实现了一个简单的 Spring 启动应用程序,它接收网络消息,使用 appender.writeText(str) 将其排队到 SingleChronicleQueue 中,另一个线程使用 tailer.readText( ).经过一些处理后,已处理的消息被放置在另一个 SingleChronicleQueue 中以被发送出去。
我在应用程序中有三个队列。
应用程序每晚轮换文件,第一件奇怪的事情是文件大小(每个 Q)相同(每个 Q 不同)。
最大的cq4文件每天约220MB
我面临的问题是,从开始到现在的三天内,内存从 480MB 增长到 1.6GB,这是不合理的。
我觉得我在配置中遗漏了一些东西,或者我缺少 naive/bad 实现。 (我不会在每次使用后关闭 appender 和 tailer,我应该吗)。
这是一个精简的例子,也许有人可以阐明一些问题。
@Service
public class QueuesService {
private static Logger LOG = LoggerFactory.getLogger(QueuesService.class);
@Autowired
AppConfiguration conf;
private SingleChronicleQueue Q = null;
private ExcerptAppender QAppender = null;
private ExcerptTailer QTailer = null;
public QueuesService() {
}
@PostConstruct
private void init() {
Q = SingleChronicleQueueBuilder.binary(conf.getQueuePath()).indexSpacing(1).build();
QAppender = Q.acquireAppender();
QTailer = Q.createTailer();
}
public ExcerptAppender getQAppender() {
return QAppender;
}
public ExcerptTailer getQTailer() {
return QTailer;
}
}
@Service
public class ProcessingService {
private static Logger LOG = LoggerFactory.getLogger(ProcessingService.class);
@Autowired
AppConfiguration conf;
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private QueuesService queueService;
private QueueProcessor processor = null;
public ProcessingService() {
}
@PostConstruct
private void init() {
processor = new QueueProcessor();
processor.start();
}
@Override
public Message processMessage(Message msg, Map<String, Object> metadata) throws SomeException {
String strMsg = msg.getMessage().toString();
if (LOG.isInfoEnabled()) {
LOG.info("\n" + strMsg);
}
try {
queueService.getQAppender().writeText(strMsg);
if (LOG.isInfoEnabled()) {
LOG.info("Added new message to queue. index: " + queueService.getQAppender().lastIndexAppended());
}
}
catch(Exception e) {
LOG.error("Unkbown error. reason: " + e.getMessage(), e);
}
}
class QueueProcessor extends Thread {
public void run() {
while (!interrupted()) {
try {
String msg = queueService.getEpicQTailer().readText();
if (msg != null) {
long index = queueService.getEpicQTailer().index();
// process
}
else {
Thread.sleep(10);
}
}
catch (InterruptedException e) {
LOG.warn(e);
this.interrupt();
break;
}
}
ThreadPoolTaskExecutor tp = (ThreadPoolTaskExecutor) taskExecutor;
tp.shutdown();
}
}
}
Chronicle Queue 旨在使用比主内存(或堆)大得多的虚拟内存,而不会对您的系统产生重大影响。这使您可以快速随机访问数据。
这是一个在 3 小时内写入 1 TB 的进程示例。
这显示了随着队列的增长速度变慢了多少
即使在 128 GB 的机器上大小为 1 TB,它也能在 2 秒内相当一致地写入 1 GB。
虽然这不会导致技术问题,但我们知道这确实与同样发现此问题的人有关 "weird",我们计划采用一种模式来减少虚拟内存的使用(即使速度稍慢对于某些用例)
我实现了一个简单的 Spring 启动应用程序,它接收网络消息,使用 appender.writeText(str) 将其排队到 SingleChronicleQueue 中,另一个线程使用 tailer.readText( ).经过一些处理后,已处理的消息被放置在另一个 SingleChronicleQueue 中以被发送出去。 我在应用程序中有三个队列。
应用程序每晚轮换文件,第一件奇怪的事情是文件大小(每个 Q)相同(每个 Q 不同)。 最大的cq4文件每天约220MB
我面临的问题是,从开始到现在的三天内,内存从 480MB 增长到 1.6GB,这是不合理的。
我觉得我在配置中遗漏了一些东西,或者我缺少 naive/bad 实现。 (我不会在每次使用后关闭 appender 和 tailer,我应该吗)。
这是一个精简的例子,也许有人可以阐明一些问题。
@Service
public class QueuesService {
private static Logger LOG = LoggerFactory.getLogger(QueuesService.class);
@Autowired
AppConfiguration conf;
private SingleChronicleQueue Q = null;
private ExcerptAppender QAppender = null;
private ExcerptTailer QTailer = null;
public QueuesService() {
}
@PostConstruct
private void init() {
Q = SingleChronicleQueueBuilder.binary(conf.getQueuePath()).indexSpacing(1).build();
QAppender = Q.acquireAppender();
QTailer = Q.createTailer();
}
public ExcerptAppender getQAppender() {
return QAppender;
}
public ExcerptTailer getQTailer() {
return QTailer;
}
}
@Service
public class ProcessingService {
private static Logger LOG = LoggerFactory.getLogger(ProcessingService.class);
@Autowired
AppConfiguration conf;
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private QueuesService queueService;
private QueueProcessor processor = null;
public ProcessingService() {
}
@PostConstruct
private void init() {
processor = new QueueProcessor();
processor.start();
}
@Override
public Message processMessage(Message msg, Map<String, Object> metadata) throws SomeException {
String strMsg = msg.getMessage().toString();
if (LOG.isInfoEnabled()) {
LOG.info("\n" + strMsg);
}
try {
queueService.getQAppender().writeText(strMsg);
if (LOG.isInfoEnabled()) {
LOG.info("Added new message to queue. index: " + queueService.getQAppender().lastIndexAppended());
}
}
catch(Exception e) {
LOG.error("Unkbown error. reason: " + e.getMessage(), e);
}
}
class QueueProcessor extends Thread {
public void run() {
while (!interrupted()) {
try {
String msg = queueService.getEpicQTailer().readText();
if (msg != null) {
long index = queueService.getEpicQTailer().index();
// process
}
else {
Thread.sleep(10);
}
}
catch (InterruptedException e) {
LOG.warn(e);
this.interrupt();
break;
}
}
ThreadPoolTaskExecutor tp = (ThreadPoolTaskExecutor) taskExecutor;
tp.shutdown();
}
}
}
Chronicle Queue 旨在使用比主内存(或堆)大得多的虚拟内存,而不会对您的系统产生重大影响。这使您可以快速随机访问数据。
这是一个在 3 小时内写入 1 TB 的进程示例。
这显示了随着队列的增长速度变慢了多少
即使在 128 GB 的机器上大小为 1 TB,它也能在 2 秒内相当一致地写入 1 GB。
虽然这不会导致技术问题,但我们知道这确实与同样发现此问题的人有关 "weird",我们计划采用一种模式来减少虚拟内存的使用(即使速度稍慢对于某些用例)