Update 2023.11.17 2017.05.05

Python デザインパターン サンプルコード State
Mark Summerfield『実践 Python 3』デザインパターンのサンプルコード
Python3(3.11)で動くソースコード(.pyファイル .ipynbファイル)あります
「anaconda3」on .py「PyCharm」.ipynb「Jupyter Notebook」

著作権の問題があるので,本に書いてないことだけを解説します。つまり,視点を変えて解説します。

◆◆State パターンの使われる場面◆◆

以下は,同じサイトの別記事について説明されている。
https://yamakatsusan.web.fc2.com/hp_software/pythonpattern16.html
『State パターン 結城 浩「Java言語で学ぶデザインパターン入門」をPython化』

Java サンプルでは,金庫の見張りの同名作業の内容を,昼間と夜間という異なる状態により替えるというものである。

GoF の C++ サンプルでは,ネットワーク通信の同名応答を,異なる状態により替えるというものである。

Web で見つけた例は,ゲームのキャラクタの様々の状態があり,様々な同名局面の対応を替えるというものである。同名局面とは,想像がつくと思うが,「あるものを食べた」「敵に出会った」「穴に落ちた」等々である。自分の状態と同名局面の組み合わせにより対応を替えるのである。

オブジェクトの書き方の詳細は後述するが,Java サンプルでは(時計の比較以外)if 文がまったくなく,この発想は個人が独力で思いつくことが難しいと思われる。また,アルゴリズム?のミスがほとんどなくなるのではないかと思われる。

(2023-11-17)Python3.11で動作確認済み


◆◆State パターンとは◆◆

GoFによれば,State パターンの目的は, 「オブジェクトの内部状態が変化したときに,オブジェクトが振る舞いを変えるようにする。クラス内では,振る舞いの変化を記述せず,状態を表すオブジェクトを導入することでこれを実現する。」

GoFによれば,State パターンの別名は,Objects for States です。

GoFによれば,次に示すいずれかの場合に,State パターンを利用する。
・オブジェクトの振る舞いが状態に依存し,実行時にはオブジェクトがその状態により振る舞いを変えなければならない場合。
・オペレーションが,オブジェクトの状態に依存した多岐にわたる条件文を持っている場合。この状態はたいてい1つ以上の列挙型の定数で表されており,たびたび複数のオペレーションに同じ条件構造が現れる。State パターンでは,1つ1つの条件分岐を別々のクラスに受け持たせる。これにより,オブジェクトの各状態を1つのオブジェクトとして扱うことができるようになる。

GoFによれば,State パターンに関連するパターンは次のようなものである。
Flyweight パターン:いつどのように ConcreteState オブジェクトが共有されるのかを説明している。
Singleton パターン:ConcreteState オブジェクトは,しばしば Singleton パターンにあてはまる。

◆◆State パターンのサンプルのクラス構成◆◆

ソースコードは巻末にあり,ソースファイルはこのWebの左上隅にありダウンロードできます。
【multiplexer1.py】
                 Counter                        Event
                (__call_)                         ()

                    ↑                             ↑
              Multiplexer                 def generate_random_events
   (connect, disconnect, send)
【multiplexer2.py】
                 Counter                        Event
                (__call_)                         ()

                    ↑                             ↑
              Multiplexer                 def generate_random_events
    (connect, disconnect, send, state)
【multiplexer3.py】
                 Counter                        Event
                (__call_)                         ()

                    ↑                             ↑
              Multiplexer                 def generate_random_events
   (pipeline, connect, disconnect)
class Multiplexer は,モードによって振る舞いを変えるメソッドを持つ。
class Counter は,カウンタです。
class Event は,イベントの内容です。
def generate_random_events は,イベントを起こします。

◆◆State パターンの実装◆◆

【multiplexer1.py】

State パターンは,昼と夜,晴と雨などモード(状態)によって,オブジェクト(同名メソッド)が振る舞いを変えるようにするものである。

サンプルでは,class Multiplexer が振る舞いを変える。multiplexer とは多重送信機と言う意味があるらしい。ここでは,イベントを受信してカウンタに送信している。そして,その振る舞いはモード(状態)により変化している。

class Multiplexer は,ACTIVE, DORMANTの2つのモード(状態)を持っていて,
  def connect
  def disconnect
  def send
の3つのメソッドを持ち,その振る舞いを,
  if self.state == Multiplexer.ACTIVE:
により,変えている。State パターンとしては,ごく単純な形式である。

応用として,2つのモード(状態)により,イベントの扱いがガラリと変えるところの仕組みがポイントであろうが,サンプルは,その仕組みが巧みである。

サンプルでは,
  cars
  vans
  trucks
というイベントがランダムに起きて,そのイベントを Multiplexer に送信して,Multiplexer ではそのイベントを数えるという作業を行なう。

