26 lines
739 B
Python
26 lines
739 B
Python
import asyncio
|
|
import time
|
|
|
|
|
|
class InMemoryTokenBlacklist:
    """Process-local token blacklist whose entries expire after a TTL.

    Each token maps to an absolute expiry timestamp on the ``time.time()``
    clock. The mapping is only read or modified while ``self._lock`` is held.
    """

    def __init__(self) -> None:
        # token -> absolute expiry time in seconds (``time.time()`` clock).
        self._items: dict[str, float] = {}
        self._lock = asyncio.Lock()

    async def add(self, token: str, ttl: int) -> None:
        """Blacklist *token* for *ttl* seconds, counted from now.

        All expired entries are purged before the insertion. Adding a token
        that is already present replaces its expiry.
        """
        async with self._lock:
            # Read the clock only once the lock is held, so the expiry is
            # never derived from a timestamp taken before waiting for it.
            now = time.time()
            self._cleanup(now)
            self._items[token] = now + ttl

    async def has(self, token: str) -> bool:
        """Return ``True`` if *token* is blacklisted and not yet expired."""
        async with self._lock:
            expires_at = self._items.get(token)
            if expires_at is None:
                return False
            if expires_at <= time.time():
                # Evict only this entry; the full sweep is left to add() so
                # a lookup costs O(1) instead of scanning every token.
                del self._items[token]
                return False
            return True

    def _cleanup(self, now: float) -> None:
        """Remove every entry whose expiry is at or before *now*.

        Does not take the lock itself; ``add`` calls it with the lock held.
        """
        # Collect first: a dict must not change size while being iterated.
        expired = [
            token
            for token, expires_at in self._items.items()
            if expires_at <= now
        ]
        for token in expired:
            del self._items[token]
|