Report¶
CNN + Greedy (ContourWatcher)¶
Part 0: Preliminaries¶
Import¶
import torch # main module
from torch import nn # we'll use neural networks...
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
import cv2
import random
from torch.utils.data import DataLoader
c:\Users\lgand\anaconda3\envs\DL\Lib\site-packages\tqdm\auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html from .autonotebook import tqdm as notebook_tqdm
device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
print(f"Using {device}")
Using cuda
Function that loads images, resizes them and splits them into different squares¶
def process_and_divide(image, new_width, new_height, block_width, block_height, erase_borders = 0, shuffle = True):
    """Resize an image and cut it into a grid of blocks.

    Parameters
    ----------
    image : np.ndarray
        BGR image as returned by cv2.imread; must not be None.
    new_width, new_height : int
        Size the image is resized to before cutting.
    block_width, block_height : int
        Size of each grid block; the grid is (new_width // block_width) by
        (new_height // block_height) when the sizes divide evenly.
    erase_borders : int, optional
        Number of pixels stripped from every side of each block.
    shuffle : bool, optional
        When True (default) the blocks are returned in random order.

    Returns
    -------
    (squares, indices) : (np.ndarray, np.ndarray)
        squares[k] is the block whose original (row-major) grid slot is
        indices[k]; with shuffle=False, indices is the identity permutation.

    Raises
    ------
    ValueError
        If image is None (e.g. cv2.imread failed to open the path).
        ValueError is a subclass of Exception, so callers that caught the
        old bare Exception still work.
    """
    if image is None:
        raise ValueError("process_and_divide received image=None (did cv2.imread fail to open the file?)")
    # Resize to the requested dimensions.
    resized_image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
    # Cut the resized image into blocks, row by row (y outer, x inner).
    squares = []
    dim_y = len(range(0, resized_image.shape[0], block_height))
    dim_x = len(range(0, resized_image.shape[1], block_width))
    for y in range(0, resized_image.shape[0], block_height):
        for x in range(0, resized_image.shape[1], block_width):
            square = resized_image[y + erase_borders:y + block_height - erase_borders,
                                   x + erase_borders:x + block_width - erase_borders]
            squares.append(square)
    # indices records the permutation applied to the block order.
    indices = np.arange(dim_x * dim_y)
    if shuffle:
        np.random.shuffle(indices)
    squares = np.array(squares)[indices]
    return squares, indices
example:
image_path = r"C:\Users\lgand\Pictures\Barconchill\IMG_1640.jpeg"
image = cv2.imread(image_path)
squares, indices = process_and_divide(image, 600,600, 100,100, 0)
Part 1: CNN to check borders¶
First, we have to prepare the training and testing dataloaders. Our goal is to build a CNN which is able to predict whether two blocks are next to each other, therefore we cannot directly use the original paintings as datapoints but have to create pairs of blocks.
Training set¶
We load the dataset
from datasets import load_dataset
# Specifica la directory di cache
cache_directory = r"D:\Progetto ML"
# Carica il dataset specificando la directory di cache
dataset = load_dataset("huggan/wikiart", cache_dir=cache_directory)
Pair creation, horizontal only: all adjacent pairs are kept, while non-adjacent pairs are sampled with fraction p.
For simplicity, we use only the first 1000 images in the dataset to train the model. For each of them, we consider all the correct horizontal combinations and a fraction p of the incorrect ones. The elements of the dataloader will be 6x100 pixel images, where the 3x100 part on the left comes from one block, and the 3x100 part on the right comes from the second block.
# HORIZONTAL PAIRS ONLY (vertical adjacency is handled at inference time by
# transposing the blocks; see the commented-out section below)
n_images = 1000   # number of training paintings used
p = 0.3           # fraction of non-adjacent pairs kept as negative examples
counter = 0
coppie = []       # "pairs": the seam strips fed to the CNN
label = []        # 1 = blocks are horizontally adjacent, 0 = they are not
# use a strip 3 pixels wide on each side of the seam ...
x_start = 98
x_end = 104
# ... and the full height
y_start = 0
y_end = 100
for single in dataset["train"]:
    print(counter)
    pil_image = single["image"]
    numpy_image = np.array(pil_image)
    cv2_image = cv2.cvtColor(numpy_image, cv2.COLOR_RGB2BGR)
    squares, indices = process_and_divide(cv2_image, 600,600, 100,100, 0, shuffle = False)
    # positives: every horizontally adjacent pair in the 6x6 grid
    # (block index = 6*row + column)
    for y in range(6):
        for x in range(5):
            roi = np.array(cv2.hconcat([squares[6*y + x], squares[6*y + x + 1]])[ y_start:y_end, x_start:x_end, :], dtype = np.int16)
            coppie.append(roi)
            label.append(1)
    # negatives: non-adjacent ordered pairs, sampled with probability p.
    # BUG FIX: the exclusion test used `(j + 1) % 6 == 0`, which wrongly kept
    # truly adjacent pairs whose right block sits in column 5 as negatives
    # (contradicting the positive loop above) and never sampled row-wrap
    # pairs.  A pair (i, j = i + 1) is adjacent exactly when j is not in
    # column 0, so the re-inclusion test must be `j % 6 == 0`.
    for i in range(36):
        for j in range(36):
            if random.random() < p:
                if j != i and ( j != i + 1 or j % 6 == 0 ):
                    roi = np.array(cv2.hconcat([squares[i], squares[j]])[ y_start:y_end, x_start:x_end, :], dtype = np.int16)
                    coppie.append(roi)
                    label.append(0)
    # # vertical pairs (unused: handled by transposition at inference time)
    # for y in range(5):
    #     for x in range(6):
    #         coppie.append( cv2.hconcat([squares[6*y + x], squares[6*(y + 1) + x]]))
    #         label.append(1)
    # for i in range(36):
    #     for j in range(36):
    #         if j != i and j!= i + 6 :
    #             coppie.append( cv2.hconcat([squares[i], squares[j]]))
    #             label.append(0)
    counter += 1
    if counter >= n_images:
        break
coppie = np.array(coppie)
#coppie = coppie / 255.0  # normalization is done inside the model to save memory here
coppie = coppie.transpose(0,3,1,2)   # NHWC -> NCHW for PyTorch
label = np.array(label)
coppie.shape
(1197997, 3, 100, 6)
label.shape
(1197997,)
We now have our datapoints: as shown above, each of them has 3 channels and spatial size 100x6 (the exact count varies between runs, since the negative pairs are sampled with probability p).
From this datapoints we create the final dataloader
# Convert to PyTorch tensors (kept as int16 to halve memory; the model
# converts to float and normalizes internally)
X_train = torch.tensor(coppie, dtype=torch.int16)
y_train = torch.tensor(label, dtype=torch.long)
# Create a DataLoader for efficient training
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
# free the large numpy copy of the data
del coppie
Test set¶
We do the same for the test set. Here p is smaller (0.1) and we consider only 300 images.
# HORIZONTAL PAIRS ONLY - test-set version of the pair-building loop above
p = 0.1            # smaller negative fraction than for training
n_images = 300
counter = 0
coppie = []
label = []
# same seam ROI as the training set
x_start = 98
x_end = 104
y_start = 0
y_end = 100
testing = 0
for single in dataset["train"]:
    # skip the images used for training (inefficient linear skip, to be changed)
    if testing <= 1000:
        testing += 1
        continue
    print(counter)
    pil_image = single["image"]
    numpy_image = np.array(pil_image)
    cv2_image = cv2.cvtColor(numpy_image, cv2.COLOR_RGB2BGR)
    squares, indices = process_and_divide(cv2_image, 600,600, 100,100, 0, shuffle = False)
    # positives: horizontally adjacent blocks (index = 6*row + column)
    for y in range(6):
        for x in range(5):
            roi = np.array(cv2.hconcat([squares[6*y + x], squares[6*y + x + 1]])[ y_start:y_end, x_start:x_end, :], dtype = np.int16)
            coppie.append(roi)
            label.append(1)
    # negatives: non-adjacent ordered pairs, sampled with probability p.
    # BUG FIX (same as the training loop): the exclusion test used
    # `(j + 1) % 6 == 0`, mislabelling adjacent pairs ending in column 5 as
    # negatives and never sampling row-wrap pairs; the correct re-inclusion
    # test for non-adjacency of j == i + 1 is `j % 6 == 0`.
    for i in range(36):
        for j in range(36):
            if random.random() < p:
                if j != i and ( j != i + 1 or j % 6 == 0 ):
                    roi = np.array(cv2.hconcat([squares[i], squares[j]])[ y_start:y_end, x_start:x_end, :], dtype = np.int16)
                    coppie.append(roi)
                    label.append(0)
    counter += 1
    if counter >= n_images:
        break
