Python coroutine (asyncio) (4) Synchronization primitives

Python Practical Tutorial_spiritx’s Blog-CSDN Blog

The asyncio synchronization primitives are designed to be similar to those of the threading module, but with two key caveats:
asyncio primitives are not thread-safe, so they should not be used for OS thread synchronization (threading should be used instead);
The methods of these synchronization primitives do not accept a timeout argument; use the asyncio.wait_for() function to perform operations with a timeout.

Lock

class asyncio.Lock

Implement a mutex lock for asyncio tasks. Not thread safe.
asyncio locks can be used to guarantee exclusive access to shared resources.
The recommended way to use Lock is through the async with statement:

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

This is equivalent to:

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

Main methods and properties:

Methods and Attribute name Description
< em>coroutine acquire() Acquire the lock.
This method waits until the lock is unlocked, sets it to locked and returns True.
When more than one coroutine is blocked in acquire(), they will wait for unlocking, and eventually only one coroutine will be executed.
Lock acquisition is fair: the executed coroutine will be the first to start waiting for the lock.
release() Release the lock.
When the lock is locked, set it to unlocked and return.
If the lock is unlocked, a RuntimeError is raised.
locked() If the lock is locked, return True.

Event

class asyncio.Event

event object. This object is not thread-safe.
asyncio events can be used to notify multiple asyncio tasks that an event has occurred.
The Event object manages an internal flag that can be set to true via the set() method and reset to false via the clear() method. The wait() method blocks until this flag is set to true. This flag will initially be set to false.

async def waiter(event):
    print('waiting for it...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())

Main methods and properties:

Methods and Attribute name Description
< em>coroutine wait() Wait until the event is set.
If the event has been set, returns True immediately. Otherwise it blocks until another task calls set().
set()

Set event.

All tasks waiting for events to be set will be awakened immediately.

clear() Clear (unset) event.
Tasks waiting via wait() will now block until the set() method is called again.
is_set() Returns True if the event has been set.

Condition

class asyncio.Condition(lock=None)
condition object. This object is not thread-safe.
The asyncio conditional primitive can be used by a task to wait for an event to occur and then obtain exclusive access to a shared resource.
In essence, the Condition object combines the functionality of Event and Lock. It is possible for multiple Condition objects to share a Lock, which allows different tasks focused on a specific state of the shared resource to achieve coordinated exclusive access to the shared resource.
The optional lock parameter must be a Lock object or None. In the latter case a new Lock object is automatically created.
The recommended way to use Condition is through the async with statement:

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

This is equivalent to:

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()

Main methods and properties:

Methods and Attribute name Description
< em>coroutine acquire()

Get the underlying lock.

This method waits until the underlying lock is unlocked, sets it to locked and returns True.

notify(n=1) Wake up at most n Tasks waiting for this condition (default is 1). This method is a no-op if no tasks are waiting.
The lock must be acquired before this method is called and released quickly thereafter. A RuntimeError will be raised if called with an unlocked lock.
locked() Returns True if the underlying lock has been acquired.
notify_all()

Wakes up all tasks waiting for this condition.
This method behaves like notify(), but wakes up all waiting tasks.
The lock must be acquired before this method is called and released quickly thereafter. A RuntimeError will be raised if called with an unlocked lock.

release() Release the underlying lock.
A RuntimeError is raised when the call is made on an unlocked lock.
coroutine wait() Wait until notification is received.
If the calling task does not acquire the lock when this method is called, a RuntimeError is raised.
This method releases the underlying lock and then blocks until awakened by a notify() or notify_all() call. Once awakened, the Condition will reacquire its lock and this method will return True.
coroutine wait_for(predicate)

Wait until the target value becomes true.

The target must be a callable object and the result will be interpreted as a Boolean value. The final value will be the return value.

Semaphore

class asyncio.Semaphore(value=1)
Semaphore object. This object is not thread-safe.
The semaphore manages an internal counter that is decremented with each acquire() call and incremented with each release() call. The counter’s value never drops below zero; when acquire() finds that its value is zero, it blocks until some task calls release().
The optional value parameter is used to assign an initial value to the internal counter (default value is 1). A ValueError is raised if the given value is less than 0.
The recommended way to use Semaphore is through the async with statement. :

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

This is equivalent to:

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()

Main methods and properties:

Methods and Attribute name Description
< em>coroutine acquire() Acquire a semaphore.
If the value of the internal counter is greater than zero, it is decremented by one and True is returned immediately. If its value is zero, it waits until release() is called and returns True.
release() Release a semaphore object and increase the value of the internal counter by one. You can wake up a task that is waiting to obtain a semaphore object.
Unlike BoundedSemaphore, Semaphore allows more release() calls than acquire() calls to be performed.
locked() If the semaphore object cannot be obtained immediately, True is returned .

BoundedSemaphore

class asyncio.BoundedSemaphore(value=1)
The bound semaphore object. This object is not thread-safe.

BoundedSemaphore is a special version of Semaphore that will raise a ValueError if the internal counter value increases above the initial value in release().

Barrier

class asyncio.Barrier(parties)

A fence is a simple synchronization primitive that allows blocking until a large number of tasks are waiting for it. Tasks can wait on the wait() method and will be blocked until the specified number of tasks eventually reaches the number of parties to wait for. At this point, all waiting tasks will be unlocked simultaneously.

Fences are best used via async with.

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

Main methods and properties:

Methods and Attribute name Description
< em>coroutine wait() Waiting in front of the fence
coroutine reset() Reset the fence to empty
coroutine abort() Setting the barrier to broken will cause all wait() calls to raise BrokenBarrierError exceptions
parties The waiting number to be reached by the fence
n_waiting Waiting in front of the fence Number of tasks
broken Returns whether the fence is broken
...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')