본문 바로가기
Have Done/Reinforcement Learning

[강화학습] Space-Invader 환경설정 후 학습하기

by 에아오요이가야 2022. 4. 14.

아주 간단한 tutorial을 소개해 드리고자 한다.

강화 학습은 EnvironmentAgentInteraction을 통해 학습하는데

 

. py 코드가 자연스럽게 세 개가 된다

Environment.py

Agent.py

Interaction.py

 

이렇게 딱딱 나눌 수 있는 그런 건 아니라서 조금 부가 설명을 하자면

main.py -> environment 설정과 interaction을 실행시키는 코드

agent.py -> agent를 선언하고 취할 행동들을 선언 interaction의 주체

model.py -> interaction의 연산 부분

 

그리하여 역순으로 코드를 소개해드리겠습니다~!

main.py입니다. 간단하게 DQN을 짜 놓은 모습~

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

class DQN(nn.Module):
    def __init__(self):
        super(DQN,self).__init__()
        self.conv1 = nn.Conv2d(1,32,8,stride =4, padding=1)
        self.conv2 = nn.Conv2d(32,64,4,stride=2) 
        self.conv3 = nn.Conv2d(64,128,3)
        self.fc1 = nn.Linear(128*19*8,512)
        self.fc2 = nn.Linear(512,6)

        self.optimizer = optim.RMSprop(self.parameters(),lr = 0.005)
        self.loss = nn.MSELoss()
        self.device = torch.device('cpu')
        self.to(self.device)

    def forward(self,observation):
        observation = torch.Tensor(observation).to(self.device)
        observation = observation.view(-1,1,185,95)
        observation = F.relu(self.conv1(observation))
        observation = F.relu(self.conv2(observation))
        observation = F.relu(self.conv3(observation))
        observation = observation.view(-1,128*19*8)
        observation = F.relu(self.fc1(observation))

        actions = self.fc2(observation)

        return actions

 

agent.py입니다.

뭐가 쫌 많은데! 주석을 다 달아놨습니다 ㅎㅎㅎ

import numpy as np
import torch 
from model import DQN

