Asyncio и конкурентное программирование на Python

Мэтью Фаулер · «ДМК Пресс» · 2022 г. · 398 с.

#python#asyncio#programming#book#read

Books Read | Books About Concurrency In Python

Abstract

Если типичную программу на стандартном Python подвергнуть слишком высокой нагрузке, то она будет работать с черепашьей скоростью. Для решения этой проблемы была разработана библиотека asyncio, которая позволяет разбить программу на более мелкие задачи и планировать их выполнение. В итоге получающиеся приложения работают молниеносно и допускают масштабирование.

В этой книге асинхронное, параллельное и конкурентное программирование рассматривается на конкретных примерах. Сложные для понимания вопросы иллюстрируются с помощью диаграмм, позволяющих наглядно представить, как работают задачи. Вы узнаете, как asyncio преодолевает ограничения Python и способствует ускорению медленных веб-серверов и микросервисов. Вы даже научитесь сочетать asyncio с традиционной многопроцессной обработкой, получив в награду резкий скачок производительности.

Для программистов на Python среднего уровня. Опыт работы с конкурентностью не требуется.

Книга на сайте издательства ДМК

Прочитано и проработано: 6 ноября 2023 г. – 1 декабря 2023 г.


Дополнительные материалы


TODO:

  • Отличия типа bytes от str – разобраться (глава 4 Fluent Python).
  • listing 2.8 – try asyncio.TaskGroup context manager, no need to await (need Python 3.11)
  • Изучить – якобы в psycopg3 тоже есть асинхронный режим.

Что такое asyncio?

Библиотека asyncio впервые появилась в версии Python 3.4 как ещё один способ справляться с высокими конкурентными нагрузками, не прибегая к нескольким потокам или процессам. При правильном использовании эта библиотека может значительно повысить производительность и уменьшить потребление ресурсов в приложениях, выполняющих много операций ввода-вывода, поскольку позволяет запускать сразу много таких долго работающих задач.

Конкурентность, параллелизм и многозадачность

Конкурентность

Говоря, что две задачи выполняются конкурентно, мы имеем в виду, что они работают в одно и тоже время. Происходит переключение между задачами, пока одна не завершилась, выполняется другая.

В случае конкурентности, несколько задач работают в течение одного промежутка времени, но только одна из них активна в каждый момент времени.

Пример с пекарем – делать что-то, пока духовка разогревается; готовить тесто для следующего торта, пока первое взбивается в миксере.

Параллелизм

Говоря о параллельной работе, мы имеем в виду, что две или более задачи не просто чередуются, а выполняются строго в одно и то же время.

В случае параллелизма, несколько задач активны одновременно.

Различия между конкурентностью и параллелизмом

Пусть работают два приложения. В конкурентной системе мы можем переключаться между ними, дав немного поработать сначала одному, потому другому. Если делать это достаточно быстро, создаётся впечатление, что два дела делаются одновременно. В параллельной системе два приложения работают действительно одновременно, то есть оба активны в одно и то же время.

Конкурентность возможна, когда несколько задач может работать независимо друг от друга. Конкурентность можно организовать, имея процессор всего с одним ядром, применив вытесняющую многозадачность для переключения между задачами.

С другой стороны, параллелизм означает, что мы должны выполнять две задачи или более строго одновременно. На машине с одним ядром это невозможно, необходимо иметь процессор с несколькими ядрами.

Многозадачность

Вытесняющая многозадачность. В этой модели мы позволяем ОС решить, как переключаться между выполняемыми задачами с помощью процедуры квантования времени. Когда ОС переключает задачи, мы говорим, что имеет место вытеснение.

Кооперативная многозадачность. В этой модели мы явно определяем в коде приложения точки, где можно уступить управление другой задаче.

==В asyncio для организации конкурентности используется кооперативная многозадачность. Когда приложение доходит до точки, в которой может подождать результата, мы явно помечаем это в коде. Поэтому другой код может работать, пока мы ждем получения результата, выполняемого в фоновом режиме. Таким образом, несколько задач может работать одновременно, но – важно! – не параллельно, так как их выполнение чередуется.==

