189 8069 5689

40线程2_Event_Lock

 

创新互联专业为企业提供房县网站建设、房县做网站、房县网站设计、房县网站制作等企业网站建设、网页设计与制作、房县企业网站模板建站服务,十多年房县做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。

 

 

线程同步:

线程间协同,通过某种技术,让一个线程访问某些数据时,其它线程不能访问这些数据,直到该线程完成对数据的操作;

 

结果要能预知;

 

critical section,临界区;

mutex,互斥量,threading.Lock类;   #即锁,你用我不用,我用你不用

semaphore,信号量;

event,事件,threading.Event类;

barrier,屏障、栅栏,threading.Barrier;

 

帮助:

Python 3.5.3 documentation-->Library Reference-->Concurrent Execution-->threading

 

 

 

目录

threading.Event类:...1

theading.Lock类:...4

阻塞锁:...5

非阻塞锁:...10

 

 

 

threading.Event类:

event,事件,是线程间通信机制中最简单的实现,使用一个内部的flag标记,通过flag的True或False的变化来进行操作;

 

e.set(),标记设置为True;

e.clear(),标记设置为False;

e.is_set(),标记是否为True;

e.wait(timeout=None),Block until the internal flag is true,设置等待标记为True的时长,None为无限等待,等到返回True,未等到超时了返回False;

 

e.wait()的使用:

wait优于sleep,wait会主动让出时间片,其它线程可以被调度,而sleep会占用时间片不让出;

 

例:

def do(interval, e: threading.Event):

    while not e.wait(interval):

        logging.info('do sth')

 

e = threading.Event()

threading.Thread(target=do, args=(3,e)).start()

e.wait(5)   #可用time.sleep(5),e.wait(5)优于time.sleep(5)

e.set()

print('main exit')

输出:

2018-07-30-14:58:08       Thread info: 10268 Thread-1 do sth

main exit

 

例:

老板雇佣了一个工人,让他生产杯子,老板一直等着工人,直到生产了10个杯子;

import threading

import time

import logging

FORMAT = '%(asctime)-15s\tThread info: %(thread)d %(threadName)s %(message)s'

logging.basicConfig(level=logging.INFO, format=FORMAT, datefmt='%Y-%m-%d-%H:%M:%S')

 

cups = []   #当前,如果多个工人做则有问题

event = threading.Event()   #实例,主线程、boss、worker都要看event,注意其作用域,使用同一个Event对象的flag

 

def boss(e: threading.Event):

    logging.info("I'm boss,waiting for U.")

    e.wait()

    logging.info('good job.')

 

def worker(n, e: threading.Event):

    while True:

        time.sleep(0.5)

        cups.append(1)

        logging.info('make 1')

        if len(cups) >= n:

            # logging.info('I finished my job. {}'.format(len(cups)))   #此处打印也可放到下面,老板在主线程中等待,if event.wait():中

            e.set()

            break

         logging.info('I finished my job. {}'.format(len(cups)))

 

b = threading.Thread(target=boss, args=(event,))

w = threading.Thread(target=worker, args=(10, event))

w.start()

b.start()

 

# if event.wait():   #方1,老板在主线程中等待

#     logging.info('I finished my job. {}'.format(len(cups)))

 

# while not event.wait(1):   #方2,1秒看1次,老板在主线程中等待

#     pass

# logging.info('I finished my job. {}'.format(len(cups)))

 

# while not event.is_set():   #方3,老板在主线程中等待

#     event.wait()   #谁wait就是等到flag变为True,或等到超时返回False,不限制等待的个数

# logging.info('I finished my job. {}'.format(len(cups)))

 

输出:

2018-07-30-14:47:12       Thread info: 11032 Thread-1 I'm boss,waiting for U.

2018-07-30-14:47:13       Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:13       Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:14       Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:14       Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:15       Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:15       Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:16       Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:16       Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:17       Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:17       Thread info: 6996 Thread-2 make 1

2018-07-30-14:47:17       Thread info: 6996 Thread-2 I finished my job. 10

2018-07-30-14:47:17       Thread info: 11032 Thread-1 good job.

 

例:

