NeuroWhAI의 잡블로그

[TensorFlow] DQN으로 게임 플레이 학습하기 본문

개발 및 공부/라이브러리&프레임워크

[TensorFlow] DQN으로 게임 플레이 학습하기

NeuroWhAI 2018. 2. 24. 18:28


※ 이 글은 '골빈해커의 3분 딥러닝 텐서플로맛'이라는 책을 보고 실습한걸 기록한 글입니다.


드디어 마지막 챕터네요.
이건 이해하느라 좀 힘들었습니다 ㅠㅠ
심지어 코드도 길어서 타이핑하느라 손가락 부러지는 줄.


agent.py: model과 game을 import해서 최종 로직을 작성합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#-*- coding: utf-8 -*-
 
import sys
import tensorflow as tf
import numpy as np
import random
import time
 
from game import Game
from model import DQN
 
tf.app.flags.DEFINE_boolean("train", False,
    "학습모드. 게임을 화면에 보여주지 않습니다.")
FLAGS = tf.app.flags.FLAGS
 
MAX_EPISODE = 10000
TARGET_UPDATE_INTERVAL = 1000
TRAIN_INTERVAL = 4
OBSERVE = 100
 
NUM_ACTION = 3
SCREEN_WIDTH = 6
SCREEN_HEIGHT = 10
 
def train():
    print('Loading...')
 
    with tf.Session() as sess:
        game = Game(SCREEN_WIDTH, SCREEN_HEIGHT, show_game=False)
        brain = DQN(sess, SCREEN_WIDTH, SCREEN_HEIGHT, NUM_ACTION)
 
 
        rewards = tf.placeholder(tf.float32, [None])
        tf.summary.scalar('avg.reward/ep', tf.reduce_mean(rewards))
 
        saver = tf.train.Saver()
        sess.run(tf.global_variables_initializer())
 
        writer = tf.summary.FileWriter('logs', sess.graph)
        summary_merged = tf.summary.merge_all()
 
 
        brain.update_target_network()
 
 
        epsilon = 1.0
        time_step = 0
        total_reward_list = []
 
        
        for episode in range(MAX_EPISODE):
            terminal = False
            total_reward = 0
 
            state = game.reset()
            brain.init_state(state)
 
 
            while not terminal:
                if np.random.rand() < epsilon:
                    action = random.randrange(NUM_ACTION)
                else:
                    action = brain.get_action()
 
                if episode > OBSERVE:
                    epsilon -= 1 / 1000
 
                state, reward, terminal = game.step(action)
                total_reward += reward
 
 
                brain.remember(state, action, reward, terminal)
 
 
                if time_step > OBSERVE and time_step % TRAIN_INTERVAL == 0:
                    brain.train()
 
                if time_step % TARGET_UPDATE_INTERVAL == 0:
                    brain.update_target_network()
 
                time_step += 1
 
            
            print('게임횟수: %d 점수: %d' % (episode + 1, total_reward))
 
            total_reward_list.append(total_reward)
 
 
            if episode % 10 == 0:
                summary = sess.run(summary_merged,
                    feed_dict={rewards: total_reward_list})
                writer.add_summary(summary, time_step)
                total_reward_list = []
 
            if episode % 100 == 0:
                saver.save(sess, 'model/dqn.ckpt', global_step=time_step)
 
def replay():
    print('Loading...')
 
    with tf.Session() as sess:
        game = Game(SCREEN_WIDTH, SCREEN_HEIGHT, show_game=True)
        brain = DQN(sess, SCREEN_WIDTH, SCREEN_HEIGHT, NUM_ACTION)
 
 
        saver = tf.train.Saver()
        ckpt = tf.train.get_checkpoint_state('model')
        saver.restore(sess, ckpt.model_checkpoint_path)
 
 
        for episode in range(MAX_EPISODE):
            terminal = False
            total_reward = 0
 
            state = game.reset()
            brain.init_state(state)
 
 
            while not terminal:
                action = brain.get_action()
 
                state, reward, terminal = game.step(action)
                total_reward += reward
 
                brain.remember(state, action, reward, terminal)
 
                time.sleep(0.3)
 
 
            print('게임횟수: %d 점수: %d' % (episode + 1, total_reward))
 
