PyTorch를 이용해 MNIST 숫자 데이터셋을 학습시키고 onnx모델로 변환한다. 1학년 때 TensorFlow로 진짜 대충 배우고 뭔지도 모른 채로 MNIST 데이터를 학습시킨 적이 있었는데, 현재 진행 중인 DeepSeg 프로젝트를 통해 보다 발전시켜본다.
1. 라이브러리 import 및 기본 설정
# mnist.py
import numpy as np
import torch
from torch import nn
from torch.utils.data import Subset
from torch.utils.data import DataLoader
import torchvision
from torchvision import transforms as tr
import torch.nn.functional as F
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
# GPU 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 재현성을 위해 난수 시드 고정
torch.manual_seed(1)
필요한 라이브러리들을 import하고 GPU와 난수 시드를 설정한다.
2. 데이터셋 로드 및 전처리
# 데이터 저장 경로 및 이미지 전처리 (PIL 이미지를 텐서로 변환)
image_path = "./project/data"
transform = tr.Compose([tr.ToTensor()])
imgae_path는 MNIST 데이터를 저장할 경로를 지정하고, transform은 이미지 데이터를 tensor로 변환한다. tr.Compose()는 여러 이미지 전처리 작업을 순차적으로 적용하는 함수이다. 여기서는 ToTensor()를 사용해 PIL 이미지를 PyTorch의 tensor로 변환한다.
class Compose(transforms: list[ToTensor])
Composes several transforms together. This transform does not support torchscript. Please, see the note below.
Args
transforms : list of Transform objects
list of transforms to compose.
Examples
>>> transforms.Compose([
>>> transforms.CenterCrop(10),
>>> transforms.PILToTensor(),
>>> transforms.ConvertImageDtype(torch.float),
>>> ])
Compose의 정의.
# MNIST 학습 데이터셋 다운로드 및 로드
mnist_dataset = torchvision.datasets.MNIST(
root=image_path, train=True, transform=transform, download=True
)
MNIST의 학습 데이터셋을 다운로드하고 로드한다. root는 데이터셋이 저장될 위치, transform은 각 샘플에 대해 정의한 전처리를 수행한다.
class MNIST(
root: str | Path,
train: bool = True,
transform: ((...) -> Any) | None = None,
target_transform: ((...) -> Any) | None = None,
download: bool = False
)
MNIST Dataset.
Args
root : str or pathlib.Path
Root directory of dataset where MNIST/raw/train-images-idx3-ubyte and MNIST/raw/t10k-images-idx3-ubyte exist.
train : bool, optional
If True, creates dataset from train-images-idx3-ubyte, otherwise from t10k-images-idx3-ubyte.
download : bool, optional
If True, downloads the dataset from the internet and puts it in root directory. If dataset is already downloaded, it is not downloaded again.
transform : callable, optional
A function/transform that takes in a PIL image and returns a transformed version. E.g, transforms.RandomCrop
target_transform : callable, optional
A function/transform that takes in the target and transforms it.
torchvision.datasets.MNIST의 정의.
"""
데이터셋 분할:
- 검증 데이터셋: 첫 10,000개 샘플
- 학습 데이터셋: 나머지 샘플
"""
mnist_valid_dataset = Subset(mnist_dataset, torch.arange(10000))
mnist_train_dataset = Subset(mnist_dataset, torch.arange(10000, len(mnist_dataset)))
전체 학습 데이터셋 중 dkvdml 10,000개는 검증용으로 사용하고, 나머지는 학습용으로 사용한다. torch.arrange(10000)은 0부터 9999까지의 인덱스를 생성한다.
class Subset(
dataset: Dataset,
indices: Sequence[int]
)
Subset of a dataset at specified indices.
Args
dataset : Dataset
The whole Dataset
indices : sequence
Indices in the whole set selected for subset
Subset 정의.
# MNIST 테스트 데이터셋 로드 (테스트 시 사용)
mnist_test_dataset = torchvision.datasets.MNIST(
root=image_path, train=False, transform=transform, download=False
)
모델 평가 시 사용할 테스트 데이터셋은 train=False, download=False로 별도로 로드한다.
3. 모델 정의 (CNN 기반 분류기)
class Model(nn.Module):
def __init__(self, in_channels=1, num_classes=10):
super().__init__()
nn.Module을 상속해 CNN 기반 분류기 모델을 정의한다. nn.Module은 다음과 같이 정의된다.
(class) Module
Base class for all neural network modules.
Your models should also subclass this class.
Modules can also contain other Modules, allowing to nest them in a tree structure. You can assign the submodules as regular attributes:
import torch.nn as nn
import torch.nn.functional as F
class Model(nn.Module):
def __init__(self) -> None:
super().__init__()
self.conv1 = nn.Conv2d(1, 20, 5)
self.conv2 = nn.Conv2d(20, 20, 5)
def forward(self, x):
x = F.relu(self.conv1(x))
return F.relu(self.conv2(x))
Submodules assigned in this way will be registered, and will have their parameters converted too when you call to, etc.
note
As per the example above, an __init__() call to the parent class must be made before assignment on the child.
Attributes
nn.Module을 상속하는 사용자 정의 모델 클래스는 생성자에서 모델의 층(레이어)을 정의하고, forward 함수를 오버라이딩하여 순전파를 정의한다.
self.conv1 = nn.Conv2d(
in_channels=in_channels, out_channels=32, kernel_size=5, padding=2
) # 첫 번째 합성곱 계층: 1채널 → 32채널, kernel 크기 5, padding=2
첫 번째 레이어는 합성곱 계층(Convolution Layer)으로, nn.Conv2d를 사용한다. MNIST 데이터셋은 흑백 이미지이므로, 입력 채널인 in_channels는 1이 된다. out_channels는 출력 채널로, 32로 확장해 특징을 추출한다. kernel_size=5로 5x5 크기의 필터를 사용하며, padding=2는 가장자리를 0으로 채워 출력 크기를 유지한다.
class Conv2d(
in_channels: int,
out_channels: int,
kernel_size: _size_2_t,
stride: _size_2_t = 1,
padding: _size_2_t | str = 0,
dilation: _size_2_t = 1,
groups: int = 1,
bias: bool = True,
padding_mode: str = "zeros",
device: Any | None = None,
dtype: Any | None = None
)
Initialize internal Module state, shared by both nn.Module and ScriptModule.
Conv2d 정의.
self.pool1 = nn.MaxPool2d(
kernel_size=2
) # 첫 번째 풀링 계층: 커널 크기 2 (ReLU는 forward에서 적용)
두 번째 레이어는 풀링 계층(Pooling Layer)이다. nn.MaxPool2d는 2D 최대 풀링 계층으로, 지정된 영역에서 최댓값을 선택한다. 여기서 kernel_size=2이므로 2x2 크기로 샘플링하여 특징 맵의 크기를 반으로 줄인다.
class MaxPool2d(
kernel_size: _size_any_t,
stride: _size_any_t | None = None,
padding: _size_any_t = 0,
dilation: _size_any_t = 1,
return_indices: bool = False,
ceil_mode: bool = False
)
Initialize internal Module state, shared by both nn.Module and ScriptModule.
MaxPool2d 정의.
self.conv2 = nn.Conv2d(
in_channels=32, out_channels=64, kernel_size=5, padding=2
) # 두 번째 합성곱 계층: 32채널 → 64채널, kernel 크기 5, padding=2
self.pool2 = nn.MaxPool2d(kernel_size=2) # 두 번째 풀링 계층
두 번째 합성곱 계층에서도 마찬가지로 출력 채널을 입력의 두배로 확장하고 kernel_size와 padding을 유지해 특징을 추출, 크기를 유지한다. 두 번째 풀링 계층은 첫번째와 마찬가지로 샘플링해 크기를 절반으로 축소시킨다.
두 합성곱/풀링 계층을 지나면서 입력 데이터는 다음과 같이 변화해 특징 맵이 된다.
- 입력: 28x28
- conv1 -> 28x28
- pool1 -> 14x14
- conv2 -> 14x14
- pool2 -> 7x7
- 채널 수는 con2에서 64이므로, 특징 맵의 feature 개수는 64x7x7 = 3136개
# flatten 계층
self.flatten = nn.Flatten()
다음 레이어는 flatten 계층으로, 다차원 텐서를 1차원 벡터로 변환한다.
self.fc1 = nn.Linear(64 * 7 * 7, 1024) # 전결합 계층: 3136 → 1024
self.dropout = nn.Dropout(p=0.5) # dropout 계층
self.fc2 = nn.Linear(1024, num_classes) # 최종 출력 계층: 1024 → num_classes
Convolution Layer를 통과한 데이터는 Fully Connected Layer로 입력된다.
정의된 모델의 전체 구조는 다음과 같다.
# 모델 정의 (CNN 기반 분류기)
class Model(nn.Module):
def __init__(self, in_channels=1, num_classes=10):
super().__init__()
self.conv1 = nn.Conv2d(
in_channels=in_channels, out_channels=32, kernel_size=5, padding=2
) # 첫 번째 합성곱 계층: 1채널 → 32채널, kernel 크기 5, padding=2
self.pool1 = nn.MaxPool2d(
kernel_size=2
) # 첫 번째 풀링 계층: 커널 크기 2 (ReLU는 forward에서 적용)
self.conv2 = nn.Conv2d(
in_channels=32, out_channels=64, kernel_size=5, padding=2
) # 두 번째 합성곱 계층: 32채널 → 64채널, kernel 크기 5, padding=2
self.pool2 = nn.MaxPool2d(kernel_size=2) # 두 번째 풀링 계층
"""
convolution 연산 후의 feature map 크기 계산:
입력: 28×28 → conv1 (padding=2, stride=1) → 28×28 → pool1 (kernel=2) → 14×14
→ conv2 (padding=2) → 14×14 → pool2 → 7×7
채널 수는 conv2 이후 64개이므로 flatten 후 feature 개수는 64*7*7 = 3136
"""
# flatten 계층
self.flatten = nn.Flatten()
self.fc1 = nn.Linear(64 * 7 * 7, 1024) # 전결합 계층: 3136 → 1024
self.dropout = nn.Dropout(p=0.5) # dropout 계층
self.fc2 = nn.Linear(1024, num_classes) # 최종 출력 계층: 1024 → num_classes
다음으로 forward 메서드를 정의한다.
def forward(self, x):
x = self.pool1(F.relu(self.conv1(x))) # 첫 번째 conv → ReLU → 풀링
x = self.pool2(F.relu(self.conv2(x))) # 두 번째 conv → ReLU → 풀링
x = self.flatten(x) # flatten: (N, 64, 7, 7) → (N, 3136)
x = F.relu(self.fc1(x)) # fc1 → ReLU
x = self.dropout(x) # dropout 적용
x = self.fc2(x) # fc2: 최종 출력
return x
forward는 모델의 입력데이터 x에 대해 순전파 과정을 정의한다. 각 층의 활성화 함수로 ReLU 함수를 사용한다.
# 모델 정의 (CNN 기반 분류기)
class Model(nn.Module):
def __init__(self, in_channels=1, num_classes=10):
super().__init__()
self.conv1 = nn.Conv2d(
in_channels=in_channels, out_channels=32, kernel_size=5, padding=2
) # 첫 번째 합성곱 계층: 1채널 → 32채널, kernel 크기 5, padding=2
self.pool1 = nn.MaxPool2d(
kernel_size=2
) # 첫 번째 풀링 계층: 커널 크기 2 (ReLU는 forward에서 적용)
self.conv2 = nn.Conv2d(
in_channels=32, out_channels=64, kernel_size=5, padding=2
) # 두 번째 합성곱 계층: 32채널 → 64채널, kernel 크기 5, padding=2
self.pool2 = nn.MaxPool2d(kernel_size=2) # 두 번째 풀링 계층
"""
convolution 연산 후의 feature map 크기 계산:
입력: 28×28 → conv1 (padding=2, stride=1) → 28×28 → pool1 (kernel=2) → 14×14
→ conv2 (padding=2) → 14×14 → pool2 → 7×7
채널 수는 conv2 이후 64개이므로 flatten 후 feature 개수는 64*7*7 = 3136
"""
# flatten 계층
self.flatten = nn.Flatten()
self.fc1 = nn.Linear(64 * 7 * 7, 1024) # 전결합 계층: 3136 → 1024
self.dropout = nn.Dropout(p=0.5) # dropout 계층
self.fc2 = nn.Linear(1024, num_classes) # 최종 출력 계층: 1024 → num_classes
def forward(self, x):
x = self.pool1(F.relu(self.conv1(x))) # 첫 번째 conv → ReLU → 풀링
x = self.pool2(F.relu(self.conv2(x))) # 두 번째 conv → ReLU → 풀링
x = self.flatten(x) # flatten: (N, 64, 7, 7) → (N, 3136)
x = F.relu(self.fc1(x)) # fc1 → ReLU
x = self.dropout(x) # dropout 적용
x = self.fc2(x) # fc2: 최종 출력
return x
Model 전체 정의.
4. 모델 학습 함수 정의
# 학습 함수 정의
def train(model, epochs, train_dl, valid_dl, optimizer, loss_func):
"""
모델 학습 및 검증 함수
- model: 학습할 모델 (device에 올려둔 상태)
- epochs: 전체 학습 에폭 수
- train_dl: 학습 데이터 로더
- valid_dl: 검증 데이터 로더
- optimizer: 옵티마이저
- loss_func: 손실 함수
"""
# epoch별 손실과 정확도를 저장할 리스트 초기화
train_loss = [0] * epochs
train_accuracy = [0] * epochs
valid_loss = [0] * epochs
valid_accuracy = [0] * epochs
train_dl과 valid_dl로부터 데이터를 로드하여, 학습과 검증을 수행한다.
각 epoch마다 손실과 정확도를 누적하여 저장할 리스트를 초기화한다. [0] * epochs는 리스트의 요소를 epochs 수만큼 생성해 0으로 초기화한다.
for epoch in range(epochs):
model.train() # 모델을 학습 모드로 전환 (dropout 등 활성화)
model.train()은 모델을 학습 모드로 전환한다.
for x_batch, y_batch in train_dl:
# 학습 데이터를 device(GPU/CPU)로 전송
x_batch = x_batch.to(device)
y_batch = y_batch.to(device)
DataLoader는 미니배치 단위로 (입력, 레이블) 튜플을 반환한다. 각 데이터 tensor는 지정된 device로 이동한다.
optimizer.zero_grad() # gradient 초기화
pred = model(x_batch) # 모델 예측
loss = loss_func(pred, y_batch) # 손실 계산
loss.backward() # 역전파로 gradient 계산
optimizer.step() # parameter 업데이트
optimizer.zero_grad()는 이전 배치에서 계산된 gradient를 0으로 초기화한다. model(x_batch)로 forward 함수를 호출해 예측값을 얻은 후, 손실값을 계산한다.
loss.backward()는 역전파를 수행해 gradient를 계산하고, optimizer.step()이 parameter를 업데이트한다.
# 배치별 손실과 정확도 누적
train_loss[epoch] += loss.item() * y_batch.size(0)
is_correct = (torch.argmax(pred, dim=1) == y_batch).float()
train_accuracy[epoch] += is_correct.sum().cpu()
loss.item()은 tensor형태의 손실값을 scalar 값으로 변환한다. y_batch.size(0)은 현재 배치의 샘플 개수를 반환하여 배치별 손실 합을 구하도록 한다.
is_correct는 예측한 클래스와 실제 레이블을 비교해 맞으면 1, 틀리면 0인 tensor를 생성한다. is_correct.sum().cpu()는 맞은 샘플의 수를 합산하고, GPU tensor를 CPU로 이동시켜 계산한다.
# 전체 학습 데이터셋에 대해 평균 손실과 정확도 계산
train_loss[epoch] /= len(train_dl.dataset)
train_accuracy[epoch] /= len(train_dl.dataset)
epoch당 누적된 손실과 정확도를 전체 데이터셋 크기로 나누어 평균을 구한다.
# 검증 단계 (평가 모드: dropout, batchnorm 등 비활성화)
model.eval()
with torch.no_grad():
for x_batch, y_batch in valid_dl:
# 검증 데이터를 device로 전송
x_batch = x_batch.to(device)
y_batch = y_batch.to(device)
pred = model(x_batch)
loss = loss_func(pred, y_batch)
valid_loss[epoch] += loss.item() * y_batch.size(0)
is_correct = (torch.argmax(pred, dim=1) == y_batch).float()
valid_accuracy[epoch] += is_correct.sum().cpu()
valid_loss[epoch] /= len(valid_dl.dataset)
valid_accuracy[epoch] /= len(valid_dl.dataset)
검증 단계에서는 model.eval()로 모델을 평가모드로 전환하고, with torch.no_grad()로 gradient 자동 연산을 비활성화한다.
print(
f"Epoch {epoch+1}: Train Loss: {train_loss[epoch]:.4f}, Train Acc: {train_accuracy[epoch]:.4f}, "
+ f"Valid Loss: {valid_loss[epoch]:.4f}, Valid Acc: {valid_accuracy[epoch]:.4f}"
)
return train_loss, valid_loss, train_accuracy, valid_accuracy
epoch마다 학습 결과를 출력하고 각 epoch별 손실과 정확도를 리스트로 반환한다.
전체 함수.
# 학습 함수 정의
def train(model, epochs, train_dl, valid_dl, optimizer, loss_func):
"""
모델 학습 및 검증 함수
- model: 학습할 모델 (device에 올려둔 상태)
- epochs: 전체 학습 에폭 수
- train_dl: 학습 데이터 로더
- valid_dl: 검증 데이터 로더
- optimizer: 옵티마이저
- loss_func: 손실 함수
"""
# epoch별 손실과 정확도를 저장할 리스트 초기화
train_loss = [0] * epochs
train_accuracy = [0] * epochs
valid_loss = [0] * epochs
valid_accuracy = [0] * epochs
for epoch in range(epochs):
model.train() # 모델을 학습 모드로 전환 (dropout 등 활성화)
for x_batch, y_batch in train_dl:
# 학습 데이터를 device(GPU/CPU)로 전송
x_batch = x_batch.to(device)
y_batch = y_batch.to(device)
optimizer.zero_grad() # gradient 초기화
pred = model(x_batch) # 모델 예측
loss = loss_func(pred, y_batch) # 손실 계산
loss.backward() # 역전파로 gradient 계산
optimizer.step() # parameter 업데이트
# 배치별 손실과 정확도 누적
train_loss[epoch] += loss.item() * y_batch.size(0)
is_correct = (torch.argmax(pred, dim=1) == y_batch).float()
train_accuracy[epoch] += is_correct.sum().cpu()
# 전체 학습 데이터셋에 대해 평균 손실과 정확도 계산
train_loss[epoch] /= len(train_dl.dataset)
train_accuracy[epoch] /= len(train_dl.dataset)
# 검증 단계 (평가 모드: dropout, batchnorm 등 비활성화)
model.eval()
with torch.no_grad():
for x_batch, y_batch in valid_dl:
# 검증 데이터를 device로 전송
x_batch = x_batch.to(device)
y_batch = y_batch.to(device)
pred = model(x_batch)
loss = loss_func(pred, y_batch)
valid_loss[epoch] += loss.item() * y_batch.size(0)
is_correct = (torch.argmax(pred, dim=1) == y_batch).float()
valid_accuracy[epoch] += is_correct.sum().cpu()
valid_loss[epoch] /= len(valid_dl.dataset)
valid_accuracy[epoch] /= len(valid_dl.dataset)
print(
f"Epoch {epoch+1}: Train Loss: {train_loss[epoch]:.4f}, Train Acc: {train_accuracy[epoch]:.4f}, "
+ f"Valid Loss: {valid_loss[epoch]:.4f}, Valid Acc: {valid_accuracy[epoch]:.4f}"
)
return train_loss, valid_loss, train_accuracy, valid_accuracy
5. main 실행 부분
if __name__ == "__main__":
batch_size = 64
# DataLoader 생성 시, GPU 사용 시 메모리 전송 최적화를 위해 pin_memory=True 설정
train_dl = DataLoader(
dataset=mnist_train_dataset,
batch_size=batch_size,
shuffle=True,
pin_memory=True if device.type == "cuda" else False,
)
valid_dl = DataLoader(
dataset=mnist_valid_dataset,
batch_size=batch_size,
shuffle=True,
pin_memory=True if device.type == "cuda" else False,
)
batch_size를 64로 설정하고, DataLoader를 생성한다. DataLoader는 데이터를 미니배치 단위로 로드할 수 있게 해준다.
한 배치에 64개의 데이터를 로드하며, epoch마다 데이터를 shuffle해 모델의 일반화 성능을 높인다. 그리고 메모리 전송 속도를 높이기 위해 pin_memory=True를 설정해 데이터를 고정 메모리 영역에 저장한다.
class DataLoader(
dataset: Dataset,
batch_size: int | None = 1,
shuffle: bool | None = None,
sampler: Sampler | Iterable | None = None,
batch_sampler: Sampler[List] | Iterable[List] | None = None,
num_workers: int = 0,
collate_fn: _collate_fn_t | None = None,
pin_memory: bool = False,
drop_last: bool = False,
timeout: float = 0,
worker_init_fn: _worker_init_fn_t | None = None,
multiprocessing_context: Any | None = None,
generator: Any | None = None,
*,
prefetch_factor: int | None = None,
persistent_workers: bool = False,
pin_memory_device: str = ""
)
Data loader combines a dataset and a sampler, and provides an iterable over the given dataset.
DataLoader 정의 중략.
# 모델, 손실함수, 옵티마이저 정의
model = Model().to(device=device) # 모델을 device(GPU/CPU)로 이동
loss_func = nn.CrossEntropyLoss() # 분류 문제에 적합한 CrossEntropy 손실 함수
optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # 옵티마이저 생성
모델을 생성하고, 손실 함수와 옵티마이저를 정의한다. Adam 알고리즘으로 모델 parameter를 업데이트하며, model.parameters()는 모델의 모든 학습 가능한 parameter를 옵티마이저에 전달한다. 학습률은 0.001이며, 필요에 따라 최적화한다.
# 모델 학습
hist = train(
model,
epochs=20,
train_dl=train_dl,
valid_dl=valid_dl,
optimizer=optimizer,
loss_func=loss_func,
)
train 함수를 호출해 20 epoch동안 학습 및 검증을 수행하고, epoch별 손실 및 정확도를 hist에 저장한다.
# 평가
test_data = (mnist_test_dataset.data.unsqueeze(1) / 255.0).to(device)
prediction = model(test_data)
is_correct = (
torch.argmax(prediction, dim=1) == mnist_test_dataset.targets.to(device)
).float()
print(f"Test acc: {is_correct.mean():.4f}")
원본 데이터 mnist_test.data에 unsqueeze(1)를 통해 채널 차원을 1로 추가한다. /255.0은 픽셀값을 0~1 범위로 정규화한다. model에 테스트 데이터를 입력해 예측값을 얻고 테스트 정확도를 계산한다.
6. ONNX 모델 export
"""
ONNX Export: 모델을 onnx 형식으로 반환하도록 설정
"""
dummy_input = torch.randn(1, 1, 28, 28).to(device) # 더미 입력: 배치 크기 1, 1채널, 28×28 이미지
torch.onnx.export(
model, # 원본 모델을 그대로 export (출력 shape: [N, 10] logit)
dummy_input,
"./project/onnx_models/mnist_model.onnx", # ONNX 모델 저장 경로
input_names=["input"], # 입력 tensor 이름
output_names=["output"], # 출력 tensor 이름 (logit 벡터, shape: [N, 10])
dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
opset_version=11,
)
print("ONNX model saved as 'mnist_model.onnx'.")
torch.randn은 정규분포를 따르는 난수로 채워진 tensor를 반환한다. torch.onnx.export는 모델을 onnx 형식으로 내보내기 위한 함수이다.
(function) def export(
model: Module | ExportedProgram | ScriptModule | ScriptFunction,
args: tuple[Any, ...] = (),
f: str | PathLike | None = None,
*,
kwargs: dict[str, Any] | None = None,
export_params: bool = True,
verbose: bool | None = None,
input_names: Sequence[str] | None = None,
output_names: Sequence[str] | None = None,
opset_version: int | None = None,
dynamic_axes: Mapping[str, Mapping[int, str]] | Mapping[str, Sequence[int]] | None = None,
keep_initializers_as_inputs: bool = False,
dynamo: bool = False,
external_data: bool = True,
dynamic_shapes: dict[str, Any] | tuple[Any, ...] | list[Any] | None = None,
report: bool = False,
verify: bool = False,
profile: bool = False,
dump_exported_program: bool = False,
artifacts_dir: str | PathLike = ".",
fallback: bool = False,
training: TrainingMode = _C_onnx.TrainingMode.EVAL,
operator_export_type: OperatorExportTypes = _C_onnx.OperatorExportTypes.ONNX,
do_constant_folding: bool = True,
custom_opsets: Mapping[str, int] | None = None,
export_modules_as_functions: bool | Collection[type[Module]] = False,
autograd_inlining: bool = True,
**_: Any
) -> (Any | None)
Exports a model into ONNX format.
export 정의 중략.
6. 시각화
# 시각화
x_arr = np.arange(len(hist[0])) + 1
fig = plt.figure(figsize=(12, 10))
# 전체 figure를 위/아래 두 영역으로 나눔 (상단: 학습 곡선, 하단: 이미지 예측 결과)
gs = gridspec.GridSpec(2, 1, height_ratios=[1, 1], hspace=0.3)
# 상단: 학습 및 검증 곡선 (좌우 두 개의 subplot)
gs_top = gridspec.GridSpecFromSubplotSpec(1, 2, subplot_spec=gs[0], wspace=0.3)
# 왼쪽: 손실 그래프
ax_loss = fig.add_subplot(gs_top[0])
ax_loss.plot(x_arr, hist[0], "-o", label="Train loss")
ax_loss.plot(x_arr, hist[1], "--<", label="Validation loss")
ax_loss.set_xlabel("Epoch", fontsize=15)
ax_loss.set_ylabel("Loss", fontsize=15)
ax_loss.legend(fontsize=15)
ax_loss.set_title("Loss", fontsize=17)
# 오른쪽: 정확도 그래프
ax_acc = fig.add_subplot(gs_top[1])
ax_acc.plot(x_arr, hist[2], "-o", label="Train acc.")
ax_acc.plot(x_arr, hist[3], "--<", label="Validation acc.")
ax_acc.set_xlabel("Epoch", fontsize=15)
ax_acc.set_ylabel("Accuracy", fontsize=15)
ax_acc.legend(fontsize=15)
ax_acc.set_title("Accuracy", fontsize=17)
# 하단: 테스트 이미지 예측 결과 (2행 6열, 총 12개)
gs_bottom = gridspec.GridSpecFromSubplotSpec(
2, 6, subplot_spec=gs[1], wspace=0.1, hspace=0.1
)
for i in range(12):
ax = fig.add_subplot(gs_bottom[i])
ax.set_xticks([])
ax.set_yticks([])
# mnist_test_dataset[i][0]는 (1, 28, 28) 텐서이므로 [0, :, :]를 통해 (28, 28) 형태를 얻음
img = mnist_test_dataset[i][0][0, :, :]
# 채널 차원을 복원하여 (1, 1, 28, 28) 형태로 만든 후 GPU로 이동
img_tensor = img.unsqueeze(0).unsqueeze(1).to(device)
pred = model(img_tensor)
y_pred = torch.argmax(pred)
ax.imshow(img, cmap="gray_r")
ax.text(
0.9,
0.1,
y_pred.item(),
fontsize=15,
color="blue",
horizontalalignment="center",
verticalalignment="center",
transform=ax.transAxes,
)
plt.tight_layout()
plt.show()
7. 결과
99% 이상의 높은 정확도의 모델이 생성되었다.
전체 코드
# mnist.py
import numpy as np
import torch
from torch import nn
from torch.utils.data import Subset
from torch.utils.data import DataLoader
import torchvision
from torchvision import transforms as tr
import torch.nn.functional as F
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
# GPU 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 재현성을 위해 난수 시드 고정
torch.manual_seed(1)
# 데이터 저장 경로 및 이미지 전처리 (PIL 이미지를 텐서로 변환)
image_path = "./project/data"
transform = tr.Compose([tr.ToTensor()])
# MNIST 학습 데이터셋 다운로드 및 로드
mnist_dataset = torchvision.datasets.MNIST(
root=image_path, train=True, transform=transform, download=True
)
"""
데이터셋 분할:
- 검증 데이터셋: 첫 10,000개 샘플
- 학습 데이터셋: 나머지 샘플
"""
mnist_valid_dataset = Subset(mnist_dataset, torch.arange(10000))
mnist_train_dataset = Subset(mnist_dataset, torch.arange(10000, len(mnist_dataset)))
# MNIST 테스트 데이터셋 로드 (테스트 시 사용)
mnist_test_dataset = torchvision.datasets.MNIST(
root=image_path, train=False, transform=transform, download=False
)
# 모델 정의 (CNN 기반 분류기)
class Model(nn.Module):
def __init__(self, in_channels=1, num_classes=10):
super().__init__()
self.conv1 = nn.Conv2d(
in_channels=in_channels, out_channels=32, kernel_size=5, padding=2
) # 첫 번째 합성곱 계층: 1채널 → 32채널, kernel 크기 5, padding=2
self.pool1 = nn.MaxPool2d(
kernel_size=2
) # 첫 번째 풀링 계층: 커널 크기 2 (ReLU는 forward에서 적용)
self.conv2 = nn.Conv2d(
in_channels=32, out_channels=64, kernel_size=5, padding=2
) # 두 번째 합성곱 계층: 32채널 → 64채널, kernel 크기 5, padding=2
self.pool2 = nn.MaxPool2d(kernel_size=2) # 두 번째 풀링 계층
"""
convolution 연산 후의 feature map 크기 계산:
입력: 28×28 → conv1 (padding=2, stride=1) → 28×28 → pool1 (kernel=2) → 14×14
→ conv2 (padding=2) → 14×14 → pool2 → 7×7
채널 수는 conv2 이후 64개이므로 flatten 후 feature 개수는 64*7*7 = 3136
"""
# flatten 계층
self.flatten = nn.Flatten()
self.fc1 = nn.Linear(64 * 7 * 7, 1024) # 전결합 계층: 3136 → 1024
self.dropout = nn.Dropout(p=0.5) # dropout 계층
self.fc2 = nn.Linear(1024, num_classes) # 최종 출력 계층: 1024 → num_classes
def forward(self, x):
x = self.pool1(F.relu(self.conv1(x))) # 첫 번째 conv → ReLU → 풀링
x = self.pool2(F.relu(self.conv2(x))) # 두 번째 conv → ReLU → 풀링
x = self.flatten(x) # flatten: (N, 64, 7, 7) → (N, 3136)
x = F.relu(self.fc1(x)) # fc1 → ReLU
x = self.dropout(x) # dropout 적용
x = self.fc2(x) # fc2: 최종 출력
return x
# 학습 함수 정의
def train(model, epochs, train_dl, valid_dl, optimizer, loss_func):
"""
모델 학습 및 검증 함수
- model: 학습할 모델 (device에 올려둔 상태)
- epochs: 전체 학습 에폭 수
- train_dl: 학습 데이터 로더
- valid_dl: 검증 데이터 로더
- optimizer: 옵티마이저
- loss_func: 손실 함수
"""
# epoch별 손실과 정확도를 저장할 리스트 초기화
train_loss = [0] * epochs
train_accuracy = [0] * epochs
valid_loss = [0] * epochs
valid_accuracy = [0] * epochs
for epoch in range(epochs):
model.train() # 모델을 학습 모드로 전환 (dropout 등 활성화)
for x_batch, y_batch in train_dl:
# 학습 데이터를 device(GPU/CPU)로 전송
x_batch = x_batch.to(device)
y_batch = y_batch.to(device)
optimizer.zero_grad() # gradient 초기화
pred = model(x_batch) # 모델 예측
loss = loss_func(pred, y_batch) # 손실 계산
loss.backward() # 역전파로 gradient 계산
optimizer.step() # parameter 업데이트
# 배치별 손실과 정확도 누적
train_loss[epoch] += loss.item() * y_batch.size(0)
is_correct = (torch.argmax(pred, dim=1) == y_batch).float()
train_accuracy[epoch] += is_correct.sum().cpu()
# 전체 학습 데이터셋에 대해 평균 손실과 정확도 계산
train_loss[epoch] /= len(train_dl.dataset)
train_accuracy[epoch] /= len(train_dl.dataset)
# 검증 단계 (평가 모드: dropout, batchnorm 등 비활성화)
model.eval()
with torch.no_grad():
for x_batch, y_batch in valid_dl:
# 검증 데이터를 device로 전송
x_batch = x_batch.to(device)
y_batch = y_batch.to(device)
pred = model(x_batch)
loss = loss_func(pred, y_batch)
valid_loss[epoch] += loss.item() * y_batch.size(0)
is_correct = (torch.argmax(pred, dim=1) == y_batch).float()
valid_accuracy[epoch] += is_correct.sum().cpu()
valid_loss[epoch] /= len(valid_dl.dataset)
valid_accuracy[epoch] /= len(valid_dl.dataset)
print(
f"Epoch {epoch+1}: Train Loss: {train_loss[epoch]:.4f}, Train Acc: {train_accuracy[epoch]:.4f}, "
+ f"Valid Loss: {valid_loss[epoch]:.4f}, Valid Acc: {valid_accuracy[epoch]:.4f}"
)
return train_loss, valid_loss, train_accuracy, valid_accuracy
if __name__ == "__main__":
batch_size = 64
# DataLoader 생성 시, GPU 사용 시 메모리 전송 최적화를 위해 pin_memory=True 설정
train_dl = DataLoader(
dataset=mnist_train_dataset,
batch_size=batch_size,
shuffle=True,
pin_memory=True if device.type == "cuda" else False,
)
valid_dl = DataLoader(
dataset=mnist_valid_dataset,
batch_size=batch_size,
shuffle=True,
pin_memory=True if device.type == "cuda" else False,
)
# 모델, 손실함수, 옵티마이저 정의
model = Model().to(device=device) # 모델을 device(GPU/CPU)로 이동
loss_func = nn.CrossEntropyLoss() # 분류 문제에 적합한 CrossEntropy 손실 함수
optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # 옵티마이저 생성
# 모델 학습
hist = train(
model,
epochs=20,
train_dl=train_dl,
valid_dl=valid_dl,
optimizer=optimizer,
loss_func=loss_func,
)
# 평가
test_data = (mnist_test_dataset.data.unsqueeze(1) / 255.0).to(device)
prediction = model(test_data)
is_correct = (
torch.argmax(prediction, dim=1) == mnist_test_dataset.targets.to(device)
).float()
print(f"Test acc: {is_correct.mean():.4f}")
"""
ONNX Export: 모델을 onnx 형식으로 반환하도록 설정
"""
dummy_input = torch.randn(1, 1, 28, 28).to(device) # 더미 입력: 배치 크기 1, 1채널, 28×28 이미지
torch.onnx.export(
model, # 원본 모델을 그대로 export (출력 shape: [N, 10] logit)
dummy_input,
"./project/onnx_models/mnist_model.onnx", # ONNX 모델 저장 경로
input_names=["input"], # 입력 tensor 이름
output_names=["output"], # 출력 tensor 이름 (logit 벡터, shape: [N, 10])
dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
opset_version=11,
)
print("ONNX model saved as 'mnist_model.onnx'.")
# 시각화
x_arr = np.arange(len(hist[0])) + 1
fig = plt.figure(figsize=(12, 10))
# 전체 figure를 위/아래 두 영역으로 나눔 (상단: 학습 곡선, 하단: 이미지 예측 결과)
gs = gridspec.GridSpec(2, 1, height_ratios=[1, 1], hspace=0.3)
# 상단: 학습 및 검증 곡선 (좌우 두 개의 subplot)
gs_top = gridspec.GridSpecFromSubplotSpec(1, 2, subplot_spec=gs[0], wspace=0.3)
# 왼쪽: 손실 그래프
ax_loss = fig.add_subplot(gs_top[0])
ax_loss.plot(x_arr, hist[0], "-o", label="Train loss")
ax_loss.plot(x_arr, hist[1], "--<", label="Validation loss")
ax_loss.set_xlabel("Epoch", fontsize=15)
ax_loss.set_ylabel("Loss", fontsize=15)
ax_loss.legend(fontsize=15)
ax_loss.set_title("Loss", fontsize=17)
# 오른쪽: 정확도 그래프
ax_acc = fig.add_subplot(gs_top[1])
ax_acc.plot(x_arr, hist[2], "-o", label="Train acc.")
ax_acc.plot(x_arr, hist[3], "--<", label="Validation acc.")
ax_acc.set_xlabel("Epoch", fontsize=15)
ax_acc.set_ylabel("Accuracy", fontsize=15)
ax_acc.legend(fontsize=15)
ax_acc.set_title("Accuracy", fontsize=17)
# 하단: 테스트 이미지 예측 결과 (2행 6열, 총 12개)
gs_bottom = gridspec.GridSpecFromSubplotSpec(
2, 6, subplot_spec=gs[1], wspace=0.1, hspace=0.1
)
for i in range(12):
ax = fig.add_subplot(gs_bottom[i])
ax.set_xticks([])
ax.set_yticks([])
# mnist_test_dataset[i][0]는 (1, 28, 28) 텐서이므로 [0, :, :]를 통해 (28, 28) 형태를 얻음
img = mnist_test_dataset[i][0][0, :, :]
# 채널 차원을 복원하여 (1, 1, 28, 28) 형태로 만든 후 GPU로 이동
img_tensor = img.unsqueeze(0).unsqueeze(1).to(device)
pred = model(img_tensor)
y_pred = torch.argmax(pred)
ax.imshow(img, cmap="gray_r")
ax.text(
0.9,
0.1,
y_pred.item(),
fontsize=15,
color="blue",
horizontalalignment="center",
verticalalignment="center",
transform=ax.transAxes,
)
plt.tight_layout()
plt.show()
'PyTorch' 카테고리의 다른 글
[PyTorch] Model Structure & Parameters (0) | 2025.01.28 |
---|---|
[PyTorch] Cross-Validation(교차 검증) (0) | 2025.01.27 |
[PyTorch] MLP(Multi-Layer Perceptron) Regression(다층 퍼셉트론을 이용한 회귀) (0) | 2025.01.16 |
[PyTorch] Linear Regression - torch.nn (0) | 2025.01.13 |
[PyTorch] 데이터 로드 및 전처리 기본 (1) | 2025.01.13 |