アプリとサービスのすすめ

アプリやIT系のサービスを中心に書いていきます。たまに副業やビジネス関係の情報なども気ままにつづります

強化学習で「手押し井戸ポンプ」で水をくむ動作をArduino Unoに学習させる【AI・Hardware】

「手押し井戸ポンプ」は、昔から田舎で使われてる「井戸から手動で水をくむポンプ」のこと(Amazonでも売ってる)。

「手押し井戸ポンプ」の水をくむまでの大まかな動作は

押す(down)=>水をくむ(push pump)=>あげる(up)

の行動パターンを、シーソーのごとく繰り返すことで、井戸から水をくめる。

手押し井戸ポンプの動作原理f:id:trafalbad:20200620111544g:plain



今回はArduinoを使って、実際に近い状況を再現(シミュレーション)して、強化学習で水をくむ動作を学習させた。

深層学習の強化学習アルゴリズムはCartPoleでお馴染みの「DQN」を使う。特にkeras-rlとか強化学習ライブラリを使わずに、普通にDQNの学習コード書いた。

その要点をまとめて行こうと思う。

目次
1.Arduinoで「手押し井戸ポンプ」の状況の再現
2.Arduinoの構成
3.DQNで学習
4.学習結果
5.最後に




1.Arduinoで「手押し井戸ポンプ」の状況の再現


まず、Arduinoで実際の「手押し井戸ポンプ」の状況を実際に再現してシミュレーションした。

はじめにwebカメラでデフォルトの画像を読み込ませる。

それをニューラルネットワーク(NN)に読み込ませて、DQNの学習システムに沿って水をくむ行動パターンを学習させていく。

1.webカメラで画像を読み込み、NNに入れる

2.PC側でNNから行動を出力して、Arduinoに送る

3.Arduinoで行動を読み取り動作させる。ポンプで水をくめたら、PCに「1」の信号送る

4.Arduinoから「1」を受け取りPC側で報酬(Reward)として受け取る


f:id:trafalbad:20200620111631j:plain

#include <Servo.h>

Servo mServo;

int state[2];
int pin_number = 10;

void setup() {
  mServo.attach(9);
  mServo.write(10);
  delay(500);
  Serial.begin(9600);
  pinMode(pin_number, OUTPUT);
  state[0] = 0;
  state[1] = 1;
  digitalWrite(pin_number, LOW);
}

void loop() {
  if (Serial.available() > 0) {
    char c = Serial.read();
    if (c == 'p') {
      if (state[0] == 0) {
        Serial.print("0");
        mServo.write(90);
        delay(500);
        state[0] = 1;
      }
      else {
        Serial.print("0");
        mServo.write(10);
        delay(500);
        state[0] = 0;
        state[1] = 1;
        digitalWrite(10, LOW);
      }
    }
    else if (c == 'i') {
      if (state[0] == 1 && state[1] == 1) {
        Serial.print("1");
        delay(500);
        digitalWrite(10, HIGH);
        state[1] = 0;
      }
      else {
        Serial.print("0");
      }
    }
    else if (c == 'c') {
      state[0] = 0;
      state[1] = 1;
      digitalWrite(10, LOW);
      mServo.write(10);
      delay(500);
    }
  }
}



Arduinoとリアルの「手押し井戸ポンプ」の行動の対応関係


「手押し井戸ポンプ」から水をくむのに必要な行動パターンは3つあって、順番に行動する(行動パターンを覚える)必要がある。



水をくむまで行動パターン

Action1(a1) : 押す(down)
Action2(a2) : 水をくむ(Push pump)
Action(a3) : あげる(up)



リアルの「手押し井戸ポンプ」の行動パターンf:id:trafalbad:20200620112256j:plain



Arduinoのシミュレーションの行動パターンf:id:trafalbad:20200620111714j:plain







2.Arduinoの構成

Arduino(Uno)で用意したのは付属パーツは

サーボモータ

・USBカメラ(Mac用)

・抵抗

・LED



Arduino回路図f:id:trafalbad:20200620111744j:plain