def main(_):
    if FLAGS.train:
        train()
    else:
        replay()
 
if __name__ == '__main__':
    tf.app.run()
cs

model.py: DQN을 구현합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#-*- coding: utf-8 -*-
 
import tensorflow as tf
import numpy as np
import random
from collections import deque
 
class DQN:
    REPLAY_MEMORY = 10000
    BATCH_SIZE = 32
    GAMMA = 0.99
    STATE_LEN = 4
 
    def __init__(self, sess, width, height, n_action):
        self.session = sess
        self.n_action = n_action
        self.width = width
        self.height = height
        self.memory = deque()
        self.state = None
 
        self.input_X = tf.placeholder(tf.float32,
            [None, width, height, self.STATE_LEN])
        self.input_A = tf.placeholder(tf.int64, [None])
        self.input_Y = tf.placeholder(tf.float32, [None])
 
        self.Q = self._build_network('main')
        self.cost, self.train_op = self._build_op()
 
        self.target_Q = self._build_network('target')
 
    def _build_network(self, name):
        with tf.variable_scope(name):
            model = tf.layers.conv2d(self.input_X, 32, [44], padding='same',
                activation=tf.nn.relu)
            model = tf.layers.conv2d(model, 64, [22], padding='same',
                activation=tf.nn.relu)
            model = tf.contrib.layers.flatten(model)
            model = tf.layers.dense(model, 512, activation=tf.nn.relu)
 
            Q = tf.layers.dense(model, self.n_action, activation=None)
 
        return Q
 
    def _build_op(self):
        one_hot = tf.one_hot(self.input_A, self.n_action, 1.00.0)
        Q_value = tf.reduce_sum(tf.multiply(self.Q, one_hot), axis=1)
        cost = tf.reduce_mean(tf.square(self.input_Y - Q_value))
        train_op = tf.train.AdamOptimizer(1e-6).minimize(cost)
 
        return cost, train_op
 
    def update_target_network(self):
        copy_op = []
 
        main_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
            scope='main')
        target_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
            scope='target')
 
        for main_var, target_var in zip(main_vars, target_vars):
            copy_op.append(target_var.assign(main_var.value()))
 
        self.session.run(copy_op)
 
    def get_action(self):
        Q_value = self.session.run(self.Q,
            feed_dict={self.input_X: [self.state]})
 
        action = np.argmax(Q_value[0])
 
        return action
 
    def train(self):
        state, next_state, action, reward, terminal = self._sample_memory()
 
        target_Q_value = self.session.run(self.target_Q,
            feed_dict={self.input_X: next_state})
 
        Y = []
        for i in range(self.BATCH_SIZE):
            if terminal[i]:
                Y.append(reward[i])
            else:
                Y.append(reward[i] + self.GAMMA * np.max(target_Q_value[i]))
 
        self.session.run(self.train_op,
            feed_dict={
                self.input_X: state,
                self.input_A: action,
                self.input_Y: Y
            })
 
    def init_state(self, state):
        state = [state for _ in range(self.STATE_LEN)]
        self.state = np.stack(state, axis=2)
 
    def remember(self, state, action, reward, terminal):
        next_state = np.reshape(state, (self.width, self.height, 1))
        next_state = np.append(self.state[:, :, 1:], next_state, axis=2)
 
        self.memory.append((self.state, next_state, action, reward, terminal))
 
        if len(self.memory) > self.REPLAY_MEMORY:
            self.memory.popleft()
 
        self.state = next_state
 
    def _sample_memory(self):
        sample_memory = random.sample(self.memory, self.BATCH_SIZE)
 
        state = [memory[0for memory in sample_memory]
        next_state = [memory[1for memory in sample_memory]
        action = [memory[2for memory in sample_memory]
        reward = [memory[3for memory in sample_memory]
        terminal = [memory[4for memory in sample_memory]
 
        return state, next_state, action, reward, terminal
cs

game.py: 게임을 구현합니다.
골빈해커님의 깃헙 레포에서 다운로드 받으시면 됩니다.

결과:
(생략)
게임횟수: 2409 점수: 36
게임횟수: 2410 점수: 372
게임횟수: 2411 점수: 59
(생략)

(학습이 덜 끝난 상태에서 캡쳐했지만 왔다갔다 하면서 갈수록 점수가 올라가는게 보이시죠?)

(초록색이 플레이어고 검은색은 도로, 나머지는 장애물입니다)


코드 설명을 적어야 하는데... 음... 너무 많네요 ㅠㅠ

먼저 좀 고수준(?)의 시각에서 그냥 학습과 재현 순서를 대략적으로 적어볼까 합니다.
agent.py에 있는 내용이라고 보시면 됩니다.

일단 DQN은 상태에 따른 행동을 얻기 위한 기본 신경망과
상태에 따른 행동의 가치(혹은 상태의 가치?)를 얻기 위한 목표 신경망이 있으며
두 신경망은 똑같은 구조를 가집니다.
어떻게 똑같은 신경망으로 다른 역할이 되는지 아직도 좀 헷갈리긴 합니다만 데이터를 대입해보면서 따라가보니 되긴 하더군요.

학습(Train):
  1. 게임, DQN 생성
  2. 목표 신경망을 초기 기본 신경망의 데이터로 갱신
  3. 게임 초기화
  4. 게임의 초기 상태를 DQN에 저장
  5. 처음엔 높은 확률로 랜덤한 행동을 얻지만 갈수록 DQN에서 계산한 행동을 얻음.
  6. 얻은 행동을 게임에 적용해서 변한 상태, 보상 등을 얻음.
  7. 상태, 행동, 다음 상태, 보상 데이터를 DQN 학습용 메모리에 추가하고 DQN에 저장된 현재 상태 갱신
  8. 일정한 주기(4번)마다 학습용 메모리의 데이터로 기본 신경망 학습.
  9. 일정한 주기(1000번)마다 목표 신경망을 기본 신경망으로 갱신.
  10. 5~9 반복하다가 게임이 종료되면 3으로 돌아감

재현(Replay):
  1. 게임, DQN 생성
  2. 저장된 모델로 초기화
  3. 게임 초기화
  4. 게임의 초기 상태를 DQN에 저장
  5. DQN에서 현재 상태에 대한 행동을 얻어옴.
  6. 얻은 행동을 게임에 적용해서 변한 상태, 보상 등을 얻음.
  7. 상태, 행동, 다음 상태, 보상 데이터를 DQN 학습용 메모리에 추가하고 DQN에 저장된 현재 상태 갱신
  8. 5~7 반복하다가 게임이 종료되면 3으로 돌아감

agent.py 내용은 DQN이 대충 어떻게 동작하는지만 알면 그렇게 어렵지 않습니다.
문제는 model.py 내용인데 아직도 명확하게 이해를 못했습니다.
책에서 소개해준 강화학습, DQN 영상 강의가 있던데 나중에 봐야겠습니다.

일단 DQN은 클래스고 정의된 상수 등에 대한 설명입니다.
Replay Memory : 위에서 말한 학습용 메모리의 크기 제한 입니다.
Batch Size : 학습할때마다 학습용 메모리에서 몇개의 데이터를 선택할지.
Gamma : input_Y를 계산할때 보상 + Gamma * max(목표 신경망 출력) 이렇게 할때 씁니다.
 다음 상태에 대한 가치도 비용에 포함하기 위해서 저렇게 하는데 Gamma(=0.9)로 영향을 조금 줄여줍니다.
State Len : 하나의 학습 데이터에 포함된 게임 상태는 한 프레임만 있는게 아니라 State Len개의 화면 데이터가 됩니다.
input_X : [Batch size, Width, Height, State len] 모양으로 게임 화면 데이터를 표현하는 placeholder 입니다.
input_A : [Batch size] 모양으로 각 상태에서 어떤 행동을 했는지를 표현하는 placeholder 입니다.
input_Y : [Batch size] 모양으로 비용 계산에 쓰이는 값(보상 + Gamma * max(목표))을 계산해서 받을 placeholder 입니다.
n_action : 할 수 있는 행동의 개수입니다. (3개)

함수 하나씩 보겠습니다.

__init__:
DQN을 초기화 합니다.
기본 신경망, 목표 신경망을 만들고 placeholder, 비용, 최적화함수 등을 정의합니다.

_build_network:
신경망을 만듭니다.
기본, 목표 신경망은 구조가 똑같으므로 scope만 다르게 해주고 신경망 구성은 똑같이 합니다.
input_X -> Conv2D(32, [4, 4]) -> Conv2D(64, [2, 2])
 -> Flatten -> Dense(512) -> Dense(n_action)
이러 합니다.
화면 데이터를 받아서 취할 수 있는 행동에 대한 가치(점수)를 출력하도록 하는 구조입니다.
때문에 가치가 가장 높은 행동을 취하는걸 기본 신경망이 하고
가장 큰 가치를 입력 상태(화면 데이터)에 대한 가치로 취하는걸 목표 신경망이 하는것 같습니다.

_build_op:
비용함수, 최적화 연산을 만듭니다.
one_hot = input_A를 원 핫 인코딩해서 [Batch size, n_action] 모양으로 만듬
Q_value = 기본 신경망의 출력에 one_hot을 곱하고 reduce sum해서 했던 행동에 대한 가치만 얻어옴.
(Q_value 코드는 뭔가 복잡해보이지만 사실 필요한 값이 없어서 다시 계산하는 코드이다.
어떤 행동을 했는지는 알지만 그 행동의 가치는 저장해두지 않았기에 다시 계산하는거)
cost(비용) = input_Y와 Q_value간의 거리 오차 함수
train_op(최적화 연산) = Adam 최적화기를 사용해 cost를 줄이는 방향으로 학습.

update_target_network:
목표 신경망을 기본 신경망으로 갱신.
뭔 소린가 싶을 수 있지만 그냥 기본 신경망의 가중치 데이터를 그대로 목표 신경망의 가중치에 덮어씌운다는 소리.

get_action:
DQN에 저장된 현재 상태(self.state)를 기본 신경망에 넣어 각 행동별 가치를 얻고
가장 점수가 높게 계산된 행동 번호를 반환

_sample_memory:
학습용 메모리에서 랜덤으로 Batch size개의 데이터를 선택해서 반환.
별거 아닐 수 있지만 최신의 상태 데이터를 쌓아두고 고르게 학습하는게 DQN의 핵심 중 하나이다.

train:
기본 신경망을 한번 학습시킴.
_sample_memory를 호출해서 이번에 학습할 미니배치를 얻는다.
여기서 잠깐 학습 데이터에 대해 적자면 데이터는 상태, 다음 상태, 행동, 보상, 종료여부인데
'상태'에서 '행동'을 했더니 '다음 상태'가 되어 '보상'을 얻었다는 소리이다.
목표 신경망에 '다음 상태'를 넣어 다음 상태에 대한 각 행동별 가치를 얻는다.
input_Y에 넣을 값을 계산하는데 다음 상태에 게임이 종료됬다면 그냥 보상만 취하고
아니라면 아까 계산한 각 행동별 가치에서 가장 높은 값(=다음 상태의 가치)에 Gamma를 곱하고 보상과 더한 값을 쓴다.
얻은 미니배치 데이터를 넣어 최적화 연산을 수행합니다.

remember:
학습용 메모리에 데이터를 추가합니다.
next_state를 계산하는 과정은 복잡해보이지만 그냥 현재 상태(self.state)에서 가장 오래된 화면 데이터를 지우고
이번에 새로 기억할 화면(state)을 이어 붙혀준게 되는겁니다.
왜 이런 방식을 사용하는지 이론적으론 잘 모르겠으나 아마 이전 상태까지 고려해서 행동을 출력하도록 하기 위해서 일 겁니다.
그렇게 계산한 next_state와 기타 데이터를 전부 튜플로 합쳐서 메모리에 넣습니다.
메모리가 제한된 크기를 넘어서면 가장 오래된 데이터를 하나씩 제거합니다.

init_state:
get_action 코드를 보면 self.state를 기본 신경망의 입력으로 사용하며
remember에서 self.state를 다음 상태로 갱신해주는데
처음엔 self.state가 비어있으므로 agent에서 이걸 호출해 처음 상태를 지정해주게 된다.


와... 이걸 또 다 적은 제가 대견합니다. <<
그만큼 이해하고 싶은 욕망이 컸다고 생각해주시면 감사하겠습니다 ㅎㅎ

나중에 휴가 나가면 아타리 게임도 학습시켜보고 싶습니다.

그럼 이만!!




Comments