CPythonのgrennlet(グリーンスレッド)の実装について

OSが管理するスレッドと違って,ユーザのプログラムによって管理されるスレッドのことをグリーンスレッドといいます.他にもマイクロスレッドとか軽量スレッドとかいったりすることもあるようです.ネイティブスレッド(OSが管理するスレッド)と比較したときのグリーンスレッドの特徴として.
・スケジューリングをユーザプログラム自身がおこなう
・一般にスレッドの切り替えコストはネイティブスレッドより低い
・一つのプログラム内において実行されるグリーンスレッドは一つ (マルチコアでも,OSから見るとあくまでユーザプログラムは一つなので,グリーンスレッドが並列実行されることがない)などが上げられます.



greenletとは,PythonC言語実装であるCPythonにおけるグリーンスレッドの実装の一つです.(2014/4/29現在,greenletはpython2.xのみをサポートしています.また,以下での説明でgreenletのバージョンは0.4.2です.)
greenletを使用することでプログラムの処理を一旦停止して別の処理をおこなうコルーチンが簡単に実現できます.公式のサンプルコードそのままですが,

    from greenlet import greenlet

    def test1():
        print 12
        gr2.switch()
        print 34

    def test2():
        print 56
        gr1.switch()
        print 78

    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    gr1.switch()

このようにして実行すると,

12
56
34

が表示されます.greenlet()で新たなグリーンスレッド(greenletと呼びます)を作成し,switch()でそのgreenletに処理を切り替えます.greenletには親子関係が存在し,子が終了したら親の処理が再開されます.特に親を指定しない場合はメインスレッドが親になります.この例ではtest1()が終了したあと,メインスレッドに戻ってプログラムが終了します(test2()の残りの部分は実行されずに終わります).
プログラムを一旦停止するだけならpythonにはジェネレータがあるのでyieldを使えばできますが,greenletを使うことで関数の呼び出し階層にかかわらず同一スレッド内なら任意のgreenletへ処理を切り替える事が出来ます.


さて,このgreenletの中身ですが,pythonで実装されているのではなく,CPythonのモジュール(C言語拡張)として実装されています.CPythonの拡張方法の詳細はここに書いてあります.
greenletのオブジェクト(PyGrrenlet)は以下のように定義されています.


greenlet.h

typedef struct _greenlet {
    PyObject_HEAD
    char* stack_start;
    char* stack_stop;
    char* stack_copy;
    intptr_t stack_saved;
    struct _greenlet* stack_prev;
    struct _greenlet* parent;
    PyObject* run_info;
    struct _frame* top_frame;
    int recursion_depth;
    PyObject* weakreflist;
    PyObject* exc_type;
    PyObject* exc_value;
    PyObject* exc_traceback;
    PyObject* dict;
} PyGreenlet;

ここで特に重要なのが,stack_start, stack_stop, stack_copy, stack_prev, parentあたりです.一般にプログラムの処理を切り替える(コンテキストスイッチ)ために何が必要かというと,各種レジスタを保存することと,スタックを切り替えることです.greenletでもスタック上にレジスタを退避させ,そしてそのスタックを切り替えることで,コンテキストスイッチを実現します.greenletにおけるスタックレイアウトは以下のようになっています(ソースコードのコメントより)

Stack layout for a greenlet:

               |     ^^^       |
               |  older data   |
               |               |
  stack_stop . |_______________|
        .      |               |
        .      | greenlet data |
        .      |   in stack    |
        .    * |_______________| . .  _____________  stack_copy + stack_saved
        .      |               |     |             |
        .      |     data      |     |greenlet data|
        .      |   unrelated   |     |    saved    |
        .      |      to       |     |   in heap   |
 stack_start . |     this      | . . |_____________| stack_copy
               |   greenlet    |
               |               |
               |  newer data   |
               |     vvv       |

PyGreenletのstack_stopがそのgreenletのスタックの一番下,stack_startがスタックの一番上を指します.またスタック領域の一部は他のgreenletを実行する際にはヒープに退避されます.ヒープ領域を覚えるために必要なのがstack_copyとstack_savedです.全ての領域をヒープに退避せずに一部だけ退避させるのはメモリ効率のためでしょうか.


greenletのsiwtch()を呼んだとき,一般に以下のような順番で関数が呼ばれます.


g_switch() => g_switchstack() => g_slp_switch() => SLP_SAVESTATE => SLP_RESTORE_STATE


まずはg_switch()から見てきます.

static PyObject *
g_switch(PyGreenlet* target, PyObject* args, PyObject* kwargs)
{
    ...
    /* find the real target by ignoring dead greenlets,
       and if necessary starting a greenlet. */
    while (target) {
        if (PyGreenlet_ACTIVE(target)) {
            ts_target = target;
            err = g_switchstack();  # スタックの切り替え
            break;
        }
        if (!PyGreenlet_STARTED(target)) {
            void* dummymarker;
            ts_target = target;
            err = g_initialstub(&dummymarker);
            if (err == 1) {
                continue; /* retry the switch */
            }
            break;
        }
        target = target->parent;
    }
    ...
}

