Architecture · Staff
Staff Prep 24: Redis Data Structures & Caching Patterns Beyond GET/SET
April 4, 2026 · 10 min read · PART 02 / 06
Back to Part 23: VACUUM & Bloat. Redis is far more than a cache. Its native data structures solve problems that would require complex database schemas otherwise. Leaderboards, rate limiters, real-time feeds, distributed locks, and approximate unique counts — all solvable with the right Redis primitive. This is the guide to using Redis as a data structure server, not just a cache.
Sorted sets (ZSET): leaderboards and ranked data
python
# Single shared asyncio Redis client for every snippet below.
# decode_responses=True makes commands return str instead of bytes.
import redis.asyncio as redis
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
# Leaderboard: ZADD user_id score
async def update_score(user_id: int, points: int) -> None:
    """Add *points* to a user's total on the global leaderboard (ZINCRBY)."""
    member = str(user_id)
    await r.zincrby("leaderboard:global", points, member)
async def get_top_10():
    """Return the ten highest-scoring entries, best first.

    ZREVRANGE walks the sorted set from the highest score down.
    """
    top = await r.zrevrange("leaderboard:global", 0, 9, withscores=True)
    leaders = []
    for member, score in top:
        leaders.append({"user_id": int(member), "score": score})
    return leaders
async def get_user_rank(user_id: int) -> int | None:
    """Return the user's 1-indexed rank on the global leaderboard.

    ZREVRANK is 0-indexed (highest score = rank 0), so we add 1.
    Returns None when the user is not on the board — the original
    annotation said ``int`` but the body already returned None here.
    """
    rank = await r.zrevrank("leaderboard:global", str(user_id))
    if rank is None:
        return None
    return rank + 1
async def get_user_score(user_id: int) -> float:
    """Return the user's current score; 0 when they have no entry."""
    score = await r.zscore("leaderboard:global", str(user_id))
    # ZSCORE yields None for a missing member — coalesce to 0
    return score or 0
# Range query: get users with score between 1000 and 5000
async def get_users_in_score_range(min_score: float, max_score: float):
    """All members whose score falls in [min_score, max_score], ascending."""
    board = "leaderboard:global"
    return await r.zrangebyscore(board, min_score, max_score, withscores=True)
# Weekly leaderboard: key per week, TTL for auto-expiration
import datetime

def this_week_key() -> str:
    """Key for the current ISO week's leaderboard, e.g. 'leaderboard:2026:w14'.

    Uses the ISO calendar so the year component matches the ISO week-year
    (they can differ from the Gregorian year around New Year).
    """
    iso = datetime.date.today().isocalendar()
    return f"leaderboard:{iso.year}:w{iso.week}"
async def update_weekly_score(user_id: int, points: int) -> None:
    """Bump the user's score on this week's board and refresh its TTL."""
    week_key = this_week_key()
    await r.zincrby(week_key, points, str(user_id))
    # EXPIRE on every write refreshes the TTL; once the week rolls over,
    # the key stops being written and expires 7 days after its last update.
    await r.expire(week_key, 7 * 24 * 3600)
Redis streams: persistent message queue
python
import asyncio
import json
# Producer: append events to stream
async def publish_event(event_type: str, data: dict):
    """Append an event to the user-activity stream; return its stream ID.

    The stream is capped at ~100k entries. approximate=True ("MAXLEN ~")
    lets Redis trim lazily at macro-node boundaries, which is far cheaper
    than enforcing an exact cap on every XADD.

    Note: the original snippet called json.dumps without importing json —
    fixed in the imports at the top of the file.
    """
    stream_id = await r.xadd(
        "events:user_activity",
        {"type": event_type, "data": json.dumps(data)},
        maxlen=100000,
        approximate=True,
    )
    return stream_id
# Consumer group: multiple workers share the stream
async def setup_consumer_group() -> None:
    """Create the analytics consumer group if it doesn't exist (idempotent).

    mkstream=True also creates the stream itself when it's missing.
    """
    try:
        await r.xgroup_create("events:user_activity", "analytics_workers", id="0", mkstream=True)
    except redis.ResponseError as e:
        # XGROUP CREATE fails with BUSYGROUP when the group already exists —
        # that's the only error we expect here. The original bare `pass`
        # also swallowed real problems (wrong key type, etc.); re-raise those.
        if "BUSYGROUP" not in str(e):
            raise
# Worker: pull and process events
async def process_events_worker(worker_id: str):
    """Consume user-activity events as part of the analytics_workers group.

    Each message is XACKed only after successful handling; a failed message
    stays in the pending-entries list and will be redelivered.
    """
    await setup_consumer_group()
    consumer_name = f"worker_{worker_id}"
    while True:
        # Pull up to 10 undelivered messages (">"), blocking up to 2 s
        batch = await r.xreadgroup(
            "analytics_workers",
            consumer_name,
            {"events:user_activity": ">"},
            count=10,
            block=2000,
        )
        for _stream, entries in batch or []:
            for msg_id, fields in entries:
                try:
                    await handle_event(fields)
                    await r.xack("events:user_activity", "analytics_workers", msg_id)
                except Exception as exc:
                    # No ack on failure — the message will be redelivered
                    print(f"Failed to process {msg_id}: {exc}")