Процессы, потоки, многопроцессность, многопоточность

Процесс

Процессом называется работающее приложение, которому выделена область памяти, недоступная другим приложениям.

Поток

Потоки можно представить себе как облегченные процессы. Это наименьшая единица выполнения, которая может управляться операционной системой. У потоков нет своей памяти, они пользуются памятью создавшего их процесса. Потоки ассоциированы с процессом, создавшим их. С каждым процессом всегда ассоциирован по меньшей мере один поток, называемый главным. Процесс может создавать дополнительные потоки, которые обычно называются рабочими или фоновыми. Эти потоки могут конкурентно выполнять работу наряду с главным потоком. Обычное Python-приложение создаёт процесс и главный поток, который отвечает за его выполнение.

Демоны (daemon) – это специальный вид потоков, предназначенный для выполнения длительных фоновых задач. Они не мешают приложению завершиться. На самом деле если работают только потоки-демоны, то приложение вообще завершается автоматически.

Многопоточность

Процессы могут порождать дополнительные потоки, разделяющие память со своим процессом-родителем. Они могут конкурентно выполнять другую работу, это называется многопоточностью.

Глобальная блокировка интерпретатора

Глобальная блокировка интерпретатора (global interpreter lock – GIL) не даёт Python-процессу исполнять более одной команды байт-кода в каждый момент времени. Это означает, что, даже если имеется несколько потоков на многоядерной машине, интерпретатор сможет в каждый момент времени исполнять только один поток, содержащий написанный на Python код

Примечание: многопроцессные приложения могут конкурентно выполнять несколько команд байт-кода, потому что у каждого Python-процесса своя собственная GIL.

Причина наличия GIL: интерпретатор CPython не является потокобезопасным. Это означает, что если два или более потоков модифицируют разделяемую переменную, то её конечное состояние может оказаться неожиданным, поскольку зависит от порядка доступа к переменной со стороны потоков. Эта ситуация называется состоянием гонки.

Пример не из книги: как узнать количество ссылок на объект Python?

import sys
a = []
b = a
sys.getrefcount(a) # "3"

GIL гарантирует безопасную работу этого счетчика ссылок.

Состояния гонки (race condition) могут возникать, когда два потока одновременно обращаются к одному объекту Python.

Освобождается ли когда-нибудь GIL?

Глобальная блокировка интерпретатора освобождается на время выполнения операций ввода-вывода (time.sleep() тоже освобождает GIL). Это позволяет использовать потоки для конкурентного выполнения ввода-вывода, но не для выполнения счетного кода, написанного на Python. В случае ввода-вывода низкоуровневые системные вызовы работают за пределами среды выполнения Python. Это позволяет освободить GIL, потому что код ОС не взаимодействует напрямую с объектами Python. GIL захватывается снова, только когда полученные данные переносятся в объект Python.

asyncio и GIL

В asyncio используется тот факт, что операции ввода-вывода освобождают GIL, что позволяет реализовать конкурентность даже в одном потоке. При работе с asyncio мы создаём объекты сопрограмм.

Сопрограмму можно представить себе, как облегченный поток. Ожидая завершения сопрограмм, занимающихся вводом-выводом, мы можем выполнять другой Python-код, получая таким образом конкурентность.

Важной отметить, что asyncio не обходит GIL, мы по прежнему ограничены ей. Если имеется счетная задача, то для её конкурентного выполнения нужно всё равно нужно заводить отдельный процесс (и в asyncio есть для этого средства), иначе производительность снизится.

Сопрограммы asyncio

Сопрограмму можно рассматривать как обычную функцию Python, наделенную способностью приостанавливаться, встретив операцию, для выполнения которой нужно заметное время. По завершении такой длительной операции сопрограмму можно «пробудить», после чего она продолжит выполнение. Пока приостановленная сопрограмма ждет завершения операции, мы можем выполнять другой код. Такое выполнение другого кода во время ожидания и обеспечивает конкурентность внутри приложения. Можно также одновременно выполнять несколько длительных операций, что еще больше повышает производительность приложения.

