在写入数据库时使用 java 读取 tcp 流
read tcp stream with java while writing to database
我不确定以下情况的最佳解决方案是什么:
我的 Java-程序一直在从 TCP 流中读取数据。同时需要将这些数据持久化到数据库中。应该写入数据库的数据量可能不同
我已经阅读了很多有关消息队列系统等的内容。详细来说,我的解决方案会考虑使用 LinkedBlockingQueue。因此,有两个线程:
a) 启动一个生产者威胁,它将执行从 tcp 流中读取
b) 启动消费者威胁,它将(解析的)数据从流写入数据库
(示例)代码如下所示:
Main.java
public static void main(String[] args) {
LinkedBlockingQueue queue = new LinkedBlockingQueue(50);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue, producer);
Produer.java
public class Producer implements Runnable {
private LinkedBlockingQueue queue;
private boolean running;
public Producer(LinkedBlockingQueue queue) {
this.queue = queue;
running = true;
}
@Override
public void run() {
//read TCP-Stream here and save parsed messages to queue
}
public boolean isRunning() {
return running;
}
Consumer.java
public class Consumer implements Runnable {
private Producer producer;
private LinkedBlockingQueue queue;
public Consumer(LinkedBlockingQueue queue, Producer producer) {
this.queue = queue;
this.producer = producer;
}
@Override
public void run() {
//insert data into database here
if(producer.isRunning()) {
//while producer is running, data needs to be inserted to database
}
}
这是您会推荐使用的解决方案吗?或者您知道更好的解决方案吗?
谢谢!
你自己的建议很好
最终您要解决的是 back pressure 的问题,即如果您接收数据的速度快于将数据写入数据库的速度。这可能只是因为有大量数据到达或仅仅是因为您的目的地暂时不可用。无论哪种方式,这都是您需要处理的情况。
在您提出的解决方案中,这是由内存暂存区(=您的队列)处理的。只要您有足够的内存并且您不太担心在断电时丢失数据,那么内存中策略就可以很好地工作。您的 Java 应用程序中不断增加的内存将吸收一次爆发。这本身不是问题,但请记住,当您的队列最终耗尽时,JVM GC 将启动并再次从 JVM 堆中释放内存。但是从外部,即从 OS 的角度来看,内存可能永远不会被释放。 JVM 在将内存释放回 OS 方面非常非常保守。同样,在大多数情况下这不是问题。
如果您有更严格的需求,那么您需要考虑更多 "robust" 临时区域而不是 RAM,例如本地磁盘。根据我的经验,您提出的解决方案适合 95% 的用例。
我不确定以下情况的最佳解决方案是什么:
我的 Java-程序一直在从 TCP 流中读取数据。同时需要将这些数据持久化到数据库中。应该写入数据库的数据量可能不同
我已经阅读了很多有关消息队列系统等的内容。详细来说,我的解决方案会考虑使用 LinkedBlockingQueue。因此,有两个线程: a) 启动一个生产者威胁,它将执行从 tcp 流中读取 b) 启动消费者威胁,它将(解析的)数据从流写入数据库
(示例)代码如下所示:
Main.java
public static void main(String[] args) {
LinkedBlockingQueue queue = new LinkedBlockingQueue(50);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue, producer);
Produer.java
public class Producer implements Runnable {
private LinkedBlockingQueue queue;
private boolean running;
public Producer(LinkedBlockingQueue queue) {
this.queue = queue;
running = true;
}
@Override
public void run() {
//read TCP-Stream here and save parsed messages to queue
}
public boolean isRunning() {
return running;
}
Consumer.java
public class Consumer implements Runnable {
private Producer producer;
private LinkedBlockingQueue queue;
public Consumer(LinkedBlockingQueue queue, Producer producer) {
this.queue = queue;
this.producer = producer;
}
@Override
public void run() {
//insert data into database here
if(producer.isRunning()) {
//while producer is running, data needs to be inserted to database
}
}
这是您会推荐使用的解决方案吗?或者您知道更好的解决方案吗?
谢谢!
你自己的建议很好
最终您要解决的是 back pressure 的问题,即如果您接收数据的速度快于将数据写入数据库的速度。这可能只是因为有大量数据到达或仅仅是因为您的目的地暂时不可用。无论哪种方式,这都是您需要处理的情况。
在您提出的解决方案中,这是由内存暂存区(=您的队列)处理的。只要您有足够的内存并且您不太担心在断电时丢失数据,那么内存中策略就可以很好地工作。您的 Java 应用程序中不断增加的内存将吸收一次爆发。这本身不是问题,但请记住,当您的队列最终耗尽时,JVM GC 将启动并再次从 JVM 堆中释放内存。但是从外部,即从 OS 的角度来看,内存可能永远不会被释放。 JVM 在将内存释放回 OS 方面非常非常保守。同样,在大多数情况下这不是问题。
如果您有更严格的需求,那么您需要考虑更多 "robust" 临时区域而不是 RAM,例如本地磁盘。根据我的经验,您提出的解决方案适合 95% 的用例。