AbstractMessageQueue
The AbstractMessageQueue attempts to define a common interface for interacting with message queues. By depending on this interface instead of any particular queue implementation, application code can remain flexible for different environments with different requirements.
I have tried to keep the interface to a minimum so that it is easy to implement. Any extra features can be added at an implementor's discretion, or by wrapping a queue instance in another library.
Inspired by abstract-blob-store.
In this package
This package contains:
- A class for helping to create your implementation of a MessageQueue
- A function for testing your implementation against the spec
You do not have to use either of these in order to be compliant - they are just (hopefully) useful tools.
Using a MessageQueue
The MessageQueue is designed to be used in a `for await` loop.
Each loop processes its messages in series, but you may have multiple loops running in parallel.
for await (const message of queueInstance) {
  const success = processMessage(message);
  // On successful processing, remove message from queue
  if (success) {
    queueInstance.delete();
  }
}
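To make the loop above concrete, here is a minimal sketch using a hypothetical in-memory queue. `ArrayQueue` and `drain` are names invented for this example and are not part of the package. As a simplification, the iterator here ends once the array is empty, which the spec does not require.

```javascript
// Hypothetical in-memory queue, written only to make the consumer loop runnable.
class ArrayQueue {
  constructor() {
    this.messages = [];
    this.deleteCalled = false;
  }

  send(message) {
    this.messages.push(message);
  }

  delete() {
    this.deleteCalled = true;
  }

  async *[Symbol.asyncIterator]() {
    // Simplification: stop iterating once no messages are left
    while (this.messages.length > 0) {
      this.deleteCalled = false;
      const message = this.messages.shift();
      try {
        yield message;
      } finally {
        // Messages that were not deleted go back to the end of the queue
        if (!this.deleteCalled) {
          this.messages.push(message);
        }
      }
    }
  }
}

// Consume every message, deleting only those that were processed successfully.
// Returns the messages in the order they were successfully processed.
async function drain(queue, processMessage) {
  const processed = [];
  for await (const message of queue) {
    if (await processMessage(message)) {
      queue.delete();
      processed.push(message);
    }
  }
  return processed;
}
```

With this sketch, a message whose handler returns a falsy value is not deleted, so it is yielded again later in the same loop. A handler that never succeeds for some message would therefore keep the loop running.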
Abstract Spec
A message queue instance only needs to have three methods:
- `[Symbol.asyncIterator]`
- `delete`
- `send`
[Symbol.asyncIterator]
A method that returns an instance of an async iterator.
The iterator must have a `next` method, and should have `return` and `throw` methods.
The easiest way to do this is with an async generator, but that is not required. When run, the iterator must do the following:
1. Fetch and remove the next message from the queue (this removal may be temporary and time-based)
2. Yield the message
3. If the `delete` method has been called, permanently remove the message from the queue
4. Otherwise, return the message to the queue (the message does not have to be immediately available again)
Steps 3 and 4 should happen regardless of whether an error is thrown at the yield stage.
An illustration of how to achieve this with an async generator:
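class MyQueue {
  delete() {
    // Flag the current message for deletion
    this.deleteCalled = true;
  }
  async *[Symbol.asyncIterator]() {
    while (true) {
      this.deleteCalled = false;
      // somehowGetNextMessage, deleteMessage and retryMessage are placeholders for your queue backend
      const { id, message } = await somehowGetNextMessage();
      try {
        yield message;
      } finally {
        // Runs when the loop asks for the next message, exits early or throws
        if (this.deleteCalled) {
          await deleteMessage(id);
        } else {
          await retryMessage(id);
        }
      }
    }
  }
}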
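For implementors who prefer not to use an async generator, the same steps can be written against the iterator protocol directly. The following is only a sketch under stated assumptions: `ManualQueue` is a hypothetical in-memory queue, and its iterator ends when the array is empty, which the spec does not require.

```javascript
// Hypothetical in-memory queue whose iterator is written by hand
// instead of with an async generator.
class ManualQueue {
  constructor() {
    this.messages = [];
    this.deleteCalled = false;
  }

  send(message) {
    this.messages.push(message);
  }

  delete() {
    this.deleteCalled = true;
  }

  [Symbol.asyncIterator]() {
    const queue = this;
    let checkedOut = false; // true while a yielded message is unsettled
    let current;

    // Steps 3 and 4: drop the message if delete() was called,
    // otherwise return it to the end of the queue
    const settle = () => {
      if (checkedOut && !queue.deleteCalled) {
        queue.messages.push(current);
      }
      checkedOut = false;
    };

    return {
      async next() {
        settle(); // settle the previously yielded message first
        if (queue.messages.length === 0) {
          return { done: true, value: undefined };
        }
        queue.deleteCalled = false;
        current = queue.messages.shift();
        checkedOut = true;
        return { done: false, value: current };
      },
      // for await calls this when the loop body breaks, returns or throws
      async return(value) {
        settle();
        return { done: true, value };
      },
      // Not called by for await itself; provided for manual consumers
      async throw(error) {
        settle();
        throw error;
      },
    };
  }
}
```

Because `for await` calls `return` when the loop body exits early, a message that was yielded but not deleted still goes back to the queue after a `break` or an error in the loop body.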
This method may also take other actions such as opening and closing connections to external resources.
delete
This is a simple, synchronous method that sets a flag so that the current message is deleted from the queue when the iterator is resumed or finalized.
send
When called with a message as the first argument, add that message to the queue.
Implementing a MessageQueue
You can use this package to create your message queue class by extending the provided MessageQueue class:
import MessageQueue from 'abstract-message-queue';
class MyQueue extends MessageQueue {
  constructor() {
    // Call the superconstructor with config methods:
    super({
      // The next method fetches the next message
      async next() {
        return {
          id: 'optional - some value used to identify a message internally',
          message: 'the message - may be of any type, including objects'
        };
      },
      // The id returned by next is passed as the first argument
      async delete(id) {
        // Remove the message from the queue entirely
      },
      async retry(id) {
        // Re-add the message to the queue to be tried again at a later date
      }
    });
  }
  send(message) {
    // Add the new message to the queue
  }
  async *[Symbol.asyncIterator]() {
    // You may optionally extend the iterator method to open and close connections
    this.connection = openConnection();
    try {
      yield* super[Symbol.asyncIterator]();
    } finally {
      this.connection.close();
    }
  }
}
abstract-message-queue
MessageQueue
See: default
new module.exports(options)
Construct an instance of the message queue
Param | Type | Description |
---|---|---|
options | Object | Protected methods for the queue |
options.next | function | Function that gets the next message from the queue. Its return value must be an object with a "message" property and an optional "id" |
options.retry | function | Function that re-queues the last item returned by next. The item ID will be passed as the first argument. |
options.delete | function | Function that deletes the last item returned by next. The item ID will be passed as the first argument. |
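To show how the three options fit together, here is a sketch of a class that follows the contract in the table above. It is not the package's source: `SketchQueue`, `store`, `pending` and `calls` are hypothetical names used only for illustration, and the real helper class may behave differently in details such as error handling.

```javascript
// NOT the package's source: a hypothetical stand-in that follows
// the documented options contract (next, retry, delete).
class SketchQueue {
  constructor({ next, retry, delete: remove }) {
    this.handlers = { next, retry, remove };
    this.deleteCalled = false;
  }

  delete() {
    this.deleteCalled = true;
  }

  async *[Symbol.asyncIterator]() {
    while (true) {
      this.deleteCalled = false;
      const { id, message } = await this.handlers.next();
      try {
        yield message;
      } finally {
        // The id returned by next is passed to delete or retry
        if (this.deleteCalled) {
          await this.handlers.remove(id);
        } else {
          await this.handlers.retry(id);
        }
      }
    }
  }
}

// Example handlers backed by a Map; ids locate each message again later
const store = new Map([[1, 'first'], [2, 'second']]);
const pending = [1, 2];
const calls = []; // records which handler ran, for inspection

const queue = new SketchQueue({
  async next() {
    const id = pending.shift();
    return { id, message: store.get(id) };
  },
  async retry(id) {
    calls.push(['retry', id]);
    pending.push(id);
  },
  async delete(id) {
    calls.push(['delete', id]);
    store.delete(id);
  },
});
```

The optional "id" matters when the message body alone is not enough to find the item again, as with the Map keys in this sketch.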
module.exports.delete()
Flags the last message yielded by the iterator for deletion from the queue
Kind: instance method of module.exports
module.exports.default
Utility class to help implement the AbstractMessageQueue
Kind: static property of module.exports