Aggregates & Deciders

waku supports two approaches to modeling event-sourced aggregates: OOP aggregates (mutable, class-based) and functional deciders (immutable, function-based). This page walks through both patterns end-to-end, then covers creation semantics and shared features like idempotency, concurrency control, and stream length guards.

| Aspect | OOP Aggregate | Functional Decider |
|---|---|---|
| State | Mutable object | Immutable value |
| Testing | AggregateSpec DSL | DeciderSpec DSL |
| Complexity | Simpler for basic CRUD | Better for complex decision logic |
| Snapshots | SnapshotEventSourcedRepository | SnapshotDeciderRepository |

Tip

Start with OOP aggregates. Move to deciders when decision logic is complex or you want pure-function testability.

Domain Events

Both approaches share the same event definitions — frozen dataclasses implementing IEvent:

from dataclasses import dataclass

from waku.messaging import IEvent


@dataclass(frozen=True, kw_only=True)
class AccountOpened(IEvent):
    account_id: str
    owner: str


@dataclass(frozen=True, kw_only=True)
class MoneyDeposited(IEvent):
    account_id: str
    amount: int


@dataclass(frozen=True, kw_only=True)
class MoneyWithdrawn(IEvent):
    account_id: str
    amount: int

OOP Aggregates

The classic approach — extend EventSourcedAggregate, raise events through command methods, and apply them to mutate internal state. This walkthrough builds a complete bank account example from aggregate to running application.

Defining the Aggregate

from typing_extensions import override

from waku.messaging import IEvent
from waku.eventsourcing import EventSourcedAggregate

from app.events import AccountOpened, MoneyDeposited, MoneyWithdrawn


class BankAccount(EventSourcedAggregate):
    def __init__(self) -> None:
        super().__init__()
        self.account_id: str = ''
        self.owner: str = ''
        self.balance: int = 0

    def open(self, account_id: str, owner: str) -> None:
        self._raise_event(AccountOpened(account_id=account_id, owner=owner))

    def deposit(self, account_id: str, amount: int) -> None:
        if amount <= 0:
            msg = 'Deposit amount must be positive'
            raise ValueError(msg)
        self._raise_event(MoneyDeposited(account_id=account_id, amount=amount))

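    def withdraw(self, account_id: str, amount: int) -> None:
        # Reject non-positive amounts so a negative withdrawal cannot raise the balance.
        if amount <= 0:
            msg = 'Withdrawal amount must be positive'
            raise ValueError(msg)
        if amount > self.balance:
            msg = f'Insufficient funds: balance={self.balance}, requested={amount}'
            raise ValueError(msg)
        self._raise_event(MoneyWithdrawn(account_id=account_id, amount=amount))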

    @override
    def _apply(self, event: IEvent) -> None:
        match event:
            case AccountOpened(account_id=account_id, owner=owner):
                self.account_id = account_id
                self.owner = owner
            case MoneyDeposited(amount=amount):
                self.balance += amount
            case MoneyWithdrawn(amount=amount):
                self.balance -= amount
Why constructor fields have placeholder defaults

A new aggregate starts with version=-1 and these placeholder values. The first command raises events whose _apply() sets the real values. The defaults ('', 0) exist only to satisfy the type checker and provide a valid initial shape. See Handling Creation Commands for details.

Key points:

  • _raise_event() first applies the event (state mutation), then queues it for persistence
  • _apply() must handle every event type the aggregate can produce
  • Use match statements for clean event routing

Repository

Subclass EventSourcedRepository with the aggregate type parameter:

from waku.eventsourcing import EventSourcedRepository

from app.aggregate import BankAccount


class BankAccountRepository(EventSourcedRepository[BankAccount]):
    pass

Command Handler

EventSourcedCommandHandler coordinates loading, executing, saving, and publishing:

from dataclasses import dataclass

from typing_extensions import override

from waku.messaging import IRequest
from waku.eventsourcing import EventSourcedCommandHandler

from app.aggregate import BankAccount


@dataclass(frozen=True, kw_only=True)
class OpenAccountResult:
    account_id: str


@dataclass(frozen=True, kw_only=True)
class OpenAccountCommand(IRequest[OpenAccountResult]):
    account_id: str
    owner: str


class OpenAccountHandler(EventSourcedCommandHandler[OpenAccountCommand, BankAccount, OpenAccountResult]):
    @override
    def _is_creation_command(self, request: OpenAccountCommand) -> bool:
        return True

    @override
    def _aggregate_id(self, request: OpenAccountCommand) -> str:
        return request.account_id

    @override
    async def _execute(self, request: OpenAccountCommand, aggregate: BankAccount) -> None:
        aggregate.open(request.account_id, request.owner)

    @override
    def _to_response(self, aggregate: BankAccount) -> OpenAccountResult:
        return OpenAccountResult(account_id=aggregate.account_id)


@dataclass(frozen=True, kw_only=True)
class DepositResult:
    balance: int


@dataclass(frozen=True, kw_only=True)
class DepositCommand(IRequest[DepositResult]):
    account_id: str
    amount: int


class DepositHandler(EventSourcedCommandHandler[DepositCommand, BankAccount, DepositResult]):
    @override
    def _aggregate_id(self, request: DepositCommand) -> str:
        return request.account_id

    @override
    async def _execute(self, request: DepositCommand, aggregate: BankAccount) -> None:
        aggregate.deposit(request.account_id, request.amount)

    @override
    def _to_response(self, aggregate: BankAccount) -> DepositResult:
        return DepositResult(balance=aggregate.balance)

EventSourcedCommandHandler[RequestT, AggregateT, ResponseT] requires overriding three abstract methods and provides three optional hooks:

| Method | Abstract | Description |
|---|---|---|
| _aggregate_id(request) -> str | Yes | Extract the aggregate identifier from the request |
| _execute(request, aggregate) -> None | Yes | Execute business logic on the aggregate |
| _to_response(aggregate) -> ResponseT | Yes | Convert the aggregate to a response value |
| _is_creation_command(request) -> bool | No | Return True for commands that create new aggregates (default: False) |
| _idempotency_key(request, version) -> str \| None | No | Return a deduplication token (default: None); see Idempotency |
| _create_attempt_context() -> AbstractAsyncContextManager | No | Return a context manager entered/exited per retry attempt (default: nullcontext()); see Per-Attempt Context |

EventSourcedVoidCommandHandler[RequestT, AggregateT] pre-implements _to_response() to return None — use it for commands that don't return a value.

Module Wiring

Register aggregates, event types, and command handlers with the module system:

from waku import module
from waku.messaging import MessagingExtension, MessagingModule
from waku.eventsourcing import EventSourcingConfig, EventSourcingExtension, EventSourcingModule
from waku.eventsourcing.store.in_memory import InMemoryEventStore

from app.commands import (
    DepositCommand,
    DepositHandler,
    OpenAccountCommand,
    OpenAccountHandler,
)
from app.events import AccountOpened, MoneyDeposited, MoneyWithdrawn
from app.repository import BankAccountRepository


@module(
    extensions=[
        EventSourcingExtension().bind_aggregate(
            repository=BankAccountRepository,
            event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
        ),
        MessagingExtension()
        .bind_request(OpenAccountCommand, OpenAccountHandler)
        .bind_request(DepositCommand, DepositHandler),
    ],
)
class BankModule:
    pass


@module(
    imports=[
        BankModule,
        EventSourcingModule.register(EventSourcingConfig(store=InMemoryEventStore)),
        MessagingModule.register(),
    ],
)
class AppModule:
    pass

Run

Wire everything together and send commands through the message bus:

import asyncio

from waku import WakuFactory
from waku.messaging import IMessageBus

from app.commands import DepositCommand, OpenAccountCommand
from app.modules import AppModule


async def main() -> None:
    app = WakuFactory(AppModule).create()

    async with app, app.container() as container:
        bus = await container.get(IMessageBus)

        await bus.invoke(OpenAccountCommand(account_id='acc-1', owner='dex'))
        result = await bus.invoke(DepositCommand(account_id='acc-1', amount=500))
        print(f'Balance: {result.balance}')


if __name__ == '__main__':
    asyncio.run(main())

Tip

This example uses InMemoryEventStore. See Event Store for PostgreSQL setup.

Functional Deciders

The decider pattern separates state, decisions, and evolution into pure functions. State is immutable — each event produces a new state value.

Defining State

from dataclasses import dataclass


@dataclass(frozen=True)
class BankAccountState:
    owner: str = ''
    balance: int = 0

Defining the Decider

A decider implements three methods from the IDecider protocol:

  • initial_state() — returns the starting state
  • decide(command, state) — validates and returns new events
  • evolve(state, event) — applies an event to produce new state
from dataclasses import dataclass, replace

from waku.eventsourcing import IDecider

from app.events import AccountOpened, MoneyDeposited
from app.state import BankAccountState


@dataclass(frozen=True, kw_only=True)
class OpenAccount:
    account_id: str
    owner: str


@dataclass(frozen=True, kw_only=True)
class DepositMoney:
    account_id: str
    amount: int


BankCommand = OpenAccount | DepositMoney
BankEvent = AccountOpened | MoneyDeposited


class BankAccountDecider(IDecider[BankAccountState, BankCommand, BankEvent]):
    def initial_state(self) -> BankAccountState:
        return BankAccountState()

    def decide(self, command: BankCommand, state: BankAccountState) -> list[BankEvent]:
        match command:
            case OpenAccount(account_id=aid, owner=owner):
                return [AccountOpened(account_id=aid, owner=owner)]
            case DepositMoney(account_id=aid, amount=amount):
                if amount <= 0:
                    msg = 'Deposit amount must be positive'
                    raise ValueError(msg)
                return [MoneyDeposited(account_id=aid, amount=amount)]

    def evolve(self, state: BankAccountState, event: BankEvent) -> BankAccountState:
        match event:
            case AccountOpened(owner=owner):
                return replace(state, owner=owner)
            case MoneyDeposited(amount=amount):
                return replace(state, balance=state.balance + amount)
        return state

Repository

DeciderRepository requires three type parameters: [State, Command, Event].

from waku.eventsourcing import DeciderRepository

from app.decider import BankCommand, BankEvent
from app.state import BankAccountState


class BankAccountDeciderRepository(DeciderRepository[BankAccountState, BankCommand, BankEvent]):
    pass

Command Handler

DeciderCommandHandler adds a _to_command() step that converts the CQRS request into a domain command:

from dataclasses import dataclass

from typing_extensions import override

from waku.messaging import IRequest
from waku.eventsourcing import DeciderCommandHandler

from app.decider import BankCommand, BankEvent, OpenAccount
from app.state import BankAccountState


@dataclass(frozen=True, kw_only=True)
class OpenAccountResult:
    owner: str


@dataclass(frozen=True, kw_only=True)
class OpenAccountRequest(IRequest[OpenAccountResult]):
    account_id: str
    owner: str


class OpenAccountDeciderHandler(
    DeciderCommandHandler[
        OpenAccountRequest,
        BankAccountState,
        BankCommand,
        BankEvent,
        OpenAccountResult,
    ],
):
    @override
    def _aggregate_id(self, request: OpenAccountRequest) -> str:
        return request.account_id

    @override
    def _to_command(self, request: OpenAccountRequest) -> BankCommand:
        return OpenAccount(account_id=request.account_id, owner=request.owner)

    @override
    def _to_response(self, state: BankAccountState, version: int) -> OpenAccountResult:
        return OpenAccountResult(owner=state.owner)

DeciderCommandHandler[RequestT, StateT, CommandT, EventT, ResponseT] requires overriding three abstract methods and provides two optional hooks:

| Method | Abstract | Description |
|---|---|---|
| _aggregate_id(request) -> str | Yes | Extract the aggregate identifier from the request |
| _to_command(request) -> CommandT | Yes | Convert the CQRS request to a domain command |
| _to_response(state, version) -> ResponseT | Yes | Convert the final state and version to a response |
| _idempotency_key(request, version) -> str \| None | No | Return a deduplication token (default: None) |
| _create_attempt_context() -> AbstractAsyncContextManager | No | Return a context manager entered/exited per retry attempt (default: nullcontext()); see Per-Attempt Context |

Note

_to_response() receives (state, version) — not an aggregate. This differs from the OOP handler which receives the aggregate object.

DeciderVoidCommandHandler[RequestT, StateT, CommandT, EventT] pre-implements _to_response() to return None.

Module Wiring

Use bind_decider() instead of bind_aggregate():

from waku import module
from waku.messaging import MessagingExtension, MessagingModule
from waku.eventsourcing import EventSourcingConfig, EventSourcingExtension, EventSourcingModule
from waku.eventsourcing.store.in_memory import InMemoryEventStore

from app.decider import BankAccountDecider
from app.events import AccountOpened, MoneyDeposited
from app.handler import OpenAccountDeciderHandler, OpenAccountRequest
from app.repository import BankAccountDeciderRepository


@module(
    extensions=[
        EventSourcingExtension().bind_decider(
            repository=BankAccountDeciderRepository,
            decider=BankAccountDecider,
            event_types=[AccountOpened, MoneyDeposited],
        ),
        MessagingExtension().bind_request(OpenAccountRequest, OpenAccountDeciderHandler),
    ],
)
class BankDeciderModule:
    pass


@module(
    imports=[
        BankDeciderModule,
        EventSourcingModule.register(EventSourcingConfig(store=InMemoryEventStore)),
        MessagingModule.register(),
    ],
)
class AppModule:
    pass

The bootstrap and run code is the same as in the OOP example, except that AppModule now imports BankDeciderModule instead of BankModule and the bus receives OpenAccountRequest rather than the OOP commands.

Handling Creation Commands

The two patterns handle creation differently.

OOP Aggregates

OOP aggregates use explicit creation: override _is_creation_command() to return True for commands that create new aggregates. The handler skips load() and creates a blank aggregate directly. Loading a non-existent aggregate raises AggregateNotFoundError — this protects against update commands accidentally hitting missing streams.

class OpenAccountHandler(EventSourcedCommandHandler[OpenAccountCommand, BankAccount, OpenAccountResult]):
    def _aggregate_id(self, request: OpenAccountCommand) -> str:
        return request.account_id

    def _is_creation_command(self, request: OpenAccountCommand) -> bool:
        return True

    async def _execute(self, request: OpenAccountCommand, aggregate: BankAccount) -> None:
        aggregate.open(request.account_id, request.owner)

    def _to_response(self, aggregate: BankAccount) -> OpenAccountResult:
        return OpenAccountResult(account_id=aggregate.account_id)

On save, a new aggregate (version -1) uses NoStream expected version — the event store rejects the append if the stream already exists, preventing duplicate creation. Creation commands are not retried on concurrency conflicts.

Note

Update commands (_is_creation_command returns False, the default) load the aggregate from the event store. If the stream doesn't exist, AggregateNotFoundError is raised immediately.
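
The difference between the two paths can be sketched with a toy in-memory store. Everything here is a stand-in written for illustration: InMemoryStreams and events_for are hypothetical, and AggregateNotFoundError is redefined locally rather than imported from waku.

```python
class AggregateNotFoundError(Exception):
    """Local stand-in for the error named in the text."""


class InMemoryStreams:
    """Hypothetical store mapping a stream id to its list of events."""

    def __init__(self) -> None:
        self.streams: dict[str, list[str]] = {}

    def load(self, stream_id: str) -> list[str]:
        if stream_id not in self.streams:
            raise AggregateNotFoundError(stream_id)
        return self.streams[stream_id]


def events_for(store: InMemoryStreams, stream_id: str, *, is_creation: bool) -> list[str]:
    # Creation commands skip load() and start from an empty history.
    if is_creation:
        return []
    # Update commands must find an existing stream or fail immediately.
    return store.load(stream_id)
```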

Functional Deciders

Deciders use uniform stream loading: load() returns initial_state() for non-existent streams instead of raising an error. No _is_creation_command is needed; the decider itself controls creation logic through state inspection.

For simple aggregates, a flat state with defaults is enough — initial_state() returns BankAccountState() and the first command fills in the real values:

@dataclass(frozen=True)
class BankAccountState:
    owner: str = ''
    balance: int = 0

But what if creation and update commands need different validation? A flat state can't distinguish "not yet created" from "created with empty values." Use a discriminated union to make the state self-describing:

from dataclasses import dataclass


@dataclass(frozen=True)
class NotCreated:
    pass


@dataclass(frozen=True)
class Active:
    owner: str
    balance: int = 0


BankAccountState = NotCreated | Active  # (1)


class BankAccountDecider(IDecider[BankAccountState, BankCommand, BankEvent]):
    def initial_state(self) -> BankAccountState:
        return NotCreated()

    def decide(self, command: BankCommand, state: BankAccountState) -> list[BankEvent]:
        match (command, state):
            case (OpenAccount(), NotCreated()):
                return [AccountOpened(account_id=command.account_id, owner=command.owner)]
            case (OpenAccount(), Active()):
                raise ValueError('Account already exists')
            case (DepositMoney(), Active()):
                return [MoneyDeposited(account_id=command.account_id, amount=command.amount)]
            case _:
                raise ValueError('Account not opened')

    def evolve(self, state: BankAccountState, event: BankEvent) -> BankAccountState:
        match (event, state):
            case (AccountOpened(owner=owner), NotCreated()):
                return Active(owner=owner)
            case (MoneyDeposited(amount=amount), Active()):
                return Active(owner=state.owner, balance=state.balance + amount)
            case _:
                raise TypeError(f'Unexpected {type(event).__name__} in {type(state).__name__}')
  1. A plain union assignment like this one requires an explicit aggregate_name on the repository. See Aggregate Naming for details.

Tip

Start with a flat state for simple aggregates. Migrate to a discriminated union when creation and update commands need different invariants. A static type checker can then flag unhandled state and command combinations, and the pattern matching rejects invalid transitions at runtime.

On save, version -1 maps to NoStream and version ≥ 0 maps to Exact(version). If two concurrent creates race, one wins and the other gets a concurrency conflict — the retry loop re-loads the now-existing stream and the decider handles it (e.g., rejecting with "Account already exists").

Shared Features

Idempotency

Command handlers support idempotent event appends through the _idempotency_key() hook. Override it to build a deduplication token from the request and the current stream version:

class OpenAccountHandler(EventSourcedCommandHandler[OpenAccountCommand, BankAccount, OpenAccountResult]):
    def _idempotency_key(self, request: OpenAccountCommand, version: int) -> str | None:
        return request.idempotency_key  # (1)
  1. Return None (the default) to skip deduplication and use random UUIDs.

class OpenAccountDeciderHandler(
    DeciderCommandHandler[
        OpenAccountRequest,
        BankAccountState,
        BankCommand,
        BankEvent,
        OpenAccountResult,
    ],
):
    def _idempotency_key(self, request: OpenAccountRequest, version: int) -> str | None:
        return request.idempotency_key  # (1)
  1. Return None (the default) to skip deduplication and use random UUIDs.

The version parameter is the stream version at the time the aggregate or state was loaded (-1 for creation commands). This enables three idempotency strategies:

| Strategy | Key | Use case |
|---|---|---|
| Client-provided ID | request.command_id | Client sends a unique ID per action; version ignored |
| Version-aware key | f'{request.account_id}:deposit:{version}' | Repeatable commands: retry-safe, yet allows re-execution after state changes |
| Static key | f'{request.account_id}:open' | Terminal actions: once per aggregate, version ignored |

When an idempotency_key is provided, the repository generates per-event keys in the format {idempotency_key}:0, {idempotency_key}:1, etc. Retrying the same command with the same key is safe — the event store returns the existing stream version without duplicating events.

See Event Store — Idempotency for deduplication semantics and error handling.
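
As a rough illustration of the key formats described above, the following sketch builds a version-aware key and expands it into per-event keys. Both helper names are hypothetical; only the string formats come from the text.

```python
def version_aware_key(account_id: str, version: int) -> str:
    """Hypothetical helper mirroring the version-aware strategy."""
    return f'{account_id}:deposit:{version}'


def per_event_keys(idempotency_key: str, event_count: int) -> list[str]:
    """Hypothetical helper: one key per event, suffixed with its index."""
    return [f'{idempotency_key}:{index}' for index in range(event_count)]


retry_key = version_aware_key('acc-1', 3)
keys = per_event_keys(retry_key, 2)
```

A retry that loads the same stream version produces the same keys, while a later command against a newer version produces different ones and is therefore executed again.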

Stream Length Guard

Repositories can enforce a maximum stream length to prevent unbounded event replay. Set the max_stream_length class variable on your repository:

# OOP aggregate repository
class BankAccountRepository(EventSourcedRepository[BankAccount]):
    max_stream_length = 500


# Functional decider repository
class BankAccountDeciderRepository(DeciderRepository[BankAccountState, BankCommand, BankEvent]):
    max_stream_length = 500

When a stream exceeds the configured limit, load() raises StreamTooLargeError. See snapshots for the solution.

Tip

The default is None (no limit). Use this as a safety valve for aggregates that might accumulate many events — it catches unbounded growth before it impacts performance.

Note

Snapshot-aware repositories (SnapshotEventSourcedRepository, SnapshotDeciderRepository) inherit the guard but only apply it during full replay. When a valid snapshot exists, the repository replays only the events after the snapshot, which naturally stays within bounds.

Concurrency Control

Both repository types use ExpectedVersion for optimistic concurrency:

| Variant | Behavior |
|---|---|
| NoStream() | Stream must not exist (creation) |
| Exact(version=N) | Stream version must match exactly |
| StreamExists() | Stream must exist (any version) |
| AnyVersion() | No version check |

The repositories handle this automatically — NoStream for new aggregates, Exact for existing ones. A ConcurrencyConflictError is raised on mismatch.

Automatic Retry

Both EventSourcedCommandHandler and DeciderCommandHandler include a built-in retry loop for optimistic concurrency conflicts. When save() raises ConcurrencyConflictError, the handler re-loads the aggregate from the store and re-executes the command with fresh state.

The default is 3 attempts (1 initial + 2 retries). Override max_attempts on your handler subclass to change this:

# OOP aggregate handler
class DepositHandler(EventSourcedCommandHandler[DepositCommand, BankAccount, DepositResult]):
    max_attempts = 5  # 1 initial + 4 retries


# Functional decider handler
class DepositDeciderHandler(
    DeciderCommandHandler[DepositRequest, BankAccountState, BankCommand, BankEvent, DepositResult],
):
    max_attempts = 5  # 1 initial + 4 retries

Set max_attempts = 1 for no retries — only the initial attempt runs, and ConcurrencyConflictError propagates immediately.

Tip

The retry loop re-reads state from the event store on each attempt, so it always works with the latest version. No backoff is applied — the handler retries immediately with fresh state.

Note

OOP creation commands (_is_creation_command returns True) are not retried — a ConcurrencyConflictError on creation means the stream already exists, and retrying with a blank aggregate would fail again. Decider commands are always retried because load() returns real state on retry.
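
The reload, re-execute, save cycle can be approximated with a short loop. This is an illustrative sketch rather than waku's handler code: run_with_retry and FlakyStore are hypothetical, and ConcurrencyConflictError is a local stand-in.

```python
import asyncio
from collections.abc import Awaitable, Callable


class ConcurrencyConflictError(Exception):
    """Local stand-in for the error named in the text."""


async def run_with_retry(
    load: Callable[[], Awaitable[int]],
    execute: Callable[[int], int],
    save: Callable[[int], Awaitable[None]],
    *,
    max_attempts: int = 3,
) -> tuple[int, int]:
    if max_attempts < 1:
        raise ValueError('max_attempts must be at least 1')
    for attempt in range(1, max_attempts + 1):
        state = await load()  # fresh state on every attempt
        result = execute(state)  # business logic re-runs against it
        try:
            await save(result)
        except ConcurrencyConflictError:
            if attempt == max_attempts:
                raise  # out of attempts: propagate the conflict
            continue  # no backoff: retry immediately
        return result, attempt
    raise AssertionError('unreachable')


class FlakyStore:
    """Hypothetical store whose first save conflicts with a concurrent writer."""

    def __init__(self) -> None:
        self.balance = 100
        self.conflicts_left = 1

    async def load(self) -> int:
        return self.balance

    async def save(self, new_balance: int) -> None:
        if self.conflicts_left > 0:
            self.conflicts_left -= 1
            self.balance += 50  # the concurrent writer got in first
            raise ConcurrencyConflictError
        self.balance = new_balance


store = FlakyStore()
result, attempts = asyncio.run(run_with_retry(store.load, lambda balance: balance + 10, store.save))
```

The second attempt computes its result from the balance left by the concurrent writer, not from the stale value seen on the first attempt.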

Per-Attempt Context

Override _create_attempt_context() to wrap each retry attempt in an async context manager. The handler calls the factory once per attempt — on conflict, the current context exits (receiving the exception), and a fresh one is created for the next attempt.

This is useful for scoping a Unit of Work, database session, or transaction to each attempt so that a failed attempt's side effects are rolled back before retrying:

class DepositHandler(EventSourcedVoidCommandHandler[DepositCommand, BankAccount]):
    def __init__(
        self,
        repository: BankAccountRepository,
        publisher: IPublisher,
        uow: UnitOfWork,
    ) -> None:
        super().__init__(repository, publisher)
        self._uow = uow

    def _create_attempt_context(self) -> AbstractAsyncContextManager[Any]:
        return self._uow

    # ... other overrides ...


class DepositDeciderHandler(
    DeciderVoidCommandHandler[DepositRequest, BankAccountState, BankCommand, BankEvent],
):
    def __init__(
        self,
        repository: BankAccountDeciderRepository,
        decider: BankAccountDecider,
        publisher: IPublisher,
        uow: UnitOfWork,
    ) -> None:
        super().__init__(repository, decider, publisher)
        self._uow = uow

    def _create_attempt_context(self) -> AbstractAsyncContextManager[Any]:
        return self._uow

    # ... other overrides ...

The UoW begins a transaction on entry, commits on success, and rolls back on exception — so each retry attempt starts with a clean slate. The default implementation returns nullcontext(), which adds no wrapping behavior.

Why not event-level conflict resolution?

Some frameworks (notably Marten for .NET) offer event-level conflict resolution: when a concurrency conflict occurs, instead of retrying the whole command, they compare the committed events against your pending events and accept the append if the event types are "compatible."

waku deliberately uses full retry (reload state → re-execute command → save) instead. Event-level resolution is faster (skips reload + re-execute), but it's a correctness risk: your pending events were computed against stale state. Even when event types don't conflict, the semantics might.

Example: two concurrent DepositMoney commands on an account with a max_balance limit. Each individually is valid, but together they exceed the limit. Event-level resolution would accept both; full retry catches the violation because the second attempt runs decide() against the updated balance.

Full retry is always safe because business logic runs against the real, current state.
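
The max_balance scenario can be worked through with plain functions. The limit of 1000, the starting balance of 600, and the simplified decide and evolve signatures are assumptions chosen for illustration.

```python
from dataclasses import dataclass

MAX_BALANCE = 1000  # hypothetical limit


@dataclass(frozen=True)
class State:
    balance: int = 0


@dataclass(frozen=True)
class MoneyDeposited:
    amount: int


def decide(amount: int, state: State) -> list[MoneyDeposited]:
    if state.balance + amount > MAX_BALANCE:
        raise ValueError('Deposit would exceed max_balance')
    return [MoneyDeposited(amount)]


def evolve(state: State, event: MoneyDeposited) -> State:
    return State(balance=state.balance + event.amount)


loaded = State(balance=600)

# Both commands decide against the same stale state; each is valid alone.
first = decide(300, loaded)
second = decide(300, loaded)

# Event-level resolution: the event types are "compatible", so both are appended.
merged = loaded
for event in first + second:
    merged = evolve(merged, event)

# Full retry: the second command reloads and re-runs decide() on current state.
current = evolve(loaded, first[0])
try:
    decide(300, current)
    retry_outcome = 'accepted'
except ValueError:
    retry_outcome = 'rejected'
```

Merging the stale events leaves a balance of 1200, above the limit, whereas the retried command is rejected because it sees the balance of 900.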

Aggregate Naming

Both repository types auto-resolve aggregate_name from their type parameters. This name determines the event stream prefix (e.g., BankAccount-acc-1).

Resolution rules

| Pattern | Source | Example |
|---|---|---|
| OOP | Aggregate class name, as-is | EventSourcedRepository[BankAccount] → "BankAccount" |
| Decider | State class name, State suffix stripped | DeciderRepository[BankAccountState, ...] → "BankAccount" |

For deciders, the canonical naming convention is {AggregateName}State (e.g., CounterState, BankAccountState). The State suffix is automatically removed to derive the stream prefix. If the state class has no State suffix, the full name is used as-is.
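
The suffix rule amounts to a one-line string operation. The sketch below is an approximation of the documented behavior, not waku's resolver; the function name is hypothetical, and falling back to the full name when stripping would leave an empty string is an assumption.

```python
def resolve_decider_name(state_type: type) -> str:
    """Hypothetical helper: derive a stream prefix from a state class."""
    name = state_type.__name__
    # Strip a trailing 'State'; keep the full name if nothing would remain.
    return name.removesuffix('State') or name


class BankAccountState:
    pass


class Counter:
    pass
```

With these classes, BankAccountState resolves to "BankAccount" and Counter keeps its full name.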

Union state types require explicit aggregate_name

When using a discriminated union as the state type (e.g., NotCreated | Active), auto-resolution cannot infer a name — union types have no __name__ attribute. You must set aggregate_name explicitly:

class BankAccountRepository(DeciderRepository[NotCreated | Active, BankCommand, BankEvent]):
    aggregate_name = 'BankAccount'

Alternatively, wrap the union in a TypeAliasType (PEP 695 type statement on Python 3.12+) — the alias name will be used for auto-resolution:

type BankAccountState = NotCreated | Active  # Python 3.12+

class BankAccountRepository(DeciderRepository[BankAccountState, BankCommand, BankEvent]):
    pass  # aggregate_name inferred as "BankAccount"

Explicit override

Set aggregate_name as a class variable to override auto-resolution:

class LegacyAccountRepo(EventSourcedRepository[BankAccount]):
    aggregate_name = 'Account'

Uniqueness

aggregate_name must be unique across all repositories in the application. Duplicate names are detected at startup and raise DuplicateAggregateNameError. Two repositories with the same aggregate_name would write to the same event streams, causing data corruption.

Warning

The stream ID format uses a hyphen separator (BankAccount-acc-1), so aggregate_name must not contain hyphens. This is validated at StreamId construction time.
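
One way to see why the restriction matters is to split a stream ID back into its parts. This sketch assumes a parser that splits on the first hyphen; both helpers are hypothetical and do not reflect how StreamId is implemented.

```python
def make_stream_id(aggregate_name: str, aggregate_id: str) -> str:
    """Hypothetical helper: build '<aggregate_name>-<aggregate_id>'."""
    if '-' in aggregate_name:
        raise ValueError(f'aggregate_name must not contain hyphens: {aggregate_name!r}')
    return f'{aggregate_name}-{aggregate_id}'


def split_stream_id(stream_id: str) -> tuple[str, str]:
    # Splitting on the first hyphen is only unambiguous when the name has none.
    name, _, aggregate_id = stream_id.partition('-')
    return name, aggregate_id
```

The aggregate ID may still contain hyphens (as in acc-1) because everything after the first separator is treated as the ID.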

Further reading

  • Event Store — in-memory and PostgreSQL event persistence
  • Projections — build read models from event streams
  • Snapshots — optimize loading for long-lived aggregates
  • Testing — Given/When/Then DSL for aggregates and deciders
  • Schema Evolution — upcasting and event type registries