WMQ 连接套接字在 v9_client 和 v6_server 之间不断关闭

WMQ connection socket constantly closed between v9_client and v6_server

我们有一个独立的 java 应用程序使用第三方工具来管理连接池,它在 v6_client + v6_server 设置中为我们工作了很长时间。

现在我们正在尝试从 v6 迁移到 v9(是的,我们来晚了......),发现 v9_client 到 v6_server 的连接不断中断, 意思是:

我们查看了 v9_client 的完整可调参数列表(XAQCF https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.0.0/com.ibm.mq.ref.dev.doc/q111800_.html) 中的列,并且仅发现我们在 v6_client、SHARECONVALLOWEDPROVIDERVERSION 中未使用的两个潜在新配置.不幸的是,两者都没有帮助我们……具体来说:

只是想知道是否有其他人有类似的经历并可以分享一些见解。我们尝试了 v9_server+v9_client 并没有看到任何类似的问题,所以这也可能是我们最终的解决方案.....

顺便说一句,WMQ 托管在 linux (RedHat) 上,我们只在客户端使用 MQXAQueueConnectionFactory 的产品(ibm mq class for jms)。

谢谢。


其他 details/updates.


[更新-1]

--[操场设置]

v9_client 个罐子:

javax.jms-api-2.0.jar
com.ibm.mq.allclient(-9.0.0.[1/5]).jar

v6_client 个罐子: 除了到v9_client个jar,在eclipse classpath

中引入了以下jar
com.ibm.dhbcore-1.0.jar 
com.ibm.mq.pcf-6.0.3.jar
com.ibm.mqjms-6.0.2.2.jar
com.ibm.mq-6.0.2.2.jar
com.ibm.mqetclient-6.0.2.2.jar
connector.jar
jta-1.1.jar

测试代码-单线程:

import javax.jms.*;
import com.ibm.mq.jms.*;
import com.ibm.msg.client.wmq.WMQConstants;

public class MQSeries_simpleAuditQ {

    private static String queueManager = "QM.RCTQ.ALL.01";
    private static String host = "localhost";
    private static int port = 26005; 

    public static void main(String[] args) throws JMSException {
        MQXAQueueConnectionFactory queueFactory= new MQXAQueueConnectionFactory();

        System.out.println("\n\n\n*******************\nqueueFactory implementation version: " + 
                queueFactory.getClass().getPackage().getImplementationVersion() + "*****************\n\n\n");

        queueFactory.setHostName(host);
        queueFactory.setPort(port);
        queueFactory.setQueueManager(queueManager);
        queueFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
        if (queueFactory.getClass().getPackage().getImplementationVersion().split("\.")[0].equals("9")) {
            queueFactory.setProviderVersion("6");
            //queueFactory.setShareConvAllowed(WMQConstants.WMQ_SHARE_CONV_ALLOWED_YES);
    } 
        XASession xaSession;
        javax.jms.QueueConnection xaQueueConnection;
        try {
            // Obtain a QueueConnection
            System.out.println("Creating Connection...");
            xaQueueConnection = (QueueConnection)queueFactory.createXAConnection(" ", "");
            xaQueueConnection.start();

            for (int counter=0; counter<2; counter++) {
                try {
                    xaSession = ((XAConnection)xaQueueConnection).createXASession();
                    xaSession.close();
                } catch (Exception ex) {
                    System.out.println(ex);
                }
            } 
            System.out.println("Closing connection.... ");
            xaQueueConnection.close();
        } catch (Exception e) {
            System.out.println("Error processing " + e.getMessage());
        } 

    }
}

--[观察] v6_client 只创建和关闭了一个套接字,而 v9_client (都是 9.0.0.[1/5]):

天真地我期待套接字保持打开状态直到 xaQueueConnection.close()


[更新-2] 将 queueFactory.setProviderVersion("9");queueFactory.setShareConvAllowed(WMQConstants.WMQ_SHARE_CONV_ALLOWED_YES); 用于 v9_server+v9_client,我们在 v6_server+v9_client 中看不到相同的常量套接字关闭问题,这是个好消息。


[update-3] MCAUSER v6_server 上所有 SVRCONN 频道的属性。在 v9_server 上相同(当与相同的 v9_client 连接时没有相同的套接字关闭问题)。

display channel (SYSTEM.ADMIN.SVRCONN)
    MCAUSER(mqm) 

display channel (SYSTEM.AUTO.SVRCONN)
    MCAUSER( ) 

display channel (SYSTEM.DEF.SVRCONN)
    MCAUSER( )

[update-4]

我尝试将 MCAUSER() 设置为 mqm,然后从客户端同时使用 </code>(空白)和 <code>mqm,两者都可以创建连接,但是仍然看到使用 v9_client+v6_user 关闭相同的意外套接字。更新 MCAUSER() 后,我总是添加 refresh security,并重新启动 qmgr。

在使用空白用户创建连接之前,我还尝试在 Eclipse 中将系统变量设置为空白,但也没有帮助。


[update-5]