実際にUSBカメラで読み込むArduinoの映像

この投稿をInstagramで見る

#arduino for development

Tatsuya Hagiwara(@gosei_creater)がシェアした投稿 -








3.DQNで学習

アルゴリズムはDeep-QNetwork(DQN)。
DQNはモデルフリーなので報酬とか戦略の設定が大事だと思う。


このDQNのメカニズムを使ってNNに「手押し井戸ポンプ」から水を汲む行動パターンを覚えさせる。



DQNでの学習サイクル(Episode cycle)
f:id:trafalbad:20200620111911j:plain








Model



DQNではlossにHuber lossを使うのが一般的らしい。「Squeeze-and-Excitation Networks」を使って少し凝ったCNNを作った。

ef QFunction(inputs_):
    o = Conv2D(32, (3, 3), padding='same', kernel_initializer='random_uniform')(inputs_)
    o = channel_spatial_squeeze_excite(o)
    o = MaxPooling2D((2, 2))(o)
    o = Conv2D(64, (3, 3), padding='same', kernel_initializer='random_uniform')(o)
    o = channel_spatial_squeeze_excite(o)
    o = MaxPooling2D((2, 2))(o)
    o = Conv2D(64, (3, 3), padding='same', kernel_initializer='random_uniform')(o)
    o = channel_spatial_squeeze_excite(o)
    o = Flatten()(o)
    o = Dense(2, activation='linear')(o)
    model = Model(inputs=inputs_, outputs=o)
    model.compile(optimizer=Adam(lr=0.001), loss=huber_loss_mean, metrics=['acc'])
    return model

USBカメラから画像を読み込む部分。

cap = cv2.VideoCapture(0)

def capture(ndim=3):
    ret, frame = cap.read()
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    xp = int(frame.shape[1]/2)
    yp = int(frame.shape[0]/2)
    d = 400
    resize = 128
    cv2.rectangle(gray, (xp-d, yp-d), (xp+d, yp+d), color=0, thickness=10)
    cv2.imshow('gray', gray)
    gray = cv2.resize(gray[yp-d:yp + d, xp-d:xp + d],(resize, resize))
    env = np.asarray(gray, dtype=np.float32)
    if ndim == 3:
        return env[np.newaxis, :, :] 
    else:
        return env[np.newaxis, np.newaxis, :, :] 

camera_image = capture(ndim=3)
cap.release()



Environment


1回のepisodeでStepを30回回して、水汲みに8回成功したらsuccessに+1する。それを全episode中に5回(success=5)繰り返せたら学習終了。

NUM_EPISODES = 50 # エピソード数
GAMMA = 1.0 

# 探索パラメータ
E_START = 1.0 # εの初期値
E_STOP = 0.01 # εの最終値
E_DECAY_RATE = 0.001 # εの減衰率

SUCCESS_REWARD = 8

# メモリパラメータ
MEMORY_SIZE = 10000 # 経験メモリのサイズ
MAX_STEPS = 30 # 最大ステップ数
BATCH_SIZE = MAX_STEPS

H = 128
W = 128
C = 1
inputs = Input([H, W, C])