PyGreenlet_ACTIVE(target)が真のとき,つまり切り替え対象のgreenletが既に一度は実行されているとき,g_switchstack()を呼びます.

static int g_switchstack(void)
{
    ...
    int err;
    {   /* save state */
        PyGreenlet* current = ts_current;   // ts_currentが現在のgreenlet
        PyThreadState* tstate = PyThreadState_GET();
        current->recursion_depth = tstate->recursion_depth;
        current->top_frame = tstate->frame;
        current->exc_type = tstate->exc_type;
        current->exc_value = tstate->exc_value;
        current->exc_traceback = tstate->exc_traceback;
    }
    err = slp_switch();
    if (err < 0) {   /* error */
        ...
    }
    else {
        PyGreenlet* target = ts_target;
        PyGreenlet* origin = ts_current;
        ...
    }
    return err;
}

g_switchstack()では現在の状態をcurrentに保存したあと,slp_switch()を呼びます.このslp_switch()が実際のレジスタの退避とスタックの切り替えを行いますが,そういった処理はアセンブラを使わなければできないため,CPUごとにplatform/switch_amd64_unix.c のように定義されています.


switch_amd64_unix.c

#define REGS_TO_SAVE "r12", "r13", "r14", "r15"

static int
slp_switch(void)
{
    int err = 0;
    void* rbp;
    void* rbx;
    unsigned int csr;
    unsigned short cw;
    register long *stackref, stsizediff;
    __asm__ volatile ("" : : : REGS_TO_SAVE);
    __asm__ volatile ("fstcw %0" : "=m" (cw));
    __asm__ volatile ("stmxcsr %0" : "=m" (csr));
    __asm__ volatile ("movq %%rbp, %0" : "=m" (rbp));
    __asm__ volatile ("movq %%rbx, %0" : "=m" (rbx));
    __asm__ ("movq %%rsp, %0" : "=g" (stackref));
    {
        SLP_SAVE_STATE(stackref, stsizediff);
        __asm__ volatile (
            "addq %0, %%rsp\n"
            "addq %0, %%rbp\n"
            :
            : "r" (stsizediff)
            );
        SLP_RESTORE_STATE();
        err = fancy_return_zero();
    }
    __asm__ volatile ("movq %0, %%rbx" : : "m" (rbx));
    __asm__ volatile ("movq %0, %%rbp" : : "m" (rbp));
    __asm__ volatile ("ldmxcsr %0" : : "m" (csr));
    __asm__ volatile ("fldcw %0" : : "m" (cw));
    __asm__ volatile ("" : : : REGS_TO_SAVE);
    return err;
}

前半部分でスタック上に必要なレジスタを退避し,SLP_SAVE_STATE()とその次の部分でスタックの切り替えをおこないます.

    __asm__ volatile ("" : : : REGS_TO_SAVE);

とするとgccインラインアセンブラの記述によってREGS_TO_SAVEで指定したレジスタがワーカレジスタであることがコンパイラに伝わるので,これらのレジスタを退避するコードをコンパイラが生成してくれます.具体的には,

 0x100000f54:  pushq  %r15
 0x100000f56:  pushq  %r14
 0x100000f58:  pushq  %r13
 0x100000f5a:  pushq  %r12
 ... (処理)
 0x100000f72:  popq   %r12
 0x100000f74:  popq   %r13
 0x100000f76:  popq   %r14
 0x100000f78:  popq   %r15

のようになります.r12-r15を退避させているのはx64の呼び出し規約によります.fstcwでFPUの制御レジスタの退避,stmxcsrでSSE関連の制御レジスタの退避をします."=m"(cw)のようにしているので,スタック上のローカル変数に値が退避されていることが分かります.スタックが切り替わった後半部分ではレジスタの復元をおこないます.スタックが切り替わっているのでこの関数から戻った時点で既に切り替え対象だったgreenletに処理が移っています.分かりにくいですが前半部分で処理を停止したgreenletが次に再開されるときはSLP_RESTORE_STATE()からということです.(ところで一番最後のREGS_TO_SAVEの文はいらないような..?)


SLP_SAVE_STATE()とSLP_RESTORE_STATE()で一体何をしているかというと,この2つはマクロとしてgreenlet.cに定義されています.

#define SLP_SAVE_STATE(stackref, stsizediff)        \
  stackref += STACK_MAGIC;              \
  if (slp_save_state((char*)stackref)) return -1;   \
  if (!PyGreenlet_ACTIVE(ts_target)) return 1;      \
  stsizediff = ts_target->stack_start - (char*)stackref