Event练习:

实现Timer,延时执行的线程;

延时计算add(x,y);

思路:

Timer的构造函数中参数得有哪些?

如何实现start启动一个线程执行函数?

如何cancel取消待执行任务?

 

class Timer:

    def __init__(self, interval, fn, *args, **kwargs):

        self.interval = interval

        self.fn = fn

        self.args = args

        self.kwargs = kwargs

        self.event = threading.Event()

 

    def start(self):

        threading.Thread(target=self.__do).start()

 

    def cancel(self):

        self.event.set()

 

    def __do(self):

        self.event.wait(self.interval)   #等待超时后返回False

                   # print(self.event.__dict__)   # {'_flag': False, '_cond': , 0)>}

        print(self.event.is_set())   #False

        if not self.event.is_set():

            self.fn(*self.args, **self.kwargs)

 

def add(x, y):

    logging.info(x+y)

 

t = Timer(3, add, 4, 5)

t.start()

#t.cancel()   #有此句self.is_set()为True

输出:

False

2018-07-31-09:29:45       Thread info: 5156 Thread-1 9

 

 

 

theading.Lock类:

解决多线程中数据同步;

 

凡是存在共享资源争抢的地方都可使用锁,从而保证只有一个使用者可完全使用这个资源;

 

程序=数据+算法,要保证数据的结果是可预知的,若不可预知该程序是没用的,为得到正确的结果,性能是其次已不重要了;

 

一旦某一线程获得锁,其它试图获取的线程将被阻塞;

 

acquire(blocking=True,timeout=-1),默认阻塞,阻塞可设置超时时间;非阻塞时,timeout禁止设置;成功获取锁,返回True,否则返回False;

release(),释放锁,可从任何线程调用释放;该方法执行后,已上锁的锁会被重置为unlocked;在未上锁的锁上调用release(),抛RuntimeError: release unlocked lock;

 

注:

锁将acquire()和release()中间的代码管住;

大多数情况下用的是阻塞锁;

锁,类似ATM机的小房子;

加锁,应在分析业务基础之上,在合适的地方加,不能从前到后加一把大锁,这样效率低下;

 

加锁、解锁:

一般来说,加锁后还要有一些代码实现,在释放锁之前还有可能抛异常,一旦出现异常,锁无法释放,但当前线程可能因为这个异常终止了,这就产生了死锁;

一般谁(某个线程)加的锁谁解锁;

常用语句:

try...finally,保证锁的释放;

with,Lock类中有__enter__()和__exit__(),锁对象支持上下文管理;

 

锁的应用场景:

锁,适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候;

如果全部都是读取同一个共享资源需要锁吗?不需要,因为这时可认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁;

不使用锁,虽有了效率,但结果是错的;

使用了锁,虽效率低下,但结果是对的,所以为了对的结果,让计算机计算去吧!

 

使用锁的注意事项:

少用锁,必要时加锁,使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行;

例如,高速公路上的车并行跑,到了省界只开放了一个收费口,过了这个口,车辆依然可在多车道上并行跑;过收费口时,如果排队一辆一辆过,加不加锁一样,效率相当,但一旦出现争抢,就必须加锁一辆辆过;

加锁时间越短越好,不需要就立即释放锁;

一定要避免死锁,尽量用with语句;

 

 

阻塞锁:

lock.acquire()

 

例:

lock = threading.Lock()

lock.acquire()

print('get locker1')

lock.acquire()   #block,再次获取同一个锁时阻塞(独占),必须等别人将该锁释放才能获取到,此行之后的语句不能执行;即,某一线程获得锁后,另一线程只要执行到lock.acquire()就会阻塞住(卡住)

print('get locker2')

lock.release()   #一旦某一线程release,等着的线程争抢,谁抢到谁上锁;一般情况,有多少个acquire就有多少个release;要保证一定release,用到上下文(with语句)或try...finally

print('release locker')

 

注:

死锁(解不开的锁,就是死锁):

同一个锁,你等我释放、我等你释放、自己等自己释放;

两个锁,你等我放一把,我等你放一把,互相等待,都不释放,都为阻塞状态;

 

