Zookeeper to get locks on multiple resources

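My scenario is as follows: I have 5 Linux machines and 10 (possibly more) files in HDFS. My requirement is that each machine should lock one of the files and process it. The other machines must not process that file; they should lock a different file and process that one instead. For example:

machine1 - locks file2 and processes it
machine2 - locks file3 and processes it
machine3 - locks file1 and processes it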

I wrote a dummy multithreaded Java program to simulate this, but it doesn't work:

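import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class DistributedLock {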

    private final ZooKeeper zk;
    private final String lockBasePath;
    private String lockPath;

    public DistributedLock(ZooKeeper zk, String lockBasePath) {
        this.zk = zk;
        this.lockBasePath = lockBasePath;
    }

    public boolean lock(String lockName) throws IOException {
        try {
            boolean locked = false;
            if (zk.exists(lockBasePath + "/" + lockName, false) == null) {
                lockPath = zk.create(lockBasePath + "/" + lockName, null,
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL);
                if (lockPath != null) {
                    locked = true;
                }
            }
            return locked;
        } catch (KeeperException e) {
            throw new IOException(e);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    public void unlock() throws IOException {
        try {
            zk.delete(lockPath, -1);
            lockPath = null;
        } catch (KeeperException e) {
            throw new IOException(e);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

}

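import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class DistributedLockTest {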

    public static void main(String[] args) throws Exception {
        new DistributedLockTest().run();
    }

    public void run() throws Exception {
        Thread t1 = new Thread(new Process(1));
        Thread t2 = new Thread(new Process(2));
        Thread t3 = new Thread(new Process(3));
        Thread t4 = new Thread(new Process(4));

        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }

    class Process implements Runnable {

        int id;
        List<String> fileNames = new ArrayList<String>();

        public Process(int id) {
            this.id = id;
            for (int i = 1; i < 11; i++) {
                fileNames.add("file" + i);
            }
        }

        @Override
        public void run() {
            try {
                System.out.println("machine " + id + " started");
                String path = "/LockDir";
                // The connect string is "host:port"; the second argument is the
                // session timeout in milliseconds, not the port. No-op default watcher.
                ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 3000, event -> { });
                if (zooKeeper.exists(path, false) == null) {
                    try {
                        zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    } catch (KeeperException.NodeExistsException ignored) {
                        // Another thread created /LockDir first; that is fine
                    }
                }
                DistributedLock lock = new DistributedLock(zooKeeper, path);
                String lockedFile;
                for (String fileName : fileNames) {
                    System.out.println("machine " + id + " Acquiring Lock on " + fileName);
                    boolean locked = lock.lock(fileName);
                    if (locked) {
                        System.out.println("machine " + id + "got Lock on " + fileName);
                        lockedFile = fileName;
                        Thread.sleep(500);
                    }
                }
                System.out.println("machine " + id + " Releasing Lock");
                lock.unlock();
                System.out.println("machine " + id + " Released Lock");

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

The output I get is:

machine 1 started
machine 2 started
machine 3 started
machine 4 started
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
machine 2 Acquiring Lock on file1
machine 1 Acquiring Lock on file1
machine 4 Acquiring Lock on file1
machine 3 Acquiring Lock on file1
machine 1got Lock on file1
machine 3got Lock on file1
machine 2got Lock on file1
machine 4got Lock on file1
machine 1 Acquiring Lock on file2
machine 3 Acquiring Lock on file2
machine 4 Acquiring Lock on file2
machine 2 Acquiring Lock on file2
machine 1got Lock on file2
machine 3got Lock on file2
machine 2got Lock on file2
machine 4got Lock on file2
machine 3 Acquiring Lock on file3
machine 1 Acquiring Lock on file3
machine 2 Acquiring Lock on file3
machine 4 Acquiring Lock on file3
machine 1got Lock on file3
machine 4got Lock on file3
machine 3got Lock on file3
machine 2got Lock on file3
machine 2 Acquiring Lock on file4
machine 4 Acquiring Lock on file4
machine 3 Acquiring Lock on file4
machine 1 Acquiring Lock on file4
machine 4got Lock on file4
machine 2got Lock on file4
machine 3got Lock on file4
machine 1got Lock on file4
machine 4 Acquiring Lock on file5
machine 3 Acquiring Lock on file5
machine 2 Acquiring Lock on file5
machine 1 Acquiring Lock on file5
machine 3got Lock on file5
machine 2got Lock on file5
machine 4got Lock on file5
machine 1got Lock on file5
machine 2 Acquiring Lock on file6
machine 4 Acquiring Lock on file6
machine 3 Acquiring Lock on file6
machine 1 Acquiring Lock on file6
machine 2got Lock on file6
machine 1got Lock on file6
machine 4got Lock on file6
machine 3got Lock on file6
machine 2 Acquiring Lock on file7
machine 4 Acquiring Lock on file7
machine 1 Acquiring Lock on file7
machine 3 Acquiring Lock on file7
machine 4got Lock on file7
machine 2got Lock on file7
machine 1got Lock on file7
machine 3got Lock on file7
machine 4 Acquiring Lock on file8
machine 3 Acquiring Lock on file8
machine 1 Acquiring Lock on file8
machine 2 Acquiring Lock on file8
machine 1got Lock on file8
machine 4got Lock on file8
machine 3got Lock on file8
machine 2got Lock on file8
machine 2 Acquiring Lock on file9
machine 4 Acquiring Lock on file9
machine 3 Acquiring Lock on file9
machine 1 Acquiring Lock on file9
machine 4got Lock on file9
machine 3got Lock on file9
machine 1got Lock on file9
machine 2got Lock on file9
machine 4 Acquiring Lock on file10
machine 3 Acquiring Lock on file10
machine 1 Acquiring Lock on file10
machine 2 Acquiring Lock on file10
machine 2got Lock on file10
machine 4got Lock on file10
machine 1got Lock on file10
machine 3got Lock on file10
machine 4 Releasing Lock
machine 1 Releasing Lock
machine 2 Releasing Lock
machine 3 Releasing Lock
machine 2 Released Lock
machine 1 Released Lock
machine 4 Released Lock
machine 3 Released Lock

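This shows that every thread/machine tries to lock every file and gets the lock. What I want is that if a machine cannot lock a particular file, it should try to lock another file and process that one instead. Any suggestions?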

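I found two bugs in your code. The first is that you use CreateMode.EPHEMERAL_SEQUENTIAL for your lock nodes, when you probably want CreateMode.EPHEMERAL. Sequential nodes are mainly meant for queues (and for lock recipes where clients wait their turn in order), not for a simple "lock this file or move on" scheme like yours. ZooKeeper appends a 10-digit counter to the requested name, so it creates nodes such as file10000000123, file10000000124 and so on. As a result, the node whose existence you check is never created: exists() always returns null, and every thread "gets" every lock.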
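To see why the exists() check never fires, here is a plain-Java illustration (no ZooKeeper involved) of the name a sequential node receives. It assumes ZooKeeper's documented naming format, the requested name followed by a 10-digit, zero-padded counter kept by the parent node; the class SequentialName and the counter value 123 are hypothetical.

```java
public class SequentialName {

    // Mimics how ZooKeeper names a *_SEQUENTIAL znode (assumption based on the
    // documented "%010d" counter format): requested name + zero-padded counter.
    static String sequentialName(String requested, int counter) {
        return String.format("%s%010d", requested, counter);
    }

    public static void main(String[] args) {
        // 123 is an arbitrary counter value chosen for illustration
        String created = sequentialName("file1", 123);
        System.out.println(created);                 // file10000000123
        // exists(lockBasePath + "/file1") looks up exactly "file1", which never matches
        System.out.println(created.equals("file1")); // false
    }
}
```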

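If you fix that, you will most likely hit a race condition between threads, because each one first checks whether the node exists and then creates it. Several threads can pass the check and try to create the same node, and all but one of them then fail with an exception. So my solution looks like this: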

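public class DistributedLock {
    // Shared monitor for all DistributedLock instances in this JVM. A private
    // Object is safer than a String literal, which is interned and could be
    // locked on by unrelated code.
    private static final Object LOCK = new Object();
...
    public boolean lock(String lockName) throws IOException {
        try {
            boolean locked = false;
            // Serializes the exists()/create() pair for threads in this JVM only;
            // it does not coordinate separate machines.
            synchronized (LOCK) {
                if (zk.exists(lockBasePath + "/" + lockName, false) == null) {
                    // EPHEMERAL, not EPHEMERAL_SEQUENTIAL: the node gets exactly this name
                    lockPath = zk.create(lockBasePath + "/" + lockName, null,
                            ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.EPHEMERAL);
                    locked = lockPath != null;
                }
            }
            return locked;
...
    }
...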
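The synchronized block above only serializes threads inside one JVM; with five separate machines, two of them can still pass exists() at the same moment. A sketch of an alternative (a swapped-in technique, not the method above) is to drop the exists() check and let the create call itself be the atomic test: zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) either creates the node or throws KeeperException.NodeExistsException, in which case the machine moves on to the next file. To keep the example runnable without a ZooKeeper server, the hypothetical TryLockSketch class stands in for the znode namespace with a ConcurrentHashMap, whose putIfAbsent is atomic in the same way; all names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class TryLockSketch {

    // Hypothetical in-memory stand-in for the /LockDir znodes (path -> owner id).
    static final ConcurrentMap<String, Integer> NODES = new ConcurrentHashMap<>();

    // One atomic "create if absent", with no separate exists() check.
    // With ZooKeeper this would be zk.create(..., CreateMode.EPHEMERAL),
    // returning false when KeeperException.NodeExistsException is thrown.
    static boolean tryLock(String path, int owner) {
        return NODES.putIfAbsent(path, owner) == null;
    }

    // Walk the file list and keep the first file that nobody else holds.
    static Optional<String> claimOne(List<String> files, int owner) {
        for (String file : files) {
            if (tryLock("/LockDir/" + file, owner)) {
                return Optional.of(file);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> files = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            files.add("file" + i);
        }

        // Four threads play the role of the four "machines" in the question.
        List<Thread> machines = new ArrayList<>();
        for (int id = 1; id <= 4; id++) {
            final int owner = id;
            Thread t = new Thread(() -> System.out.println(
                    "machine " + owner + " got " + claimOne(files, owner).orElse("nothing")));
            machines.add(t);
            t.start();
        }
        for (Thread t : machines) {
            t.join();
        }
    }
}
```

Each thread ends up with a different file, in whatever order the scheduler runs them. With the real client, an ephemeral lock node is also removed when its owner's session ends, so a crashed machine does not keep its file locked forever.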

The output looks like the following, which I assume is what you want:

machine 1 Acquiring Lock on file1
machine 4 Acquiring Lock on file1
machine 2 Acquiring Lock on file1
machine 3 Acquiring Lock on file1
machine 1 got Lock on file1
machine 3 Acquiring Lock on file2
machine 3 got Lock on file2
machine 2 Acquiring Lock on file2
machine 4 Acquiring Lock on file2
machine 2 Acquiring Lock on file3
machine 2 got Lock on file3
...

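PS: As a side note, I'd recommend using Apache Curator instead of writing your own wrapper around Zookeeper. It is much easier to use, and it already handles most of the edge cases.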