Generators and iterators can be very useful once you get the hang of them

Generators and iterators in JavaScript for Promise Flow Control

A guide on how to use generators to create your own utility class to handle application flow with Promises in parallel. The package is available on npmjs.com and GitHub.

npm install node-promise-parallel --save

GitHub: https://github.com/snappyjs/node-promise-parallel
npmjs: https://www.npmjs.com/package/node-promise-parallel

Introduction

I heard about function* (generators) and iterators quite some time ago, but I didn’t really understand the need for them until recently, when I started building web scraping NodeJS applications. Now that I have a good grasp of how they work, I can’t imagine going back to not using them to control application flow, especially when working with Promises.

In this guide I’ll show you how I created a utility class that uses generators to execute Promises in parallel.

Usage Example

As always, I’ll start by writing a usage example to get a clear picture of what we want to achieve with our utility.

const PromiseParallel = require('node-promise-parallel');

// Example generator function that creates the Promises to be executed by the queue.
// The generator has to yield a Promise.
function* promiseGenerator() {
    for(let i = 0; i <= 10; i++) {
        yield new Promise(resolve => {
            setTimeout(resolve, (Math.random()*100)+1);
        });
    }
}

// Create a PromiseParallel, executing 3 Promises in parallel
// with a waitTime of 1000ms before starting the next one.
const pq = new PromiseParallel(promiseGenerator, 3, 1000);

// Listen on the events emitted from the PromiseParallel.
// 'resolved' - successfully resolved Promise.
// 'rejected' - the Promise was rejected.
// 'completed' - the generator is completed (no new Promises to consume)

pq.on('resolved', res => {
    console.log(`Result that was resolved: ${res}`)
}).on('rejected', err => {
    console.log(`Error occurred: ${err}`);
}).on('completed', () => {
    console.log('Generator is completed.');
});

pq.start(); // Start the execution

API

A quick look at the sample above gives us a pretty clear picture of what we’ll have to implement. We also need a quick recap of how generators work before we start coding.

constructor

  • generator – A generator function (function*) that will give us our iterator.
  • parallel – Number of Promises to execute in parallel.
  • waitTime – Time in ms to wait before executing the next Promise from the iterator.

Public methods

  • #start() – Start executing the Promises from the generator.

With the following events emitted

  • resolved – when a Promise has been resolved.
  • rejected – when a Promise has been rejected.
  • completed – when the iterator is done.
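
The chained .on() calls in the usage example rely on standard Node.js EventEmitter behaviour, which the implementation later in this post builds on. As a small illustration using a plain EventEmitter (not the PromiseParallel class itself), .on() returns the emitter so calls can be chained, and listeners run synchronously when an event is emitted. The names emitter, received and returned are only for this sketch.

```javascript
const EventEmitter = require('events');

const emitter = new EventEmitter();
const received = [];

// .on() returns the emitter itself, which is what makes chaining possible.
const returned = emitter
    .on('resolved', res => received.push(`resolved: ${res}`))
    .on('completed', () => received.push('completed'));

// emit() calls the registered listeners synchronously, in order.
emitter.emit('resolved', 42);
emitter.emit('completed');

console.log(returned === emitter); // → true
console.log(received); // → [ 'resolved: 42', 'completed' ]
```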

Javascript generators (function*)

I like to look at a function* as a function that creates an iterator. The iterator is an object with a next() method that returns the next state of the iteration each time it’s called. We don’t really need the function* syntax; we could always write our own iterator by hand, and it could look something like this:

function createIterator() {
    let index = 0;
    let arr = [0, 5, 10, 15];
    return {
        next: () => {
            return index < arr.length ?
                { value: arr[index++], done: false} :
                { done: true };
        }
    }
}

But to avoid all the boilerplate needed to keep track of the internal state, we could just create a function*:

function* generator() {
   let arr = [0, 5, 10, 15];
   for(let value of arr) {
       yield value;
   }    
}

Both snippets above produce the same sequence of results from next(); the function* syntax is just a lot less typing and easier to understand. So now that we have our generator function, how do we use it?

let it = generator(); // Create the iterator
console.log(it.next().value); // 0 (the first value)
console.log(it.next().value); // 5 (the second value)
console.log(it.next().done); // false (not done yet, value is 10)
console.log(it.next().done); // false (still not done, value is 15)
console.log(it.next().done); // true (the iterator is completed, nothing more is yielded)

So the iterator reports done: true once we have passed the last available value. This means we have to check whether we are done before we use the value (there is no value at all after we have passed the last yield).
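
A minimal sketch of that check, draining the iterator by hand. The generator is the same one as above; the names seen and step are only for this illustration.

```javascript
function* generator() {
    const arr = [0, 5, 10, 15];
    for (const value of arr) {
        yield value;
    }
}

const it = generator();
const seen = [];

// Always look at `done` first; `value` is only meaningful while done is false.
let step = it.next();
while (!step.done) {
    seen.push(step.value);
    step = it.next();
}

console.log(seen); // → [ 0, 5, 10, 15 ]
console.log(step); // → { value: undefined, done: true }
```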

for...of with iterators

There’s a bonus to using generators. The object a generator returns is not only an iterator, it is also iterable, which lets us use the for...of statement to loop through it. So, just like when we loop through an Array, we can now for...of our previously created generator, like this:

for(let value of generator()) {
    console.log(value);
}

That’s pretty handy!
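
Note that for...of looks for a Symbol.iterator method, which generator objects provide out of the box. A hand-written object that only has next() would need one added. A minimal sketch of how that could look, where createIterable is a hypothetical variation of the hand-written iterator shown earlier:

```javascript
function createIterable() {
    const arr = [0, 5, 10, 15];
    return {
        // for...of and the spread operator call this to obtain a fresh iterator.
        [Symbol.iterator]() {
            let index = 0;
            return {
                next: () => index < arr.length
                    ? { value: arr[index++], done: false }
                    : { value: undefined, done: true }
            };
        }
    };
}

const collected = [];
for (const value of createIterable()) {
    collected.push(value);
}

console.log(collected); // → [ 0, 5, 10, 15 ]
console.log([...createIterable()]); // → [ 0, 5, 10, 15 ]
```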

Let’s get coding already!

So now that we know what we need to know about generators and iterators, we can get to the good part: the code for our PromiseParallel utility class! Let’s start by importing what we need and creating some constants that will help us keep track of the events we are going to emit later.

const EventEmitter = require('events');
const assert = require('assert');

const EVENTS = {
    RESOLVED: 'resolved',
    REJECTED: 'rejected',
    COMPLETED: 'completed'
};

/**
 * 'resolved' - when a promise has been resolved.
 * 'rejected' - when a promise has been rejected.
 * 'completed' - when all promises have been either resolved or rejected.
 */
class PromiseParallel extends EventEmitter {
    /**
     * Create a new PromiseParallel with a set poolSize and a generator for it.
     * @param {function*} generator A function* that should yield a promise to be added to the batch.
     * @param {Number} [poolSize=5] The number of concurrent promises to be run at any single time.
     * @param {Number} [waitTime=0] The time in ms to wait before executing the next promise from the iterator.
     */
    constructor(generator, poolSize = 5, waitTime = 0) {
        super();
        assert.strictEqual(typeof generator, 'function', 'A generator must be used.');
        assert.strictEqual(Number.isInteger(poolSize), true, 'poolSize must be of type integer (defaults to 5).');

        this._poolSize = poolSize;
        this._iterator = generator();
        this._running = 0;
        this._stopped = false;
        this._waitTime = waitTime;
        this._completed = false;
    }
}

assert calls – We are making sure that we are getting the correct input from the user.
_poolSize – The number of Promises that will be run in parallel.
_iterator – Calling the generator function creates the iterator.
_running – Keeps track of how many Promises are currently being executed.
_stopped – Boolean telling us whether the user wants us to stop executing Promises from the iterator.
_waitTime – The time in ms to wait before executing the next Promise from the iterator.
_completed – Boolean telling us whether the completed event has been emitted, so it is only emitted once.
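
One detail worth spelling out: creating the iterator does not run any of the generator body. Each Promise is only created when next() is called, which is what lets the pool decide how many are in flight. A small sketch of that laziness, where lazyGenerator and log are hypothetical names for this illustration:

```javascript
const log = [];

function* lazyGenerator() {
    for (let i = 1; i <= 3; i++) {
        log.push(`creating promise ${i}`);
        yield Promise.resolve(i);
    }
}

const iterator = lazyGenerator(); // Nothing inside the generator has run yet.
log.push('iterator created');

iterator.next(); // Runs the body up to the first yield.
iterator.next(); // Resumes and runs up to the second yield.

console.log(log);
// → [ 'iterator created', 'creating promise 1', 'creating promise 2' ]
```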

The heart and soul of PromiseParallel

So now we’ll handle that critical method that will use our iterator to fetch the Promises and execute them for us. This is where all the action happens.

_next() {
    if (this._stopped) return;
    let prom = this._iterator.next();
    if (!prom.done) {
        this._running++;
        prom.value
            .then(res => {
                this._resolveReject(EVENTS.RESOLVED, res);
            })
            .catch(err => {
                this._resolveReject(EVENTS.REJECTED, err);
            });
    } else if (this._running === 0 && !this._completed) {
        this._completed = true;
        this.emit(EVENTS.COMPLETED);
    }
}

The logic here can be a bit tricky, so I’ll go through it step by step:
if (this._stopped) – If the user stopped us we’ll just return, executing nothing.
this._iterator.next() – We are getting the next Promise from the iterator (the factory).
if (!prom.done) – If the iterator is not done we’ll continue.
this._running++ – We’re going to execute another Promise, so add it to the count of running Promises.
.then / .catch – Emit the resolved or rejected event through our helper method.
else if – When the iterator is done, no more Promises are executing and we haven’t yet emitted the completed event, we emit it.

Our _resolveReject() method is pretty simple: when a Promise has been resolved or rejected, we make sure to start the next one by calling _next() again after waiting _waitTime ms.

_resolveReject(event, data) {
    this._running--;
    this.emit(event, data);
    this._wait().then(() => this._next());
}
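
The _wait() helper called above is not shown in this post. A minimal sketch of what it might look like, assuming it simply returns a Promise that resolves after a delay. It is written here as a standalone function named wait (a hypothetical name) so it can be run on its own; inside the class it would presumably read the delay from this._waitTime instead of taking a parameter.

```javascript
// Hypothetical stand-in for the _wait() helper, which the post does not show.
// Assumption: it resolves (with no value) once `ms` milliseconds have passed.
function wait(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

const startedAt = Date.now();
wait(50).then(() => {
    console.log(`Waited roughly ${Date.now() - startedAt} ms`);
});
```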

start()

Now that we have our main method for handling the next Promise from our iterator, we need some way to spin up and start executing our Promises, and that’s not as difficult as it sounds. We’ll just keep calling our _next() method until we reach our _poolSize.

start() {
    this._stopped = false;
    for (let i = 0; i < this._poolSize; i++) {
        this._next();
    }
    return this;
}
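
To see the pooling idea end to end, here is a minimal standalone sketch. It is not the package's code: runPool, tasks and the stats object are hypothetical names, and waitTime, stop handling and events are left out to keep it short. It starts poolSize Promises, pulls a new one from the iterator every time one settles, and records the highest number that were running at once.

```javascript
// Standalone sketch of the pooling technique (an illustration, not the package).
function runPool(generatorFn, poolSize) {
    const iterator = generatorFn();
    const stats = { running: 0, maxRunning: 0, finished: 0 };

    return new Promise(resolve => {
        function next() {
            const step = iterator.next();
            if (step.done) {
                // Finish only when the last running Promise has settled.
                if (stats.running === 0) resolve(stats);
                return;
            }
            stats.running++;
            stats.maxRunning = Math.max(stats.maxRunning, stats.running);
            const settle = () => {
                stats.running--;
                stats.finished++;
                next(); // Replace the settled Promise with the next one.
            };
            // Treat resolve and reject the same way for this sketch.
            step.value.then(settle, settle);
        }
        // Fill the pool, just like start() does.
        for (let i = 0; i < poolSize; i++) next();
    });
}

function* tasks() {
    for (let i = 0; i < 10; i++) {
        yield new Promise(resolve => setTimeout(resolve, 5));
    }
}

const poolResult = runPool(tasks, 3);
poolResult.then(stats => console.log(stats));
// → { running: 0, maxRunning: 3, finished: 10 }
```

Because the counter is decremented before the next Promise is pulled from the iterator, the number of running Promises never exceeds the pool size.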

That’s it!

You can find all the code at my GitHub repository https://github.com/snappyjs/node-promise-parallel
Or you can just npm install it if you don’t want to write it yourself.

npm install node-promise-parallel --save

I’ve also created another package specifically for web scraping using requests. You can find it in my other blog post, node-request-queue.

If you have any input on how to make the code clearer or faster, or just want to help me fix some spelling mistakes, feel free to fork and push to my branch on GitHub. In my next post I’ll be discussing some best practices for setting up your mongoose (mongodb) wrapper in NodeJS.
