Concurrency Patterns for Booking Systems — Python#
SDE-3 Interview Prep | Slice Fintech | DSA & Coding Round
The Core Problem#
A booking system must enforce:
Total booked ≤ Total capacity
No two users can book the same unit simultaneously
Python has three concurrency models you must know cold:
- Threading — concurrent OS threads, but the GIL blocks true parallelism (best for I/O-bound tasks)
- Asyncio — cooperative multitasking (event loop, coroutines)
- Multiprocessing — true CPU parallelism (bypasses GIL)
Pattern 1 — Threading Lock (Pessimistic)#
1a. Global Lock (coarse-grained)#
import threading
from typing import Dict
class BookingService:
    """Coarse-grained booking service: one global lock serializes every
    inventory mutation across all items."""

    def __init__(self):
        self.inventory: Dict[str, int] = {}
        self._lock = threading.Lock()

    def book(self, item_id: str, quantity: int) -> bool:
        """Atomically reserve `quantity` units; False if stock is short."""
        # Context manager guarantees the lock is released even on exception.
        with self._lock:
            on_hand = self.inventory.get(item_id, 0)
            if on_hand >= quantity:
                self.inventory[item_id] = on_hand - quantity
                return True
            return False

    def release(self, item_id: str, quantity: int) -> None:
        """Return `quantity` units to the pool (e.g. on cancellation)."""
        with self._lock:
            current = self.inventory.get(item_id, 0)
            self.inventory[item_id] = current + quantity
Problem: All items share one lock. Booking seat A1 blocks booking seat B5.
1b. Fine-Grained Lock (per-item)#
import threading
from collections import defaultdict
from typing import Dict
class BookingService:
    """Fine-grained locking: each item gets its own lock, so bookings
    for different items never block each other."""

    def __init__(self):
        self.inventory: Dict[str, int] = {}
        # Lazily materializes one Lock per item on first access.
        self._locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)

    def book(self, item_id: str, quantity: int) -> bool:
        """Check-and-decrement stock while holding only this item's lock."""
        with self._locks[item_id]:  # only locks THIS item
            stock = self.inventory.get(item_id, 0)
            if stock >= quantity:
                self.inventory[item_id] = stock - quantity
                return True
            return False

    def release(self, item_id: str, quantity: int) -> None:
        """Put `quantity` units back for `item_id`."""
        with self._locks[item_id]:
            self.inventory[item_id] = self.inventory.get(item_id, 0) + quantity
⚠️ SDE-3 gotcha: defaultdict(threading.Lock) is NOT thread-safe for creation. In high concurrency, use threading.Lock() to guard the dict itself:
class SafeBookingService:
    """Per-item locking with race-free lock creation.

    A meta-lock guards the lock registry so two threads can never mint
    two different locks for the same item id."""

    def __init__(self):
        self.inventory: Dict[str, int] = {}
        self._locks: Dict[str, threading.Lock] = {}
        self._meta_lock = threading.Lock()  # guards lock creation

    def _get_lock(self, item_id: str) -> threading.Lock:
        """Return the lock for `item_id`, creating it exactly once."""
        with self._meta_lock:
            lock = self._locks.get(item_id)
            if lock is None:
                lock = threading.Lock()
                self._locks[item_id] = lock
            return lock

    def book(self, item_id: str, quantity: int) -> bool:
        """Atomically check-and-decrement stock for a single item."""
        with self._get_lock(item_id):
            on_hand = self.inventory.get(item_id, 0)
            if on_hand < quantity:
                return False
            self.inventory[item_id] = on_hand - quantity
            return True
1c. RLock (Reentrant Lock — same thread can re-acquire)#
class BookingServiceWithRLock:
    """Demonstrates RLock: nested acquisition by the same thread.

    book() holds the lock and then calls helpers that re-acquire it —
    fine with an RLock, instant deadlock with a plain Lock."""

    def __init__(self):
        self.inventory: Dict[str, int] = {}
        self._lock = threading.RLock()  # same thread can acquire multiple times

    def book(self, item_id: str, quantity: int) -> bool:
        """Validate then commit, all under one reentrant lock."""
        with self._lock:
            # Both helpers re-acquire _lock — reentrancy in action.
            return self._validate(item_id, quantity) and self._commit(item_id, quantity)

    def _validate(self, item_id: str, quantity: int) -> bool:
        with self._lock:  # safe because RLock
            return self.inventory.get(item_id, 0) >= quantity

    def _commit(self, item_id: str, quantity: int) -> bool:
        with self._lock:  # safe because RLock
            self.inventory[item_id] = self.inventory.get(item_id, 0) - quantity
            return True
