Skip to content
On this page

1. 流的概念

  • 流是一组有序的,有起点和终点的字节数据传输手段
  • 它不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
  • 流是一个抽象接口,被 Node 中的很多对象所实现。比如HTTP 服务器request和response对象都是流。

2. 可读流createReadStream

实现了 stream.Readable 接口的对象, 将对象数据读取为流数据, 当监听data事件后, 开始发射数据

JavaScript
    fs.createReadStream = function(path, options) {
        return new ReadStream(path, options);
    };
    util.inherits(ReadStream, Readable);
1
2
3
4

2.1 创建可读流

JavaScript
    var rs = fs.createReadStream(path, [options]);
1
  1. path读取文件的路径
  2. options
    • flags打开文件要做的操作,默认为'r'
    • encoding默认为null
    • start开始读取的索引位置
    • end结束读取的索引位置(包括结束位置)
    • highWaterMark读取缓存区默认的大小64kb

如果指定utf8编码highWaterMark要大于3个字节

2.2 监听data事件

流切换到流动模式, 数据会被尽可能快的读出

JavaScript
    rs.on('data', function(data) {
        console.log(data);
    });
1
2
3

2.3 监听end事件

该事件会在读完数据后被触发

JavaScript
    rs.on('end', function() {
        console.log('读取完成');
    });
1
2
3

2.4 监听error事件

JavaScript
    rs.on('error', function(err) {
        console.log(err);
    });
1
2
3

2.5 监听open事件

JavaScript
    rs.on('open', function() {
        console.log(err);
    });
1
2
3

2.6 监听close事件

JavaScript
    rs.on('close', function() {
        console.log(err);
    });
1
2
3

2.7 设置编码

与指定{encoding:'utf8'}效果相同,设置编码

JavaScript
    rs.setEncoding('utf8');
1

2.8 暂停和恢复触发data

通过pause()方法和resume()方法

JavaScript
    rs.on('data', function(data) {
        rs.pause();
        console.log(data);
    });
    setTimeout(function() {
        rs.resume();
    }, 2000);
1
2
3
4
5
6
7

3. 可写流createWriteStream

实现了stream. Writable接口的对象来将流数据写入到对象中

JavaScript
    fs.createWriteStream = function(path, options) {
        return new WriteStream(path, options);
    };

    util.inherits(WriteStream, Writable);
1
2
3
4
5

3.1 创建可写流

JavaScript
    var ws = fs.createWriteStream(path, [options]);
1
  1. path写入的文件路径
  2. options
    • flags打开文件要做的操作,默认为'w'
    • encoding默认为utf8
    • highWaterMark写入缓存区的默认大小16kb

3.2 write方法

JavaScript
    ws.write(chunk, [encoding], [callback]);
1
  1. chunk写入的数据buffer/string
  2. encoding编码格式chunk为字符串时有用,可选
  3. callback 写入成功后的回调

返回值为布尔值,系统缓存区满时为false, 未满时为true

3.3 end方法

JavaScript
    ws.end(chunk, [encoding], [callback]);
1

表明接下来没有数据要被写入 Writable 通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据 如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数

3.4 drain方法

  • 当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。 一旦所有当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 'drain' 事件就会被触发
  • 建议, 一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块
JavaScript
    let fs = require('fs');
    let ws = fs.createWriteStream('./2.txt', {
        flags: 'w',
        encoding: 'utf8',
        highWaterMark: 3
    });
    let i = 10;

    function write() {
        let flag = true;
        while (i && flag) {
            flag = ws.write("1");
            i--;
            console.log(flag);
        }
    }
    write();
    ws.on('drain', () => {
        console.log("drain");
        write();
    });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

3.5 finish方法

在调用了 stream.end() 方法,且缓冲区数据都已经传给底层系统之后, 'finish' 事件将被触发。

JavaScript
    var writer = fs.createWriteStream('./2.txt');
    for (let i = 0; i < 100; i++) {
        writer.write(`hello, ${i}!\n`);
    }
    writer.end('结束\n');
    writer.on('finish', () => {
        console.error('所有的写入已经完成!');
    });
1
2
3
4
5
6
7
8

4.pipe方法

4.1 pipe方法的原理

