使用 IgniteDataStreamer 的正确方法 API
correct way of using IgniteDataStreamer API
我有一个多线程应用程序,它使用 write() 方法不断写入以启动缓存。在启动时,它调用 init() 方法来创建缓存和流对象。一旦所有线程都完成,将调用 flush() 关闭 streamer 并写入任何未能插入 write() 方法的密钥。我有一些疑问。
在 write() 方法中,我应该如何处理 IgniteFuture?我应该等到它完成吗?
当条目写入缓存或流媒体内部缓冲区时,这个未来是否完成?
在 flush() 方法中,我在关闭 streamer 对象之前写入在 write() 方法中失败的所有条目。这是正确的方法吗?
代码如下:
public void initialize(final String cacheName) {
getOrCreateCache(cacheName, true); // create new cache or get existing if it already exists
this.streamer = ignite.dataStreamer(cacheName);
this.cacheName = cacheName;
}
public void write(K key, V value) {
try {
IgniteFuture<?> future = streamer.addData(key, value); // what to do with this future. Should i wait on that?
numberOfEntriesWrittentIntoCache.incrementAndGet();
} catch (IgniteInterruptedException | IgniteDataStreamerTimeoutException | CacheException | IllegalStateException e) {
failedMap.put(key, value);
}
}
public void flush() {
try {
if (streamer != null) {
if (failedMap.size() > 0) {
LOGGER.info("Writing " + failedMap.size() + " failed entries");
failedMap.forEach((k, v) -> writeToGrid(k, v));
}
}
} catch (IllegalStateException | CacheException | IgniteException e) {
LOGGER.error("Exception while writing/closing ignite");
} catch (Exception e) {
LOGGER.error("Exception while writing/closing ignite");
} finally {
failedMap.clear();
if (streamer != null) {
streamer.close();
streamer = null;
}
LOGGER.info("Number of entries written into cache are " + numberOfEntriesWrittentIntoCache.intValue());
}
}
数据流向其他节点批量发送数据。未来,从 addData
方法返回完成,当具有所提供条目的批次被刷新到缓存中时。
所以,如果你等待每个未来的完成,从 addData(...)
方法返回,那么你可能永远不会等到它完成,如果 autoFlushFrequency 没有配置并且 flush()
或者close()
没有被调用。即使配置了 autoFlushFrequency
,每次调用 write()
方法都会等到批次被刷新。
我没有发现在所有处理结束时再次尝试将失败的条目写入数据流送器有什么不好。但是我真的不知道什么时候有用。
我唯一要更改的是将 writeToGrid(k, v)
包装在 forEach
内到它自己的 try-catch 块中。否则一个异常将停止处理所有失败的条目。
我有一个多线程应用程序,它使用 write() 方法不断写入以启动缓存。在启动时,它调用 init() 方法来创建缓存和流对象。一旦所有线程都完成,将调用 flush() 关闭 streamer 并写入任何未能插入 write() 方法的密钥。我有一些疑问。
在 write() 方法中,我应该如何处理 IgniteFuture?我应该等到它完成吗?
当条目写入缓存或流媒体内部缓冲区时,这个未来是否完成?
在 flush() 方法中,我在关闭 streamer 对象之前写入在 write() 方法中失败的所有条目。这是正确的方法吗? 代码如下:
public void initialize(final String cacheName) { getOrCreateCache(cacheName, true); // create new cache or get existing if it already exists this.streamer = ignite.dataStreamer(cacheName); this.cacheName = cacheName; } public void write(K key, V value) { try { IgniteFuture<?> future = streamer.addData(key, value); // what to do with this future. Should i wait on that? numberOfEntriesWrittentIntoCache.incrementAndGet(); } catch (IgniteInterruptedException | IgniteDataStreamerTimeoutException | CacheException | IllegalStateException e) { failedMap.put(key, value); } } public void flush() { try { if (streamer != null) { if (failedMap.size() > 0) { LOGGER.info("Writing " + failedMap.size() + " failed entries"); failedMap.forEach((k, v) -> writeToGrid(k, v)); } } } catch (IllegalStateException | CacheException | IgniteException e) { LOGGER.error("Exception while writing/closing ignite"); } catch (Exception e) { LOGGER.error("Exception while writing/closing ignite"); } finally { failedMap.clear(); if (streamer != null) { streamer.close(); streamer = null; } LOGGER.info("Number of entries written into cache are " + numberOfEntriesWrittentIntoCache.intValue()); }
}
数据流向其他节点批量发送数据。未来,从 addData
方法返回完成,当具有所提供条目的批次被刷新到缓存中时。
所以,如果你等待每个未来的完成,从 addData(...)
方法返回,那么你可能永远不会等到它完成,如果 autoFlushFrequency 没有配置并且 flush()
或者close()
没有被调用。即使配置了 autoFlushFrequency
,每次调用 write()
方法都会等到批次被刷新。
我没有发现在所有处理结束时再次尝试将失败的条目写入数据流送器有什么不好。但是我真的不知道什么时候有用。
我唯一要更改的是将 writeToGrid(k, v)
包装在 forEach
内到它自己的 try-catch 块中。否则一个异常将停止处理所有失败的条目。