class Agent(object):
    def __init__(self,gamma, epsilon,
                maxMemorySize, min_eps=0.01,
                replace = 10000, actionSpace=[0,1,2,3,4,5]):
        self.GAMMA = gamma # discount factor
        self.EPSILON = epsilon # epsilon parameter
        self.MIN_EPS = min_eps # minimum epsilon
        self.actionSpace = actionSpace #[0~5]까지의 action들 집합
        
        self.memSize = maxMemorySize
        self.replace_target_cnt = replace
        self.memory = []
        self.memCntr = 0
        
        self.steps = 0
        self.learn_step_counter = 0
        self.Q_eval = DQN() # evaluation network
        self.Q_next = DQN() # target network
    
    # 0: no action, 1: fire, 2: move right, 3: move left #4: move right fire, 5: move left fire
    # action space[0~5]에서 action 하나 고름
    # learning rate 느낌의 역할
    def chooseAction(self,observation):
        rand = np.random.random()
        # rand 와 1-epsilon을 비교하여 rand가 작으면 
        # observation을 inference하여 action을 고름
        if rand < 1-self.EPSILON:
            action = torch.argmax(self.Q_eval.forward(observation)[1]).item()
        # rand 와 1-epsilon을 비교하여 rand가 크면
        # 걍 random한 action을 취함            
        else:
            action = np.random.choice(self.actionSpace)
        
        self.steps +=1
        return action

    def chooseAction_with_confidence(self,observation):
        action = torch.argmax(self.Q_eval.forward(observation)[1]).item()
        self.steps +=1
        return action

    def learn(self, batch_size):
        self.Q_eval.optimizer.zero_grad()
        # 변수로 주어진 replace_target_cnt 만큼의 학습이 이뤄지면
        # Q_next 모델을 Q_eval weight로 업데이트한다
        # Q_next가 새로운 목표인 *느낌*
        if self.replace_target_cnt is not None and \
            self.learn_step_counter%self.replace_target_cnt ==0:
            self.Q_next.load_state_dict(self.Q_eval.state_dict())

        # memory안에서 시작점을 정하고싶어 그래서
        # memCntr + batch_size가 memSize[최대사이즈]보다 작으면
        # memStart를 0부터 memCntr까지의 숫자중 하나로 설정하고 
        # 거기부터 batch_size만큼을 memory에서 골라와서 minibatch로 잡고
        # memCntr + batch_size가 memSize[최대사이즈]보다 크면
        # memCntr - batch_size -1을 시작점으로 잡고 batch_size만큼 잘라서 minibatch로 잡는다.
        if self.memCntr + batch_size <self.memSize:
            memStart = int(np.random.choice(range(self.memCntr)))
        else:
            memStart = int(np.random.choice(range(self.memSize-batch_size-1)))
        miniBatch  = self.memory[memStart:memStart+batch_size]

        # memory를 miniBatch로 재설정 한다. self.memory와 다른거였나?
        memory = np.array(miniBatch,dtype=object)

        #pred_Q를 Q_eval모델의 memory에서 state로 inference한다.
        pred_Q = self.Q_eval.forward(list(memory[:,0][:])).to(self.Q_eval.device)
        #next_Q를 Q_next모델의 memory에서 next_state로 inference한다. 리스트 꼴로 나오는듯?
        next_Q = self.Q_next.forward(list(memory[:,3][:])).to(self.Q_next.device)

        #maxAction은 inference된 next_Q중에서 가장 큰수를 고른다 [0~5]중의 하나의 수 겠지
        maxAction = torch.argmax(next_Q,dim=1).to(self.Q_eval.device)
        #rewards는 memory에서 memory로 저장된것들은 가져온다.
        rewards = torch.Tensor(list(memory[:,2])).to(self.Q_eval.device)
        
        #target_Q를 pred_Q로 assign해주고
        target_Q = pred_Q
        
        #target_Q의 maxAction번째를 reward + discount factor가 곱해진 next_Q의[2] reward인가? next_Q[2]원래 1이었음
        target_Q[:,maxAction] = rewards + self.GAMMA*torch.max(next_Q[2])

        # steps이 500을 넘어가면 = choose Action을 500번 넘게 하면
        # 즉 500번 움직일동안 살아 남았다면 우리모델의 신뢰도를 살짝 높이자
        # epsilon의 크기를 줄여서 
        # random한 action들의 횟수를 줄이고 inference action의 횟수를 늘인다
        if self.steps >6000:
            # EPSILON - 1e-4가 EPS하한선 보다 크면 1e-4를 빼주고
            if self.EPSILON - 1e-5 > self.MIN_EPS:
                self.EPSILON -= 1e-5
            #아니면 epsilon을 걍 유지하는거지 이게 minimum eps니까
            else:
                self.EPSILON = self.EPSILON
        
        #loss는 (target_Q - pred_Q)^2 하고 backpropagation한다.
        loss = self.Q_eval.loss(target_Q,pred_Q).to(self.Q_eval.device)
        loss.backward()
        self.Q_eval.optimizer.step()
        self.learn_step_counter +=1
    
    def model_save(self):
        # torch.save(model.state_dict(), PATH)
        torch.save(self.Q_eval.state_dict(),'./Q_eval.pth')
        torch.save(self.Q_next.state_dict(),'./Q_next.pth')

    # memory에 행동양식 저장 
    def storeTransition(self,state,action,reward,next_state):
        # memCntr이 memSize보다 작으면
        # memory에 [state,action,reward,next_state]를 붙임
        # (state,next_state) : environment, next_environment
        # 즉 지금 상태에서 action[0~5]을 취했을때의 reward를 받고, 그에 대한 다음 상태 까지의 전체를 저장함
        if self.memCntr<self.memSize:
            self.memory.append([state,action,reward,next_state])
        # memCntr이 memSize보다 크면 
        # memory의 memCntr를 memSize로 나눈 나머지 번째의 원소를 
        # 즉 memory list원소를 1번째 부터 다시 채워넣음
        # [state,action,reward,next_state]로 바꿈
        else:
            self.memory[self.memCntr%self.memSize] =[state,action,reward,next_state]
        self.memCntr +=1

 

이제 대망의 main.py입니다

 

import gym
import numpy as np
import time
from agent import Agent
from utils import plotLearning

