A Casual Talk on ZooKeeper in Practice

01 Why We Need It & Background

In Revenue Monitoring Service, there are 9 running pod instances, each performing an SQL query every 6 hours. I don't want all instances to query at the same time, because this would waste resources and could overload the database. I need to ensure that only one instance executes the query in each 6-hour window. To achieve this, I introduced ZooKeeper to implement a distributed lock, so that each query is executed by only one instance.

02 Design & Implementation

To solve this, I designed a ZooKeeper-based distributed lock component. Each time a query is about to be executed, the instance attempts to create a node in ZooKeeper whose name is taken from the query.
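Section 2.2 uses node paths of the form /<query>-lock and /<query>-last-execution-time. A raw SQL string makes a poor node name, since ZooKeeper treats '/' as a path separator. The following is a minimal sketch that assumes each query is identified by a short name (the hypothetical `queryName`, e.g. "daily_revenue") and derives both paths from it. `QueryNodePaths` is a hypothetical helper, and the allowed character set is an assumption that is deliberately stricter than ZooKeeper itself requires.

```java
import java.util.regex.Pattern;

// Hypothetical helper: builds the two ZooKeeper node paths used for one scheduled query.
final class QueryNodePaths {
    // Conservative character set (an assumption); '/' must never appear inside a single node name
    private static final Pattern SAFE_NAME = Pattern.compile("[A-Za-z0-9_.-]+");

    private QueryNodePaths() {}

    // "/<query>-lock": holds "0" (free) or "1" (held)
    static String lockPath(String queryName) {
        return "/" + checked(queryName) + "-lock";
    }

    // "/<query>-last-execution-time": holds the time of the last successful run
    static String lastExecutionTimePath(String queryName) {
        return "/" + checked(queryName) + "-last-execution-time";
    }

    private static String checked(String queryName) {
        if (queryName == null || !SAFE_NAME.matcher(queryName).matches()) {
            throw new IllegalArgumentException("unsafe node name: " + queryName);
        }
        return queryName;
    }
}
```

Usage: `QueryNodePaths.lockPath("daily_revenue")` yields "/daily_revenue-lock", while passing the SQL text itself is rejected instead of silently creating a nested path.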

2.1 Lock Acquisition

checkIfLocked

  • Checks whether the lock node exists in ZooKeeper.
  • If the node exists and its value is "1", the lock is already held.
  • If the node does not exist, the method creates it with the value "0", meaning the lock is not held. The node is created with CreateMode.PERSISTENT and ZooDefs.Ids.OPEN_ACL_UNSAFE.
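import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
// Pair: any two-element holder with getFirst()/getSecond() (the post does not say which library)

public Pair<Boolean, Stat> checkIfLocked(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
    if (zk.exists(path, false) == null) {
        try {
            // Node does not exist: create a persistent node with value "0" (unlocked)
            zk.create(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException e) {
            // Another pod created it first; read its current state below
        }
    }

    // getData fills `stat`, so the version returned always belongs to the data just read
    // (returning null here would make tryLock fail with a NullPointerException)
    Stat stat = new Stat();
    byte[] data = zk.getData(path, false, stat);
    boolean isLocked = new String(data).equals("1");
    return new Pair<>(isLocked, stat); // lock state and node metadata (including version)
}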

tryLock

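  • Introduces a random delay of 0 to 1 second so that pods triggered by the same schedule do not all hit ZooKeeper at the same instant. This only reduces contention; correctness still comes from the version check in getLock.
  • Calls checkIfLocked, which returns <first=isLocked, second=stat>.
  • If the lock is not held, passes the node's version (stat.getVersion()) to getLock to try to acquire it and returns true on success. If the lock is already held, it returns false.
public boolean tryLock(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
    // Random delay (0-1000 ms) to spread out simultaneous attempts
    Thread.sleep((long) (Math.random() * 1000));

    Pair<Boolean, Stat> lockStatus = checkIfLocked(zk, path);

    if (lockStatus.getFirst()) {
        return false; // another pod already holds the lock
    }

    // Not locked: try to take it with the version just read.
    // getLock throws BadVersionException if another pod changed the node in between.
    getLock(zk, path, lockStatus.getSecond().getVersion());
    return true;
}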

getLock

  • Checks that the lock node exists in ZooKeeper and that its current version matches the version passed in.
  • If the version does not match, another instance has modified the node since it was read, so lock acquisition fails with a BadVersionException.
  • If the version matches, the method sets the node's value to "1", indicating that the current pod now holds the lock. Because setData is called with the expected version, ZooKeeper applies the write as an atomic compare-and-set: if another pod updated the node after the check, setData itself throws BadVersionException.
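public void getLock(ZooKeeper zk, String path, int version) throws KeeperException, InterruptedException {
    Stat stat = zk.exists(path, false);

    // Fail fast if the node is gone or has been modified since checkIfLocked read it
    if (stat == null || stat.getVersion() != version) {
        throw new KeeperException.BadVersionException(path);
    }

    // Take the lock: value "1" means held. Passing `version` makes this a server-side
    // compare-and-set, so a concurrent update still results in BadVersionException.
    zk.setData(path, "1".getBytes(), version);
    System.out.println("Lock acquired successfully.");
}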
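To illustrate why the version check makes the lock safe with 9 pods, here is a simulation that replaces ZooKeeper with a hypothetical in-memory node (`FakeLockNode`). Its `setData` mimics the server's version-checked write, and 9 threads, one per pod, race through the same read-then-set steps as checkIfLocked and getLock. It is a sketch under those assumptions, not the ZooKeeper client: where the real client throws BadVersionException, this model returns false.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical in-memory stand-in for one ZooKeeper node (not the ZooKeeper client).
// Every successful write bumps the version; a write carrying a stale version is rejected.
final class FakeLockNode {
    static final class Snapshot {
        final String value;
        final int version;

        Snapshot(String value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    private String value = "0";
    private int version = 0;

    synchronized Snapshot read() {
        return new Snapshot(value, version);
    }

    // Compare-and-set: succeeds only if nobody has written since expectedVersion was read
    synchronized boolean setData(String newValue, int expectedVersion) {
        if (version != expectedVersion) {
            return false; // real client: KeeperException.BadVersionException
        }
        value = newValue;
        version++;
        return true;
    }
}

final class LockRace {
    private LockRace() {}

    // Same shape as checkIfLocked + getLock: read state, then write "1" only if the version is unchanged
    static boolean tryLock(FakeLockNode node) {
        FakeLockNode.Snapshot s = node.read();
        if (s.value.equals("1")) {
            return false; // lock already held
        }
        return node.setData("1", s.version);
    }

    // Releases `pods` threads at the same moment and counts how many acquire the lock
    static int countWinners(int pods) {
        FakeLockNode node = new FakeLockNode();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(pods);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < pods; i++) {
                results.add(pool.submit(() -> {
                    start.await(); // every thread waits here until all are ready
                    return tryLock(node);
                }));
            }
            start.countDown();
            int winners = 0;
            for (Future<Boolean> f : results) {
                if (f.get()) {
                    winners++;
                }
            }
            return winners;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Whatever the interleaving, `LockRace.countWinners(9)` returns 1: the first successful write bumps the version, so every other thread either reads "1" or fails the compare-and-set. The random delay in tryLock only changes which pod wins, not how many.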

2.2 Lock Release

timeNodePath = "/<query>-last-execution-time"
lockNodePath = "/<query>-lock"

setLastExecutionTime

  • Writes the last execution time to a separate node (timeNodePath), creating the node if it does not exist yet.
public void setLastExecutionTime(ZooKeeper zk, String timeNodePath, String lastExecutionTime) throws KeeperException, InterruptedException {
    // Convert the execution time to byte[]
    byte[] lastExecutionTimeBytes = lastExecutionTime.getBytes();

    // Check whether the node that stores the execution time exists
    Stat stat = zk.exists(timeNodePath, false);
    if (stat == null) {
        // Node does not exist: create a persistent node holding the time
        zk.create(timeNodePath, lastExecutionTimeBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } else {
        // Node exists: overwrite the stored time
        zk.setData(timeNodePath, lastExecutionTimeBytes, stat.getVersion());
    }

    System.out.println("Last execution time set to: " + lastExecutionTime);
}
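The post does not show how the stored time is read back. One plausible use, assumed here, is for the pod that obtains the lock to first check whether the query has already run in the current 6-hour window, so that a pod acquiring the lock just after another pod released it does not repeat the query. The following sketch assumes the time is stored as an ISO-8601 instant string such as "2024-01-01T06:00:00Z". `ExecutionWindow` and `shouldRun` are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical guard around the value stored in /<query>-last-execution-time
final class ExecutionWindow {
    static final Duration CADENCE = Duration.ofHours(6);

    private ExecutionWindow() {}

    // True when no run is recorded yet, or when at least one full cadence has passed since the last run.
    // Assumes lastExecutionTime is an ISO-8601 instant (e.g. produced by Instant.toString()).
    static boolean shouldRun(String lastExecutionTime, Instant now) {
        if (lastExecutionTime == null || lastExecutionTime.isEmpty()) {
            return true;
        }
        Instant last = Instant.parse(lastExecutionTime);
        return Duration.between(last, now).compareTo(CADENCE) >= 0;
    }
}
```

With `now` at 12:00, a recorded run at 06:00 allows a new run (exactly 6 hours), while a recorded run at 09:00 does not.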

releaseLock

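  • Releases the lock by setting the lock node's value back to "0". It does not check which pod holds the lock, so only the pod that acquired it should call this method.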
public void releaseLock(ZooKeeper zk, String lockNodePath) throws KeeperException, InterruptedException {
    // Check whether the lock node exists
    Stat stat = zk.exists(lockNodePath, false);
    if (stat != null) {
        // Set the lock node's value to "0" to mark the lock as released
        zk.setData(lockNodePath, "0".getBytes(), stat.getVersion());
        System.out.println("Lock released successfully.");
    } else {
        System.out.println("Lock node does not exist, no need to release.");
    }
}
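Putting the pieces together, a hypothetical driver could acquire the lock, check the 6-hour window, run the query, record the time, and release the lock in a finally block. `LockStore`, `ScheduledQueryRunner`, and `InMemoryLockStore` are illustrative names standing in for the ZooKeeper helpers above. The in-memory store exists only to make the sketch runnable, and the window check assumes ISO-8601 timestamps.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical abstraction over the ZooKeeper helpers; names are illustrative only
interface LockStore {
    boolean tryLock();                       // cf. tryLock
    String lastExecutionTime();              // reads the time node, null if absent (assumed helper)
    void setLastExecutionTime(String time);  // cf. setLastExecutionTime
    void releaseLock();                      // cf. releaseLock
}

final class ScheduledQueryRunner {
    private ScheduledQueryRunner() {}

    // Returns true if this pod executed the query in this round
    static boolean runOnce(LockStore store, Runnable query, Instant now) {
        if (!store.tryLock()) {
            return false; // another pod holds the lock
        }
        try {
            String last = store.lastExecutionTime();
            if (last != null
                    && Duration.between(Instant.parse(last), now).compareTo(Duration.ofHours(6)) < 0) {
                return false; // already executed in this 6-hour window
            }
            query.run();
            store.setLastExecutionTime(now.toString()); // ISO-8601, e.g. 2024-01-01T00:00:00Z
            return true;
        } finally {
            // Release even if the query throws, so a failed run does not block later windows
            store.releaseLock();
        }
    }
}

// Single-process stand-in used only to exercise runOnce
final class InMemoryLockStore implements LockStore {
    boolean locked;
    String lastTime;

    public boolean tryLock() {
        if (locked) {
            return false;
        }
        locked = true;
        return true;
    }

    public String lastExecutionTime() { return lastTime; }

    public void setLastExecutionTime(String time) { lastTime = time; }

    public void releaseLock() { locked = false; }
}
```

The finally block covers a query that throws, but not a pod that crashes while holding the lock. Because the lock node is persistent, it would stay at "1" until something resets it.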

03 Future Improvements