#define SLP_RESTORE_STATE()         \
  slp_restore_state()

既にgreenletが存在している場合(switch()の呼び出しが2回目以降),SLP_SAVE_STATE()はslp_save_state()関数を呼び,さらにstsizediffに切り替え先のgreenletと現在のスタックの先頭との差を格納します.SLP_SAVE_STATE()の次にあるインラインアセンブラ部分で%rspと%rbpにその差を加える事でスタックの切り替えを実現している訳です.
slp_save_state()では,ヒープに必要なだけスタックの退避をおこないます.つまり,切り替え先のstack_stop(target_stop)が,現在のstack_stopよりも上にあるなら,その分をヒープに格納します.

static int GREENLET_NOINLINE(slp_save_state)(char* stackref)
{
    /* must free all the C stack up to target_stop */
    char* target_stop = ts_target->stack_stop; # 切り替え先のスタックの下限
    PyGreenlet* owner = ts_current;
    assert(owner->stack_saved == 0);
    if (owner->stack_start == NULL)
        owner = owner->stack_prev;  /* not saved if dying */
    else
        owner->stack_start = stackref;
    
    while (owner->stack_stop < target_stop) {
        /* ts_current is entierely within the area to free */
        if (g_save(owner, owner->stack_stop))
            return -1;  /* XXX */
        owner = owner->stack_prev;
    }
    if (owner != ts_target) {
        if (g_save(owner, target_stop))
            return -1;  /* XXX */
    }
    return 0;
}

g_save()で実際にヒープへのコピーをおこないます.

static int g_save(PyGreenlet* g, char* stop)
{
    /* Save more of g's stack into the heap -- at least up to 'stop'

       g->stack_stop |________|
                     |        |
                     |    __ stop       . . . . .
                     |        |    ==>  .       .
                     |________|          _______
                     |        |         |       |
                     |        |         |       |
      g->stack_start |        |         |_______| g->stack_copy

     */
    intptr_t sz1 = g->stack_saved;
    intptr_t sz2 = stop - g->stack_start;
    assert(g->stack_start != NULL);
    if (sz2 > sz1) {
        char* c = (char*)PyMem_Realloc(g->stack_copy, sz2);
        if (!c) {
            PyErr_NoMemory();
            return -1;
        }
        memcpy(c+sz1, g->stack_start+sz1, sz2-sz1);
        g->stack_copy = c;
        g->stack_saved = sz2;
    }
    return 0;
}

slp_restore_state()では逆にヒープに退避していたスタック領域をスタックに復元します.実行するgreenletのスタック領域は必ず全てスタックになければいけないため,ヒープに退避していたもの全てを復元します(復元しても良いように事前にslp_save_state()で重複する部分はヒープに退避されています).

static void GREENLET_NOINLINE(slp_restore_state)(void)
{
    PyGreenlet* g = ts_target;
    PyGreenlet* owner = ts_current;
    
    /* Restore the heap copy back into the C stack */
    if (g->stack_saved != 0) {
        memcpy(g->stack_start, g->stack_copy, g->stack_saved);
        PyMem_Free(g->stack_copy);
        g->stack_copy = NULL;
        g->stack_saved = 0;
    }
    if (owner->stack_start == NULL)
        owner = owner->stack_prev; /* greenlet is dying, skip it */
    while (owner && owner->stack_stop <= g->stack_stop)
        owner = owner->stack_prev; /* find greenlet with more stack */
    g->stack_prev = owner;
}

ということで以上がgreenletにおけるコンテキストスイッチの概要ですが, 一番最初に greenletを作って実行するときはどのようになっているでしょうか.


まず,最初にgreeletオブジェクトを作成するときにはgreen_new()が呼ばれます.

static PyObject* green_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
    PyObject* o;
    if (!STATE_OK)
        return NULL;
    
    o = type->tp_alloc(type, 0);
    if (o != NULL) {
        Py_INCREF(ts_current);
        ((PyGreenlet*) o)->parent = ts_current;
    }
    return o;
}

ここで,STATE_OKは以下のマクロで,

#define STATE_OK    (ts_current->run_info == PyThreadState_GET()->dict \
            || !green_updatecurrent())

最初はgreen_updatecurrent()が呼ばれます.

static int green_updatecurrent(void)
{
    ...
    /* first time we see this tstate */
    current = green_create_main();
    ...
    ts_current = current;
    ...
    return 0;
}

この中でgreen_create_main()が呼ばれ,メインスレッドのgreenletが作成され,それがts_currentになります.

static PyGreenlet* green_create_main(void)
{
    PyGreenlet* gmain;
    PyObject* dict = PyThreadState_GetDict();
    if (dict == NULL) {
        if (!PyErr_Occurred())
            PyErr_NoMemory();
        return NULL;
    }

    /* create the main greenlet for this thread */
    gmain = (PyGreenlet*) PyType_GenericAlloc(&PyGreenlet_Type, 0);
    if (gmain == NULL)
        return NULL;
    gmain->stack_start = (char*) 1;
    gmain->stack_stop = (char*) -1;
    gmain->run_info = dict;
    Py_INCREF(dict);
    return gmain;
}

