凹みTips

C++、JavaScript、Unity、ガジェット等の Tips について雑多に書いています。

Node.js でマルチスレッド対応のネイティブモジュールを作成する

はじめに

前回(Node.js で C++ アドオンから EventEmitter のイベントリスナを呼ぶ - 凹みTips)、C++ のネイティブモジュールから EventEmitter を利用して JavaScript 側で定義したイベントリスナを呼ぶ方法を紹介しました。ただ Node.js はシングルスレッドベースの非同期処理を行っているため、このイベントリスナを呼ぶ過程のどこかで重い処理を行うと、全体の処理がブロックされてしまいます。また、これを避けるために、子スレッドを作成して処理が終了したら JavaScript のイベントリスナを呼ぶ、としようとしても、別スレッド内から Node.js の走るスレッドの v8 の世界へ直接アクセスすることが出来ないため、イベントリスナを呼ぶことができません*1
そこで、libeio / libev に代わって Node.js のコアとして置き換わりつつある libuv の機能である uv_queue_work を用いた、別スレッドでの非同期処理の実現方法を紹介します。

コード

第1引数で受け取った数字を2倍して第2引数のコールバックの引数として1秒後に呼ぶ、といった簡単な内容になっています。この「2倍して1秒間待ってる」ところが非同期処理で行われています。

async.js
var async = require('./build/Debug/addon').async;

async(100, function(result) {
	console.log(result);
});
addon.cc
#include <iostream>
#include <thread>
#include <node.h>

using namespace v8;

// スレッド間でやりとりするデータ(任意)
struct my_struct
{
	int result;
	Persistent<Function> callback;
};

// 別スレッドで実行する非同期プロセス
// この中で v8 の世界に入ることは出来ない
void AsyncWork(uv_work_t* req) {
	std::cout << "AsyncWork\t: " << std::this_thread::get_id() << std::endl;

	// データを非同期で処理したりする
	my_struct* data = static_cast<my_struct*>(req->data);
	data->result *= 2;
	std::this_thread::sleep_for( std::chrono::seconds(1) );
}

// 非同期プロセスが実行した後に呼ばれる
void AsyncAfter(uv_work_t* req) {
	std::cout << "AsyncAfter\t: " << std::this_thread::get_id() << std::endl;

	// Async 時にセットしたコールバック経由で AsyncWork で処理したデータを返す
	my_struct* data = static_cast<my_struct*>(req->data);
	Local<Value> argv[1] = { Number::New(data->result) };
	data->callback->Call(Context::GetCurrent()->Global(), 1, argv);
	
	// お掃除
	data->callback.Dispose();
	delete data;
	delete req;
}

// JavaScript の世界から呼ばれる
Handle<Value> Async(const Arguments& args)
{
	std::cout << "Async\t\t: " << std::this_thread::get_id() << std::endl;
	HandleScope scope;

	// 別スレッドへ渡すデータを作る
	my_struct* data = new my_struct;
	data->result = args[0]->Int32Value();
	data->callback = Persistent<Function>::New( args[1].As<Function>() );

	// 別スレッドで処理を行う
	uv_work_t *req = new uv_work_t;
	req->data = data;
	uv_queue_work(uv_default_loop(), req, AsyncWork, AsyncAfter);

	return scope.Close( Undefined() );
}

// Async 関数を JavaScript の世界へ送り出す
void Init(Handle<Object> target) {
	NODE_SET_METHOD(target, "async", Async);
}
NODE_MODULE(addon, Init)
binding.gyp
{
    'targets': [
        {
            'target_name' : 'addon',
            'sources'     : [ 'addon.cc' ],
            'cflags'      : [ '-std=c++0x' ],
            'conditions'  : [
                ['OS=="mac"', {
                    'include_dirs' : [
                        '/usr/local/include/libcxx',
                    ],
                    'xcode_settings' : {
                        'OTHER_CFLAGS': [
                            '-std=c++0x',
                        ],
                    },
                }],
            ],
        },
    ],
}
ビルド & 実行
$ node-gyp configure build
$ node async
Async		: 0x7fff7d2cc180
AsyncWork	: 0x100f46000
AsyncAfter	: 0x7fff7d2cc180
200

