recursor
Reliable cursor for MongoDB.
The Problem
Let's say you're collecting some data and storing it in MongoDB. It could be usage analytics or something else.
After some time, you have a brilliant idea: you want to add some sort of reporting system to your software. You want to build the report from the collected data, and keep building it as more data is collected over time.
Essentially, we're talking about incremental Map/Reduce with one twist: instead of processing the events inside MongoDB, you process them in Node.
Such functionality can be really useful for ETL operations, or for any other operation where you need to process the events using other libraries, such as performing an HTTP request to an external API.
The solution
This module abstracts away storing and retrieving the id (or some other correlation id) of the last processed data unit.
The usage is really similar to the find method that every MongoDB collection exposes.
Usage
Installation
npm install recursor
Usage
First, let's include the module:
var recursor = require('recursor');
The second step is to define the persistence layer in which the state will be stored. To do that, create an instance of a storage (repository) object:
// Name your recursor. Make sure that no other recursor uses this name.
var recursorName = "someName";
// Collection that holds the recursor state ("recursors" is an example name)
var recursorCollection = db.collection("recursors");
var storage = new recursor.storage.MongoStorage(recursorName, recursorCollection);
Right now, MongoDB is the only place where the state of the recursor can be persisted. You can create new storage engines by inheriting from the base object. More on that later.
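The next step is to create an instance of Resumer:
// Collection that holds the events to process ("events" is an example name)
var eventsCollection = db.collection("events");
var resumer = new recursor.Resumer(storage, eventsCollection);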
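Now we can do our MongoDB queries as usual:
resumer.resume(function (err, finder) {
  if (err) throw err;
  // finder.find() accepts a selector just like collection.find() (example selector)
  var cursor = finder.find({ eventName: "click" });
  cursor.nextObject(function (err, event) {
    if (err) throw err;
    // process the event here
  });
});
That's it.
From now on, the resumer will find only events that were added after the last processed one.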
Under the hood
Basically, each time an object makes its way out of the finder, its uniqueIndex is stored using the storage object.
The uniqueIndex must be incremental and can be configured (more on that later). The default value is { _id: 1 }, which means that _id must be incremental.
The next time you call the resume method, the Resumer retrieves the last processed uniqueIndex from the storage and queries the events collection from that point on.
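The resume step can be illustrated with a small sketch. Assuming the saved state is the uniqueIndex values of the last processed document, a resumed query only has to match documents that sort after those values. The function `buildResumeQuery` below is hypothetical (it is not part of recursor's API and not its actual implementation); for a compound index it uses the usual lexicographic "keyset" condition.

```javascript
// Hypothetical sketch, not recursor's implementation: turn the last saved
// uniqueIndex value into a query that only matches documents after it.
function buildResumeQuery(selector, uniqueIndex, last) {
  // No saved state yet (first run): process everything that matches.
  if (!last) return selector;

  var fields = Object.keys(uniqueIndex);

  // One branch per field: all earlier fields equal to the saved values,
  // this field strictly after its saved value (-1 means descending).
  var branches = fields.map(function (field, i) {
    var branch = {};
    for (var j = 0; j < i; j++) {
      branch[fields[j]] = last[fields[j]];
    }
    var op = uniqueIndex[field] === -1 ? '$lt' : '$gt';
    branch[field] = {};
    branch[field][op] = last[field];
    return branch;
  });

  var after = branches.length === 1 ? branches[0] : { $or: branches };
  return { $and: [selector, after] };
}

// Default index {_id: 1}, last processed _id was 5:
console.log(JSON.stringify(buildResumeQuery({ type: 'click' }, { _id: 1 }, { _id: 5 })));
// → {"$and":[{"type":"click"},{"_id":{"$gt":5}}]}
```

For this to work, the resulting cursor would also have to be sorted by the uniqueIndex (for example `.sort({ _id: 1 })`); otherwise "the last processed document" is not well defined.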
API
recursor.storage.MongoStorage
This class stores the state of the recursor in MongoDB.
Args:
recursorName - the name of the recursor. You can have 2 different jobs running on the same collection of events; recursorName is what differentiates those 2 jobs.
recursorCollection - the collection in which the state of the recursor is stored.
Usage:
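// "recursors" is an example collection name
var storage = new recursor.storage.MongoStorage("test job", db.collection("recursors"));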
recursor.Resumer
Sticks everything together. Exposes an API for resuming from the last processed event.
Args:
repository - a recursor storage engine that implements the RecursorRepository interface.
collection - the Mongo collection that holds the events to be processed.
options - a uniqueIndex can be defined here. The default is {uniqueIndex: {_id: 1}}. You can use a compound index as well, such as {uniqueIndex: {eventName: 1, timestamp: 1}}. The only important thing to keep in mind is that uniqueIndex must be incremental.
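What "incremental" means for a compound uniqueIndex can be made concrete with a comparator. The helpers `compareByIndex` and `isIncremental` below are hypothetical and reflect one reading of the requirement: documents are compared field by field, in the order the fields appear in uniqueIndex, and each newly inserted document must sort strictly after every earlier one.

```javascript
// Hypothetical helper: compare two documents by a uniqueIndex spec such as
// {eventName: 1, timestamp: 1}. Fields are compared in the spec's key order
// (lexicographically); -1 flips the direction of a field.
// Assumes values are comparable with < and > (numbers, strings, Dates).
function compareByIndex(uniqueIndex, a, b) {
  var fields = Object.keys(uniqueIndex);
  for (var i = 0; i < fields.length; i++) {
    var f = fields[i];
    var dir = uniqueIndex[f] === -1 ? -1 : 1;
    if (a[f] < b[f]) return -dir;
    if (a[f] > b[f]) return dir;
  }
  return 0;
}

// Checks documents in insertion order: each one must sort strictly after
// the previous one, so no two documents share the same index value.
function isIncremental(uniqueIndex, docs) {
  for (var i = 1; i < docs.length; i++) {
    if (compareByIndex(uniqueIndex, docs[i - 1], docs[i]) >= 0) return false;
  }
  return true;
}

console.log(isIncremental({ _id: 1 }, [{ _id: 1 }, { _id: 2 }, { _id: 3 }])); // → true
```

Under this reading, {eventName: 1, timestamp: 1} is only incremental if new events never sort before older ones: inserting an "a" event after "b" events has been processed places it before the last processed key, so a query for everything after that key would not match it.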
Usage:
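// "recursors" and "events" are example collection names
var storage = new recursor.storage.MongoStorage("test job", db.collection("recursors"));
var resumer = new recursor.Resumer(storage, db.collection("events"));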
resumer.resume
This method creates a Finder object (described below).
The only argument of resume is a callback, which receives:
err - an error, if one occurred.
finder - an instance of a Finder object.
Usage:
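// "recursors" and "events" are example collection names
var storage = new recursor.storage.MongoStorage("test job", db.collection("recursors"));
var resumer = new recursor.Resumer(storage, db.collection("events"));
resumer.resume(function (err, finder) {
  if (err) throw err;
  // use finder.find() the same way you would use collection.find()
});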
Finder
An object with a find method that you can use the same way you would use it on a collection.
finder.find
Same as find on Mongo collections, with some limitations: it is not possible to overload the arguments the way the original method allows.
Args:
selector - query/selector, as in MongoDB.
options - options, as in MongoDB.
Return value: a MongoDB cursor, exactly the same thing collection.find() returns.
Example:
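// "recursors" and "events" are example collection names
var storage = new recursor.storage.MongoStorage("test job", db.collection("recursors"));
var resumer = new recursor.Resumer(storage, db.collection("events"));
resumer.resume(function (err, finder) {
  if (err) throw err;
  // example selector; the cursor behaves like the one collection.find() returns
  var cursor = finder.find({ eventName: "click" });
  cursor.nextObject(function (err, event) {
    if (err) throw err;
    console.log(event);
  });
});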
Where does the magic happen?
cursor.nextObject(cb) - when you call this method, the Resumer stores a reference to the returned object before passing it to cb.
cursor.stream() - the Resumer stores a reference for each object that is streamed. If you consume the stream more slowly than it produces data, the objects are buffered, which means that the Resumer still stores the reference to an object before it is buffered.
In other words, an object is recorded as processed as soon as it leaves the cursor, not when your code finishes handling it.
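The store-before-deliver behaviour of cursor.nextObject(cb) can be sketched as a wrapper. This is an illustration under assumptions, not recursor's code: `trackNextObject` and the `save(key, done)` callback are hypothetical names, and the wrapped cursor only needs a `nextObject(cb)` method.

```javascript
// Hypothetical sketch: wrap a cursor-like object so that each document's
// uniqueIndex value is saved BEFORE the document reaches the caller.
function trackNextObject(cursor, uniqueIndex, save) {
  var fields = Object.keys(uniqueIndex);
  var original = cursor.nextObject.bind(cursor);

  cursor.nextObject = function (cb) {
    original(function (err, doc) {
      // Errors and the end of the cursor (null) pass straight through.
      if (err || !doc) return cb(err, doc);

      // Extract only the uniqueIndex fields, e.g. {_id: 42}.
      var key = {};
      fields.forEach(function (f) { key[f] = doc[f]; });

      save(key, function (saveErr) {
        if (saveErr) return cb(saveErr);
        cb(null, doc); // deliver only after the state was stored
      });
    });
  };
  return cursor;
}

// Demo with an in-memory stand-in for a MongoDB cursor.
var docs = [{ _id: 1 }, { _id: 2 }];
var fakeCursor = {
  nextObject: function (cb) { cb(null, docs.shift() || null); }
};
trackNextObject(fakeCursor, { _id: 1 }, function (key, done) {
  console.log('saved', key._id);
  done();
});
fakeCursor.nextObject(function (err, doc) {
  console.log('got', doc._id);
});
// prints "saved 1" then "got 1"
```

Because the key is saved before cb runs, a crash inside cb means that document is not handed out again on the next resume; saving after cb instead would give the opposite trade-off (possible reprocessing).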
Extras
As you can see, the recursor state is stored for each object fetched from the DB. This extra roundtrip can be really expensive when you process a large number of events.
To make this more efficient, the module provides some wrappers for the storage mechanism.
recursor.storage.InMemoryStorage
This class implements the RecursorRepository interface and stores the state in memory. InMemoryStorage is basically a helper class; other storage objects might depend on its instances.
recursor.storage.BufferedRepository
Stores the state of the recursor every 10 insertions (or any other amount passed in the options).
Args:
inMemoryStorage - intermediate storage. Should implement the RecursorRepository interface.
persistenceStorage - the final storage in which the state should be persisted.
options - reacts to the buffer option (default {buffer: 10}).
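The idea behind BufferedRepository, keeping the latest state in memory and writing it out only every N updates, can be sketched as follows. `createBufferedStore` and its set/get/flush methods are hypothetical names chosen for illustration; they are not recursor's RecursorRepository interface.

```javascript
// Hypothetical sketch of "persist every N insertions". The set/get/flush
// names are assumptions, not recursor's RecursorRepository interface.
function createBufferedStore(persist, options) {
  var bufferSize = (options && options.buffer) || 10; // same default as the README
  var latest = null; // most recent state, kept in memory
  var pending = 0;   // updates since the last persist

  var store = {
    set: function (state) {
      latest = state;
      pending += 1;
      if (pending >= bufferSize) store.flush();
    },
    get: function () {
      return latest;
    },
    // Write the latest state to the persistent storage, if anything changed.
    flush: function () {
      if (pending === 0) return;
      persist(latest);
      pending = 0;
    }
  };
  return store;
}

var writes = [];
var buffered = createBufferedStore(function (state) { writes.push(state._id); }, { buffer: 3 });
for (var id = 1; id <= 7; id++) buffered.set({ _id: id });
console.log(writes); // → [ 3, 6 ]
```

In this sketch, 7 updates cost 2 writes instead of 7. The price is that if the process stops between two persists, up to buffer - 1 processed documents are not reflected in the persistent state and would be handed out again on the next resume; calling flush() on shutdown narrows that window.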
Example:
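var memory = new recursor.storage.InMemoryStorage();
var storage = new recursor.storage.MongoStorage(recursorName, recursorCollection);
var buf = new recursor.storage.BufferedRepository(memory, storage);
var resumer = new recursor.Resumer(buf, eventsCollection);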
recursor.storage.TimedRepository
Stores the state of the recursor every 10 seconds (or any other amount passed in the options).
Args:
inMemoryStorage - intermediate storage. Should implement the RecursorRepository interface.
persistenceStorage - the final storage in which the state should be persisted.
options - reacts to the saveAfter option (default {saveAfter: 10}, i.e. every 10 seconds).
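A time-based variant of the same idea can be sketched with an injectable clock, so the logic is testable without waiting. `createTimedStore` and its methods are hypothetical names for illustration, not recursor's API, and the README does not say how TimedRepository schedules its writes.

```javascript
// Hypothetical sketch of "persist every N seconds". Method names are
// assumptions, not recursor's API. `now` is injectable so the timing logic
// can be tested without waiting; it defaults to Date.now.
function createTimedStore(persist, options, now) {
  var saveAfterMs = ((options && options.saveAfter) || 10) * 1000; // README default: 10 s
  var clock = now || Date.now;
  var latest = null;
  var dirty = false; // true when `latest` has not been persisted yet
  var lastPersist = clock();

  var store = {
    set: function (state) {
      latest = state;
      dirty = true;
      if (clock() - lastPersist >= saveAfterMs) store.flush();
    },
    get: function () {
      return latest;
    },
    flush: function () {
      if (dirty) persist(latest);
      dirty = false;
      lastPersist = clock();
    }
  };
  return store;
}

var fakeTime = 0;
var saved = [];
var timed = createTimedStore(function (s) { saved.push(s._id); }, { saveAfter: 10 },
  function () { return fakeTime; });
timed.set({ _id: 1 }); // t = 0 s: too early to persist
fakeTime = 10000;
timed.set({ _id: 2 }); // t = 10 s: persisted
console.log(saved); // → [ 2 ]
```

Design choice: this sketch checks the clock lazily on each update, so nothing is written while no events arrive; a timer-based approach (setInterval plus a final flush on shutdown) is the other common option.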
Example:
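var memory = new recursor.storage.InMemoryStorage();
var storage = new recursor.storage.MongoStorage(recursorName, recursorCollection);
var timed = new recursor.storage.TimedRepository(memory, storage);
var resumer = new recursor.Resumer(timed, eventsCollection);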
Repository composition
If you want to persist the state of the recursor both every 10 insertions and every 10 seconds, you can pass one of the repositories as the inMemoryStorage of the other.
Example:
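var memory = new recursor.storage.InMemoryStorage();
var storage = new recursor.storage.MongoStorage(recursorName, recursorCollection);
// The buffered repository acts as the intermediate storage of the timed one
var buf = new recursor.storage.BufferedRepository(memory, storage);
var timed = new recursor.storage.TimedRepository(buf, storage);
var resumer = new recursor.Resumer(timed, eventsCollection);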
Example
Instead of MongoDB, this example uses TingoDB, which has the same API.
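var through = require('through');
var Engine = require('tingodb')();
var recursor = require('recursor');
var db = new Engine.Db(__dirname + '/storage', {});

// Get the collection (collection names and documents below are examples)
var collection = db.collection('events');

// Clean the collection from previous inserts, then insert some events
collection.remove({});
collection.insert({ name: 'first' });
collection.insert({ name: 'second' });
collection.insert({ name: 'third' });

// Wait 1 second, just for the example and synchronization
setTimeout(function () {
  var storage = new recursor.storage.MongoStorage('example', db.collection('recursors'));
  var resumer = new recursor.Resumer(storage, collection);
  resumer.resume(function (err, finder) {
    if (err) throw err;
    // Stream the unprocessed events and print each one
    finder.find({}).stream().pipe(through(function (event) {
      console.log(event);
    }));
  });
}, 1000);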
install
With npm do:
npm install recursor
license
MIT