Optimizing Event Processor Performance

Just as parents want what's best for their children, we developers want what's best for our applications. We concern ourselves with many aspects of an application, like modularity, readability of code, and, of course, performance! In this blog, we'll dive into how you can tune your event processors in the best way possible.

This blog presents many optimizations, most of which come with caveats that should be kept in mind when implementing them. Each optimization's limitations are outlined in its respective section; please read them carefully before implementing. Apply optimizations one at a time and measure the results before implementing another. That way, measures that have a negative impact on your specific use case can be isolated from those that have a positive one.
 

Dirty jobs: the event processor

Tuning an event processor is different for each application, depending on its load, the logic it executes, and the environment it runs in. There is therefore no one-size-fits-all optimization strategy.
So to tune an event processor correctly, you must understand how it works and which aspects of it we are tuning.

The following diagram shows how an event processor works with the default settings of Axon Framework 4.6: a TrackingEventProcessor with 1 thread and 1 segment, a batch size of 1, and Axon Server as the event store.

Diagram of default event processor behavior

Each thread of the TrackingEventProcessor will try to claim a segment by claiming a token in the token store. If there are no tokens in the token store yet, it will initialize the number of segments provided by the initialSegmentCount property by creating the same number of tokens.

After initialization, segments will no longer be changed by the event processor automatically in any way, so changing this property has no effect after the first run. The only way to change the number of segments is by splitting or merging them. 

After a thread has claimed a segment for processing, it opens a stream to the event store and has events pushed to it. If an event matches the segment, a batch is made. For this batch of 1, the processor starts a transaction, invokes the event handlers, updates the token, and commits the transaction.

The default situation is inefficient when multiple events are available in the stream for processing. For each event, a batch of one is created that starts its own transaction and updates the token. So when 100 events are available, 100 batches are made that each separately start a transaction, execute the handler, store the updated token and commit the transaction. 

The problem becomes more apparent with replays, where you replay your entire event store containing possibly billions of events. What if we could reduce the overhead of the transaction to speed things up?

Increasing the batch size 


As a first optimization, we can increase the batch size property of the processor, processing more than one event in the same transaction. The batch succeeds or fails as one unit, and the token is updated only once for the entire batch. The optimal size depends on the application, but you generally don't want to set it too high. A batch size of 100 is reasonable, whereas a batch size of 100,000 typically is not.
You can set the batch size of an event processor through its Java configuration, like in this example, which sets it to 10:

import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;

public class MyAxonConfiguration {

    public void configure(EventProcessingConfigurer configurer) {
        // Configure the processor to handle batches of 10 events per transaction
        configurer.registerTrackingEventProcessorConfiguration(
                "processorName",
                configuration -> TrackingEventProcessorConfiguration
                        .forSingleThreadedProcessing()
                        .andBatchSize(10)
        );
    }
}

Because the batch of events succeeds as one unit, failing to handle one event in the batch will fail the entire batch. The event processor will retry that batch later by entering retry mode unless another processor instance picks it up in the meantime.

It's very important to note that, by default, errors in the event handlers are only logged, not rethrown, so an event processor will only fail a batch if the database transaction itself fails as a whole. It's smart to switch to another strategy so that other errors, such as a NullPointerException, also cause the event processor to retry.
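One such strategy is registering the framework's PropagatingErrorHandler as the listener invocation error handler, which rethrows handler exceptions so the whole batch fails and enters retry mode. A minimal sketch; the processor name "processorName" is a placeholder:

import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.PropagatingErrorHandler;

public class MyErrorHandlingConfiguration {

    public void configure(EventProcessingConfigurer configurer) {
        // Rethrow handler exceptions instead of only logging them,
        // so the batch fails as a whole and is retried.
        configurer.registerListenerInvocationErrorHandler(
                "processorName",
                configuration -> PropagatingErrorHandler.instance()
        );
    }
}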

Rolling back with large batches is quite expensive if the last event in the batch fails, as it will cause all earlier events to be rolled back in the database and will have to be handled again later.

Token Stealing

Event processors are eager to start working, but each segment should only be processed by one processor at a time. To enforce this, a processor claims the token that backs a segment in the token store.

Whenever the token of a segment is available for claiming, a processor will pick it up as long as it has segment capacity left. A segment is available if its token is unclaimed, or if the current claim has not been updated for a certain interval. This interval is what we call the tokenClaimInterval, and it's configurable on the event processor. In the future, we will rename these properties to segmentClaimInterval, because it's actually the segment that's claimed, not the token.

The claim interval exists so that when one of the instances of your application goes down, or is otherwise impaired, another instance of your application can pick it up. 

By default, the tokenClaimInterval is set to 5 seconds. That means that if a batch, and thus its token update, takes longer than 5 seconds to process, the segment becomes available to other nodes. Another application instance then takes the claim and starts processing: stealing the token from the previous node. When the earlier instance finally finishes processing and tries to commit the token, the commit fails, since another node now owns that segment. The TokenStore will throw an UnableToClaimTokenException with the message "Unable to extend the claim on token for processor". The following diagram depicts this process:

Diagram depicting how Token Stealing happens

A situation like this leads to an endless loop of retries and should be prevented by ensuring the batch size can be comfortably processed within the configured tokenClaimInterval.
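If your batches need more headroom, the interval can be tuned on the processor configuration. A minimal sketch, assuming the processor name "processorName"; the 10 seconds here is an arbitrary illustrative value:

import java.util.concurrent.TimeUnit;

import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;

public class MyClaimConfiguration {

    public void configure(EventProcessingConfigurer configurer) {
        // Extend the claim interval so slower batches are not stolen mid-flight.
        configurer.registerTrackingEventProcessorConfiguration(
                "processorName",
                configuration -> TrackingEventProcessorConfiguration
                        .forSingleThreadedProcessing()
                        .andTokenClaimInterval(10, TimeUnit.SECONDS)
        );
    }
}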

Adding More Segments

The TrackingEventProcessor uses only one thread and segment by default, meaning that all events are handled sequentially by the same thread. We can improve this by creating more segments to split the work between threads. You can do this in two ways: 

  • Setting the initialSegmentCount property to a higher value. When there are no tokens in the store, the processor will create the number of tokens provided in this argument. This is only done during the first run.

  • Splitting the existing segments. Contrary to the initialSegmentCount property, this also works after the first run; see the sketch after this list. The following blog goes into more detail.
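Both options are shown in one minimal sketch below. The processor name "processorName", the segment and thread counts, and the splitSegmentZero method are illustrative assumptions:

import org.axonframework.config.Configuration;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;

public class MySegmentConfiguration {

    public void configure(EventProcessingConfigurer configurer) {
        // Create 4 tokens on the first run and process them with 4 threads.
        configurer.registerTrackingEventProcessorConfiguration(
                "processorName",
                configuration -> TrackingEventProcessorConfiguration
                        .forParallelProcessing(4)
                        .andInitialSegmentsCount(4)
        );
    }

    public void splitSegmentZero(Configuration configuration) {
        // Split segment 0 of the running processor into two, after the first run.
        configuration.eventProcessingConfiguration()
                     .eventProcessor("processorName", TrackingEventProcessor.class)
                     .ifPresent(processor -> processor.splitSegment(0));
    }
}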

You should always ensure there are enough threads to process the segments, or you will have unprocessed parts of your event stream. For example, if you have 4 segments and only 2 threads, 50% of your event stream will not be processed! 

It's therefore recommended to have no more segments than threads on each individual instance. It might seem wise to have 8 segments, 4 threads per instance, and 2 instances to balance the load. However, if one of the two nodes goes down, 50% of your event stream goes unprocessed, so you lose high availability. It's better to run with 4 segments and 4 threads, or 8 segments and 8 threads.

Alternatively, upgrade to the PooledStreamingEventProcessor, which has no threading limitations! 

PooledStreamingEventProcessor

With the introduction of the PooledStreamingEventProcessor in Axon Framework 4.5, most of the threading troubles go away. The number of segments it can handle is virtually unlimited; at the very least, it is not limited by the size of a thread pool. It uses a coordinator task to claim tokens and read the event stream, then distributes events to workers that run in separate thread pools. No segment will ever go unclaimed again! You can see how this works in the following diagram.
 

Diagram depicting how a PooledStreamingEventProcessor works

In addition, using the PooledStreamingEventProcessor has another huge benefit: the event stream is only read once per instance instead of once per thread.
When using the TrackingEventProcessor with 16 segments across 2 instances, the same event stream is read 16 times. With the PooledStreamingEventProcessor, it's only read once per instance, so only 2 times instead of 16. This saves a lot of computing power and network IO.

The reference guide contains detailed instructions on how to configure the PooledStreamingEventProcessor for your application. 
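For instance, switching a single processor over can look like the following minimal sketch; the processor name "processorName" is a placeholder:

import org.axonframework.config.EventProcessingConfigurer;

public class MyPooledConfiguration {

    public void configure(EventProcessingConfigurer configurer) {
        // Register "processorName" as a PooledStreamingEventProcessor
        // instead of the default TrackingEventProcessor.
        configurer.registerPooledStreamingEventProcessor("processorName");
    }
}

Alternatively, calling usingPooledStreamingEventProcessors() on the EventProcessingConfigurer makes this type the default for all event processors.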

The PooledStreamingEventProcessor comes with only one caveat, which is insignificant in most use cases: the position in the event stream is always that of the oldest, or slowest, segment. This is generally no problem, since segments are processed at similar speeds. We will make this the default processor type in the next major version of the framework.

Caching

We often find our code repeating actions, which can be quite expensive. For example, retrieving a JPA entity from a repository by something other than its primary key can cause significant overhead when done for every event in a batch of 100, as it loads the JPA entity many times.

We can optimize these expensive computations by caching them in the UnitOfWork. For every batch, a new UnitOfWork is created, so we can be sure that caching for one batch does not interfere with the other.

The UnitOfWork contains a resource map that you can put resources in. We can use its getOrComputeResource method to either get the already loaded entity or load it from the database. After processing the last event of the batch, we save all entities present in the map to the database in one go.

The following code contains a sample implementation of this approach.
 

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.axonframework.eventhandling.ConcludesBatch;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.messaging.InterceptorChain;
import org.axonframework.messaging.interceptors.MessageHandlerInterceptor;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.springframework.beans.factory.annotation.Autowired;

class MyCachingEventHandler {

    @Autowired
    private MyEntityRepository repository;

    @MessageHandlerInterceptor
    public void interceptor(InterceptorChain chain,
                            UnitOfWork<?> unitOfWork,
                            @ConcludesBatch boolean concludesBatch) throws Exception {
        chain.proceed();
        if (concludesBatch) {
            // After the last event of the batch, save all cached entities in one go.
            List<Object> entities = unitOfWork.resources().entrySet()
                                              .stream()
                                              .filter(entry -> entry.getKey().startsWith("entity_"))
                                              .map(Map.Entry::getValue)
                                              .collect(Collectors.toList());
            repository.saveAll(entities);
        }
    }

    @EventHandler
    public void handle(MyAwesomeEvent event, UnitOfWork<?> unitOfWork) {
        // Load the entity only once per batch; later events reuse the cached instance.
        var myEntity = unitOfWork.getOrComputeResource(
                "entity_" + event.getEntityPropertyValue(),
                key -> repository.findByProperty(event.getEntityPropertyValue())
        );
        // Do operations on the entity
    }
}

The event handler in the sample code explicitly saves all entities to the database after processing the last event. This might not be necessary, depending on your configuration, but is included for illustration.

Axon Server

If your event store is a (No)SQL database, upgrading to Axon Server will significantly improve your performance. Axon Server is our flagship product, optimized for writing and reading events at great speed and efficiency.

Axon Server actively pushes events to each connected application. When an event is published, all nodes of an application will immediately receive that event. 

Unfortunately, (No)SQL databases don’t provide push functionality in a way the framework can use. Because of this limitation, the event store implementations in the framework for these databases have to use a polling mechanism. This can create additional delays between the moment an event is published and consumed. 

We have a version of Axon Server available for free! Check it out, and reach out to us if you want to try out the Enterprise Edition with many additional features. 

Look In The Mirror

In the end, an event processor executes a task that takes time. The less time it takes, the faster your event processing will be. We provide many tools in the framework to monitor your application, and these can also be used to optimize event performance. 

Adding the axon-micrometer module to your application is always a good idea. This automatically adds many metrics to your application, which can be scraped by various tools, such as Prometheus. These metrics show the time it takes for events to be processed and provide other great insights.
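Outside of Spring Boot's auto-configuration, you can wire the metrics into the Configurer yourself. A minimal sketch, assuming axon-micrometer is on the classpath; SimpleMeterRegistry stands in for your real Micrometer registry:

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.axonframework.config.Configurer;
import org.axonframework.micrometer.GlobalMetricRegistry;

public class MyMetricsConfiguration {

    public Configurer addMetrics(Configurer configurer) {
        // Attach Micrometer-based message monitors to the Axon components.
        GlobalMetricRegistry metricRegistry = new GlobalMetricRegistry(new SimpleMeterRegistry());
        return metricRegistry.registerWithConfigurer(configurer);
    }
}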

The most beneficial thing you can do is to trace your application with AxonIQ Console.

Conclusion

There are various ways to tune our event processors for more performance. Summarizing them and their effects looks like this:

  • Increasing the batch size allows for more work in a single transaction.

  • Splitting segments will allow the application to do more in parallel.

  • Switching to the Pooled Streaming Event Processor will ensure we are not wasting tons of IO.

  • Caching resources in the unit of work can be done to maximize the potential of the batch size, at the cost of additional complexity.

  • Ensuring the use of Axon Server as the event store will provide high performance and responsiveness.

  • Looking at our own code for culprits with tracing will identify needless delays we can eliminate. 

With all these performance tips and improvements, you can ensure your projections are at their peak performance, even during the replay of many events.

Mitchell Herrijgers
Solutions Architect for Axon Framework at AxonIQ. Mitchell is a software craftsman, passionate coder, and eager learner. He likes complex challenges and doesn't get discouraged easily. He is interested in the following topics: Kotlin/Java, Event-Sourcing/CQRS/Axon Framework, Cloud/AWS/Infrastructure as Code, and performance tuning. After his studies in Computer Science at the Rotterdam University of Applied Sciences, he worked as a software engineer at ING Bank, Codecentric, and the Port of Rotterdam.