asyncio.run задумана как главная точка входа в созданное нами приложение asyncio. Она выполняет только одну сопрограмму, и эта сопрограмма должна позаботиться обо всех остальных аспектах приложения. Сопрограмма, которую выполняет asyncio.run, должна создать и запустить все прочие сопрограммы, это позволит нам обратить себе на пользу конкурентную природу asyncio.

Приостановка выполнения с помощью await

Для приостановки выполнения служит ключевое слово await, за ним обычно следует обращение к сопрограмме (точнее, к объекту, допускающему ожидание, который необязательно является сопрограммой).

Использование ключевого слова await приводит к выполнению следующей за ним сопрограммы, а не просто к возврату объекту сопрограммы, как при прямом вызове. Кроме того, выражение await приостанавливает объемлющую сопрограмму до того момента, как сопрограмма, которую мы ждем, завершится и вернет результат. А после этого мы получим доступ к возвращенному результату, а объемлющая сопрограмма пробудится и обработает результат.

Конкурентное выполнение с помощью задач

Задача – это обертка вокруг сопрограммы, которая планирует выполнение последней в цикле событий как можно раньше. И планирование, и выполнение происходят в неблокирующем режиме, т. е., создав задачу, мы можем сразу приступить к выполнению другого кода, пока эта задача работает в фоне.

Создание задач

Для создания задачи служит функция asyncio.create_task. Ей передается подлежащая выполнению сопрограмма, а в ответ она немедленно возвращает объект задачи. Этот объект можно включить в выражение await, которое извлечет возвращенное значение по завершении задачи.

Снятие задач

У каждого объекта задачи есть метод cancel, который можно вызвать, если требуется остановить задачу. В результате снятия задача возбудит исключение CancelledError, когда мы ждем ее с помощью await. Это исключения можно обработать, как того требует ситуация.

Важно отметить, что исключение CancelledError может быть возбуждено только внутри предложения await. То есть, если вызвать метод cancel, когда задача исполняет Python-код, этот код будет продолжать работать, пока не встретится следующее предложение await(если встретится), и только тогда будет возбуждено исключение CancelledError. Вызов cancel не прерывает задачу, делающую свое дело; он снимает ее, только если она уже находится в точке ожидания или когда дойдет до следующей такой точки.

Задачи, сопрограммы, будущие объекты и объекты, допускающие ожидание

Будущие объекты

Объект future в Python содержит одно значение, которое мы ожидаем получить в будущем, но пока еще, возможно, не получили. Обычно в момент создания future не обертывает никакого значения, потому что его еще не существует. Объект в таком состоянии называется неполным, неразрешенным или просто неготовым. И только получив результат, мы можем установить значение объекта future, в результате чего он становится полным и из него можно извлечь результат.

Будущие объекты также можно использовать в выражениях await. Это означает «я посплю, пока в будущем объекте не будет установлено значение, с которым я могу работать, а когда оно появится, разбуди меня и дай возможность его обработать».

"""
Ожидание будущего объекта
"""
import asyncio
from asyncio import Future
 
def make_request() -> Future:
    future = Future()
    # Create task that will set future value asynchronously:
    asyncio.create_task(set_future_value(future))
    return future
 
async def set_future_value(future: Future) -> None:
    await asyncio.sleep(1)
    future.set_result(42)
 
async def main():
    future = make_request()
    print(f"Future object ready? {future.done()}")
    print(f"future = {future}")
    value = await future  # Pause `main` until future value is set
    print(f"Future object ready? {future.done()}")
    print(f"future = {future}")
    print(f"value = {value}")
 
asyncio.run(main())
 
"""
Sample output:
 
Future object ready? False
future = <Future pending>
Future object ready? True
future = <Future finished result=42>
value = 42
"""

Связь между будущими объектами, задачами и сопрограммами

Связующим звеном между ними является абстрактный базовый класс Awaitable. В нем определен единственный абстрактный метод __await__.

Любой объект, который реализует метод __await__, можно использовать в выражении await. Сопрограммы, как и будущие объекты, наследуют Awaitable напрямую. Задачи же расширяют будущие объекты.

