Intro to Streams2: Design Patterns (Part 2)

This is the last part in the Intro to Streams2 series. If you haven’t read Part 1 yet, you should. The examples here build on the ones presented there.

Part 2 is about design patterns for streams. Design patterns help us write clean code, like this:

function publish(data) {
  var self = this; // assumes publish() is called as a method of an EventEmitter

  data
    .pipe(config.formatter())
    .pipe(config.connector())
    .on('error', function(err) {
      self.emit('error', err);
    })
    .on('success', function() {
      self.emit('success');
    });
}

You could imagine that config.formatter() and config.connector() create already-configured objects. Something like this:

var config = {
  formatter: function() {
    return new CsvFormatter(fieldOrder, fieldSeparator || ',');
  },
  connector: function() {
    return new FtpConnector(host, port || 21, user, password, filename);
  }
};

Since we might want to change the configuration later, say to a JSON formatter and an Amazon S3 connector, we want all formatting and connection logic to be entirely encapsulated within those services.
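
That swap should touch nothing but the config object. Here’s what it might look like (JsonFormatter and S3Connector are hypothetical classes, stand-ins for anything with the same stream interface):

var config = {
  formatter: function() {
    // hypothetical formatter that stringifies each datapoint as JSON
    return new JsonFormatter(fieldOrder);
  },
  connector: function() {
    // hypothetical connector that uploads the stream to an S3 bucket
    return new S3Connector(bucket, accessKeyId, secretAccessKey, filename);
  }
};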

Too bad many components won’t work this way out of the box. So what do we do? Design patterns to the rescue!

Adapter Pattern

Piping streams together is fun! That means node modules that handle streams but can’t be inserted into a pipe chain are no fun. How do we fix this? We write wrapper streams that adapt the module to work the way we want.

In this case, the culprit is node-ftp. This would be a great FTP module if only you could pipe a file to an FTP server. Instead, you have to call

ftp.put(readableStream, filename, callback)

Let’s start by rewriting the FTP connector, adapting it so that we can pipe to it.

var stream = require('stream'),
  util = require('util'),
  FTP = require('ftp');
 
/*
 * Stream a file to FTP.
 */
function FtpConnector(host, port, user, password, filename, ftp) {
  var self = this;
 
  ftp = ftp || new FTP();
 
  stream.PassThrough.call(self);
 
  ftp.on('ready', function() {
    ftp.put(self, filename, function(err) {
      ftp.end();
      if (err) { self.emit('error', err); return; }
      self.emit('success');
    });
  });
 
  ftp.on('error', function(err) {
    self.emit('error', err);
  });
 
  ftp.connect({ host: host, port: port, user: user, password: password });
}
 
util.inherits(FtpConnector, stream.PassThrough);

We create the stream as a subclass of a PassThrough stream. Then in the constructor we create a new FTP connection and pass self into its put method. This effectively passes all stream input through directly into the FTP upload. Pretty sweet use of the PassThrough stream, if I do say so myself.
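
With the adapter in place, uploading a file over FTP is just a matter of piping to it. A quick sketch (the host, credentials, and file names here are made up):

var fs = require('fs');

fs.createReadStream('report.csv')
  .pipe(new FtpConnector('ftp.example.com', 21, 'user', 'secret', 'report.csv'))
  .on('success', function() {
    console.log('upload complete');
  })
  .on('error', function(err) {
    console.error('upload failed:', err);
  });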

Composite Pattern

Ok, on to the CSV.

My data source emits JavaScript objects, so I need to transform each object into an ordered row for the CSV stringifier. But I don’t want to add another pipe in the publish chain to do this transformation. What if I want to change the formatter to an XML formatter? (haha) The details of the formatter shouldn’t be exposed to the publisher. So I’d like to wrap up multiple transformers/pipes into a single stream module.
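
For reference, the CsvPrepStream is an objectMode Transform stream along these lines (a minimal sketch reconstructed from its description here; Part 1 has the real version):

var stream = require('stream'),
  util = require('util');

/*
 * Turns a datapoint object into an array of its values,
 * ordered by the given field names.
 */
function CsvPrepStream(fields) {
  stream.Transform.call(this, { objectMode: true });
  this.fields = fields;
}

util.inherits(CsvPrepStream, stream.Transform);

CsvPrepStream.prototype._transform = function(datapoint, encoding, done) {
  this.push(this.fields.map(function(field) {
    return datapoint[field];
  }));
  done();
};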

Here’s how we can do this, building on the CsvPrepStream transformer introduced in Part 1.

var stream = require('stream'),
  util = require('util'),
  CSV = require('csv'),
  CsvPrepStream = require('./csv-prep-stream'); // the Part 1 transformer (sketched above); path assumed

/*
 * Formats a batch of data as a CSV with fields in the given order.
 */
function CsvFormatter(fields, delimiter, prepper) {
  var self = this;

  prepper = prepper || new CsvPrepStream(fields);

  stream.PassThrough.call(self, { objectMode: true });

  self.pipe = function(dest, pipeOpts) {
    /*
     * override pipe to first stream through CsvPrepStream for converting datapoint objects
     * to ordered arrays and then delegate to node-csv to convert each record to a csv row
     */
    var super_pipe = stream.PassThrough.prototype.pipe,
      csv = CSV().to.options({ delimiter: delimiter, header: true, columns: fields });
    return super_pipe.call(self, prepper).pipe(csv).pipe(dest, pipeOpts);
  };
}

util.inherits(CsvFormatter, stream.PassThrough);

We’ve extended the PassThrough stream like we did with the FtpConnector. This time, however, we’re overriding its pipe method so that it first sends the incoming stream data through the CsvPrepStream, and then through node-csv, before piping it out to its final destination. Note that we’re letting node-csv write out the header as the first line by passing header: true, columns: fields as its options. So now we have a single stream module (CsvFormatter) that accepts JavaScript objects and outputs CSV rows with the fields taken from the objects and put in the requested order.
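
And with both pieces in place, the publish chain from the top of the post works end to end. A sketch (dataSource, the field names, and the FTP details are all hypothetical):

var config = {
  formatter: function() {
    return new CsvFormatter(['name', 'age', 'city'], ',');
  },
  connector: function() {
    return new FtpConnector('ftp.example.com', 21, 'user', 'secret', 'people.csv');
  }
};

// dataSource is any objectMode stream emitting objects like
// { name: 'Ada', age: 36, city: 'London' }
dataSource
  .pipe(config.formatter())
  .pipe(config.connector())
  .on('error', function(err) { console.error('publish failed:', err); })
  .on('success', function() { console.log('published!'); });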

Conclusion

In Part 2, we’ve looked at how classic design patterns can be used to solve problems with node streams. Starting with the idea of a sexy publish function that streams data through a CSV formatter and out to an FTP server, we applied the adapter pattern and then the composite pattern to make this pipe chain possible. And, in the process, we discovered two new uses of the PassThrough stream. Sweet!

What design patterns do you use for streams in node.js?