Pattern 2 — ReadWriteLock (Read-Heavy Systems)#
Python's threading has no built-in RWLock. Implement it:
import threading
class ReadWriteLock:
    """Multiple readers OR one writer at a time.

    Built on a single Condition: readers briefly take the condition's
    lock to update a counter; a writer holds the underlying lock for the
    entire duration of the write.

    Caveats visible in this implementation:
      * Writer starvation is possible — while a writer waits inside
        acquire_write, new readers can still enter acquire_read and
        increment the counter, postponing the writer indefinitely.
      * Not reentrant: the underlying primitive is a plain Lock, so a
        thread calling acquire_write twice deadlocks itself.
    """
    def __init__(self):
        # Condition wrapping a plain Lock; the same lock serves both as
        # the readers' counter guard and the writers' exclusive lock.
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0  # number of readers currently inside the lock
    def acquire_read(self):
        # Blocks only while a writer holds the underlying lock.
        with self._read_ready:
            self._readers += 1
    def release_read(self):
        with self._read_ready:
            self._readers -= 1
            if self._readers == 0:
                # Last reader out — wake any writer parked in acquire_write.
                self._read_ready.notify_all()
    def acquire_write(self):
        # Take the lock, then wait (wait() releases and re-acquires it)
        # until every active reader has drained.
        self._read_ready.acquire()
        while self._readers > 0:
            self._read_ready.wait()
    def release_write(self):
        self._read_ready.release()
class InventoryService:
    """Read-heavy inventory: availability checks share the read lock,
    bookings take the exclusive write lock."""

    def __init__(self):
        self.inventory: Dict[str, int] = {}
        self._rwlock = ReadWriteLock()

    def check_availability(self, item_id: str) -> int:
        """Concurrent-safe read; many threads may hold the read lock."""
        self._rwlock.acquire_read()
        try:
            return self.inventory.get(item_id, 0)
        finally:
            self._rwlock.release_read()

    def book(self, item_id: str, quantity: int) -> bool:
        """Exclusive check-and-decrement under the write lock."""
        self._rwlock.acquire_write()
        try:
            remaining = self.inventory.get(item_id, 0)
            if remaining >= quantity:
                self.inventory[item_id] = remaining - quantity
                return True
            return False
        finally:
            self._rwlock.release_write()
Pattern 3 — Semaphore (Capacity Limiting)#
import threading
from typing import Set
class SeatBookingService:
    """Capacity-limited seat booking: one semaphore permit per free seat,
    plus a lock protecting the booked-seat set."""

    def __init__(self, total_seats: int):
        self._semaphore = threading.Semaphore(total_seats)  # permits = capacity
        self._booked_seats: Set[str] = set()
        self._lock = threading.Lock()

    def book_seat(self, seat_id: str) -> bool:
        """Reserve a specific seat; False when full (0.5s wait) or taken."""
        # A permit represents remaining capacity, not a particular seat.
        if not self._semaphore.acquire(blocking=True, timeout=0.5):
            return False  # no seats left within timeout
        with self._lock:
            if seat_id in self._booked_seats:
                # Seat-level conflict: hand the capacity permit back.
                self._semaphore.release()
                return False
            self._booked_seats.add(seat_id)
        return True

    def cancel_seat(self, seat_id: str) -> None:
        """Free a seat and return its capacity permit."""
        with self._lock:
            if seat_id in self._booked_seats:
                self._booked_seats.discard(seat_id)
                self._semaphore.release()
# Usage
service = SeatBookingService(total_seats=50)


def try_book(seat_id: str):
    """Attempt one seat booking and report the outcome."""
    result = service.book_seat(seat_id)
    print(f"Seat {seat_id}: {'✓ booked' if result else '✗ failed'}")


# 60 threads race for 50 distinct seats — the semaphore caps successes at 50.
threads = [threading.Thread(target=try_book, args=(f"A{i}",)) for i in range(60)]
for worker in threads:
    worker.start()
for worker in threads:
    worker.join()