JavaScript
    var fs = require('fs');
    var ws = fs.createWriteStream('./2.txt');
    var rs = fs.createReadStream('./1.txt');
    rs.on('data', function(data) {
        var flag = ws.write(data);
        if (!flag)
            rs.pause();
    });
    ws.on('drain', function() {
        rs.resume();
    });
    rs.on('end', function() {
        ws.end();
    });
1
2
3
4
5
6
7
8
9
10
11
12
13
14

4.2 pipe用法

JavaScript
    readStream.pipe(writeStream);
    var from = fs.createReadStream('./1.txt');
    var to = fs.createWriteStream('./2.txt');
    from.pipe(to);
1
2
3
4

将数据的滞留量限制到一个可接受的水平,以使得不同速度的来源和目标不会淹没可用内存。

4.3 unpipe用法

  • readable.unpipe()方法将之前通过stream.pipe()方法绑定的流分离
  • 如果 destination 没有传入, 则所有绑定的流都会被分离.
JavaScript
    let fs = require('fs');
    var from = fs.createReadStream('./1.txt');
    var to = fs.createWriteStream('./2.txt');
    from.pipe(to);
    setTimeout(() => {
        console.log('关闭向2.txt的写入');
        from.unpipe(writable);
        console.log('手工关闭文件流');
        to.end();
    }, 1000);
1
2
3
4
5
6
7
8
9
10

4.4 cork

调用 writable.cork() 方法将强制所有写入数据都存放到内存中的缓冲区里。 直到调用 stream.uncork() 或 stream.end() 方法时,缓冲区里的数据才会被输出。

4.5 uncork

writable.uncork()将输出在 stream.cork() 方法被调用之后缓冲在内存中的所有数据。

JavaScript
    stream.cork();
    stream.write('1');
    stream.write('2');
    process.nextTick(() => stream.uncork());
1
2
3
4

5. 简单实现

5.1 可读流的简单实现

JavaScript
    let fs = require('fs');
    let ReadStream = require('./ReadStream');
    let rs = ReadStream('./1.txt', {
        flags: 'r',
        encoding: 'utf8',
        start: 3,
        end: 7,
        highWaterMark: 3
    });
    rs.on('open', function () {
        console.log("open");
    });
    rs.on('data', function(data) {
        console.log(data);
    });
    rs.on('end', function () {
        console.log("end");
    });
    rs.on('close', function () {
        console.log("close");
    });
    /**
     open
     456
     789
     end
     close
     **/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
JavaScript
    let fs = require('fs');
    let EventEmitter = require('events');

    class WriteStream extends EventEmitter {
        constructor(path, options) {
            super(path, options);
            this.path = path;
            this.fd = options.fd;
            this.flags = options.flags || 'r';
            this.encoding = options.encoding;
            this.start = options.start || 0;
            this.pos = this.start;
            this.end = options.end;
            this.flowing = false;
            this.autoClose = true;
            this.highWaterMark = options.highWaterMark || 64 * 1024;
            this.buffer = Buffer.alloc(this.highWaterMark);
            this.length = 0;
            this.on('newListener', (type, listener) => {
                if (type == 'data') {
                    this.flowing = true;
                    this.read();
                }
            });
            this.on('end', () => {
                if (this.autoClose) {
                    this.destroy();
                }
            });
            this.open();
        }

        read() {
            if (typeof this.fd != 'number') {
                return this.once('open', () => this.read());
            }
            let n = this.end ? Math.min(this.end - this.pos, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, n, this.pos, (err, bytesRead) => {
                if (err) {
                    return;
                }
                if (bytesRead) {
                    let data = this.buffer.slice(0, bytesRead);
                    data = this.encoding ? data.toString(this.encoding) : data;
                    this.emit('data', data);
                    this.pos += bytesRead;
                    if (this.end && this.pos > this.end) {
                        return this.emit('end');
                    }
                    if (this.flowing)
                        this.read();
                } else {
                    this.emit('end');
                }
            })
        }

        open() {
            fs.open(this.path, this.flags, this.mode, (err, fd) => {
                if (err) return this.emit('error', err);
                this.fd = fd;
                this.emit('open', fd);
            })
        }

        end() {
            if (this.autoClose) {
                this.destroy();
            }
        }

        destroy() {
            fs.close(this.fd, () => {
                this.emit('close');
            })
        }

    }

    module.exports = WriteStream;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