Ловушки сопрограмм и задач

Tip

Есть две основные ошибки на пути преобразования приложения в асинхронное.

Первая – попытка выполнить счетный код в задачах или сопрограммах, не прибегая к многопроцессности, вторая – использовать блокирующие API ввода-вывода, пренебрегая многопоточностью.

Выполнение счетного кода

Если требуется выполнить счетную работу и все-таки использовать async / await, то это можно сделать. Но придется воспользоваться многопроцессностью и попросить asyncio выполнять наши задачи в пуле процессов.

Выполнение блокирующих API

Может возникнуть соблазн использовать существующие библиотеки ввода-вывода, обернув их сопрограммами. Однако при этом возникнут те же проблемы, что для счетных операций. Эти API будут блокировать главный поток. Поэтому, попытавшись выполнить блокирующий вызов API в сопрограмме, мы заблокируем сам поток цикла событий, а значит, воспрепятствуем выполнению всех остальных сопрограмм и задач. Примерами блокирующих API является библиотека requests или функция time.sleep. Вообще, любая функция, которая выполняет ввод-вывод, не являясь сопрограммой, или занимает процессор длительными операциями, может считаться блокирующей

Если вы все-таки хотите использовать библиотеку requests, то синтаксис async применить можно, но нужно явно попросить asyncio задействовать многопоточность с помощью исполнителя пула потоков.

Ручное управление циклом событий

Создание цикла событий вручную

Мы можем создать цикл событий, воспользовавшись методом asyncio.new_event_loop. Он возвращает экземпляр цикла событий, который дает доступ ко всем низкоуровневым методам, в частности методу run_until_complete, который принимает сопрограмму и исполняет ее до завершения. Закончив работу с циклом событий, мы должны закрыть его, чтобы освободить занятые ресурсы. Обычно это делается в блоке finally, чтобы цикл был закрыт даже в случае исключения.

import asyncio
 
async def main():
	await asyncio.sleep(1)
 
loop = asyncio.new_event_loop()
 
try:
	loop.run_until_complete(main())
finally:
	loop.close()

Получение доступа к циклу событий

Иногда бывает необходим доступ к текущему циклу событий. Библиотека asyncio предоставляет для этой цели функцию asyncio.get_running_loop.

Отладочный режим

При работе в отладочном режиме печатаются полезные сообщения, когда сопрограмма или задача работают больше 100 мс. Это может быть полезно для отладки ошибок, связанных со случайным выполнением блокирующего вызова.

Кроме того, если для некоторой сопрограммы отсутствует await, то возбуждается исключение, показывающее, в каком месте следовало бы добавить await.

Способы входа в отладочный режим

  • Использование asyncio.run: asyncio.run(coroutine(), debug=True)
  • Использование аргументов командной строки: python3 -X dev program.py
  • Использование переменных окружения: PYTHONASYINCIODEBUG=1 python3 program.py

Конкурентное выполнение запросов с помощью asyncio.gather

Для конкурентного выполнения допускающих ожидание объектов широко используется функция asyncio.gather. Она принимает последовательность допускающих ожидание объектов и запускает их конкурентно всего в одной строке кода. Если среди объектов есть сопрограмма, то gather автоматически обертывает ее задачей, чтобы гарантировать конкурентное выполнение. Это значит, что не нужно отдельно обертывать все сопрограммы по отдельности с помощью функции asyncio.create_task, как мы делали раньше.

asyncio.gather возвращает объект, допускающий ожидание. Если использовать его в выражении await, то выполнение будет приостановлено, пока не завершатся все переданные объекты. А когда это произойдет, asyncio.gather вернет список результатов работы.

Функция gather гарантирует детерминированный порядок результатов, несмотря на недетерминированность их получения.

Обработка исключений при использовании gather