したがって,最初に作成したgreenletの親はメインスレッドのgreenletになります.(その後green_init()が呼ばれ,明示的に親を指定した場合はそれが親になります).さて,新規のgreenletに対してswitch()をした場合,g_switch()内からg_initialstub()が呼ばれます.

static PyObject *
g_switch(PyGreenlet* target, PyObject* args, PyObject* kwargs)
{
    ...
    /* find the real target by ignoring dead greenlets,
       and if necessary starting a greenlet. */
    while (target) {
        if (PyGreenlet_ACTIVE(target)) {
            ts_target = target;
            err = g_switchstack()
            break;
        }
        if (!PyGreenlet_STARTED(target)) {
            void* dummymarker;
            ts_target = target;
            err = g_initialstub(&dummymarker); # 初めてgreenletを実行する場合
            if (err == 1) {
                continue; /* retry the switch */
            }
            break;
        }
        target = target->parent;
    }
    ...
}

g_initialstub()では,初期設定ののち,g_switchstack()をしてスタックを切り替えます.

static int GREENLET_NOINLINE(g_initialstub)(void* mark)
{
    ...
    /* start the greenlet */
    self->stack_start = NULL;
    self->stack_stop = (char*) mark;
    if (ts_current->stack_start == NULL) {
        /* ts_current is dying */
        self->stack_prev = ts_current->stack_prev;
    }
    else {
        self->stack_prev = ts_current;
    }
    ...

    /* restore arguments in case they are clobbered */
    ts_target = self;
    ts_passaround_args = args;
    ts_passaround_kwargs = kwargs;

    /* perform the initial switch */
    err = g_switchstack();   # スタックの切り替え

    /* returns twice!
       The 1st time with err=1: we are in the new greenlet
       The 2nd time with err=0: back in the caller's greenlet
    */
    if (err == 1) {
        ...

        if (args == NULL) {
            /* pending exception */
            result = NULL;
        } else {
            /* call g.run(*args, **kwargs) */
            result = PyEval_CallObjectWithKeywords(   # 実際の処理の開始
                run, args, kwargs);
            ...
        }
        ...
        # greenletの処理が終了したので,親を再開させる
        /* jump back to parent */
        self->stack_start = NULL;  /* dead */
        for (parent = self->parent; parent != NULL; parent = parent->parent) {
            result = g_switch(parent, result, NULL);  # ここで親に処理を切り替える.成功すればもう戻ってこない
            /* Return here means switch to parent failed,
             * in which case we throw *current* exception
             * to the next parent in chain.
             */
            assert(result == NULL);
        }
        /* We ran out of parents, cannot continue */
        PyErr_WriteUnraisable((PyObject *) self);
        Py_FatalError("greenlets cannot continue");
    }
    ...
    return err;
}

ここで,重要なのが,ソースコメントにも書いてあるとおり,g_switchstack()からは2回リターンするということです(...!!!).
どういう事かというと,g_switchstack()を実行してスタックを切り替えると,スタックが切り替わった時点で既に実行は新しいgreenletになっていますが,この新しいgreenletの状態でg_switchstack()がリターンします.そしてしばらくした後,その新しいgreenletが終了(あるいは明示的にswitch()した場合),切り替え元のgreenletの処理が再開されるわけですが,その切り替え元のgreenletの状態でg_switchstack()が再びリターンするということです.g_switchstack()から戻ってきたとき,新しいgreenletなのか,それとも切り替えもとのgreenletなのかでg_switchstack()の戻り値は異なります.
これはslp_switch()をよく見ればわかります.slp_switch()で通常の切り替えが発生した場合,その戻り値は0です.しかし,切り替え先のgreenletが新しい場合,つまりヒープやスタックから情報を復元する必要がない場合,SLP_SAVE_STATE()でreturn 1が返されます.

#define SLP_SAVE_STATE(stackref, stsizediff)		\
  stackref += STACK_MAGIC;				\
  if (slp_save_state((char*)stackref)) return -1;	\
  if (!PyGreenlet_ACTIVE(ts_target)) return 1;		\  # ここが新しいgreenletの場合実行される
  stsizediff = ts_target->stack_start - (char*)stackref

g_switchstack()の戻り値が1の場合,新しいgreenletなのでそのgreenletを実行します.そして,そのgreenletが終了した場合親の処理を再開させます.勿論.そのgreenletが新たに別のswitch()を呼んで処理を切り替えることもあるでしょう.


ということで以上がgreenletの処理の概要になります.つくづくよくできてますね (´ρ`)