5.2 可写流的简单实现

JavaScript
    let fs = require('fs');
    let FileWriteStream = require('./FileWriteStream');
    let ws = FileWriteStream('./2.txt', {
        flags: 'w',
        encoding: 'utf8',
        highWaterMark: 3
    });
    let i = 10;

    function write() {
        let flag = true;
        while (i && flag) {
            flag = ws.write("1", 'utf8', (function(i) {
                return function() {
                    console.log(i);
                }
            })(i));
            i--;
            console.log(flag);
        }
    }
    write();
    ws.on('drain', () => {
        console.log("drain");
        write();
    });
    /**
     10
     9
     8
     drain
     7
     6
     5
     drain
     4
     3
     2
     drain
     1
     **/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
JavaScript
    let fs = require('fs');
    let EventEmitter = require('events');
    class WriteStream extends EventEmitter {
        constructor(path, options) {
            super(path, options);
            this.path = path;
            this.fd = options.fd;
            this.flags = options.flags || 'w';
            this.mode = options.mode || 0o666;
            this.encoding = options.encoding;
            this.start = options.start || 0;
            this.pos = this.start;
            this.writing = false;
            this.autoClose = true;
            this.highWaterMark = options.highWaterMark || 16 * 1024;
            this.buffers = [];
            this.length = 0;
            this.open();
        }

        open() {
            fs.open(this.path, this.flags, this.mode, (err, fd) => {
                if (err) return this.emit('error', err);
                this.fd = fd;
                this.emit('open', fd);
            })
        }

        write(chunk, encoding, cb) {
            if (typeof encoding == 'function') {
                cb = encoding;
                encoding = null;
            }

            chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, this.encoding || 'utf8');
            let len = chunk.length;
            this.length += len;
            let ret = this.length < this.highWaterMark;
            if (this.writing) {
                this.buffers.push({
                    chunk,
                    encoding,
                    cb,
                });
            } else {
                this.writing = true;
                this._write(chunk, encoding, this.clearBuffer.bind(this));
            }
            return ret;
        }

        _write(chunk, encoding, cb) {
            if (typeof this.fd != 'number') {
                return this.once('open', () => this._write(chunk, encoding, cb));
            }
            fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, written) => {
                if (err) {
                    if (this.autoClose) {
                        this.destroy();
                    }
                    return this.emit('error', err);
                }
                this.length -= written;
                this.pos += written;
                cb && cb();
            });
        }

        clearBuffer() {
            let data = this.buffers.shift();
            if (data) {
                this._write(data.chunk, data.encoding, this.clearBuffer.bind(this))
            } else {
                this.writing = false;
                this.emit('drain');
            }
        }

        end() {
            if (this.autoClose) {
                this.emit('end');
                this.destroy();
            }
        }

        destroy() {
            fs.close(this.fd, () => {
                this.emit('close');
            })
        }

    }

    module.exports = WriteStream;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94

5.3 pipe

JavaScript
    let fs = require('fs');
    let ReadStream = require('./ReadStream');
    let rs = ReadStream('./1.txt', {
        flags: 'r',
        encoding: 'utf8',
        highWaterMark: 3
    });
    let FileWriteStream = require('./WriteStream');
    let ws = FileWriteStream('./2.txt', {
        flags: 'w',
        encoding: 'utf8',
        highWaterMark: 3
    });
    rs.pipe(ws);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