asyncio.gather принимает необязательный параметр, return_exceptions, который позволяет указать, как мы хотим обрабатывать исключения от допускающих ожидание объектов. Это булево значение, поэтому возможно два варианта:

  • return_exceptions=False – это режим по умолчанию. Если хотя бы одна сопрограмма возбуждает исключение, то gather возбуждает то же исключение в точке await. Но, даже если какаято сопрограмма откажет, остальные не снимаются и продолжат работать при условии, что мы обработаем исключение и оно не приведет к остановке цикла событий и снятию задач;
  • return_exceptions=True – в этом случае исключения возвращаются в том же списке, что результаты. Сам по себе вызов gather не возбуждает исключений, и мы можем обработать исключения, как нам удобно.

Обработка результатов по мере поступления

Для решения этой проблемы asyncio предлагает функцию as_completed. Она принимает список допускающих ожидание объектов и возвращает итератор по будущим объектам. Эти объекты можно перебирать, применяя к каждому await. Когда выражение await вернет управление, мы получим результат первой завершившейся сопрограммы. Это значит, что мы сможем обрабатывать результаты по мере их доступности, но теперь порядок результатов не детерминирован, поскольку неизвестно, какой объект завершится первым.

as_completed справляется со своей задачей – возвращать результат по мере поступления, но она не лишена недостатков.

Первый заключается в том, что хотя мы и получаем результаты в темпе их поступления, но невозможно сказать, какую сопрограмму или задачу мы ждем, поскольку порядок абсолютно не детерминирован.

Второй недостаток в том, что, хотя исключения по истечении тайм-аута возбуждаются как положено, все созданные задачи продолжают работать в фоновом режиме. А если мы захотим их снять, то будет трудно понять, какие задачи еще работают.

Точный контроль с помощью wait

Функция wait в asyncio похожа на gather, но дает более точный контроль над ситуацией. У нее есть несколько параметров, позволяющих решить, когда мы хотим получить результаты. Кроме того, она возвращает два множества: задачи, завершившиеся успешно или в результате исключения, а также задачи, которые продолжают выполняться. Еще эта функция позволяет задать тайм-аут, который, однако, ведет себя не так, как в других функциях API: он не возбуждает исключений. В тех случаях, когда необходимо, эта функция позволяет решить некоторые отмеченные выше проблемы, присущие другим функциям asyncio.

Для каких задач какой подход лучше использовать:


Глава 5. Неблокирующие драйверы баз данных

Используем asyncpg для работы с PostgreSQL с использованием asyncio.


Глава 6. Счетные задачи

Дать краткое описание содержанию раздела (для распараллеливания счетных задач нужны процессы).

Решение задачи с помощью MapReduce и asyncio

В модели программирования MapReduce большой набор данных сначала разбивается на меньшие части. Затем мы можем решить задачу для поднабора данных, а не для всего набора – это называется отображением (mapping), поскольку мы «отображаем» данные на частичный результат.

После того как задачи для всех поднаборов решены, мы можем объединить результаты в окончательный ответ. Этот шаг называется редукцией (reducing), потому что «редуцируем» (сводим) несколько ответов в один. Подсчет частоты вхождения слов в большой набор текстовых данных – каноническая задача MapReduce. Если набор данных достаточно велик, то его разбиение на меньшие части может дать выигрыш в производительности, поскольку все операции отображения можно выполнять параллельно.

Разделяемые данные и блокировки

Библиотека multiprocessing поддерживает так называемые объекты разделяемой памяти. Это блок памяти, выделенный так, что к нему могут обращаться разные процессы. Каждый процесс может читать и записывать в этот блок.

Библиотека multiprocessing поддерживает два вида разделяемых данных: значения и массив. Под значением понимается одиночное значение, например целое число или число с плавающей точкой. А массив – это массив одиночных значений. В разделяемой памяти можно хранить только данные типов, определенных в модуле Python array, описанном в документации по адресу https://docs.python.org/3/library/array.html#module-array.

Tip

Состоянием гонки возникает, если исход последовательности операций зависит от того, какая операция заканчивается первой. Можно рассматривать операции как гонку на опережение; если порядок прихода к финишу правильный, то все работает нормально, в противном случае возможно неожиданное поведение.

Reference: What kinds of global value mutation are thread-safe?

