Requests in a queue using NodeJS

NodeJS: HTTP Requests queued for web scraping

tl;dr

A post on how I created a utility that queues up requests, runs a set number of them in parallel, and waits a configurable delay between them. All the code is available on GitHub and npm if you just want to start using it.

npm install node-request-queue --save

GitHub: https://www.github.com/snappyjs/node-request-queue
npmjs: https://www.npmjs.com/node-request-queue

Introduction

So today I’m gonna start off with my first post ever. It’s gonna be about a little utility I’ve been working on to be able to handle HTTP-requests in a queue-like way with optional delay for some web scraping.

When scraping the web you want to keep track of how many requests you fire off at once, so that the server doesn’t start answering with 429 (Too Many Requests) status codes.

Following the 80/20 principle, as all my blog posts will, I’ll get right to the good stuff.

Usage Example

To get a feel for what we need to do, I like to start with a usage example and work back from there to the actual implementation. So here goes, this is what I’d like the request queue to look like:

const RequestQueue = require('node-request-queue');

// see: 'http://www.npmjs.com/request' for more information
const request = {
    method: 'GET',
    uri: 'http://www.snappyjs.com'
}

// 3 requests in parallel with a delay of 1000ms
const rq = new RequestQueue(3, 1000);

// Events
rq.on('resolved', res => {
    // Handle successful response
}).on('rejected', err => {
    // Handle rejected response
}).on('completed', () => {
    // Handle queue empty.
});

// Add requests
rq.push(request);
rq.pushAll([request, request /* , ... */]);
rq.unshift(request);
rq.unshiftAll([request, request /* , ... */]);

// Clear the queue
rq.clear();

So that’s it, a pretty good start on what we’d like to do. I’ve decided to use the node package request with the request-promise wrapper to perform the HTTP requests. Feel free to use any other Promise- or callback-based HTTP request package if you’ve got one that fits your needs better.

API

A quick look at the above sample gives us a pretty clear picture of what we’ll have to implement.

  • #push() – add a request to the end of the queue.
  • #pushAll() – add an array of requests to the end of the queue.
  • #unshift() – add a request to the beginning of the queue.
  • #unshiftAll() – add an array of requests to the beginning of the queue.
  • #clear() – clear the queue.
  • #size() – get the length of the current queue.

With the following events

  • resolved – when a request has been resolved.
  • rejected – when a request has been rejected.
  • completed – when the queue is empty and no requests are still running.
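Since the whole point is to avoid 429 responses, here is a minimal sketch of how a 'rejected' listener could spot them. The helper name isRateLimited is my own invention and not part of the package, and a plain EventEmitter stands in for the queue so the snippet runs without any network access. It relies on request-promise rejecting non-2xx responses with an error that carries a statusCode property.

```javascript
'use strict';

const EventEmitter = require('events');

// Hypothetical helper (not part of node-request-queue): true when the
// rejection carries an HTTP 429 status code.
function isRateLimited(err) {
  return Boolean(err) && err.statusCode === 429;
}

// Stand-in for a RequestQueue instance, so no real requests are made.
const rq = new EventEmitter();
const rateLimited = [];

rq.on('rejected', err => {
  if (isRateLimited(err)) {
    rateLimited.push(err); // a real handler might slow down or re-queue here
  }
});

// Simulated rejections: only the first one is a rate limit.
rq.emit('rejected', { statusCode: 429 });
rq.emit('rejected', { statusCode: 500 });
rq.emit('rejected', new Error('socket hang up'));

console.log(rateLimited.length); // 1
```

What to do once a 429 is detected (back off, re-queue, give up) is a design choice this sketch leaves open.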

Let’s get started

We’ll start by initializing our node package and installing our dependencies. Run the commands below in the folder where you want to start the project:

npm init
npm install request request-promise --save

And create an index.js file which will contain our code. We’ll start with our require-statements and some constants that will be used for the events.

'use strict';

const request = require('request-promise');
const EventEmitter = require('events');
const assert = require('assert');

const EVENTS = {
    RESOLVED: 'resolved',
    REJECTED: 'rejected',
    COMPLETED: 'completed'
}

We are importing request-promise to handle our requests, EventEmitter to emit events with the status of our requests, and finally assert to make sure that the queue is given the correct types of data. EVENTS is an object holding the names of the events that we will fire off via the EventEmitter.

Our class

class RequestQueue extends EventEmitter {

    constructor(parallel = 1, waitTime = 0) {
        super();

        assert.ok(Number.isInteger(parallel) && parallel >= 1, 'Parallel needs to be an integer >= 1.');
        assert.ok(Number.isInteger(waitTime) && waitTime >= 0, 'waitTime needs to be an integer >= 0.');

        this._requests = []; // the actual queue of requests
        this._parallel = parallel; // number of parallel requests to be executed at once
        this._waitTime = waitTime; // the delay between new requests.

        this._running = 0; // The number of currently running requests.
        this._completed = false; // To tell if we need to emit 'completed' event.
    }
}

Here we are creating our class, extending EventEmitter, making sure that the input is correct and initializing our ‘private’ parameters.

Adding items to our queue

We need to be notified when items are added to the queue (_requests) and then spin our queue up as soon as anything is added. So we’ll start with a basic public method to help us add items to the queue.

  /**
   * Add a new request to the queue of requests to be executed.
   * @param  {Object|string} req A request options object, or a URL string, to perform the request with.
   * @return {this}
   */

  push(req) {
    this._requests.push(req);
    this._next();
    return this;
  }

As you can see in the code above, I’ve added a call to this._next() whenever something is added (pushed) to the queue. This is ‘the heart and soul’ of our RequestQueue, which will spin up the requests as they are pushed/unshifted onto the queue. pushAll(), unshift() and unshiftAll() basically all look the same, so I won’t go through their implementation in detail; you can check out the source on GitHub.

The heart and soul of our queue

So what’s this _next() method? That’s our heart and soul, where all the fancy stuff is happening.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  _next() {
    while (this._running < this._parallel && this._requests.length !== 0) {
      this._completed = false; // We have a new request about to start so we are not completed.
      this._running++;
      request(this._requests.shift())
        .then(res => {
          this._running--;
          this.emit(EVENTS.RESOLVED, res);
          this._wait().then(() => this._next());
        })
        .catch(err => {
          this._running--;
          this.emit(EVENTS.REJECTED, err);
          this._wait().then(() => this._next());
        });
    }
    this._emitIfCompleted();
  }

Details about each row:

{2} – Loop as long as we haven’t reached our ‘parallel’ limit and we have items on the queue.
{3} – Boolean to make sure that we don’t emit the completed event since we are just starting a new request.
{4} – A counter so that we know how many requests are currently being executed.
{5} – Get the first request in the queue and execute it.
{6} – On a successful request.
{7} – Reduce the count of running requests.
{8} – Emit the resolved results.
{9} – Delay and then go back and check if we should execute the next request.
{11} – If an error occurred
{12} – Reduce the count of running requests.
{13} – Emit the rejected results.
{14} – Delay and then go back and check if we should execute the next request.
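The _wait() helper called on rows {9} and {14} isn’t shown in this post. A minimal sketch of how it might look, assuming it simply resolves a Promise after this._waitTime milliseconds (DelaySketch is a made-up stand-in class, and the real implementation in the repository may differ):

```javascript
'use strict';

class DelaySketch {
  constructor(waitTime = 0) {
    this._waitTime = waitTime;
  }

  // Assumed implementation: resolve after this._waitTime milliseconds.
  _wait() {
    return new Promise(resolve => setTimeout(resolve, this._waitTime));
  }
}

const started = Date.now();
const pending = new DelaySketch(50)._wait();

pending.then(() => {
  console.log(`waited roughly ${Date.now() - started} ms`);
});
```

Note that with this approach the delay starts when a request finishes, so it is a pause before the next request starts rather than a fixed interval between request starts.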

Hang in there… We’re almost done

So we’ve got the heart and soul of our queue, the _next() method. We’ve just got one more thing to implement, and that’s our completed event. If you pay extra attention to row {17} in our _next() method you’ll see that we have a call to this._emitIfCompleted(), which will help us emit our last event. So here it is:

1
2
3
4
5
6
  _emitIfCompleted() {
    if (this._running === 0 && this._requests.length === 0 && !this._completed) {
      this._completed = true;
      this.emit(EVENTS.COMPLETED);
    }
  }

{2} – Only emit if no current requests are running, the queue is empty and we haven’t already sent out the completed event.
{3} – Boolean to tell that we have already emitted a completed event for this “turn” of emptying the queue.
{4} – Emit the completed event.
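To check that the pieces fit together, here is a self-contained sketch that runs the same _next() and _emitIfCompleted() logic against a fake request function, so it works without network access. DemoQueue, fakeRequest and the doRequest constructor parameter are all made up for this demo, and _wait() is the assumed setTimeout-based version.

```javascript
'use strict';

const EventEmitter = require('events');

// Fake request (an assumption for this demo): resolves after 10 ms, or
// rejects if the URI contains 'fail'.
function fakeRequest(req) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (req.uri.includes('fail')) reject(new Error(`failed: ${req.uri}`));
      else resolve(`ok: ${req.uri}`);
    }, 10);
  });
}

class DemoQueue extends EventEmitter {
  constructor(parallel = 1, waitTime = 0, doRequest = fakeRequest) {
    super();
    this._requests = [];
    this._parallel = parallel;
    this._waitTime = waitTime;
    this._doRequest = doRequest; // injected so no real HTTP call is made
    this._running = 0;
    this._completed = false;
  }

  pushAll(reqs) {
    this._requests.push(...reqs);
    this._next();
    return this;
  }

  _wait() {
    return new Promise(resolve => setTimeout(resolve, this._waitTime));
  }

  _next() {
    while (this._running < this._parallel && this._requests.length !== 0) {
      this._completed = false;
      this._running++;
      this._doRequest(this._requests.shift())
        .then(res => {
          this._running--;
          this.emit('resolved', res);
          this._wait().then(() => this._next());
        })
        .catch(err => {
          this._running--;
          this.emit('rejected', err);
          this._wait().then(() => this._next());
        });
    }
    this._emitIfCompleted();
  }

  _emitIfCompleted() {
    if (this._running === 0 && this._requests.length === 0 && !this._completed) {
      this._completed = true;
      this.emit('completed');
    }
  }
}

const seen = { resolved: 0, rejected: 0, completed: 0 };
const rq = new DemoQueue(2, 5);

rq.on('resolved', () => seen.resolved++)
  .on('rejected', () => seen.rejected++)
  .on('completed', () => {
    seen.completed++;
    console.log(seen); // { resolved: 3, rejected: 1, completed: 1 }
  });

rq.pushAll([{ uri: 'a' }, { uri: 'b' }, { uri: 'fail' }, { uri: 'c' }]);
```

Right after pushAll() returns, two requests are running and two are still queued. The completed event then fires exactly once, even though _next() is called after every finished request, because the _completed flag blocks the second emit.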

That’s it!

You can find all of the code at my GitHub repository: https://github.com/snappyjs/node-request-queue

That’s it for my first post ever on my snappyJS page! I really hope that you got something out of reading it. I’d also like to add that I’m open to any feedback in the comments section, or you could always just send me an e-mail at tommy@snappyjs.com

What do you want my next post to be about?
