Axon is a popular framework for writing Java applications following DDD, event sourcing, and CQRS principles. While especially useful in a microservices context, Axon also provides great value in building structured monoliths that can be broken down into microservices when needed. A good place to see how is our webinar recording section.
An important ingredient in the Framework is the explicit use of messages, and events are one of the message types. Axon Framework offers two ways of processing events: subscribing and tracking. The Framework provides the Subscribing Event Processor (SEP) and the Tracking Event Processor (TEP) components to support them.
Events published on the Event Bus are supplied to the SEP in the thread that published them. Depending on the Event Processing Strategy, these events might be processed in the same thread or in a different one. Processing events in the publishing thread gives us the possibility to roll back the whole transaction if event processing is unsuccessful.
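To make the same-thread behaviour concrete, here is a minimal sketch in plain Java. It does not use Axon's API: `DirectEventBusSketch` and its methods are hypothetical names, and the "transaction" is only a list that receives an event once every handler has succeeded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an event bus with subscribing handlers; not Axon's API.
class DirectEventBusSketch {
    private final List<Consumer<String>> handlers = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    // Handlers run in the caller's thread, so a handler exception reaches
    // the publisher before anything has been committed.
    boolean publishInTransaction(String event) {
        try {
            for (Consumer<String> handler : handlers) {
                handler.accept(event);
            }
            committed.add(event); // "commit" only after every handler succeeded
            return true;
        } catch (RuntimeException e) {
            return false; // "rollback": the event is never committed
        }
    }

    List<String> committedEvents() {
        return committed;
    }
}
```

Because the handler and the publisher share a thread, a failing handler can veto the publisher's work. That is the property a processor running in its own thread gives up.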
TEPs use their own threads and are in full control of how handling happens. Handling is always independent of the transaction that published the event, and that transaction must have been committed before the TEP sees the event. When we want to replay events that happened in the past (usually for read model (re)building), this is the mechanism to use. The rest of this blog discusses the features of the TEP and how they are implemented.
The Tracking Event Processor uses a Tracking Token to keep track of the events that have been processed. A Tracking Token represents the position of an event in the event stream. Different Event Store implementations may use different implementations of the Tracking Token to reliably represent this position. To be able to continue event processing after the process restarts (we’ll see later that this is not the only reason), the Tracking Token is stored in a Token Store. There are several implementations of the Token Store - JPA, JDBC, Mongo - and, of course, you can provide your own. Usually, the best place to store the Tracking Token is the place where the projection (or Saga) is also stored. Figure 1 shows how these concepts fit together.
Each TEP claims its Tracking Token in order to avoid processing the same event multiple times in different threads or nodes. Claiming the Tracking Token means setting the owner of the Tracking Token in the Token Store. The owner is not set indefinitely but for a configurable timeout. When this timeout expires and the current owner has not renewed its claim, a different owner (TEP) can claim the token. A TEP can also release the claim explicitly, which lets other TEPs continue with the processing. A conscious release of the Tracking Token forms the basis for TEP rebalancing - distributing the load of event processing evenly across TEPs.
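The claim rules described above can be sketched with a small in-memory store. This is an illustration only and not Axon's Token Store: the class name, method names, and the explicit `nowMillis` parameter (used instead of a real clock to keep the example deterministic) are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory claim registry; a real Token Store would be backed
// by JPA, JDBC, Mongo or a custom implementation.
class ClaimingTokenStoreSketch {
    private static final class Claim {
        String owner;          // null when the token is unclaimed
        long claimedAtMillis;  // time of the last successful claim or extension
    }

    private final Map<String, Claim> claims = new HashMap<>();
    private final long claimTimeoutMillis;

    ClaimingTokenStoreSketch(long claimTimeoutMillis) {
        this.claimTimeoutMillis = claimTimeoutMillis;
    }

    // A claim succeeds when the token is unclaimed, already owned by the caller
    // (which extends the claim), or when the previous claim has timed out.
    synchronized boolean claim(String processorName, String owner, long nowMillis) {
        Claim claim = claims.computeIfAbsent(processorName, name -> new Claim());
        boolean expired = nowMillis - claim.claimedAtMillis >= claimTimeoutMillis;
        if (claim.owner == null || claim.owner.equals(owner) || expired) {
            claim.owner = owner;
            claim.claimedAtMillis = nowMillis;
            return true;
        }
        return false;
    }

    // A conscious release: only the current owner can give the token up.
    synchronized void release(String processorName, String owner) {
        Claim claim = claims.get(processorName);
        if (claim != null && owner.equals(claim.owner)) {
            claim.owner = null;
        }
    }
}
```

In this model a node that crashes simply stops extending its claim, and another node takes over once the timeout has passed. An explicit release hands the token over immediately.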
In Axon, parallel processing is achieved by segmenting the event stream. For a given TEP, we start several threads, each working on its own segment of the event stream in parallel. The number of segments per TEP is configurable. Let’s define what a segment actually is.
A segment is a fraction of the total stream of events (see Figure 2). In other words, the Tracking Token is split into several portions, which means that the Token Store contains an entry per Tracking Token and Segment. A segment consists of a segment identifier and a mask. The mask is used to determine whether a certain event belongs to the given segment. Segments play a significant role in parallel and distributed event processing. Each TEP can have several segments assigned to it. For each one of them, it starts a separate thread for event processing, which gives us parallel processing. Segments can also be distributed among TEPs on several nodes, giving us the possibility to process events in a distributed fashion (do note that TEPs on different nodes still have to claim the Tracking Token so they don’t process the same event).
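As an illustration of how an identifier and a mask can partition a stream, consider the sketch below. It is a simplified model, not Axon's own segment class. It assumes that the number of segments is a power of two and that events are routed by the hash of some key (for example an aggregate identifier). `SegmentSketch` and its methods are hypothetical names.

```java
// Hypothetical model of a segment: an identifier plus a bit mask.
final class SegmentSketch {
    final int segmentId;
    final int mask;

    SegmentSketch(int segmentId, int mask) {
        this.segmentId = segmentId;
        this.mask = mask;
    }

    // An event belongs to this segment when the masked hash of its routing key
    // equals the segment identifier.
    boolean matches(String routingKey) {
        return (routingKey.hashCode() & mask) == segmentId;
    }

    // Splits the stream into `count` segments; `count` must be a power of two
    // so that `count - 1` is a contiguous bit mask.
    static SegmentSketch[] split(int count) {
        if (count < 1 || Integer.bitCount(count) != 1) {
            throw new IllegalArgumentException("count must be a power of two");
        }
        SegmentSketch[] segments = new SegmentSketch[count];
        for (int i = 0; i < count; i++) {
            segments[i] = new SegmentSketch(i, count - 1);
        }
        return segments;
    }
}
```

With this scheme every routing key matches exactly one segment, and the same key always lands in the same segment. Events that share a key are therefore handled in order by a single thread.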
When we want to rebuild projections (view models), replaying past events comes in handy. The idea is to start from the beginning of time (or from a certain point in time) and invoke all event handlers anew. To do so, one has to reset the TEP to a certain point in time (do note that the TEP should be stopped before it is reset) - this means that the Tracking Token gets updated. You might wonder whether a manual update of the Tracking Token in the Token Store is sufficient to trigger a TEP to re-process past events. It is: events will be replayed this way too. The benefit of the Replay API is that the TEP distinguishes between events that are newly published and ones that are replayed. This puts you in control of which events get replayed and which don’t. Sagas are (by default) not replayable. For replay purposes, Axon provides a Replay Token.
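The idea of a token that remembers where the processor was at the time of the reset can be sketched as follows. This is a simplified model rather than Axon's Replay Token: stream positions are plain `long` values, and `ReplayTokenSketch` and `OrderProjectionSketch` are hypothetical names.

```java
// Hypothetical model of a replay-aware token; stream positions are plain longs.
final class ReplayTokenSketch {
    private final long positionAtReset; // last position handled before the reset
    private long currentPosition = -1;  // nothing handled yet after the reset

    ReplayTokenSketch(long positionAtReset) {
        this.positionAtReset = positionAtReset;
    }

    // Moves the token to the given event and reports whether that event is a replay,
    // i.e. whether it had already been handled before the reset.
    boolean advanceTo(long eventPosition) {
        currentPosition = eventPosition;
        return eventPosition <= positionAtReset;
    }

    long currentPosition() {
        return currentPosition;
    }
}

// Hypothetical projection: it rebuilds its rows on every event, but only
// sends notifications for events that are new.
final class OrderProjectionSketch {
    int rows = 0;
    int notificationsSent = 0;

    void on(long eventPosition, ReplayTokenSketch token) {
        boolean replay = token.advanceTo(eventPosition);
        rows++;
        if (!replay) {
            notificationsSent++;
        }
    }
}
```

A plain token overwritten by hand would carry no such memory, so a handler could not tell a replayed event from a new one and would repeat side effects such as notifications.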
When a TEP is started, it starts a worker for each segment in a separate thread. Each worker tries to claim a Tracking Token for a certain amount of time. If the claim is successful, a processing loop is started. The happy flow of the processing loop consists of the steps shown in Figure 3.
- The event stream is opened at the position the Tracking Token points to. If no events are available in the stream (we wait for one second for events to become available), the Tracking Token claim gets extended and we start the loop from the beginning. Otherwise, we proceed with the loop.
- Events are read from the stream and put into a batch. Events for which no handler subscribed to this TEP is capable of handling them do not get into the batch. If the batch is still empty after this step, we extend the claim on the Tracking Token, update the TEP status and start the loop from the beginning. Otherwise, we proceed with the loop.
- The previously created batch of events gets processed by the event handlers subscribed to this TEP in a UOW (Unit Of Work). Before the UOW is committed, the Tracking Token gets stored in the Token Store.
- The TEP status contains information about the progress of event processing - the segment, whether the TEP has caught up with the event stream, whether the TEP is replaying events, and the actual Tracking Token.
After event processing has stopped (either explicitly or by an error) the TEP releases the segment so another processor can claim it and continue processing.
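The loop described above can be sketched with an in-memory list standing in for the event stream. This is a deliberately simplified model, not Axon's implementation. It assumes that events are strings, that the token is the index of the last event read, and that the "unit of work" is simply "store the token only after every handler in the batch has returned". All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical single-segment worker; one call to runOnce() is one loop iteration.
class ProcessingLoopSketch {
    private final List<String> eventStream;
    private final Predicate<String> canHandle;
    private final Consumer<String> handler;
    private final int batchSize;

    private long lastToken = -1;   // in-memory position of this worker
    private long storedToken = -1; // position persisted in the simulated token store
    private int claimExtensions = 0;

    ProcessingLoopSketch(List<String> eventStream, Predicate<String> canHandle,
                         Consumer<String> handler, int batchSize) {
        this.eventStream = eventStream;
        this.canHandle = canHandle;
        this.handler = handler;
        this.batchSize = batchSize;
    }

    // Returns the number of events handled in this iteration.
    int runOnce() {
        long position = lastToken;
        List<String> batch = new ArrayList<>();
        // Read events after the token; events nobody can handle are skipped.
        while (position + 1 < eventStream.size() && batch.size() < batchSize) {
            position++;
            String event = eventStream.get((int) position);
            if (canHandle.test(event)) {
                batch.add(event);
            }
        }
        if (batch.isEmpty()) {
            lastToken = position; // remember what was skipped, in memory only
            claimExtensions++;    // nothing to process: just extend the claim
            return 0;
        }
        // "Unit of work": if a handler throws, neither token is advanced,
        // so the same batch is read again on the next attempt.
        for (String event : batch) {
            handler.accept(event);
        }
        storedToken = position;
        lastToken = position;
        return batch.size();
    }

    long storedToken() {
        return storedToken;
    }

    int claimExtensions() {
        return claimExtensions;
    }
}
```

Advancing the token only after the handlers have succeeded is what makes a retry pick up the same batch again. Storing the token where the projection lives lets both updates share one transaction.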
In Tracking Event Processing, events are processed in different threads, which makes error handling more complicated. To solve this problem, Axon provides Error Handlers that may be configured on the TEP and act when an exception occurs in the event handling component. By default, exceptions are propagated, ultimately causing the TEP to release its claims and start retrying. If necessary, custom Error Handlers may be provided to the TEP. The recommendation is to have error cases clearly defined and represented by corresponding exceptions. With that in place, Error Handlers can act on them with different strategies in order to resolve the unexpected behaviour.
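One way to picture "different strategies for different exceptions" is a small lookup from exception type to strategy. The sketch below is plain Java and does not implement any Axon interface. The class name, the two strategies, and the choice to propagate unknown exceptions are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical mapping from exception types to handling strategies.
class ErrorStrategySketch {
    enum Strategy { SKIP_EVENT, PROPAGATE }

    private final Map<Class<? extends Exception>, Strategy> strategies = new LinkedHashMap<>();

    // Registers a strategy for an exception type (subclasses included).
    ErrorStrategySketch on(Class<? extends Exception> type, Strategy strategy) {
        strategies.put(type, strategy);
        return this;
    }

    // The first registered type that matches wins; anything unknown is propagated,
    // which mirrors the propagate-by-default behaviour described above.
    Strategy strategyFor(Exception error) {
        for (Map.Entry<Class<? extends Exception>, Strategy> entry : strategies.entrySet()) {
            if (entry.getKey().isInstance(error)) {
                return entry.getValue();
            }
        }
        return Strategy.PROPAGATE;
    }
}
```

A custom Error Handler could consult such a table and either swallow the exception (so the processor moves on to the next event) or rethrow it (so the processor releases its claim and retries).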
Tracking event processing is a powerful mechanism in the Axon Framework which gives us:
- Ability to replay events - an important advantage of event sourced systems is the ability to build new projections based on the past, or to rebuild existing ones if requirements change.
- Location transparency - process your events in whichever node you want, just make sure you have access to the event stream and token store.
- Performance - start several segments on the same node to process them in parallel and speed up the processing.
In CQRS architectures, query models are updated separately from command models, which gives us the possibility to scale our query models differently. In such cases (and many others), Tracking Processing is the preferred way of event processing. Hence, Tracking Processing is the default when using an Event Store.
We hope the above is useful to Axon practitioners. Feel free to reach out to us with any questions or comments!