Избежать гонки можно, синхронизировав доступ к тем разделяемым данным, которые мы собираемся модифицировать.

Tip

Один из механизмов для синхронизации доступа к разделяемым данным называется блокировкой, или мьютексом (mutex, от mutual exclusion – взаимное исключение). Он позволяет одному процессу заблокировать участок кода, т. е. запретить всем остальным его выполнение. Заблокированный участок обычно называют критической секцией. Если один процесс выполняет код в критической секции, а второй пытаемся выполнить тот же код, то второму придется подождать, пока первый закончит работу и выйдет из критической секции.

Блокировки поддерживают две основные операции: захват и освобождение. Гарантируется, что процесс, захвативший блокировку, – единственный, кто может выполнять код в критической секции. Закончив выполнение кода, требующего синхронизации доступа, мы освобождаем блокировку. Это дает возможность другим процессам захватить блокировку и выполнить код в критической секции. Если процесс попытается выполнить код в секции, заблокированной другим процессом, то будет приостановлен, пока этот другой процесс не освободит блокировку.

Чтобы захватить блокировку, нужно вызвать get_lock().acquire() инстанса Value, а для ее освобождения – метод get_lock().release().

Чтобы избежать гонки, код в критических секциях обязан выполняться последовательно. Это может отрицательно сказаться на производительности многопроцессного кода. Поэтому нужно внимательно следить за тем, чтобы защищать блокировкой только то, что абсолютно необходимо, и не мешать остальному коду выполняться конкурентно.


Глава 7. Решение проблем блокирования с помощью потоков

Блокирующие операции (например, requests.get()) следует выполнять в отдельном потоке.

Решая, сколько потоков нужно в приложении, лучше начать с малого (число ядер плюс еще немного – хорошая отправная точка), протестировать и постепенно увеличивать количество. Все дело в накладных расходах, связанных с потоками. Потоки создаются на уровне операционной системы и обходятся дороже сопрограмм. К тому же у контекстного переключения потоков на уровне ОС тоже есть цена. Сохранение и восстановление состояния потока при контекстном переключении съедает часть выигрыша, полученного от использования потоков.

Для блокировки доступа к данным имеется класс threading.Lock, поддерживающий контекстный менеджер (пример в listing_7_8.py).

Библиотека threading предоставляет реентерабельные (повторно входимые) блокировки. Это специальный вид блокировки, который допускает неоднократный захват из одного потока, позволяя ему «повторно входить» в критические секции. Реентерабельные блокировки реализованы в классе RLock.

На внутреннем уровне реентерабельная блокировка хранит счетчик захватов. При каждом захвате блокировки потоком, захватившим ее впервые, счетчик увеличивается на 1, а при каждом освобождении счетчик уменьшается. Когда счетчик обратится в 0, блокировка освобождается и ее могут захватить другие потоки.

Deadlock (тупиковая ситуация) возникает, когда имеет место неразрешимая конкуренция за разделяемый ресурс, в результате чего приложение зависает. Это называется взаимоблокировкой.

Решение задачи про обедающих философов при помощи Lock.

Tip

Самый простой способ вынести блокирующую работу с IO в отдельный поток, это вызов asyncio.to_thread().


Глава 8. Потоки данных (streams)

В asyncio потоки данных (streams) представляют собой высокоуровневый набор классов и функций для создания и управления сетевыми подключениями и вообще потоками данных. С их помощью мы можем создавать клиентские подключения для чтения и записи данных на сервер. Более того, мы можем сами создавать серверы и управлять ими. Эти API абстрагируют обширные знания, необходимые для управления сокетами, например о работе SSL и о потерянных подключениях.

Потоковые API надстроены над низкоуровневыми API транспорта и протоколов. Они обертывают сокеты (и вообще любой поток данных), предоставляя чистый API для чтения и записи данных.

Устроены эти API несколько иначе, чем прочие; в них используются обратные вызовы. Вместо того чтобы активно ждать данные от сокета, как мы делали раньше, библиотека сама вызовет написанный нами метод класса в момент, когда данные будут доступны.


Глава 9. Веб-приложения

