Node.js включает встроенный модуль stream, который позволяет нам работать с потоковыми данными и создавать высокопроизводительные приложения.
Основные типы потоков внутри Node.js:
- Readable: потоки, из которых можно считывать данные (например, fs.createReadStream());
- Writable: потоки, в которые могут быть записаны данные (например, fs.createWriteStream());
- Duplex: потоки, которые являются одновременно Readable и Writable (например, net.Socket);
- Transform: потоки преобразования (Duplex).
Readable Stream
Readable stream - это абстракция определенного источника, из которого считываются данные (например, файла).
Примеры readable streams в приложениях Node.js:
- HTTP responses, on the client
- HTTP requests, on the server
- fs read streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdout and stderr
- process.stdin - для чтения пользовательского ввода через stdin в терминале.
Чтение данных из Readable Streams в Node.js
После подключения readable stream к источнику данных (например, файлу) можно использовать несколько способов считывания данных через поток:
Функция readDataFromFile() читает содержимое файла fileToRead.txt, расположенного в папке '__dirname/files', с помощью Readable Stream и выводит в консоль
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import { createReadStream } from 'node:fs'; import { fileURLToPath } from 'node:url'; import path from 'path' const __filename = fileURLToPath(import.meta.url) const __dirname = path.dirname(__filename) const FILE_PATH = path.join(__dirname, 'files', 'fileToRead.txt') function readDataFromFile() { const readable = createReadStream(FILE_PATH, { highWaterMark: 20 }) readable.on("data", (chunk) => { console.log(`Read ${chunk.length} bytes, content: "${chunk.toString()}"\n`); }); readable.on('end', () => { console.log(`No readable data`); }) } readDataFromFile() |
Другие события Readable Stream 'close', 'end', 'error', 'pause', 'readable', 'resume'
Свойство { highWaterMark: 20 }, передаваемое в качестве опции в fs.createReadStream(), определяет размер чанка (chunk) данных (буфер внутри потока). По умолчанию у fs readable stream потоков, доступных для чтения, highWaterMark установлено на 64 кБ.
Ниже, для примера, простое чтение и вывод в консоль (в process.stdout) с иcпользованием промиса функции pipeline(), которая обеспечивает соединение нескольких потоков, передачу ошибок между потоками и генераторами, правильную очистку и обратный вызов после завершения передачи:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
import { createReadStream } from 'node:fs'; import { pipeline } from 'stream/promises'; import { fileURLToPath } from 'node:url'; import path from 'path' const __filename = fileURLToPath(import.meta.url) const __dirname = path.dirname(__filename) const FILE_PATH = path.join(__dirname, 'files', 'fileToRead.txt') function readDataFromFile() { const readable = createReadStream(FILE_PATH) pipeline(readable, process.stdout) } readDataFromFile() |
Использование асинхронных итераторов является альтернативным способом чтения данных из readable stream .
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
import { createReadStream } from 'node:fs'; import { fileURLToPath } from 'node:url'; import path from 'path' const __filename = fileURLToPath(import.meta.url) const __dirname = path.dirname(__filename) const FILE_PATH = path.join(__dirname, 'files', 'fileToRead.txt') async function readDataFromFile() { try { const readable = createReadStream(FILE_PATH, { highWaterMark: 20 }) for await (const chunk of readable) { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); } } catch (error) { throw new Error('ERROR: no read data') } } readDataFromFile() |
О контроле состояния потока readable stream в Node.js читай на Хабре.
Writable Streams в Node.js
Для записи данных из приложения в определенное место назначения (например, в файл) используются writable streams.
Примеры writable streams в Node.js:
- HTTP requests, on the client
- HTTP responses, on the server
- fs write streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdin
- process.stdout, process.stderr
Поток process.stdout может использоваться для записи данных в терминал и используется внутри console.log.
Функция, которая записывает данные, вводимые в консоли (process.stdin), в содержимое файла fileToWrite.txt , используя Writable Stream:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import { createWriteStream } from 'fs' import { fileURLToPath } from 'url'; import path from 'path'; import { pipeline } from 'stream/promises'; const __filename = fileURLToPath(import.meta.url) const __dirname = path.dirname(__filename) const write = async (fileNameToWrite = '') => { try { const FILE_PATH = path.join(__dirname, 'files', fileNameToWrite) // { flags: 'a' } - дописывает содержимое, а не перезаписывет (если без флага 'a') await pipeline(process.stdin, createWriteStream(FILE_PATH, { flags: 'a' })) } catch (error) { throw new Error(error) } }; await write('fileToWrite.txt'); |
Функция, которая сжимает файл по адресу FILE_PATH в архив с адресом ARCHIVE_PATH с использованием zlib и Streams API:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import { createGzip } from 'node:zlib'; import { pipeline } from 'node:stream/promises'; import { createReadStream, createWriteStream } from 'node:fs'; import { FILE_PATH, ARCHIVE_PATH } from './constants.js' const gzip = createGzip(); const source = createReadStream(FILE_PATH); const destination = createWriteStream(ARCHIVE_PATH); const compress = async () => { await pipeline(source, gzip, destination).catch((err) => { throw new Error('Сompression error', err) }); }; await compress(); |
Функция, которая распаковывает архив с адресом ARCHIVE_PATH в файл, расположенный по адресу FILE_PATH, с использованием zlib и Streams API:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import { createGunzip } from 'node:zlib'; import { pipeline } from 'node:stream/promises'; import { createReadStream, createWriteStream } from 'node:fs'; import { FILE_PATH, ARCHIVE_PATH } from './constants.js' // файл с пользовательскими константами const gzip = createGunzip(); const source = createReadStream(ARCHIVE_PATH); const destination = createWriteStream(FILE_PATH); const decompress = async () => { await pipeline(source, gzip, destination).catch((err) => { throw new Error('Decompression error', err) }); }; await decompress(); |
Transform Streams в Node.js
Transform stream реализует соединение readable stream и writable stream через transform stream, что позволяет преобразовать чанк (chank) данных после получения из readable stream и далее отправить его в writable stream.
Для преобразования порции данных (chank) используется приватный метод _transform класса Transform. Этот метод принимает 3 параметра:
- chunk (часть данных);
- encoding (кодировка, если chunk - это строка);
- callback(err, data) (функция, которая вызывается после неудачной или успешной записи).
Функция, которая считывает строку из process.stdin, переворачивает текст с помощью Transform Stream и затем записывает его в process.stdout
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import { stdin, stdout } from 'node:process'; import { Transform } from 'node:stream'; import { pipeline } from 'node:stream/promises'; const reversStr = (str) => str.toString().split("") .reverse() .join("") const transform = async () => { const myTransform = new Transform({ transform(chunk, encoding, callback) { callback(null, reversStr(chunk)) } }) await pipeline(stdin, myTransform, stdout) }; await transform(); |