更新时间:2023-05-16 来源:黑马程序员 浏览量:

  笔者通过以下Python代码,演示一下基于Python Redis客户端库实现的分布式锁:
import redis
import time
class RedisLock:
    def __init__(self, redis_client, key, expire=10):
        self.redis_client = redis_client
        self.key = key
        self.expire = expire
        self.value = None
    def __enter__(self):
        while True:
            # 尝试获取锁
            timestamp = time.time()
            self.value = str(timestamp)
            result = self.redis_client.set(self.key, self.value, ex=self.expire, nx=True)
            if result:
                return True
            else:
                # 未获取到锁,等待一段时间后重试
                time.sleep(0.1)
    def __exit__(self, exc_type, exc_val, exc_tb):
        # 释放锁
        script = """
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
        """
        self.redis_client.eval(script, 1, self.key, self.value)
if __name__ == '__main__':
    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    with RedisLock(redis_client, 'mylock'):
        print('Get lock')
        time.sleep(5)
    print('Release lock')  在以上示例代码中,我们首先定义了一个RedisLock类,该类包含了获取锁和释放锁的逻辑,同时在__enter__方法中实现了自旋锁(spin lock)的逻辑,如果在一定的时间内未获取到锁,则会进行一次重试,这样可以减少对Redis的请求次数。在__exit__方法中实现了释放锁的逻辑,使用Redis Lua脚本来保证原子性。

同时,我们使用Redis默认的0号数据库,并在本地运行的Redis服务器上测试了代码。我们首先获取锁并打印Get lock信息,然后等待5秒钟,最后释放锁并打印Release lock信息。
注意:在实际应用中,我们需要在获取到锁后执行一些临界区代码,并在临界区代码执行完毕后释放锁。同时,需要考虑到锁的超时问题,防止因为某个线程崩溃导致锁一直被占用。
1024首播|39岁程序员逆袭记:不被年龄定义,AI浪潮里再迎春天
2025-10-241024程序员节丨10年同行,致敬用代码改变世界的你
2025-10-24【AI设计】北京143期毕业仅36天,全员拿下高薪offer!黑马AI设计连续6期100%高薪就业
2025-09-19【跨境电商运营】深圳跨境电商运营毕业22个工作日,就业率91%+,最高薪资达13500元
2025-09-19【AI运维】郑州运维1期就业班,毕业14个工作日,班级93%同学已拿到Offer, 一线均薪资 1W+
2025-09-19【AI鸿蒙开发】上海校区AI鸿蒙开发4期5期,距离毕业21天,就业率91%,平均薪资14046元
2025-09-19