В главе рассмотрено использование асинхронных и синхронных фреймворков для построения веб-приложений: aiohttp, Starlette, Flask, Django, а также серверов Gunicorn и Uvicorn.

Из интересного: на aiohttp можно построить веб-сервер. Сервер WebSockets.


Глава 10. Микросервисы

Основные характеристики микросервисов:

  • они слабо связаны и развёртываются независимо;
  • у каждого есть свой независимый технологический стек, включая модель данных;
  • они взаимодействуют между собой по какому-то протоколу, например REST или gRPC;
  • они следую принципу “одной обязанности”, т.е. микросервис “должен делать что-то одно, но делать это хорошо”.

Паттерн backend-for-frontend подразумевает, что UI не взаимодействует напрямую с несколькими сервисами. Для этого создается дополнительный сервис, который отправляет запросы и агрегирует результаты.

Паттерн “Прерыватель” позволяет исключить вызовы какого-то (необязательного) сервиса при возникновении определенного количества ошибок при обращении к нему за установленное время. Это позволит сократить время ожидания пользователем, если сервис всё равно не работает.

Дополнительные ресуры


Глава 11. Синхронизация

Возможные ошибки в модели однопоточной конкурентности: поток упирается в точку приостановки await, после чего начинает работать другая сопрограмма и модифицирует некоторое разделяемое состояние, так что первая сопрограмма после возобновления столкнется с неожиданностью.

Блокировки asyncio

Главное отличие заключается в том, блокировки asyncio – объекты, допускающие ожидание, которые приостанавливают выполнение сопрограммы, когда заблокированы. Это значит, что если сопрограмма ожидает освобождения блокировки, то может работать другой код. Кроме того, блокировки asyncio являются асинхронными контекстными менеджерами, и предпочтительно использовать их в сочетании с конструкцией async with.

Ограничение уровня конкурентности с помощью семафоров

Семафор похож на блокировку в том смысле, что его можно захватывать и освобождать, а основное отличие заключается в том, что захватить семафор можно не один раз, а несколько, – максимальное число задаем мы сами. Под капотом семафор следит за этим пределом; при каждом захвате предел уменьшается, а при каждом освобождении увеличивается. Как только счетчик обращается в нуль, дальнейшие попытки захватить семафор блокируются, пока кто-то не выполнит операцию освобождения, которая увеличит счетчик. Можно считать, что блокировка – частный случай семафора с пределом 1.

Если требуется ограничить всплески определенным числом запросов в единицу времени, то следует воспользоваться каким-нибудь алгоритмом формирования трафика, например «дырявым ведром» или «корзиной маркеров».

Info

Пожалуй, семафоры – самый практически полезный инструмент из описанных в этой главе.

Уведомление задач с помощью событий

События (asyncio.Event) позволяют сопрограммам приостанавливать своё выполнение до тех пор, пока событие не будет установлено вызовом Event.set().

У событий есть один недостаток, о котором следует помнить: они могут возникать чаще, чем ваши сопрограммы в состоянии на них реагировать. Предположим, что мы используем одно событие для пробуждения нескольких задач в технологическом процессе типа производитель–потребитель. Если все задачи-исполнители заняты в течение длительного времени, то событие может возникнуть, когда мы работаем, и мы его никогда не увидим.

Условия

Условие объединяет некоторые аспекты блокировки и события в один примитив синхронизации, по существу, обертывая поведение того и другого. Сначала мы захватываем блокировку условия, что дает сопрограмме монопольный доступ к разделяемому ресурсу, так что она может безопасно изменять его состояние. Затем мы ждем события с помощью сопрограммы wait или wait_for. Эти сопрограммы освобождают блокировку и блокируют выполнение до возникновения события, после чего заново захватывают блокировку, восстанавливая монопольный доступ.

Условия полезны в ситуациях, когда необходим доступ к разделяемому ресурсу, и перед началом работы требуется получить уведомление о некотором состоянии.


Глава 12. Асинхронные очереди

Суть асинхронной очереди: мы добавляем в очередь данные, нуждающиеся в обработке. Затем несколько исполнителей выбирают данные из очереди по мере поступления.