Pattern 4 — Condition Variable (Wait/Notify)#
import threading
from collections import deque
class BoundedBookingQueue:
    """Block producers when queue is full; block consumers when empty."""

    def __init__(self, max_size: int):
        self._queue: deque = deque()
        self._max_size = max_size
        # Both conditions share ONE lock so queue state is checked atomically.
        self._lock = threading.Lock()
        self._not_full = threading.Condition(self._lock)
        self._not_empty = threading.Condition(self._lock)

    def put(self, booking_request: dict) -> None:
        """Append a request, waiting while the queue is at capacity."""
        with self._not_full:
            # Loop, not `if`: guards against spurious wakeups.
            while len(self._queue) >= self._max_size:
                self._not_full.wait()  # releases the lock while sleeping
            self._queue.append(booking_request)
            self._not_empty.notify()  # a consumer can now make progress

    def get(self) -> dict:
        """Pop the oldest request, waiting while the queue is empty."""
        with self._not_empty:
            while not self._queue:
                self._not_empty.wait()  # releases the lock while sleeping
            head = self._queue.popleft()
            self._not_full.notify()  # a producer can now make progress
            return head
# Or just use the stdlib version
import queue

booking_queue = queue.Queue(maxsize=1000)  # thread-safe, bounded


def producer(request):
    """Enqueue a request; raises queue.Full after 1s if at capacity."""
    booking_queue.put(request, timeout=1.0)


def worker():
    """Consumer loop: pull requests forever and mark each one done."""
    while True:
        request = booking_queue.get()  # blocks if empty
        try:
            process_booking(request)
        finally:
            booking_queue.task_done()  # lets queue.join() track completion
Pattern 5 — Idempotency Token (Fintech Critical)#
Most important pattern for Slice. Prevents double-booking from retried requests.
import threading
import time
import uuid
from typing import Optional
from dataclasses import dataclass
@dataclass
class BookingResult:
    """Immutable outcome of a booking attempt, cached per idempotency key."""
    success: bool
    booking_id: Optional[str]  # None when the booking failed
    message: str


class IdempotentBookingService:
    """Booking service that deduplicates retried requests.

    The same idempotency key always yields the same BookingResult: the
    first thread to present a key performs the work; concurrent threads
    with the same key wait on an Event and read the cached result.

    FIX: the original `_process_booking` mutated shared inventory with
    no lock at all — two *different* idempotency keys targeting the same
    item could race on the read-modify-write. Inventory mutation is now
    guarded by a dedicated lock.
    """

    def __init__(self):
        self._processed: Dict[str, BookingResult] = {}    # key → final result
        self._in_flight: Dict[str, threading.Event] = {}  # key → completion signal
        self._lock = threading.Lock()                     # guards the two maps
        self._inventory_lock = threading.Lock()           # guards inventory mutation
        self.inventory: Dict[str, int] = {}

    def book(self, idempotency_key: str, item_id: str, quantity: int) -> BookingResult:
        """Process a booking exactly once per idempotency key."""
        with self._lock:
            # Fast path — a previous request with this key already finished.
            if idempotency_key in self._processed:
                return self._processed[idempotency_key]
            if idempotency_key in self._in_flight:
                # Another thread owns this key; wait on its event below.
                event = self._in_flight[idempotency_key]
            else:
                # First thread with this key — take ownership.
                self._in_flight[idempotency_key] = threading.Event()
                event = None  # None ⇒ THIS thread performs the processing
        if event is not None:
            event.wait(timeout=5.0)  # block until the owner publishes a result
            with self._lock:
                return self._processed.get(
                    idempotency_key,
                    BookingResult(False, None, "Processing timed out"))
        # This thread owns the processing for the key.
        try:
            result = self._process_booking(item_id, quantity)
            with self._lock:
                self._processed[idempotency_key] = result
            return result
        finally:
            with self._lock:
                done_event = self._in_flight.pop(idempotency_key, None)
            if done_event:
                done_event.set()  # wake every thread waiting on this key

    def _process_booking(self, item_id: str, quantity: int) -> BookingResult:
        """Atomically check-and-decrement inventory, then mint a booking id."""
        # Read-modify-write must be locked: concurrent bookings with
        # different idempotency keys may target the same item.
        with self._inventory_lock:
            available = self.inventory.get(item_id, 0)
            if available < quantity:
                return BookingResult(False, None, "Insufficient inventory")
            self.inventory[item_id] = available - quantity
        booking_id = f"BKG-{uuid.uuid4().hex[:8].upper()}"
        return BookingResult(True, booking_id, "Booking confirmed")
Pattern 6 — Asyncio (Async/Await)#
For high-throughput I/O-bound booking services.
import asyncio
import time
from typing import Dict, Optional
class AsyncMutex:
    """Async-compatible mutex: `async with AsyncMutex():` suspends the
    waiting coroutine — never the thread."""

    def __init__(self):
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        # Cooperative wait on the event loop until the lock is free.
        await self._lock.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Release unconditionally — mirrors `with lock:` semantics.
        self._lock.release()