class Environment:
    def __init__(self, inputs):
        # main-networkの作成
        self.main_qn = QFunction(inputs)

        # target-networkの作成
        self.target_qn = QFunction(inputs)
        # 経験メモリの作成
        self.memory = Memory(MEMORY_SIZE)

 # 行動価値関数
    def action_value_function(self, action_b, step_reward_b):
        target = 0
        if action_b == 1:
            target = GAMMA + step_reward_b/MAX_STEPS
        else:
            target = GAMMA + step_reward_b/MAX_STEPS
        return target

    def default(self):
       # Memoryの初期化
        self.memory = Memory(MEMORY_SIZE)

    def run(self):
        # エピソード数分のエピソードを繰り返す
        total_step = 0 # 総ステップ数
        success_count = 0 # 成功数
        for episode in range(1, NUM_EPISODES+1):
            step = 0 # ステップ数
            R = 0
            batch_size = 0
            ser.write(b"c") # Arduino側を初期状態に戻す

            # target-networkの更新
            self.target_qn.set_weights(self.main_qn.get_weights())
            
            # 1エピソードのループ
            for _ in range(1, MAX_STEPS+1):
                step += 1
                total_step += 1
                camera_state = capture(ndim=3)
                camera_state = camera_state.reshape(1, H, W, C)
                # εを減らす
                epsilon = E_STOP + (E_START - E_STOP)*np.exp(-E_DECAY_RATE*total_step)
          
                # ランダムな行動を選択
                if epsilon > np.random.rand():
                    action = int(np.random.randint(0, 2, 1))
                # 行動価値関数で行動を選択
                else:
                    action = np.argmax(self.main_qn.predict(camera_state))
                    pred = self.main_qn.predict(camera_state)
   
                # 行動に応じて状態と報酬を得る
                reward = action_step(action)
                R += reward
                self.memory.add((camera_state, action, reward, R)) 
                print('step', step, 'action', action, "R", R, 'reward', reward)
            if R >=SUCCESS_REWARD:
                success_count += 1
                
            # ニューラルネットワークの入力と出力の準備
            inputs = np.zeros((BATCH_SIZE, H, W, C)) # 入力(状態)
            targets = np.zeros((BATCH_SIZE, 2)) # 出力(行動ごとの価値)
            # バッチサイズ分の経験を取得
            minibatch = self.memory.sample(BATCH_SIZE)
            
            # ニューラルネットワークの入力と出力の生成
            for i, (state_b, action_b, reward_b, step_reward_b) in enumerate(minibatch):
                
                # 入力に状態を指定
                inputs[i] = state_b
                
                # 採った行動の価値を計算
                target = self.action_value_function(action_b, step_reward_b)
                # 出力に行動ごとの価値を指定
                targets[i] = self.main_qn.predict(state_b)
                targets[i][action_b] = target # 行動の価値

            # 行動価値関数の更新
            print('training....')
            self.main_qn.fit(inputs, targets, epochs=30, verbose=0)
            
            # エピソード完了時のログ表示
            print('エピソード: {}, ステップ数: {}, epsilon: {:.4f}'.format(episode, step, epsilon))
            self.default()
            # 5回成功で学習終了
            if success_count >= 5:
                break

WEbカメラから画像をNNに入れる => 行動価値関数を出力 => 行動に変換 => 報酬get


の簡単な流れを図にしてみた。
f:id:trafalbad:20200620111700j:plain




4.学習結果


epochの1~50までのパターン行動の成功回数の推移
f:id:trafalbad:20200620112004p:plain

順調にパターンを学習して、水をくむ回数が増えていってるのがわかる。


行動パターンを覚えさせるために、行動価値関数を次のように設計した。

# 行動価値関数
 def action_value_function(action_b, step_reward_b):
        target = 0
        if action_b == 1:
            target = GAMMA + step_reward_b/MAX_STEPS
        else:
            target = GAMMA + step_reward_b/MAX_STEPS
        return target

# 行動を返す関数
def action_step(actions):
    r = 0
    if actions==0:
        ser.write(b"p")
    else:
        ser.write(b"i")
    time.sleep(1.0)
    r = ser.read() 
    return int(r)

この設定だとepisodeが進むにつれて、それぞれの行動(0と1)の行動価値が最後は均衡してくる。

NNからoutputされる行動価値の推移f:id:trafalbad:20200620112018j:plain

もっと上手い設定方法はあるけど、とりあえずうまく学習できてたので自分としては上出来。



5.最後に

50回目までにStep1回で水汲み16回以上を達成できるようになって、無事クリア。DQN

・報酬の設定

・行動価値をどう決めるか 

・戦略

・経験の学習



とかが一番学習結果に影響した。逆にネットワークは別にそんな複雑なものである必要はなかった。

現に今回は白黒画像でやってもこれだけの成果が出たし。強化学習は戦略とか「どう学ばせるか」が肝だと思う。


ハードウェアで強化学習ができるのはいろいろと厨二病的な愉悦。

f:id:trafalbad:20200620112502j:plain