Skip to content


Intro to Streams2: New Node Streams (Part 1)

There’s a saying among the core Node contributors: if you’re not using streams, you’re doing Node wrong*. So I spent last week learning the new Node streaming API (“streams2″) while building a small node app. While the documentation is pretty good, there’s not a lot of examples that are both realistic and easily digestible.

This two-part tutorial aims to build on the DailyJS’s Five Minute Guide to Streams2, providing real use cases for each of the different types of new streams (part 1) and the application of more complex design patterns to streams (part 2).If you haven’t read DailyJS’s tutorial yet, go ahead and read it now. Really, go.

You’re back? Good. You’ll notice that I use a different style of inheritance. For now, consider it personal preference and feel free to rewrite using your preferred style. :)

Readable Stream

Let’s start by creating a Readable stream that emits Javascript objects. This was the first custom stream I wrote, since I was buffering data in memory and then wanted to stream it through a CSV formatter and out through FTP. We’ll create a streams2 Readable stream that accepts a Javascript array and emits one element of that array at a time.

var util = require('util'),
  stream = require('stream');
 
function ArrayStream(data) {
  var self = this;
 
  stream.Readable.call(self, { objectMode: true });
 
  self._read = function (size) {
    data.forEach(function(element) {
      self.push(element);
    });
    self.push(null);
  };
}
 
util.inherits(ArrayStream, stream.Readable);
 
module.exports = ArrayStream;

Notice that we pass objectMode=true as an option to the stream, which tells it we’re handling Javascript objects rather than buffers or strings. The size argument to _read is advisory, so we’ve safely ignored it. We push each element out, one at a time. Finally, you signal the end of a Readable stream by pushing null.

A jasmine-node test for the ArrayStream class will demonstrate ArrayStream usage.

var stream = require('stream'),
  ArrayStream = require("../lib/streams/array");
 
describe("Streams: ArrayStream", function () {
  it("should stream one element at a time from the array", function (finish) {
    var array = [ { a: 1, b: 2, c: 3 }, { x: 8, y: 9, z: 10 } ],
      source = new ArrayStream(array),
      actual = [];
 
    source
      .on('data', function (data) {
        actual.push(data);
      })
      .on('end', function () {
        expect(actual).toEqual(array);
        finish();
      });
  });
 
}); // ArrayStream

As you can see, ArrayStream emits ‘data’ and ‘end’ events. In fact, ArrayStream can do everything a Readable stream can do, including being piped.

Transform Stream

Okay, so now we have a stream of objects. What if we want to CSVify the objects? We need a Transform stream to convert each object into an array representing a single ordered CSV record. This is what a streaming CSV stringifier needs as input.

var util = require('util'),
  stream = require('stream');
 
/*
 * Converts a datapoint into a CSV row (ordered array of fields).
 */
function CsvPrepStream(fields) {
 var self = this;
 
 stream.Transform.call(self, { objectMode: true });
 
 self._transform = function (datapoint, encoding, callback) {
   var record = [];
   fields.forEach(function(field) {
     record.push(datapoint[field]);
   });
   self.push(record);
   callback();
 };
}
 
util.inherits(CsvPrepStream, stream.Transform);
 
exports.CsvPrepStream = CsvPrepStream;

Note the use of objectMode=true again. This transform stream takes in a datapoint object, loops over the fields to be present in the CSV file (i.e., keys in the datapoint object), and pushes the values of the fields into a record array. We then push the transformed record out and signal completion by calling callback(). If there was an error, we could pass it to the callback as callback(err).

PassThrough Stream

Now we’ll see one of the most frequent uses of the new PassThrough stream: testing! Let’s write a jasmine-node test for CsvPrepStream; this test will demonstrate usage of PassThrough streams in addition to the functionality of CsvPrepStream.

var CsvPrepStream = require('../lib/formatters/csv').CsvPrepStream,
  stream = require('stream');
 
describe("Formatter: CsvPrepStream", function () {
 
  var data = [{
    key1: "foo",
    key2: "bar",
    key3: "baz"
  }, {
    key2: "zar",
    key1: "boo",
    key3: "zas"
  }];
 
  var fields = ["key1", "key2", "key3"];
 
  it("should emit records with correctly ordered fields", function (finish) {
    var prepper = new CsvPrepStream(fields),
      source = stream.PassThrough({ objectMode: true }),
      actual = [];
 
    source.pipe(prepper);
    source.write(data[0]);
    source.write(data[1]);
    source.end();
 
    prepper
      .on('data', function (data) {
        actual.push(data);
      })
      .on('end', function () {
        expect(actual).toEqual([ ["foo", "bar", "baz"], ["boo", "zar", "zas"] ]);
        finish();
      });
  });
 
}); // Formatter: CsvPrepStream

As you can see, PassThrough streams make great sources of data in testing! Just write some data into it in chunks, then end the stream. It’s that easy. If you’re writing anything other than strings or buffers, just remember that it must have objectMode=true.

Note that in a real test you’d want to test all possible scenarios, such as fields in the array that aren’t present as keys in the object and keys in the object that aren’t present in the array. This merely serves as an example.

Writable Stream

The final stream we’re going to cover today is the Writable stream. Once the CsvFormatter was written, we needed a way to capture the entire CSV contents as a string for testing it. Thus, the StringBufferStream was born.

var util = require('util'),
  stream = require('stream');
 
function StringBufferStream() {
  var self = this;
 
  self.string = "";
 
  stream.Writable.call(self);
 
  self._write = function(chunk, encoding, callback) {
    self.string += chunk.toString();
    callback();
  };
}
 
util.inherits(StringBufferStream, stream.Writable);
 
module.exports = StringBufferStream;

And for completeness sake, let’s go ahead and write a jasmine test for our StringBufferStream.

var StringBufferStream = require('../libs/streams/string-buffer'),
  stream = require('stream');
 
describe("Streams: StringBufferStream", function () {
 
  it("should store the chunks in an internal string buffer", function (finish) {
    var source = stream.PassThrough(),
      buffer = new StringBufferStream();
 
    source.pipe(buffer);
    source.write("boo yeah, ");
    source.write("mi amigos");
    source.end();
 
    buffer.on('finish', function () {
      expect(buffer.string).toEqual("boo yeah, mi amigos");
      finish();
    });
  });
 
}); // StringBufferStream

Conclusion

In this tutorial, we have seen real use cases for almost** all of the “streams2″ stream: Readable, Writable, Transform, and PassThrough. We’ve also seen examples of how to pass Javascript objects through your stream pipeline and how to test your custom streams with jasmine-node. In Part 2 of this write-up, we’ll cover some more advanced uses cases and apply classic design patterns to streams to solve some tricky problems. Stay tuned!

* At least that’s what I heard from a colleague who attended the last NodeConf.
** Once I find a good use case for the Duplex stream, I’ll write something up for it too.

Posted in Tutorials.


2 Responses

Stay in touch with the conversation, subscribe to the RSS feed for comments on this post.

Continuing the Discussion

  1. Intro to Streams2: Design Patterns (Part 2) – Cody A. Ray linked to this post on April 27, 2013

    […] is the last part in the Intro to Streams2 series. If you haven’t read Part 1 yet, you should. The examples here build on the ones presented […]

  2. Lesson Learned: Circuit Breakers – Cody A. Ray linked to this post on June 22, 2013

    […] learn about circuit breakers until the app featured in the “Intro to Streams” series (part 1, part 2) was complete. Let’s walk through the streaming example again and add a circuit […]



Some HTML is OK

or, reply to this post via trackback.

 



Log in here!