JavaScript
    ReadStream.prototype.pipe = function(dest) {
        this.on('data', (data) => {
            let flag = dest.write(data);
            if (!flag) {
                this.pause();
            }
        });
        dest.on('drain', () => {
            this.resume();
        });
        this.on('end', () => {
            dest.end();
        });
    }
    ReadStream.prototype.pause = function() {
        this.flowing = false;

    }
    ReadStream.prototype.resume = function() {
        this.flowing = true;
        this.read();
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

5.4 暂停模式

JavaScript
    let fs = require('fs');
    let ReadStream2 = require('./ReadStream2');
    let rs = new ReadStream2('./1.txt', {
        start: 3,
        end: 8,
        encoding: 'utf8',
        highWaterMark: 3
    });
    rs.on('readable', function() {
        console.log('readable');
        console.log('rs.buffer.length', rs.length);
        let d = rs.read(1);
        console.log(d);
        console.log('rs.buffer.length', rs.length);

        setTimeout(() => {
            console.log('rs.buffer.length', rs.length);
        }, 500)
    });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
JavaScript
    let fs = require('fs');
    let EventEmitter = require('events');
    class ReadStream extends EventEmitter {
        constructor(path, options) {
            super(path, options);
            this.path = path;
            this.highWaterMark = options.highWaterMark || 64 * 1024;
            this.buffer = Buffer.alloc(this.highWaterMark);
            this.flags = options.flags || 'r';
            this.encoding = options.encoding;
            this.mode = options.mode || 0o666;
            this.start = options.start || 0;
            this.end = options.end;
            this.pos = this.start;
            this.autoClose = options.autoClose || true;
            this.bytesRead = 0;
            this.closed = false;
            this.flowing;
            this.needReadable = false;
            this.length = 0;
            this.buffers = [];
            this.on('end', function() {
                if (this.autoClose) {
                    this.destroy();
                }
            });
            this.on('newListener', (type) => {
                if (type == 'data') {
                    this.flowing = true;
                    this.read();
                }
                if (type == 'readable') {
                    this.read(0);
                }
            });
            this.open();
        }

        open() {
            fs.open(this.path, this.flags, this.mode, (err, fd) => {
                if (err) {
                    if (this.autoClose) {
                        this.destroy();
                        return this.emit('error', err);
                    }
                }
                this.fd = fd;
                this.emit('open');
            });
        }

        read(n) {
            if (typeof this.fd != 'number') {
                return this.once('open', () => this.read());
            }
            n = parseInt(n, 10);
            if (n != n) {
                n = this.length;
            }
            if (this.length == 0)
                this.needReadable = true;
            let ret;
            if (0 < n < this.length) {
                ret = Buffer.alloc(n);
                let b;
                let index = 0;
                while (null != (b = this.buffers.shift())) {
                    for (let i = 0; i < b.length; i++) {
                        ret[index++] = b[i];
                        if (index == ret.length) {
                            this.length -= n;
                            b = b.slice(i + 1);
                            this.buffers.unshift(b);
                            break;
                        }
                    }
                }
                if (this.encoding) ret = ret.toString(this.encoding);
            }

            let _read = () => {
                let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
                fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                    if (err) {
                        return
                    }
                    let data;
                    if (bytesRead > 0) {
                        data = this.buffer.slice(0, bytesRead);
                        this.pos += bytesRead;
                        this.length += bytesRead;
                        if (this.end && this.pos > this.end) {
                            if (this.needReadable) {
                                this.emit('readable');
                            }

                            this.emit('end');
                        } else {
                            this.buffers.push(data);
                            if (this.needReadable) {
                                this.emit('readable');
                                this.needReadable = false;
                            }

                        }
                    } else {
                        if (this.needReadable) {
                            this.emit('readable');
                        }
                        return this.emit('end');
                    }
                })
            }
            if (this.length == 0 || (this.length < this.highWaterMark)) {
                _read(0);
            }
            return ret;
        }

        destroy() {
            fs.close(this.fd, (err) => {
                this.emit('close');
            });
        }

        pause() {
            this.flowing = false;
        }

        resume() {
            this.flowing = true;
            this.read();
        }

        pipe(dest) {
            this.on('data', (data) => {
                let flag = dest.write(data);
                if (!flag) this.pause();
            });
            dest.on('drain', () => {
                this.resume();
            });
            this.on('end', () => {
                dest.end();
            });
        }

    }

    module.exports = ReadStream;
    /**
     * if (n !== 0)
     state.emittedReadable = false; 只要要读的字节数不是0就需要触发readable事件
     如果传入的NaN,则将n赋为缓区的长度,第一次就是0
    
     缓存区为0就开始读吧
     如果n等于0就返回null,state.needReadable = true;
     如果缓存区为0,是  state.needReadable = true; 需要触发readable
    
     **/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
沪ICP备20006251号-1