coppie = np.array(coppie)
#coppie = coppie / 255.0   # normalization happens inside the model
coppie = coppie.transpose(0,3,1,2)  # NHWC -> NCHW
label = np.array(label)
# Convert to PyTorch tensors
X_test = torch.tensor(coppie, dtype=torch.int16)
y_test = torch.tensor(label, dtype=torch.long)
# Create a DataLoader for efficient testing
test_dataset = torch.utils.data.TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)
# free the large intermediate arrays
del coppie
del label
X_test.shape
torch.Size([10721, 3, 100, 6])
y_test.shape
torch.Size([10721])
X_train.shape
torch.Size([399844, 3, 100, 6])
Definition of the model¶
Define the model to check if a square is to the left of another square
class Conv_no_roi_small(nn.Module):
    """Binary border classifier.

    Input: a 3 x 100 x 6 strip (the 3-pixel borders of two blocks placed side
    by side, as built at dataset-creation time).  Output: 2 logits, where
    class 1 means "the left block sits immediately to the left of the right
    block".
    """
    def __init__(self):
        ## Invoke the parent constructor to set up our object
        super().__init__()
        ## Convolutional feature extractor.  Input: 3 x 100 x 6 (C x H x W).
        ## (The original size comments were wrong; corrected trace below.)
        self.conv = nn.Sequential(
            nn.Conv2d(3, 128, kernel_size=(5,6)),    # -> 128 x 96 x 1 (kernel spans the full width, i.e. the seam)
            nn.BatchNorm2d(128),                     # BatchNorm2d since we're working on images
            nn.ReLU(),
            nn.MaxPool2d(kernel_size= (4,1)),        # -> 128 x 24 x 1
            nn.Conv2d(128, 256, kernel_size= (3,1)), # -> 256 x 22 x 1
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size= (22,1)),       # -> 256 x 1 x 1 ("summarizing" each channel into a single number)
            nn.Flatten())                            # -> 256 features
        ## Classification head.  Input: the 256 pooled features.
        self.head = nn.Sequential(
            nn.Dropout(),           # some dropout here forces redundancy in the features
            nn.Linear(256, 512),    # a dense layer
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(),
            nn.Linear(512, 2),
        )
    def forward(self, x):
        # # ROI extraction was moved to dataset-creation time; kept for reference:
        # x_start = 98
        # x_end = 104
        # y_start = 0
        # y_end = 100
        # # Extract the ROI from the image
        # roi = x[:, :, y_start:y_end, x_start:x_end]
        # Normalization (done here rather than at dataset creation so the
        # dataset can be stored as int16, saving memory)
        x = x/255.0
        ze = self.conv(x)
        logits = self.head(ze)
        return logits
