stream-interceptor

0.1.1 • Public • Published

stream-interceptor Build Status NPM

Tiny node.js module to intercept, modify and/or ignore chunks of data and events in any readable compatible stream before it's processed by other stream consumers (e.g: via pipe()).

It becomes particularly useful to deal with net/http/fs streams.

Installation

npm install stream-interceptor

Examples

Existent stream
var Readable = require('stream').Readable
var interceptor = require('stream-interceptor')
 
// Create a new Readable stream
var stream = new Readable
stream._read = function () { /* ... */ }
 
// Make it interceptable
interceptor(stream)
 
// Prepare to capture chunks
stream.capture(function (chunk, next) {
  next(chunk + chunk)
})
 
// We gonna handle strings
stream.setEncoding('utf8')
 
// Push chunks to the stream
stream.push('Foo')
stream.push('Bar')
stream.push(null) // we're done!
 
// Listen for events like a stream consumer
stream.on('data', function (chunk) {
  console.log('Modified chunk:', chunk)
})
 
stream.on('end', function () {
  console.log('We are done!')
})
Capture HTTP response
var http = require('http')
var interceptor = require('stream-interceptor')
 
// Test server
var server = http.createServer(function (req, res) {
  res.writeHead(200)
  res.write('Foo')
  res.write('Bar')
  res.end()
}).listen(3000)
 
http.get('http://localhost:3000', function (response) {
  // http.IncomingMessage implements a Readable stream
  var stream = interceptor(response)
 
  stream.capture(function (chunk, next) {
    next(chunk + chunk + '\n')
  })
 
  stream.on('end', function () {
    console.log('Response status:', response.statusCode)
    server.close()
  })
 
  stream.pipe(process.stdout)
})
Capture asynchronously
var Readable = require('stream').Readable
var interceptor = require('stream-interceptor')
 
// Create a new Readable stream
var stream = new Readable
stream._read = function () { /* ... */ }
 
// Make it interceptable
interceptor(stream)
 
// Prepare to capture chunks asyncronously
// Chunks will be processed always as FIFO queue
stream.capture(function (chunk, next) {
  setTimeout(function () {
    next(chunk + chunk + '\n')
  }, Math.random() * 1000)
})
 
// We gonna handle strings
stream.setEncoding('utf8')
 
// Push chunks to the stream
stream.push('Slow Foo')
stream.push('Slow Bar')
stream.push(null) // we're done!
 
stream.pipe(process.stdout)
Ignore chunks
var Readable = require('stream').Readable
var interceptor = require('stream-interceptor')
 
// Create a new Readable stream
var stream = new Readable
stream._read = function () { /* ... */ }
 
// Make it interceptable
interceptor(stream)
 
// Prepare to capture chunks
stream.capture(function (chunk, next) {
  if (chunk === 'Bad') {
    return next(true) // Ignore chunk
  }
  next(chunk + '\n')
})
 
// We gonna handle strings
stream.setEncoding('utf8')
 
// Push chunks to the stream
stream.push('Bad')
stream.push('Ugly')
stream.push('Good')
stream.push(null) // we're done!
 
stream.pipe(process.stdout)
Interceptable stream
var Interceptable = require('stream-interceptor').Interceptable
 
// Implements both Readable and Interceptable stream
var stream = new Interceptable
stream._read = function () { /* ... */ }
 
// Prepate to capture chunks
stream.capture(function (chunk, next) {
  next(chunk + chunk + '\n')
})
 
// Push chunks to the stream
stream.push('Foo')
stream.push('Bar')
stream.push(null) // we're done!
 
stream.pipe(process.stdout)
Event interceptor
var Interceptable = require('stream-interceptor').Interceptable
 
// Implements both Readable and Interceptable stream
var stream = new Interceptable
stream._read = function () { /* ... */ }
 
// Prepate to capture events
stream.captureEvent('error', function (err, next) {
  next(true) // always ignore errors
})
 
// Prepate to capture chunks
stream.capture(function (chunk, next) {
  next(chunk + chunk + '\n')
})
 
stream.on('error', function (err) {
  console.error('Error:', err) // won't be called
})
 
// Push data in the stream
stream.push('Foo')
stream.push('Bar')
stream.push(null)
 
// Simulate an error
stream.emit('error', 'Damn!')
 
stream.pipe(process.stdout)

API

streamIntercept(readableStream) => Interceptable

Wraps any readable stream turning it an interceptable stream. Interceptable stream implements the same interface as Readable.

Interceptable([ options ])

Alias: Interceptor

Creates a new Interceptable stream. Inherits from Readable stream.

Interceptable#interceptable => boolean

Property to determine if the stream is interceptable.

Interceptable#capture(fn) => Interceptable

Subscribe to capture chunks of data emitted by the stream.

Interceptable#captureEvent(event, fn) => Interceptable

Capture data emitted by a specific event. fn argument expects two arguments: chunk, callback.

When you're done, you must call the callback passing the new argument: callback(chunk)

You can optionally ignore chunks passing true to the callback: callback(true).

isInterceptor(stream) => boolean

Hook()

Hook layer internally used capture and handle events of the stream.

Queue()

FIFO queue implementation internally used.

Chunk()

Internal chunk event structure.

License

MIT - Tomas Aparicio

Package Sidebar

Install

npm i stream-interceptor

Weekly Downloads

4

Version

0.1.1

License

MIT

Last publish

Collaborators

  • h2non