Mysql 和 java 套接字并行压力测试

Mysql and java socket parallel stress test

我有一个 java tcp 套接字应用程序。 Tcp 通信器解码 GPS 位置并将该数据插入数据库,在插入之前我们做一些 selects 和更新,但我所做的都是使用准备好的语句。现在,TCP 通信器的一个线程服务于一个设备 request.Immediately 在创建线程后我们从池中获得一个连接。解码 GPS 数据后,我们对每个数据执行多次 select、更新和插入。随着设备数量的增加,我们 Mysql 数据库的并发连接数也在 increasing.So 我正在尝试进行模拟和压力测试,如下所示。问题是这是顺序测试,但在真实环境中,设备将并行进行。如何为 mysql 和 java 实现接近真实的压力情况,以找出 mysql 可以在第二个插入多少?

 public class stress1 extends Thread {
    public static void main(String[] argv) {
        try {
            for (int i = 0; i < 5000; i++) {
            Socket socket = new Socket("192.168.2.102", 8000);
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
            BufferedReader in = new BufferedReader(new InputStreamReader(
            socket.getInputStream()));
            out.println("$A12345,30061104075130528955N10024852E000068*03A1*");
            System.out.println(in.readLine() + i);
            out.close();
            in.close();
            socket.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
       }
      }
    }

这是我的服务器套接字的样子。

public class comm8888 {
    HikariDataSource connectionPool = null;
    private Socket receivedSocketConn1;
    ConnectionHandler(Socket receivedSocketConn1) {
      this.receivedSocketConn1=receivedSocketConn1;
    }
    Connection dbconn = null;
    public void run() { // etc
     DataOutputStream w = null;
     DataInputStream r = null;  
     String message="";
     receivedSocketConn1.setSoTimeout(60000);
       dbconn = connectionPool.getConnection();
     dbconn.setAutoCommit(false);
     try {
         w = new DataOutputStream(new BufferedOutputStream(receivedSocketConn1.getOutputStream()));
         r = new DataInputStream(new BufferedInputStream(receivedSocketConn1.getInputStream()));
         while ((m=r.read()) != -1){
             //multiple prepared based sql select,update and insert here.
         }
     }
     finally{
        try {
            if ( dbconn != null ) {
              dbconn.close();
            }
        }
        catch(SQLException ex){
             ex.printStackTrace();
        }
        try{
           if ( w != null ){
                w.close();
                r.close();
                receivedSocketConn1.close();
            }
        }
        catch(IOException ex){
           ex.printStackTrace(System.out);
        }
      }
   }
}
    public static void main(String[] args) {
      new comm8888();
    }
    comm8888() {
      try {
          HikariConfig config = new HikariConfig();
                config.setJdbcUrl("jdbc:mysql://localhost:3306/testdata"); 
                config.setUsername("****"); 
                config.setPassword("****");      
                config.setMaximumPoolSize(20);      
          connectionPool = new HikariDataSource(config); // setup the connection pool           
       }
          catch (Exception e) {
                e.printStackTrace(System.out);
         }
          try 
          {
                   final ServerSocket serverSocketConn = new ServerSocket(8888);                
                   while (true){
                            try {
                                    Socket socketConn1 = serverSocketConn.accept();
                                    new Thread(new ConnectionHandler(socketConn1)).start();                     
                            }
                            catch(Exception e){
                                e.printStackTrace(System.out);
                            }
                        }
          } 
          catch (Exception e) {
             e.printStackTrace(System.out);
          }
    }
} 

您的解决方案无法很好地扩展。我会将要完成的工作(设备的 GPS 位置)封装在一些 class 的对象中,然后将每个工作单元放入队列中。最后,一个线程可以顺序处理所有工作,一次处理一个请求。

如果一个线程无法跟上并且您的队列已满,那么通过添加更多工作人员来处理队列中的作业就可以很容易地进行扩展。 (或者如果 MySQL 的一个实例无法处理所有插入,您也可以尝试对数据进行水平分片并添加多个 MySQL 实例)。

这是一些示例代码:

import java.sql.Connection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class QueueingDatabaseProcessingExample {

    public static void main(String[] args) throws InterruptedException {
        QueueingDatabaseProcessingExample a = new QueueingDatabaseProcessingExample();
        a.doTheWork();
    }

    private void doTheWork() throws InterruptedException {

        BlockingQueue<TcpCommunicatorUnitOfWork> blockingQueue = new ArrayBlockingQueue(1000);

        // add work to queue as needed
        blockingQueue.put(new TcpCommunicatorUnitOfWork("device id", 40.7143528, -74.0059731, 10));  // blocks if queue is full


        Connection connection;

        // get connection to the database from database pool

        // process requests one by one sequentially
        while (true) {
            TcpCommunicatorUnitOfWork tcpCommunicatorUnitOfWork = blockingQueue.take(); // blocks if queue is empty

            proccess(tcpCommunicatorUnitOfWork);
        }
    }

    private void proccess(TcpCommunicatorUnitOfWork tcpCommunicatorUnitOfWork) {
        // do queries, inserts, deletes to database
    }

}


/**
 * this class should have all the information needed to query/update the database
 */
class TcpCommunicatorUnitOfWork {

    private final String deviceId;
    private final double latitude;
    private final double longitude;
    private final int seaLevel;

    public TcpCommunicatorUnitOfWork(String deviceId, double latitude, double longitude, int seaLevel) {
        this.deviceId = deviceId;
        this.latitude = latitude;
        this.longitude = longitude;
        this.seaLevel = seaLevel;
    }


}