ansir 님의 블로그

[ SK 네트웍스 Family AI 캠프 수업 내용 복습 ] 모델 검증 2025-04-15 본문

SK 네트웍스 family AI 캠프/수업 내용 복습

[ SK 네트웍스 Family AI 캠프 수업 내용 복습 ] 모델 검증 2025-04-15

ansir 2025. 4. 15. 21:10

비 분산훈련

data: mnist
CNN
1. 데이터셋
  데이터 전처리( 이미지 정규화, 텐서 )
2. 데이터 로더
3. cnn 아키텍쳐 - 모델
4. 학스벵 필요한 설정
  손실함수, 옵티마이저, 에포크 수, learning_rate
5. 학습루프
6. 평가
7. 추론
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision.transforms import Compose, ToTensor, Normalize, Resize
from torchvision.datasets.mnist import MNIST

# CUDA 사용 가능하면 GPU 사용, 아니면 CPU 사용
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 데이터 전처리
transforms = Compose([
    ToTensor(), # PIL 이미지를 텐서로 변환 (0~255 → 0~1)
    Normalize(mean(0.5,), std=(0.25,)) # 정규화: 평균 0.5, 표준편차 0.25로
])
# 지정된 데이터셋 반환
training_data = MNIST(
    root = './',
    train = True,
    download = True,
    transform = transforms
)
# 지정된 데이터셋 반환
test_data = MNIST(
    root = './',
    train = False,
    download = True,
    transform = transforms
)
# 데이터로더
train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=False)

# cnn 모델
class MnistModel(nn.Module):
  def __init__(self):
    super().__init__()
    # 1채널( 흑백 ) 입력, 3x3 커널, 패딩=1 -> 출력 크기 유지
    self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1) # 32, 14, 14
    self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1) # 64, 7, 7
    self.pooling = nn.MaxPool2d(2, 2) # 2x2 최대 풀링 -> 크기 절반링 -> 크기 절반
    self.dropout = nn.Dropout(0.25)
    self.fc1 = nn.Linear(64 * 7 * 7, 128) # 완전연결층( Flatten 이후 )
    self.fc2 = nn.Linear(128, 10) # 클래스 수 = 10 ( 0~ 9 )
    self.relu = nn.ReLU()

  def forward(self, x):
    x = self.pooling(self.relu(self.conv1(x))) # conv1 -> ReLU -> MaxPool
    x = self.pooling(self.relu(self.conv2(x))) # conv2 -> ReLU -> MaxPool
    x = self.dropout(x) # 과적합 방지를 위한 드롭아웃
    x = torch.flatten(x, start_dim=1) # ( B, 64, 7, 7 ) -> ( B, 64x7x7 )  배치 차원 제외하고 평탄화
    x = self.relu(self.fc1(x)) # FC1 + ReLU
    x = self.fc2(x) # FC2 -> 출력층sofrmax는 적용 안함. why? 손실함수인 크로스엔트로피 내부에 softmax 기능이 내장 적용되어 있음
    return x

# 모델 학습 함수
from tqdm import tqdm

def train(model, device, train_loader, optimizer, epochs):
  loss_fn = nn.CrossEntropyLoss()
  model.to(device)
  for epoch in range(epochs):
    iterator = tqdm(train_loader) # 진행바 표시
    for data, label in iterator:
      data, label = data.to(device), label.to(device)

      output = model(data)
      loss = loss_fn(output, label)

      loss.backward()        # 역전파
      optimizer.step()       # 가중치 업데이트
      optimizer.zero_grad()  # 그래디언트 초기화
      iterator.set_description(f'Epoch {epoch}')
      iterator.set_postfix(loss=loss.item())

# 모델 인스턴스 생성 및 학습 시작
model = MnistModel()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
train(model, device, train_dataloader, optimizer, 10)

# 모델 저장 및 불러오기
torch.save(model.state_dict(), 'mnist_model.pth') # 모델 파라미터 저장

loaded_model = MnistModel()
loaded_model.load_state_dict(torch.load('mnist_model.pth',map_location=device))


# 테스트 함수 정의
def test(model, device, test_loader):
  loss_fn = nn.CrossEntropyLoss()
  model = model.to(device)
  model.eval() # 평가 모드: 드롭아웃/배치정규화 비활성화
  total_loss = 0.0
  total_correct = 0

  with torch.no_grad(): # 그래디언트 계산 비활성화( 속도 증가 )
    for data,label in test_loader:
      data, label = data.to(device), label.to(device)

      output = model(data)
      total_loss +=  loss_fn(output, label)
      predict = output.argmax(1) # 가장 높은 값을 가진 클래스 선택
      total_correct += predict.eq(label).sum().item() # 정답 개수 누적

  total_loss /= len(test_loader.dataset)
  print(f'total_loss : {total_loss} accuracy: {total_correct / len(test_dataloader.dataset)}')

# 테스트 실행
test(loaded_model,device,test_dataloader)

