아주 간단한 tutorial을 소개해 드리고자 한다.
강화 학습은 Environment와 Agent의 Interaction을 통해 학습하는데
. py 코드가 자연스럽게 세 개가 된다
Environment.py
Agent.py
Interaction.py
이렇게 딱딱 나눌 수 있는 그런 건 아니라서 조금 부가 설명을 하자면
main.py -> environment 설정과 interaction을 실행시키는 코드
agent.py -> agent를 선언하고 취할 행동들을 선언 interaction의 주체
model.py -> interaction의 연산 부분
그리하여 역순으로 코드를 소개해드리겠습니다~!
main.py입니다. 간단하게 DQN을 짜 놓은 모습~
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
class DQN(nn.Module):
def __init__(self):
super(DQN,self).__init__()
self.conv1 = nn.Conv2d(1,32,8,stride =4, padding=1)
self.conv2 = nn.Conv2d(32,64,4,stride=2)
self.conv3 = nn.Conv2d(64,128,3)
self.fc1 = nn.Linear(128*19*8,512)
self.fc2 = nn.Linear(512,6)
self.optimizer = optim.RMSprop(self.parameters(),lr = 0.005)
self.loss = nn.MSELoss()
self.device = torch.device('cpu')
self.to(self.device)
def forward(self,observation):
observation = torch.Tensor(observation).to(self.device)
observation = observation.view(-1,1,185,95)
observation = F.relu(self.conv1(observation))
observation = F.relu(self.conv2(observation))
observation = F.relu(self.conv3(observation))
observation = observation.view(-1,128*19*8)
observation = F.relu(self.fc1(observation))
actions = self.fc2(observation)
return actions
agent.py입니다.
뭐가 쫌 많은데! 주석을 다 달아놨습니다 ㅎㅎㅎ
import numpy as np
import torch
from model import DQN
class Agent(object):
def __init__(self,gamma, epsilon,
maxMemorySize, min_eps=0.01,
replace = 10000, actionSpace=[0,1,2,3,4,5]):
self.GAMMA = gamma # discount factor
self.EPSILON = epsilon # epsilon parameter
self.MIN_EPS = min_eps # minimum epsilon
self.actionSpace = actionSpace #[0~5]까지의 action들 집합
self.memSize = maxMemorySize
self.replace_target_cnt = replace
self.memory = []
self.memCntr = 0
self.steps = 0
self.learn_step_counter = 0
self.Q_eval = DQN() # evaluation network
self.Q_next = DQN() # target network
# 0: no action, 1: fire, 2: move right, 3: move left #4: move right fire, 5: move left fire
# action space[0~5]에서 action 하나 고름
# learning rate 느낌의 역할
def chooseAction(self,observation):
rand = np.random.random()
# rand 와 1-epsilon을 비교하여 rand가 작으면
# observation을 inference하여 action을 고름
if rand < 1-self.EPSILON:
action = torch.argmax(self.Q_eval.forward(observation)[1]).item()
# rand 와 1-epsilon을 비교하여 rand가 크면
# 걍 random한 action을 취함
else:
action = np.random.choice(self.actionSpace)
self.steps +=1
return action
def chooseAction_with_confidence(self,observation):
action = torch.argmax(self.Q_eval.forward(observation)[1]).item()
self.steps +=1
return action
def learn(self, batch_size):
self.Q_eval.optimizer.zero_grad()
# 변수로 주어진 replace_target_cnt 만큼의 학습이 이뤄지면
# Q_next 모델을 Q_eval weight로 업데이트한다
# Q_next가 새로운 목표인 *느낌*
if self.replace_target_cnt is not None and \
self.learn_step_counter%self.replace_target_cnt ==0:
self.Q_next.load_state_dict(self.Q_eval.state_dict())
# memory안에서 시작점을 정하고싶어 그래서
# memCntr + batch_size가 memSize[최대사이즈]보다 작으면
# memStart를 0부터 memCntr까지의 숫자중 하나로 설정하고
# 거기부터 batch_size만큼을 memory에서 골라와서 minibatch로 잡고
# memCntr + batch_size가 memSize[최대사이즈]보다 크면
# memCntr - batch_size -1을 시작점으로 잡고 batch_size만큼 잘라서 minibatch로 잡는다.
if self.memCntr + batch_size <self.memSize:
memStart = int(np.random.choice(range(self.memCntr)))
else:
memStart = int(np.random.choice(range(self.memSize-batch_size-1)))
miniBatch = self.memory[memStart:memStart+batch_size]
# memory를 miniBatch로 재설정 한다. self.memory와 다른거였나?
memory = np.array(miniBatch,dtype=object)
#pred_Q를 Q_eval모델의 memory에서 state로 inference한다.
pred_Q = self.Q_eval.forward(list(memory[:,0][:])).to(self.Q_eval.device)
#next_Q를 Q_next모델의 memory에서 next_state로 inference한다. 리스트 꼴로 나오는듯?
next_Q = self.Q_next.forward(list(memory[:,3][:])).to(self.Q_next.device)
#maxAction은 inference된 next_Q중에서 가장 큰수를 고른다 [0~5]중의 하나의 수 겠지
maxAction = torch.argmax(next_Q,dim=1).to(self.Q_eval.device)
#rewards는 memory에서 memory로 저장된것들은 가져온다.
rewards = torch.Tensor(list(memory[:,2])).to(self.Q_eval.device)
#target_Q를 pred_Q로 assign해주고
target_Q = pred_Q
#target_Q의 maxAction번째를 reward + discount factor가 곱해진 next_Q의[2] reward인가? next_Q[2]원래 1이었음
target_Q[:,maxAction] = rewards + self.GAMMA*torch.max(next_Q[2])
# steps이 500을 넘어가면 = choose Action을 500번 넘게 하면
# 즉 500번 움직일동안 살아 남았다면 우리모델의 신뢰도를 살짝 높이자
# epsilon의 크기를 줄여서
# random한 action들의 횟수를 줄이고 inference action의 횟수를 늘인다
if self.steps >6000:
# EPSILON - 1e-4가 EPS하한선 보다 크면 1e-4를 빼주고
if self.EPSILON - 1e-5 > self.MIN_EPS:
self.EPSILON -= 1e-5
#아니면 epsilon을 걍 유지하는거지 이게 minimum eps니까
else:
self.EPSILON = self.EPSILON
#loss는 (target_Q - pred_Q)^2 하고 backpropagation한다.
loss = self.Q_eval.loss(target_Q,pred_Q).to(self.Q_eval.device)
loss.backward()
self.Q_eval.optimizer.step()
self.learn_step_counter +=1
def model_save(self):
# torch.save(model.state_dict(), PATH)
torch.save(self.Q_eval.state_dict(),'./Q_eval.pth')
torch.save(self.Q_next.state_dict(),'./Q_next.pth')
# memory에 행동양식 저장
def storeTransition(self,state,action,reward,next_state):
# memCntr이 memSize보다 작으면
# memory에 [state,action,reward,next_state]를 붙임
# (state,next_state) : environment, next_environment
# 즉 지금 상태에서 action[0~5]을 취했을때의 reward를 받고, 그에 대한 다음 상태 까지의 전체를 저장함
if self.memCntr<self.memSize:
self.memory.append([state,action,reward,next_state])
# memCntr이 memSize보다 크면
# memory의 memCntr를 memSize로 나눈 나머지 번째의 원소를
# 즉 memory list원소를 1번째 부터 다시 채워넣음
# [state,action,reward,next_state]로 바꿈
else:
self.memory[self.memCntr%self.memSize] =[state,action,reward,next_state]
self.memCntr +=1
이제 대망의 main.py입니다
import gym
import numpy as np
import time
from agent import Agent
from utils import plotLearning
if __name__ == '__main__':
# 실제 게임을 화면상에서 보고싶다면 윗줄을 실행시키시면 됩니당~
# env = gym.make('ALE/SpaceInvaders-v5',render_mode='human')
env = gym.make('ALE/SpaceInvaders-v5')
player = Agent(gamma=0.9, epsilon=0.95,
maxMemorySize=100000, replace = 200)
print()
print('*'*30)
print(' 초기 행동양식 저장 시작')
# 이부분안하면 학습이 안됨? 왜 해주는거지?
# player의 memCntr 이 memSize보다 커질때까지 돌려라
# '''
while player.memCntr< player.memSize:
# reset = episode가 끝날때 재시작 버튼 이며 environment 즉 환경 자체 frame이라고 생각해도 됨
# observation size = (210,160,3)
observation = env.reset()
# 현재 게임 끝났는지 확인해주는 변수 done
done = False
while not done:
#0: no action, 1: fire, 2: move right, 3: move left #4: move right fire, 5: move left fire
#action space[0~5]에서 action 하나 고름
action = env.action_space.sample()
#next_observation, reward, done, info 을 매 스탭 action을 취한것에 대한 return으로 받아줌
next_observation, reward, done, info = env.step(action)
#이번 게임 끝났고, 목숨 남은거 0개이면 reward를 -100으로 설정함
# 이게 왜 필요함 어차피 다음 게임에서 reward 업데이트 하는데
# if done and info['lives'] == 0:
# reward = -100
#끝났으면 observation = env.reset()를 observation_과 비교함?
#여기가 무슨의미인지 모르겠음
player.storeTransition(np.mean(observation[15:200,30:125], axis=2),
action, reward,
np.mean(next_observation[15:200,30:125], axis=2))
# observation을 next_observation으로 update해줌
observation = next_observation
print(' 초기 행동양식 저장 완료')
# '''
scores = [] # score저장 해놓을 list
epsHistory = [] #eps 저장해놓을 list
numGames = 500
batch_size = 32
# print()
print('*'*30)
print(' 강화 학습 시작')
print('*'*30)
print()
for i in range(numGames):
# i=-1
# while player.EPSILON<0.1:
# i+=1
done = False
epsHistory.append(player.EPSILON)
# observation : environment 즉 환경 자체 frame이라고 생각해도 됨
observation = env.reset()
# np.sum 하는 이유가 뭐임 -> len을 1로 만들어 주기 위함
# observation size = (210,160,3)
# 실제 environment인 ROI를 잡은뒤 R,G,B 세개를 다 더 해서 frame으로 잡아줌 이래 더하면 len(frames)가 무조건 1인데?
frames = [np.sum(observation[15:200,30:125],axis = 2)]
# print(frames)
score = 0
lastAction = 0
while not done:
# len(frames)이 3이면 frame이 observation이 되어 action을 고르고 frame을 초기화해주고,
# len(frames)이 3이 아니면 lastAction을 action으로 update해줌
# 3번의 프레임 마다 새로운 움직임을 넣어라 - 0,1 번째는 지난 액션을 반복
if len(frames) == 3:
action = player.chooseAction(frames)
# action = player.chooseAction_with_confidence(frames)
frames = []
else:
action = lastAction
# next_observation, reward, done, info 을 매 스탭 action을 취한것에 대한 return으로 받아줌
next_observation,reward,done,info = env.step(action)
# score에 reward를 더해준다
score +=reward
# 현재 게임이 끝나고 목슴이 0이 되면 reward를 -100으로 정해줌
# 이게 왜 필요함 어차피 다음 게임에서 reward 업데이트 하는데
# if done and info['lives'] ==0:
# reward = 0
# frame에 next_observation의 값을 붙여줌 즉 다음 frame
# 여기서 frame의 len가 길어짐
# np.sum 하는 이유가 뭐임 -> len을 1로 만들어 주기 위함
frames.append(np.sum(next_observation[15:200,30:125],axis = 2))
# 왜 여기선 np.mean하는거임?
player.storeTransition(np.mean(observation[15:200,30:125], axis=2),
action, reward,
np.mean(next_observation[15:200,30:125], axis=2))
# observation update함
observation = next_observation
# batch_size만큼 학습한뒤
# player.learn(batch_size)
#action을 lastAction으로 때려 넣어준다
lastAction = action
# if done and info['lives'] ==0:
# player.learn(batch_size)
player.learn(batch_size)
# Episode와 score epsilon 출력하고
print('Episode -', i+1, '-> score= ',int(score), 'epsilon= %.3f'%player.EPSILON)
# 한 episode에서 나온 score를 저장해줌
scores.append(score)
# Game 횟수만큼 (score, epsilon) plotting
player.model_save()
x = [i + 1 for i in range(numGames)]
fileName = 'test'+str(numGames)+'.png'
plotLearning(x,scores,epsHistory,fileName)
이렇게 세 파일을 하나의 directory에 넣고 main.py를 실행시켜 보면~~ 다양한 문제가 있을 겁니다!!
환경설정이라던가 import가 안된다던가 등등등
그런 건 상황이 다 다르기 때문에 검색하면서 직접 해결하시는 게 가장 빠르고 정확하고요!
제 다른 글 중에 특히 env.make() 부분의 문제를 해결한 글이 있습니다. 한번 훑어보시면 좋겠어요~!
'Have Done > Reinforcement Learning' 카테고리의 다른 글
[강화학습] CS234 class 2 (0) | 2022.04.26 |
---|---|
[강화학습] CS234 class1 (0) | 2022.04.26 |
[강화학습] OPEN AI GYM issue (0) | 2022.04.11 |
[RL] Q - Learning (0) | 2022.03.29 |
[RL] Reinforcement Learning 구분 (0) | 2022.03.29 |
댓글