class AsyncBookingService:
    """Per-item asyncio.Lock: coroutines booking the same item are
    serialized; different items proceed independently."""

    def __init__(self):
        self.inventory: Dict[str, int] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    def _get_lock(self, item_id: str) -> asyncio.Lock:
        # Single-threaded event loop + no await here ⇒ no creation race.
        return self._locks.setdefault(item_id, asyncio.Lock())

    async def book(self, item_id: str, quantity: int) -> bool:
        """Reserve `quantity` units of `item_id`; False if insufficient."""
        async with self._get_lock(item_id):
            remaining = self.inventory.get(item_id, 0)
            if remaining < quantity:
                return False
            await asyncio.sleep(0)  # yield to event loop (simulate DB write)
            self.inventory[item_id] = remaining - quantity
            return True

    async def book_with_timeout(self, item_id: str, quantity: int, timeout: float) -> bool:
        """Like book(), but abandon the attempt after `timeout` seconds."""
        try:
            return await asyncio.wait_for(self.book(item_id, quantity), timeout=timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(f"Booking timed out after {timeout}s")
# Async Semaphore — limit concurrent DB connections
class AsyncInventoryService:
    """Caps the number of in-flight 'DB operations' with a semaphore."""

    def __init__(self, max_concurrent: int = 10):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self.inventory: Dict[str, int] = {}

    async def check_and_book(self, item_id: str, quantity: int) -> bool:
        """Book under the semaphore — at most max_concurrent run at once."""
        async with self._semaphore:
            await asyncio.sleep(0.01)  # simulate DB query
            stock = self.inventory.get(item_id, 0)
            if stock >= quantity:
                self.inventory[item_id] = stock - quantity
                return True
            return False
# Run multiple bookings concurrently
async def main():
    """Fire 15 concurrent bookings at 10 units of inventory."""
    service = AsyncBookingService()
    service.inventory["concert-A"] = 10
    pending = [service.book("concert-A", 1) for _ in range(15)]
    outcomes = await asyncio.gather(*pending)
    booked = sum(outcomes)
    print(f"Booked: {booked}/10 succeeded")  # exactly 10
asyncio.run(main())
Pattern 7 — Distributed Lock (Redis)#
For multi-process / multi-pod deployments.
import redis
import uuid
import time
class RedisDistributedLock:
    """Redlock-inspired distributed lock using Redis SET NX PX."""

    def __init__(self, redis_client: redis.Redis, ttl_ms: int = 5000):
        self._client = redis_client
        self._ttl_ms = ttl_ms  # TTL bounds how long a crashed holder blocks others

    def acquire(self, lock_key: str, token: str) -> bool:
        """Atomic SET NX PX — returns True if lock acquired."""
        # nx: set only if absent; px: ms expiry prevents deadlock on crash.
        was_set = self._client.set(
            f"lock:{lock_key}",
            token,
            nx=True,
            px=self._ttl_ms
        )
        return was_set is True

    def release(self, lock_key: str, token: str) -> bool:
        """Delete the lock only if our token still owns it."""
        # GET + DEL must be atomic; a Lua script executes atomically in Redis.
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return self._client.eval(script, 1, f"lock:{lock_key}", token) == 1
class DistributedBookingService:
    """Booking service safe across processes/pods via a Redis lock."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self._redis = redis.from_url(redis_url)
        self._lock = RedisDistributedLock(self._redis)

    def book(self, item_id: str, user_id: str) -> bool:
        """Book `item_id` under the distributed lock; raise when contended."""
        token = f"{user_id}-{uuid.uuid4()}"  # unique per attempt
        if not self._lock.acquire(item_id, token):
            raise RuntimeError("Resource locked. Retry later.")
        try:
            return self._process_booking(item_id, user_id)
        finally:
            # Token-checked release never deletes another pod's lock,
            # even if ours expired mid-operation.
            self._lock.release(item_id, token)

    def _process_booking(self, item_id: str, user_id: str) -> bool:
        # DB operation — safe across multiple pods
        return True
Pattern 8 — Retry with Exponential Backoff#
import time
import random
from functools import wraps
from typing import Callable, TypeVar, Any
T = TypeVar("T")


def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 10.0,
    exceptions: tuple = (Exception,)
):
    """Decorator: retry the wrapped call on `exceptions` with exponential
    backoff plus jitter.

    Delays grow as base_delay * 2**attempt, capped at max_delay, with an
    additional random 0–10% jitter to avoid thundering-herd retries.
    The last exception is re-raised once max_retries is exhausted.

    Fixes vs. original: the jitter comment claimed "±10%" but the code
    only ever ADDS 0–10%; the log line now reports the actual sleep
    (delay + jitter) instead of the bare delay.
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_retries:
                        raise  # out of retries — propagate to the caller
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, delay * 0.1)  # +0–10% jitter
                    sleep_for = delay + jitter
                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {sleep_for:.2f}s")
                    time.sleep(sleep_for)
            # Only reachable if max_retries < 0 (empty loop) — fail loudly
            # instead of the original's silent implicit None.
            raise ValueError("max_retries must be >= 0")
        return wrapper
    return decorator
