python main.py --mode test --download True --output_dir ./output
python main.py --mode eval --download True --output_dir ./output --checkpoint ./output/model_epoch14.pt
python main.py --mode test --download True --output_dir ./output --checkpoint ./output/model_epoch14.pt
import torch
import torch.nn as nn
# 그래프의 기본적 building blocks를 제공
from torchvision.datasets import MNIST
# torchvision: 유명한 데이터셋, 모델 아키텍처, 이미지 변환 등을 제공
# torchvision.datasets: 빌트인 데이터셋을 제공하는 모듈
import torchvision.transforms as transforms
# 이미지 transform을 제공하는 모듈
from torch.utils.data.dataloader import DataLoader
# torch.utils.data.DataLoader: 데이터 로딩 유틸리티 클래스
import torch.optim as optim
# optimization 알고리즘들을 제공하는 패키지
import argparse
# sys.argv를 어떻게 parsing할 지 파악하는 등의 역할을 함
import sys, os
from lenet5 import LeNet5
from loss import *
from tools import *
### command-line에서 사용자가 입력한 arguments 들을 parsing
### -> mode, 데이터셋 download 여부, output 파일 저장 위치, checkpoint 파일 지정
def parse_args():
parser = argparse.ArgumentParser(description="MNIST")
# ArgumentParser 객체: command-line을 파이썬 데이터형으로 parsing할 수 있는 객체
parser.add_argument('--mode',
dest = 'mode',
help = 'train / eval / test',
default = None,
type = str)
parser.add_argument('--download',
dest = 'download',
help = 'download Fashion MNIST dataset',
default = False,
type = bool)
parser.add_argument('--output_dir',
dest = 'output_dir',
help = 'output directory',
default = './output',
type = str)
parser.add_argument('--checkpoint',
dest = 'checkpoint',
help = 'checkpoint trained model',
default = None,
type = str)
# add_argument() 메서드: ArgumentParser이 command-line의 문자을 객체로 어떻게 변환시킬지 지정
### 사용자가 args를 지정하지 않았다면 올바른 사용법 출력 후 종료
if len(sys.argv) == 1:
parser.print_help()
sys.exit()
args = parser.parse_args()
# ArgumentParser.parse_args() 메서드: 인자 문자열을 객체로 변환 & namespace의 attribute로 설정
# 예: Namespace(checkpoint=None, download=True, mode='train', output_dir='./output') 식으로 나옴
# args는 <class 'argparse.Namespace'> 타입을 가짐
return args
def get_data():
download_root = "./mnist_dataset"
my_transform = transforms.Compose([
transforms.Resize([32, 32]),
transforms.ToTensor(),
transforms.Normalize((0.5,), (1.0,))
])
# torchvision.transforms.Compose 클래스: 파라미터로써 compose를 할 Transform 객체의 리스트인 transforms를 받음
# torchvision.transforms.Resize 클래스: 입력 이미지를 특정 사이즈로 변환함
# torchvision.transforms.ToTensor 클래스: [0, 255] 범위의 (H X W X C)의 PIL 이미지나 numpy.ndarray를 [0.0, 1.0] 범위의 (C X H X W) float tensor로 변경함
# torchvision.transforms.Normalize 클래스:
# tensor 이미지를 mean & standard deviation으로 정규화시킴
# 파라미터 mean = 각 채널의 mean의 sequence / 파라미터 std = 각 채널의 standard deviation의 sequence
train_dataset = MNIST(
root=download_root,
transform=my_transform,
train=True,
download=args.download)
eval_dataset = MNIST(
root=download_root,
transform=my_transform,
train=False,
download=args.download
)
test_dataset = MNIST(
root=download_root,
transform=my_transform,
train=False,
download=args.download
)
# torchvision.datasets.MNIST 클래스 파라미터
# root: 데이터들의 root 디렉터리
# train: train된 파일에서 불러와 데이터셋을 만들지 여부
# download: True로 설정 시, 인터넷에서 데이터를 다운받아 root dir에 넣음
# transform: PIL 이미지를 받아 변형된 형태로 리턴하는 함수나 transform
return train_dataset, eval_dataset, test_dataset
def make_dataloader(train_dataset, eval_dataset, test_dataset):
train_loader = DataLoader(train_dataset,
batch_size=8,
num_workers=0,
pin_memory=True,
drop_last=True,
shuffle=True)
eval_loader = DataLoader(eval_dataset,
batch_size=1,
num_workers=0,
pin_memory=True,
drop_last=False,
shuffle=False)
test_loader = DataLoader(test_dataset,
batch_size=1,
num_workers=0,
pin_memory=True,
drop_last=False,
shuffle=False)
# torch.utils.data.DataLoader 클래스: 데이터셋과 sampler를 결합하고, 주어진 데이터셋에 대해 iterable을 제공
# 사용 파라미터:
# batch_size: 한 에폭마다 들어갈 배치의 수. 기본은 1
# num_workers: 데이터 로딩에 쓸 subprocesses의 개수. 0이면 메인 프로세스에서 불러옴을 의미. 본인의 CPU, GPU 상황에 따라 주는 값을 달리하기
# drop_last : True로 설정 시, batch 사이즈를 맞추고 남은 데이터들을 drop 시킴. 디폴트는 False로, 남는 데이터들은 조금 작은 배치 크기를 형성함
# pin_memory: True로 설정 시, 데이터 로더는 Tensor를 CUDA pinned momory로 복사함.
# shuffle: False가 default이며, True로 설정 시 한 에폭이 끝날 때마다 데이터들의 순서를 섞음
# 자료형 구성:
# [0]: tensor 형의 img가 batch_size개 있음
# [1]: tensor 형의 정답 라벨이 batch_size개 있음
# eval, test는 평가 용이기 때문에 batch_size를 1로. drop_last, shuffle이 굳이 필요 없음
return train_loader, eval_loader, test_loader
def train_model(_model, device, train_loader, epoch=15):
model = _model(batch=8, n_classes=10, in_channel=1, in_width=32, in_height=32, is_train=True)
model.to(device)
# Module.to() 메서드: 파라미터와 버퍼들을 옮김(또는 cast). device 파라미터에는 torch.device 객체가 들어감
model.train()
# Module.train() 메서드: 모듈을 training 모드로 설정함.
### optimizer(최적화): 최적(가장 좋은 성능을 보이는)의 파라미터를 찾는 방식 설정
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
# torch.optim 패키지를 사용하기 위해선 현재 상태(state)를 저장 & 파라미터들을 업데이트(계산된 기울기에 따라)할 optimizer 객체를 선언해야 함
# Module.parameters() 메서드: 모듈의 파라미터들에 대한 iterator를 리턴함. 보통 optimizer에게 전달됨.
# nn.Parameter: Tensor 형을 가짐. Module에 속성으로 할당 시 자동으로 매개변수로 등록됨
# torch.optim.SGD 클래스: stochastic gradient descent를 적용함
# lr(=learning rate), momentum(default: 0)
### learning rate scheduler: 미리 학습 일정을 정해두고 그에 따라 학습률을 조정함 -> 상황에 맞게 학습률을 가변적으로 적당히 변경
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)
# torch.optim.lr_scheduler: 에폭의 수에 따라 학습률을 조정하도록 함. 최적화 업데이트 이후 수행되어야 함
# torch.optim.lr_scheduler.StepLR 클래스:
# 지정한 step_size 에폭 단위로 학습률에 gamma를 곱해 학습률을 감소시킴
# step_size: 학습률 decay의 주기 / gamma: 기본은 0.1, 학습률 decay의 mulitplicative 인자
### criterion: 손실함수 클래스 불러오기
criterion = FashionMNISTloss(device = device)
### 학습에 사용될 각종 수 설정
epoch = epoch
iter = 0
### 각 epoch마다 학습 진행
for e in range(epoch):
total_loss = 0
for i, batch in enumerate(train_loader):
### DataLoader에서 불러온 데이터셋을 가져온 뒤 디바이스에 올리기
img = batch[0] # shape 예: torch.Size([8, 1, 32, 32]) -> [배치, 채널, 세로, 가로]
gt = batch[1] # (예) tensor([8, 4, 0, 9, 9, 2, 9, 2])
img = img.to(device)
gt = gt.to(device)
### 모델에서의 output을 받아옴
out = model(img)
# torch.Size([8, 10]) ==> [배치, 클래스 수] 형태로 각 배치에서 클래스별 결과(내놓은 답)을 출력
### output과 정답을 비교해서 손실함수를 사용해 손실값을 구함
loss_val = criterion(out, gt)
### 손실값을 사용해 오차역전파를 사용해 파라미터를 개선시킴
loss_val.backward()
# Module 클래스에서 forward 함수만 정의하면,
# backward 함수는 'torch.autograd' 기능(신경망 학습을 지원하는 PyTorch의 자동 미분 엔진)을 통해 자동으로 정의됨
optimizer.step()
# torch.optim.SGD.setp() 메서드: single 최적화 setp을 수행 -> 가중치 업데이트
optimizer.zero_grad()
# 모든 매개변수의 gradient 버퍼를 0으로 만듦
# ==> 기존의 계산된 기울기 값을 누적시키지 않을 때, 기존에 계산된 기울기를 0으로 만듦
### 전체 손실값을 누적시킴
total_loss += loss_val.item()
# torch.tensor.item(): tensor의 값을 파이썬 표준 number로 리턴함. tensor에 하나의 elem만 있어야 함
### 일정 iter마다 loss를 출력해봄
if iter % 1000 == 0:
print("{} epoch {} iter loss: {}".format(e, iter, loss_val.item()))
iter += 1
### 한 에폭이 끝남 -> 평균 손실을 구하며 학습률 갱신, 텐서 저장
mean_loss = total_loss / i # 평균 손실값을 구함, i = train_loader size: 7499
scheduler.step() # 학습률을 갱신함
print("-> {} epoch mean loss: {}".format(e, mean_loss))
torch.save(model.state_dict(), args.output_dir + "/model_epoch"+str(e)+".pt")
# torch.save() 함수: 파일로 객체를 저장함
# obj: 저장할 객체 / f: 파일 이름(관습적으로 *.pt 파일을 사용해 tensor를 저장함)
# Module.state_dict() 메서드: 모듈의 모든 state를 담고 있는 딕셔너리를 리턴함
# 파라미터들과 버퍼들을 포함함. key는 파라미터와 버퍼의 이름
# (ex) module.state_dict().keys()의 출력 = ['bias', 'weight']
print("===== END Train =====")
def eval_model(_model, device, eval_loader):
model = _model(batch=1, n_classes=10, in_channel=1, in_width=32, in_height=32)
checkpoint = torch.load(args.checkpoint) # trained model을 load함
model.load_state_dict(checkpoint)
# Module.load_state_dict() 메서드: state_dict로부터 해당 모듈과 그 자식들로 파라미터와 버퍼들을 복사해옴
model.to(device)
model.eval()
# Module.eval() 메서드: 모듈을 evaluation mode로 설정함
### 평가에 쓰일 수 정의
accuracy = 0 # 정확도
num_eval = 0 # 평가 횟수
### load한 데이터셋마다 학습 진행
for batch in eval_loader:
img = batch[0]
gt = batch[1]
img = img.to(device)
out = model(img)
out = out.cpu()
# Module.cpu() 메서드: 모든 모델의 파라미터 & 버퍼를 CPU로 옮김
if out == gt:
accuracy += 1 # 모델이 낸 답이 정답과 같으면 정확도 +1
num_eval += 1
print("Evaluation scroe: {} / {}".format(accuracy, num_eval))
# 총 몇 개의 정답을 맞추었는가? (예) Evaluation scroe: 8682 / 10000
def test_model(_model, device, test_loader):
model = _model(batch=1, n_classes=10, in_channel=1, in_width=1, in_height=1)
checkpoint = torch.load(args.checkpoint)
model.load_state_dict(checkpoint)
model.to(device)
model.eval()
for batch in test_loader:
img = batch[0]
img = img.to(device)
out = model(img)
out = out.cpu()
### 결과를 이미지로 출력함
show_img(img.cpu().numpy(), str(out.item()))
# Tensor.numpy(): tensor를 ndarray로 변경함
# 이미지 위에 모델이 낸 답을 씀
def main():
### output_dir로 설정한 디렉터리가 존재하는 지 확인 -> 존재하지 않는다면 만들기
if not os.path.isdir(args.output_dir):
os.mkdir(args.output_dir)
### 어떤 device에 올릴지 지정
if torch.cuda.is_available():
device = torch.device("cuda")
else:
device = torch.device("cpu")
### FashionMNIST dataset을 불러옴
train_dataset, eval_dataset, test_dataset = get_data()
### DataLoader를 만듦
train_loader, eval_loader, test_loader = make_dataloader(train_dataset, eval_dataset, test_dataset)
### LeNet5 모델을 불러옮
_model = LeNet5
### 모델 학습
if args.mode == "train":
train_model(_model, device, train_loader, epoch=15)
elif args.mode == "eval":
eval_model(_model, device, eval_loader)
elif args.mode == "test":
test_model(_model, device, test_loader)
if __name__=="__main__":
args = parse_args()
main()
import torch
import torch.nn as nn
class LeNet5(nn.Module):
# torch.nn.Module 클래스:
# 모든 신경망 모듈을 위한 base class. 모델을 만들려면 이 클래스를 subclass 해야 함
# Module 클래스는 다른 Module 들을 포함할 수 있음
def __init__(self, batch, n_classes, in_channel, in_width, in_height, is_train=False):
super().__init__()
self.batch = batch
self.n_classes = n_classes
self.in_channel = in_channel
self.in_width = in_width
self.in_height = in_height
self.is_train = is_train
### convolution & pooling layer
# ※ convolution output = {(W - K + 2P) / S} + 1
self.conv0 = nn.Conv2d(self.in_channel, 6, kernel_size=5, stride=1, padding=0)
# torch.nn.Conv2d 클래스: 입력 신호에 대해 2D Convolution을 적용함
# 필수 파라미터: input_channel, output_channel
# 보통 입력 크기는 (B, C_in, H, W), 출력 크기는 (B, C_out, H_out, W_out)
# 자료형으로 TensorFloat32를 사용
self.pool0 = nn.AvgPool2d(2, stride=2)
# torch.nn.AvgPool2d 클래스: 입력 신호에 대해 2D average pooling을 적용함
# 필수 파라미터: kernel_size
self.conv1 = nn.Conv2d(6, 16, kernel_size=5, stride=1, padding=0)
self.pool1 = nn.AvgPool2d(2, stride=2)
self.conv2 = nn.Conv2d(16, 120, kernel_size=5, stride=1, padding=0)
### fully-connected layer
self.fc3 = nn.Linear(120, 84)
# torch.nn.Linear 클래스: 입력 데이터에 대해 선형 변환을 적용. y = x * A.t + b
# 필수 파라미터: in_features(각 input sample의 사이즈), out_features
self.fc4 = nn.Linear(84, self.n_classes)
def forward(self, x):
# nn.Module.forward() 메서드: 매 call마다 수행될 내용. subclass에서 재정의되어야 함. 재정의한 부분이 하단
# Model 객체를 데이터와 함께 호출하면 자동으로 실행됨. 따라서 my_model = LeNet(input)으로 선언/호출 해도 자동으로 forward 수행됨
# ※ x의 shape: [B, C, H, W]
x = self.conv0(x)
x = torch.tanh(x)
x = self.pool0(x)
x = self.conv1(x)
x = torch.tanh(x)
x = self.pool1(x)
x = self.conv2(x)
x = torch.tanh(x)
x = torch.flatten(x, start_dim=1)
# 4차원을 2차원으로 바꿈 ([b, c, h, w] -> [B, C*H*W])
x = self.fc3(x)
x = torch.tanh(x)
x = self.fc4(x)
x = x.view(self.batch, -1)
# Tensor.view() 메서드: 인자로 주어진 tensor의 shape를 변경해 새 tensor로 리턴
# 파라미터 shape에 -1 입력 시, 다른 dimension으로부터 값을 자동으로 추정함
x = nn.functional.softmax(x, dim=1)
# torch.nn.functional.softmax() 함수: [0, 1]의 범위를 갖도록 softmax 함수를 적용
if self.is_train is False:
x = torch.argmax(x, dim=1)
# torch.argmax() 함수: 입력 tensor에 대해 가장 큰 값을 리턴함
return x
import torch
import torch.nn as nn
import sys
class MNISTloss(nn.Module):
def __init__(self, device = torch.device('cpu')):
super(MNISTloss, self).__init__()
# super(자식 클래스, self)로 부모 클래스의 메서드 호출. super()와 동일. 현재 클래스가 어떤 클래스인지 명확히 표시
self.loss = nn.CrossEntropyLoss().to(device)
# torch.nn.CrossEntropyLoss 클래스: input과 target 간의 cross entropy loss를 계산하는 criterion. 분류 문제에 많이 사용됨.
def forward(self, out, gt):
# nn.Module.forward() 메서드: 매 call마다 수행될 내용. subclass에서 재정의되어야 함. 재정의한 부분이 하단
# Model 객체를 데이터와 함께 호출하면 자동으로 실행됨. 따라서 my_model = LeNet(input)으로 선언/호출 해도 자동으로 forward 수행됨
loss_val = self.loss(out, gt)
# loss(input, target) 식으로 사용
# 클래스(분류 목록)에 대한 확률로 target이 주어진다면, shape는 input과 같아야 하고 각 값은 [0, 1] 범위의 값이어야 함
# CrossEntropyLoss() 선언 시 reduction이 none으로 설정되었다면 target과 같은 shape를, 그렇지 않다면 sclar 반환
return loss_val
from PIL import Image, ImageDraw
# PIL: Python Image Libaray, 파이썬 인터프리터에서 이미지를 처리할 수 있도록 함
# Image 모듈: PIL 이미지를 나타낼 수 있도록 함
# ImageDraw 모듈: Image형 객체에 대해 2D 그래픽 제공. 새 이미지 만들거나 annotate 하는 등 가능
import numpy as np
import matplotlib.pyplot as plt
def show_img(img_data, text):
_img_data = img_data * 255
# 기존에 [0, 1] 범위였던 픽셀값들을 [0, 255] 범위로 바꿈
# (1, 1, 32, 32) 형상을 가짐
_img_data = np.array(_img_data[0, 0], dtype=np.uint8)
# 4차원 데이터를 2차원으로 바꿈. 첫 번째 배치 첫 번째 채널이 [32, 32] 이므로
img_data = Image.fromarray(_img_data)
# PIL.Image.fromarray() 함수: array 객체로부터 이미지 메모리를 생성
draw = ImageDraw.Draw(img_data)
# PIL.ImageDraw.Draw() 함수: 입력된 이미지에 draw를 할 수 있는 객체를 생성
cx, cy = _img_data.shape[0] / 2, _img_data.shape[1] / 2
if text is not None:
draw.text((cx, cy), text) # 이미지의 중심에 text 내용을 써넣음
plt.imshow(img_data)
plt.show()