Axon Framework 4.6.0: Fault-tolerant event processors

Event processors are a big part of what makes Axon Framework the production-ready CQRS framework it is today, and they recently got even better. The Axon Framework 4.6.0 release introduces a feature users have often requested: the Dead Letter Queue. In this blog, we would like to share it with you!

Stuck event processors

Imagine you have an event processor. It aggregates events into a reliable overview for the dashboard of your application. It might look like the ShipEventHandler in the following code sample.

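```java
import lombok.RequiredArgsConstructor;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.stereotype.Service;

@ProcessingGroup("ship-overview")
@RequiredArgsConstructor
@Service
public class ShipEventHandler {
    // ShipEntity, ShipEntityRepository and ShipAddedEvent are application classes
    private final ShipEntityRepository repository;

    @EventHandler
    public void on(ShipAddedEvent event) {
        // Throws a NullPointerException when the event carries no name
        final ShipEntity ship = new ShipEntity(event.getName().toLowerCase());
        repository.save(ship);
    }
}
```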

It saves the ship’s name to the database, converting it to lowercase first. This has always worked very well, until someone deployed a broken change to production. Because of this bug, a user created a ship without a name: the event’s name is null!

The event processor tries to handle this event and fails with a NullPointerException, because it calls toLowerCase() on a null name. With the default configuration, the framework only logs the error and moves on to the next event. That leaves the data inconsistent: a ship is now missing from the overview.
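To see why log-and-continue leaves the overview inconsistent, here is a minimal, framework-free sketch of that loop. The ShipAdded record and LogAndContinueDemo class are hypothetical stand-ins, not Axon code: a failing event is reported and then skipped for good.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for ShipAddedEvent; a null name triggers the bug.
record ShipAdded(String name) {}

final class LogAndContinueDemo {
    // Mimics the handler: lowercases the name and "saves" it to the overview.
    static void handle(ShipAdded event, List<String> overview) {
        overview.add(event.name().toLowerCase(Locale.ROOT)); // NPE when name is null
    }

    // Log-and-continue loop: a failing event is reported, then never processed again.
    static List<String> processAll(List<ShipAdded> events) {
        List<String> overview = new ArrayList<>();
        for (ShipAdded event : events) {
            try {
                handle(event, overview);
            } catch (RuntimeException e) {
                System.err.println("Failed to handle " + event + ": " + e);
            }
        }
        return overview;
    }
}
```

Running processAll on three ships where the middle one has a null name yields an overview with only the first and last ship; the nameless one is silently lost.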

We can improve this by configuring a PropagatingErrorHandler for the processing group. The exception then bubbles up, and the event processor goes into retry mode, in which it retries the failing event with exponential backoff. It keeps trying periodically, but it will never succeed until someone fixes the mistake and deploys the fix to production.
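To make "retry with exponential backoff" concrete, here is a minimal sketch of a doubling wait schedule with an upper bound. The Backoff class is hypothetical, and the 1-second start and 60-second cap used in the example are illustrative assumptions; check the processor configuration of your Axon version for the actual values.

```java
import java.util.ArrayList;
import java.util.List;

final class Backoff {
    // Returns the first `attempts` wait times (in seconds) of a doubling backoff capped at maxSeconds.
    static List<Long> schedule(long initialSeconds, long maxSeconds, int attempts) {
        List<Long> waits = new ArrayList<>();
        long wait = initialSeconds;
        for (int i = 0; i < attempts; i++) {
            waits.add(wait);
            // Double the wait for the next attempt, but never exceed the cap
            wait = Math.min(wait * 2, maxSeconds);
        }
        return waits;
    }
}
```

With a 1-second start and a 60-second cap, the first eight waits are 1, 2, 4, 8, 16, 32, 60 and 60 seconds. Every wait ends with another attempt at the same broken event, which is why the processor never gets past it.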

The downside is that this stops the event processor entirely. It cannot make any further progress, and the data gets staler over time until we deploy a fix. It’s stuck, with no easy way out.

Introducing the Dead Letter Queue

Axon Framework 4.6.0 introduces the Dead Letter Queue, which users can configure for any event processor. When an event processor with a Dead Letter Queue encounters an error while handling an event, it stores the failed message in the queue for later processing instead of stopping. The processor can then continue despite the error and process all other events that do not exhibit the same problem.

More often than not, events depend on each other. You cannot update a ship’s name before the ship has been created, just as you cannot modify a database row that doesn’t exist. Consequently, taking one event out of the stream and skipping it can be dangerous.

To prevent this from becoming a problem, the event processor checks the Dead Letter Queue every time it handles a message, to see whether the queue already holds a sequence with the same sequence identifier. If it does, the processor adds this event to the end of that sequence instead of handling it, preserving the order of related events. The sequence identifier is determined by the processor’s sequencing policy and is the aggregate identifier by default.
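The sequencing check can be illustrated with a small in-memory model. This is a simplified sketch under our own assumptions, not Axon's implementation: ModelEvent and SequencedDlqModel are hypothetical names, and sequenceId plays the role of the aggregate identifier. Once an event of a sequence fails, later events with the same sequence identifier are parked behind it, while other sequences keep flowing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical event: sequenceId stands in for the aggregate identifier.
record ModelEvent(String sequenceId, String payload) {}

// Simplified in-memory model of sequenced dead-lettering (not Axon's implementation).
final class SequencedDlqModel {
    final Map<String, Deque<ModelEvent>> deadLetters = new LinkedHashMap<>();
    final List<ModelEvent> handled = new ArrayList<>();
    private final Consumer<ModelEvent> handler;

    SequencedDlqModel(Consumer<ModelEvent> handler) {
        this.handler = handler;
    }

    void process(ModelEvent event) {
        Deque<ModelEvent> sequence = deadLetters.get(event.sequenceId());
        if (sequence != null) {
            // An earlier event of this sequence failed: park this one behind it to keep the order.
            sequence.addLast(event);
            return;
        }
        try {
            handler.accept(event);
            handled.add(event);
        } catch (RuntimeException e) {
            // First failure for this sequence: start a new dead-letter sequence.
            deadLetters.computeIfAbsent(event.sequenceId(), id -> new ArrayDeque<>()).addLast(event);
        }
    }
}
```

For example, if the first event for ship-1 fails, a later rename of ship-1 lands in the same dead-letter sequence without being handled, while an event for ship-2 is handled normally.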

Dead lettering to the rescue!

Luckily for the team, the person who broke the event processor had also configured a Dead Letter Queue for it. Let’s look at how you can do this in the following sample configuration.

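```java
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.deadletter.jpa.JpaSequencedDeadLetterQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ShipEventConfiguration {
    @Autowired
    public void configure(EventProcessingConfigurer configurer) {
        // Store failed events of the "ship-overview" group in a JPA-backed dead-letter queue
        configurer.registerDeadLetterQueue("ship-overview", config -> JpaSequencedDeadLetterQueue
                .builder()
                .processingGroup("ship-overview")
                .serializer(config.serializer())
                .transactionManager(config.getComponent(TransactionManager.class))
                .entityManagerProvider(config.getComponent(EntityManagerProvider.class))
                .build());
    }
}
```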

As you can see, it takes only a few lines to configure. After deploying a fix, the team can retry the items in the Dead Letter Queue by calling the SequencedDeadLetterProcessor API. At the time of writing, there is no mechanism to retry them automatically; you will have to build this yourself.
The following Spring service shows how to do this.

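```java
import lombok.RequiredArgsConstructor;
import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.messaging.deadletter.SequencedDeadLetterProcessor;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class ShipEventRetryService {
    private final EventProcessingConfiguration configuration;

    public void retry() {
        // Retry the oldest dead-letter sequence of the "ship-overview" group
        configuration.sequencedDeadLetterProcessor("ship-overview")
                     .ifPresent(SequencedDeadLetterProcessor::processAny);

        // Or, only retry a sequence whose first message is a ShipAddedEvent
        configuration.sequencedDeadLetterProcessor("ship-overview")
                     .ifPresent(p -> p.process(deadLetter -> deadLetter.message().getPayload() instanceof ShipAddedEvent));
    }
}
```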

You can trigger this API in any way you like, for example from a REST endpoint or from a method annotated with Spring’s @Scheduled.
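Outside of Spring, the same idea can be sketched with the JDK's ScheduledExecutorService. PeriodicRetry is a hypothetical helper; the Runnable you pass in would be something like the retry() method of the service above. Note that the task catches exceptions itself, because a periodic task that throws is never run again.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that periodically invokes a dead-letter retry action.
final class PeriodicRetry implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    PeriodicRetry(Runnable retry, long period, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                retry.run();
            } catch (RuntimeException e) {
                // An uncaught exception would suppress all future runs, so log and keep going.
                System.err.println("Dead-letter retry failed: " + e);
            }
        }, period, period, unit);
    }

    @Override
    public void close() {
        // Stop scheduling further retries
        scheduler.shutdownNow();
    }
}
```

Using a fixed delay rather than a fixed rate means a slow retry run never overlaps with the next one.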
The process method retries the first sequence of events (that is, events sharing the same sequence identifier) that matches the provided predicate. The predicate receives the first event of each sequence, along with additional information about what caused it to be enqueued, so you can decide whether you want to process that sequence again.
Calling the processAny method retries the oldest event sequence, based on the time it was enqueued or last retried.
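The difference between process and processAny can be modelled as a selection over the first letter of each dead-letter sequence. This is a hypothetical sketch (Letter and RetrySelection are not Axon types), and it assumes that when several sequences match the predicate, the one enqueued or retried longest ago goes first.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical first letter of a sequence, with the time it was enqueued or last retried.
record Letter(String sequenceId, Object payload, long lastTouched) {}

// Simplified model of picking the sequence to retry (not Axon's implementation).
final class RetrySelection {
    // Like process(predicate): among sequences whose first letter matches, take the oldest (assumed order).
    static Optional<Letter> select(List<Letter> firstLetters, Predicate<Letter> filter) {
        return firstLetters.stream()
                .filter(filter)
                .min(Comparator.comparingLong(Letter::lastTouched));
    }

    // Like processAny(): the oldest sequence overall.
    static Optional<Letter> selectAny(List<Letter> firstLetters) {
        return select(firstLetters, letter -> true);
    }
}
```

If no first letter matches the predicate, nothing is selected and nothing is retried.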

Giving up retries

Sometimes we are sure we don’t want to retry a specific event or error. Or we want to stop retrying a dead-letter sequence after ten consecutive failed attempts.

To achieve this, we can register a custom dead-letter policy for the processing group. The policy is a function that receives the dead letter and the cause of the failure, and returns a decision on whether to enqueue the event. The following sample shows a comprehensive policy that you can use for inspiration.

```java
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.messaging.deadletter.Decisions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ShipEventDecisionConfiguration {
    @Autowired
    public void configure(EventProcessingConfigurer configurer) {
        configurer.registerDeadLetterPolicy("ship-overview", configuration ->
                (letter, cause) -> {
                    if (cause instanceof NullPointerException) {
                        // Retrying is pointless, so don't dead-letter these
                        return Decisions.doNotEnqueue();
                    }
                    final int retries = (int) letter.diagnostics().getOrDefault("retries", -1);
                    if (letter.message().getPayload() instanceof ShipAddedEvent) {
                        // Important, always retry
                        return Decisions.enqueue(cause);
                    }
                    if (retries < 10) {
                        // Let's continue and increase retries!
                        return Decisions.enqueue(cause, deadLetter -> deadLetter.diagnostics().and("retries", retries + 1));
                    }
                    // Exhausted retries
                    return Decisions.doNotEnqueue();
                });
    }
}
```

This sample policy behaves as follows:

  • It never dead-letters events that fail with a NullPointerException.
  • For any other cause, it always retries a ShipAddedEvent, regardless of the number of retries.
  • It retries all other messages ten times before it stops enqueueing them.
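To check the retry-count bookkeeping of this policy, here is a framework-free model of the same decision table. Outcome, PolicyDecision and ShipPolicyModel are hypothetical stand-ins for Axon's Decisions and the dead letter's diagnostics, and the payload is represented by its type name instead of an instanceof check. The model assumes, as the counter starting at -1 suggests, that the policy is consulted on the first failure as well as on every failed retry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for Axon's enqueue decisions and dead-letter diagnostics.
enum Outcome { ENQUEUE, DO_NOT_ENQUEUE }

record PolicyDecision(Outcome outcome, Map<String, Object> diagnostics) {}

final class ShipPolicyModel {
    static PolicyDecision decide(Object payload, Map<String, Object> diagnostics, Throwable cause) {
        if (cause instanceof NullPointerException) {
            return new PolicyDecision(Outcome.DO_NOT_ENQUEUE, diagnostics);
        }
        int retries = (int) diagnostics.getOrDefault("retries", -1);
        if ("ShipAddedEvent".equals(payload)) {
            // Always retried; the counter is left untouched, as in the policy above
            return new PolicyDecision(Outcome.ENQUEUE, diagnostics);
        }
        if (retries < 10) {
            // Enqueue again with an incremented counter
            Map<String, Object> next = new HashMap<>(diagnostics);
            next.put("retries", retries + 1);
            return new PolicyDecision(Outcome.ENQUEUE, next);
        }
        return new PolicyDecision(Outcome.DO_NOT_ENQUEUE, diagnostics);
    }
}
```

Feeding each decision's diagnostics into the next call, a message other than ShipAddedEvent is enqueued on its first failure and after each of its next ten failures, eleven times in total, before the policy gives up.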

As you can see, you can make these policies as specific for your use case as you like. 

Wrapping up

The Dead Letter Queue in Axon Framework 4.6.0 significantly improves the way you operate your event processors. They become more reliable and no longer get stuck on a single defective event, allowing you to sleep better at night. If you want hands-on experience with this feature, check out the workshop we created.

We are always looking to improve the framework further and are always open to feedback. Feel free to share it with us on our Discuss platform!

Mitchell Herrijgers
Solutions Architect for Axon Framework at AxonIQ
Mitchell is a software craftsman and passionate coder who is eager to learn. He likes complex challenges and doesn't get discouraged easily. He is interested in the following topics: Kotlin/Java, Event Sourcing/CQRS/Axon Framework, Cloud/AWS/Infrastructure as Code, and performance tuning. After studying Computer Science at the Rotterdam University of Applied Sciences, he worked as a software engineer at ING Bank, Codecentric, and the Port of Rotterdam.