そのあたりのプログラムがたいへん巧みにできている。

メインルーチンの31行目,38行目,45行目の print の出力を見てみる。モード(状態)が DORMANT のときは,イベントを受け付けていないことが判る。

【サンプル出力】

After 100 active events:  cars=150 vans=42 trucks=14 total=206
After 100 dormant events: cars=150 vans=42 trucks=14 total=206
After 100 active events:  cars=303 vans=83 trucks=30 total=416

プログラムを詳細に見ていこう。

メインルーチンの最初 19行目から Counter をインスタンス化する。引数が,0個,1個,2個の例があるが,class Counter のコンストラクタは,可変長引数を持ちすべてに対応する(59行目)。

print の変数は,
  carCounter.cars
  commercialCounter.vans
  commercialCounter.trucks
  totalCounter.count
の4つである。ポチの前はCounterインスタンスである(Counter object と書かれることが多い)。ポチの後はインスタンス変数である。引数0個のときは,
  self.count
というインスタンス変数を持つ。引数が1個以上のときは,setattr により,
  self.name(self.cars, self.vans, self.trucks) というインスタンス変数を持つ。いずれも初期値は0である。

ここでのポイントは,イベントによりインスタンス変数が違うというところである。普通は,インスタンス変数が同じ名称でインスタンスにより識別するところである。

ややトリッキーなインスタンスとインスタンス変数の組合せであるが,これを記録しておく必要がある。それが,メインルーチンの24行目である。イベント名称とコールバック関数を対で,92行目で作った
  self.callbacksForEvent
に格納する。これの形式は,collections.defaultdict(list)である。これは,キーの存在を気にすることなく,キーとバリューを追加していける辞書(dict)である。この内容は,3つのイベントの個々と合計の6組である。

ここで,callback というのは,Counterインスタンスのことであるが,引数としてコーラブル(呼び出し可能)ということらしい。確かに Multiplexer のメソッドの引数に使われている。

メインルーチンの29行目からは,イベントを起こし,Multiplexer に送信することがわずか2行で書かれている。36行目も43行目も同じである。直前で,
  multiplexer.state
を変更している。Multiplexer をインスタンス化したときはデフォルト(コンストラクタ)で ACTIVE になっている。
  def generate_random_events
は,イベントの
  cars, vans, trucks
を,
  11 : 3 : 1
の確立で引数のループだけ起こすようになっている。ただし,1回のループでイベントを1回から3回まで起こす。

メインルーチンで,
  for event in generate_random_events(100):(29, 36, 43行目)
として呼ばれた
  def generate_random_events(count):(50行目)
は,100回のループごとに,event に Eventインスタンスを渡すために,戻りにyieldを使う。

そして,そのループごとに,そのイベントを Multiplexer に送信する(30, 37, 44行目)。そして,そのイベントから
  callbacksForEvent
の中から callback を取り出し,
  callback(event)
として呼ぶのである(109行目)。インスタンスを()付きで呼ぶと,特殊メソッド
  __call__
が呼び出される(70行目)。ここで,カウンタが加算されていくのだが,Eventインスタンスからイベント名称が判り,それがカウンタの名称でもあるので,getattr, setattr によりカウンタが動作する。

カウンタのフルネームは,前述したが,print される変数に書いてある。

【multiplexer2.py】

multiplexer2.py が multiplexer1.py と違う点は,
multiplexer1.py では,class Multiplexer に次の3つのメソッド
  def connect
  def disconnect
  def send
があるが,multiplexer2.py では,上の3つのメソッドは def 定義ではなく,101行目の
  def state
の中で定義されていることである。このメソッドは,同名メソッド2つあり,デコレータによって,gettter, setter に分かれている。

上の3つのメソッドのACTIVEモード時の動作の内容は,3つのローカルメソッドに書かれている。DORMANTモード時の動作は,
  def state
の中で,lambda で書かれている。

multiplexer1.py では,3つのメソッドの中の if 文で,モード切替がされていたが,
multiplexer2.py では,3種類のメソッドは,モードごとの同名メソッドを持っている。

こちらのモード切替は,state の setter によってなされる。こちらの方が State パターンらしいと言える。

【multiplexer3.py】

multiplexer3.py が multiplexer1.py と違う点は,
multiplexer1.py では,class Multiplexer に次の3つのメソッド
  def connect
  def disconnect
  def send
があるが,multiplexer3.py では,def send が無く,98行目の
  def pipeline
が追加されている。これは,コルーチンになっていて,それのデコレータは,モジュール
  Qtrac.py
の中で定義されている。

multiplexer1.py の
  multiplexer.send(event)
の代わりに,multiplexer3.py では,
  pipeline.send(event)(32, 39, 46行目)
が使われている。

その1行前の for in ループには,52行目のジェネレータ
  def generate_random_events