if __name__ == '__main__':
    # 실제 게임을 화면상에서 보고싶다면 윗줄을 실행시키시면 됩니당~
    # env = gym.make('ALE/SpaceInvaders-v5',render_mode='human')
    env = gym.make('ALE/SpaceInvaders-v5')

    player = Agent(gamma=0.9, epsilon=0.95,
                     maxMemorySize=100000, replace = 200)
    print()
    print('*'*30)
    print(' 초기 행동양식 저장 시작')
    # 이부분안하면 학습이 안됨? 왜 해주는거지?
    # player의 memCntr 이 memSize보다 커질때까지 돌려라
    # '''
    while player.memCntr< player.memSize:
        # reset = episode가 끝날때 재시작 버튼 이며 environment 즉 환경 자체 frame이라고 생각해도 됨
        # observation size = (210,160,3)
        observation = env.reset()
        # 현재 게임 끝났는지 확인해주는 변수 done
        done = False
        while not done:
            #0: no action, 1: fire, 2: move right, 3: move left #4: move right fire, 5: move left fire
            #action space[0~5]에서 action 하나 고름
            action = env.action_space.sample()
            #next_observation, reward, done, info 을 매 스탭 action을 취한것에 대한 return으로 받아줌
            next_observation, reward, done, info = env.step(action)
            #이번 게임 끝났고, 목숨 남은거 0개이면 reward를 -100으로 설정함
            # 이게 왜 필요함 어차피 다음 게임에서 reward 업데이트 하는데
            # if done and info['lives'] == 0:
            #     reward = -100
            #끝났으면 observation = env.reset()를 observation_과 비교함?
            
            #여기가 무슨의미인지 모르겠음
            
            player.storeTransition(np.mean(observation[15:200,30:125], axis=2), 
                                    action, reward,
                                np.mean(next_observation[15:200,30:125], axis=2))
            # observation을 next_observation으로 update해줌
            observation = next_observation
    print(' 초기 행동양식 저장 완료')
    # '''

    scores = [] # score저장 해놓을 list
    epsHistory = [] #eps 저장해놓을 list
    numGames = 500
    batch_size = 32
    # print()
    print('*'*30)
    print(' 강화 학습 시작')
    print('*'*30)
    print()
    for i in range(numGames):
    # i=-1
    # while player.EPSILON<0.1:
    #     i+=1
        done = False
        epsHistory.append(player.EPSILON)
        # observation : environment 즉 환경 자체 frame이라고 생각해도 됨
        observation = env.reset()
        # np.sum 하는 이유가 뭐임 -> len을 1로 만들어 주기 위함
        # observation size = (210,160,3)
        # 실제 environment인 ROI를 잡은뒤 R,G,B 세개를 다 더 해서 frame으로 잡아줌 이래 더하면 len(frames)가 무조건 1인데?
        frames = [np.sum(observation[15:200,30:125],axis = 2)]
        # print(frames)
        score = 0
        lastAction = 0
        
        while not done:
            # len(frames)이 3이면 frame이 observation이 되어 action을 고르고 frame을 초기화해주고,
            # len(frames)이 3이 아니면 lastAction을 action으로 update해줌
            # 3번의 프레임 마다 새로운 움직임을 넣어라 - 0,1 번째는 지난 액션을 반복
            if len(frames) == 3:
                action = player.chooseAction(frames)
                # action = player.chooseAction_with_confidence(frames)
                frames = []
            else:
                action = lastAction
            
            # next_observation, reward, done, info 을 매 스탭 action을 취한것에 대한 return으로 받아줌
            next_observation,reward,done,info = env.step(action)
            # score에 reward를 더해준다
            score +=reward

            # 현재 게임이 끝나고 목슴이 0이 되면 reward를 -100으로 정해줌
            # 이게 왜 필요함 어차피 다음 게임에서 reward 업데이트 하는데
            # if done and info['lives'] ==0:
            #     reward = 0

            # frame에 next_observation의 값을 붙여줌 즉 다음 frame
            # 여기서 frame의 len가 길어짐
            # np.sum 하는 이유가 뭐임 -> len을 1로 만들어 주기 위함
            frames.append(np.sum(next_observation[15:200,30:125],axis = 2))
            
            # 왜 여기선 np.mean하는거임?
            player.storeTransition(np.mean(observation[15:200,30:125], axis=2), 
                                    action, reward,
                                    np.mean(next_observation[15:200,30:125], axis=2))
            # observation update함
            observation = next_observation
            # batch_size만큼 학습한뒤
            # player.learn(batch_size)
            #action을 lastAction으로 때려 넣어준다
            lastAction = action
            # if done and info['lives'] ==0:
            #     player.learn(batch_size)
            player.learn(batch_size)

        # Episode와 score epsilon 출력하고
        print('Episode -', i+1, '-> score= ',int(score), 'epsilon= %.3f'%player.EPSILON)
        # 한 episode에서 나온 score를 저장해줌
        scores.append(score)
    
    # Game 횟수만큼 (score, epsilon) plotting
    player.model_save()
    x = [i + 1 for i in range(numGames)]
    fileName = 'test'+str(numGames)+'.png'
    plotLearning(x,scores,epsHistory,fileName)

이렇게 세 파일을 하나의 directory에 넣고 main.py를 실행시켜 보면~~ 다양한 문제가 있을 겁니다!!

환경설정이라던가 import가 안된다던가 등등등

 

그런 건 상황이 다 다르기 때문에 검색하면서 직접 해결하시는 게 가장 빠르고 정확하고요!

제 다른 글 중에 특히 env.make() 부분의 문제를 해결한 글이 있습니다. 한번 훑어보시면 좋겠어요~!

'Have Done > Reinforcement Learning' 카테고리의 다른 글

[강화학습] CS234 class 2  (0) 2022.04.26
[강화학습] CS234 class1  (0) 2022.04.26
[강화학습] OPEN AI GYM issue  (0) 2022.04.11
[RL] Q - Learning  (0) 2022.03.29
[RL] Reinforcement Learning 구분  (0) 2022.03.29

댓글