Praktyczne użycie blokady AsyncIO przy pobieraniu tokena autoryzacyjnego

Grudzień 25, 2024 | #python , #async

010101
001010
000000
100011

Wstępne założenia

Mamy API, do którego dostęp ograniczony jest za pomocą tokena. Rodzaj tokena nie ma większego znaczenia ale istotne jest, że nowy token automatycznie unieważnia wcześniej pobrany token, oraz że ma on określony czas ważności. Zadaniem jest napisanie takiego managera tokenów, który umożliwi bezproblemowe, asynchroniczne wywoływanie żądań do punktów końcowych API wymagających autoryzacji. Dodatkowym wymaganiem jest unikanie zbędnych pobrań tokenów.

Nim przejdziemy do szczegółów - krótko o środowisku i strukturze projektu jaki powstał w trakcie realizacji

Pliki projektu

Podczas realizacji tego mini projektu powstały następujące pliki:

Hierarchia plików

module.py
test_token_manager.py
requirements.py
pytest.ini

Plik requirements.txt

asyncio==3.4.3
pytest==8.3.4
pytest-asyncio==0.25.0
pytest-benchmark==5.1.0

Treść pozostałych plików będzie prezentowana w dalszym ciągu artykułu.

Symulacja API

Charakterystykę wyżej opisanego hipotetycznego API odzwierciedla klasa ApiClient, która symuluje działanie dwóch ednpointów API tj.:

  1. fetch_token - zwracającego token autoryzacyjny
  2. fetch_data - zwracającego jakieś dane, ale - co najważniejsze - wymagającego tokenu autoryzacyjnego i to ostatnio pobranego tokenu.

Plik module.py

from abc import ABC, abstractmethod
import asyncio
from datetime import datetime, timedelta
import logging
import random
from typing import Callable
import uuid


logger = logging.getLogger(__name__)


class TokenManagerInterface(ABC):

    @abstractmethod
    def set_api_client(self):
        pass

    @abstractmethod
    async def get_token(self):
        pass


class ApiClient:
    def __init__(
        self, 
        *, 
        token_manager: TokenManagerInterface, 
        request_delay: Callable = None,
    ):
        token_manager.set_api_client(self)
        self._token_manager = token_manager
        self._request_delay = request_delay or (lambda: random.randint(1, 7))
        self._tokens = []

    async def fetch_token(self) -> tuple[str, datetime]:
        """
        Simulates fetching a token from an API
        """
        request_id = uuid.uuid4().hex[:8]
        logger.debug(f"API:fetch_token:{request_id}: getting token")
        await asyncio.sleep(self._request_delay())
        token = uuid.uuid4().hex[:8]
        self._tokens.append(token)
        logger.debug(f"API:fetch_token:{request_id}: token {token} received")
        return token, datetime.now() + timedelta(seconds=60)
    
    async def fetch_data(self, param: int = 0):
        """
        Simulates getting data using a token
        """
        token = await self._token_manager.get_token()
        logger.debug(f"API:fetch_data:{token}: getting data")

        await asyncio.sleep(self._request_delay())
        
        # If token is not the last one, raise an exception
        if token != self._tokens[-1]:
            logger.debug(f"API:fetch_data:{token}: autorization error")
            raise Exception("Authorization error")
        logger.debug(f"API:fetch_data:{token} data received")
        return {"param": param}

ApiClient w trakcie inicjalizacji przyjmuje dwa parametry. Jednym z nich jest token_manager będący pośrednikiem przy pobieraniu tokena z API. To jego implementacją będziemy zajmować się w tym artykule.

Drugim parametrem jest request_delay będący funkcją zwracającą liczbę sekund lub ułamków sekund - przyda się do symulowania opóźnienia w realizacji żądań wysyłanych do zewnętrznego systemu.

Podczas inicjalizacji tworzony jest także atrybut instancji self._tokens = [], który ma za zadanie rejestrować pobrane z API tokeny. Posłuży do liczenia ich w trakcie testów ale umożliwia też sprawdzenie czy dany token został pobrany jako ostatni - bo tylko ostatni uważany jest za aktywny.

Jak już wspomniałem wyzwaniem będzie prawidłowe zaimplementowanie logiki managera tokenów tak, aby dało się wykonać następujące zadanie:

Plik module.py

async def task_get_multiple_data_concurrently(api_client: ApiClient):
    tasks = [api_client.fetch_data(i) for i in range(3)]
    return await asyncio.gather(*tasks, return_exceptions=True)

Wstępna analiza