При использовании очередей asyncio в веб-приложении следует знать, какие могут возникать ошибки. Что, если один из экземпляров нашего API по какой-то причине (например, из-за исчерпания памяти) упадет или нужно будет перезапустить сервер, чтобы заново развернуть приложение? В таком случае необработанные элементы, находившиеся в очереди, будут потеряны, потому что они хранятся только в памяти.

Очередь отделяет порождение данных от их обработки, поскольку один производитель может помещать в очередь элементы, независимо и конкурентно обрабатываемые несколькими потребителями.


Глава 13. Управление подпроцессами

asyncio даёт возможность запускать подпроцессы при помощи asyncio.subprocess.Process.

Модуль subprocess можно использовать для асинхронного запуска подпроцессов с помощью сопрограмм create_subprocess_shell и create_subprocess_exec. Лучше использовать create_subprocess_exec, потому что при этом гарантируется, что поведение программы не будет зависеть от машины, на которой она выполняется.

Метод-сопрограмму communicate можно использовать для подачи данных на стандартный ввод подпроцесса.


Глава 14. Продвинутое использование asyncio

API, допускающие сопрограммы и функции

В asyncio есть функции, которые позволяют определить, является объект сопрограммой, чтобы вызвать его соответствующим образом.

# Listing 14.1
for task in self.tasks:  
    if asyncio.iscoroutinefunction(task):  
        awaitable_tasks.append(asyncio.create_task(task()))  
    elif asyncio.iscoroutine(task):  
        awaitable_tasks.append(asyncio.create_task(task))  
    else:  
        self.loop.call_soon(task)

Контекстные переменные

Для хранения глобальных переменных внутри потока, можно использовать поточно-локальные переменные (Thread-Local Data).

Концепция контекстных переменных описана в PEP 567 – Context Variables. Контекстные переменные похожи на поточно-локальные, но локальны для задачи, а не для потока. Это означает, что если задача создает контекстную переменную, то к ней будет иметь доступ любая внутренняя сопрограмма или задача, созданная внутри задачи-создателя. А никакие задачи вне этой цепочки не смогут ни увидеть, ни модифицировать эту переменную. Это позволяет хранить состояние, связанное с конкретной задачей, не передавая его явно в виде аргумента.

Пример использования см. в листинге 14.2.

Принудительный запуск итерации цикла событий

Вызов asyncio.sleep(0) принудительно запускает следующую итерацию цикла событий, что выливается в немедленное выполнение задачи. Пример см. в листинге 14.3.

Использование других реализаций цикла событий

В качестве цикла событий можно использовать, например, uvloop – она написана на C и работает быстрее.

Создание собственного цикла событий

Неочевидный аспект asyncio заключается в том, что концептуально она отделена от синтаксиса async/await и сопрограмм. Определение класса сопрограммы вообще находится вне модуля asyncio!

==Сопрограммы и синтаксис async/await – концепции, не зависящие от средств их выполнения.== В состав Python входит реализация цикла событий по умолчанию, предлагаемая asyncio,но ничто не мешает использовать любую другую реализацию, в том числе свою собственную.

Нестандартные объекты, допускающие ожидание

Чтобы определить объект, допускающий ожидание, нужно реализовать метод __await__. К методу __await__ предъявляется единственное требование – он должен возвращать итератор.

Объект Future – это обертка вокруг значения, которое будет доступно когда-то в будущем. Поэтому он может иметь два состояния: завершен и не завершен. Допустим, что мы находимся в бесконечном цикле событий и хотим с помощью итератора проверить, завершен ли будущий объект. Если операция завершилась, то можно просто вернуть результат и на этом покончить с итератором. Если нет, нам нужен способ сообщить: «Я еще не закончилась, проверь попозже». И в таком случае итератор может просто отдать себя самого!

Реализация задачи

Задача представляет собой комбинацию будущего объекта и сопрограммы. Будущий объект задачи завершается, когда завершается обернутая им сопрограмма.


📂 Reading | Последнее изменение: 09.12.2023 17:59