Training and testing functions¶
def train(dataloader, model, loss_fn, optimizer):
    """Run one training epoch over `dataloader`, updating `model` in place.

    Prints a progress line every 100 batches and, at the end of the epoch,
    the mean accuracy and mean loss.  Uses the module-level `device`.

    Parameters
    ----------
    dataloader : torch.utils.data.DataLoader
    model : nn.Module
    loss_fn : callable mapping (logits, targets) -> scalar loss
    optimizer : torch.optim.Optimizer
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()  # training mode (enables Dropout / BatchNorm updates)
    mean_loss = 0.0
    correct = 0
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        pred = model(X)
        loss = loss_fn(pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        mean_loss += loss.item()
        correct += (pred.argmax(1) == y).type(torch.float).sum().item()
        # visualization: progress counter every 100 batches
        if batch % 100 == 0:
            current = (batch + 1)* len(X) # samples processed so far
            print(f" [{current:>5d}/{size:>5d}]")
    ## Compute and print some data to monitor the training.
    mean_loss /= num_batches
    correct /= size
    # BUG FIX: the summary used to print the stale last-batch `loss` (only
    # rebound inside the `if batch % 100` branch) instead of the epoch mean.
    print(f"TRAINING - Accuracy: {(100 * correct):>5.1f}%, Avg loss: {mean_loss:>7f}")
def test(dataloader, model, loss_fn):
    """Evaluate `model` on `dataloader` and print accuracy and average loss.

    Uses the module-level `device`; does not modify the model.
    """
    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)
    model.eval()  # inference mode: freeze Dropout / BatchNorm behaviour
    total_loss = 0.0
    n_correct = 0.0
    with torch.no_grad():  # no gradients needed during evaluation
        for features, targets in dataloader:
            features, targets = features.to(device), targets.to(device)
            logits = model(features)
            total_loss += loss_fn(logits, targets).item()
            n_correct += (logits.argmax(1) == targets).type(torch.float).sum().item()
    ## Average loss over batches and accuracy over samples, then report.
    test_loss = total_loss / n_batches
    correct = n_correct / n_samples
    print(f"TESTING - Accuracy: {(100 * correct):>5.1f}%, Avg loss: {test_loss:>8f}\n")
# variant with gradient clipping and LR scheduling; not used in the end
def train_new(dataloader, model, loss_fn, optimizer, device, max_norm=1.0, scheduler=None):
    """One training epoch with gradient clipping and optional LR scheduling.

    Same loop as `train`, except that the global gradient norm is clipped to
    `max_norm` before each optimizer step, `device` is passed explicitly, and
    `scheduler.step()` is called once at the end of the epoch when given.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()  # enable training-mode behaviour (Dropout, BatchNorm)
    mean_loss = 0.0
    correct = 0
    for step, (inputs, targets) in enumerate(dataloader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        batch_loss = loss_fn(outputs, targets)
        batch_loss.backward()
        # Bound the global gradient norm to stabilise training
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
        optimizer.step()
        mean_loss += batch_loss.item()
        correct += (outputs.argmax(1) == targets).type(torch.float).sum().item()
        # progress line every 100 batches
        if step % 100 == 0:
            current = (step + 1) * len(inputs)
            print(f" [{current:>5d}/{size:>5d}]")
    # Epoch summary: mean loss over batches, accuracy over samples
    mean_loss /= num_batches
    correct /= size
    print(f"TRAINING - Accuracy: {(100 * correct):>5.1f}%, Avg loss: {mean_loss:>7f}")
    # Step the learning-rate scheduler once per epoch, if one was supplied
    if scheduler is not None:
        scheduler.step()
Create the model and train¶
from torch.optim.lr_scheduler import StepLR
# Instantiate the border-classifier CNN on the selected device
model = Conv_no_roi_small().to(device)
loss_fn = nn.CrossEntropyLoss()
#optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
optimizer = optim.Adam(model.parameters(), lr=1e-2)
# scheduler is only consumed by train_new (unused with plain train)
scheduler = StepLR(optimizer, step_size=8, gamma=0.3)
Try with 1 epoch
# Train for a single epoch with Adam, evaluating after the epoch
epochs = 1
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------")
    train(train_loader, model, loss_fn, optimizer)
    test(test_loader, model, loss_fn)
print("Done!")
Epoch 1 ------------------- [ 128/399572] [12928/399572] [25728/399572] [38528/399572] [51328/399572] [64128/399572] [76928/399572] [89728/399572] [102528/399572] [115328/399572] [128128/399572] [140928/399572] [153728/399572] [166528/399572] [179328/399572] [192128/399572] [204928/399572] [217728/399572] [230528/399572] [243328/399572] [256128/399572] [268928/399572] [281728/399572] [294528/399572] [307328/399572] [320128/399572] [332928/399572] [345728/399572] [358528/399572] [371328/399572] [384128/399572] [396928/399572] TRAINING - Accuracy: 95.6%, Avg loss: 0.026428 TESTING - Accuracy: 93.4%, Avg loss: 0.167261 Done!
torch.save(model, r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\modelli\v1.pth")
Switching to nesterov
# Switch to SGD with Nesterov momentum for fine-tuning, then train one more epoch
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, nesterov= True, momentum = 0.8, dampening = 0)
epochs = 1
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------")
    train(train_loader, model, loss_fn, optimizer)
    test(test_loader, model, loss_fn)
print("Done!")
Epoch 1 ------------------- [ 128/399572] [12928/399572] [25728/399572] [38528/399572] [51328/399572] [64128/399572] [76928/399572] [89728/399572] [102528/399572] [115328/399572] [128128/399572] [140928/399572] [153728/399572] [166528/399572] [179328/399572] [192128/399572] [204928/399572] [217728/399572] [230528/399572] [243328/399572] [256128/399572] [268928/399572] [281728/399572] [294528/399572] [307328/399572] [320128/399572] [332928/399572] [345728/399572] [358528/399572] [371328/399572] [384128/399572] [396928/399572] TRAINING - Accuracy: 97.0%, Avg loss: 0.107138 TESTING - Accuracy: 94.6%, Avg loss: 0.127386 Done!
torch.save(model, r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\modelli\v2.pth")
epochs = 1
for t in range(epochs):
print(f"Epoch {t+1}\n-------------------")
train(train_loader, model, loss_fn, optimizer)
test(test_loader, model, loss_fn)
print("Done!")
Epoch 1 ------------------- [ 128/399572] [12928/399572] [25728/399572] [38528/399572] [51328/399572] [64128/399572] [76928/399572] [89728/399572] [102528/399572] [115328/399572] [128128/399572] [140928/399572] [153728/399572] [166528/399572] [179328/399572] [192128/399572] [204928/399572] [217728/399572] [230528/399572] [243328/399572] [256128/399572] [268928/399572] [281728/399572] [294528/399572] [307328/399572] [320128/399572] [332928/399572] [345728/399572] [358528/399572] [371328/399572] [384128/399572] [396928/399572] TRAINING - Accuracy: 97.1%, Avg loss: 0.086171 TESTING - Accuracy: 96.0%, Avg loss: 0.103920 Done!
torch.save(model, r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\modelli\v3.pth")
optimizer = torch.optim.SGD(model.parameters(), lr=2e-4, nesterov= True, momentum = 0.8, dampening = 0)
epochs = 1
for t in range(epochs):
print(f"Epoch {t+1}\n-------------------")
train(train_loader, model, loss_fn, optimizer)
test(test_loader, model, loss_fn)
print("Done!")
Epoch 1 ------------------- [ 128/399572] [12928/399572] [25728/399572] [38528/399572] [51328/399572] [64128/399572] [76928/399572] [89728/399572] [102528/399572] [115328/399572] [128128/399572] [140928/399572] [153728/399572] [166528/399572] [179328/399572] [192128/399572] [204928/399572] [217728/399572] [230528/399572] [243328/399572] [256128/399572] [268928/399572] [281728/399572] [294528/399572] [307328/399572] [320128/399572] [332928/399572] [345728/399572] [358528/399572] [371328/399572] [384128/399572] [396928/399572] TRAINING - Accuracy: 97.2%, Avg loss: 0.093918 TESTING - Accuracy: 95.6%, Avg loss: 0.108165 Done!
The results were better before this last run, so for now we keep the CNN as it was after the third epoch and move on.
model = torch.load(r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\modelli\v3.pth")
Part 2: Image reconstruction (the Greedy algorithm)¶
We define the greedy algorithm to reconstruct the original image
class Build_puzzle3():
    """Greedy puzzle solver.

    Starting from block 0 placed at (0, 0), repeatedly asks the CNN `model`
    how likely each remaining block is to sit next to the current block in
    each of the four directions, and attaches the best candidate whenever its
    score exceeds the threshold.  When no candidate clears the threshold, the
    search moves to another placed block, lowering the threshold if all of
    them have been tried.
    """
    def __init__(self, model, squares_a, dim_x , dim_y, th = 0.90):
        # model: trained border CNN; squares_a: blocks in NCHW layout
        self.model = model
        self.squares_a = squares_a
        self.dim_x = dim_x   # grid width  (number of block columns)
        self.dim_y = dim_y   # grid height (number of block rows)
        self.n = dim_x * dim_y
        self.oth = th        # original threshold (self.th may be lowered at run time)
        # seam ROI of the concatenated pair fed to the CNN; must match the
        # ROI used when building the training set
        self.x_start = 98
        self.x_end = 104
        self.y_start = 0
        self.y_end = 100
    def initialize(self):
        """Reset the solver state.

        Kept separate from __init__ so that several runs can be done with
        the same image.
        """
        # NOTE(review): 36 is hard-coded; presumably this should be self.n
        # for grids other than 6x6 - confirm before reusing with other sizes.
        self.av = {i for i in range(36)}   # blocks still to be placed
        self.visit_av = []                 # placed blocks that may still have free neighbouring slots
        self.alive = []                    # placed blocks not yet ruled out at the current threshold
        self.positions = {}                # block index -> (x, y) grid position
        self.th = self.oth
        self.minth = self.oth              # lowest threshold reached so far
        self.mappa_max = {}                # best rejected score per block (used when backtracking)
    def build(self):
        """Main entry point: call this to run the algorithm.

        Returns the permutation of block indices in row-major grid order
        (see transform_in_indices).
        """
        # place the first block at the origin
        self.initialize()
        current = 0
        self.positions = {current : (0,0)}
        self.av.remove(current)
        self.alive.append(current)
        self.visit_av.append(current)
        # bounding box of the placed blocks (coordinates may go negative)
        self.xmax = 0
        self.xmin = 0
        self.ymax = 0
        self.ymin = 0
        while len(self.av) != 0:
            current = self.find_and_add(current)
            self.update_min_max(current)
        return self.transform_in_indices()
    def find_and_add(self, current):
        """Starting from block `current`, find the best block to attach to it;
        if its score is greater than self.th, attach it and return it,
        otherwise fall back to old_block()."""
        # res[d][i]: score of placing block i in direction d of current
        # (0 = +x, 1 = -x, 2/3 = the two vertical directions)
        res = np.zeros((4,36))
        pos_av = [True for i in range(4)]
        pos_av[0] = self.check_av_pos((self.positions[current][0] + 1, self.positions[current][1]))
        pos_av[1] = self.check_av_pos((self.positions[current][0] - 1, self.positions[current][1]))
        pos_av[2] = self.check_av_pos((self.positions[current][0] , self.positions[current][1] - 1))
        pos_av[3] = self.check_av_pos((self.positions[current][0] , self.positions[current][1] + 1))
        # no free neighbouring slot: retire this block and pick another
        if not pos_av[0] and not pos_av[1] and not pos_av[2] and not pos_av[3]:
            self.alive.remove(current)
            self.visit_av.remove(current)
            return self.old_block()
        # score every still-available block in every free direction
        for i in self.av:
            if pos_av[0]:
                res[0][i] = self.h_merge( self.squares_a[current], self.squares_a[i])
            if pos_av[1]:
                res[1][i] = self.h_merge( self.squares_a[i], self.squares_a[current])
            if pos_av[2]:
                res[2][i] = self.v_merge( self.squares_a[current], self.squares_a[i])
            if pos_av[3]:
                res[3][i] = self.v_merge( self.squares_a[i], self.squares_a[current])
        val_max = np.array([np.max(res[0]), np.max(res[1]), np.max(res[2]), np.max(res[3])])
        ind_max = np.array([np.argmax(res[0]), np.argmax(res[1]), np.argmax(res[2]), np.argmax(res[3])])
        # best candidate below threshold: record its score and backtrack
        if np.max(val_max) < self.th:
            self.alive.remove(current)
            self.mappa_max[current] = (np.max(val_max), current)
            return self.old_block()
        # success: reset the backtracking state and attach the winner
        self.mappa_max = {}
        self.alive = self.visit_av.copy()
        dir_max = np.argmax(val_max)
        new_block = ind_max[dir_max]
        self.th = self.oth
        if dir_max == 0:
            self.positions[new_block] = (self.positions[current][0] + 1, self.positions[current][1])
        elif dir_max == 1:
            self.positions[new_block] = (self.positions[current][0] - 1, self.positions[current][1])
        elif dir_max == 2:
            self.positions[new_block] = (self.positions[current][0] , self.positions[current][1] - 1)
        elif dir_max == 3:
            self.positions[new_block] = (self.positions[current][0] , self.positions[current][1] + 1)
        self.av.remove(new_block)
        self.alive.append(new_block)
        self.visit_av.append(new_block)
        return new_block
    def h_merge(self, sx, dx):
        """Join two blocks horizontally, extract the seam ROI and feed it to
        the model.  Returns the model's class-1 softmax score, i.e. how
        confident it is that `sx` is immediately to the left of `dx`."""
        sx = [sx]
        dx = [dx]
        coppia = np.concatenate((sx,dx), axis = 3)   # concatenate along width (NCHW)
        roi = coppia[:, :, self.y_start:self.y_end, self.x_start:self.x_end]
        roi = torch.tensor(roi, dtype=torch.float32).to(device)
        logits = self.model(roi)
        return float(nn.Softmax(dim = 1)(logits)[0][1])
    def v_merge(self, sx, dx):
        """Join two blocks vertically, then transpose H and W so the vertical
        seam becomes a horizontal one and the same model can score it."""
        sx = [sx]
        dx = [dx]
        coppia = np.concatenate((sx,dx), axis = 2)   # concatenate along height (NCHW)
        coppia = coppia.transpose(0,1,3,2)           # swap H and W
        roi = coppia[:, :, self.y_start:self.y_end, self.x_start:self.x_end]
        roi = torch.tensor(roi, dtype=torch.float32).to(device)
        logits = self.model(roi)
        return float(nn.Softmax(dim = 1)(logits)[0][1])
    def old_block(self):
        """Backtracking step, used when the obtained score is below self.th:
        pick another alive block if any remain, otherwise lower the threshold
        to the maximum score recorded (in mappa_max) over the placed blocks
        that still have free neighbouring slots (visit_av) and resume from
        the corresponding block."""
        if len(self.alive) == 0:
            temp = np.array([self.mappa_max[block][0] for block in self.mappa_max])
            blocks = [self.mappa_max[block][1] for block in self.mappa_max]
            self.th = np.max(temp)
            best_block = blocks[np.argmax(temp)]
            self.mappa_max = {}
            if self.th < self.minth:
                self.minth = self.th
            self.alive = self.visit_av.copy()
            return best_block
        return random.choices(self.alive)[0]
    def update_min_max(self, block):
        """Update the bounding box of the placed blocks.  xmax/xmin/ymax/ymin
        are used to rescale the positions at the end and to keep the layout
        from growing beyond a dim_x x dim_y rectangle."""
        if self.xmax < self.positions[block][0]:
            self.xmax = self.positions[block][0]
        if self.xmin > self.positions[block][0]:
            self.xmin = self.positions[block][0]
        if self.ymax < self.positions[block][1]:
            self.ymax = self.positions[block][1]
        if self.ymin > self.positions[block][1]:
            self.ymin = self.positions[block][1]
    def check_av_pos(self, pos):
        """Return True if grid position `pos` is free and occupying it would
        keep the layout inside a dim_x x dim_y rectangle."""
        x = pos[0]
        y = pos[1]
        if np.abs(x - self.xmin) < self.dim_x and np.abs(x - self.xmax) < self.dim_x and np.abs(y - self.ymin) < self.dim_y and np.abs(self.ymax - y) < self.dim_y:
            if pos not in self.positions.values():
                return True
            else:
                return False
        return False
    def transform_in_indices(self):
        """Final step: convert the positions into a permutation of the block
        indices, enumerating grid slots row by row from the top-left corner."""
        result = []
        # shift positions so the top-left placed block maps to (0, 0);
        # internally y decreases going down, hence the offset by ymax
        self.reversed_pos = {(px - self.xmin, py - self.ymax): key for key, (px, py) in self.positions.items()}
        for y in range(0, - self.dim_y, -1):
            for x in range(self.dim_x):
                result.append(self.reversed_pos[((x,y))] )
        return result
Testing the results on the test set¶
we define a function that reconstructs the image predicted
def print_image(squares, result, dim_x, dim_y):
    """Reassemble blocks into a single image.

    Parameters
    ----------
    squares : np.ndarray
        Array of blocks, each of shape (block_h, block_w, channels).
    result : sequence of int
        result[k] is the index (into `squares`) of the block that fills grid
        slot k, slots being enumerated row by row.
    dim_x, dim_y : int
        Number of columns and rows of the grid.

    Returns
    -------
    np.ndarray
        The reassembled image of shape (dim_y*block_h, dim_x*block_w, channels).
    """
    ordered = np.asarray(squares)[list(result)]
    # BUG FIX: the row offset used to be dim_y * y, which is only correct for
    # square grids; a grid row holds dim_x blocks, so the offset is dim_x * y.
    # np.hstack/np.vstack replace the cv2.hconcat/vconcat accumulation loop
    # (identical output for numpy image arrays).
    rows = [np.hstack(ordered[dim_x * y : dim_x * (y + 1)]) for y in range(dim_y)]
    return np.vstack(rows)
We load the first 200 images from the test set
# Load 200 held-out images (dataset positions 1001..1200, i.e. after the
# images used for training)
cv2_images = []
cc = 0
for single in dataset["train"]:
    if cc <= 1000:        # skip the training images
        cc += 1
        continue
    if cc > 1200:
        break
    cc += 1
    pil_image = single["image"]
    numpy_image = np.array(pil_image)
    # convert PIL RGB to OpenCV BGR
    cv2_images.append(cv2.cvtColor(numpy_image, cv2.COLOR_RGB2BGR))
Changing here the index, we can select which image to use
index = 0
Shuffle the blocks
squares, indices = process_and_divide(cv2_images[index], 600, 600, 100, 100)
squares_a = np.array(squares)
squares_a = squares_a.transpose(0, 3, 1, 2)
Build the puzzle game and solve it
builder = Build_puzzle3(model, squares_a, dim_x = 6, dim_y = 6, th = 0.9)
res = builder.build()
print the number of wrong indices:
error = sum([indices[res[i]] != i for i in range(36)])
error
0
reconstruct in cv2 format predicted and initial image
final = print_image(squares, res, 6,6)
initial = print_image(squares, [i for i in range(36)], 6,6)
Print the results! (press 0 to close)
# Display the original and resized images
cv2.imshow('correct', cv2.resize(cv2_images[index], (600,600), interpolation=cv2.INTER_AREA))
cv2.imshow('initial', initial)
cv2.imshow('pred', final)
# Wait for a key press and close the windows
cv2.waitKey(0)
cv2.destroyAllWindows()
Accuracy in the test set¶
# Run the greedy solver on every held-out image, count misplaced blocks per
# image, and save predicted / shuffled / correct images to disk.
risultato = []   # per-image number of misplaced blocks
errori = 0       # number of images with at least one misplaced block
for index in range(len(cv2_images)):
    squares, indices = process_and_divide(cv2_images[index], 600, 600, 100, 100)
    squares_a = np.array(squares)
    squares_a = squares_a.transpose(0, 3, 1, 2)   # NHWC -> NCHW for the CNN
    builder = Build_puzzle3(model, squares_a, dim_x = 6, dim_y = 6, th = 0.9)
    res = builder.build()
    # a block is wrong when the predicted slot does not match the original one
    error = sum([indices[res[i]] != i for i in range(36)])
    print(index, error)
    risultato.append(error)
    final = print_image(squares, res, 6,6)
    initial = print_image(squares, [i for i in range(36)], 6,6)
    # p = predicted, i = shuffled input, c = correct original
    cv2.imwrite(r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\immagini\1000" + "\\" + str(index) + "p.jpeg", final)
    cv2.imwrite(r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\immagini\1000" + "\\" + str(index) + "i.jpeg", initial)
    cv2.imwrite(r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\immagini\1000" + "\\" + str(index) + "c.jpeg", cv2.resize(cv2_images[index], (600,600), interpolation=cv2.INTER_AREA) )
    if error > 0:
        errori += 1
0 0 1 0 2 0 3 0 4 0 5 0 6 0 7 35 8 0 9 0 10 0 11 6 12 0 13 0 14 0 15 0 16 0 17 0 18 0 19 0 20 0 21 0 22 0 23 36 24 0 25 0 26 0 27 0 28 0 29 0 30 0 31 0 32 0 33 0 34 0 35 3 36 0 37 36 38 0 39 0 40 0 41 4 42 0 43 0 44 0 45 0 46 0 47 0 48 0 49 0 50 0 51 0 52 0 53 0 54 0 55 0 56 0 57 0 58 0 59 0 60 0 61 0 62 0 63 0 64 0 65 0 66 26 67 0 68 0 69 0 70 0 71 0 72 0 73 0 74 0 75 0 76 35 77 0 78 36 79 36 80 0 81 0 82 0 83 0 84 6 85 0 86 0 87 0 88 0 89 0 90 0 91 0 92 0 93 0 94 0 95 36 96 0 97 0 98 0 99 0 100 0 101 0 102 0 103 0 104 36 105 0 106 0 107 0 108 0 109 0 110 0 111 0 112 2 113 0 114 0 115 0 116 0 117 0 118 0 119 0 120 0 121 0 122 0 123 0 124 3 125 0 126 0 127 0 128 0 129 0 130 0 131 0 132 0 133 0 134 9 135 0 136 0 137 0 138 0 139 0 140 0 141 0 142 0 143 0 144 0 145 0 146 0 147 0 148 13 149 0 150 0 151 0 152 0 153 0 154 0 155 0 156 0 157 0 158 4 159 0 160 0 161 0 162 0 163 0 164 0 165 0 166 0 167 0 168 0 169 0 170 0 171 0 172 0 173 0 174 0 175 0 176 34 177 0 178 0 179 0 180 0 181 0 182 0 183 0 184 0 185 0 186 35 187 8 188 0 189 0 190 0 191 0 192 0 193 0 194 0 195 0 196 0 197 0 198 0 199 0
errori
21
We reconstructed 21 images out of 200 incorrectly.
errori/200
0.105
Load your own paintings¶
now you can select any painting from the hard disk
cv2_image = cv2.imread(r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\immagini\pablo-picasso-ritratto-di-ambrose-vollard.webp")
cv2_image.shape
(1088, 746, 3)
squares, indices = process_and_divide(cv2_image, 600, 600, 100, 100)
squares_a = np.array(squares)
squares_a = squares_a.transpose(0, 3, 1, 2)
builder = Build_puzzle3(model, squares_a, dim_x = 6, dim_y = 6)
res = builder.build()
final = print_image(squares, res, 6,6)
initial = print_image(squares, [i for i in range(36)], 6,6)
Print the results! (press 0 to close)
# Display the original and resized images
cv2.imshow('correct', cv2.resize(cv2_image, (600,600), interpolation=cv2.INTER_AREA))
cv2.imshow('initial', initial)
cv2.imshow('pred', final)
# Wait for a key press and close the windows
cv2.waitKey(0)
cv2.destroyAllWindows()
Part 3: feature extraction with AlexNet plus MLP head¶
Inspired by previous literature, we decided to implement a model that works in the following way:
Dataset
Define grid size
- Images are split into patches of size 227x227 following a square grid with a certain side (called grid_size), so that we have grid_size $^2$ patches. In our case we focus on a grid size of 2, which means that the image will be divided into 4 patches.
- A permutation is generated in the format of a permutation matrix
- Patches are permuted following the permutation
A given element of the dataset is then composed as follows:
data: list of permuted patches
label: permutation
Model
- Implement pretrained AlexNet up to fc6 on each patch separately to extract features
- Concatenate outputs of all patches into a single tensor
- Pass output through a MLP that outputs a vector of size that matches the number of patches squared
- This is then transformed into a matrix of the same size as the permutation matrix, which is passed into a Sinkhorn layer to transform it into a doubly stochastic matrix
The output is then interpreted as a permutation matrix that can be compared to the true one.
from datasets import load_dataset
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import numpy as np
from torchvision import datasets, models, transforms
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader
import torchvision.models as models
import torchvision.datasets as datasets
from PIL import Image
import itertools
import random
import os
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
# Loading the dataset
cache_directory = r"/Users/gabri/Desktop/wikiart"
dataset = load_dataset("huggan/wikiart", cache_dir=cache_directory)
We preprocess the images and crop them into squares whose side is a multiple of 227, so that every patch has shape 227x227 — the input size required by AlexNet.
# Preprocessing pipeline for the images
def preprocess(grid_size):
    """Return a torchvision pipeline that resizes and center-crops to a
    square of side 227*grid_size — so each of the grid_size**2 patches is
    227x227, AlexNet's input size — and converts the image to a tensor."""
    side = 227 * grid_size
    steps = [
        transforms.Resize(side),
        transforms.CenterCrop(side),
        transforms.ToTensor(),
    ]
    return transforms.Compose(steps)
# first image before and after preprocessing
GRID_SIZE = 2
im = dataset['train'][0]['image']
preprocessed = preprocess(GRID_SIZE)(im)
fig, ax = plt.subplots(1, 2)
ax[0].imshow(im)
ax[0].set_title('Original Image')
ax[0].axis('off')
ax[1].imshow(preprocessed.permute(1, 2, 0))
ax[1].set_title('Preprocessed Image')
ax[1].axis('off')
plt.show()
# cut an image tensor into grid_size**2 square patches
def split_tensor(tensor, grid_size):
    """Split a (C, S, S) image tensor into grid_size x grid_size equal square
    patches, returned row-major along the first spatial dimension (the same
    order combine_tensors expects)."""
    side = tensor.size(1) // grid_size  # patch side; assumes a square input
    return [
        tensor[:, r * side:(r + 1) * side, c * side:(c + 1) * side]
        for r, c in itertools.product(range(grid_size), range(grid_size))
    ]
# inverse of split_tensor: paste patches back into one image tensor
def combine_tensors(patches, grid_size):
    """Recombine grid_size**2 row-major patches into a single image tensor
    with a leading batch dimension of 1 (shape (1, 3, S, S))."""
    side = patches[0].size(2)
    canvas = torch.zeros((1, 3, side * grid_size, side * grid_size))
    for idx, patch in enumerate(patches):
        r, c = divmod(idx, grid_size)
        canvas[:, :, r * side:(r + 1) * side, c * side:(c + 1) * side] = patch
    return canvas
# turn a float image tensor into a PIL image
def tensor_to_image(tensor):
    """Convert a (1, C, H, W) or (C, H, W) float tensor — values clipped to
    [0, 1] — into an 8-bit PIL image."""
    arr = tensor.squeeze(0).permute(1, 2, 0).numpy()
    arr = np.clip(arr, 0, 1)
    return Image.fromarray((arr * 255).astype(np.uint8))
# turn a PIL image into a tensor with a batch dimension
def image_to_tensor(image):
    """Convert a PIL image into a (1, C, H, W) float tensor."""
    return ToTensor()(image).unsqueeze(0)
# Splitting and combining the image
patches = split_tensor(preprocessed, GRID_SIZE)
reconstructed = combine_tensors(patches, GRID_SIZE)
# Displaying the original image, patches, and reconstructed image
fig, ax = plt.subplots(1, 6, figsize=(15, 3))
ax[0].imshow(preprocessed.permute(1, 2, 0))
ax[0].set_title('Original Image')
ax[0].axis('off')
for i in range(1, 5):
ax[i].imshow(tensor_to_image(patches[i-1]))
ax[i].set_title(f'Patch {i}')
ax[i].axis('off')
ax[5].imshow(tensor_to_image(reconstructed))
ax[5].set_title('Reconstructed Image')
ax[5].axis('off')
plt.show()
# Draw a random permutation of the patch indices
def compute_permutation(grid_size):
    """Return a uniformly random permutation of range(grid_size**2)."""
    order = [*range(grid_size ** 2)]
    random.shuffle(order)
    return order
# Reorder patches according to a permutation
def permute_patches(patches, permutation):
    """Return the patches reordered so slot k holds patches[permutation[k]]."""
    return list(map(patches.__getitem__, permutation))
def permutation_to_matrix(permutation, grid_size):
    """Encode a permutation of length grid_size**2 as a 0/1 float matrix
    with matrix[i, permutation[i]] == 1 (one-hot rows)."""
    side = grid_size ** 2
    # row i of eye()[permutation] is the unit vector e_{permutation[i]}
    return torch.eye(side)[list(permutation)]
# Permuting the patches and reconstructing the image
permutation = compute_permutation(GRID_SIZE)
permuted_patches = permute_patches(patches, permutation)
permuted_image = combine_tensors(permuted_patches, GRID_SIZE)
# Displaying the original image and the permuted image
fig, ax = plt.subplots(1, 2)
ax[0].imshow(preprocessed.permute(1, 2, 0))
ax[0].set_title('Original Image')
ax[0].axis('off')
ax[1].imshow(tensor_to_image(permuted_image))
ax[1].set_title('Permuted Image')
ax[1].axis('off')
plt.show()
# testing with more patches
GRID_SIZE = 8
im = dataset['train'][0]['image']
preprocessed = preprocess(GRID_SIZE)(im)
patches = split_tensor(preprocessed, GRID_SIZE)
reconstructed = combine_tensors(patches, GRID_SIZE)
permutation = compute_permutation(GRID_SIZE)
permuted_patches = permute_patches(patches, permutation)
permuted_image = combine_tensors(permuted_patches, GRID_SIZE)
# Displaying the original image and the permuted image
fig, ax = plt.subplots(1, 2)
ax[0].imshow(preprocessed.permute(1, 2, 0))
ax[0].set_title('Original Image')
ax[0].axis('off')
ax[1].imshow(tensor_to_image(permuted_image))
ax[1].set_title('Permuted Image')
ax[1].axis('off')
plt.show()
# Transforming the data and splitting it into training and validation sets
X = []   # permuted patch stacks, one per image
y = []   # corresponding permutation matrices (the labels)
GRID_SIZE = 2
TRAIN_SIZE = 100
TEST_SIZE = TRAIN_SIZE // 4
# sample TRAIN_SIZE + TEST_SIZE distinct image indices from the dataset
indeces = np.random.choice(len(dataset['train']), TRAIN_SIZE+TEST_SIZE, replace=False)
indeces = indeces.tolist()
for i in indeces:
    im = dataset['train'][i]['image']
    im = preprocess(GRID_SIZE)(im)
    perm = compute_permutation(GRID_SIZE)
    patches = split_tensor(im, GRID_SIZE)
    permuted_patches = permute_patches(patches, perm)
    # stack into a (num_patches, 3, 227, 227) tensor
    permuted_patches = torch.stack(permuted_patches)
    perm_matrix = permutation_to_matrix(perm, GRID_SIZE)
    X.append(permuted_patches)
    y.append(perm_matrix)
# transform into tensors
X = torch.stack(X)
y = torch.stack(y)
# Split the dataset into training and validation sets (e.g., 80% train, 20% validation)
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
# Create Datasets and DataLoaders
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataset = torch.utils.data.TensorDataset(X_val, y_val)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False) # No shuffling for validation
# Defining the model
# Load pretrained AlexNet and keep it up to fc6: classifier[:3] ends right
# after the first Linear+ReLU pair, i.e. the 4096-dim fc6 features.
# FIX: `pretrained=True` is deprecated (it emitted a torchvision warning);
# the weights enum below is the documented equivalent.
alexnet_model = models.alexnet(weights=models.AlexNet_Weights.IMAGENET1K_V1)
alexnet_model.classifier = alexnet_model.classifier[:3]
# freeze the backbone: only the new head will be trained
for param in alexnet_model.parameters():
    param.requires_grad = False
# One Sinkhorn normalization step, used to push a matrix towards doubly stochastic
def sinkhorn_function(X):
    """Normalize the rows and then the columns of a square matrix.

    After one call the columns sum to 1 exactly; iterating converges towards
    a doubly stochastic matrix.
    """
    # small additive constant avoids division by zero / NaNs
    X = X + 1e-6
    row_scale = torch.diag(1. / torch.sum(X, axis=1))
    X = torch.matmul(row_scale, X)
    col_scale = torch.diag(1. / torch.sum(X, axis=0))
    return torch.matmul(X, col_scale)
# Define the complete model
class Model(nn.Module):
    """Predict a (soft) permutation matrix from a set of shuffled patches.

    Each patch is embedded with the frozen AlexNet backbone; the concatenated
    embeddings go through an MLP head whose output is projected towards the
    set of doubly stochastic matrices via repeated Sinkhorn normalisation.
    """

    def __init__(self, num_patches):
        super().__init__()
        self.alex_net = alexnet_model
        self.layer_stack = nn.Sequential(
            nn.Linear(in_features=4096 * num_patches, out_features=4096),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=4096, out_features=num_patches ** 2),
            nn.ReLU(inplace=True),
        )
        # vmap lifts the (matrix -> matrix) Sinkhorn step to a whole batch
        self.sinkhorn_layer = torch.func.vmap(sinkhorn_function)
        self.num_patches = num_patches

    def forward(self, batch):
        # One flattened AlexNet feature vector per sample in the batch
        features = torch.stack(
            [torch.reshape(self.alex_net(sample), (-1,)) for sample in batch]
        )
        # MLP head, then reshape each row into a num_patches x num_patches matrix
        out = self.layer_stack(features).view(-1, self.num_patches, self.num_patches)
        # Repeated Sinkhorn iterations push the matrices towards doubly stochastic
        for _ in range(10):
            out = self.sinkhorn_layer(out)
        return out
/Users/gabri/opt/anaconda3/envs/envai/lib/python3.11/site-packages/torchvision/models/_utils.py:208: UserWarning: The parameter 'pretrained' is deprecated since 0.13 and may be removed in the future, please use 'weights' instead. warnings.warn( /Users/gabri/opt/anaconda3/envs/envai/lib/python3.11/site-packages/torchvision/models/_utils.py:223: UserWarning: Arguments other than a weight enum or `None` for 'weights' are deprecated since 0.13 and may be removed in the future. The current behavior is equivalent to passing `weights=AlexNet_Weights.IMAGENET1K_V1`. You can also use `weights=AlexNet_Weights.DEFAULT` to get the most up-to-date weights. warnings.warn(msg)
The main challenge we encountered when creating the model was defining a proper loss. The issue here is that we have to compare two matrices that are doubly stochastic, with values summing up to n for an nxn matrix.
We tried implementing different loss functions, here we display the most promising ones:
- Computing the Frobenius norm between the two matrices, which is the Euclidean norm of the vector of differences between all entries in the matrices
- Computing the cross entropy on all n rows of the matrices, which can be interpreted as n different classification problems done at the same time. In this framework, we can see that each row represents the vector of probabilities of the positions of a patch inside the grid
# Initialise the model, loss, and optimiser
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # prefer GPU when available
model = Model(num_patches=GRID_SIZE**2)  # one patch per grid cell
model.to(device)
# Frobenius norm loss: Euclidean norm of the entry-wise difference
class loss_frobenious(nn.Module):
    """Frobenius distance between predicted and target permutation matrices."""

    def __init__(self):
        super().__init__()

    def forward(self, y_pred, y_true):
        diff = y_pred - y_true
        return torch.norm(diff, p='fro')
class loss_class(nn.Module):
    """Row-wise cross entropy between (batched) permutation matrices.

    Each row of a permutation matrix is read as the probability distribution
    over grid positions of one patch, so every sample contributes the mean
    cross entropy of its per-row classification problems; these per-sample
    losses are summed over the batch.
    """

    def __init__(self):
        super().__init__()
        # FIX: build the criterion once instead of re-allocating a fresh
        # nn.CrossEntropyLoss module for every sample of every batch.
        self.cross_entropy = nn.CrossEntropyLoss()

    def forward(self, y_pred, y_true):
        l = 0
        for row_pred, row_true in zip(y_pred, y_true):
            # (n, n) logits vs (n, n) probability targets -> mean over n rows
            l += self.cross_entropy(row_pred, row_true)
        return l
loss = loss_class()  # row-wise cross entropy was the most promising loss
# define the optimizer
optimizer = optim.Adam(model.parameters(), lr=0.01)  # Adam optimiser
# Training loop
for epoch in range(50):
    print(f'Epoch {epoch + 1}')
    model.train()                        # switch model to training mode
    train_loss = 0.0                     # accumulates the epoch's training loss
    for x_batch, y_batch in train_loader:
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        y_pred = model(x_batch)          # predictions on the training batch
        l = loss(y_pred, y_batch)        # batch loss
        train_loss += l.item()           # accumulate for the whole epoch
        optimizer.zero_grad()            # clear previous gradients
        l.backward()                     # compute new gradients
        optimizer.step()                 # update the model weights
    train_loss /= len(train_loader)      # average training loss per batch
    # Validation loop
    model.eval()                         # evaluation mode (disables dropout etc.)
    with torch.no_grad():                # no gradients needed for validation
        val_loss = 0.0
        # FIX: the loop variables are deliberately NOT named x_val/y_val --
        # the original names silently overwrote the y_val tensor produced by
        # the train/validation split a few cells above.
        for xb_val, yb_val in val_loader:
            xb_val = xb_val.to(device)
            yb_val = yb_val.to(device)
            y_pred_val = model(xb_val)
            l = loss(y_pred_val, yb_val)
            val_loss += l.item()
        val_loss /= len(val_loader)      # average validation loss per batch
    print(f'Epoch {epoch + 1:3d}: Train Loss {train_loss:.4f} - Val Loss {val_loss:.4f}')
# save the trained weights (hard-coded path; adjust for your machine)
torch.save(model.state_dict(), '/Users/gabri/Desktop/models/model_100.pth')
Epoch 1
Epoch 1: Train Loss 35.8048 - Val Loss 34.9909 Epoch 2 Epoch 2: Train Loss 35.7997 - Val Loss 34.9079 Epoch 3 Epoch 3: Train Loss 35.7565 - Val Loss 34.8952 Epoch 4 Epoch 4: Train Loss 35.7255 - Val Loss 34.9231 Epoch 5 Epoch 5: Train Loss 35.7041 - Val Loss 34.9180 Epoch 6 Epoch 6: Train Loss 35.6659 - Val Loss 34.8592 Epoch 7 Epoch 7: Train Loss 35.6193 - Val Loss 34.7927 Epoch 8 Epoch 8: Train Loss 35.5460 - Val Loss 34.7528 Epoch 9 Epoch 9: Train Loss 35.4488 - Val Loss 34.6879 Epoch 10 Epoch 10: Train Loss 35.2695 - Val Loss 34.8859 Epoch 11 Epoch 11: Train Loss 34.7937 - Val Loss 35.5742 Epoch 12 Epoch 12: Train Loss 34.3729 - Val Loss 33.9521 Epoch 13 Epoch 13: Train Loss 34.7685 - Val Loss 33.9371 Epoch 14 Epoch 14: Train Loss 34.8844 - Val Loss 34.1512 Epoch 15 Epoch 15: Train Loss 34.3006 - Val Loss 35.0351 Epoch 16 Epoch 16: Train Loss 34.4229 - Val Loss 35.7175 Epoch 17 Epoch 17: Train Loss 33.9915 - Val Loss 34.3384 Epoch 18 Epoch 18: Train Loss 33.5976 - Val Loss 34.0753 Epoch 19 Epoch 19: Train Loss 34.8324 - Val Loss 34.2907 Epoch 20 Epoch 20: Train Loss 34.3832 - Val Loss 34.4982 Epoch 21 Epoch 21: Train Loss 34.2901 - Val Loss 34.3765 Epoch 22 Epoch 22: Train Loss 34.3071 - Val Loss 34.3259 Epoch 23 Epoch 23: Train Loss 34.4470 - Val Loss 34.0470 Epoch 24 Epoch 24: Train Loss 34.3334 - Val Loss 34.1564 Epoch 25 Epoch 25: Train Loss 34.0980 - Val Loss 34.4980 Epoch 26 Epoch 26: Train Loss 34.1584 - Val Loss 34.7253 Epoch 27 Epoch 27: Train Loss 34.5762 - Val Loss 34.6020 Epoch 28 Epoch 28: Train Loss 34.6798 - Val Loss 34.6020 Epoch 29 Epoch 29: Train Loss 34.5875 - Val Loss 34.6020 Epoch 30 Epoch 30: Train Loss 34.5876 - Val Loss 34.6020 Epoch 31 Epoch 31: Train Loss 34.5864 - Val Loss 34.6020 Epoch 32 Epoch 32: Train Loss 34.5849 - Val Loss 34.6020 Epoch 33 Epoch 33: Train Loss 34.5829 - Val Loss 34.6020 Epoch 34 Epoch 34: Train Loss 34.4841 - Val Loss 36.2076 Epoch 35 Epoch 35: Train Loss 35.0643 - Val Loss 36.9980 Epoch 36 Epoch 36: Train Loss 
35.4282 - Val Loss 36.9749 Epoch 37 Epoch 37: Train Loss 35.4509 - Val Loss 36.9412 Epoch 38 Epoch 38: Train Loss 35.4368 - Val Loss 36.9194 Epoch 39 Epoch 39: Train Loss 35.4274 - Val Loss 36.9066 Epoch 40 Epoch 40: Train Loss 35.4213 - Val Loss 36.8961 Epoch 41 Epoch 41: Train Loss 35.4161 - Val Loss 36.8886 Epoch 42 Epoch 42: Train Loss 35.4112 - Val Loss 36.8850 Epoch 43 Epoch 43: Train Loss 35.4057 - Val Loss 36.8855 Epoch 44 Epoch 44: Train Loss 35.4034 - Val Loss 36.8863 Epoch 45 Epoch 45: Train Loss 35.3975 - Val Loss 36.8867 Epoch 46 Epoch 46: Train Loss 35.3922 - Val Loss 36.8881 Epoch 47 Epoch 47: Train Loss 35.3893 - Val Loss 36.8925 Epoch 48 Epoch 48: Train Loss 35.3354 - Val Loss 36.8918 Epoch 49 Epoch 49: Train Loss 35.3321 - Val Loss 36.8930 Epoch 50 Epoch 50: Train Loss 35.3771 - Val Loss 36.8949
# performance on the training set
X = X_train
y = y_train
# FIX: run inference in eval mode (deterministic: dropout off), without
# gradient tracking, and move the input to the model's device -- the original
# forward pass kept gradients and would crash if the model sat on a GPU.
model.eval()
with torch.no_grad():
    predictions = model(X.to(device)).cpu()
# Displaying the original image, permuted image, and predicted image
for i in range(20):
    fig, ax = plt.subplots(1, 3)
    # original image: undo the ground-truth permutation of the stored patches
    permuted = X_train[i]
    permutation = torch.argmax(y_train[i], axis=1).tolist()
    reverse_permutation = [permutation.index(j) for j in range(GRID_SIZE**2)]
    permuted_patches = permute_patches(permuted, reverse_permutation)
    original = combine_tensors(permuted_patches, GRID_SIZE)
    ax[0].imshow(tensor_to_image(original))
    ax[0].set_title('Original Image')
    ax[0].axis('off')
    # permuted image: X[i] already holds the shuffled patches, just stitch them
    permuted_im = combine_tensors(X[i], GRID_SIZE)
    ax[1].imshow(tensor_to_image(permuted_im))
    ax[1].set_title('Permuted Image')
    ax[1].axis('off')
    # predicted image: re-permute the reconstructed original with the argmax of
    # each predicted row (a patch may repeat when two rows share an argmax)
    prediction = torch.argmax(predictions[i], axis=1).tolist()
    permuted_patches = permute_patches(split_tensor(original.squeeze(0), GRID_SIZE), prediction)
    predicted_im = combine_tensors(permuted_patches, GRID_SIZE)
    ax[2].imshow(tensor_to_image(predicted_im))
    ax[2].set_title('Predicted Image')
    ax[2].axis('off')
Note: in the predicted image not all patches are always present, since when transforming the matrix of probabilities into an actual permutation it is possible for one patch to be the most probable in more than one position. This is of course an unintended scenario, but we decided to not further complicate the code inside the model, since we expect that with a successful training this issue would resolve itself.
Example of doubly stochastic matrix where the first item is the most probable in more than one row:
[0.5, 0.1, 0.1, 0.3]
[0.5, 0.1, 0.1, 0.3]
[0.0, 0.8, 0.0, 0.2]
[0.0, 0.0, 0.8, 0.2]
We tried different combinations of losses, number of epochs, training set sizes and model structures but we were not able to train a proper model following this approach. There could be various issues leading to this. It could be that the model was not big enough to extrapolate all the relevant information for properly classifying the images. It could also be that the number of training steps required to reach good scores is far too large for our computational budget, so we did not reach promising results. Another issue could be in the definition of our loss functions or even in the size of the MLP layers. One other approach could be changing the dataset format, even though we believe that giving the permutation matrix as a target label was the best choice we could make, since it can be interpreted as a one-hot encoded version of the position of each patch.
We believe that better results could be achieved with a more extensive training and more experimentation on the model parameters and structure. However, due to time constraints, we were not able to push our research far enough to reach our intended goal.
# test the model on 10 random images with original, permuted and predicted images
X = []
y = []
indeces = np.random.choice(len(dataset['train']), 10, replace=False).tolist()
for idx in indeces:
    im = preprocess(GRID_SIZE)(dataset['train'][idx]['image'])
    perm = compute_permutation(GRID_SIZE)
    # stack the shuffled patches into a single tensor per sample
    X.append(torch.stack(permute_patches(split_tensor(im, GRID_SIZE), perm)))
    y.append(permutation_to_matrix(perm, GRID_SIZE))
X = torch.stack(X)
y = torch.stack(y)
# FIX: deterministic inference (dropout off), no gradient tracking, input on
# the model's device -- the original call would fail on a GPU-resident model.
model.eval()
with torch.no_grad():
    predictions = model(X.to(device)).cpu()
# Displaying the original image, permuted image, and predicted image
for i in range(10):
    fig, ax = plt.subplots(1, 3)
    # original image straight from the dataset
    original = tensor_to_image(preprocess(GRID_SIZE)(dataset['train'][indeces[i]]['image']))
    ax[0].imshow(original)
    ax[0].set_title('Original Image')
    ax[0].axis('off')
    # permuted image: X[i] already holds the shuffled patches, so just stitch
    # them together. FIX: the original re-applied the permutation to the
    # already-permuted patches, displaying a doubly-shuffled image
    # (inconsistent with the analogous training-set cell above).
    permuted_im = combine_tensors(X[i], GRID_SIZE)
    ax[1].imshow(tensor_to_image(permuted_im))
    ax[1].set_title('Permuted Image')
    ax[1].axis('off')
    # predicted image: rearrange the shuffled patches with the argmax of each
    # predicted row (duplicates possible when two rows share an argmax)
    prediction = torch.argmax(predictions[i], axis=1).tolist()
    predicted_im = combine_tensors(permute_patches(X[i], prediction), GRID_SIZE)
    ax[2].imshow(tensor_to_image(predicted_im))
    ax[2].set_title('Predicted Image')
    ax[2].axis('off')
As expected, on images never seen before the performance did not improve.
Part 4: feature extraction with transformer plus MLP head¶
What follows is a modification of the previous model, in which we decided to swap the AlexNet model for feature extraction with an encoder-only Transformer model. Unfortunately, even in this case the results were not satisfactory and the considerations made in the previous part apply also for what follows.
# Preprocessing function for the images
def preprocess(grid_size):
    """Return a torchvision pipeline yielding a square tensor of side
    224 * grid_size, so the image splits evenly into 224x224 patches."""
    side = 224 * grid_size
    return transforms.Compose([
        transforms.Resize(side),
        transforms.CenterCrop(side),
        transforms.ToTensor(),
    ])
# Transforming the data and splitting it into training and validation sets
GRID_SIZE = 2
TRAIN_SIZE = 100
TEST_SIZE = TRAIN_SIZE // 4

X = []
y = []
# Sample distinct images from the training split
indeces = np.random.choice(len(dataset['train']), TRAIN_SIZE + TEST_SIZE, replace=False).tolist()
for idx in indeces:
    img = preprocess(GRID_SIZE)(dataset['train'][idx]['image'])
    perm = compute_permutation(GRID_SIZE)
    # Stack the shuffled patches into a single tensor per sample
    X.append(torch.stack(permute_patches(split_tensor(img, GRID_SIZE), perm)))
    # Target: one-hot permutation matrix for this shuffle
    y.append(permutation_to_matrix(perm, GRID_SIZE))
X = torch.stack(X)
y = torch.stack(y)

# Split the dataset into training and validation sets (80% train, 20% validation)
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# Create Datasets and DataLoaders
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataset = torch.utils.data.TensorDataset(X_val, y_val)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)  # keep validation order fixed
# Defining the model
from transformers import ViTModel
# Load the pre-trained Vision Transformer model (encoder-only ViT-Base,
# 16x16 input patches, pre-trained on ImageNet-21k)
model_name = "google/vit-base-patch16-224-in21k"
vit_model = ViTModel.from_pretrained(model_name)
# freeze the weights of the model: the ViT is used as a fixed feature extractor
for param in vit_model.parameters():
    param.requires_grad = False
# Define the Sinkhorn layer for transforming the matrix into a doubly stochastic matrix
def sinkhorn_function(X):
    """One Sinkhorn iteration: normalise the rows, then the columns, of a square matrix.

    Repeated application converges towards a doubly stochastic matrix.
    A small epsilon keeps the divisions away from zero (and thus from NaNs).
    """
    X = X + 1e-6
    # Broadcasting division is mathematically identical to left-multiplying by
    # diag(1/rowsum) and right-multiplying by diag(1/colsum), but avoids building
    # n x n diagonal matrices and the O(n^3) matmuls of the original version.
    X = X / torch.sum(X, dim=1, keepdim=True)  # row normalisation
    X = X / torch.sum(X, dim=0, keepdim=True)  # column normalisation
    return X
# Define the complete model
class Model(nn.Module):
    """Predict a (soft) permutation matrix from a set of shuffled patches.

    Each patch is embedded with the frozen ViT encoder (its CLS token, 768-d);
    the concatenated embeddings pass through an MLP head whose output is pushed
    towards a doubly stochastic matrix via repeated Sinkhorn normalisation.
    """

    def __init__(self, num_patches):
        super().__init__()
        self.encoder = vit_model
        self.layer_stack = nn.Sequential(
            nn.Linear(768 * num_patches, 4096),
            nn.ReLU(),
            # FIX: the output size was hard-coded to 16, which is only correct
            # for num_patches == 4 (GRID_SIZE == 2); parameterise it so the
            # model works for any grid size, exactly as the AlexNet variant does.
            nn.Linear(4096, num_patches ** 2),
            nn.ReLU(inplace=True),
        )
        # vmap lifts the (matrix -> matrix) Sinkhorn step to a whole batch
        self.sinkhorn_layer = torch.func.vmap(sinkhorn_function)
        self.num_patches = num_patches

    def forward(self, batch):
        out_concat = []
        # embed each sample's patches with the frozen transformer
        for x in batch:
            a = self.encoder(x)
            a = a.last_hidden_state[:, 0, :]  # CLS-token embedding per patch
            out_concat.append(torch.reshape(a, (-1,)))
        out_concat = torch.stack(out_concat)
        # MLP head, reshaped into a num_patches x num_patches score matrix
        out = self.layer_stack(out_concat)
        out = out.view(-1, self.num_patches, self.num_patches)
        # repeated Sinkhorn iterations -> approximately doubly stochastic
        for _ in range(10):
            out = self.sinkhorn_layer(out)
        return out
# Initialise the model, loss, and optimiser
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # prefer GPU when available
model = Model(num_patches=GRID_SIZE**2)  # one patch per grid cell
model.to(device)
# Frobenius norm loss: Euclidean norm of the entry-wise difference
class loss_frobenious(nn.Module):
    """Frobenius distance between predicted and target permutation matrices."""

    def __init__(self):
        super().__init__()

    def forward(self, y_pred, y_true):
        diff = y_pred - y_true
        return torch.norm(diff, p='fro')
class loss_class(nn.Module):
    """Row-wise cross entropy between (batched) permutation matrices.

    Each row of a permutation matrix is read as the probability distribution
    over grid positions of one patch, so every sample contributes the mean
    cross entropy of its per-row classification problems; these per-sample
    losses are summed over the batch.
    """

    def __init__(self):
        super().__init__()
        # FIX: build the criterion once instead of re-allocating a fresh
        # nn.CrossEntropyLoss module for every sample of every batch.
        self.cross_entropy = nn.CrossEntropyLoss()

    def forward(self, y_pred, y_true):
        l = 0
        for row_pred, row_true in zip(y_pred, y_true):
            # (n, n) logits vs (n, n) probability targets -> mean over n rows
            l += self.cross_entropy(row_pred, row_true)
        return l
loss = loss_class()  # row-wise cross entropy was the most promising loss
# define the optimizer
optimizer = optim.Adam(model.parameters(), lr=0.01)  # Adam optimiser
# Training loop
for epoch in range(30):
    print(f'Epoch {epoch + 1}')
    model.train()                        # switch model to training mode
    train_loss = 0.0                     # accumulates the epoch's training loss
    for x_batch, y_batch in train_loader:
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        y_pred = model(x_batch)          # predictions on the training batch
        l = loss(y_pred, y_batch)        # batch loss
        train_loss += l.item()           # accumulate for the whole epoch
        optimizer.zero_grad()            # clear previous gradients
        l.backward()                     # compute new gradients
        optimizer.step()                 # update the model weights
    train_loss /= len(train_loader)      # average training loss per batch
    # Validation loop
    model.eval()                         # evaluation mode (disables dropout etc.)
    with torch.no_grad():                # no gradients needed for validation
        val_loss = 0.0
        # FIX: the loop variables are deliberately NOT named x_val/y_val --
        # the original names silently overwrote the y_val tensor produced by
        # the train/validation split a few cells above.
        for xb_val, yb_val in val_loader:
            xb_val = xb_val.to(device)
            yb_val = yb_val.to(device)
            y_pred_val = model(xb_val)
            l = loss(y_pred_val, yb_val)
            val_loss += l.item()
        val_loss /= len(val_loader)      # average validation loss per batch
    print(f'Epoch {epoch + 1:3d}: Train Loss {train_loss:.4f} - Val Loss {val_loss:.4f}')
Epoch 1 Epoch 1: Train Loss 35.0503 - Val Loss 10.2086 Epoch 2 Epoch 2: Train Loss 32.3284 - Val Loss 10.4170 Epoch 3 Epoch 3: Train Loss 31.7873 - Val Loss 10.4511 Epoch 4 Epoch 4: Train Loss 31.3311 - Val Loss 10.5062 Epoch 5 Epoch 5: Train Loss 31.0125 - Val Loss 10.3400 Epoch 6 Epoch 6: Train Loss 30.5945 - Val Loss 10.2514 Epoch 7 Epoch 7: Train Loss 30.0216 - Val Loss 10.2141 Epoch 8 Epoch 8: Train Loss 29.8278 - Val Loss 10.7241 Epoch 9 Epoch 9: Train Loss 28.9653 - Val Loss 10.7352 Epoch 10 Epoch 10: Train Loss 28.7687 - Val Loss 10.5764 Epoch 11 Epoch 11: Train Loss 29.1978 - Val Loss 10.7008 Epoch 12 Epoch 12: Train Loss 29.7156 - Val Loss 10.7162 Epoch 13 Epoch 13: Train Loss 29.8121 - Val Loss 10.7034 Epoch 14 Epoch 14: Train Loss 29.7151 - Val Loss 10.6779 Epoch 15 Epoch 15: Train Loss 29.6557 - Val Loss 10.6616 Epoch 16 Epoch 16: Train Loss 29.6645 - Val Loss 10.8122 Epoch 17 Epoch 17: Train Loss 29.1903 - Val Loss 10.1473 Epoch 18 Epoch 18: Train Loss 29.4905 - Val Loss 10.0249 Epoch 19 Epoch 19: Train Loss 30.2286 - Val Loss 10.0267 Epoch 20 Epoch 20: Train Loss 30.1922 - Val Loss 10.0281 Epoch 21 Epoch 21: Train Loss 30.0436 - Val Loss 10.0295 Epoch 22 Epoch 22: Train Loss 30.1858 - Val Loss 10.0308 Epoch 23 Epoch 23: Train Loss 30.2854 - Val Loss 10.0320 Epoch 24 Epoch 24: Train Loss 30.2605 - Val Loss 10.0348 Epoch 25 Epoch 25: Train Loss 30.1548 - Val Loss 10.0435 Epoch 26 Epoch 26: Train Loss 30.3868 - Val Loss 10.2374 Epoch 27 Epoch 27: Train Loss 30.5202 - Val Loss 10.1327 Epoch 28 Epoch 28: Train Loss 30.5052 - Val Loss 10.1288 Epoch 29 Epoch 29: Train Loss 30.3886 - Val Loss 10.1247 Epoch 30 Epoch 30: Train Loss 30.3301 - Val Loss 10.1220
# performance on the training set
X = X_train
y = y_train
# FIX: run inference in eval mode (deterministic), without gradient tracking,
# and move the input to the model's device -- the original forward pass kept
# gradients and would crash if the model sat on a GPU.
model.eval()
with torch.no_grad():
    predictions = model(X.to(device)).cpu()
# Displaying the original image, permuted image, and predicted image
for i in range(20):
    fig, ax = plt.subplots(1, 3)
    # original image: undo the ground-truth permutation of the stored patches
    permuted = X_train[i]
    permutation = torch.argmax(y_train[i], axis=1).tolist()
    reverse_permutation = [permutation.index(j) for j in range(GRID_SIZE**2)]
    permuted_patches = permute_patches(permuted, reverse_permutation)
    original = combine_tensors(permuted_patches, GRID_SIZE)
    ax[0].imshow(tensor_to_image(original))
    ax[0].set_title('Original Image')
    ax[0].axis('off')
    # permuted image: X[i] already holds the shuffled patches, just stitch them
    permuted_im = combine_tensors(X[i], GRID_SIZE)
    ax[1].imshow(tensor_to_image(permuted_im))
    ax[1].set_title('Permuted Image')
    ax[1].axis('off')
    # predicted image: re-permute the reconstructed original with the argmax of
    # each predicted row (a patch may repeat when two rows share an argmax)
    prediction = torch.argmax(predictions[i], axis=1).tolist()
    permuted_patches = permute_patches(split_tensor(original.squeeze(0), GRID_SIZE), prediction)
    predicted_im = combine_tensors(permuted_patches, GRID_SIZE)
    ax[2].imshow(tensor_to_image(predicted_im))
    ax[2].set_title('Predicted Image')
    ax[2].axis('off')
# test the model on 10 random images with original, permuted and predicted images
X = []
y = []
indeces = np.random.choice(len(dataset['train']), 10, replace=False).tolist()
for idx in indeces:
    im = preprocess(GRID_SIZE)(dataset['train'][idx]['image'])
    perm = compute_permutation(GRID_SIZE)
    # stack the shuffled patches into a single tensor per sample
    X.append(torch.stack(permute_patches(split_tensor(im, GRID_SIZE), perm)))
    y.append(permutation_to_matrix(perm, GRID_SIZE))
X = torch.stack(X)
y = torch.stack(y)
# FIX: deterministic inference (dropout off), no gradient tracking, input on
# the model's device -- the original call would fail on a GPU-resident model.
model.eval()
with torch.no_grad():
    predictions = model(X.to(device)).cpu()
# Displaying the original image, permuted image, and predicted image
for i in range(10):
    fig, ax = plt.subplots(1, 3)
    # original image straight from the dataset
    original = tensor_to_image(preprocess(GRID_SIZE)(dataset['train'][indeces[i]]['image']))
    ax[0].imshow(original)
    ax[0].set_title('Original Image')
    ax[0].axis('off')
    # permuted image: X[i] already holds the shuffled patches, so just stitch
    # them together. FIX: the original re-applied the permutation to the
    # already-permuted patches, displaying a doubly-shuffled image
    # (inconsistent with the analogous training-set cell above).
    permuted_im = combine_tensors(X[i], GRID_SIZE)
    ax[1].imshow(tensor_to_image(permuted_im))
    ax[1].set_title('Permuted Image')
    ax[1].axis('off')
    # predicted image: rearrange the shuffled patches with the argmax of each
    # predicted row (duplicates possible when two rows share an argmax)
    prediction = torch.argmax(predictions[i], axis=1).tolist()
    predicted_im = combine_tensors(permute_patches(X[i], prediction), GRID_SIZE)
    ax[2].imshow(tensor_to_image(predicted_im))
    ax[2].set_title('Predicted Image')
    ax[2].axis('off')