보충 설명

  1. 정규화 값
    • MNIST는 01 범위 이미지인데, 이를 평균 0.5, 표준편차 0.25로 정규화하면 대략 -22 범위로 바뀜.
    • 학습 안정성과 수렴 속도를 개선할 수 있음
  2. Normalize(mean=(0.5,), std=(0.25,))
  3. Softamx 미사용 이유
    • nn.CrossEntropyLoss는 내부적으로 nn.LogSoftmax + nn.NLLLoss를 결합한 형태이기 때문에, 출력층에서 별도로 softmax를 적용할 필요 없음. 오히려 softmax까지 넣으면 이중 계산으로 정확도가 저하될 수 있음.
  4. MaxPooling의 역할
    • 크기를 줄이고, 위치 변화에 불변성을 갖게 하며, 연산량을 줄여줌.
      • conv1 -> 28x28 -> 14x14
      • conv2 -> 14x14 -> 7x7

분산 처리

단일 GPU에 대해서
DistributedDataParallel ( DDP )
  pytorch의 분산 학습을 위한 모듈
  여러 GPU에서 모델을 병렬로 학습할 수 있게
  각 GPUT는 모델의 복사본을 가지고 데이터의 일부를 처리
  데이터는 자동으로 분배
  그레디언트는 자동으로 동기화
작동방식
  초기화
  데이터 분배
  학습 기존 과정과 동일하다
    순전파 - 역전파 - 업데이트

모델 검증

모델의 일반화 성능을 평가하기 위한 방법
1. train/validation/test split
  train: 모델 학습
  validation: 학습 중에 모델 검증. 하이퍼파라미터 튜닝. 과적합 감지
  test: 학습이 모두 끝난 다음 최종 평가
2. k-fold 교차 검증( Cross Validation )
  딥러닝에서는 사용을 자제( 시간이 오래 걸림 )
  k 폴드로 나눠서 k-1개로 학습, 나머지 1개 검증반복, 평균
  모델의 일관성과 과적합 여부를 점검
3. Ealy Stopping
  검증 데이터의 손실이 일정 epoch 이상 개선되지 않으면 중단
  과적합 방지에 효과
4. 평가 지표
  분류: Accuracy, Precison, Recall, F1-Score, ROC-AUC
  회귀: MSE, MAE, RMSE, R^2

혼동 행렬( Confusion Matrix )

  실제 Positive (1) 실제 Negative (0)
예측 Positive (1) TP (True Positive) FP (False Positive)
예측 Negative (0) FN (False Negative) TN (True Negative)

1. Accuracy (정확도)

Accuracy = TP + TN TP + TN + FP + FN

2. 정밀도 (Precision)

positive 예측 중 정답 비율
– FP를 줄여야할 때: 스팸 필터 등

Precision = TP TP + FP

3. 재현율 (Recall, 민감도)

실제 Positive 중에서 맞춘 비율
– FN을 반드시 줄여야 할 때 (예: 암 진단 등)

Recall = TP TP + FN

4. F1 Score: Precision + Recall의 조화평균

F1 = 2 × (Precision × Recall) Precision + Recall = 2TP 2TP + FP + FN

ROC 커브

  • TPR (Recall) : ROC에서 y축. 모델의 민감도를 나타냄
  • FPR : ROC에서 x축. 거짓 Positive의 비율

TPR = TP TP + FN

FPR = FP FP + TN


AUC (Area Under the Curve)

  • ROC 곡선 아래 면적
  • AUC = 1.0 → 완벽한 분류
  • AUC = 0.5 → 랜덤 추측 수준
  • Positive vs Negative의 구분 능력, 전체 이진 분류의 성능 평가

다중 모델은 사이킷런의 classification_report를 이용하면 각 라벨 별 confusion metrix를 제공

from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np

# 데이터 로딩 및 전처리
data = load_iris() # sklearn 내장 iris 데이터셋 로딩
X = pd.DataFrame(data.data, columns=data.feature_names) # 입력 특성( 4개 )
y = pd.DataFrame(data.target, columns=['species']) # 타겟 라벨( 0, 1, 2 )

# 학습/테스트 데이터 분할
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train

# 데이터 표준화( 평균 0, 분산 1 ) -> 학습 안정화 목적
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 넘파이 -> Pytorch 텐서로 변환
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values.ravel(), dtype=torch.long)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.values.ravel(), dtype=torch.long)

# 텐서 데이터를 PyTorch Dataset / DataLoader로 래핑
train_dataset = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=16, shuffle=True)

# 모델 정의( 단순 MLP )
class IrisModel(nn.Module):
  def __init__(self, input_size, hidden_size, num_classes):
    super(IrisModel, self).__init__()
    self.fc1 = nn.Linear(input_size, hidden_size)  # 입력층 -> 은닉층
    self.relu = nn.ReLU()                          # 활성화 함수
    self.fc2 = nn.Linear(hidden_size, num_classes) # 은닉층 -> 출력층

  def forward(self, x):
    out = self.fc1(x)
    out = self.relu(out)
    out = self.fc2(out) # CrossEntropyLoss에서는 softmax 생략 가능
    return out

