When you have a microservice architecture, you have to deal with failures on many different levels. One of the patterns commonly used to deal with failures of remote calls is the circuit breaker. It helps prevent cascading failures, where a problem in one service causes resource exhaustion in other services. Let’s take a look at one example of how one might implement this pattern in a Node.js microservice.
The Service
We’ll use an asynchronous worker as an example. This worker processes a RabbitMQ queue and, as part of the workload, calls a target service. A concrete example of such a service could be a worker sending SMS by calling an external third-party gateway.
The service implements a basic mechanism for delivery reliability. When the external gateway times out or returns an error, the message is returned to the queue and another attempt can be made. In this scenario, there is no backoff period, so when the message is returned to the queue and there is an available worker, it is retried immediately.
This might pose a problem in case the gateway returns an error immediately, as the messages will cycle very quickly and, for example, can generate tons of log messages. This might cause the machine to run out of disk space. I know you have all of this covered by monitoring, but still…
Circuit Breaker
In order to mitigate this and other types of cascading failures, we can use the circuit breaker pattern. As the name suggests, it handles the non-standard situation by severing the connection between the caller and the callee until the situation is resolved. The resolution can be manual or automatic.
Manual resolution can mean that when the connection is severed, a human operator has to intervene and restore it. The simplest form would be that the worker terminates itself and the operator has to start it up again. This is, however, not very resilient, so it’s much better to implement some sort of automatic recovery. The worker can retry the service call after a while and, if the call succeeds, re-enable the connection.
The choice of timeout period in this case is important. If it’s too short, we’ll retry needlessly while the problem persists. If it’s too long, an intermittent connection problem can result in an overly lengthy outage. It’s beneficial to implement exponential backoff for the retries: after the circuit breaker trips, we pause for a short period, such as 5 seconds, and retry. If it fails again, we double the waiting period to 10 seconds, and continue in this fashion, as sketched below.
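As a rough illustration, the backoff calculation could look like this. The constants and the nextPause name are placeholders of my choosing, not part of the worker code shown later:

const BASE_PAUSE = 5000;           // start with 5 seconds
const MAX_PAUSE = 10 * 60 * 1000;  // cap the wait at 10 minutes

let pauseLength = BASE_PAUSE;

const nextPause = (lastCallSucceeded) => {
  if (lastCallSucceeded) {
    // the service recovered, return to the base period
    pauseLength = BASE_PAUSE;
  } else {
    // still failing, double the waiting period up to the cap
    pauseLength = Math.min(pauseLength * 2, MAX_PAUSE);
  }
  return pauseLength;
};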
In this particular case, when we detect that something is wrong with the external gateway, we’ll pause the processing of messages. There is no point in calling the gateway if we know it will fail anyway.
To implement it, we’ll use a counter. Every time we get an error or a timeout from the gateway, we’ll increase the counter. Every time the call succeeds, we’ll reset it to zero. If the counter reaches a certain threshold, we’ll stop processing messages for a while. After this pause, we’ll reset the counter to zero and start processing messages again.
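Stripped of the messaging details, the counter logic amounts to something like this. THRESHOLD and the trip hook are placeholders; the real wiring follows in the Implementation section:

const THRESHOLD = 5;  // example value

let failures = 0;

const trip = () => {
  // placeholder: pause processing for a while,
  // then reset `failures` and resume
};

const onSuccess = () => {
  failures = 0;  // a successful call resets the counter
};

const onFailure = () => {
  failures += 1;
  if (failures >= THRESHOLD) {
    trip();  // too many consecutive failures
  }
};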
This pattern will not protect you against intermittent failures, or even against cases where half of the calls fail. But that is not the goal of a circuit breaker. Even when most of the calls fail, some still succeed, so we are running, albeit more slowly. We are only interested in cases where the other side is completely down and it’s wasteful to even try calling it.
It’s also a good idea to fire a special event into your log when the circuit breaker trips, and to specifically monitor for these events.
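For instance, a structured log entry along these lines is easy to alert on. The function name and field names here are illustrative, not part of the implementation below:

// hypothetical helper, called from wherever the breaker trips
const logBreakerTripped = (consecutiveFailures) => {
  console.error(JSON.stringify({
    event: 'circuit_breaker_tripped',
    service: 'sms-gateway',
    consecutiveFailures,
    timestamp: new Date().toISOString(),
  }));
};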
Implementation
I’ve put together a bare-bones implementation. For simplicity, it lacks proper error handling and the like, so please don’t copy/paste this into production.
First, let’s take a look at the service without the circuit breaker.
const amqp = require('amqp');
const request = require('request');

const GATEWAY_URL = 'https://smsgateway/...';
const AMQP_URL = 'amqp://...';

// this is called every time a message is received from the queue
const processMessage = (message, headers, info, original) => {
  // construct the payload for the external gateway service
  const payload = {
    recipient: message.data.recipient,
    body: message.data.body,
  };
  // call the external SMS gateway
  request.post(GATEWAY_URL, { json: payload }, (error, response) => {
    // the call failed
    if (error || response.statusCode >= 500) {
      // return the message back to the queue
      original.reject(true);
    } else {
      // acknowledge the message,
      // removing it from the queue permanently
      original.acknowledge();
    }
  });
};

// create the AMQP connection
const conn = amqp.createConnection({ url: AMQP_URL });

// when connected, create the queue
conn.on('ready', () => {
  conn.queue('myqueue', (createdQueue) => {
    // when the queue is created, bind it to
    // myexchange and receive all messages
    createdQueue.bind('myexchange', '#');
    // call the processMessage function for all messages
    createdQueue.subscribe(processMessage);
  });
});
Not much to see here. The script connects to the AMQP server, creates a queue, and binds it to an exchange to receive messages. When a message is received, the processMessage function is called. This function calls the external gateway. If the call succeeds, it acknowledges the receipt of the message. If it fails, it rejects it, which returns it to the queue to be processed again.
Now we’ll implement the circuit breaker into this.
First, we need to prepare two functions: one for (re-)subscribing to the queue, and one for temporarily unsubscribing (pausing). The pause function uses the consumer tag we remember during the subscription process. It also schedules a re-subscribe after the BREAKER_PAUSE timeout period passes. We also declare the breaker counter and the two constants the breaker uses.
// how many consecutive failures trip the breaker,
// and how long to pause once it trips (example values)
const BREAKER_THRESHOLD = 5;
const BREAKER_PAUSE = 30000;

let queue;        // assigned in the conn.queue() callback
let ctag;         // consumer tag, needed to unsubscribe later
let breaker = 0;  // the circuit breaker failure counter

const subscribe = (processFunction) => {
  queue.subscribe(processFunction).addCallback((ok) => {
    // we have to remember the consumer tag, because we will need it
    // to unsubscribe from the queue later on
    ctag = ok.consumerTag;
  });
};

const pause = (processFunction) => {
  // unsubscribe from the queue, thus stop processing messages
  queue.unsubscribe(ctag);
  // wait for the breaker pause (30 seconds)
  setTimeout(() => {
    // reset the circuit breaker counter and re-subscribe to receive messages
    breaker = 0;
    subscribe(processFunction);
  }, BREAKER_PAUSE);
};
We also need to change the processing function to react appropriately to gateway failures. When a call fails, we increment the breaker counter. If the counter reaches the pre-set threshold, we call the pause function prepared earlier. If the call succeeds, we reset the counter.
// call the external SMS gateway
request.post(GATEWAY_URL, { json: payload }, (error, response) => {
  // the call failed
  if (error || response.statusCode >= 500) {
    // increment the circuit breaker counter
    breaker += 1;
    // check if the counter is over the threshold
    if (breaker >= BREAKER_THRESHOLD) {
      // trip the circuit breaker
      pause(processMessage);
    }
    // return the message back to the queue
    original.reject(true);
  } else {
    // reset the circuit breaker counter
    breaker = 0;
    // acknowledge the message, thus removing it from the queue permanently
    original.acknowledge();
  }
});
And that’s pretty much it. Of course, a real implementation would at least have some logging and monitoring of these events, but in this simple form it can serve as an example of how to implement a circuit breaker in this worker-style service scenario.
Circuit Breaker in Online Services
Pausing the processing of incoming messages works for this worker-type service. But what about online services, where you cannot pause and need to answer incoming requests? As usual, the answer is “it depends”. You have a choice of several approaches based on your use case.
In the worst case, when you absolutely need the answer from the upstream service to give any meaningful answer, you have to respond with an error code. But even in this worst case the circuit breaker is useful, because at least you “fail fast” and the caller can make other choices.
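A minimal sketch of failing fast in an HTTP service, using Express purely for illustration; the route and the breakerOpen flag are assumptions, not part of the worker code above:

const express = require('express');
const app = express();

let breakerOpen = false;  // maintained by the same counter logic as before

app.get('/sms-status/:id', (req, res) => {
  if (breakerOpen) {
    // fail fast instead of waiting on a dead upstream;
    // the caller can retry later or degrade gracefully
    return res.status(503).set('Retry-After', '30')
      .json({ error: 'upstream unavailable' });
  }
  // normal path: call the upstream service here
  res.json({ status: 'ok' });
});

app.listen(3000);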
You can also choose to serve stale data if you made the call recently and retain some sort of cached data. It might not be up to date, but that can be acceptable, and it’s better than nothing.
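Roughly, using the breakerOpen flag from the sketch above and a hypothetical callUpstream helper:

let cached = null;  // last successful response

const getData = (id, callback) => {
  if (breakerOpen && cached) {
    // serve possibly stale data rather than failing outright
    return callback(null, { ...cached, stale: true });
  }
  // callUpstream is a hypothetical helper calling the real service
  callUpstream(id, (error, data) => {
    if (!error) {
      cached = data;  // remember the latest good answer
    }
    callback(error, data);
  });
};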
Depending on your business logic, you can also make certain business choices. Take the example of a service that answers whether a product is available in the warehouse. If 99% of your products are usually in stock, it can be acceptable to reply “it’s available” even when you are not sure because the connection to your warehouse management system is down. It might be a good idea to log these assumed replies and produce a report of these cases for your staff to check later.
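Such an optimistic fallback might look roughly like this; checkWarehouse and the log format are hypothetical, and breakerOpen is the flag from the earlier sketch:

const isAvailable = (productId, callback) => {
  if (breakerOpen) {
    // warehouse system is unreachable: assume in stock,
    // and log the assumption for a later report
    console.warn(JSON.stringify({ event: 'assumed_availability', productId }));
    return callback(null, true);
  }
  // checkWarehouse is a hypothetical helper calling the warehouse system
  checkWarehouse(productId, callback);
};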
The source code is available on GitHub.