Oct 2013

Concurrent Node.js

Introduction

Recently, a colleague of mine asked me to introduce the most important concepts of Node programming to a flock of interested people in our research group. Initially, I declined, considering the vast number of tutorials and books, but then thought it might be quite an interesting challenge: Is there any aspect of Node development that is not easily understood by Node beginners and that is poorly covered by the existing posts? With that in mind, the central question for this tutorial became: Which part of developing Node programs is the hardest to grasp for programmers proficient in imperative languages (Java, JavaScript, Objective-C, PHP, Python, Ruby, etc.)? In my opinion, the biggest issue is writing asynchronous, concurrent Node applications. At the same time, this seems to be the least covered aspect of all existing introductory tutorials. Last, I wanted to present relevant issues without getting too far ahead of the status quo (e.g., generators, which are still considered experimental right now) or anything else that you would not want to use in production.

Overall, this tutorial should teach you to code reactive programs that are both maintainable and concise. That said, I will assume you have a basic idea about Node and JavaScript, so I do expect you to have read at least one of the many excellent introductions out there. In particular, you should have a basic idea of how Node uses callbacks to handle events, given that they are the core concept of Node apps. And although I will cover this aspect in more detail, Nodejitsu also has a short section on writing asynchronous JavaScript and spawning child processes in Node.

To make this discussion more valuable for real-world projects, I restrict myself to using “battle-tested” packages: after checking with npm and GitHub, I decided to only use async and Q, plus some underscore magic (only partial function application). The tutorial should explain to the reader how, using a limited set of tools, a Node developer can write highly asynchronous Node apps without ending up in "concurrency hell". This introduction will take the reader familiar with Node's basic callback mechanism (evil tongues would say "callback spaghetti") via asynchronous control structures to a concurrent, functional programming style that uses promises - also known as futures - to delegate the logical flow from the JavaScript code to the V8 engine.

A Quick Refresher

Let's look at a static HTTP file server, using only the basic Node modules.

var http = require("http")
var url = require("url")
var path = require("path")
var fs = require("fs")
var port = process.argv[2] || 80

http.createServer(function(request, response) {
  var uri = url.parse(request.url).pathname
  var filename = path.join(process.cwd(), uri)

  fs.exists(filename, function(exists) {
    if(!exists) {
      response.writeHead(404, {"Content-Type": "text/plain"})
      response.write("404 Not Found\n")
      response.end()
    } else {
      if (fs.statSync(filename).isDirectory()) filename += '/index.html'

      fs.readFile(filename, "binary", function(err, file) {
        if(err) {
          response.writeHead(500, {"Content-Type": "text/plain"})
          response.write(err + "\n")
          response.end()
        } else {
          response.writeHead(200)
          response.write(file, "binary")
          response.end()
        }
      })
    }
  })
}).listen(parseInt(port, 10))

console.log("file server running at http://localhost:" + port + "/")

This highlights basic Node concepts: upon receiving a request, the server makes an asynchronous check via the OS to see whether the file exists. The asynchronicity is indicated by the callback that gets sent to fs.exists(). Then, again asynchronously (note the callback sent into fs.readFile()), Node reads the file and, if this job completes successfully, sends the file (data) to the response handler. Because both checks are done asynchronously, the server process is free to handle other requests while the OS is doing the file lookups (i.e., "non-blocking"). However, this also demonstrates a problem with this callback coding style: the functions are stacked one on top of another, making the code quite uncomfortable to read and maintain.

Before we begin, it is also worth refreshing the six commandments (taken from Caolan McMahon's excellent post on Node style and structure) you should follow when coding any concurrent program (a short sketch follows the list):

  1. Never mix sync and async style: either return a value or accept a callback, but not both.
  2. Return on a callback: write return callback(null, result) to prevent execution continuing beyond the CB.
  3. Always check errors in callbacks: make sure the CB has a way to deal with error states, as in if (err) { ... }.
  4. Avoid mutable state - it leads to "debugging hell" in concurrent programs (e.g., Heisenbugs).
  5. Only use try, throw & catch within imperative code sections (you cannot throw "across" a CB).
  6. Write tiny functions: maybe 3 statements, and no more than 5.
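
To make these rules concrete, here is a minimal sketch of commandments 2, 3, and 6 in practice (the loadUser function and the cache object are hypothetical):

// a hypothetical cached lookup, following the commandments above
var cache = {}

function getUser(id, callback) {
  if (cache[id]) return callback(null, cache[id])   // 2: return on the callback

  loadUser(id, function(err, user) {                // hypothetical async loader
    if (err) return callback(err)                   // 3: always check errors
    cache[id] = user
    return callback(null, user)                     // 2: return on the callback
  })
}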

Multiprocessing in Node.js

Node is process-oriented, i.e., unlike languages where you create multiple threads within one process, in Node you need to "open" multiple OS processes (using popen(3)). The default tool to run multiple processes in Node is the child_process module, and, as an experimental add-on since Node v0.6, cluster can run multiple Node processes in parallel that share the same port or socket. When forking/spawning/executing child processes, be aware that segmentation faults and other nasty errors in a child process have the potential to bring down your whole stack. In general, there are three calls that are relevant for running parallel processes:

  1. spawn provides a streaming process that you communicate with through Unix pipes. Think of it as in shell syntax "process < instream > outstream &".
  2. exec runs a single shell command and returns the exit status of the computation. Think of it as in shell syntax "exec input; echo $?". Note that while exec does have an output buffer, it is tiny and you should use the streaming interface of spawn if you expect to receive output from the process you are running.
  3. fork runs a worker script you can communicate with via a channel. Think of it as a bidirectional version of the shell pipe "server | worker &". Fork is also useful to isolate blocking calls from your otherwise non-blocking Node program.

Note that spawn and fork are much like background daemons, while exec runs some binary to completion.
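
Here is a minimal sketch of all three calls (ls serves as a stand-in command; the forked script is the broker.js worker we will write below):

var child = require('child_process')

// spawn: stream-oriented - stdout and stderr are pipes you read from
var ls = child.spawn('ls', ['-la'])
ls.stdout.on('data', function(chunk) { process.stdout.write(chunk) })

// exec: buffers all output, then calls back exactly once
child.exec('ls -la', function(err, stdout, stderr) {
  if (err) return console.log(err)
  console.log(stdout)
})

// fork: a Node worker you talk to over a bidirectional message channel
var worker = child.fork(__dirname + '/broker.js')
worker.on('message', function(m) { console.log('worker said: ' + m) })
worker.send('hello')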

Example Case: A Service Broker

Throughout this tutorial, we will use a theoretical service broker that might query a DB or use XHR to communicate with another server. It could also be some worker writing or reading data, or anything else that is somehow "IO-bound". Whatever you prefer, the point is that this is a blocking ("side-effecting") IO call in your program that you want to handle asynchronously. In other words, this means you will need to use callbacks (CB) to be notified when the IO event has finished, and with what result or error state.

First, we create a directory where we can put our tutorial code, and for the sake of popularity, use the Express web app framework:

$ mkdir reactive-node-tutorial
$ cd reactive-node-tutorial
$ npm install express
$ node_modules/express/bin/express
yes
$ npm install

We will use the following broker.js script, placed in this same directory:

/* a "blocking" query */
function query(ms) {
  var start = new Date().getTime()
  console.log("" + start + ": query started")
  while ((new Date().getTime() - start) < ms) {}
  console.log("" + new Date().getTime() + " query finished")
}


process.on('message', function(m) {
  // normally, here would probably be:
  // process.send(query(m))

  query(Math.floor(Math.random()*200+900))
  process.send(m)
  process.exit()
})

Obviously, this is quite a toy example, because our worker receives and returns only a single message before shutting down. Similarly, our server API for this broker in app.js will be just as simple:

var broker = require('child_process')

function db_query(bucket, query, cb) {
  var channel = broker.fork(__dirname + '/broker.js')
  channel.send(query)

  channel.on('message', function(result) {
    cb(null, result)
  })

  channel.on('error', function(err) {
    console.log(err)
    cb(err)
  })
}

Go ahead and replace everything in app.js with this code. In a real-world program, you would use a pool of workers and, each time you start a query, use another channel from your pool (see the sketch below). As a matter of fact, most Node libraries will provide you with non-blocking versions of the APIs they wrap, so you might need no workers at all. Rather, you are more likely to start scaling Node across multiple cores and even machines using workers and IPC events. However, our broker will be enough to run queries in parallel and demonstrate idioms that can help you write your code to be highly concurrent and easy to scale.
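
As a rough sketch of what such a worker pool might look like (the pool size and the round-robin strategy are arbitrary choices, and it assumes a broker variant that does not exit after a single message):

var broker = require('child_process')

// pre-fork a fixed pool of workers and hand out channels round-robin
var POOL_SIZE = 4
var pool = []
for (var i = 0; i < POOL_SIZE; i++)
  pool.push(broker.fork(__dirname + '/broker.js'))

var nextWorker = 0
function pooled_query(query, cb) {
  var channel = pool[nextWorker]
  nextWorker = (nextWorker + 1) % POOL_SIZE
  channel.once('message', function(result) { cb(null, result) })
  channel.send(query)
}

For demonstration purposes, though, the single-shot setting above will do, so let's start coding our "reactive" app!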

A Simple JSON-REST Service

Let's assume we want to provide a JSON-REST web service connecting a query for a blogger ID to the broker, which in turn interfaces with some form of external API or DB. Upon receiving the user's ID, the server should query the broker for the user's data, and then, in a second step, use that data to fetch any posts the user wrote and any comments she made. In other words, we have a serial step of fetching the user, followed by a parallel step of fetching post and comment data. Once all this data is collected, the server should respond with a JSON object containing it.

In the simplest case, you would string a few calls to your query API together and return the results, maybe like this (anti-pattern):

var app = require('express')()

app.get('/sync/:userId', function(req, res, next) {
  var userId = req.params.userId

  db_query('users', userId, function(err, user) {
    if (err) return next(err)

    db_query('posts', {poster: user}, function(err, posts) {
      if (err) return next(err)

      db_query('comments', {commenter: user}, function(err, comments) {
        if (err) return next(err)
        else return res.json({user: user, posts: posts, comments: comments})
      })
    })
  })
})


app.listen(8000)

(Again, you can append this to the app.js you created earlier, as with all following code snippets.) The above example gives rise to the (wrong) argument for why callbacks are a Bad Idea (you will learn the true reason why callbacks are evil later): by placing one CB inside the former, or returning (think "break") early if an error occurred, we end up in callback hell - code that is no longer maintainable and hard to unit test, as we will see. Worse, we just lost the advantage of using Node... this code actually runs sequentially, one broker query after the next! If you run this app in the shell and query the URL (e.g., curl http://localhost:8000/sync/example), you will get something like this output:

$ node app.js
1381334176812: query started
1381334177793 query finished
1381334177858: query started
1381334178830 query finished
1381334178897: query started
1381334179868 query finished

As can be seen, despite our best intentions, all queries are run one after the next instead of running the last two in parallel. So we need to dispatch our last two query calls simultaneously once we have the user data and use the callbacks to collect the results. In addition, this will allow us to "unstring" this callback chain a bit. This takes us straight into the realm of asynchronous programming.

Home-brew Asynchronicity

While this certainly is not the cleanest code, here is a quick shot at running the last two queries concurrently once the first has completed:

app.get('/brew/:userId', function(req, res, next) {
  var userId = req.params.userId
  var result = {}
  var cbCounter = 0
  var gotError = false

  function checkError(err, cb) {
    if (gotError) return
    if (err) {
      gotError = true
      return next(err)
    }
    cbCounter++
    return cb()
  }

  db_query('users', userId, function(err, user) {
    return checkError(err, function() {
      result.user = user

      db_query('posts', {poster: user}, function(err, posts) {
        return checkError(err, function() {
          result.posts = posts
          if (cbCounter == 3) return res.json(result)
        })
      })

      db_query('comments', {commenter: user}, function(err, comments) {
        return checkError(err, function() {
          result.comments = comments
          if (cbCounter == 3) return res.json(result)
        })
      })
    })
  })
})

While this version now runs the last two queries concurrently - and, given more than one CPU core, in parallel - the code has become quite bloated. You need to ensure that all three calls have completed before returning a valid result. You need to ensure the error is only returned once. And you need to make sure you have collected the intermediate results. Last, if you add another query, you might forget to update the counter checks, use a wrong counter value, or forget to store the intermediate result. Luckily, these issues have been solved by the second most popular Node module among fellow Node coders, async (the most popular dependency being ... you guessed it, underscore, which we will use in a bit, too).

Controlling Data Flow

$ npm install async

We will use the auto method of async to make the code more readable, leaving all the mentioned issues up to the library. auto takes a "tasks object" (an object where each property is a function considered a task) and returns an equally shaped object with the results if all tasks ran without errors. For each task, you can also list any other tasks handed to auto that have to complete before that specific task is run:

var async = require('async')

app.get('/async/:userId', function(req, res, next) {
  var userId = req.params.userId

  async.auto({
    user: function(cb) {
      db_query('users', userId, cb)
    },
    posts: ['user', function(cb, results) {
      db_query('posts', {poster: results.user}, cb)
    }],
    comments: ['user', function(cb, results) {
      db_query('comments', {commenter: results.user}, cb)
    }]
  }, function(err, result) {
    if (err) next(err)
    else res.json(result)
  })
})

As shown, we send the auto function an object with three tasks: user, posts, and comments. Both the posts and comments tasks require that the user task has run successfully before them, as indicated by the array value they map to. The tasks (functions) have to accept a standard Node callback (function(err, result)) and should delegate it to some asynchronous task. Dependent tasks also receive the results of their prerequisite task(s) as a second argument. The overall outcome is then pushed into another callback that is the last argument of auto; this CB will receive the error status or, if no error occurred, the result of each individual task, in the same "shape" as the object sent to auto.

We have now solved our issues and achieved our goal of cleanly coding our asynchronous tasks. However, as soon as you need to write more complex code, the burden of structuring the control flow through your callbacks falls back on you. Because you are sending the data as well as the callbacks along your function chain, you are mixing two things you should keep separate: in a nutshell, you are coding the control flow of your program, while instead you should be describing what is known as data dependencies and let the Node engine figure out when to run which task (callback).

The Promised Land

As a matter of fact, with async you will be able to write concurrent Node.js apps perfectly fine. This last section will introduce you to concepts that help you get rid of the problems described at the end of the previous section.

To avoid the use of control flow statements, declarative languages have instead developed the concept of the future, also known as a promise. So far, we have been using callbacks, that is, functions that do not return a value. This makes them hard to compose into new functions and, as they do not return a value, they are executed exclusively for their side effects. This means that both function composition and unit testing of these callbacks are rather a pain. And it means that you have to manually control the "flow" of your data through these callbacks. For example, to read a file in Node, the idiomatic structure is:

var _ = require('underscore')
var fs = require('fs')

function finish(res, data) {
  res.write(data)
  res.end()
}

function sendData(res, data) {
  res.writeHead(200)
  finish(res, data)
}

function reportError(res, err) {
  res.writeHead(500)
  console.log(err)
  finish(res, err + "\n")
}

function onResult(res, err, data) {
  if (err) reportError(res, err)
  else sendData(res, data)
}

function onRequest(req, res) {
  var callback = _.partial(onResult, res)

  fs.readFile(req.params.filename, callback)
}

Instead, if the readFile function were to return a value, this could be expressed in a much cleaner way, without having to nest (structure) the tasks into callback chains. However, as we know, at the time of calling readFile, no such value exists - as a matter of fact, we do not even know if we will be able to read the file at all. So the only thing we can return is a "future" value, or a "promise" to return such a value (hence the name of these data structures). For this reason, such a data type would have to provide three methods:

promise.resolve(result)
promise.reject(reason)
result = promise.then(onFulfilled(result), onRejected(error))
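
In Q, for example, the resolving half of this interface lives on a "deferred", while consumers only ever see the then half. A minimal sketch of wrapping fs.readFile by hand:

var Q = require('q')
var fs = require('fs')

function readFilePromise(filename) {
  var deferred = Q.defer()
  fs.readFile(filename, function(err, data) {
    if (err) deferred.reject(err)     // promise.reject(reason)
    else deferred.resolve(data)       // promise.resolve(result)
  })
  return deferred.promise             // consumers call .then() on this
}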

The called function that returned the promise will, in the future, decide to resolve or reject it. And that new promise will then have to describe what the program should do once the promise has been fulfilled or rejected. The most important concept is that promises propagate: the return value of then (result in the above API description) is yet another promise (the "output promise"), as the sketch after this list demonstrates:

  • If you return a value from either of the two handlers (fulfilled, rejected), the output promise gets fulfilled with that value.
  • If you throw an exception in either of the two handlers, the output promise gets rejected.
  • If you return yet another promise from either of the two handlers, the output promise will follow that promise.
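
Here is a small sketch of all three propagation rules using Q:

var Q = require('q')
var start = Q.fcall(function() { return 'start' })  // an already-fulfilled promise

// 1. returning a plain value fulfills the output promise with that value
start.then(function() { return 42 })
  .then(function(v) { console.log(v) })                    // logs 42

// 2. throwing in a handler rejects the output promise
start.then(function() { throw new Error('boom') })
  .then(null, function(err) { console.log(err.message) })  // logs "boom"

// 3. returning another promise makes the output promise follow it
start.then(function() {
  return Q.delay(100).then(function() { return 'late' })
})
  .then(function(v) { console.log(v) })                    // logs "late" after ~100ms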

This means you can comfortably chain promises, while you can rely on return values (instead of side effects). With this, we now can separate the concerns and instead describe the asynchronous reading of a file using the promise:

var _ = require('underscore')
var Promise = require('q')
var fs = require('fs')
// wrap fs.readFile as a "promise-returning function":
var readFile = Promise.nfbind(fs.readFile)

function setStatus(status, res, data) {
  res.writeHead(status)
  return data
}

var set200 = _.partial(setStatus, 200)
var set500 = _.partial(setStatus, 500)

function onRequest(req, res) {
  var data = readFile(req.params.filename)
  var error = _.partial(set500, res)
  var send = _.partial(set200, res)

  data.then(send, error)
    .then(res.write.bind(res), console.log)
    .fin(function() { res.end() })
}

Promises therefore make it possible to resolve function values in a time-independent manner: we can call the then method before or after either resolve or reject has executed. Furthermore, libraries that implement promises must guarantee that no matter how often we check the promise's state (call then), that promise will always be resolved or rejected the same way. We have also eliminated the linear dependency of each function on the next, so our functions become much more convenient to reuse. Last, this version is actually safer than the callback implementation: if setting the status fails for any reason, this gets logged, and the parameter-less function passed to fin always runs res.end(), ensuring all responses get closed.
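
This time-independence is easy to demonstrate (a small sketch using Q.delay):

var Q = require('q')

// a promise keeps answering .then() with the same value, no matter
// whether the handler is attached before or after resolution
var answer = Q.delay(50).then(function() { return 42 })

answer.then(function(v) { console.log('before: ' + v) })   // attached early
setTimeout(function() {
  answer.then(function(v) { console.log('after: ' + v) })  // attached late
}, 200)
// both handlers log 42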

This makes promises ideal to resolve I/O bound (blocking) tasks. (Do not use promises for CPU intensive tasks! Protip: If reading blogs about comparing promise libraries, make sure the tests are using the right kind of task.)

From Control Flow to Data Dependencies

As we have seen so far, the elegance of using promises instead of callbacks is that blocking functions now can return values (promises) instead of requiring you to send functions (callbacks) to them. In other words, promise-returning functions behave like any other function. Furthermore, the return value of then will always be another ("output") promise, possibly even the return value from or exception thrown in either handler sent to it. Because of this behavior, you can keep chaining promises over your success (onFulfilled(result)) and error functions (onRejected(error)), and the final outcome will always be the same result value or error. As you return promises from your functions instead of having to send them continuation functions, you have separated the control flow from your code.

If you want to learn the nitty-gritty theory behind the benefits of moving from callback-based control flow to data dependencies encoded as promises, James Coglan has an excellent blog post on that matter. And if you need more hands-on experience than what we have discussed so far, StrongLoop provides a very comprehensive tutorial on using promises. We instead will directly get our hands dirty and use this knowledge to describe our three-queries problem in terms of promises. To do so, we will require the popular Q and underscore libraries:

$ npm install q
$ npm install underscore

Without much fanfare, this is what our JSON-REST service looks like using promises and some functional trickery:

var Q = require('q')
var _ = require('underscore')
var q_query = Q.nfbind(db_query)

function fetchPostsAndComments(user) {
  return Q.all([
    user,
    q_query('posts', {poster: user}),
    q_query('comments', {commenter: user})
  ])
}

function respond(res, user, posts, comments) {
  res.json({user: user, posts: posts, comments: comments})
}

app.get('/promise/:userId', function(req, res, next) {
  var userId = req.params.userId
  var response = _.partial(respond, res)

  q_query('users', userId)
    .then(fetchPostsAndComments, next)
    .spread(response, next)
})

So what has changed? Admittedly, except for the main function itself being a bit shorter, this code is longer than the async version. Nonetheless, after this introduction to the benefits of promises, you should be able to see the elegance of this final solution. Each function is clear and expressive, so the code is easy to understand, and we have even reached our goal of three statements per function. In particular, because of the separation of concerns, your code has become much easier to unit test.
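
For instance, the respond function can now be exercised with nothing but a stubbed response object and Node's built-in assert module (a sketch):

var assert = require('assert')

// a stub standing in for Express' response object
var sent = null
var res_stub = { json: function(obj) { sent = obj } }

respond(res_stub, 'alice', ['post1'], ['comment1'])
assert.deepEqual(sent, {user: 'alice', posts: ['post1'], comments: ['comment1']})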

Well, I hope to have enlightened you in one way or another, and maybe made you a better Node developer!