がある。ジェネレータは,yield のところで一時停止していて,for in ループにより,yield の値が pull される。

一方,97行目のコルーチンは,無限ループの中に yield があり,ここで一時停止していて,コルーチンの
  pipeline.send(event)(32, 39, 46行目)
によって,yield に値が push される。

このコルーチンの動作は,multiplexer1.py の
  def send
と同じである。

◆◆ソースコード◆◆

このWebページの左上隅からダウンロードできます。

ソースファイルは4つです。
・multiplexer1.py;State パターンのPythonサンプル(その1)
・multiplexer2.py;State パターンのPythonサンプル(その2)
・multiplexer3.py;State パターンのPythonサンプル(その3)
・Qtrac.py;multiplexer3.pyがインポートする

【multiplexer1.py】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python3
# Copyright c 2012-13 Qtrac Ltd. All rights reserved.
# This program or module is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version. It is provided for educational
# purposes and is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.

import collections
import random

random.seed(917) # Not truly random for ease of regression testing


def main():
    totalCounter = Counter()
    carCounter = Counter("cars")
    commercialCounter = Counter("vans", "trucks")

    multiplexer = Multiplexer()
    for eventName, callback in (("cars", carCounter),
            ("vans", commercialCounter), ("trucks", commercialCounter)):
        multiplexer.connect(eventName, callback)
        multiplexer.connect(eventName, totalCounter)

    for event in generate_random_events(100):
        multiplexer.send(event)
    print("After 100 active events:  cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))

    multiplexer.state = Multiplexer.DORMANT
    for event in generate_random_events(100):
        multiplexer.send(event)
    print("After 100 dormant events: cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))

    multiplexer.state = Multiplexer.ACTIVE
    for event in generate_random_events(100):
        multiplexer.send(event)
    print("After 100 active events:  cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))


def generate_random_events(count):
    vehicles = (("cars",) * 11) + (("vans",) * 3) + ("trucks",)
    for _ in range(count):
        yield Event(random.choice(vehicles), random.randint(1, 3))



class Counter:

    def __init__(self, *names):
        self.anonymous = not bool(names)
        if self.anonymous:
            self.count = 0
        else:
            for name in names:
                if not name.isidentifier():
                    raise ValueError("names must be valid identifiers")
                setattr(self, name, 0)


    def __call__(self, event):
        if self.anonymous:
            self.count += event.count
        else:
            count = getattr(self, event.name)
            setattr(self, event.name, count + event.count)


class Event:

    def __init__(self, name, count=1):
        if not name.isidentifier():
            raise ValueError("names must be valid identifiers")
        self.name = name
        self.count = count


class Multiplexer:

    ACTIVE, DORMANT = ("ACTIVE", "DORMANT")

    def __init__(self):
        self.callbacksForEvent = collections.defaultdict(list)
        self.state = Multiplexer.ACTIVE


    def connect(self, eventName, callback):
        if self.state == Multiplexer.ACTIVE:
            self.callbacksForEvent[eventName].append(callback)


    def disconnect(self, eventName, callback=None):
        if self.state == Multiplexer.ACTIVE:
            if callback is None:
                del self.callbacksForEvent[eventName]
            else:
                self.callbacksForEvent[eventName].remove(callback)


    def send(self, event):
        if self.state == Multiplexer.ACTIVE:
            for callback in self.callbacksForEvent.get(event.name, ()):
                callback(event)


if __name__ == "__main__":
    main()

【multiplexer2.py】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python3
# Copyright c 2012-13 Qtrac Ltd. All rights reserved.
# This program or module is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version. It is provided for educational
# purposes and is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.

import collections
import random

random.seed(917) # Not truly random for ease of regression testing


def main():
    totalCounter = Counter()
    carCounter = Counter("cars")
    commercialCounter = Counter("trucks", "vans")

    multiplexer = Multiplexer()
    for eventName, callback in (("cars", carCounter),
            ("vans", commercialCounter), ("trucks", commercialCounter)):
        multiplexer.connect(eventName, callback)
        multiplexer.connect(eventName, totalCounter)

    for event in generate_random_events(100):
        multiplexer.send(event)
    print("After 100 active events:  cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))

    multiplexer.state = Multiplexer.DORMANT
    for event in generate_random_events(100):
        multiplexer.send(event)
    print("After 100 dormant events: cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))

    multiplexer.state = Multiplexer.ACTIVE
    for event in generate_random_events(100):
        multiplexer.send(event)
    print("After 100 active events:  cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))


def generate_random_events(count):
    vehicles = (("cars",) * 11) + (("vans",) * 3) + ("trucks",)
    for _ in range(count):
        yield Event(random.choice(vehicles), random.randint(1, 3))


class Counter:

    def __init__(self, *names):
        self.anonymous = not bool(names)
        if self.anonymous:
            self.count = 0
        else:
            for name in names:
                if not name.isidentifier():
                    raise ValueError("names must be valid identifiers")
                setattr(self, name, 0)


    def __call__(self, event):
        if self.anonymous:
            self.count += event.count
        else:
            count = getattr(self, event.name)
            setattr(self, event.name, count + event.count)


class Event:

    def __init__(self, name, count=1):
        if not name.isidentifier():
            raise ValueError("names must be valid identifiers")
        self.name = name
        self.count = count


class Multiplexer:

    ACTIVE, DORMANT = ("ACTIVE", "DORMANT")

    def __init__(self):
        self.callbacksForEvent = collections.defaultdict(list)
        self.state = Multiplexer.ACTIVE


    @property
    def state(self):
        return (Multiplexer.ACTIVE if self.send == self.__active_send
                else Multiplexer.DORMANT)


    @state.setter
    def state(self, state):
        if state == Multiplexer.ACTIVE:
            self.connect = self.__active_connect
            self.disconnect = self.__active_disconnect
            self.send = self.__active_send
        else:
            self.connect = lambda *args: None
            self.disconnect = lambda *args: None
            self.send = lambda *args: None


    def __active_connect(self, eventName, callback):
        self.callbacksForEvent[eventName].append(callback)


    def __active_disconnect(self, eventName, callback=None):
        if callback is None:
            del self.callbacksForEvent[eventName]
        else:
            self.callbacksForEvent[eventName].remove(callback)


    def __active_send(self, event):
        for callback in self.callbacksForEvent.get(event.name, ()):
            callback(event)


if __name__ == "__main__":
    main()

【multiplexer3.py】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python3
# Copyright c 2012-13 Qtrac Ltd. All rights reserved.
# This program or module is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version. It is provided for educational
# purposes and is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.

import collections
import random
from Qtrac import coroutine

random.seed(917) # Not truly random for ease of regression testing


def main():
    totalCounter = Counter()
    carCounter = Counter("cars")
    commercialCounter = Counter("trucks", "vans")

    multiplexer = Multiplexer()
    pipeline = multiplexer.pipeline()
    for eventName, callback in (("cars", carCounter),
            ("vans", commercialCounter), ("trucks", commercialCounter)):
        multiplexer.connect(eventName, callback)
        multiplexer.connect(eventName, totalCounter)

    for event in generate_random_events(100):
        pipeline.send(event)
    print("After 100 active events:  cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))

    multiplexer.state = Multiplexer.DORMANT
    for event in generate_random_events(100):
        pipeline.send(event)
    print("After 100 dormant events: cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))

    multiplexer.state = Multiplexer.ACTIVE
    for event in generate_random_events(100):
        pipeline.send(event)
    print("After 100 active events:  cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))


def generate_random_events(count):
    vehicles = (("cars",) * 11) + (("vans",) * 3) + ("trucks",)
    for _ in range(count):
        yield Event(random.choice(vehicles), random.randint(1, 3))


class Counter:

    def __init__(self, *names):
        self.anonymous = not bool(names)
        if self.anonymous:
            self.count = 0
        else:
            for name in names:
                if not name.isidentifier():
                    raise ValueError("names must be valid identifiers")
                setattr(self, name, 0)


    def __call__(self, event):
        if self.anonymous:
            self.count += event.count
        else:
            count = getattr(self, event.name)
            setattr(self, event.name, count + event.count)


class Event:

    def __init__(self, name, count=1):
        if not name.isidentifier():
            raise ValueError("names must be valid identifiers")
        self.name = name
        self.count = count


class Multiplexer:

    ACTIVE, DORMANT = ("ACTIVE", "DORMANT")

    def __init__(self):
        self.callbacksForEvent = collections.defaultdict(list)
        self.state = Multiplexer.ACTIVE


    @coroutine
    def pipeline(self):
        while True:
            event = (yield)
            if self.state == Multiplexer.ACTIVE:
                for callback in self.callbacksForEvent.get(event.name, ()):
                    callback(event)


    def connect(self, eventName, callback):
        if self.state == Multiplexer.ACTIVE:
            self.callbacksForEvent[eventName].append(callback)


    def disconnect(self, eventName, callback=None):
        if self.state == Multiplexer.ACTIVE:
            if callback is None:
                del self.callbacksForEvent[eventName]
            else:
                self.callbacksForEvent[eventName].remove(callback)


if __name__ == "__main__":
    main()

【Qtrac.py】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python3
# Copyright c 2012-13 Qtrac Ltd. All rights reserved.
# This program or module is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version. It is provided for
# educational purposes and is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.

import abc
import collections
import errno
import functools
import os
import sys


def coroutine(function):
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        generator = function(*args, **kwargs)
        next(generator)
        return generator
    return wrapper

以上

トップページに戻る
inserted by FC2 system