Pub/sub: real-time notifications
python
import asyncio
# Publisher: broadcast event
async def notify_user(user_id: int, message: str) -> None:
    """Broadcast a JSON payload on the user's private notification channel."""
    payload = json.dumps({"message": message, "timestamp": time.time()})
    await r.publish(f"notifications:{user_id}", payload)
# Subscriber: WebSocket handler
async def websocket_handler(websocket, user_id: int):
    """Forward Redis pub/sub notifications for one user to their WebSocket.

    Blocks for the lifetime of the socket; cleanup runs when the client
    disconnects (listen() raises) or the task is cancelled.
    """
    channel = f"notifications:{user_id}"
    pubsub = r.pubsub()
    await pubsub.subscribe(channel)
    try:
        async for message in pubsub.listen():
            # listen() also yields subscribe/unsubscribe confirmations
            if message["type"] == "message":
                await websocket.send_text(message["data"])
    finally:
        await pubsub.unsubscribe(channel)
        # Release the dedicated pub/sub connection back to the pool —
        # the original leaked one connection per disconnecting client.
        await pubsub.close()
# Pattern subscribe: all notifications
async def subscribe_all_notifications():
    """Watch every per-user notification channel via a glob pattern."""
    pubsub = r.pubsub()
    await pubsub.psubscribe("notifications:*")
    async for message in pubsub.listen():
        # Pattern subscriptions deliver type "pmessage"; skip everything else
        if message["type"] != "pmessage":
            continue
        user_id = message["channel"].split(":")[1]
        print(f"Notification for user {user_id}: {message['data']}")
HyperLogLog: approximate unique counts at scale
python
import time
# Count unique page visitors — without storing every visitor ID
async def track_page_view(page_id: str, user_id: str) -> None:
    """Register one visitor in today's HyperLogLog for this page."""
    day = time.strftime("%Y-%m-%d")
    hll_key = f"unique_visitors:{page_id}:{day}"
    # PFADD: constant memory regardless of how many distinct users we see
    await r.pfadd(hll_key, user_id)
    await r.expire(hll_key, 7 * 24 * 3600)
async def get_unique_visitors(page_id: str, date: str) -> int:
    """Approximate unique-visitor count for one page on one day.

    PFCOUNT carries ~0.81% standard error but each HyperLogLog uses only
    12 KB no matter the cardinality — counting 1M unique users costs 12 KB
    here versus ~8 MB for a plain SET. Plenty accurate for analytics.
    """
    return await r.pfcount(f"unique_visitors:{page_id}:{date}")
# Merge multiple HyperLogLogs: total unique across pages
async def total_unique_visitors(page_ids: list[str], date: str) -> int:
    """Approximate distinct visitors across several pages on one day."""
    hll_keys = [f"unique_visitors:{pid}:{date}" for pid in page_ids]
    # PFCOUNT over multiple keys unions the HyperLogLogs server-side
    return await r.pfcount(*hll_keys)
Distributed lock with redlock
python
import asyncio
import secrets
# Simple single-node distributed lock
async def acquire_lock(lock_name: str, ttl_seconds: int = 30) -> str | None:
    """Try to take a single-node lock; return the owner token or None.

    SET NX PX is atomic: only one caller can create the key, and the
    millisecond TTL guarantees a crashed holder can't wedge the lock.
    """
    key = f"lock:{lock_name}"
    token = secrets.token_hex(16)  # proves ownership at release time
    ok = await r.set(key, token, nx=True, px=ttl_seconds * 1000)
    return token if ok else None
async def release_lock(lock_name: str, token: str) -> bool:
    """Release the lock only if *token* still owns it; True when deleted.

    The compare-and-delete runs as one server-side Lua script because a
    plain GET+DEL pair could race: our lock expires, another worker
    acquires it, and we'd delete *their* lock.
    """
    key = f"lock:{lock_name}"
    unlock_script = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """
    deleted = await r.eval(unlock_script, 1, key, token)
    return bool(deleted)
# Usage
async def run_exclusive_job():
    """Run the daily report at most once across all workers."""
    token = await acquire_lock("daily_report", ttl_seconds=60)
    if token is None:
        print("Another worker is running this job")
        return
    try:
        await generate_daily_report()
    finally:
        # Always hand the lock back, even if the report blew up
        await release_lock("daily_report", token)
Quiz: test your understanding
Before moving on, answer these in your head (or out loud):
- You need a real-time leaderboard for a game with 1M players, updated multiple times per second. Why is a Postgres table a bad choice? How does a Redis sorted set solve it? What is the time complexity of getting rank 1-10?
- What is the difference between Redis Pub/Sub and Redis Streams? When would you choose streams over pub/sub?
- You need to count unique daily active users across 50 million users. Storing user IDs in a Redis SET vs HyperLogLog: compare memory usage and accuracy trade-offs.
- In the distributed lock implementation, why do you store a unique token in the lock value and check it before releasing? What attack does this prevent?
- Redis Pub/Sub does not persist messages. If a subscriber is offline when a message is published, it never receives it. How do you design a reliable notification system that handles offline users?
Next up — Part 25: Message Queues — Kafka vs SQS. Consumer groups, exactly-once semantics, and dead letter queues.
Share this