例:

def work():

    time.sleep(3)

    lock.release()   #任何一个线程都可对同一个锁操作,此处释放锁

 

lock = threading.Lock()

lock.acquire()

print('get locker1')

threading.Thread(target=work).start()

time.sleep(5)

lock.acquire()

print('get locker2')

lock.release()

print('release locker')

 

例:

需求:10个工人完成100个杯子;

注:临界点问题:

当生产到99个时,若每个线程都可看到,每个线程看到转头去生产了,导致结果会多;

只允许一个线程可看到当前生产杯子的数量,其它线程不可见,这个线程看到后生产完1个杯子时,其它线程均可看到,杯子数量达到要求就不生产了;

 

cups = []

 

def worker(task=100):

    while True:

        count = len(cups)

        logging.info('current count: {}'.format(count))

        if count >= task:

            break

        cups.append(1)

        logging.info('{} make 1'.format(threading.current_thread()))

    logging.info('total: {}'.format(count))

 

for _ in range(10):

    threading.Thread(target=worker).start()

输出:

……

2018-08-06-09:13:14       Thread info: 10208 Thread-2 current count: 104

2018-08-06-09:13:14       Thread info: 10208 Thread-2 total: 104

2018-08-06-09:13:14       Thread info: 10252 Thread-6 make 1

2018-08-06-09:13:14       Thread info: 10252 Thread-6 current count: 104

2018-08-06-09:13:14       Thread info: 10252 Thread-6 total: 104

 

解决,加锁:

cups = []

lock = threading.Lock()

 

def worker(lock:threading.Lock, task=100):

    while True:

        lock.acquire()

        count = len(cups)

        logging.info('current count: {}'.format(count))

        lock.release()

        if count >= task:   #虽不是大锁,仍有问题,在临界点时,其它线程仍可看到杯子总数,继而再做;解决,把中间代码全部放到锁里

            break

        lock.acquire()

        cups.append(1)

        lock.release()

        logging.info('{} make 1'.format(threading.current_thread()))

    logging.info('total: {}'.format(count))

 

for _ in range(10):

    threading.Thread(target=worker, args=(lock,)).start()

输出:

……

2018-08-06-09:19:11       Thread info: 4092 Thread-4 current count: 100

2018-08-06-09:19:11       Thread info: 4092 Thread-4 total: 100

2018-08-06-09:19:11       Thread info: 11024 Thread-6 current count: 100

2018-08-06-09:19:11       Thread info: 11024 Thread-6 total: 100

 

例:

cups = []

lock = threading.Lock()

 

def worker(lock:threading.Lock, task=100):

    while True:

        lock.acquire()   #该锁为互斥锁,你有我没有,我有你没有

        count = len(cups)

        logging.info('current count: {}'.format(count))

        # lock.release()

        if count >= task:

                            lock.release()

            break   #仍有问题,break后未释放锁,解决try...finally或上下文

        # lock.acquire()

        cups.append(1)

        lock.release()

        logging.info('{} make 1'.format(threading.current_thread()))

    logging.info('total: {}'.format(count))

 

for _ in range(10):

    threading.Thread(target=worker, args=(lock,)).start()

输出:

……

2018-08-06-09:23:38       Thread info: 10148 Thread-5 current count: 99

2018-08-06-09:23:38       Thread info: 10148 Thread-5 make 1

2018-08-06-09:23:38       Thread info: 10148 Thread-5 current count: 100

2018-08-06-09:23:38       Thread info: 10148 Thread-5 total: 100

 

例:

计数器类,可以加、可以减;

class Counter:

    def __init__(self):

        self.__val = 0

 

    def inc(self):

        self.__val += 1

 

    def dec(self):

        self.__val -= 1

 

    @property

    def value(self):

        return self.__val

 

def do(c:Counter, count=100):

    for _ in range(count):

        for i in range(-50,50):

            if i < 0:

                c.dec()

            else:

                c.inc()

 

c = Counter()

threadcount = 10   #依次10,100,1000查看,结果没变化