Z uwagi na to, że token ma określony czas życia oraz to, że tylko ostatni token jest ważny dobrze byłoby go pobrać raz, zapisać w buforze (ang. cache) i używać tak długo jak się da, aż do chwili kiedy wygaśnie. Pobranie kolejnego tokenu warto odroczyć do chwili kiedy znowu będzie potrzebny. Z tych powodów logikę uzyskiwania (czyli pobierania z API lub brania z cache) i przechowywania tokena chcemy powierzyć managerowi tokenów, który będzie potrzebował jakiegoś obiektu symulującego bufor.

Plik module.py

class Cache:
    def __init__(self):
        self._cache = {}

    async def get(self, key):
        res = self._cache.get(key)
        if res is None:
            logger.debug(f"CACHE:get: token not found")
            return None
        value, expire = res
        is_expired = expire <= datetime.now()
        if is_expired:
            logger.debug(f"CACHE:get: token {value} expired")
            await self.clear(key)
            return None
        logger.debug(f"CACHE:get: found token {value}")
        return value

    async def set(self, key, value, ttl):
        self._cache[key] = value, ttl
        logger.debug(f"CACHE:get: token {value} stored")

    async def clear(self, key):
        self._cache.pop(key)
        logger.debug(f"CACHE:clear: token removed")

Na potrzeby tego doświadczenia wystarczy klasa symulująca bufor jakim docelowo mógłby być np. Redis czy Memcached. Umożliwi ona przeprowadzenie testów i sprawdzenie działania koncepcji (ang. proof of concept) zakładającej unikanie pobierania tokena bezpośrednio z API jeśli tylko jest on dostępny w buforze.

Implementacja token managera bez żadnych blokad

Plik module.py

class TokenManager1(TokenManagerInterface):
    TOKEN_CACHE_KEY = "token"

    def __init__(self, cache: Cache):
        self._api_client = None
        self._cache = cache

    def set_api_client(self, api_client: ApiClient):
        self._api_client = api_client

    async def get_token(self):
        token = await self._receive_token()
        if not token:
            token = await self._fetch_and_store_token()
        return token
    
    async def _receive_token(self):
        return await self._cache.get(self.TOKEN_CACHE_KEY)
    
    async def _fetch_and_store_token(self):
        token, ttl = await self._api_client.fetch_token()
        await self._cache.set(self.TOKEN_CACHE_KEY, token, ttl)
        return token

Klasa TokenManager1 spełnia oczywiście interface TokenManagerInterface i umożliwia przekazanie do niej klienta API za pośrednictwem metody set_api_client. Z kolei metoda get_token umożliwia uzyskanie tokena z bufora (metoda _receive_token) lub też bezpośrednio z API (metoda _fetch_and_store_token). Po pobraniu tokena z API jest on zapisywany do bufora na określony czas po to, aby przy ponownym wywołaniu uniknąć tworzenia niepotrzebnego - potencjalnie bardziej kosztownego - żądania do zewnętrznego systemu.

Testy, które opisuję poniżej wykazuję jednak, że taka implementacja nie radzi sobie w przypadku wykonywania równoległych żądań. Dochodzi do sytuacji wyścigu (ang race condition) o dostęp do tokenu autoryzacyjnego. Efektem tego jest błąd autoryzacji będący wynikiem większości wykonanych asynchronicznie żądań

Implementacja token Managera z użyciem blokady AsyncIO

Rozwiązaniem jest zastosowanie blokady wzajemnego wykluczania (ang. mutual exclusion locks or mutex locks).

Plik module.py

class TokenManager2(TokenManager1):

    def __init__(self, cache: Cache):
        super().__init__(cache)
        self._lock = asyncio.Lock()

    async def get_token(self):
        token = await self._receive_token()
        if token:
            return token
        
        async with self._lock:
            token = await self._receive_token()
            if token:
                return token
            return await self._fetch_and_store_token()

Blokada w AsyncIO sprawia, że przy równoległym wywołaniu metody get_token tylko pierwsze wywołanie inicjuje proces pobierania tokena a wszystkie pozostałe czekają na zdjęcie blokady.

Zauważ, że wywołanie metody _receive_token zwracającej token z bufora zostało ponowione w sekcji async with self._lock ponieważ w przeciwnym wypadku, żądania czekające na zwolnienie blokady - po jej zwolnieniu także zainicjowałyby pobranie tokena z API.

Implementacja token managera z użyciem blokady i taska AsyncIO.

Możemy rozważyć też uruchomienie korutyny (ang. coroutine) _fetch_and_store_token jako asyncio.Task.

Plik module.py

class TokenManager3(TokenManager1):

    def __init__(self, cache: Cache):
        super().__init__(cache)
        self._lock = asyncio.Lock()
        self._token_fetch_task: asyncio.Task | None = None

    async def get_token(self):
        token = await self._receive_token()
        if token:
            return token
        
        async with self._lock:
            if self._token_fetch_task is None:
                self._token_fetch_task = asyncio.create_task(
                    self._fetch_and_store_token()
                )
                logger.debug("Created token fetch task")
            fetch_task = self._token_fetch_task

        try:
            token = await fetch_task
        except Exception as e:
            async with self._lock:
                self._token_fetch_task = None
            logger.error(f"Error fetching token: {e}")
            raise e

        return token

