如何在Spring 批量集成中使用文件锁?
How to use File lock in Spring batch-integration?
在我的 spring-batch-integration 应用程序中,文件轮询为每个文件调用批处理作业,这个应用程序可以 运行 在多个服务器(节点)上,但它们都应该读取一个公共的directory.Now,我写了一个自定义储物柜,它可以锁定文件,这样任何其他实例都无法处理同一个文件。代码如下
public class MyFileLocker extends AbstractFileLockerFilter{
private final ConcurrentMap<File, FileLock> lockCache = new ConcurrentHashMap<File, FileLock>();
private final ConcurrentMap<File, FileChannel> ChannelCache = new ConcurrentHashMap<File, FileChannel>();
@Override
public boolean lock(File fileToLock) {
FileChannel channel;
FileLock lock;
try {
channel = new RandomAccessFile(fileToLock, "rw").getChannel();
lock = channel.tryLock();
if (lock == null || !lock.isValid()) {
System.out.println(" Problem in acquiring lock!!" + fileToLock);
return false;
}
lockCache.put(fileToLock, lock);
ChannelCache.put(fileToLock, channel);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return true;
}
@Override
public boolean isLockable(File file) {
return file.canWrite();
}
@Override
public void unlock(File fileToUnlock) {
FileLock lock = lockCache.get(fileToUnlock);
try {
if(lock!=null){
lock.release();
ChannelCache.get(fileToUnlock).close();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
现在,当我调用 Spring 批处理并尝试使用 flatfileitemreader 读取该文件时,它给了我
org.springframework.batch.item.file.NonTransientFlatFileException
我相信这是因为文件已锁定。我做了一些谷歌搜索,发现 NIOLocker 以一种即使当前线程也无法读取它的方式锁定文件。我找到了一个 link ,它显示了如何读取锁定的文件,但他们正在使用缓冲区。
如何让我的 FlatfileItemReader 可以访问我的文件。
求推荐。
是的,您确实只能通过 ByteBuffer
:
访问锁定的文件内容
FileChannel fileChannel = channelCache.get(lockedFile);
ByteBuffer byteBuffer = ByteBuffer.allocate((int) fileChannel.size());
fileChannel.read(byteBuffer);
System.out.println("Read File " + lockedFile.getName() + " with content: " + new String(byteBuffer.array()));
哦!是的。你真的指向了我的 repo :-).
因此,对于 locker
,您别无选择,除非 copy/paste 文件 byte[]
在 FlatfileItemReader
之前,或者只是注入一些自定义 BufferedReaderFactory
进入相同的 FlatfileItemReader
,将锁定的文件转换为适当的 BufferedReader
:
new BufferedReader(new CharArrayReader(byteBuffer.asCharBuffer().array()));
根据您分享的link,您似乎可以尝试以下方法。
//This is from your link (except the size variable)
FileChannel fileChannel = channelCache.get(lockedFile);
int size = (int) fileChannel.size();
ByteBuffer byteBuffer = ByteBuffer.allocate(size);
fileChannel.read(byteBuffer);
//Additional code that you could try
byte[] bArray = new byte[size];
//Write data to the byte array
byteBuffer.get(bArray);
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new ByteArrayResource(bArray));
//Next you can try reading from your flatFileItemReader as usual
...
如果它不能解决您的问题,请告诉我。
解决方案:我正在创建一个包含锁定文件内容的临时文件并对其进行处理。处理完成后,我将该文件存档并删除锁定的和临时的两个文件。这里的关键是创建一个具有锁定文件内容的新文件。下面的代码如下:
File tmpFile = new File(inputFile.getAbsolutePath() + ".lck");
FileChannel fileChannel = MyFileLocker.getChannelCache().get(new File(inputFile.getAbsolutePath()));
InputStream inputStream = Channels.newInputStream(fileChannel);
ByteStreams.copy(inputStream, Files.newOutputStreamSupplier(tmpFile));
这里的inputFile是我锁定的文件,tmp文件是锁定文件内容的新文件。我还在储物柜 class 中创建了一个方法来解锁和删除
public void unlockAndDelete(File fileToUnlockandDelete) {
FileLock lock = lockCache.get(fileToUnlockandDelete);
String fileName = fileToUnlockandDelete.getName();
try {
if(lock!=null){
lock.release();
channelCache.get(fileToUnlockandDelete).close();
//remove from cache
lockCache.remove(fileToUnlockandDelete);
channelCache.remove(fileToUnlockandDelete);
boolean isFiledeleted = fileToUnlockandDelete.delete();
if(isFiledeleted){
System.out.println("File deleted successfully" + fileName);
}else{
System.out.println("File is not deleted."+fileName);
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
在我的 spring-batch-integration 应用程序中,文件轮询为每个文件调用批处理作业,这个应用程序可以 运行 在多个服务器(节点)上,但它们都应该读取一个公共的directory.Now,我写了一个自定义储物柜,它可以锁定文件,这样任何其他实例都无法处理同一个文件。代码如下
public class MyFileLocker extends AbstractFileLockerFilter{
private final ConcurrentMap<File, FileLock> lockCache = new ConcurrentHashMap<File, FileLock>();
private final ConcurrentMap<File, FileChannel> ChannelCache = new ConcurrentHashMap<File, FileChannel>();
@Override
public boolean lock(File fileToLock) {
FileChannel channel;
FileLock lock;
try {
channel = new RandomAccessFile(fileToLock, "rw").getChannel();
lock = channel.tryLock();
if (lock == null || !lock.isValid()) {
System.out.println(" Problem in acquiring lock!!" + fileToLock);
return false;
}
lockCache.put(fileToLock, lock);
ChannelCache.put(fileToLock, channel);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return true;
}
@Override
public boolean isLockable(File file) {
return file.canWrite();
}
@Override
public void unlock(File fileToUnlock) {
FileLock lock = lockCache.get(fileToUnlock);
try {
if(lock!=null){
lock.release();
ChannelCache.get(fileToUnlock).close();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
现在,当我调用 Spring 批处理并尝试使用 flatfileitemreader 读取该文件时,它给了我
org.springframework.batch.item.file.NonTransientFlatFileException
我相信这是因为文件已锁定。我做了一些谷歌搜索,发现 NIOLocker 以一种即使当前线程也无法读取它的方式锁定文件。我找到了一个 link ,它显示了如何读取锁定的文件,但他们正在使用缓冲区。 如何让我的 FlatfileItemReader 可以访问我的文件。
求推荐。
是的,您确实只能通过 ByteBuffer
:
FileChannel fileChannel = channelCache.get(lockedFile);
ByteBuffer byteBuffer = ByteBuffer.allocate((int) fileChannel.size());
fileChannel.read(byteBuffer);
System.out.println("Read File " + lockedFile.getName() + " with content: " + new String(byteBuffer.array()));
哦!是的。你真的指向了我的 repo :-).
因此,对于 locker
,您别无选择,除非 copy/paste 文件 byte[]
在 FlatfileItemReader
之前,或者只是注入一些自定义 BufferedReaderFactory
进入相同的 FlatfileItemReader
,将锁定的文件转换为适当的 BufferedReader
:
new BufferedReader(new CharArrayReader(byteBuffer.asCharBuffer().array()));
根据您分享的link,您似乎可以尝试以下方法。
//This is from your link (except the size variable)
FileChannel fileChannel = channelCache.get(lockedFile);
int size = (int) fileChannel.size();
ByteBuffer byteBuffer = ByteBuffer.allocate(size);
fileChannel.read(byteBuffer);
//Additional code that you could try
byte[] bArray = new byte[size];
//Write data to the byte array
byteBuffer.get(bArray);
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new ByteArrayResource(bArray));
//Next you can try reading from your flatFileItemReader as usual
...
如果它不能解决您的问题,请告诉我。
解决方案:我正在创建一个包含锁定文件内容的临时文件并对其进行处理。处理完成后,我将该文件存档并删除锁定的和临时的两个文件。这里的关键是创建一个具有锁定文件内容的新文件。下面的代码如下:
File tmpFile = new File(inputFile.getAbsolutePath() + ".lck");
FileChannel fileChannel = MyFileLocker.getChannelCache().get(new File(inputFile.getAbsolutePath()));
InputStream inputStream = Channels.newInputStream(fileChannel);
ByteStreams.copy(inputStream, Files.newOutputStreamSupplier(tmpFile));
这里的inputFile是我锁定的文件,tmp文件是锁定文件内容的新文件。我还在储物柜 class 中创建了一个方法来解锁和删除
public void unlockAndDelete(File fileToUnlockandDelete) {
FileLock lock = lockCache.get(fileToUnlockandDelete);
String fileName = fileToUnlockandDelete.getName();
try {
if(lock!=null){
lock.release();
channelCache.get(fileToUnlockandDelete).close();
//remove from cache
lockCache.remove(fileToUnlockandDelete);
channelCache.remove(fileToUnlockandDelete);
boolean isFiledeleted = fileToUnlockandDelete.delete();
if(isFiledeleted){
System.out.println("File deleted successfully" + fileName);
}else{
System.out.println("File is not deleted."+fileName);
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}