Learn Advanced Node.js: Duplex Streams

A duplex stream is a stream that implements both a readable and a writable side. These streams allow data to pass through. Readable streams can pipe data into a duplex stream, and the duplex stream can then write that data on to a writable stream. So duplex streams represent the middle sections of pipelines. I’m looking at the code that we created in the last lesson, where we pipe from our read stream, which is reading a video, powder-day.mp4, to our write stream, which is creating a copy of that video. What we’re going to go ahead and do is implement a duplex stream here.

Now this is inside of your lesson files under chapter two, lesson seven, within the start folder. So in order to do this, we’re going to go ahead and grab the most basic type of duplex stream, which is a PassThrough. We can get that from our stream module, and what we can do is create a new PassThrough stream called report.

And when I say they represent the middle sections of a pipeline, that means that we can put them in between readables and writables. So a duplex stream can be piped between a readable and a writable. It means that this duplex stream can receive data from our read stream and then send that data to our write stream. So what would be the point of implementing a pass through stream? We might want to see something about the data or report on it. A pass through stream doesn’t change anything about the data. There’s a type of duplex stream called a transform stream, which we will cover in greater detail in the next lesson, and technically the pass through is a transform stream, but we’re just going to use it as a basic duplex stream in this example.
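To make the relationship between these stream types concrete, here is a small sketch that only inspects a PassThrough instance. The variable names (isTransform, isDuplex, and so on) are made up for illustration and are not part of the lesson files.

```javascript
const { PassThrough, Transform, Duplex } = require('stream');

// The most basic duplex stream: data written in comes out unchanged.
const report = new PassThrough();

// PassThrough extends Transform, and Transform extends Duplex.
const isTransform = report instanceof Transform;
const isDuplex = report instanceof Duplex;

// A duplex stream exposes a writable side and a readable side on one object.
const hasWritableSide = typeof report.write === 'function';
const hasReadableSide = typeof report.read === 'function';

console.log({ isTransform, isDuplex, hasWritableSide, hasReadableSide });
```

Because the same object can be written to and read from, it can sit between a readable and a writable in a chain of pipe calls.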

So what I’m going to do is create a total variable equal to zero, and then the report has an on data event because it’s a readable as well as a writable stream. So I can listen for when data comes into this report stream and get that data chunk by chunk. What I’m going to go ahead and do is increment our total by the length of each chunk so we can see how much data has passed through. Whoops, and I should spell total correctly.

And then we’ll also go ahead and console log how many bytes of data have currently passed through by logging our total. There we go. So now we can actually see some reporting on how many bytes are passing through our duplex stream. I’m going to come over here to the terminal and type node . (node, then a dot), and we can see that all of these bytes have passed through the stream. So we still get the copy. We’ve still copied the file, but the pass through has provided some reporting.
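The reporting step can be sketched roughly like this. To keep the sketch runnable without the video file, it assumes an in-memory readable and a writable that collects chunks as stand-ins for the lesson’s file streams; the chunk sizes and the copied array are made up for illustration.

```javascript
const { PassThrough, Readable, Writable } = require('stream');

// Stand-in for the video read stream (assumption): three in-memory chunks.
const readStream = Readable.from([
  Buffer.alloc(1024),
  Buffer.alloc(1024),
  Buffer.alloc(512)
]);

// Stand-in for the file write stream (assumption): collects the copied chunks.
const copied = [];
const writeStream = new Writable({
  write(chunk, encoding, callback) {
    copied.push(chunk);
    callback();
  }
});

// The pass through sits in the middle and reports without changing the data.
const report = new PassThrough();
let total = 0;

report.on('data', (chunk) => {
  total += chunk.length;
  console.log('bytes: ', total);
});

readStream.pipe(report).pipe(writeStream);
```

With the real files, readStream and writeStream would be the file streams from the previous lesson, and the report stream would stay exactly the same.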

So duplex streams help us compose streams into pipelines. Let’s say we actually want to slow this whole thing down. We can create another type of duplex stream called throttle. So we’ll create something called throttle, and here’s an instance of it, and we will go ahead and send it 10 milliseconds. Then we’ll add throttle to our pipeline. So we’re going to start with the read stream, and that’s going to be passed through the throttle duplex stream, and then passed through the report duplex stream, and then eventually we’ll get to our write stream where the file copy is made.

So in order to do this, we need to create a new throttle type, and we’re going to create it as a duplex stream type. So class Throttle extends Duplex. When we implement a duplex stream we have to provide both a read and a write method (_read and _write), because it has both sides. What we’re going to go ahead and do is add a constructor as well, and the constructor will take in the number of milliseconds that we should delay.

So I’m going to go ahead and invoke super, and then we’ll say this.delay = ms. Then I’m going to go ahead and come over to my write side and grab the chunk, the encoding, and the callback. What we’ll do is call this.push(chunk), so we pass the chunks of data along as they’re received, and then we’ll call the callback to signal that the write has been completed.

So in order to actually have a delay here, what I want to do is set a timeout around this callback. I’m going to call setTimeout, and we will invoke the callback when the timeout is finished, and the timeout is going to be this.delay. Now the other thing that we want to implement here is another method called final. Final means that we are getting no more data from the read stream, so we also want to close out the readable side of our throttle. I’m going to go ahead and call this.push(null) so that everything downstream can see that the stream has ended.

Now I’m not going to do anything on the read side. We’re just going to pass the data along as it comes into this throttle, and then we shall throttle it. So we’re going to write one chunk at a time, but we’re going to make each chunk wait for a little bit. Let’s go ahead and save this and watch it run. Coming over to the terminal we’ll type node . again, and now you can see the bytes have really slowed down. That’s because we have sent our stream through a throttle, and we’re actually seeing those bytes because we’ve also sent our stream through a report.
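Putting the pieces described above together, a Throttle class and the full pipeline might look roughly like the following sketch. It assumes in-memory stand-ins for the video read stream and the copy write stream so that it runs without powder-day.mp4, and it calls the callback in _final, which is my assumption about how to finish cleanly rather than something stated in the lesson. The startedAt variable is only there to make the delay observable.

```javascript
const { Duplex, PassThrough, Readable, Writable } = require('stream');

class Throttle extends Duplex {
  constructor(ms) {
    super();
    this.delay = ms; // milliseconds to wait after each chunk
  }

  // Nothing to do on the read side: data is pushed from _write.
  _read() {}

  _write(chunk, encoding, callback) {
    this.push(chunk); // hand the chunk to the readable side
    setTimeout(callback, this.delay); // accept the next chunk only after the delay
  }

  _final(callback) {
    this.push(null); // no more data: end the readable side
    callback();
  }
}

// Stand-ins for the lesson's file streams (assumptions for a runnable sketch).
const readStream = Readable.from([
  Buffer.alloc(1024),
  Buffer.alloc(1024),
  Buffer.alloc(512)
]);
const copied = [];
const writeStream = new Writable({
  write(chunk, encoding, callback) {
    copied.push(chunk);
    callback();
  }
});

const report = new PassThrough();
const throttle = new Throttle(10);

let total = 0;
report.on('data', (chunk) => {
  total += chunk.length;
  console.log('bytes: ', total);
});

const startedAt = Date.now();
readStream.pipe(throttle).pipe(report).pipe(writeStream);
```

Delaying the write callback is what slows the pipeline down: the throttle does not take the next chunk from its writable side until the previous callback has fired.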

So if I were to take this throttle and set it to 100 milliseconds, save it, come over here, clear out our terminal, and type node . one more time, now we can see that we’ve really throttled our stream and we are only allowing it to write bytes really slowly. So duplex streams are a necessary component when you want to compose streams into complex pipelines. A duplex stream implements both a readable and a writable side, and therefore duplex streams represent the center parts of a pipeline.
