您可以从 ItemListener 中访问 Hazelcast 队列吗?

Can you access a Hazelcast Queue from within an ItemListener?

我有一个用例,其中有一组项目 DiagnosticRuns,已提交到我的集群。我想连续处理它们(以避免冲突)。我正在尝试使用受 Lock 保护的 Hazelcast Queue 来确保一次处理一个项目。 Hazelcast 在我的集群中处于嵌入式模式 运行。如果我用 Queue 注册一个 ItemListener,从 itemAdded() 方法中调用 Queue 上的 take() 是否安全?例如:

@Component      
public class DistributedQueueListener
{
    public static final String DIAGNOSTICS_RUN_QUEUE_NAME = "diagnosticRun";

    @Autowired
    private HazelcastInstance hazelcast;

    @Autowired
    private ProductVersioningService productVersioningService;

    private IQueue<DiagnosticRun> diagnosticRunQueue;
    private ILock diagnosticRunLock;
    private String diagnosticRunListenerId;

    @PostConstruct
    public void init()
    {
        diagnosticRunQueue = hazelcast.getQueue(DIAGNOSTICS_RUN_QUEUE_NAME);
        diagnosticRunLock = hazelcast.getLock("diagnosticRunLock");
        diagnosticRunListenerId = diagnosticRunQueue.addItemListener(new DiagnosticRunListener(), false);
    }

    @PreDestroy
    public void stop()
    {
        diagnosticRunQueue.removeItemListener(diagnosticRunListenerId);
    }

    public class DiagnosticRunListener implements ItemListener<DiagnosticRun>
    {
        @Override
        public void itemAdded(ItemEvent<diagnosticRun> item)
        {
            diagnosticRunLock.lock(5, TimeUnit.SECONDS);
            try
            {
                DiagnosticRun diagnosticRun = diagnosticRunQueue.poll();
                if(diagnosticRun != null)
                {
                    productVersioningService.updateProductDeviceTable(diagnosticRun);
                }
            }
            finally
            {
                diagnosticRunLock.unlock();
            }
        }

        @Override
        public void itemRemoved(ItemEvent<diagnosticRun> item)
        {
        }   
    }       
}

我不确定从该位置和线程在 Queue 上调用 take() 是否线程安全。

如果不允许,我将不得不设置自己的长 运行 循环到 poll() Queue。我不确定在 Spring 引导应用程序中设置长 运行 线程的最佳方法是什么。假设上面的方法不起作用,下面的代码是线程安全的吗?或者有更好的方法吗?

@Component      
public class DistributedQueueListener
{
    public static final String DIAGNOSTIC_RUN_QUEUE_NAME = "diagnosticRun";

    @Autowired
    private HazelcastInstance hazelcast;

    @Autowired
    private ProductVersioningService productVersioningService;

    private IQueue<diagnosticRun> diagnosticRunQueue;
    private ILock diagnosticRunLock;

    private ExecutorService executorService;

    @PostConstruct
    public void init()
    {
        diagnosticRunQueue = hazelcast.getQueue(DIAGNOSTIC_RUN_QUEUE_NAME);
        diagnosticRunLock = hazelcast.getLock("diagnosticRunLock");

        executorService = Executors.newFixedThreadPool(1);
        executorService.submit(() -> listenToDiagnosticRuns());
    }

    @PreDestroy
    public void stop()
    {
        executorService.shutdown();
    }

    private void listenToDiagnosticRuns()
    {
        while(!executorService.isShutdown())
        {
            diagnosticRunLock.lock(5, TimeUnit.SECONDS);
            try
            {
                DiagnosticRun diagnosticRun = diagnosticRunQueue.poll(1L, TimeUnit.SECONDS);
                productVersioningService.updateProductDeviceTable(diagnosticRun);
            }
            catch(InterruptedException e)
            {
                logger.error("Interrupted polling diagnosticRun queue", e);
            }
            finally
            {
                diagnosticRunLock.unlock();
            }
        }
    }
}

首先,我要证明我不是执行这些线程的专家,所以有些人可能不同意,但这是我对此的看法,所以任何人都请插话,因为这看起来是有趣的案例。您的第一个解决方案将 Hazelcast 事件线程与其操作线程混合在一起。事实上,由于单个事件,您触发了三个要调用的操作。如果您在对 updateProcductDeviceTable 的调用中设置了一些任意延迟,您最终会发现它会变慢,但会在一段时间后再次恢复。这将导致您的本地事件队列在调用操作时堆积起来。您可以将您正在做的所有事情放在一个单独的线程中,您可以 "wake" 将其放在 #itemAdded 上,或者如果您可以承受一些延迟,请在第二个解决方案中执行您正在做的事情。但是,我会在

中进行一些更改

listenToDiagnosticsRuns() 方法:

private void listenToDiagnosticRuns()
{
    while(!executorService.isShutdown())
    {
        if(diagnosticRunQueue.peek() != null)
       {
           diagnosticRunLock.lock(5, TimeUnit.SECONDS);
           try
           {
               DiagnosticRun diagnosticRun = diagnosticRunQueue.poll(1L, TimeUnit.SECONDS);
               if(diagnosticRun != null)
               {
                   productVersioningService.updateProductDeviceTable(diagnosticRun);
               }
           }
           catch(InterruptedException e)
           {
               logger.error("Interrupted polling diagnosticRun queue", e);
           }
           finally
           {
               diagnosticRunLock.unlock();
           }
        } // peek != null
        else
        {
            try
            {
                Thread.sleep(5000);
            }
            catch (InterruptedException e)
            {
                 //do nothing
            }
        }
    }
}