@retry_with_backoff(max_retries=3, base_delay=0.1, exceptions=(RuntimeError,))
def book_with_distributed_lock(item_id: str, user_id: str) -> bool:
    """Book through the Redis-locked service; the decorator retries with
    exponential backoff when lock contention raises RuntimeError."""
    return DistributedBookingService().book(item_id, user_id)
Pattern 9 — ThreadPoolExecutor (Concurrent Processing)#
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
from typing import List
import threading
class BookingBatchProcessor:
    """Fan a batch of booking requests out over a thread pool.

    `service` may be injected (anything with a `book(item_id, quantity)`
    method) — defaults to a fresh SafeBookingService, preserving the
    original behavior for existing callers.
    """

    def __init__(self, max_workers: int = 4, service=None):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        # Injectable for testing; falls back to the module's default service.
        self._service = service if service is not None else SafeBookingService()

    def process_batch(self, requests: List[dict]) -> List[bool]:
        """Process all requests concurrently; results in request order."""
        futures: List[Future] = [
            self._executor.submit(self._service.book, req["item_id"], req["quantity"])
            for req in requests
        ]
        return [f.result() for f in futures]  # blocks until every future resolves

    def process_with_timeout(self, requests: List[dict], timeout: float) -> List[bool]:
        """Process with an OVERALL timeout for the whole batch.

        Note: as_completed's `timeout` bounds the total wait across the
        batch, not each individual request (the original docstring said
        "per-request", which was wrong); if it expires, the TimeoutError
        propagates to the caller.
        """
        futures = {
            self._executor.submit(self._service.book, req["item_id"], req["quantity"]): i
            for i, req in enumerate(requests)
        }
        results = [False] * len(requests)
        for future in as_completed(futures, timeout=timeout):
            idx = futures[future]
            try:
                results[idx] = future.result()
            except Exception:
                results[idx] = False  # a request that raised counts as not booked
        return results

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Drain outstanding work before tearing the pool down.
        self._executor.shutdown(wait=True)
# Usage
with BookingBatchProcessor(max_workers=4) as processor:
    batch = [{"item_id": f"seat-{i}", "quantity": 1} for i in range(20)]
    results = processor.process_batch(batch)
    print(f"Successful bookings: {sum(results)}/{len(batch)}")
Decision Framework#
Is this I/O-bound (DB, network)?
├── Single-process → asyncio + asyncio.Lock / asyncio.Semaphore
└── Multi-threaded → threading.Lock + ThreadPoolExecutor
Is this CPU-bound?
└── multiprocessing (bypasses GIL)
Contention HIGH on same resource?
└── threading.Lock (pessimistic)
Contention LOW, mostly reads?
└── ReadWriteLock (many readers, one writer)
Need to prevent DUPLICATE requests?
└── Idempotency map + threading.Event
Need CAPACITY limiting (N slots)?
└── threading.Semaphore or asyncio.Semaphore
Need MULTI-PROCESS / multi-pod safety?
└── Redis distributed lock (SET NX PX + Lua release)
Need retry on failure?
└── Exponential backoff decorator
GIL Cheat Sheet (SDE-3 Must Know)#
| Scenario | Use |
|---|---|
| I/O-bound concurrent tasks | threading or asyncio |
| CPU-bound parallel tasks | multiprocessing |
| High-throughput async I/O | asyncio + aiohttp |
| Shared state between threads | threading.Lock |
| Shared state between processes | multiprocessing.Manager or Redis |
Interview Cheat Sheet — What Slice SDE-3 Interviewers Want to Hear#
- Identify the race condition first — describe the problem before the solution
- Mention GIL awareness — know when threading helps vs. doesn't
- Idempotency unprompted — this signals fintech engineering maturity
- Always use context managers (`with lock:`) — never manual acquire/release
- Mention Redis for distributed — single-process locks don't work across pods
- Quantify — "handles ~3000 RPS with p99 < 100ms under normal load"