W tej wersji token manager w bloku async with self._lock nie sięga już do bufora, ale tworzony jest task _token_fetch_task, który zrealizuje pobranie tokenu z API. Nowy task tworzony jest tylko raz dla danej instancji token managera. Dzięki temu wszystkie równolegle uruchamiane żądania, które potrzebują tokena czekają na wynik tego samego zadania. Zadanie _token_fetch_task jest resetowane dopiero po jego zakończeniu.

Utworzenie taska _token_fetch_task komplikuje implementację, ale powinno teoretycznie dać większą efektywność i kontrolę nad realizacją zadania asynchronicznego.

Kiedy używasz asyncio.create_task, korutyna jest planowana do uruchomienia współbieżnie z innymi zadaniami. Oznacza to, że podczas gdy jedno zadanie czeka (na przykład, czekając na zwolnienie blokady), inne zadania mogą kontynuować wykonywanie. Użycie obiektu Task daje kilka dodatkowych możliwości. Można anulować zadanie jeśli jest taka potrzeba, sprawdzić jego status, obsłużyć wyjątki niezależnie od głównego przepływu wykonania, dodać coś ekstra do kontekstu zadania, przypisać funkcje zwrotne (ang callback) dla sukcesu itd.

Testy

Przed przystąpieniem do testów trzeba minimalnie dokonfigurować środowisko testowe.

Plik pytest.ini

[pytest]
log_cli_level = DEBUG
asyncio_default_fixture_loop_scope = session

Plik test_token_manager.py

import pytest
import asyncio
from datetime import datetime
import logging
import random
from module import (
    ApiClient,
    Cache,
    TokenManager1,
    TokenManager2,
    TokenManager3,
    task_get_multiple_data_concurrently,
)

@pytest.fixture
def token_manager(token_manager_class):
    return token_manager_class(Cache())


@pytest.fixture
def api_client(token_manager):
    return ApiClient(
        token_manager=token_manager,
        request_delay=lambda: random.randint(1, 7)/10
    )

@pytest.mark.asyncio
@pytest.mark.parametrize("token_manager_class", [
     TokenManager1,
     TokenManager2,
     TokenManager3
])
async def test_token_manager_fetches_token(api_client):
    results = await task_get_multiple_data_concurrently(api_client)
    for result in results:
        if isinstance(result, Exception):
            pytest.fail(f"Error occurred: {result}")
        else:
            assert result is not None, "Result should not be None"
    assert len(api_client._tokens) == 1, "Only one token should be fetched"

Po uruchomieniu

pytest -k test_token_manager_fetches_token

Okazuje się, że TokenManager1 nie przechodzi testu, a analizując debug logi wyraźnie widać kilkukrotną próbę pobrania tokenu i tylko jedno żądanie zakończone sukcesem.

================================== FAILURES =================================
_____________________________________________________________________________ test_token_manager_fetches_token[TokenManager1] _____________________________________________________________________________

api_client = <module.ApiClient object at 0x74ffc42ef4a0>

    @pytest.mark.asyncio
    @pytest.mark.parametrize("token_manager_class", [
        TokenManager1,
        TokenManager2,
        TokenManager3
    ])
    async def test_token_manager_fetches_token(api_client):
        results = await task_get_multiple_data_concurrently(api_client)
        for result in results:
            if isinstance(result, Exception):
>               pytest.fail(f"Error occurred: {result}")
E               Failed: Error occurred: Authorization error

test_token_manager.py:56: Failed
----------------------------- Captured log setup -----------------------------
DEBUG    Using selector: EpollSelector
DEBUG    Using selector: EpollSelector
----------------------------- Captured log call ------------------------------
DEBUG    CACHE:get: token not found
DEBUG    API:fetch_token:c35f07e8: getting token
DEBUG    CACHE:get: token not found
DEBUG    API:fetch_token:0c0384ad: getting token
DEBUG    CACHE:get: token not found
DEBUG    API:fetch_token:47ce2bf2: getting token
DEBUG    API:fetch_token:c35f07e8: token 77dac51f received
DEBUG    CACHE:get: token 77dac51f stored
DEBUG    API:fetch_data:77dac51f: getting data
DEBUG    API:fetch_token:0c0384ad: token 17786b7c received
DEBUG    CACHE:get: token 17786b7c stored
DEBUG    API:fetch_data:17786b7c: getting data
DEBUG    API:fetch_token:47ce2bf2: token da21de1e received
DEBUG    CACHE:get: token da21de1e stored
DEBUG    API:fetch_data:da21de1e: getting data
DEBUG    API:fetch_data:17786b7c: autorization error
DEBUG    API:fetch_data:77dac51f: autorization error
DEBUG    API:fetch_data:da21de1e data received
---------------------------- Captured log teardown ---------------------------
DEBUG    Using selector: EpollSelector
=========================== short test summary info ==========================
FAILED test_token_manager.py::test_token_manager_fetches_token[TokenManager1] - Failed: Error occurred: Authorization error
================= 1 failed, 2 passed, 17 deselected in 2.85s ================

