Back up unhandled events to Kafka with a shutdown hook using @PreDestroy

I was struggling with a task that I mentally called “distributed backups using Kafka”. In a few words, we had a service that invoked database functions. The service had a few RESTful endpoints and accepted a predefined set of input objects along with their keys.

Every object is part of a client profile, because the whole client profile is assembled in the database from predefined fields. This imposed a restriction on my task: input requests came in an arbitrary order, but we were not allowed to pass the inputs on as they came from the client. We had to hold the rest until the clientId and mainAccount objects arrived (otherwise the stored function fails 🙁 ). So we have a kind of “delayed” or “pending” loading to the DB.
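As a rough illustration of this “pending” loading (not the actual implementation), the holding logic could be sketched like this. The class name PendingProfileBuffer, its methods, and the String payloads are all hypothetical, and the arrival of clientId and mainAccount is collapsed into a single markReady call for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of "pending" loading; names and String payloads are illustrative only.
class PendingProfileBuffer {
    // Parts that arrived before the client's clientId and mainAccount objects.
    private final Map<String, List<String>> pending = new ConcurrentHashMap<>();
    // Clients whose clientId and mainAccount objects have already arrived.
    private final Set<String> ready = ConcurrentHashMap.newKeySet();

    /** Returns true if the part may go to the DB right away; otherwise holds it. */
    synchronized boolean offer(String clientKey, String part) {
        if (ready.contains(clientKey)) {
            return true;
        }
        pending.computeIfAbsent(clientKey, k -> new ArrayList<>()).add(part);
        return false;
    }

    /** Marks the client as loadable and hands back every part held for it so far. */
    synchronized List<String> markReady(String clientKey) {
        ready.add(clientKey);
        List<String> held = pending.remove(clientKey);
        return held == null ? Collections.emptyList() : held;
    }

    /** Number of clients that still have held parts, i.e. what a shutdown would need to back up. */
    int pendingClients() {
        return pending.size();
    }
}
```

Whatever is still sitting in the pending map when an instance goes down is exactly the data that would otherwise be lost.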

I will intentionally omit the implementation details. I used Apache Kafka state stores (a GlobalKTable, to be more specific) to create a table of incoming events (links between clientId and mainAccount) and distribute it across all Kafka Streams instances.

The runtime

We are running on Kubernetes, so our microservices can scale automatically depending on the current load. At some point I realized that I needed to register a shutdown hook to force my microservice to unload unprocessed data to a backup Kafka topic. To do that in Spring Boot, I needed to understand how Kubernetes stops microservices during downscaling.

And the answer is: Kubernetes sends a SIGTERM signal to stop the process in the container. As stated here, the JVM should handle it in a way that supports the shutdown hook mechanism.
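For reference, here is a minimal sketch of a plain JVM shutdown hook without any Spring involved. The class name ShutdownHookDemo, the thread name, and the printed messages are made up for illustration; Runtime.getRuntime().addShutdownHook is the standard JDK call.

```java
// Minimal sketch of a plain JVM shutdown hook (no Spring involved).
class ShutdownHookDemo {
    // Hypothetical cleanup routine; in the service this is where pending data would be unloaded.
    static final Runnable CLEANUP =
            () -> System.out.println("Shutdown hook: unloading pending data");

    /** Registers the cleanup routine as a JVM shutdown hook and returns the hook thread. */
    static Thread register() {
        Thread hook = new Thread(CLEANUP, "backup-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        register();
        // The hook runs when the JVM exits normally, on System.exit, and on SIGTERM or Ctrl + C.
        System.out.println("main finished, the JVM is about to exit");
    }
}
```

The hook does not run if the process is killed forcibly (for example with SIGKILL), so it only helps when the process is stopped gracefully.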

SIGTERM signal in Java

Since all unhandled requests were stored in ConcurrentHashMaps, I simply needed to iterate through them and send each entry to a predefined Kafka topic.
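The iteration itself can be sketched independently of Kafka. In the sketch below the BiConsumer sink is a stand-in for the real producer call, and PendingMapDrainer and drain are hypothetical names. It relies on the fact that iterating a ConcurrentHashMap is weakly consistent, so removing entries while looping does not throw.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;

// Hypothetical helper: pushes every pending entry to a sink (a stand-in for a Kafka producer).
class PendingMapDrainer {
    /** Sends each entry to the sink, removes it from the map, and returns how many were sent. */
    static <K, V> int drain(ConcurrentMap<K, V> pending, BiConsumer<K, V> sink) {
        int sent = 0;
        for (Map.Entry<K, V> entry : pending.entrySet()) {
            sink.accept(entry.getKey(), entry.getValue());
            // Remove only if the mapping is unchanged, so a concurrent update is not lost.
            pending.remove(entry.getKey(), entry.getValue());
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> accounts = new ConcurrentHashMap<>();
        accounts.put("client-1", "account-event");
        int sent = drain(accounts, (key, event) -> System.out.println("backup <- " + key + ": " + event));
        System.out.println("sent=" + sent + ", left=" + accounts.size());
    }
}
```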

Shutdown hooks in Spring Boot

The simplest implementation I could find was to use the @PreDestroy Java annotation.

The PreDestroy annotation is used on methods as a callback notification to signal that the instance is in the process of being removed by the container. The method annotated with PreDestroy is typically used to release resources that it has been holding.

I created a method marked with @PreDestroy and ran the application in IntelliJ IDEA. I loaded some test data into the app and stopped it by clicking “Stop” in IDEA, expecting my hook to be executed, but ha, not this time! It seems that IDEA doesn’t stop the application gracefully when using the Stop button (or Ctrl + F2 on Windows).

To check this, I ran my app from the command line using java -jar jarname.jar, and after loading all the test data I pressed Ctrl + C. This time I saw that the shutdown hook was executed successfully.

Note that when @PreDestroy is triggered, it seems (probably, I’m not sure yet) that all Spring beans are already shutting down, so you’ll be unable to use injected components. In my case I needed a KafkaProducer to send unhandled events to the backup Kafka topics. So I created a temporary KafkaProducer at the beginning of the @PreDestroy annotated method and closed it just before the end of the method.

UPD: My Kafka producer was already closed because closing it was defined in the stop() method, as the parent class implemented the SmartLifecycle interface.

    /**
     * Unloads all events from the pending maps to the backup Kafka topics.
     * If the current microservice instance receives a shutdown event,
     * the other running instances should read the topics and re-process these events gracefully.
     */
    @PreDestroy
    public void unloadEventsToKafka() {
        log.info("Shutdown callback triggered - @PreDestroy");
        // At this point the current instance is no longer running and the old producer
        // is already closed, so we have to create a new one.
        producer = new KafkaProducer<>(configuration);
        try {
            accountMap.forEach((key, event) ->
                    producer.send(new ProducerRecord<>(ACCT_BACKUP_TOPIC, key,
                            accountSerde.serializer().serialize(ACCT_BACKUP_TOPIC, event))));
            depositMap.forEach((key, event) ->
                    producer.send(new ProducerRecord<>(DEPOSIT_BACKUP_TOPIC, key,
                            depositSerde.serializer().serialize(DEPOSIT_BACKUP_TOPIC, event))));
            producer.flush(); // block until all buffered sends have completed
            log.info("Pending elements were stored successfully!");
        } finally {
            // We don't need the producer anymore. Newer kafka-clients versions
            // replace this overload with close(Duration).
            producer.close(1000, TimeUnit.MILLISECONDS);
        }
    }

Conclusion

So what experience have I gained from this task?

  • The @PreDestroy annotation can be used as a shutdown hook in Spring;
  • It seems that IntelliJ IDEA doesn’t stop processes gracefully (but you can debug your app and evaluate the System.exit(0) expression where needed);
  • Kubernetes sends SIGTERM to stop a process inside a container;
  • The JVM should invoke shutdown hooks when SIGTERM is received;
  • Kafka is good as a backup queue too 🙂 ;