
Concurrency Patterns for Booking Systems — Python



SDE-3 Interview Prep | Slice Fintech | DSA & Coding Round


The Core Problem#

A booking system must enforce:

  • Total booked ≤ total capacity
  • No two users can book the same unit simultaneously

Python has three concurrency models you must know cold:

  • Threading — concurrency, not true parallelism (the GIL serializes bytecode); suits I/O-bound tasks
  • Asyncio — cooperative multitasking (event loop, coroutines)
  • Multiprocessing — true CPU parallelism (each process gets its own GIL)
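
The invariant breaks the moment the availability check and the decrement are separated. A minimal sketch of that race — a threading.Barrier forces the worst-case interleaving deterministically, so every thread reads the same stale count and a capacity-1 item gets booked five times:

```python
import threading

inventory = {"concert-A": 1}    # capacity 1
successes = []
barrier = threading.Barrier(5)  # forces every thread to read before any writes

def unsafe_book(item_id: str) -> None:
    available = inventory[item_id]  # 1. read
    barrier.wait()                  # all 5 threads have now read available == 1
    if available >= 1:              # 2. stale check
        inventory[item_id] = available - 1  # 3. write — clobbers the others
        successes.append(item_id)

threads = [threading.Thread(target=unsafe_book, args=("concert-A",)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Capacity 1, successful bookings: {len(successes)}")  # 5 — oversold!
```

Every pattern below is a different way to make steps 1–3 atomic.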

Pattern 1 — Threading Lock (Pessimistic)#

1a. Global Lock (coarse-grained)#

import threading
from typing import Dict

class BookingService:
    def __init__(self):
        self.inventory: Dict[str, int] = {}
        self._lock = threading.Lock()

    def book(self, item_id: str, quantity: int) -> bool:
        with self._lock:  # context manager — auto-releases even on exception
            available = self.inventory.get(item_id, 0)
            if available < quantity:
                return False
            self.inventory[item_id] = available - quantity
            return True

    def release(self, item_id: str, quantity: int) -> None:
        with self._lock:
            self.inventory[item_id] = self.inventory.get(item_id, 0) + quantity

Problem: All items share one lock. Booking seat A1 blocks booking seat B5.


1b. Fine-Grained Lock (per-item)#

import threading
from collections import defaultdict
from typing import Dict

class BookingService:
    def __init__(self):
        self.inventory: Dict[str, int] = {}
        self._locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)

    def book(self, item_id: str, quantity: int) -> bool:
        with self._locks[item_id]:  # only locks THIS item
            available = self.inventory.get(item_id, 0)
            if available < quantity:
                return False
            self.inventory[item_id] = available - quantity
            return True

    def release(self, item_id: str, quantity: int) -> None:
        with self._locks[item_id]:
            self.inventory[item_id] = self.inventory.get(item_id, 0) + quantity

⚠️ SDE-3 gotcha: lazy lock creation via defaultdict(threading.Lock) leans on CPython implementation details — two threads hitting a missing key can race on creation. Under high concurrency, guard lock creation with a dedicated meta-lock:

class SafeBookingService:
    def __init__(self):
        self.inventory: Dict[str, int] = {}
        self._locks: Dict[str, threading.Lock] = {}
        self._meta_lock = threading.Lock()  # guards lock creation

    def _get_lock(self, item_id: str) -> threading.Lock:
        with self._meta_lock:
            if item_id not in self._locks:
                self._locks[item_id] = threading.Lock()
            return self._locks[item_id]

    def book(self, item_id: str, quantity: int) -> bool:
        lock = self._get_lock(item_id)
        with lock:
            available = self.inventory.get(item_id, 0)
            if available < quantity:
                return False
            self.inventory[item_id] = available - quantity
            return True
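
A concurrent smoke test of the pattern (the class is condensed and re-declared so the snippet runs standalone; the item name is illustrative): ten threads race for three seats, and exactly three succeed — never more.

```python
import threading
from typing import Dict

class SafeBookingService:  # condensed from the class above
    def __init__(self):
        self.inventory: Dict[str, int] = {}
        self._locks: Dict[str, threading.Lock] = {}
        self._meta_lock = threading.Lock()  # guards lock creation

    def _get_lock(self, item_id: str) -> threading.Lock:
        with self._meta_lock:
            if item_id not in self._locks:
                self._locks[item_id] = threading.Lock()
            return self._locks[item_id]

    def book(self, item_id: str, quantity: int) -> bool:
        with self._get_lock(item_id):
            available = self.inventory.get(item_id, 0)
            if available < quantity:
                return False
            self.inventory[item_id] = available - quantity
            return True

service = SafeBookingService()
service.inventory["flight-101"] = 3  # hypothetical item with 3 seats
results = []

def worker():
    results.append(service.book("flight-101", 1))  # list.append is thread-safe in CPython

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Succeeded: {sum(results)}/10")  # exactly 3 — invariant holds
```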

1c. RLock (Reentrant Lock — same thread can re-acquire)#

class BookingServiceWithRLock:
    def __init__(self):
        self.inventory: Dict[str, int] = {}
        self._lock = threading.RLock()  # same thread can acquire multiple times

    def book(self, item_id: str, quantity: int) -> bool:
        with self._lock:
            if not self._validate(item_id, quantity):
                return False
            # _validate also acquires _lock — works with RLock, deadlocks with Lock
            return self._commit(item_id, quantity)

    def _validate(self, item_id: str, quantity: int) -> bool:
        with self._lock:  # safe because RLock
            return self.inventory.get(item_id, 0) >= quantity

    def _commit(self, item_id: str, quantity: int) -> bool:
        with self._lock:  # safe because RLock
            self.inventory[item_id] = self.inventory.get(item_id, 0) - quantity
            return True

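The difference is easiest to see with a non-blocking re-acquire — a sketch you can run directly, avoiding the deadlock a blocking second acquire would cause:

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

plain.acquire()
# A second blocking acquire on a plain Lock from the same thread deadlocks;
# a non-blocking attempt shows the failure without hanging.
print(plain.acquire(blocking=False))      # False — already held by this thread
plain.release()

reentrant.acquire()
print(reentrant.acquire(blocking=False))  # True — same thread may re-enter
reentrant.release()
reentrant.release()                       # release once per acquire
```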
Pattern 2 — ReadWriteLock (Read-Heavy Systems)#

Python's threading has no built-in RWLock. Implement it:

import threading

class ReadWriteLock:
    """Multiple readers OR one writer at a time."""

    def __init__(self):
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0

    def acquire_read(self):
        with self._read_ready:
            self._readers += 1

    def release_read(self):
        with self._read_ready:
            self._readers -= 1
            if self._readers == 0:
                self._read_ready.notify_all()

    def acquire_write(self):
        self._read_ready.acquire()
        while self._readers > 0:
            self._read_ready.wait()

    def release_write(self):
        self._read_ready.release()


class InventoryService:
    def __init__(self):
        self.inventory: Dict[str, int] = {}
        self._rwlock = ReadWriteLock()

    def check_availability(self, item_id: str) -> int:
        """Many threads can call this simultaneously."""
        self._rwlock.acquire_read()
        try:
            return self.inventory.get(item_id, 0)
        finally:
            self._rwlock.release_read()

    def book(self, item_id: str, quantity: int) -> bool:
        """Exclusive write access."""
        self._rwlock.acquire_write()
        try:
            available = self.inventory.get(item_id, 0)
            if available < quantity:
                return False
            self.inventory[item_id] = available - quantity
            return True
        finally:
            self._rwlock.release_write()
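
A single-threaded smoke test of the semantics (ReadWriteLock re-declared so the snippet runs standalone): two read acquisitions coexist, and the writer enters once the reader count is back to zero.

```python
import threading

class ReadWriteLock:  # condensed from the class above
    def __init__(self):
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0

    def acquire_read(self):
        with self._read_ready:
            self._readers += 1

    def release_read(self):
        with self._read_ready:
            self._readers -= 1
            if self._readers == 0:
                self._read_ready.notify_all()

    def acquire_write(self):
        self._read_ready.acquire()
        while self._readers > 0:
            self._read_ready.wait()

    def release_write(self):
        self._read_ready.release()

rw = ReadWriteLock()
snapshot = []

# Two read acquisitions coexist — readers share the lock
rw.acquire_read()
rw.acquire_read()  # does not block
snapshot.append("two readers in")
rw.release_read()
rw.release_read()

# With zero readers, a writer enters immediately and is exclusive
rw.acquire_write()
snapshot.append("writer in")
rw.release_write()
print(snapshot)
```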

Pattern 3 — Semaphore (Capacity Limiting)#

import threading
from typing import Set

class SeatBookingService:
    def __init__(self, total_seats: int):
        self._semaphore = threading.Semaphore(total_seats)
        self._booked_seats: Set[str] = set()
        self._lock = threading.Lock()

    def book_seat(self, seat_id: str) -> bool:
        # Try to acquire a permit — blocks up to 0.5s if none is available
        acquired = self._semaphore.acquire(blocking=True, timeout=0.5)
        if not acquired:
            return False  # no seats left within timeout

        with self._lock:
            if seat_id in self._booked_seats:
                self._semaphore.release()  # return permit — seat already taken
                return False
            self._booked_seats.add(seat_id)
            return True

    def cancel_seat(self, seat_id: str) -> None:
        with self._lock:
            if seat_id in self._booked_seats:
                self._booked_seats.discard(seat_id)
                self._semaphore.release()  # return the permit


# Usage
service = SeatBookingService(total_seats=50)

def try_book(seat_id: str):
    result = service.book_seat(seat_id)
    print(f"Seat {seat_id}: {'✓ booked' if result else '✗ failed'}")

threads = [threading.Thread(target=try_book, args=(f"A{i}",)) for i in range(60)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Pattern 4 — Condition Variable (Wait/Notify)#

import threading
from collections import deque

class BoundedBookingQueue:
    """Block producers when queue is full; block consumers when empty."""

    def __init__(self, max_size: int):
        self._queue: deque = deque()
        self._max_size = max_size
        self._lock = threading.Lock()
        self._not_full = threading.Condition(self._lock)
        self._not_empty = threading.Condition(self._lock)

    def put(self, booking_request: dict) -> None:
        with self._not_full:
            while len(self._queue) >= self._max_size:
                self._not_full.wait()  # release lock and sleep
            self._queue.append(booking_request)
            self._not_empty.notify()

    def get(self) -> dict:
        with self._not_empty:
            while len(self._queue) == 0:
                self._not_empty.wait()  # release lock and sleep
            item = self._queue.popleft()
            self._not_full.notify()
            return item


# Or just use the stdlib version
import queue

booking_queue = queue.Queue(maxsize=1000)  # thread-safe, bounded

def producer(request):
    booking_queue.put(request, timeout=1.0)  # blocks if full

def worker():
    while True:
        request = booking_queue.get()  # blocks if empty
        try:
            process_booking(request)  # your booking handler — not defined here
        finally:
            booking_queue.task_done()  # signal completion
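
A runnable producer/consumer round-trip on the stdlib queue — the None sentinel used to stop the worker is an illustrative convention, not part of the queue API:

```python
import queue
import threading

booking_queue = queue.Queue(maxsize=1000)  # thread-safe, bounded
processed = []

def worker():
    while True:
        request = booking_queue.get()  # blocks if empty
        if request is None:            # sentinel: shut the worker down
            booking_queue.task_done()
            break
        processed.append(request["seat"])
        booking_queue.task_done()

t = threading.Thread(target=worker)
t.start()
for i in range(5):
    booking_queue.put({"seat": f"A{i}"}, timeout=1.0)  # blocks if full
booking_queue.join()     # returns once every enqueued item is task_done()
booking_queue.put(None)  # send the sentinel
t.join()
print(processed)         # FIFO: ['A0', 'A1', 'A2', 'A3', 'A4']
```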

Pattern 5 — Idempotency Token (Fintech Critical)#

The most important pattern for Slice: it prevents double-booking when clients retry requests.

import threading
import time
import uuid
from typing import Dict, Optional
from dataclasses import dataclass

@dataclass
class BookingResult:
    success: bool
    booking_id: Optional[str]
    message: str

class IdempotentBookingService:
    def __init__(self):
        self._processed: Dict[str, BookingResult] = {}  # key → result
        self._in_flight: Dict[str, threading.Event] = {}  # key → event
        self._lock = threading.Lock()
        self.inventory: Dict[str, int] = {}

    def book(self, idempotency_key: str, item_id: str, quantity: int) -> BookingResult:
        # Fast path — already processed
        with self._lock:
            if idempotency_key in self._processed:
                return self._processed[idempotency_key]

            # In-flight — wait for the first thread to finish
            if idempotency_key in self._in_flight:
                event = self._in_flight[idempotency_key]
            else:
                # First thread — take ownership; later threads will wait on this event
                self._in_flight[idempotency_key] = threading.Event()
                event = None  # None marks THIS thread as the processor

        if event is not None:
            event.wait(timeout=5.0)  # wait for first thread
            with self._lock:
                return self._processed.get(idempotency_key,
                    BookingResult(False, None, "Processing timed out"))

        # This thread owns the processing
        try:
            result = self._process_booking(item_id, quantity)
            with self._lock:
                self._processed[idempotency_key] = result
            return result
        finally:
            with self._lock:
                event = self._in_flight.pop(idempotency_key, None)
            if event:
                event.set()  # wake up waiting threads

    def _process_booking(self, item_id: str, quantity: int) -> BookingResult:
        available = self.inventory.get(item_id, 0)
        if available < quantity:
            return BookingResult(False, None, "Insufficient inventory")
        self.inventory[item_id] = available - quantity
        booking_id = f"BKG-{uuid.uuid4().hex[:8].upper()}"
        return BookingResult(True, booking_id, "Booking confirmed")
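
The retry-safety property in miniature — a condensed sketch that skips the in-flight coordination above (MiniIdempotentBooker is illustrative): replaying the same key returns the cached result, and inventory is decremented exactly once.

```python
import threading
import uuid

class MiniIdempotentBooker:
    """Minimal sketch: one idempotency key maps to at most one booking."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._results: dict = {}  # idempotency key → result
        self._lock = threading.Lock()

    def book(self, idempotency_key: str) -> str:
        with self._lock:
            if idempotency_key in self._results:  # replayed request
                return self._results[idempotency_key]
            if self.capacity <= 0:
                result = "FAILED"
            else:
                self.capacity -= 1
                result = f"BKG-{uuid.uuid4().hex[:8].upper()}"
            self._results[idempotency_key] = result
            return result

booker = MiniIdempotentBooker(capacity=5)
first = booker.book("req-123")
retry = booker.book("req-123")  # client retried with the same key
print(first == retry)           # True — same booking, no double-charge
print(booker.capacity)          # 4 — decremented exactly once
```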

Pattern 6 — Asyncio (Async/Await)#

For high-throughput I/O-bound booking services.

import asyncio
from typing import Dict

class AsyncMutex:
    """Async-compatible mutex."""
    def __init__(self):
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        await self._lock.acquire()
        return self

    async def __aexit__(self, *args):
        self._lock.release()


class AsyncBookingService:
    def __init__(self):
        self.inventory: Dict[str, int] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    def _get_lock(self, item_id: str) -> asyncio.Lock:
        # No meta-lock needed: the event loop is single-threaded and there is
        # no await between the check and the insert
        if item_id not in self._locks:
            self._locks[item_id] = asyncio.Lock()
        return self._locks[item_id]

    async def book(self, item_id: str, quantity: int) -> bool:
        async with self._get_lock(item_id):
            available = self.inventory.get(item_id, 0)
            if available < quantity:
                return False
            await asyncio.sleep(0)  # yield to event loop (simulate DB write)
            self.inventory[item_id] = available - quantity
            return True

    async def book_with_timeout(self, item_id: str, quantity: int, timeout: float) -> bool:
        try:
            return await asyncio.wait_for(self.book(item_id, quantity), timeout=timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(f"Booking timed out after {timeout}s")


# Async Semaphore — limit concurrent DB connections
class AsyncInventoryService:
    def __init__(self, max_concurrent: int = 10):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self.inventory: Dict[str, int] = {}

    async def check_and_book(self, item_id: str, quantity: int) -> bool:
        async with self._semaphore:  # at most max_concurrent concurrent DB ops
            await asyncio.sleep(0.01)  # simulate DB query
            available = self.inventory.get(item_id, 0)
            if available < quantity:
                return False
            self.inventory[item_id] = available - quantity
            return True


# Run multiple bookings concurrently
async def main():
    service = AsyncBookingService()
    service.inventory["concert-A"] = 10

    tasks = [service.book("concert-A", 1) for _ in range(15)]
    results = await asyncio.gather(*tasks)
    booked = sum(results)
    print(f"Booked: {booked}/10 succeeded")  # exactly 10


asyncio.run(main())

Pattern 7 — Distributed Lock (Redis)#

For multi-process / multi-pod deployments.

import redis  # third-party client: pip install redis
import uuid

class RedisDistributedLock:
    """Redlock-inspired distributed lock using Redis SET NX PX."""

    def __init__(self, redis_client: redis.Redis, ttl_ms: int = 5000):
        self._client = redis_client
        self._ttl_ms = ttl_ms

    def acquire(self, lock_key: str, token: str) -> bool:
        """Atomic SET NX PX — returns True if lock acquired."""
        result = self._client.set(
            f"lock:{lock_key}",
            token,
            nx=True,          # only set if NOT exists
            px=self._ttl_ms   # auto-expire (prevents deadlock on crash)
        )
        return result is True

    def release(self, lock_key: str, token: str) -> bool:
        """Lua script ensures we only delete OUR lock."""
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self._client.eval(script, 1, f"lock:{lock_key}", token)
        return result == 1


class DistributedBookingService:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self._redis = redis.from_url(redis_url)
        self._lock = RedisDistributedLock(self._redis)

    def book(self, item_id: str, user_id: str) -> bool:
        token = f"{user_id}-{uuid.uuid4()}"  # unique per attempt
        acquired = self._lock.acquire(item_id, token)

        if not acquired:
            raise RuntimeError("Resource locked. Retry later.")

        try:
            return self._process_booking(item_id, user_id)
        finally:
            self._lock.release(item_id, token)

    def _process_booking(self, item_id: str, user_id: str) -> bool:
        # DB operation — safe across multiple pods
        return True
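
The token-guard semantics can be unit-tested without a Redis server using an in-memory stand-in (InMemoryLockStore is illustrative and skips TTL expiry): acquire is first-writer-wins, and release is a no-op unless the caller presents the owning token — mirroring the Lua script.

```python
import uuid

class InMemoryLockStore:
    """Stand-in for Redis SET NX plus token-checked DEL (no network, no TTL)."""

    def __init__(self):
        self._data: dict = {}

    def set_nx(self, key: str, token: str) -> bool:
        if key in self._data:
            return False  # someone else holds the lock
        self._data[key] = token
        return True

    def release(self, key: str, token: str) -> bool:
        # Mirrors the Lua script: delete only if WE own the lock
        if self._data.get(key) == token:
            del self._data[key]
            return True
        return False

store = InMemoryLockStore()
mine = str(uuid.uuid4())
theirs = str(uuid.uuid4())

print(store.set_nx("lock:seat-A1", mine))     # True — acquired
print(store.set_nx("lock:seat-A1", theirs))   # False — already held
print(store.release("lock:seat-A1", theirs))  # False — wrong token, no-op
print(store.release("lock:seat-A1", mine))    # True — owner releases
```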

Pattern 8 — Retry with Exponential Backoff#

import time
import random
from functools import wraps
from typing import Any, Callable

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 10.0,
    exceptions: tuple = (Exception,)
):
    """Decorator for exponential backoff with jitter."""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_retries:
                        raise
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, delay * 0.1)  # up to +10% jitter
                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay + jitter:.2f}s")
                    time.sleep(delay + jitter)
        return wrapper
    return decorator


@retry_with_backoff(max_retries=3, base_delay=0.1, exceptions=(RuntimeError,))
def book_with_distributed_lock(item_id: str, user_id: str) -> bool:
    # Will automatically retry 3 times with exponential backoff
    service = DistributedBookingService()
    return service.book(item_id, user_id)
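
A self-contained run of the decorator against a function that fails twice before succeeding (delays shrunk so the demo finishes fast; flaky_book is illustrative):

```python
import random
import time
from functools import wraps
from typing import Any, Callable

def retry_with_backoff(max_retries: int = 3, base_delay: float = 0.01,
                       max_delay: float = 1.0, exceptions: tuple = (Exception,)):
    # Same decorator as above, with small delays for a fast demo
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_retries:
                        raise
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    time.sleep(delay + random.uniform(0, delay * 0.1))
        return wrapper
    return decorator

attempts = []

@retry_with_backoff(max_retries=3, base_delay=0.01, exceptions=(RuntimeError,))
def flaky_book() -> bool:
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("lock contended")  # fails on the first two calls
    return True

print(flaky_book())   # True — succeeded on the third attempt
print(len(attempts))  # 3 — two failures, one success
```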

Pattern 9 — ThreadPoolExecutor (Concurrent Processing)#

from concurrent.futures import ThreadPoolExecutor, as_completed, Future
from typing import List

class BookingBatchProcessor:
    def __init__(self, max_workers: int = 4):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._service = SafeBookingService()

    def process_batch(self, requests: List[dict]) -> List[bool]:
        """Process all requests concurrently, return results in order."""
        futures: List[Future] = []
        for req in requests:
            future = self._executor.submit(
                self._service.book,
                req["item_id"],
                req["quantity"]
            )
            futures.append(future)

        return [f.result() for f in futures]  # blocks until all done

    def process_with_timeout(self, requests: List[dict], timeout: float) -> List[bool]:
        """Process with per-request timeout."""
        futures = {
            self._executor.submit(self._service.book, req["item_id"], req["quantity"]): i
            for i, req in enumerate(requests)
        }
        results = [False] * len(requests)
        for future in as_completed(futures, timeout=timeout):  # raises TimeoutError if exceeded
            idx = futures[future]
            try:
                results[idx] = future.result()
            except Exception:
                results[idx] = False
        return results

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self._executor.shutdown(wait=True)


# Usage
with BookingBatchProcessor(max_workers=4) as processor:
    requests = [
        {"item_id": f"seat-{i}", "quantity": 1}
        for i in range(20)
    ]
    results = processor.process_batch(requests)
    print(f"Successful bookings: {sum(results)}/{len(requests)}")

Decision Framework#

Is this I/O-bound (DB, network)?
├── Single-process → asyncio + asyncio.Lock / asyncio.Semaphore
└── Multi-threaded → threading.Lock + ThreadPoolExecutor

Is this CPU-bound?
└── multiprocessing (bypasses GIL)

Contention HIGH on same resource?
└── threading.Lock (pessimistic)

Contention LOW, mostly reads?
└── ReadWriteLock (many readers, one writer)

Need to prevent DUPLICATE requests?
└── Idempotency map + threading.Event

Need CAPACITY limiting (N slots)?
└── threading.Semaphore or asyncio.Semaphore

Need MULTI-PROCESS / multi-pod safety?
└── Redis distributed lock (SET NX PX + Lua release)

Need retry on failure?
└── Exponential backoff decorator

GIL Cheat Sheet (SDE-3 Must Know)#

| Scenario                       | Use                              |
|--------------------------------|----------------------------------|
| I/O-bound concurrent tasks     | threading or asyncio             |
| CPU-bound parallel tasks       | multiprocessing                  |
| High-throughput async I/O      | asyncio + aiohttp                |
| Shared state between threads   | threading.Lock                   |
| Shared state between processes | multiprocessing.Manager or Redis |

Interview Cheat Sheet — What Slice SDE-3 Interviewers Want to Hear#

  1. Identify the race condition first — describe the problem before the solution
  2. Mention GIL awareness — know when threading helps vs. doesn't
  3. Idempotency unprompted — this signals fintech engineering maturity
  4. Always use context managers (with lock:) — never manual acquire/release
  5. Mention Redis for distributed — single-process locks don't work across pods
  6. Quantify — "handles ~3000 RPS with p99 < 100ms under normal load"