# 모델 초기화 및 학습 설정
input_size = X_train_scaled.shape[1]  # 특성 수: 4
hidden_size = 10                      # 은닉 노드 수
num_classes = len(np.unique(y_train)) # 클래스 수: 3
model = IrisModel(input_size, hidden_size, num_classes)

criterion = nn.CrossEntropyLoss() # 다중 클래스 분류 손실 함수
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# 모델 학습
num_epochs = 100
for epoch in range(num_epochs):
  for i, (inputs, labels) in enumerate(train_loader):
    inputs = inputs.to(device)
    labels = labels.to(device)

    # 순전파
    outputs = model(inputs)
    loss = criterion(outputs, labels)

    # 역전파 및 파라미터 업데이트
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if (i+1) % 10 == 0:
      print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')

# 모델 평가
with torch.no_grad():
  X_test_tensor = X_test_tensor.to(device)
  y_test_tensor = y_test_tensor.to(device)  
  outputs = model(X_test_tensor)

  # 클래스 예측
  _, predicted = torch.max(outputs.data, 1)

  # 정확도 계산
  correct = (predicted == y_test_tensor).sum().item()
  accuracy = 100 * correct / len(y_test_tensor)
  print(f"Accuracy of the model on the test set: {accuracy:.2f}%")

# 정밀한 성능 평가( Precision, Recall, F1 등 )
print(classification_report(y_test, predicted.cpu().numpy()))

# 혼동 행렬 시각화
import seaborn as sns
sns.heatmap(confusion_matrix(y_test, predicted.cpu().numpy()), annot=True, fmt='d')
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Confusion Matrix")
plt.show()

혼동 행렬 시각화 heatmap

추가 설명

  • CrossEntropyLoss(): 내부적으로 Softmax + NLLLose가 결합된 형태. 마지막 층은 softmax 없이 raw logit 출력해도 됨.
  • TensorDataset: X, y 텐서를 하나의 데이터셋으로 묶어줌.
  • torch.max(outputs, 1): 가장 높은 확률을 가진 클래스 인덱스 추출
  • classification_report: Precision, Recall, F1 score 등 상세한 성능 출력
  • confusion_matrix + seaborn.heatmap: 분류 결과를 직관적으로 시각화 가능

ROC/AUC 계산 및 시각화

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize

# RandomForestClassifier 모델 정의 및 학습
rfc = RandomForestClassifier()
rfc.fit(X_train,y_train) # 훈련 데이터로 모델 학습

# 테스트 데이터에 대한 클래스별 확률 예측( predict_proba는 각 클래스의 확률을 반환 )
y_score = rfc.predict_proba(X_test) # 각 클래스의 확률 예측


print(y_score.shape) # y_score는 각 샘플에 대해 각 클래스의 확률을 반환( n_samples, n_classes )

# 3. 실제 값(y_test)을 원-핫 인코딩 (각 클래스를 벡터로 변환)
# 예: 클래스 0 (Setosa)에 대해 [1, 0, 0], 클래스 1 (Versicolor)에 대해 [0, 1, 0], 클래스 2 (Virginica)에 대해 [0, 0, 1]로 변환
y_test_bin = label_binarize(y_test, classes=[0, 1, 2])

# ROC curve와 AUC  계산
fpr,tpr,roc_auc = {},{},{} # false positive rate, true positive rate, auc 값을 저장할 딕셔너리

for i in range(3): # 3개의 클래스 (setosa, versicolor, virginica) # fpr,tpr 계산
  fpr[i],tpr[i], _  = roc_curve(y_test_bin[:,i], y_score[:,i]) # AUC 계산
  roc_auc[i] = auc(fpr[i], tpr[i])

# 각 클래스에 대한 ROC curve 시각화
for name in ['setosa','versicolor','virginica']:
  plt.plot(fpr[0],tpr[0],label=f'ROC curve {name} (area = {roc_auc[0]:0.2f})')

plt.legend()
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.show()

# ROC curve에 대한 정확한 값 계산
roc_curve(y_test_bin[:,0], y_test_bin[:,0])

ROC curve

추가 설명

  • predict_proba: rfc.predict_proba(X_test)는 각 클래스에 대해 예측된 확률을 반환함. 예를 들어, 샘플 1이 클래스 0일 확률이 0.8이고, 클래스 1일 확률이 0.2라면 predict_proba[0.8, 0.2]와 같은 벡터를 반환.
  • label_binarize: 주어진 클래스를 원-핫 인코딩 방식으로 변환. 예를 들어, 클래스 0, 1, 2가 있다고 할 때:
    클래스 0은 [1, 0, 0] (Setosa)
    클래스 1은 [0, 1, 0] (Versicolor)
    클래스 2는 [0, 0, 1] (Virginica)
반응형