Ponz Dev Log

ゆるくてマイペースな開発日記

ずっと避けて通っていたNode.jsのStreamに助けられた話

自分は業務・個人開発ともに普段はNode.jsでアプリ開発することが多いのですが、 今回は1プロセスのバッチでは到底処理しきれない量のデータを Node.js Stream で何とか乗り切った話です。 乗り切るまでの過程も忘れないように覚え書きします。

数十MB ~ GB単位のデータセットを加工→ファイルダンプするシチュエーションをイメージしてください。

fs.writeFile で一気に書き出すとデータセット加工時に死ぬ

まずStreamを使用する前に、Node.js標準ライブラリの fs を使って以下のような考え方で機能実現を試みました。

  • (前提) 1プロセスのNode.jsバッチアプリケーションで処理する。
  • 処理順序は、作成済みの配列を _createLine で加工 → 末尾で改行して文字列連結 → ファイルに非同期で書き出し の順に行う。
  • ファイルの書き出しには、Node.js標準モジュールの fs.writeFile を用いて一括でファイルに書き出す。

以下サンプルコードです。 (もちろん以下のソースコードは当時とは別物のサンプルです)

class DumpServiceFsImpl {
    async dump(fileName: string) {
        const rows = this.dataset.map(this._createLine).join("\n");
        await this._writeFileAsync(`./data/${fileName}`, rows);
    }
}

上記の方法で実行してみると、データセットが500万件を超えたあたりで以下のようなエラーが出るようなりました。

FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory

そうです、Heap Out of memoryです。これはNode.jsのヒープサイズ上限を超えたオブジェクトを保持すると発生するエラーです。 元のデータセットと一緒に加工後のデータも全て保持して倍以上のヒープを消費したことが原因でした。

Out of MemoryはJavaのアプリだと(うっかり実装で)良く出会う印象ですが、Node.jsもOut of Memoryで落ちることあるのですよ。 (軽量なアプリケーションの作成にばかりNode.jsを使っているとなかなか出会さないだけ)

ヒープサイズを増やせば解決するか?

Out of Memoryで死んだのは、ヒープサイズ上限を超えたオブジェクトを保持したからでした。 上記のポストにも記載されていますが、nodeコマンドに特定のフラグ (--max-old-space-size) をつけて実行することでヒープサイズの上限を引き上げて解決できます。 Node.jsのコアであるV8エンジンのヒープサイズ上限は元々1.7GBとそこそこ大きいサイズを持っていますが、それでも足りない場合の対処方法です。

stackoverflow.com

ですが、自分の場合はプラットフォームの制約から安易にヒープサイズを上げることができなかったのでこの方法は断念。。。(実行環境のインスタンスのメモリが2GBしかなかった)

スケールアップ(ヒープサイズ上限の引き上げ) & スケールアウト(複数のプロセスで実行)の道が閉ざされてしまったので、アプリの処理をヒープサイズを消費しない省エネな作りに変えて対応することにしました。

StreamでちょっとずつETL

そこで白羽の矢が立ったのが Node.js Streamです。StreamはNode.jsの標準ライブラリに含まれています。 入力のオブジェクトをStreamに読み込むと、Stream内部で持っているバッファに読み込んだ要素を順次溜めていき、閾値まで溜めたらアウトプットにまとめて書き出すことができます。

Streamは読み込み / 加工 / 書き込み用のStreamが用意されているため、少しずつ読み込み→加工→書き出しとStreamをつなげることでヒープサイズが枯渇しないようなETLの実装が可能です。 今回のユースケースにはピッタリです。

最終的には大量データの書き出しは以下のコードで解決できました。(これもサンプルです)

import {Order} from "./models/model";

// 変換ストリームを自分で実装
class LineTransform extends Transform {
    private rowCounter: number;

    constructor(options = {}) {
        // objectMode: true で chunkをObjectで受け取る。
        // この設定しないとchunkはstring or Buffer型を期待するため、TYPE_ERRORで例外を投げる。
        super({...options, objectMode: true});
        this.rowCounter = 1;
    }

    _transform(chunk: Order, encoding: string, callback: (error?: (Error | null)) => void): void {
        const line = this._createLine(chunk, this.rowCounter);
        this.push(line); // 書き出し先のStreamに要素を追加
        this.rowCounter++; // increment counter
        callback();
    }
}

import intoStream from "into-stream";  // Object -> ReadableStreamへの変換ライブラリ
import {createWriteStream} from "fs";

export default class implements DumpService {
    private readonly dataset: Order[];

    constructor(dataset: Order[]) {
        this.dataset = dataset;
    }

    async dump(fileName: string) {
        const path = `./data/${fileName}`;
        logger.debug(`Dump dataset into ${path}`);

        const ordersStream = intoStream.object(this.dataset);  // データセットをReadableStreamに変換して読み出す
        const transformer = new LineTransform();  // 変換ストリームでデータセットの各要素を加工
        const outputStream = createWriteStream(path);  // Writable Streamでファイルに出力
        await ordersStream.pipe(transformer).pipe(outputStream);  // pipeでStreamを連結
    }
}

解説すると、Readable Stream, Writable Stream, Transformerを3種類を活用しています。 Stream同士は pipe メソッドで連結可能なので、読み込み / 加工 / 書き込みでそれぞれ責務を分割しながらヒープサイズの消費を押さえて大量データをETLできます。

またそれぞれのStreamはInterfaceとして提供されているので、カスタムのStreamを実装も可能です。 上記のコードだとデータセットを加工するTransformクラスを継承したStreamクラスを作成しています。

Streamは普段使わないと正体が掴めない代物です。ずっと避けて通ってみたものの、触ってみると案外使いやすい便利な標準ライブラリでした。

Streamを使えばOOMを必ず回避できるのか?

Node.js Streamがヒープサイズの消費を押さえた省エネな作りをしていたとしても、OOMを回避できない場面が出てきます。 一度に大量のオブジェクトを作ってしまったらStreamに流し込む前にヒープサイズが枯渇してしまいます。

実際に数百MB以上のデータセットを扱ったときは、100万件程度でデータセットを分割して加工→ダンプ(追記)する方法とStreamで少しずつ流し込む方法の両方を採用していました。

次に同じような実装をするときは、1つのバッチにまとめずに複数のアプリに分散 or 責務分割するか、Node.jsをやめて大量処理に向いたSpark (Python, Java, Scala) やETLサービスを使って実現したいですね。


以上。