您可以从 ItemListener 中访问 Hazelcast 队列吗?
Can you access a Hazelcast Queue from within an ItemListener?
我有一个用例,其中有一组项目 DiagnosticRun
s,已提交到我的集群。我想连续处理它们(以避免冲突)。我正在尝试使用受 Lock
保护的 Hazelcast Queue
来确保一次处理一个项目。 Hazelcast 在我的集群中处于嵌入式模式 运行。如果我用 Queue
注册一个 ItemListener
,从 itemAdded()
方法中调用 Queue
上的 take()
是否安全?例如:
@Component
public class DistributedQueueListener
{
public static final String DIAGNOSTICS_RUN_QUEUE_NAME = "diagnosticRun";
@Autowired
private HazelcastInstance hazelcast;
@Autowired
private ProductVersioningService productVersioningService;
private IQueue<DiagnosticRun> diagnosticRunQueue;
private ILock diagnosticRunLock;
private String diagnosticRunListenerId;
@PostConstruct
public void init()
{
diagnosticRunQueue = hazelcast.getQueue(DIAGNOSTICS_RUN_QUEUE_NAME);
diagnosticRunLock = hazelcast.getLock("diagnosticRunLock");
diagnosticRunListenerId = diagnosticRunQueue.addItemListener(new DiagnosticRunListener(), false);
}
@PreDestroy
public void stop()
{
diagnosticRunQueue.removeItemListener(diagnosticRunListenerId);
}
public class DiagnosticRunListener implements ItemListener<DiagnosticRun>
{
@Override
public void itemAdded(ItemEvent<diagnosticRun> item)
{
diagnosticRunLock.lock(5, TimeUnit.SECONDS);
try
{
DiagnosticRun diagnosticRun = diagnosticRunQueue.poll();
if(diagnosticRun != null)
{
productVersioningService.updateProductDeviceTable(diagnosticRun);
}
}
finally
{
diagnosticRunLock.unlock();
}
}
@Override
public void itemRemoved(ItemEvent<diagnosticRun> item)
{
}
}
}
我不确定从该位置和线程在 Queue
上调用 take()
是否线程安全。
如果不允许,我将不得不设置自己的长 运行 循环到 poll()
Queue
。我不确定在 Spring 引导应用程序中设置长 运行 线程的最佳方法是什么。假设上面的方法不起作用,下面的代码是线程安全的吗?或者有更好的方法吗?
@Component
public class DistributedQueueListener
{
public static final String DIAGNOSTIC_RUN_QUEUE_NAME = "diagnosticRun";
@Autowired
private HazelcastInstance hazelcast;
@Autowired
private ProductVersioningService productVersioningService;
private IQueue<diagnosticRun> diagnosticRunQueue;
private ILock diagnosticRunLock;
private ExecutorService executorService;
@PostConstruct
public void init()
{
diagnosticRunQueue = hazelcast.getQueue(DIAGNOSTIC_RUN_QUEUE_NAME);
diagnosticRunLock = hazelcast.getLock("diagnosticRunLock");
executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> listenToDiagnosticRuns());
}
@PreDestroy
public void stop()
{
executorService.shutdown();
}
private void listenToDiagnosticRuns()
{
while(!executorService.isShutdown())
{
diagnosticRunLock.lock(5, TimeUnit.SECONDS);
try
{
DiagnosticRun diagnosticRun = diagnosticRunQueue.poll(1L, TimeUnit.SECONDS);
productVersioningService.updateProductDeviceTable(diagnosticRun);
}
catch(InterruptedException e)
{
logger.error("Interrupted polling diagnosticRun queue", e);
}
finally
{
diagnosticRunLock.unlock();
}
}
}
}
首先,我要证明我不是执行这些线程的专家,所以有些人可能不同意,但这是我对此的看法,所以任何人都请插话,因为这看起来是有趣的案例。您的第一个解决方案将 Hazelcast 事件线程与其操作线程混合在一起。事实上,由于单个事件,您触发了三个要调用的操作。如果您在对 updateProcductDeviceTable 的调用中设置了一些任意延迟,您最终会发现它会变慢,但会在一段时间后再次恢复。这将导致您的本地事件队列在调用操作时堆积起来。您可以将您正在做的所有事情放在一个单独的线程中,您可以 "wake" 将其放在 #itemAdded 上,或者如果您可以承受一些延迟,请在第二个解决方案中执行您正在做的事情。但是,我会在
中进行一些更改
listenToDiagnosticsRuns() 方法:
private void listenToDiagnosticRuns()
{
while(!executorService.isShutdown())
{
if(diagnosticRunQueue.peek() != null)
{
diagnosticRunLock.lock(5, TimeUnit.SECONDS);
try
{
DiagnosticRun diagnosticRun = diagnosticRunQueue.poll(1L, TimeUnit.SECONDS);
if(diagnosticRun != null)
{
productVersioningService.updateProductDeviceTable(diagnosticRun);
}
}
catch(InterruptedException e)
{
logger.error("Interrupted polling diagnosticRun queue", e);
}
finally
{
diagnosticRunLock.unlock();
}
} // peek != null
else
{
try
{
Thread.sleep(5000);
}
catch (InterruptedException e)
{
//do nothing
}
}
}
}
我有一个用例,其中有一组项目 DiagnosticRun
s,已提交到我的集群。我想连续处理它们(以避免冲突)。我正在尝试使用受 Lock
保护的 Hazelcast Queue
来确保一次处理一个项目。 Hazelcast 在我的集群中处于嵌入式模式 运行。如果我用 Queue
注册一个 ItemListener
,从 itemAdded()
方法中调用 Queue
上的 take()
是否安全?例如:
@Component
public class DistributedQueueListener
{
public static final String DIAGNOSTICS_RUN_QUEUE_NAME = "diagnosticRun";
@Autowired
private HazelcastInstance hazelcast;
@Autowired
private ProductVersioningService productVersioningService;
private IQueue<DiagnosticRun> diagnosticRunQueue;
private ILock diagnosticRunLock;
private String diagnosticRunListenerId;
@PostConstruct
public void init()
{
diagnosticRunQueue = hazelcast.getQueue(DIAGNOSTICS_RUN_QUEUE_NAME);
diagnosticRunLock = hazelcast.getLock("diagnosticRunLock");
diagnosticRunListenerId = diagnosticRunQueue.addItemListener(new DiagnosticRunListener(), false);
}
@PreDestroy
public void stop()
{
diagnosticRunQueue.removeItemListener(diagnosticRunListenerId);
}
public class DiagnosticRunListener implements ItemListener<DiagnosticRun>
{
@Override
public void itemAdded(ItemEvent<diagnosticRun> item)
{
diagnosticRunLock.lock(5, TimeUnit.SECONDS);
try
{
DiagnosticRun diagnosticRun = diagnosticRunQueue.poll();
if(diagnosticRun != null)
{
productVersioningService.updateProductDeviceTable(diagnosticRun);
}
}
finally
{
diagnosticRunLock.unlock();
}
}
@Override
public void itemRemoved(ItemEvent<diagnosticRun> item)
{
}
}
}
我不确定从该位置和线程在 Queue
上调用 take()
是否线程安全。
如果不允许,我将不得不设置自己的长 运行 循环到 poll()
Queue
。我不确定在 Spring 引导应用程序中设置长 运行 线程的最佳方法是什么。假设上面的方法不起作用,下面的代码是线程安全的吗?或者有更好的方法吗?
@Component
public class DistributedQueueListener
{
public static final String DIAGNOSTIC_RUN_QUEUE_NAME = "diagnosticRun";
@Autowired
private HazelcastInstance hazelcast;
@Autowired
private ProductVersioningService productVersioningService;
private IQueue<diagnosticRun> diagnosticRunQueue;
private ILock diagnosticRunLock;
private ExecutorService executorService;
@PostConstruct
public void init()
{
diagnosticRunQueue = hazelcast.getQueue(DIAGNOSTIC_RUN_QUEUE_NAME);
diagnosticRunLock = hazelcast.getLock("diagnosticRunLock");
executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> listenToDiagnosticRuns());
}
@PreDestroy
public void stop()
{
executorService.shutdown();
}
private void listenToDiagnosticRuns()
{
while(!executorService.isShutdown())
{
diagnosticRunLock.lock(5, TimeUnit.SECONDS);
try
{
DiagnosticRun diagnosticRun = diagnosticRunQueue.poll(1L, TimeUnit.SECONDS);
productVersioningService.updateProductDeviceTable(diagnosticRun);
}
catch(InterruptedException e)
{
logger.error("Interrupted polling diagnosticRun queue", e);
}
finally
{
diagnosticRunLock.unlock();
}
}
}
}
首先,我要证明我不是执行这些线程的专家,所以有些人可能不同意,但这是我对此的看法,所以任何人都请插话,因为这看起来是有趣的案例。您的第一个解决方案将 Hazelcast 事件线程与其操作线程混合在一起。事实上,由于单个事件,您触发了三个要调用的操作。如果您在对 updateProcductDeviceTable 的调用中设置了一些任意延迟,您最终会发现它会变慢,但会在一段时间后再次恢复。这将导致您的本地事件队列在调用操作时堆积起来。您可以将您正在做的所有事情放在一个单独的线程中,您可以 "wake" 将其放在 #itemAdded 上,或者如果您可以承受一些延迟,请在第二个解决方案中执行您正在做的事情。但是,我会在
中进行一些更改listenToDiagnosticsRuns() 方法:
private void listenToDiagnosticRuns()
{
while(!executorService.isShutdown())
{
if(diagnosticRunQueue.peek() != null)
{
diagnosticRunLock.lock(5, TimeUnit.SECONDS);
try
{
DiagnosticRun diagnosticRun = diagnosticRunQueue.poll(1L, TimeUnit.SECONDS);
if(diagnosticRun != null)
{
productVersioningService.updateProductDeviceTable(diagnosticRun);
}
}
catch(InterruptedException e)
{
logger.error("Interrupted polling diagnosticRun queue", e);
}
finally
{
diagnosticRunLock.unlock();
}
} // peek != null
else
{
try
{
Thread.sleep(5000);
}
catch (InterruptedException e)
{
//do nothing
}
}
}
}