I want to explain the usage of this post in common use cases:
Of course in all of these cases, the task is more complex. It may seem, at first sight, that we will connect to our backend which will send notifications from some clients to others. But if we have few processes of NodeJS (e.g. pm2 cluster or docker swarm), different users will be connected to different processes hidden behind load balancer.
80% of NodeJS developers make the same mistake – they assume that their application will work as a single process. You can say that your application is one service but if you use pm2 in cluster mode then you have multiple processes already.
Imagine that you create orders in one instance of the application, process them in another instance and send via WebSocket info to the client in third place. Creating and changing orders statuses should trigger client notification on the web interface. Also OrderProcessor is subscribed for receiving events about order creation to start processing when new order is created.
This was not an issue till you were performing all of this in one process, because you can insert notify call anywhere you need. But in the current situation, we have distributed processing between multiple nodes. This means that you need to have some tools to exchange events between these independently running processes.
A good practice is to avoid keeping any state in the process to let it run independently.
Usually, developers assume their application will be running in a single process. They start to keep some state in process memory (e.g. changes of order status, identifiers etc.). As a result, this leads to issues when the application needs to be scaled (multiple nodes will not have a shared state which changed in one of them). A good practice is to avoid keeping any state in the process to let it run independently. All needed changes can be sent to every node as an event, as well as emitted by this node if it becomes a source of changes. Notificator itself is a way to make backend stateless and delegate notifications to an independent process.
Currently, we have such event-based multi-tenant project with similar requirements. And we built a specific service for sending notifications between independent processes. We used NodeJS and RabbitMQ as a transport. Also we use a docker to develop and deploy to production. RabbitMQ server is running as a docker container as well. So minimal stack for this solution is:
And I want to describe and analyze the approaches and patterns which we used during the implementation. Here we go.
To begin with the design that we used, we often try to keep the Layers Architecture pattern in our projects. This means that our application has multiple zones of responsibility, which should be kept only inside separate parts, they should not leak from the lower level layer to the upper one. You can find the REST backend using this approach on our Starter app for NodeJS which is open-sourced now.
We also try keep in mind the Dependency Inversion Principle:
High-level modules should not depend on low-level modules. Both should depend on abstractions. Abstractions should not depend upon details. Details should depend upon abstractions.
In our case this means that connecting to RabbitMQ or other PubSub should not be implemented directly in the application, but it should be hidden behind an abstraction. You can save yourself from the pain in the future by using this approach in case when you may need to replace RabbitMQ with Redis at some point. If you have an abstraction that will not change, you will need to implement the same interface methods that were previously implemented using another driver for RabbitMQ.
So we used the same approach when built our Notificator module. First of all we defined the layers:
Also we decided to use Dependency Injection: each lower-layer will be injected in a constructor of upper-layer class.
Of course these all are not the application layers. Notificator itself was used on Service layer of our backend. Let’s have a look how we can implement this layers of a notificator module one by one.
First of all let’s define the Notificator class. It will be used to notify about some events which happen as well as give possibility to receive this event in other node of application. I defined such requirements for this class:
// Notificator.js class Notificator { constructor(args) { if (!args.pubsub) throw new Error('"pubsub" is required'); this.pubsub = args.pubsub; } async init() { try { await this.pubsub.connect(); await this.pubsub.createChannel('notifications'); } catch (error) { console.error('Notificator initialization failed.'); console.error(error); } } notify(message) { try { this.pubsub.publish('notifications', message); } catch (error) { console.error('Failed to notify'); console.error(error); } } receive(messageHandler) { this.pubsub.subscribe('notifications', messageHandler); } }
As a result we have a simple class which keep only required logic. Next step will be defining an abstraction.
To define an abstraction for the pubsub provider we should create a class that will call methods directly from a driver. This driver will give the implementation of the interaction with the infrastructural part – RabbitMQ in our case. So, this class has the following requirements:
// PubSub.js const PubSubDriverInterface = require('./drivers/PubSubDriverInterface'); class PubSub { constructor(args) { if (!args.driver) throw new Error('"driver" is required'); if (!(args.driver instanceof PubSubDriverInterface)) { throw new Error('Driver does not implement interface of "PubSubDriverInterface"'); } this.driver = args.driver; } async connect() { this.connection = await this.driver.connect(); return this.connection; } async createChannel(channel) { return this.driver.createChannel(channel); } publish(channel, message) { return this.driver.publish(channel, message); } subscribe(channel, messageHandler) { return this.driver.subscribe(channel, messageHandler); } }
The main role of this interface is just to enumerate methods and their signatures. They will become sort of “contract” between our application and pubsub service. This means that you will need to implemente them in a case when you decided to replace the current pubsub with another. In this case other application code will not be affected by this change at all. Basically, the requirements for driver are the following:
// PubSubDriverInterface.js class PubSubDriverInterface { constructor(args) { this.channels = {}; this.handlers = {}; } async connect() { throw new Error('"connect" method not implemented'); } async createChannel(channel) { throw new Error('"createChannel" method not implemented'); } publish(channel, message) { throw new Error('"publish" method not implemented'); } subscribe(channel, handler) { throw new Error('"subscribe" method not implemented'); } }
Now when abstractions are described, we can go towards the implementation of an exact driver.
For the interaction with RabbitMQ there are quite a few packages on NPM. Actually, I tried to use some of them, but finally I chose amqplib. It seems to be stable, has many installs on NPM, 100% test coverage, well written documentation and it was also used in the examples of Official RabbitMQ manual for JavaScript. As you may guess, most of the code of this part is taken from these examples. I changed some methods a bit to make them promisified.
Also, the logic was wrapped in a class methods for RabbitDriver so I will explain each of them. This class implements methods of PubSubDriverInterface that we previuosly described.
Constructor of RabbitDriver will have specific for interaction with RabbitMQ process options. Here is main re
// Rabbit.js const amqp = require('amqplib/callback_api'); const PubSubDriverInterface = require('./PubSubDriverInterface'); class RabbitDriver extends PubSubDriverInterface { constructor(args) { super(args); if (!args.endpoint) throw new Error('"endpoint" is required'); if (!args.login) throw new Error('"login" is required'); if (!args.password) throw new Error('"password" is required'); this.endpoint = args.endpoint; this.login = args.login; this.password = args.password; } ... }
The amqp.connect() is used inside to perform the connection to the RabbitMQ server. I decided to prepare a connection string. But you can use object with multiple params instead of concatenated URL as a first argument. Second optional argument is a callback which can get an error and connection. As a result of a successful connection I save it to this for next usage in other methods.
// Rabbit.js const amqp = require('amqplib/callback_api'); const PubSubDriverInterface = require('./PubSubDriverInterface'); class RabbitDriver extends PubSubDriverInterface { ... async connect() { const connectString = `amqp://${this.login}:${this.password}@${this.endpoint}`; return new Promise((res, rej) => { amqp.connect(connectString, (error, connection) => { if (error) { console.error(`Failed to connect to ${this.endpoint}`); return rej(error); } console.info(`Connected to RabbitMQ on ${this.endpoint}`); this.connection = connection; res(connection); }); }); } ... }
The next thing we need is to create a channel. The connection method connection.createChannel() is used for this. It also has a callback which gives an error and a channel. The channel will be saved to the class context object using channelName mapping. This can help us if we need to support multiple channels. A single channel that we called notifications was enough for the current Notificator.
Also we need to call channel.assertExchange() to make RabbitMQ work in pubsub mode. The first argument accepts channelName. Second argument defines exchange type. There are a few exchange types available: direct, topic, headers and fanout. We’ll focus on the last one – the fanout. The fanout exchange just broadcasts all the messages it receives to all the queues it knows.
// Rabbit.js const amqp = require('amqplib/callback_api'); const PubSubDriverInterface = require('./PubSubDriverInterface'); class RabbitDriver extends PubSubDriverInterface { ... async createChannel(channelName) { return new Promise((res, rej) => { this.connection.createChannel((error, channel) => { if (error) { console.error(`Failed to create channel "${channelName}"`); return rej(error); } channel.assertExchange(channelName, 'fanout', { durable: false }); this.channels[channelName] = channel; console.info(`Created channel "${channelName}"`); res(channel); }); }); } ... }
The channel.publish() method is used for publishing. It accepts channel, routingKey and message args that should be buffered. Routing key is left as an empty string, because we use only one topic in the notificator. Also we have small formatMessage utility function to format message to string, because e.g. we want provide an API that can accept not only strings, but also objects.
// Rabbit.js const amqp = require('amqplib/callback_api'); const PubSubDriverInterface = require('./PubSubDriverInterface'); class RabbitDriver extends PubSubDriverInterface { ... publish(channel, message) { console.info(`Publishing message "${message}" to channel "${channel}"`); if (!this.channels[channel]) throw Error(`Channel for exchange ${channel} not exists`); this.channels[channel].publish(channel, '', Buffer.from(formatMessage(message))); } ... } function formatMessage(message) { let messageStr; if (typeof message === 'string') { messageStr = message; } else if (typeof message === 'object') { messageStr = JSON.stringify(message); } return messageStr; }
In fact it has no topics as regular pubsub services like MQTT. This option will make Rabbit create multiple queues (for each subscriber separate) under the hood, but if they are binded to one exchange, all of them will receive everything published to this channel, like messages are broadcasting. In regular RabbitMQ mode you have sendToQueue and consume, and only one of multiple consumers will get message, which is okay for job manager e.g. but not suitable for our case.
To subscribe for messages in the channel we need receive a queue that will be receving messages. We can get it by using the channel.assertQueue() method. When we get it, it should be bound to the channel using channel.bindQueue and passed to channel.consume where the actual messageHandler can be set. Also in this case we don’t need acknowledgement for our messages { noAck: true }.
Read Also: Global IoT in consumer electronics market
// Rabbit.js const amqp = require('amqplib/callback_api'); const PubSubDriverInterface = require('./PubSubDriverInterface'); class RabbitDriver extends PubSubDriverInterface { ... subscribe(channel, messageHandler) { if (!this.channels[channel]) throw Error(`Channel for queue ${channel} not exists`); this.channels[channel].assertQueue('', { exclusive: true }, (error, q) => { if (error) throw error; console.info(` [*] Waiting for messages for ${channel}. To exit press CTRL+C`); this.channels[channel].bindQueue(q.queue, channel, ''); this.channels[channel].consume(q.queue, (message) => { this._messageHandler({ channel, message }, messageHandler); }, { noAck: true }); }); } ... }
Defines the default message handler. It will be called in any case and will also pass a message to a custom handler passed in subscribe. The parseMessage util we implemented can be used as a simple parsing function for getting objects from strings in order to make our Driver API accept Objects as well as returning them.
// Rabbit.js const amqp = require('amqplib/callback_api'); const PubSubDriverInterface = require('./PubSubDriverInterface'); class RabbitDriver extends PubSubDriverInterface { ... _messageHandler({ queue, message, noAck = false }, messageHandler) { const messageString = message.content.toString(); console.info(` [x] Received "${messageString}"`); if (typeof messageHandler === 'function') messageHandler(parseMessage(messageString)); } ... } function parseMessage(message) { try { return JSON.parse(message); } catch (error) { console.warn(`message ${message} an not be parsed as JSON`); return message; } }
In order to use this code we need to:
This can be done as a singleton:
// notificatorSingleton.js const { pubsub } = require('./etc/config'); const PubSub = require('./PubSub'); const Notificator = require('./Notificator'); const RabbitDriver = require('./drivers/Rabbit'); const rabbitDriver = new RabbitDriver({ endpoint : pubsub.endpoint, login : pubsub.login, password : pubsub.password }); const notificator = new Notificator({ pubsub : new PubSub({ driver : rabbitDriver }) }); module.exports = notificator;
Usually it’s not required to call a singleton class in such way, enough to have a name starting with small letter. But notificatorSingleton can save us from filename conflicts in case-insensitive OS.
And finally notify/receive scripts:
// notify.js const notificator = require('./notificatorSingleton'); async function main() { let iterator = 0; setInterval(() => { notificator.notify({ text: `iteration ${iterator}` }); ++iterator; }, 5000); } main();
When we start these scripts as separate processes, the messages are sent from multiple notifiers to multiple receivers:
When we started to use the current notificator implementation, we mentioned that this library did not provide auto reconnecting out of the box. That’s why we needed to add additional logic to avoid crushing ar losing the connection when RabbitMQ was down. Also we needed to return to the working state when the RabbitMQ became available again. We slightly changed the Notificator and RabbitDriver to make this work.
In first one we added a check in init() that it successfully connected or not (in case if you try to initialize the notificator, but RabbitMQ is not available) and flag isInited which turned to true when init() passed without errors. Also we added checks in the notify and receive in the beginning of this methods to avoid calling not initialized notificator:
// Notificator.js class Notificator { constructor(args) { this.pubsub = args.pubsub; this.isInited = false; } async init() { if (this.isInited) return; try { console.info('Notificator initialization started...'); await this.pubsub.connect(); await this.pubsub.createChannel('notifications'); this.isInited = true; } catch (error) { console.error(error); } } notify(message) { if (!this.isInited) { console.warn('Can not notify. Notificator not inited'); return; } ... } receive(messageHandler) { if (!this.isInited) { console.warn('Can not receive. Notificator not inited'); return; } ... } }
We need to change a bit more in the RabbitMQ driver. First of all we need to handle the lost connection and then call connect after some timeout. We added two handlers on error and on close events for connection. Both of them were calling connect() method after a timeout and set the isReconnecting flag to true:
// Rabbit.js class Rabbit extends PubSubDriverInterface { ... async connect() { try { this.connection = await new Promise((res, rej) => { amqp.connect(connectString, (error, connection) => { if (error) return rej(error); console.info(`Connected to RabbitMQ on ${connectString}`); res(connection); }); }); } catch (error) { console.error(`Failed to connect to ${connectString}`); await new Promise(res => setTimeout(() => res(), 5000)); console.info('Trying to reconnect...'); return this.connect(); } this.connection.on('error', (error) => { if (error.message !== 'Connection closing') { console.error('[AMQP] conn error'); console.error(error); this.isReconnecting = true; return setTimeout(this.connect.bind(this), 5000); } }); this.connection.on('close', () => { console.warn('[AMQP] reconnecting started'); this.isReconnecting = true; return setTimeout(this.connect.bind(this), 5000); }); return this.connection; }
Now connection will be established again. But we will lose all handlers bound to subscriptions and channels which that we had. To solve this we decided to:
// Rabbit.js class Rabbit extends PubSubDriverInterface { ... async connect() { ... if (this.isReconnecting) { await this._recreateChannels(); await this._reassignHandlers(); logger.info('Reconnected successfully.'); this.isReconnecting = false; } return this.connection; } async _recreateChannels() { console.log('Recreating channels...'); for (const channelName in this.channels) { if (!this.channels[channelName]) continue; await this.createChannel(channelName); } console.log('Recreating channels completed.'); } _reassignHandlers() { console.log('Reassigning handlers...'); for (const channelName in this.handlers) { if (!this.handlers[channelName]) continue; for (const handler of this.handlers[channelName]) { this.subscribe(channelName, handler, true); } } console.log('Reassign handlers completed.'); } async createChannel(channelName, pubsubMode = true) { ... if (!this.handlers[channelName]) this.handlers[channelName] = []; ... } subscribe(exchange, messageHandler, isReconnecting = false) { ... if (!isReconnecting) this.handlers[exchange].push(messageHandler); } }
Unfortunately I did not find a nice way how to test the reconnects by restarting the RabbitMQ server automatically in the CI process. This was tested manually with different scenarios:
There are different approaches how you can test the interaction of your application with RabbitMQ. Some people use AQMP mocks for this purpose. There few such mocks available on NPM.
We usually use Gitlab-CI for running tests, which is able to run docker-in-docker containers. This allows us to start real RabbitMQ process for running unit/functional tests. That’s why I used this approach, which relies on a real running RabbitMQ process. E.g. using this approach you cen test that receive() handler will get messages from notify() call. For this enough to require notificatorSingleton, and send message after a small timeout, which turns the notificator in initialized state (also I used Jest test framework for this examples):
// Notificator.test.js const notificator = require('../lib/notificatorSingleton'); describe('Notificator tests', () => { test('positive: can notify/receive messages', async () => { await notificator.init(); setTimeout(() => { notificator.notify({ test: 1 }); }, 100); const result = await new Promise(res => notificator.receive((msg) => res(msg))); expect(result.test).toBe(1); }); )
Another test we can make is to check that we receive notifications that are sent from one notificator to another:
// Notificator.test.js describe('Notificator tests', () => { ... test('positive: can notify/receive messages between different nodes', async () => { const notificatorOne = createNotificator(); const notificatorTwo = createNotificator(); await notificatorOne.init(); await notificatorTwo.init(); setTimeout(() => { notificatorOne.notify({ test: 1 }); }, 100); const result = await new Promise(res => notificatorTwo.receive((msg) => res(msg))); expect(result.test).toBe(1); });
For the last test we could not require notificatorSingleton twice, because it will be the same instance. So we use a simple createNotificator() function which instantiates a notificator for us in same way as it’s done in singleton:
const Notificator = require('../lib/Notificator'); const notificator = require('../lib/notificatorSingleton'); const PubSub = require('../lib/PubSub'); const RabbitDriver = require('../lib/drivers/Rabbit'); ... function createNotificator() { const rabbitDriver = new RabbitDriver({ endpoint : pubsub.endpoint, login : pubsub.login, password : pubsub.password }); const notificator = new Notificator({ pubsub : new PubSub({ driver : rabbitDriver }) }); return notificator; }
You may also like: 5 Steps to Train Word2vec Model with NodeJS
Hope this example will be useful for you. Actually this approach solves a real life issue when you need to communicate between nodes of a multi-tenant application. Check out the complete working sources on my GitHub. Even with some tests ????. Thanks for reading!
Learn more about how we engage and what our experts can do for your business
Written by:
Technical Lead at 2Smart
Software developer with almost 6 years of experience, interested in smart-home products and learning new technologies
In this guide, we provide you with comprehensive information on how to enable Facebook login in the React Native application, a description of the process…
How to Update Internet-of-Things (IoT) Devices: Programmer’s Guide The Internet of things is growing exponentially every year. Internet-connected devices are changing the way we work,…
3 Steps to Blur Face with NodeJS and OpenCV: How to Blur Image This reading is about a task which our team had on one…
By this article, I’d like to provide you a checklist that will help you to test your product better and discover it more in-depth. Things…
How to Use Docker Compose for Automated Testing Many engineers have been confused when designing a method for isolated testing. Fortunately, today we have great…
Do you know that only 26% of IT projects are completed on time and budget, 46% are late or over budget, and 28% fail? Failure…