解説

Async 内で呼び出している uv_queue_work は次のような引数を取る実装になっています(threadpool.c:L171)。

// uv_queue_work(イベントループ, 非同期処理でのやり取り用データ, 非同期処理を行う関数, 非同期処理終了後のコールバック);
int uv_queue_work(uv_loop_t* loop,
                  uv_work_t* req,
                  uv_work_cb work_cb,
                  uv_after_work_cb after_work_cb) {
  uv__req_init(loop, req, UV_WORK);
  req->loop = loop;
  req->work_cb = work_cb;
  req->after_work_cb = after_work_cb;
  uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);
  return 0;
}

イベントループへは現在回っているもの( uv_default_loop() )を与えておきます。やりとり用のデータは uv_work_t* 型で、uv.h で定義されている(uv.h:L352)ように、uv_work_t 型は void* 型の data をメンバに持っています。なので、ここにユーザが定義した適当なデータのポインタを突っ込んで置きます。そして、この uv_work_t* 型の引数を受け取る非同期処理を行う関数(ここでは AsyncWork)、それが終わった後のコールバック用関数(AsyncAfter)を与えます。この AsyncWork や AsyncAfter 内で uv_work_t* 型の引数 req の data を元々の型にキャストして色々な処理を行います。
実行結果を見ても分かる通り、AsyncWork 内だけ別スレッドでの実行となっています。なので、この中では v8 の世界にアクセス出来ないので、処理に必要なデータは予め data の中に突っ込んで置くようにします。

別スレッドからの実行

現在は、メインスレッドから uv_queue_work を実行していますが、これを利用すれば別スレッドで動いている場所から同様に uv_queue_work を実行して非同期処理を行うこともできます。イメージとしては、別スレッドでコールバックが呼ばれたとき、そのタイミングで JavaScript のイベントリスナを呼びたい、といった状況です。

addon.cc
// JavaScript の世界から呼ばれる
Handle<Value> Async(const Arguments& args)
{
	std::cout << "Async\t\t: " << std::this_thread::get_id() << std::endl;
	HandleScope scope;

	// 別スレッドへ渡すデータをセットする
	my_struct* data = new my_struct;
	data->result = args[0]->Int32Value();
	data->callback = Persistent<Function>::New( args[1].As<Function>() );

	// 別スレッドで処理が行われる(非同期のコールバックとか)
	std::thread t([&]{
		std::cout << "Another Thread\t: " << std::this_thread::get_id() << std::endl;

		uv_work_t *req = new uv_work_t;
		req->data = data;
		uv_queue_work(uv_default_loop(), req, AsyncWork, AsyncAfter);
	});
	t.join();

	return scope.Close( Undefined() );
}

結果

Async		: 0x7fff7d2cc180
Another Thread	: 0x100f86000
AsyncWork	: 0x100fc9000
AsyncAfter	: 0x7fff7d2cc180
200

最終的な AsyncAfter は元々のスレッドに戻ってくるのでそのタイミングでイベントリスナやコールバックを呼んであげれば OK です。

おわりに

元々は音声認識ライブラリ Julius の発話開始コールバックを JavaScript 側に通知したくて調べたのですが、思ったより時間がかかってしまいました。libuv は触りたてなので、間違っている場所があるかもしれませんが、その際は教えていただけると助かります。

*1:v8 ではマルチスレッド処理に対応しており、v8::Locker を HandleScope を作成する前に呼ぶことでメインスレッドで走る v8 の世界にアクセスできるのですが、これを行うと処理が止まってしまいます(デッドロックしてる?)。node-waf 時代は出来たのですが…