将我们的讨论限制在 v9_client+v9_server。下面的异步测试代码使用有限数量的现有连接生成大量 xasession create/close 请求。使用 SHARECNV(1) 我们最终也会得到无法承受的高 TIME_WAIT 计数,但使用大于 1 个 SHARECNV(例如 10)可能会引入额外的性能损失......

异步测试代码

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

import javax.jms.*;
import com.ibm.mq.jms.*;
import com.ibm.msg.client.wmq.WMQConstants;

public class MQSeries_simpleAuditQ_Async_v9 {

private static String queueManager = "QM.ALPQ.ALL.01";
private static int port = 26010; 

private static String host = "localhost";

private static int connCount = 20;
private static int amp = 100;
private static ExecutorService amplifier = Executors.newFixedThreadPool(amp);

public static void main(String[] args) throws JMSException {
    MQXAQueueConnectionFactory queueFactory= new MQXAQueueConnectionFactory();

    System.out.println("\n\n\n*******************\nqueueFactory implementation version: " + 
            queueFactory.getClass().getPackage().getImplementationVersion() + "*****************\n\n\n");

    queueFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
    if (queueFactory.getClass().getPackage().getImplementationVersion().split("\.")[0].equals("9")) {
        queueFactory.setProviderVersion("9");
        queueFactory.setShareConvAllowed(WMQConstants.WMQ_SHARE_CONV_ALLOWED_YES);
    } 

    queueFactory.setHostName(host);
    queueFactory.setPort(port);
    queueFactory.setQueueManager(queueManager);
    //queueFactory.setChannel("");

    ArrayList<QueueConnection> xaQueueConnections = new ArrayList<QueueConnection>();
    try {
        // Obtain a QueueConnection
        System.out.println("Creating Connection...");
        //System.setProperty("user.name", "mqm");
        //System.out.println("system username: " + System.getProperty("user.name"));
        for (int ct=0; ct<connCount; ct++) {

            // xaQueueConnection instance of MQXAQueueConnection
            QueueConnection xaQueueConnection = (QueueConnection)queueFactory.createXAConnection(" ", "");
            xaQueueConnection.start();
            xaQueueConnections.add(xaQueueConnection);
        }

        ArrayList<Double> totalElapsedTimeRecord = new ArrayList<Double>();
        ArrayList<FutureTask<Double>> taskBuffer = new ArrayList<FutureTask<Double>>();

        for (int loop=0; loop <= 10; loop++) {
            try {
                for (int i=0; i<amp; i++) {
                    int idx = (int)(Math.random()*((connCount)));
                    System.out.println("Using connection: " + idx);
                    FutureTask<Double> xaSessionPoker = new FutureTask<Double>(new XASessionPoker(xaQueueConnections.get(idx)));
                    amplifier.submit(xaSessionPoker);
                    taskBuffer.add(xaSessionPoker);                 
                }
                System.out.println("loop " + loop + " completed");

            } catch (Exception ex) {
                System.out.println(ex);
            }
        } 

        for (FutureTask<Double> xaSessionPoker : taskBuffer) {
            totalElapsedTimeRecord.add(xaSessionPoker.get());
        }
        System.out.println("Average xaSession poking time: " + calcAverage(totalElapsedTimeRecord));

        System.out.println("Closing connections.... ");
        for (QueueConnection xaQueueConnection : xaQueueConnections) {
            xaQueueConnection.close();
        }
    } catch (Exception e) {
        System.out.println("Error processing " + e.getMessage());
    } 

    amplifier.shutdown();
}

private static double calcAverage(ArrayList<Double> myArr) {
    double sum = 0;
    for (Double val : myArr) {
        sum += val;
    }
    return sum/myArr.size();
}

// create and close session through QueueConnection object passed in.
private static class XASessionPoker implements Callable<Double> {

    // conn instance of MQXAQueueConnection. ref. QueueProviderService
    private QueueConnection conn;

    XASessionPoker(QueueConnection conn) {
        this.conn = conn;
    }
    @Override
    public Double call() throws Exception {
        XASession xaSession;
        double elapsed = 0;
        try {
            final long start = System.currentTimeMillis();

            // ref. DualSessionWrapper
            // xaSession instance of MQXAQueueSession
            xaSession = ((XAConnection) conn).createXASession();

            xaSession.close();
            final long end = System.currentTimeMillis();
            elapsed = (end - start) / 1000.0;
        } catch (Exception e) {
            // TODO Auto-generated catch block
            System.out.println(e);
        }
        return elapsed;
    }  
}
}

我们发现根本原因是 no more session pooling + bitronix TM doesn't offer session pooling across TX 的组合。具体来说(在我们的例子中),bitronix 管理 JmsPooledConnection 池,但每次使用 (xa)session(在 JmsPooledConnection 下)时,都会创建一个新套接字 (createXASession()) 并关闭 (xaSession.close()).

一个解决方案是用 (xa)session 池包装 jms connection,类似于 https://github.com/messaginghub/pooled-jms/tree/master/pooled-jms/src/main/java/org/messaginghub/pooled/jms

中所做的

http://bjansen.github.io/java/2018/03/04/high-performance-mq-jms.html 也表明 Spring CachingConnectionFactory 效果很好,这听起来像是第一个解决方案的特例。