c2 = 1000   #依次10,100,1000查看,当1000时,每个线程耗cpu时间长,结果将不可预知

for i in range(threadcount):

    threading.Thread(target=do, args=(c,c2)).start()

# time.sleep(5)

# print(c.value)   #多线程情况下,每个线程耗cpu时间长的情况下,此处打印的值不是最终结果

while True:

    time.sleep(3)

    print(threading.enumerate())

    print(c.value)

输出:

[<_MainThread(MainThread, started 7424)>]

81

[<_MainThread(MainThread, started 7424)>]

81

 

例,解决,加锁:

class Counter:

    def __init__(self):

        self.__val = 0

        self.__lock = threading.Lock()

 

    def inc(self):

        try:   #同with self.__lock: self.__val += 1;

            self.__lock.acquire()

            self.__val += 1

        finally:

            self.__lock.release()

 

    def dec(self):

        with self.__lock:

            self.__val -= 1

 

    @property

    def value(self):

        with self.__lock:

            return self.__val

 

def do(c:Counter, count=100):

    for _ in range(count):

        for i in range(-50,50):

            if i < 0:

                c.dec()

            else:

                c.inc()

 

c = Counter()

threadcount = 10

c2 = 1000

for i in range(threadcount):

    threading.Thread(target=do, args=(c,c2)).start()

# time.sleep(5)

# print(c.value)

while True:

    time.sleep(1)

    # print(threading.enumerate())

    # print(c.value)

    if threading.active_count() == 1:

        print(threading.enumerate())

        print(c.value)

    else:

        print(threading.enumerate())

输出:

[, , , <_MainThread(MainThread, started 12200)>, , , , ]

[<_MainThread(MainThread, started 12200)>]

0

[<_MainThread(MainThread, started 12200)>]

0

 

 

非阻塞锁:

lock.acquire(False),获取到锁返回True,否则返回False;

 

 

例:

lock = threading.Lock()

lock.acquire()

ret = lock.acquire(False)   #获取到锁则返回True

print(ret)

输出:

False

 

例:

cups = []

lock = threading.Lock()

 

def worker(lock:threading.Lock, task=100):

    while True:

        if lock.acquire(False):

            count = len(cups)

            logging.info('current count: {}'.format(count))

            if count >= task:

                lock.release()

                break

            cups.append(1)

            lock.release()

            logging.info('{} make 1'.format(threading.current_thread()))

    logging.info('total: {}'.format(count))

 

for _ in range(10):

    threading.Thread(target=worker, args=(lock,)).start()

输出:

……

2018-08-06-13:52:35       Thread info: 11232 Thread-9 current count: 100

2018-08-06-13:52:35       Thread info: 11232 Thread-9 total: 100

2018-08-06-13:52:35       Thread info: 11752 Thread-10 current count: 100

2018-08-06-13:52:35       Thread info: 11752 Thread-10 total: 100

 

例:

def worker(tasks):

    for task in tasks:

        if task.lock.acquire(False):

            time.sleep(0.01)

            logging.info('{} {} begin to start'.format(threading.current_thread(), task.name))

        else:

            logging.info('{} {} is working'.format(threading.current_thread(), task.name))

 

class Task:

    def __init__(self, name):

        self.name = name

        self.lock = threading.Lock()

 

tasks = [Task('task-{}'.format(x)) for x in range(10)]

 

for i in range(5):

    threading.Thread(target=worker, args=(tasks,), name='worker-{}'.format(i)).start()

输出:

……

2018-08-06-14:10:34       Thread info: 12256 worker-1 task-7 is working

2018-08-06-14:10:34       Thread info: 12256 worker-1 task-8 is working

2018-08-06-14:10:34       Thread info: 12256 worker-1 task-9 is working

2018-08-06-14:10:34       Thread info: 11912 worker-3 task-9 begin to start

2018-08-06-14:10:34       Thread info: 10496 worker-4 task-8 begin to start

2018-08-06-14:10:34       Thread info: 10496 worker-4 task-9 is working

 

 

 


分享文章:40线程2_Event_Lock
转载来源:http://cdxtjz.cn/article/jopdhc.html

其他资讯