Dla TokenManager2 i TokenManager3 testy przechodzą, ale powyższy test nic nam nie mówi o różnicy w implementacji.

Do zbadania tych różnic napiszmy test, który porówna efektywność wykonania.

Plik test_token_manager.py

@pytest.mark.parametrize("token_manager_class", [TokenManager2, TokenManager3])
def test_token_manager_benchmark(benchmark, token_manager):
    """
    Benchmark comparing the performance of
    TokenManager2 and TokenManager3
    with many concurrent requests.
    """
    async def run_test():
        api_client = ApiClient(token_manager=token_manager, request_delay=lambda: 0.1)

        tasks = [api_client.fetch_data(i) for i in range(50)]
        await asyncio.gather(*tasks, return_exceptions=True)

    def sync_run():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(run_test())
        finally:
            loop.close()

    benchmark(sync_run)

Po uruchomieniu benchmark-a w 10 rundach

pytest -k test_token_manager_benchmark --benchmark-only --benchmark-min-rounds=10

Otrzymane wyniki (pozwoliłem sobie przedstawić je w formie tabeli)

Nazwa Testu Min (ms) Max (ms) Średnia (ms) StdDev (ms) Mediana (ms) IQR (ms) Outliers OPS Rundy Iteracje
TokenManager3 104.48 108.74 107.51 1.2378 107.92 0.9929 1;1 9.3011 10 1
TokenManager2 107.47 110.00 108.40 0.6945 108.43 0.6459 2;1 9.2249 10 1

wskazują na minimalną przewagę TokenManager3 w następujących kategoriach:

  • Czas wykonania (Min, Max, Średnia, Mediana) - naturalnie im niższy tym lepiej
  • Outliers - liczba wartości znacznie odstających od pozostałych. Ich niewielka liczba jest dopuszczalna natomiast wysoka sugeruje niestabilność implementacji albo problemy z testami.
  • OPS - operacje na sekundę - więcej operacji na sekundę skraca czas wykonania

z kolei TokenManager2 zanotował wygraną w:

  • StdDev (Odchylenie standardowe) - mierzy, jak bardzo poszczególne wyniki testów różnią się od średniej wartości. Niskie wyniki wskazują na wysoką powtarzalność i stabilność testów.
  • IQR (Interquartile Range - Rozpiętość międzykwartylowa) IQR to miara rozproszenia, która wskazuje różnicę między 75. a 25. percentylem danych. Oznacza to, że IQR obejmuje środkowe 50% wyników. Niskie IRQ oznacza, że większość wyników znajduje się blisko mediany, co sugeruje małą zmienność

Podsumowanie

Testy wykazały, że asyncio.Lock okazał się niezbędny do takiego pobierania tokena autoryzacyjnego aby bezproblemowo udało się zrealizować kilka równocześnie wykonywanych żądań, z których każdy do pomyślnego zakończenia wymagał ostatnio pobranej wartości tokena.

Z kolei przedstawiony wynik benchmarku porównujący dwie implementacje stosujące blokadę mutex dał minimalne zwycięstwo TokenManager3. Na koniec dodam jednak, że wielokrotnie uruchamiany benchmark, dawał różne wyniki i nie zawsze trzecia implementacja była na podium. Natomiast to co powtarzało się między wywołaniami, to niewielka różnica osiąganych rezultatów czego wnioskiem jest, że dla tak skonstruowanego testu, oba rozwiązania są podobnie szybkie i stabilne. Wykazanie większych różnic i korzyści na rzecz jednego lub drugiego rozwiązania wymagałoby napisania bardziej złożonych testów zapewne także ze współdziałaniem korutyn, które nie potrzebują tokenu, a zatem założona blokada powinna mieć jak najmniejszy wpływ na ich działanie.

Akceptuję Ta strona zapisuje niewielkie pliki tekstowe, nazywane ciasteczkami (ang. cookies) na Twoim urządzeniu w celu lepszego dostosowania treści oraz dla celów statystycznych. Możesz wyłączyć możliwość ich zapisu, zmieniając ustawienia Twojej przeglądarki. Korzystanie z tej strony bez zmiany ustawień oznacza zgodę na przechowywanie cookies w Twoim urządzeniu.