Have you ever found yourself staring at a production issue, wondering “What the hell happened here?” You know something went wrong, but the system has no memory of what actually occurred. If this sounds familiar, you’re not alone.
In my years of building distributed systems, I’ve encountered this frustration countless times. Traditional architectures, while familiar and comfortable, often leave us blind when things go wrong. We lose context, we lose history, and we lose the ability to understand our own systems.
But what if I told you there’s a different way? A way that gives your systems perfect memory, complete audit trails, and debugging superpowers that would make traditional approaches seem primitive?
This is the story of how I learned to stop worrying and love raw events. It’s about event sourcing, CQRS, and how Python’s ecosystem provides everything you need to build systems that can explain themselves.
The Problem: When Systems Forget
Let me start with a story that probably sounds familiar to many of you. It’s Monday afternoon, and you’re getting ready to wrap up your day when the Slack message arrives:
“Sarah’s account is missing!”
You check the database. Nothing. The user record is gone. Completely vanished.
Tuesday morning, you’re still trying to figure out what happened. When was it deleted? Who did it? Why? Your manager is asking questions, and you have no answers.
This is the nightmare scenario that every developer is afraid of. Your system has no memory of what happened, and you’re left scrambling to piece together clues from logs, monitoring systems, and whatever else you can find.
Why Traditional Systems Fail Us
Here’s the thing about traditional architectures: they’re designed around the concept of current state. You have one row per entity in your database, and when something changes, you update that row. When something gets deleted, it’s gone.
At best, if you’re smart about it, you might implement soft deletes. You keep a `deleted_at` timestamp and maybe an `updated_by` field. But even then, you’re only capturing the bare minimum. You lose the why – the reason for the deletion. You lose the context – what led up to this decision. You lose the sequence – the chain of events that brought you to this point.
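To make “bare minimum” concrete, here’s roughly what a soft-deleted row captures – a sketch with typical column names, not any particular schema:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class UserRow:
    """Roughly what soft deletes give you: who and when, never why."""
    id: str
    email: str
    deleted_at: Optional[datetime] = None  # when it happened
    updated_by: Optional[str] = None       # who touched the row last
    # nothing here records the reason, the surrounding context,
    # or the sequence of changes that led to the deletion
```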
And if you don’t have soft deletes? Well, then you’ve got nothing. The record is gone forever, along with any trace that it ever existed.
This is the fundamental limitation of traditional systems. They’re optimized for storing current state, not for remembering what happened. They’re designed for efficiency, not for understanding.
But what if we flipped this on its head? What if, instead of storing current state, we stored every change that ever happened? What if our system had perfect memory?
The Solution: Events as the Source of Truth
This is where event sourcing comes in. Instead of storing current state, we store events – immutable facts that represent every change that ever happened in our system.
Think of events as the DNA of your application. Every change, every decision, every action is recorded as an event that can never be altered or deleted. Events are stored as facts without interpretation, and this is actually a good thing – it means different parts of your system can interpret the same event differently.
Here’s what a `UserDeleted` event looks like:
```yaml
id: 550e8400-e29b-41d4-a716-446655440000
aggregate_id: 123e4567-e89b-12d3-a456-426614174000
event_type: USER_DELETED
version: 1
timestamp: 2024-03-15T16:47:23Z
revision: 5
data:
  deleted_by: admin@company.com
  reason: User requested account deletion
```
Notice a few key things here:

- Events are immutable facts – they never change once created
- The `version` field lets us evolve our event structure over time while maintaining backward compatibility
- The `revision` field ensures we can replay events in the correct order
- The `data` field contains all the context we need – who deleted the user and why
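In Python, an event like this maps naturally onto a frozen Pydantic model. Here’s a sketch mirroring the fields above – illustrative, not the exact class from the full implementation:

```python
from datetime import datetime
from typing import Any, Dict
from uuid import UUID

from pydantic import BaseModel, ConfigDict

class EventDTO(BaseModel):
    """Immutable event record – mirrors the YAML example above."""
    model_config = ConfigDict(frozen=True)  # events never change once created

    id: UUID
    aggregate_id: UUID
    event_type: str        # e.g. "USER_DELETED"
    version: int           # schema version of the event payload
    revision: int          # position within the aggregate's stream
    timestamp: datetime
    data: Dict[str, Any]   # e.g. {"deleted_by": ..., "reason": ...}
```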
This is the power of events. Instead of losing information, we’re capturing everything. We’re building a complete history of our system.
The Event Store: Your System’s Memory
Now, where do we store all these events? In an Event Store – an append-only storage system where events are organized in streams per aggregate.
Think of it like this: each user has their own story, and that story is told through a sequence of events. Here’s what Sarah’s complete story looks like:
- `UserCreated` – Sarah joins the platform
- `UserNameChanged` – She updates her display name
- `UserEmailChanged` – She changes her email address
- `UserStatusChanged` – Her account gets activated
- `UserDeleted` – She requests account deletion
The beautiful thing about this approach is that the stream becomes the source of truth. We can rebuild any point in time by replaying these events in order. Want to know what Sarah’s account looked like last Tuesday? Just replay the events up to that point. This is how we get time travel capabilities.
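Here’s a minimal sketch of that time travel, assuming the `get_stream`/`apply` API used later in this post (events come back ordered by revision) and the `UserAggregate` class we’ll meet shortly:

```python
from datetime import datetime

async def rebuild_user_as_of(event_store, user_id, as_of: datetime):
    """Rebuild a user's state at any past moment by replaying events up to it."""
    user = UserAggregate(user_id)
    for event in await event_store.get_stream(user_id):
        if event.timestamp > as_of:
            break  # everything after this point hadn't happened yet
        user.apply(event)
    return user

# What did Sarah's account look like last Tuesday?
# sarah = await rebuild_user_as_of(event_store, sarah_id, datetime(2024, 3, 12))
```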
CQRS: Separating What We Do from What We Read
Now, here’s where things get interesting. In traditional systems, we use the same models for both writing and reading data. But what if we separated these concerns? What if we optimized our write path for consistency and our read path for performance?
This is the core idea behind CQRS (Command Query Responsibility Segregation).
Commands: Expressing Intent
Commands represent the intent to change the system state. They’re like saying “I want to create a new user account” or “I want to change this user’s password.”
Commands are validated before execution, they’re idempotent for safety, and they serve as the entry point for all changes. When a command is executed, it creates events that get stored in the event store.
Queries: Optimized for Reading
Queries, on the other hand, represent the intent to read data from the system. They’re completely separate from commands and are optimized for specific access patterns.
Here’s the key insight: your read models don’t have to follow database normalization rules. They can be denormalized for performance. You can have multiple read models for the same data, each optimized for different use cases.
This separation gives you the best of both worlds: strong consistency on the write side, and high performance on the read side.
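For example, the same user data might back two read models, each shaped for one screen. A sketch, with illustrative names:

```python
from typing import Optional
from pydantic import BaseModel

class UserProfileView(BaseModel):
    """Denormalized for the profile page: everything the screen needs, no joins."""
    user_id: str
    display_name: str
    email: str
    status: str

class AdminUserListItem(BaseModel):
    """Denormalized for the admin list: only the columns the table shows."""
    user_id: str
    email: str
    status: str
    last_login_at: Optional[str] = None
```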
Aggregates: Where Business Logic Lives
So far, we’ve talked about events and commands, but where does the actual business logic live? That’s where aggregates come in.
Aggregates are the heart of your domain logic. They contain all the business rules and apply them to create events. Here’s how it works:
- First, we rebuild the aggregate’s current state by applying all previous events from the event store
- Once the aggregate is up-to-date, we call a business logic method that validates the command against business rules
- When rules pass, the method creates new events
- When rules fail, it returns errors
This ensures business invariants are maintained while keeping the aggregate state current.
Projections: Building Read Models from Events
Now, here’s the final piece of the puzzle. We have events being created by aggregates, but these events are just stored in the event store. How do we actually use them to build the read models that our queries will use?
That’s where projections come in. Projections listen to events and build optimized read models for fast querying.
When a `UserCreated` event happens, we create a user record. When `EmailChanged` happens, we update the email field. This gives us event-driven read model updates that are optimized for queries and eventually consistent.
The beautiful thing about projections is that one event can trigger multiple actions. The same `UserCreated` event might:
- Create a user record in the read model
- Send a welcome email
- Create an audit log entry
- Trigger analytics tracking
This decoupling allows us to add new behaviors without changing existing code.
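We’ll implement this fan-out with Celery later in the post, but the core idea fits in a few lines. Here’s an in-process sketch (names are illustrative):

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, Dict, List

Handler = Callable[[dict], Awaitable[None]]
subscribers: Dict[str, List[Handler]] = defaultdict(list)

def subscribe(event_type: str, handler: Handler) -> None:
    subscribers[event_type].append(handler)

async def publish(event: dict) -> None:
    # One event fans out to every subscriber:
    # read model update, welcome email, audit log, analytics...
    await asyncio.gather(*(handler(event) for handler in subscribers[event["event_type"]]))
```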
From Theory to Practice: Building It with Python
Alright, so we’ve covered the theory. Now let’s talk about how to actually build this stuff. The good news? The Python ecosystem has everything you need to implement event sourcing with CQRS effectively.
We’ll use FastAPI for the web layer, Celery for asynchronous event processing, and see how all the pieces fit together to create a robust, scalable system.
But first, let me show you how everything connects in practice.
The Complete Picture
Let me walk you through how this all works together in practice. This diagram shows the two distinct paths: commands (write) and queries (read).
The Command Path (Write): When a user wants to change their password, they make an API call to our FastAPI endpoint. This creates a `ChangePasswordCommand` and passes it to the command handler. The command handler then:

- Retrieves all previous events for this user from the event store
- Creates a `UserAggregate` and rebuilds its current state by applying all those events
- Calls the aggregate’s business method `change_password`, which validates business rules and generates new events
- Stores the new events in the event store and dispatches them to the event bus
The Query Path (Read): When someone wants to get user data, they make a query API call. This goes directly to our FastAPI query endpoint, which uses the query handler to fetch data from the optimized read models.
This is CQRS in action – commands go through the full event sourcing pipeline, queries read directly from optimized read models. Same API entry point, completely different paths.
FastAPI: Command & Query Interface
Here’s how we implement the CQRS separation in FastAPI:
```python
@router.post("/users/{user_id}/change-password/")
async def change_password(
    user_id: UUID,
    password_data: ChangePasswordDTO,
    handler: ChangePasswordCommandHandler = Depends(
        InfraFactory.create_change_password_command_handler
    ),
) -> ChangePasswordResponseDTO:
    command = ChangePasswordCommand(user_id=user_id, password_data=password_data)
    await handler.handle(command)
    return ChangePasswordResponseDTO(success=True, message="Password updated successfully")


@users_router.get("/{user_id}/")
async def get_user(
    user_id: UUID,
    query_handler: GetUserQueryHandler = Depends(
        InfraFactory.create_get_user_query_handler
    ),
) -> UserReadDTO:
    return await query_handler.handle(GetUserQuery(user_id=user_id))
```
Our APIs are very simple – they use Pydantic models for both the request payloads and the internal commands/queries. When a request comes in, we create the appropriate command or query object using Pydantic, get the handler from our infrastructure factory through dependency injection, and simply call its handle method.
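To see what this looks like from the client side, here’s a quick sketch with httpx – the base URL and payload field names are assumptions for illustration:

```python
import asyncio
from uuid import uuid4

import httpx

async def main() -> None:
    user_id = uuid4()  # in practice, an existing user's id
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        response = await client.post(
            f"/users/{user_id}/change-password/",
            json={"current_password": "old-secret", "new_password": "much-stronger-secret"},
        )
        print(response.status_code, response.json())

asyncio.run(main())
```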
Command Handlers: Orchestration Logic
Behind the API, command handlers are the orchestrators – they don’t contain business logic, they delegate and coordinate. Here’s our `ChangePasswordCommandHandler`:
```python
class ChangePasswordCommandHandler(CommandHandler[ChangePasswordCommand]):
    async def handle(self, command: ChangePasswordCommand) -> None:
        # Get events and rebuild aggregate state
        events = await self.event_store.get_stream(command.user_id)
        user = UserAggregate(command.user_id)
        for event in events:
            user.apply(event)

        # Execute business logic
        new_events = user.change_password(
            command.password_data.current_password,
            command.password_data.new_password,
        )

        # Persist and dispatch events using the unit of work
        async with self.uow:
            await self.event_store.append_to_stream(command.user_id, new_events)
            await self.event_handler.dispatch(new_events)
```
The Unit of Work pattern (`async with self.uow:`) is crucial here – it ensures that both storing events to the event store AND dispatching them to the event bus happen atomically. If either operation fails, the entire transaction is rolled back.
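The unit of work itself isn’t shown above; here’s a minimal sketch of one plausible shape, assuming a SQLAlchemy `AsyncSession` underneath:

```python
class SqlAlchemyUnitOfWork:
    """Illustrative async unit of work wrapping a SQLAlchemy AsyncSession."""

    def __init__(self, session_factory):
        self._session_factory = session_factory

    async def __aenter__(self) -> "SqlAlchemyUnitOfWork":
        self.session = self._session_factory()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        try:
            if exc_type is None:
                await self.session.commit()    # events appended and dispatched: commit
            else:
                await self.session.rollback()  # anything failed: undo the whole batch
        finally:
            await self.session.close()
```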
Aggregates: Pure Python Business Logic
Aggregates are pure Python classes that contain all the business rules and domain logic. They don’t have any dependencies on infrastructure, databases, or external services. They just take commands (like `change_password`) and produce events (like `PASSWORD_CHANGED`).
```python
class UserAggregate(Aggregate):
    """User domain aggregate - encapsulates user business logic"""

    def change_password(self, current_password: str, new_password: str) -> List[EventDTO]:
        if not self._verify_password(current_password):
            raise InvalidPasswordError("Current password is incorrect")
        if len(new_password) < 8:
            raise ValidationError("Password must be at least 8 characters")

        return [
            PasswordChangedEvent(
                aggregate_id=self.id,
                changed_at=datetime.utcnow(),
                password_hash=hash_password(new_password),
            )
        ]

    def apply(self, event: EventDTO) -> None:
        if event.event_type == EventType.PASSWORD_CHANGED:
            self._apply_password_changed_event(event)
        # ... other event types
```
The `apply` method is crucial for event sourcing – it’s how we reconstruct the aggregate’s current state by replaying all previous events.
Event Handler: Celery Integration
Our `CeleryEventHandler` acts as the bridge between our domain events and Celery. It maps each event type to one or more Celery tasks:
```python
class CeleryEventHandler(EventHandler):
    def __init__(self):
        # Map event types to Celery tasks
        self.event_handlers: Dict[EventType, List[str]] = {
            EventType.USER_CREATED: [
                "process_user_created_task",
                "send_welcome_email_task",
            ],
            # ... other event types
        }

    async def dispatch(self, events: List[Event]) -> None:
        for event in events:
            for task_name in self.event_handlers[event.event_type]:
                # All tasks receive the same event payload structure
                celery_app.send_task(task_name, kwargs={"event": event.model_dump()})
```
This mapping is crucial – it’s how we define what should happen when each type of event occurs. We can easily add new side effects by just adding new tasks to the mapping, without touching the aggregate or command handler code.
Celery Tasks: Event Processing
On the receiving end, Celery tasks are wrappers that call the appropriate projection handlers:
```python
@app.task(
    name="process_user_created_task", bind=True, acks_late=True,
)
def process_user_created_task(self, event: Dict[str, Any]) -> None:
    """Celery task for processing USER_CREATED events"""
    event_dto = EventDTO(**event)

    # Get infrastructure factory
    factory = get_infrastructure_factory()

    # Get projection
    projection = factory.create_user_created_projection()

    # Process the event using async_to_sync
    async_to_sync(projection.handle)(event_dto)
    logger.info(f"Successfully processed USER_CREATED event for user {event_dto.aggregate_id}")
```
Reliability is crucial here. Notice the `acks_late=True` parameter – this means Celery won’t acknowledge the message until the task has finished executing. If the worker dies mid-task, the message isn’t lost – it goes back to the queue and is redelivered.
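That covers worker crashes; for ordinary exceptions raised inside the task, you’d typically add an explicit retry policy as well. Here’s a sketch using the `send_welcome_email_task` from the mapping above – the email-service factory method is hypothetical:

```python
@app.task(
    name="send_welcome_email_task",
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),  # retry on any unhandled exception
    retry_backoff=True,          # exponential backoff between attempts
    max_retries=5,               # then give up (and alert or dead-letter)
)
def send_welcome_email_task(self, event: Dict[str, Any]) -> None:
    """Celery task for sending the welcome email on USER_CREATED events."""
    factory = get_infrastructure_factory()
    email_service = factory.create_email_service()  # hypothetical factory method
    async_to_sync(email_service.send_welcome_email)(EventDTO(**event))
```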
Projections: Event-Driven Read Models
Projections are the final piece of the event sourcing puzzle – they take events and transform them into optimized read models for fast querying:
```python
class UserCreatedProjection(Projection[UserCreatedEvent]):
    async def handle(self, event: UserCreatedEvent) -> None:
        # Build read model from event
        user_data = {
            "aggregate_id": event.aggregate_id,
            "name": event.data.get("name"),
            "email": event.data.get("email"),
            "status": event.data.get("status"),
            "created_at": event.timestamp,
        }

        # Save to read model
        await self.db.save(user_data)
```
Projections focus solely on one thing: ensuring the read model is properly updated from events. They don’t handle business logic (that’s the aggregate’s job), they don’t orchestrate the process (that’s the command handler’s job), and they don’t manage storage details (that’s the repository’s job).
The Reality Check: What Actually Happens in Production
So far, this all sounds great in theory. But what happens when you actually try to build this stuff and put it in production? Well, that’s where things get interesting.
The transition from theory to practice often reveals challenges that you don’t see in the documentation. Eventual consistency isn’t just a technical concept – it’s a UX challenge. Performance issues emerge as your event streams grow longer. And debugging becomes both easier and more complex at the same time.
Let me share some of the real-world lessons I’ve learned from implementing event sourcing and CQRS in production environments. These insights will help you avoid common pitfalls and make informed decisions about your own implementations.
The Eventual Consistency UX Problem
Here’s a scenario that will sound familiar to anyone who’s worked with event sourcing systems. Sarah updates her name from “Sarah” to “Sara” in your application. The API returns success immediately, but when the frontend calls the backend to get the updated data, it still shows “Sarah”.
This is the reality of eventual consistency – your API says “success” but the read model might not be updated yet. The command has been processed, the event has been stored, but the projection that updates the read model is still running in the background.
I’ve seen two approaches to handle this:
- Optimistic Updates (Naive): The frontend updates the UI immediately, but if the user refreshes or calls the backend, they might see old data. This is actually the approach I’ve used most often in production, and for many use cases, it’s proven more than sufficient.
- Outbox Pattern (Advanced): We store events in an outbox table with job status, track processing status like pending, processing, completed, or failed, and create views of unprocessed events (sketched just below).
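Here’s a sketch of what that outbox table can look like with SQLAlchemy – table and column names are illustrative:

```python
from datetime import datetime

from sqlalchemy import JSON, Column, DateTime, Enum, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class OutboxEvent(Base):
    """One row per dispatched event, plus its processing status."""
    __tablename__ = "event_outbox"

    id = Column(UUID(as_uuid=True), primary_key=True)
    aggregate_id = Column(UUID(as_uuid=True), index=True, nullable=False)
    event_type = Column(String, nullable=False)
    payload = Column(JSON, nullable=False)
    status = Column(
        Enum("pending", "processing", "completed", "failed", name="outbox_status"),
        default="pending",
        nullable=False,
    )
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
```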
The key insight is that eventual consistency isn’t just a technical challenge – it’s a UX challenge. Users expect immediate feedback, but they also expect consistency.
Performance with Snapshots
Another real-world challenge in event sourcing is performance degradation as event streams grow longer. Consider a user who has been active for 2 years and accumulated 10,000+ events. When they try to change their password, the system has to replay all those events to reconstruct their current state, which can take 5 seconds or more.
The solution is snapshots – we save state every 1,000 events:
```python
async def handle(self, command: ChangePasswordCommand) -> None:
    snapshot = await self.snapshot_store.get_latest_snapshot(command.user_id)
    events = await self.event_store.get_stream(command.user_id, start_revision=snapshot.revision)

    user = UserAggregate.from_snapshot(snapshot)
    for event in events:  # Only 50 events instead of 10,000!
        user.apply(event)
```
Instead of replaying 10,000 events, we only replay the recent 50 events since the last snapshot. This reduces the password change time from 5 seconds to 0.5 seconds.
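The snippet above covers reading from a snapshot; the write side, deciding when to save one, can be as simple as this sketch (the `snapshot_store.save` method and the `to_snapshot()` serializer are hypothetical):

```python
SNAPSHOT_INTERVAL = 1_000  # save state every 1,000 events, as described above

async def maybe_snapshot(snapshot_store, user, current_revision: int) -> None:
    """Persist a snapshot whenever the stream crosses an interval boundary."""
    if current_revision % SNAPSHOT_INTERVAL == 0:
        await snapshot_store.save(
            aggregate_id=user.id,
            revision=current_revision,
            state=user.to_snapshot(),  # hypothetical serializer on the aggregate
        )
```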
Debugging Superpowers: Testing Business Logic
Instead of trying to recreate scenarios in test environments, we can rebuild the exact state at any moment in history. This diagram shows how we can trace through the event stream to understand exactly what happened and when. The debugging superpowers come from the ability to replay events and see the system state evolve over time.
Here’s how we can test the exact scenario that caused Sarah’s account deletion:
```python
from unittest import IsolatedAsyncioTestCase

class TestSarahAccountDeletion(IsolatedAsyncioTestCase):
    async def test_sarah_account_deletion_scenario(self):
        # Arrange - Replay Sarah's exact production events
        sarah = UserAggregate("sarah_456")
        sarah.apply(UserCreatedEvent("sarah_456", name="Sarah", email="sarah@company.com"))
        sarah.apply(UserLoginEvent("sarah_456", ip="192.168.1.100"))
        sarah.apply(UserProfileUpdatedEvent("sarah_456", field="role", value="admin"))

        # Act - Apply the deletion event that caused the issue
        sarah.apply(UserDeletedEvent(
            "sarah_456",
            deleted_by="admin@company.com",
            reason="User requested account deletion",
        ))

        # Assert - Verify the business logic captured who deleted the account and why
        self.assertEqual(sarah.deleted_by, "admin@company.com")
        self.assertEqual(sarah.deletion_reason, "User requested account deletion")
```
This gives us the ability to debug issues that happened hours or days ago, and test business logic against real production data at any point in time. This is debugging and testing superpowers combined – we can answer “What was Sarah’s state when her account was deleted?” with complete certainty.

## Summary: Key Takeaways and Strategic Insights
After implementing event sourcing in production environments, I’ve learned that success comes down to making the right decisions at the right time. Let me share the strategic insights that will help you navigate this architectural choice.
🚀 Start Simple: The Power of Incremental Adoption
One of the biggest mistakes I see teams make is trying to implement event sourcing with all the bells and whistles from day one. You don’t need Kafka, you don’t need specialized event stores, and you definitely don’t need to rewrite your entire system.
Start with what you know: PostgreSQL as your event store works perfectly fine for most applications. Redis or even a simple message queue can handle your event bus needs initially. The Python ecosystem gives you everything you need – FastAPI, Celery, SQLAlchemy, and Pydantic are all production-ready and battle-tested.
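To give a sense of how little you need, here’s an append-only events table sketched with SQLAlchemy on PostgreSQL – names and columns are illustrative, mirroring the event fields from earlier in the post:

```python
from sqlalchemy import (
    JSON, BigInteger, Column, DateTime, Integer, String, UniqueConstraint, func,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class StoredEvent(Base):
    """Append-only event row: one stream per aggregate_id, ordered by revision."""
    __tablename__ = "events"
    __table_args__ = (UniqueConstraint("aggregate_id", "revision"),)  # optimistic concurrency guard

    id = Column(UUID(as_uuid=True), primary_key=True)
    aggregate_id = Column(UUID(as_uuid=True), index=True, nullable=False)
    event_type = Column(String, nullable=False)
    version = Column(Integer, nullable=False, default=1)
    revision = Column(BigInteger, nullable=False)
    timestamp = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
    data = Column(JSON, nullable=False)
```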
Why this matters: By starting simple, you can prove the value of event sourcing to your team and stakeholders before investing in more complex infrastructure. You can learn the patterns, understand the trade-offs, and build confidence in the approach. Once you’ve validated the benefits, you can always evolve to more sophisticated tooling.
⚠️ When NOT to use Event Sourcing: The Honest Assessment
Event sourcing isn’t a silver bullet, and I’ve seen teams struggle when they try to apply it everywhere. Here’s my honest assessment of when to avoid it:
Simple CRUD with basic audit needs: If you’re building a straightforward CRUD application where you only need to know “who changed what and when,” traditional audit logging is probably sufficient. Event sourcing adds complexity that you don’t need.
High-frequency systems requiring immediate consistency: Trading systems, real-time gaming, or any system where milliseconds matter and you need immediate consistency across all reads – event sourcing’s eventual consistency model will cause more problems than it solves.
Teams without distributed systems experience: Event sourcing introduces concepts like eventual consistency, event replay, and projection management that can be overwhelming for teams new to distributed systems. Start with simpler patterns first, build that experience, then consider event sourcing.
The key insight: Event sourcing is a powerful tool, but it’s not the right tool for every job. The complexity it introduces needs to be justified by the benefits it provides.
🎯 What you gain: The Strategic Advantages
When event sourcing is the right fit, the benefits are transformative:
Complete audit trail & time travel: This isn’t just about compliance or debugging – it’s about building systems that can explain themselves. When something goes wrong, you can see exactly what happened, when it happened, and why it happened. This level of observability is invaluable for building reliable systems.
Debugging superpowers with real production data: Instead of trying to recreate issues in test environments, you can replay the exact sequence of events that caused problems in production. This isn’t just faster debugging – it’s more accurate debugging. You’re working with real data, real timing, and real conditions.
Scalability with eventual consistency: By separating your write and read concerns, you can scale them independently. Your write path can be optimized for consistency and correctness, while your read path can be optimized for performance and user experience. This separation of concerns is crucial for building systems that can grow.
The strategic advantage: Event sourcing gives you systems that are not just functional, but understandable. They’re not just scalable, but debuggable. They’re not just fast, but reliable.
Conclusion
The Python ecosystem is incredibly powerful for distributed systems. FastAPI’s dependency injection system makes it trivial to wire up command and query handlers. Celery’s battle-tested task queue handles event processing reliably. Pydantic’s validation ensures your events are always well-formed. You don’t need to look outside Python for distributed systems capabilities.
This combination gives you everything you need to build systems that can explain themselves, scale gracefully, and handle real-world complexity. Most importantly, we’ve solved the exact problem we started with – we can now tell Sarah exactly when, why, and by whom her account was deleted. The nightmare becomes a solvable mystery with a complete audit trail.
Resources
This post is based on my presentation at PyCon Greece 2025. Feel free to explore the complete implementation on GitHub and adapt these patterns to your own projects.