4.2 版中的新功能。

使用类似于标准库提供给线程的同步原语来协调协程。 这些类与标准库的 asyncio包中提供的类非常相似。
请注意,这些原语实际上不是线程安全的,并且不能用来代替标准库的线程模块中的那些原语——它们旨在协调单线程应用程序中的 Tornado 协程,而不是保护多线程应用程序中的共享对象。
一个condition允许一个或多个协程等待直到收到通知。
与标准 threading.Condition 类似,但不需要获取和释放的底层锁。
使用 Condition,协程可以等待其他协程的通知:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Condition
condition = Condition()
async def waiter():
print("I'll wait right here")
await condition.wait()
print("I'm done waiting")
async def notifier():
print("About to notify")
condition.notify()
print("Done notifying")
async def runner():
# Wait for waiter() and notifier() in parallel
await gen.multi([waiter(), notifier()])
IOLoop.current().run_sync(runner)结果为:
I'll wait right here
About to notify
Done notifying
I'm done waitingwait接受一个可选的 timeout参数,它可以是一个绝对时间戳:
io_loop = IOLoop.current()
# Wait up to 1 second for a notification.
await condition.wait(timeout=io_loop.time() + 1)datetime.timedelta 表示相对于当前时间的超时:
# Wait up to 1 second.
await condition.wait(timeout=datetime.timedelta(seconds=1))如果在截止日期之前没有通知,则该方法返回 False。
如果条件被通知,则返回一个 Future解析 True,或者在超时后解析为 False。
唤醒n个waiters
唤醒所有waiters
一个事件会阻塞协程,直到其内部标志设置为 True。
类似于threading.Event。
协程可以等待设置事件。 一旦设置,对 yield event.wait() 的调用将不会阻塞,除非事件已被清除:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Event
event = Event()
async def waiter():
print("Waiting for event")
await event.wait()
print("Not waiting this time")
await event.wait()
print("Done")
async def setter():
print("About to set the event")
event.set()
async def runner():
await gen.multi([waiter(), setter()])
IOLoop.current().run_sync(runner)结果如下:
Waiting for event
About to set the event
Not waiting this time
Done如果内部标志为True,则返回True
将内部标志设置为 True。 所有的waiters都被唤醒了。
设置标志后调用 wait不会阻塞。
将内部标志重置为 False。
调用 wait将阻塞,直到调用 set 。
阻塞直到内部标志为True。
返回一个 awaitable,它在超时后引发 tornado.util.TimeoutError。
在阻塞之前可以获取固定次数的锁。
信号量管理一个计数器,表示释放调用的数量减去获取调用的数量,再加上一个初始值。 如果需要,acquire方法会阻塞,直到它可以返回而不使计数器为负。
信号量限制对共享资源的访问。 一次允许两个worker访问:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Semaphore
sem = Semaphore(2)
async def worker(worker_id):
await sem.acquire()
try:
print("Worker %d is working" % worker_id)
await use_some_resource()
finally:
print("Worker %d is done" % worker_id)
sem.release()
async def runner():
# Join all workers.
await gen.multi([worker(i) for i in range(3)])
IOLoop.current().run_sync(runner)结果如下:
Worker 0 is working
Worker 1 is working
Worker 0 is done
Worker 2 is working
Worker 1 is done
Worker 2 is doneworker 0 和 1 被允许同时运行,但worker 2 等到信号量被worker 0 释放一次。
信号量可以用作异步上下文管理器:
async def worker(worker_id):
async with sem:
print("Worker %d is working" % worker_id)
await use_some_resource()
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)为了与旧版本的 python 兼容,acquire是一个上下文管理器,因此 worker 也可以写成:
@gen.coroutine
def worker(worker_id):
with (yield sem.acquire()):
print("Worker %d is working" % worker_id)
yield use_some_resource()
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)增加计数器并唤醒一个waiter。
减少计数器。 返回一个可等待的。
如果计数器为零,则阻塞并等待释放。awaitable在截止日期后引发 TimeoutError。
防止 release() 被调用太多次的信号量。
如果 release增加信号量的值超过初始值,它会引发 ValueError。 信号量主要用于保护容量有限的资源,因此信号量释放次数过多是错误的标志。
增加计数器并唤醒一个waiter。
减少计数器。 返回一个可等待的。
如果计数器为零,则阻塞并等待释放。 awaitable在截止日期后引发 TimeoutError。
协程的锁。
锁开始解锁,并立即获取锁。 当它被锁定时,产生acquire的协程等待直到另一个协程调用release。
释放未锁定的锁会引发 RuntimeError。
Lock可以用作带有 async with 语句的异步上下文管理器:
>>> from tornado import locks
>>> lock = locks.Lock()
>>>
>>> async def f():
... async with lock:
... # Do something holding the lock.
... pass
...
... # Now the lock is released.为了与旧版本的 Python 兼容,acquire方法异步返回一个常规上下文管理器:
>>> async def f2():
... with (yield lock.acquire()):
... # Do something holding the lock.
... pass
...
... # Now the lock is released.尝试锁定。 返回一个awaitable。
返回一个 awaitable,它在超时后引发 tornado.util.TimeoutError。
解锁。
排队等待获取的第一个协程获得锁。
如果未锁定,则引发 RuntimeError。