Report¶

CNN + Greedy (ContourWatcher)¶

Part 0: Preliminaries¶

Import¶

In [ ]:
import torch                                   # main module
from torch import nn                           # we'll use neural networks...
import torch.optim as optim
from torch.optim import lr_scheduler

import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy

import cv2
import random
from torch.utils.data import DataLoader
c:\Users\lgand\anaconda3\envs\DL\Lib\site-packages\tqdm\auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from .autonotebook import tqdm as notebook_tqdm
In [ ]:
device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
print(f"Using {device}")
Using cuda

Function that loads images, resizes them and splits them into different squares¶

In [ ]:
def process_and_divide(image, new_width, new_height, block_width, block_height, erase_borders = 0, shuffle = True):
    if image is None:
        raise Exception("none")
    new_dimensions = (new_width, new_height)
    # Resize the image
    resized_image = cv2.resize(image, new_dimensions, interpolation=cv2.INTER_AREA)
    image = resized_image

    # Divide l'immagine in quadrati
    squares = []
    dim_y = len(range(0, image.shape[0], block_height))
    dim_x = len(range(0, image.shape[1], block_width))
    for y in range(0, image.shape[0], block_height):
        for x in range(0, image.shape[1], block_width):
            square = image[y + erase_borders:y+block_height - erase_borders, x + erase_borders:x+block_width - erase_borders]
            squares.append(square)

    indices = np.arange(dim_x * dim_y)
    # Mescola gli indici
    if shuffle:
        np.random.shuffle(indices)

    # Applica l'ordine mescolato agli array x e y
    squares = np.array(squares)[indices]
    return squares, indices

example:

In [ ]:
image_path = r"C:\Users\lgand\Pictures\Barconchill\IMG_1640.jpeg"
image = cv2.imread(image_path)
squares, indices = process_and_divide(image, 600,600, 100,100, 0)

Part 1: CNN to check borders¶

First, we have to prepare the training and testing dataloaders. Our goal is to build a CNN which is able to predict whether two blocks are next to each others, therefore we cannot use directly the original paintings as as datapoints but we have to create pair of blocks.

Training set¶

We load the dataset

In [ ]:
from datasets import load_dataset

# Specifica la directory di cache
cache_directory = r"D:\Progetto ML"

# Carica il dataset specificando la directory di cache
dataset = load_dataset("huggan/wikiart", cache_dir=cache_directory)

Creazione di coppie: solo orizzontali: affiancate tutte, non affiancate : p (frazione)

For simplicity, we use only the first 1000 images in the dataset to train the model. For each of them, we consider all the correct horizontal combinations and a fraction p of the uncorrect ones. The elements of the dataloader will be 6x100 pixels images, where the 3x100 part on the left comes from one block, and the 3x100 part on the right comes from the second block.

In [ ]:
# SOLO ORIZZONTALI

n_images = 1000
p = 0.3
counter = 0
coppie = []
label = []
# proviamo con una larghezza di 3 pixel per quadrato
x_start = 98
x_end = 104
# e tutto la y
y_start = 0
y_end = 100

for single in dataset["train"]:
    print(counter)
    pil_image = single["image"]
    numpy_image = np.array(pil_image)
    cv2_image = cv2.cvtColor(numpy_image, cv2.COLOR_RGB2BGR)

    squares, indices = process_and_divide(cv2_image, 600,600, 100,100, 0, shuffle = False)

    # orizzontale
    for y in range(6):
        for x in range(5):
            roi = np.array(cv2.hconcat([squares[6*y + x], squares[6*y + x + 1]])[ y_start:y_end, x_start:x_end, :], dtype = np.int16)
            coppie.append(roi)
            label.append(1)

    for i in range(36):
        for j in range(36):
            if random.random() < p:
                if j != i and ( j!= i + 1 or (j + 1)% 6 == 0 ):
                    roi = np.array(cv2.hconcat([squares[i], squares[j]])[ y_start:y_end, x_start:x_end, :], dtype = np.int16)
                    coppie.append(roi)
                    label.append(0)

    # # verticale
    # for y in range(5):
    #     for x in range(6):
    #         coppie.append( cv2.hconcat([squares[6*y + x], squares[6*(y + 1) + x]]))
    #         label.append(1)

    # for i in range(36):
    #     for j in range(36):
    #         if j != i and  j!= i + 6 :
    #             coppie.append( cv2.hconcat([squares[i], squares[j]]))
    #             label.append(0)


    counter += 1
    if counter >= n_images:
        break


coppie = np.array(coppie)
#coppie = coppie / 255.0  # Normalizzazione
coppie = coppie.transpose(0,3,1,2)
label = np.array(label)
In [ ]:
coppie.shape
(1197997, 3, 100, 6)
In [ ]:
label.shape
(1197997,)

We have 400'000 datapoints, as said, each of them has 3 channels and has a dimension 6x100

From this datapoints we create the final dataloader

In [ ]:
# Convert to PyTorch tensors
X_train = torch.tensor(coppie, dtype=torch.int16)
y_train = torch.tensor(label, dtype=torch.long)

# Create a DataLoader for efficient training
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
In [ ]:
del coppie

Test set¶

We do the same for the test set. Here p is smaller = 0.1 and we consider only -- images.

In [ ]:
# SOLO ORIZZONTALI
p = 0.1

n_images = 300
counter = 0
coppie = []
label = []
x_start = 98
x_end = 104
y_start = 0
y_end = 100

testing = 0

for single in dataset["train"]:
    if testing <= 1000:         # inefficient, to be changed
        testing += 1
        continue
    print(counter)
    pil_image = single["image"]
    numpy_image = np.array(pil_image)
    cv2_image = cv2.cvtColor(numpy_image, cv2.COLOR_RGB2BGR)

    squares, indices = process_and_divide(cv2_image, 600,600, 100,100, 0, shuffle = False)

    # orizzontale
    for y in range(6):
        for x in range(5):
            roi = np.array(cv2.hconcat([squares[6*y + x], squares[6*y + x + 1]])[ y_start:y_end, x_start:x_end, :], dtype = np.int16)
            coppie.append(roi)
            label.append(1)

    for i in range(36):
        for j in range(36):
            if random.random() < p:
                if j != i and ( j!= i + 1 or (j + 1)% 6 == 0 ):
                    roi = np.array(cv2.hconcat([squares[i], squares[j]])[ y_start:y_end, x_start:x_end, :], dtype = np.int16)
                    coppie.append(roi)
                    label.append(0)



    counter += 1
    if counter >= n_images:
        break


coppie = np.array(coppie)
#coppie = coppie / 255.0  # Normalizzazione
coppie = coppie.transpose(0,3,1,2)
label = np.array(label)
In [ ]:
# Convert to PyTorch tensors
X_test = torch.tensor(coppie, dtype=torch.int16)
y_test = torch.tensor(label, dtype=torch.long)

# Create a DataLoader for efficient testing
test_dataset = torch.utils.data.TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)
In [ ]:
del coppie
del label
In [ ]:
X_test.shape
torch.Size([10721, 3, 100, 6])
In [ ]:
y_test.shape
torch.Size([10721])
In [ ]:
X_train.shape
torch.Size([399844, 3, 100, 6])

Definition of the model¶

Define the model to check if a square is to the left of another square

In [ ]:
class Conv_no_roi_small(nn.Module):
    def __init__(self):
        ## Invoke the parent constructor to set up out object
        super().__init__()
        ## We'll only need one (composite) attribute        
        self.conv = nn.Sequential(                   # input size: 6 x 100
            nn.Conv2d(3, 128, kernel_size=(5,6)),                   # output sizes: 256x 1 x 96
            nn.BatchNorm2d(128),                 # we need BatchNorm2d here since we're using images
            nn.ReLU(),
            nn.MaxPool2d(kernel_size= (4,1)),                        # output sizes: 256x1x24

            nn.Conv2d(128, 256, kernel_size= (3,1)),                  # output sizes: 512x1x20
            nn.BatchNorm2d(256),  
            nn.ReLU(),

            nn.AvgPool2d(kernel_size= (22,1)),     # output sizes: 512x1x1 (we are "summarizing" each channel into a single number)

            nn.Flatten())
        
        self.head = nn.Sequential(                   # input size: 8x100
            nn.Dropout(),                           # some dropout here forces redundancy in the features

            nn.Linear(256, 512),                   # a dense layer
            nn.BatchNorm1d(512),                   
            nn.ReLU(),

            nn.Dropout(),                           

            nn.Linear(512, 2),                  

            )

    def forward(self, x):
        # # Definire le coordinate della regione di interesse (ROI)
        # x_start = 98
        # x_end = 104
        # y_start = 0
        # y_end = 100

        # # Estrarre la ROI dall'immagine
        # roi = x[:, :, y_start:y_end, x_start:x_end]

        # normalization (fatta qui per risparmiare memoria prima)
        x = x/255.0

        ze = self.conv(x)
        logits = self.head(ze)
        return logits

Training and testing functions¶

In [ ]:
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()               # training mode
    mean_loss = 0.0
    correct = 0
    for batch, (X,y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        pred = model(X)
        loss = loss_fn(pred, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        mean_loss += loss.item()
        correct += (pred.argmax(1) == y).type(torch.float).sum().item()

        # visualization
        if batch % 100 == 0:
            loss = loss.item()
            current = (batch + 1)* len(X)       # quanti ne hai fatti fino ad ora
            print(f"  [{current:>5d}/{size:>5d}]")
        
    ## Compute and print some data to monitor the training
    mean_loss /= num_batches
    correct /= size
    print(f"TRAINING - Accuracy: {(100 * correct):>5.1f}%, Avg loss: {loss:>7f}")
In [ ]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss = 0.0
    correct = 0
    with torch.no_grad():
        for X,y in dataloader:
            X,y = X.to(device), y.to(device)
            pred = model(X)
            loss = loss_fn(pred, y).item()
            test_loss += loss
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    ## Compute and print some data to monitor the training
    test_loss /= num_batches
    correct /= size
    print(f"TESTING  - Accuracy: {(100 * correct):>5.1f}%, Avg loss: {test_loss:>8f}\n")
In [ ]:
# questa è per fare gradient clipping e schedule ,non utilizzata alla fine,
def train_new(dataloader, model, loss_fn, optimizer, device, max_norm=1.0, scheduler=None):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()  # Set the model to training mode
    mean_loss = 0.0
    correct = 0
    
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        pred = model(X)
        loss = loss_fn(pred, y)
        loss.backward()
        
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
        
        optimizer.step()
        mean_loss += loss.item()
        correct += (pred.argmax(1) == y).type(torch.float).sum().item()

        # Visualization
        if batch % 100 == 0:
            current = (batch + 1) * len(X)
            print(f"  [{current:>5d}/{size:>5d}]")
        
    # Compute and print some data to monitor the training
    mean_loss /= num_batches
    correct /= size
    print(f"TRAINING - Accuracy: {(100 * correct):>5.1f}%, Avg loss: {mean_loss:>7f}")
    
    # Learning rate scheduling
    if scheduler is not None:
        scheduler.step()

Create the model and train¶

In [ ]:
from torch.optim.lr_scheduler import StepLR
model = Conv_no_roi_small().to(device)
loss_fn = nn.CrossEntropyLoss()
#optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
optimizer = optim.Adam(model.parameters(), lr=1e-2)
scheduler = StepLR(optimizer, step_size=8, gamma=0.3)

Try with 1 epoch

In [ ]:
epochs = 1
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------")
    train(train_loader, model, loss_fn, optimizer)
    test(test_loader, model, loss_fn)
print("Done!")
Epoch 1
-------------------
  [  128/399572]
  [12928/399572]
  [25728/399572]
  [38528/399572]
  [51328/399572]
  [64128/399572]
  [76928/399572]
  [89728/399572]
  [102528/399572]
  [115328/399572]
  [128128/399572]
  [140928/399572]
  [153728/399572]
  [166528/399572]
  [179328/399572]
  [192128/399572]
  [204928/399572]
  [217728/399572]
  [230528/399572]
  [243328/399572]
  [256128/399572]
  [268928/399572]
  [281728/399572]
  [294528/399572]
  [307328/399572]
  [320128/399572]
  [332928/399572]
  [345728/399572]
  [358528/399572]
  [371328/399572]
  [384128/399572]
  [396928/399572]
TRAINING - Accuracy:  95.6%, Avg loss: 0.026428
TESTING  - Accuracy:  93.4%, Avg loss: 0.167261

Done!
In [ ]:
torch.save(model, r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\modelli\v1.pth")

Switching to nesterov

In [ ]:
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, nesterov= True, momentum = 0.8, dampening = 0)
In [ ]:
epochs = 1
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------")
    train(train_loader, model, loss_fn, optimizer)
    test(test_loader, model, loss_fn)
print("Done!")
Epoch 1
-------------------
  [  128/399572]
  [12928/399572]
  [25728/399572]
  [38528/399572]
  [51328/399572]
  [64128/399572]
  [76928/399572]
  [89728/399572]
  [102528/399572]
  [115328/399572]
  [128128/399572]
  [140928/399572]
  [153728/399572]
  [166528/399572]
  [179328/399572]
  [192128/399572]
  [204928/399572]
  [217728/399572]
  [230528/399572]
  [243328/399572]
  [256128/399572]
  [268928/399572]
  [281728/399572]
  [294528/399572]
  [307328/399572]
  [320128/399572]
  [332928/399572]
  [345728/399572]
  [358528/399572]
  [371328/399572]
  [384128/399572]
  [396928/399572]
TRAINING - Accuracy:  97.0%, Avg loss: 0.107138
TESTING  - Accuracy:  94.6%, Avg loss: 0.127386

Done!
In [ ]:
torch.save(model, r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\modelli\v2.pth")
In [ ]:
epochs = 1
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------")
    train(train_loader, model, loss_fn, optimizer)
    test(test_loader, model, loss_fn)
print("Done!")
Epoch 1
-------------------
  [  128/399572]
  [12928/399572]
  [25728/399572]
  [38528/399572]
  [51328/399572]
  [64128/399572]
  [76928/399572]
  [89728/399572]
  [102528/399572]
  [115328/399572]
  [128128/399572]
  [140928/399572]
  [153728/399572]
  [166528/399572]
  [179328/399572]
  [192128/399572]
  [204928/399572]
  [217728/399572]
  [230528/399572]
  [243328/399572]
  [256128/399572]
  [268928/399572]
  [281728/399572]
  [294528/399572]
  [307328/399572]
  [320128/399572]
  [332928/399572]
  [345728/399572]
  [358528/399572]
  [371328/399572]
  [384128/399572]
  [396928/399572]
TRAINING - Accuracy:  97.1%, Avg loss: 0.086171
TESTING  - Accuracy:  96.0%, Avg loss: 0.103920

Done!
In [ ]:
torch.save(model, r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\modelli\v3.pth")
In [ ]:
optimizer = torch.optim.SGD(model.parameters(), lr=2e-4, nesterov= True, momentum = 0.8, dampening = 0)
In [ ]:
epochs = 1
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------")
    train(train_loader, model, loss_fn, optimizer)
    test(test_loader, model, loss_fn)
print("Done!")
Epoch 1
-------------------
  [  128/399572]
  [12928/399572]
  [25728/399572]
  [38528/399572]
  [51328/399572]
  [64128/399572]
  [76928/399572]
  [89728/399572]
  [102528/399572]
  [115328/399572]
  [128128/399572]
  [140928/399572]
  [153728/399572]
  [166528/399572]
  [179328/399572]
  [192128/399572]
  [204928/399572]
  [217728/399572]
  [230528/399572]
  [243328/399572]
  [256128/399572]
  [268928/399572]
  [281728/399572]
  [294528/399572]
  [307328/399572]
  [320128/399572]
  [332928/399572]
  [345728/399572]
  [358528/399572]
  [371328/399572]
  [384128/399572]
  [396928/399572]
TRAINING - Accuracy:  97.2%, Avg loss: 0.093918
TESTING  - Accuracy:  95.6%, Avg loss: 0.108165

Done!

Better before. For now we keep the CNN after the third epoch and we go on

In [ ]:
model = torch.load(r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\modelli\v3.pth")

Part 2: Image reconstruction (the Greedy algorithm)¶

We define the greedy algorithm to reconstruct the original image

In [ ]:
class Build_puzzle3():
    def __init__(self, model, squares_a, dim_x , dim_y, th = 0.90):
        self.model = model
        self.squares_a = squares_a
        self.dim_x = dim_x
        self.dim_y = dim_y
        self.n = dim_x * dim_y
        self.oth = th

        self.x_start = 98
        self.x_end = 104
        self.y_start = 0
        self.y_end = 100
        
    def initialize(self):
        """ 
        è separata dall'__init__ nel caso si volessero fare più run con la stessa immagine
        """
        self.av = {i for i in range(36)}
        self.visit_av = []
        self.alive = []
        self.positions = {}
        self.th = self.oth
        self.minth = self.oth
        self.mappa_max = {}


    def build(self):
        """ 
        Funzione principale, che bisogna chiamare per avviare l'algoritmo
        """
        # prendi il primo quadrato
        self.initialize()
        current = 0
        self.positions = {current : (0,0)}
        self.av.remove(current)
        self.alive.append(current)
        self.visit_av.append(current)
        self.xmax = 0
        self.xmin = 0
        self.ymax = 0
        self.ymin = 0

        while len(self.av) != 0:
            current = self.find_and_add(current)
            self.update_min_max(current)
        
        return self.transform_in_indices()
    

    def find_and_add(self, current):
            """ 
            La funzione che, a partire dal blocco current, trova il migliore blocco da attaccarci, e se lo score è maggiore di th, lo attacca
            """
            res = np.zeros((4,36))
            pos_av = [True for i in range(4)]
            pos_av[0] = self.check_av_pos((self.positions[current][0] + 1, self.positions[current][1])) 
            pos_av[1] = self.check_av_pos((self.positions[current][0] - 1, self.positions[current][1]))
            pos_av[2] = self.check_av_pos((self.positions[current][0] , self.positions[current][1] - 1))
            pos_av[3] = self.check_av_pos((self.positions[current][0] , self.positions[current][1] + 1))
            if not pos_av[0] and not pos_av[1] and not pos_av[2] and not pos_av[3]:
                self.alive.remove(current)
                self.visit_av.remove(current)
                return self.old_block()

            for i in self.av:
                if pos_av[0]:
                    res[0][i] = self.h_merge( self.squares_a[current], self.squares_a[i])
                if pos_av[1]:
                    res[1][i] = self.h_merge( self.squares_a[i], self.squares_a[current])
                if pos_av[2]:
                    res[2][i] = self.v_merge( self.squares_a[current], self.squares_a[i])
                if pos_av[3]:
                    res[3][i] = self.v_merge( self.squares_a[i], self.squares_a[current])
            val_max = np.array([np.max(res[0]), np.max(res[1]), np.max(res[2]), np.max(res[3])])
            ind_max = np.array([np.argmax(res[0]), np.argmax(res[1]), np.argmax(res[2]), np.argmax(res[3])])
            if np.max(val_max) < self.th:
                self.alive.remove(current)
                self.mappa_max[current] = (np.max(val_max), current)
                return self.old_block()
            self.mappa_max = {}
            self.alive = self.visit_av.copy()
            dir_max = np.argmax(val_max)
            new_block = ind_max[dir_max]
            self.th = self.oth

            if dir_max == 0:
                self.positions[new_block] = (self.positions[current][0] + 1, self.positions[current][1])
            elif dir_max == 1:
                self.positions[new_block] = (self.positions[current][0] - 1, self.positions[current][1])
            elif dir_max == 2:
                self.positions[new_block] = (self.positions[current][0] , self.positions[current][1] - 1)
            elif dir_max == 3:
                self.positions[new_block] = (self.positions[current][0] , self.positions[current][1] + 1)

            self.av.remove(new_block)
            self.alive.append(new_block)
            self.visit_av.append(new_block)
            return new_block


    def h_merge(self, sx, dx):
        """
        unisce due immagini orizzontalmente (estrae la roi) e la da in pasto al modello. Ritorna il risultato del modello
        """
        sx = [sx]
        dx = [dx]
        coppia = np.concatenate((sx,dx), axis = 3)
        roi = coppia[:, :, self.y_start:self.y_end, self.x_start:self.x_end]
        roi = torch.tensor(roi, dtype=torch.float32).to(device)
        logits = self.model(roi)
        return float(nn.Softmax(dim = 1)(logits)[0][1])
    

    def v_merge(self, sx, dx):
        """
        unisce due immagini verticalmente (estrae la roi) e la da in pasto al modello. Ritorna il risultato del modello
        """
        sx = [sx]
        dx = [dx]
        coppia = np.concatenate((sx,dx), axis = 2)
        coppia = coppia.transpose(0,1,3,2)
        roi = coppia[:, :, self.y_start:self.y_end, self.x_start:self.x_end]
        roi = torch.tensor(roi, dtype=torch.float32).to(device)
        logits = self.model(roi)
        return float(nn.Softmax(dim = 1)(logits)[0][1])


    def old_block(self):
        """
        quando lo score ottenuto è minore della soglia th --> prendere un altro blocco se esistono ancora dei blocchi alive oppure abbassare la soglia al valore
        massimo dei blocchi esistenti e con spazi liberi vicini (visit_av), e questo lo prende da mappa_max
        """
        if len(self.alive) == 0:
            temp = np.array([self.mappa_max[block][0] for block in self.mappa_max])
            blocks = [self.mappa_max[block][1] for block in self.mappa_max]
            self.th = np.max(temp)
            best_block = blocks[np.argmax(temp)]
            self.mappa_max = {}
            if self.th < self.minth:
                self.minth = self.th
            self.alive = self.visit_av.copy()
            return best_block
        return random.choices(self.alive)[0]

    
    def update_min_max(self, block):
        """ 
        x_max e x_min, y_max, y_min servono per scalare le posizioni alla fine, e per evitare si sfori il quadrato 6x6
        """
        if self.xmax < self.positions[block][0]:
            self.xmax = self.positions[block][0]
        if self.xmin > self.positions[block][0]:
            self.xmin = self.positions[block][0]
        if self.ymax < self.positions[block][1]:
            self.ymax = self.positions[block][1]
        if self.ymin > self.positions[block][1]:
            self.ymin = self.positions[block][1]


    def check_av_pos(self, pos):
        """ 
        Funzione che ritorna True se la posizione pos è libera e all'interno del quadrato 6x6
        """
        x = pos[0]
        y = pos[1]
        if np.abs(x - self.xmin) < self.dim_x and np.abs(x - self.xmax) < self.dim_x and np.abs(y - self.ymin) < self.dim_y and np.abs(self.ymax - y) < self.dim_y: 
            if pos not in self.positions.values():
                return True
            else:
                return False
        return False
    

    def transform_in_indices(self):
        """ 
        Funzione da usare alla fine per ottenere la permutazione degli indici a partire dalle posizioni 
        """
        result = []
        self.reversed_pos = {(px - self.xmin, py - self.ymax): key for key, (px, py) in self.positions.items()}
        for y in range(0, - self.dim_y, -1):
            for x in range(self.dim_x):
                result.append(self.reversed_pos[((x,y))] ) 
        return result

Testing the results on the test set¶

we define a function that reconstructs the image predicted

In [ ]:
def print_image(squares, result, dim_x, dim_y):
    squares_b = squares[result]
    prov = []
    for y in range(dim_y):
        prov.append(squares_b[dim_y * y])
        for x in range(1,dim_x):
            prov[y] = cv2.hconcat([prov[y], squares_b[dim_y * y + x]])
    final = prov[0]
    for y in range(1, dim_y):
        final = cv2.vconcat([final, prov[y]])
    return final

We load the first 200 images from the test set

In [ ]:
cv2_images = []
cc = 0
for single in dataset["train"]:
    if cc <= 1000:
        cc += 1
        continue
    if cc > 1200:
        break
    cc += 1
    pil_image = single["image"]
    numpy_image = np.array(pil_image)
    cv2_images.append(cv2.cvtColor(numpy_image, cv2.COLOR_RGB2BGR))

Changing here the index, we can select which image to use

In [ ]:
index = 0

Shuffle the blocks

In [ ]:
squares, indices = process_and_divide(cv2_images[index], 600, 600, 100, 100)
squares_a = np.array(squares)
squares_a = squares_a.transpose(0, 3, 1, 2)

Build the puzzle game and solve it

In [ ]:
builder = Build_puzzle3(model, squares_a, dim_x = 6, dim_y = 6, th = 0.9)
res = builder.build()

print the number of wrong indices:

In [ ]:
error = sum([indices[res[i]] != i for i in range(36)])
error
0

reconstruct in cv2 format predicted and initial image

In [ ]:
final = print_image(squares, res, 6,6)
initial = print_image(squares, [i for i in range(36)], 6,6)

Print the results! (press 0 to close)

In [ ]:
# Display the original and resized images
cv2.imshow('correct', cv2.resize(cv2_images[index], (600,600), interpolation=cv2.INTER_AREA))
cv2.imshow('initial', initial)
cv2.imshow('pred', final)
# Wait for a key press and close the windows
cv2.waitKey(0)
cv2.destroyAllWindows()

Accuracy in the test set¶

In [ ]:
risultato = []
errori = 0
for index in range(len(cv2_images)):
    squares, indices = process_and_divide(cv2_images[index], 600, 600, 100, 100)
    squares_a = np.array(squares)
    squares_a = squares_a.transpose(0, 3, 1, 2)
    builder = Build_puzzle3(model, squares_a, dim_x = 6, dim_y = 6, th = 0.9)
    res = builder.build()
    error = sum([indices[res[i]] != i for i in range(36)])
    print(index, error)
    risultato.append(error)
    final = print_image(squares, res, 6,6)
    initial = print_image(squares, [i for i in range(36)], 6,6)
    cv2.imwrite(r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\immagini\1000" + "\\" + str(index) + "p.jpeg", final)  
    cv2.imwrite(r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\immagini\1000" + "\\" + str(index) + "i.jpeg", initial)  
    cv2.imwrite(r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\immagini\1000" + "\\" + str(index) + "c.jpeg", cv2.resize(cv2_images[index], (600,600), interpolation=cv2.INTER_AREA)        )   
    if error > 0:
        errori += 1
0 0
1 0
2 0
3 0
4 0
5 0
6 0
7 35
8 0
9 0
10 0
11 6
12 0
13 0
14 0
15 0
16 0
17 0
18 0
19 0
20 0
21 0
22 0
23 36
24 0
25 0
26 0
27 0
28 0
29 0
30 0
31 0
32 0
33 0
34 0
35 3
36 0
37 36
38 0
39 0
40 0
41 4
42 0
43 0
44 0
45 0
46 0
47 0
48 0
49 0
50 0
51 0
52 0
53 0
54 0
55 0
56 0
57 0
58 0
59 0
60 0
61 0
62 0
63 0
64 0
65 0
66 26
67 0
68 0
69 0
70 0
71 0
72 0
73 0
74 0
75 0
76 35
77 0
78 36
79 36
80 0
81 0
82 0
83 0
84 6
85 0
86 0
87 0
88 0
89 0
90 0
91 0
92 0
93 0
94 0
95 36
96 0
97 0
98 0
99 0
100 0
101 0
102 0
103 0
104 36
105 0
106 0
107 0
108 0
109 0
110 0
111 0
112 2
113 0
114 0
115 0
116 0
117 0
118 0
119 0
120 0
121 0
122 0
123 0
124 3
125 0
126 0
127 0
128 0
129 0
130 0
131 0
132 0
133 0
134 9
135 0
136 0
137 0
138 0
139 0
140 0
141 0
142 0
143 0
144 0
145 0
146 0
147 0
148 13
149 0
150 0
151 0
152 0
153 0
154 0
155 0
156 0
157 0
158 4
159 0
160 0
161 0
162 0
163 0
164 0
165 0
166 0
167 0
168 0
169 0
170 0
171 0
172 0
173 0
174 0
175 0
176 34
177 0
178 0
179 0
180 0
181 0
182 0
183 0
184 0
185 0
186 35
187 8
188 0
189 0
190 0
191 0
192 0
193 0
194 0
195 0
196 0
197 0
198 0
199 0
In [ ]:
errori
21

We ordered wrongly 21 images over 200

In [ ]:
errori/200
0.105

Load your own paintings¶

now you can select any painting from the hard disk

In [ ]:
cv2_image = cv2.imread(r"C:\Users\lgand\Documents\BAI\III anno\Deep learning\Progetto\immagini\pablo-picasso-ritratto-di-ambrose-vollard.webp")
In [ ]:
cv2_image.shape
(1088, 746, 3)
In [ ]:
squares, indices = process_and_divide(cv2_image, 600, 600, 100, 100)
In [ ]:
squares_a = np.array(squares)
squares_a = squares_a.transpose(0, 3, 1, 2)
In [ ]:
builder = Build_puzzle3(model, squares_a, dim_x = 6, dim_y = 6)
res = builder.build()
In [ ]:
final = print_image(squares, res, 6,6)
initial = print_image(squares, [i for i in range(36)], 6,6)

Print the results! (press 0 to close)

In [ ]:
# Display the original and resized images
cv2.imshow('correct', cv2.resize(cv2_image, (600,600), interpolation=cv2.INTER_AREA))
cv2.imshow('initial', initial)
cv2.imshow('pred', final)
# Wait for a key press and close the windows
cv2.waitKey(0)
cv2.destroyAllWindows()

Part 3: feature extraction with AlexNet plus MLP head¶

Inspired by previous literature, we decided to implement a model that works in the following way:

Dataset

Define grid size

  1. Images are split into patches of size 227x227 following a square grid with a certain side (called grid_size), so that we have grid_size $^2$ patches. In our case we focus on a grid size of 2, which means that the image will be divided into 4 patches.
  2. A permutation is generated in the format of a permutation matrix
  3. Patches are permuted following the permutation

A given element of the dataset is then composed as follows:

data: list of permuted patches

label: permutation

Model

  1. Implement pretrained AlexNet up to fc6 on each patch separately to extract features
  2. Concatenate outputs of all patches into a single tensor
  3. Pass output through a MLP that outputs a vector of size that matches the number of patches squared
  4. This is then transformed into a matrix of the same size as the permutation matrix, which is passed into a Sinkhorn layer to transform it into a doubly stochastic matrix

The output is then interpreted as a permutation matrix that can be compared to the true one.

In [ ]:
from datasets import load_dataset
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import numpy as np
from torchvision import datasets, models, transforms
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader
import torchvision.models as models
import torchvision.datasets as datasets
from PIL import Image
import itertools
import random
import os
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
In [ ]:
# Loading the dataset
cache_directory = r"/Users/gabri/Desktop/wikiart"
dataset = load_dataset("huggan/wikiart", cache_dir=cache_directory)

We preprocess the images and transform them into squares of size multiple to 227, in particular such that every patch alone will have a shape of 227*227 alone, since that is the input size required by AlexNet

In [ ]:
# Preprocessing function for the images

def preprocess(grid_size):
    return transforms.Compose([
    transforms.Resize(227*grid_size),
    transforms.CenterCrop(227*grid_size),
    transforms.ToTensor()
])
In [ ]:
# first image before and after preprocessing

GRID_SIZE = 2
im = dataset['train'][0]['image']
preprocessed = preprocess(GRID_SIZE)(im)

fig, ax = plt.subplots(1, 2)
ax[0].imshow(im)
ax[0].set_title('Original Image')
ax[0].axis('off')
ax[1].imshow(preprocessed.permute(1, 2, 0))
ax[1].set_title('Preprocessed Image')
ax[1].axis('off')
plt.show()
No description has been provided for this image
In [ ]:
# function to divide images (tensor) into square patches of size 227x227

def split_tensor(tensor, grid_size):
    patches = []
    channels, width, height = tensor.size()
    patch_size = width // grid_size
    for i, j in itertools.product(range(grid_size), range(grid_size)):
        left = i * patch_size
        upper = j * patch_size
        right = left + patch_size
        lower = upper + patch_size
        patch = tensor[:, left:right, upper:lower]
        patches.append(patch)
    return patches

# function to combine patches into a single tensor

def combine_tensors(patches, grid_size):
    patch_size = patches[0].size(2)
    width = height = patch_size * grid_size
    tensor = torch.zeros((1, 3, width, height))
    for i, j in itertools.product(range(grid_size), range(grid_size)):
        patch = patches[i * grid_size + j]
        left = i * patch_size
        upper = j * patch_size
        tensor[:, :, left:left+patch_size, upper:upper+patch_size] = patch
    return tensor

# function to turn a tensor into a PIL image

def tensor_to_image(tensor):
    tensor = tensor.squeeze(0)
    tensor = tensor.permute(1, 2, 0)
    tensor = tensor.numpy()
    tensor = np.clip(tensor, 0, 1)
    return Image.fromarray((tensor * 255).astype(np.uint8))

# function to turn a PIL image into a tensor

def image_to_tensor(image):
    tensor = ToTensor()(image)
    tensor = tensor.unsqueeze(0)
    return tensor
In [ ]:
# Splitting and combining the image

patches = split_tensor(preprocessed, GRID_SIZE)
reconstructed = combine_tensors(patches, GRID_SIZE)

# Displaying the original image, patches, and reconstructed image

fig, ax = plt.subplots(1, 6, figsize=(15, 3))
ax[0].imshow(preprocessed.permute(1, 2, 0))
ax[0].set_title('Original Image')
ax[0].axis('off')
for i in range(1, 5):
    ax[i].imshow(tensor_to_image(patches[i-1]))
    ax[i].set_title(f'Patch {i}')
    ax[i].axis('off')
ax[5].imshow(tensor_to_image(reconstructed))
ax[5].set_title('Reconstructed Image')
ax[5].axis('off')
plt.show()
No description has been provided for this image
In [ ]:
# Function to compute a permutation of the patches

def compute_permutation(grid_size):
    permutation = list(range(grid_size**2))
    random.shuffle(permutation)
    return permutation

# Function to permute the patches

def permute_patches(patches, permutation):
    return [patches[i] for i in permutation]

def permutation_to_matrix(permutation, grid_size):
    matrix = torch.zeros((grid_size**2, grid_size**2))
    for i, j in enumerate(permutation):
        matrix[i, j] = 1
    return matrix
In [ ]:
# Permuting the patches and reconstructing the image

permutation = compute_permutation(GRID_SIZE)
permuted_patches = permute_patches(patches, permutation)
permuted_image = combine_tensors(permuted_patches, GRID_SIZE)

# Displaying the original image and the permuted image

fig, ax = plt.subplots(1, 2)
ax[0].imshow(preprocessed.permute(1, 2, 0))
ax[0].set_title('Original Image')
ax[0].axis('off')
ax[1].imshow(tensor_to_image(permuted_image))
ax[1].set_title('Permuted Image')
ax[1].axis('off')
plt.show()
No description has been provided for this image
In [ ]:
# testing with more patches

GRID_SIZE = 8
im = dataset['train'][0]['image']
preprocessed = preprocess(GRID_SIZE)(im)
patches = split_tensor(preprocessed, GRID_SIZE)
reconstructed = combine_tensors(patches, GRID_SIZE)
permutation = compute_permutation(GRID_SIZE)
permuted_patches = permute_patches(patches, permutation)
permuted_image = combine_tensors(permuted_patches, GRID_SIZE)

# Displaying the original image and the permuted image

fig, ax = plt.subplots(1, 2)
ax[0].imshow(preprocessed.permute(1, 2, 0))
ax[0].set_title('Original Image')
ax[0].axis('off')
ax[1].imshow(tensor_to_image(permuted_image))
ax[1].set_title('Permuted Image')
ax[1].axis('off')
plt.show()
No description has been provided for this image
In [ ]:
# Transforming the data and splitting it into training and validation sets

X = []
y = []

GRID_SIZE = 2
TRAIN_SIZE = 100
TEST_SIZE = TRAIN_SIZE // 4

indeces = np.random.choice(len(dataset['train']), TRAIN_SIZE+TEST_SIZE, replace=False)
indeces = indeces.tolist()

for i in indeces:
    im = dataset['train'][i]['image']
    im = preprocess(GRID_SIZE)(im)
    perm = compute_permutation(GRID_SIZE)
    patches = split_tensor(im, GRID_SIZE)
    permuted_patches = permute_patches(patches, perm)
    # transform permuted_im into tensor
    permuted_patches = torch.stack(permuted_patches)
    perm_matrix = permutation_to_matrix(perm, GRID_SIZE)
    X.append(permuted_patches)
    y.append(perm_matrix)

# transform into tensors

X = torch.stack(X)
y = torch.stack(y)

# Split the dataset into training and validation sets (e.g., 80% train, 20% validation)
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# Create Datasets and DataLoaders
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

val_dataset = torch.utils.data.TensorDataset(X_val, y_val)
val_loader  = DataLoader(val_dataset, batch_size=32, shuffle=False)  # No shuffling for validation
In [ ]:
# Defining the model

# Load the pretrained AlexNet model and remove the last layers
alexnet_model = models.alexnet(pretrained=True)
alexnet_model.classifier = alexnet_model.classifier[:3]

# freeze the weights of the model
for param in alexnet_model.parameters():
    param.requires_grad = False

# Define the Sinkhorn layer for transforming the matrix into a doubly stochastic matrix
def sinkhorn_function(X):
    # add a small value to avoid nan values
    X = X + 1e-6
    D1 = torch.diag(1. / torch.sum(X, axis=1))
    D2 = torch.diag(1. / torch.sum(torch.matmul(D1, X), axis=0))
    X = torch.matmul(D1, X)
    X = torch.matmul(X, D2)
    return X

# Define the complete model
class Model(nn.Module):
    def __init__(self, num_patches):
        super().__init__()
        self.alex_net = alexnet_model
        self.layer_stack = nn.Sequential(nn.Linear(in_features = 4096*num_patches, out_features=4096),
                                         nn.ReLU(inplace=True),
                                         nn.Linear(in_features=4096, out_features=num_patches**2),
                                         nn.ReLU(inplace=True))
        self.sinkhorn_layer = torch.func.vmap(sinkhorn_function)
        self.num_patches = num_patches

    def forward(self, batch):
        out_concat = []

        # pass each patch through the AlexNet model
        for x in batch:
            a = self.alex_net(x)
            a = torch.reshape(a, (-1,))
            out_concat.append(a)

        
        # concatenate the outputs
        out_concat = torch.stack(out_concat)
        
        # pass the concatenated output through the fully connected layers
        out = self.layer_stack(out_concat)
        
        # reshape the output into a matrix
        out = out.view(-1, self.num_patches, self.num_patches)

        # pass the output through the Sinkhorn layer multiple times to get a doubly stochastic matrix
        for i in range(10):
            out = self.sinkhorn_layer(out)
    
        return out
/Users/gabri/opt/anaconda3/envs/envai/lib/python3.11/site-packages/torchvision/models/_utils.py:208: UserWarning: The parameter 'pretrained' is deprecated since 0.13 and may be removed in the future, please use 'weights' instead.
  warnings.warn(
/Users/gabri/opt/anaconda3/envs/envai/lib/python3.11/site-packages/torchvision/models/_utils.py:223: UserWarning: Arguments other than a weight enum or `None` for 'weights' are deprecated since 0.13 and may be removed in the future. The current behavior is equivalent to passing `weights=AlexNet_Weights.IMAGENET1K_V1`. You can also use `weights=AlexNet_Weights.DEFAULT` to get the most up-to-date weights.
  warnings.warn(msg)

The main challenge we encountered when creating the model was defining a proper loss. The issue here is that we have to compare two matrices that are doubly stochastic, with values summing up to n for an nxn matrix.

We tried implementing different loss functions, here we display the most promising ones:

  • Computing the Frobenious norm between the two matrices, which consists in the euclidean norm of the vector of differences between all entries in the matrices
  • Computing the cross entropy on all n rows of the matrices, which can be interpreted as n different classification problems done at the same time. In this framework, we can see that each row represents the vector of probabilities of the positions of a patch inside the grid
In [ ]:
# Initialise the model, loss, and optimiser

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Model(num_patches=GRID_SIZE**2)
model.to(device)

# frobenious norm loss

class loss_frobenious(nn.Module):
    def __init__(self):
        super(loss_frobenious, self).__init__()
    
    def forward(self, y_pred, y_true):
        return torch.norm(y_pred - y_true, p='fro')


class loss_class(nn.Module):
    def __init__(self):
        super(loss_class, self).__init__()
    
    def forward(self, y_pred, y_true):
        l = 0
        for row_pred, row_true in zip(y_pred, y_true):
            l += nn.CrossEntropyLoss()(row_pred, row_true)
        return l
    
loss = loss_class()

# define the optimizer
optimizer = optim.Adam(model.parameters(), lr=0.01) # Adam optimiser
In [ ]:
# Training loop
for epoch in range(50):
    print(f'Epoch {epoch + 1}')
    model.train()  # Switch model to training mode
    train_loss = 0.0  # Initialize validation loss accumulator
    
    for x_batch, y_batch in train_loader:
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        y_pred = model(x_batch) # Get the predictions on training batch
        
        l = loss(y_pred, y_batch)  # Calculate the loss
        
        train_loss += l.item()  # Accumulate the loss for the entire dataset
        optimizer.zero_grad()  # Clear previous gradients
        l.backward()        # Compute new gradients
        optimizer.step()       # Update the model weights

    train_loss /= len(train_loader)  # Calculate average training loss
    
    # Validation loop
    model.eval()  # Switch model to evaluation mode (important for layers like dropout or batchnorm)
    with torch.no_grad():  # Disable gradient calculation for efficiency
        
        val_loss = 0.0 
            # Initialize validation loss accumulator
        for x_val, y_val in val_loader:
            x_val = x_val.to(device)
            y_val = y_val.to(device)
            y_pred_val = model(x_val)
            l = loss(y_pred_val, y_val)  # Calculate loss
            val_loss += l.item()  # Accumulate the loss for the entire dataset

        val_loss /= len(val_loader)  # Calculate average validation loss
        print(f'Epoch {epoch + 1:3d}: Train Loss {train_loss:.4f} - Val Loss {val_loss:.4f}')

# save model

torch.save(model.state_dict(), '/Users/gabri/Desktop/models/model_100.pth')
Epoch 1
Epoch   1: Train Loss 35.8048 - Val Loss 34.9909
Epoch 2
Epoch   2: Train Loss 35.7997 - Val Loss 34.9079
Epoch 3
Epoch   3: Train Loss 35.7565 - Val Loss 34.8952
Epoch 4
Epoch   4: Train Loss 35.7255 - Val Loss 34.9231
Epoch 5
Epoch   5: Train Loss 35.7041 - Val Loss 34.9180
Epoch 6
Epoch   6: Train Loss 35.6659 - Val Loss 34.8592
Epoch 7
Epoch   7: Train Loss 35.6193 - Val Loss 34.7927
Epoch 8
Epoch   8: Train Loss 35.5460 - Val Loss 34.7528
Epoch 9
Epoch   9: Train Loss 35.4488 - Val Loss 34.6879
Epoch 10
Epoch  10: Train Loss 35.2695 - Val Loss 34.8859
Epoch 11
Epoch  11: Train Loss 34.7937 - Val Loss 35.5742
Epoch 12
Epoch  12: Train Loss 34.3729 - Val Loss 33.9521
Epoch 13
Epoch  13: Train Loss 34.7685 - Val Loss 33.9371
Epoch 14
Epoch  14: Train Loss 34.8844 - Val Loss 34.1512
Epoch 15
Epoch  15: Train Loss 34.3006 - Val Loss 35.0351
Epoch 16
Epoch  16: Train Loss 34.4229 - Val Loss 35.7175
Epoch 17
Epoch  17: Train Loss 33.9915 - Val Loss 34.3384
Epoch 18
Epoch  18: Train Loss 33.5976 - Val Loss 34.0753
Epoch 19
Epoch  19: Train Loss 34.8324 - Val Loss 34.2907
Epoch 20
Epoch  20: Train Loss 34.3832 - Val Loss 34.4982
Epoch 21
Epoch  21: Train Loss 34.2901 - Val Loss 34.3765
Epoch 22
Epoch  22: Train Loss 34.3071 - Val Loss 34.3259
Epoch 23
Epoch  23: Train Loss 34.4470 - Val Loss 34.0470
Epoch 24
Epoch  24: Train Loss 34.3334 - Val Loss 34.1564
Epoch 25
Epoch  25: Train Loss 34.0980 - Val Loss 34.4980
Epoch 26
Epoch  26: Train Loss 34.1584 - Val Loss 34.7253
Epoch 27
Epoch  27: Train Loss 34.5762 - Val Loss 34.6020
Epoch 28
Epoch  28: Train Loss 34.6798 - Val Loss 34.6020
Epoch 29
Epoch  29: Train Loss 34.5875 - Val Loss 34.6020
Epoch 30
Epoch  30: Train Loss 34.5876 - Val Loss 34.6020
Epoch 31
Epoch  31: Train Loss 34.5864 - Val Loss 34.6020
Epoch 32
Epoch  32: Train Loss 34.5849 - Val Loss 34.6020
Epoch 33
Epoch  33: Train Loss 34.5829 - Val Loss 34.6020
Epoch 34
Epoch  34: Train Loss 34.4841 - Val Loss 36.2076
Epoch 35
Epoch  35: Train Loss 35.0643 - Val Loss 36.9980
Epoch 36
Epoch  36: Train Loss 35.4282 - Val Loss 36.9749
Epoch 37
Epoch  37: Train Loss 35.4509 - Val Loss 36.9412
Epoch 38
Epoch  38: Train Loss 35.4368 - Val Loss 36.9194
Epoch 39
Epoch  39: Train Loss 35.4274 - Val Loss 36.9066
Epoch 40
Epoch  40: Train Loss 35.4213 - Val Loss 36.8961
Epoch 41
Epoch  41: Train Loss 35.4161 - Val Loss 36.8886
Epoch 42
Epoch  42: Train Loss 35.4112 - Val Loss 36.8850
Epoch 43
Epoch  43: Train Loss 35.4057 - Val Loss 36.8855
Epoch 44
Epoch  44: Train Loss 35.4034 - Val Loss 36.8863
Epoch 45
Epoch  45: Train Loss 35.3975 - Val Loss 36.8867
Epoch 46
Epoch  46: Train Loss 35.3922 - Val Loss 36.8881
Epoch 47
Epoch  47: Train Loss 35.3893 - Val Loss 36.8925
Epoch 48
Epoch  48: Train Loss 35.3354 - Val Loss 36.8918
Epoch 49
Epoch  49: Train Loss 35.3321 - Val Loss 36.8930
Epoch 50
Epoch  50: Train Loss 35.3771 - Val Loss 36.8949
In [ ]:
# performance on the training set

X = X_train
y = y_train

predictions = model(X)

# Displaying the original image, permuted image, and predicted image

for i in range(20):
    fig, ax = plt.subplots(1, 3)
    # original image
    permuted = X_train[i]
    permutation = y_train[i]
    # reverse the permutation
    permutation = torch.argmax(permutation, axis=1)
    permutation = permutation.tolist()
    reverse_permutation = [permutation.index(i) for i in range(GRID_SIZE**2)]
    permuted_patches = permute_patches(permuted, reverse_permutation)

    original = combine_tensors(permuted_patches, GRID_SIZE)
    ax[0].imshow(tensor_to_image(original))
    ax[0].set_title('Original Image')
    ax[0].axis('off')
    # permuted image
   
    permuted_im = combine_tensors(X[i], GRID_SIZE)
    ax[1].imshow(tensor_to_image(permuted_im))
    ax[1].set_title('Permuted Image')
    ax[1].axis('off')

    # predicted image
    prediction = predictions[i]
    prediction = torch.argmax(prediction, axis=1).tolist()

    permuted_patches = permute_patches(split_tensor(original.squeeze(0), GRID_SIZE), prediction)
    predicted_im = combine_tensors(permuted_patches, GRID_SIZE)
    ax[2].imshow(tensor_to_image(predicted_im))
    ax[2].set_title('Predicted Image')
    ax[2].axis('off')
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image

Note: in the predicted image not all patches are always present, since when transforming the matrix of probabilities into an actual permutation it is possible for one patch to be the most probable in more than one position. This is of course an unintended scenario, but we decided to not further complicate the code inside the model, since we expect that with a successful training this issue would resolve itself.

Example of doubly stochastic matrix where the first item is the most probable in more than one row:

[0.5, 0.1, 0.1, 0.3]

[0.5, 0.1, 0.1, 0.3]

[0.0, 0.8, 0.0, 0.2]

[0.0, 0.0, 0.8, 0.2]

We tried different combinations of losses, number of epochs, training set sizes and model structures but we were not able to train a proper model following this approach. There could be various issues leading to this. It could be that the model was not big enough to extrapolate all the relevant information for properly classifying the images. It could also be that the training steps required to reach good scores is way too large for our capabilities and thus we didn't reach promising results. Another issue could be in the definition of our loss functions or even in the size of the MLP layers. One other approach could be changing the dataset format, even though we believe that giving the permutation matrix as a target label was the best choice we could make, since it can be interpreted as a one-hot encoded version of the position of each patch.

We believe that better results could be achieved with a more extensive training and more experimentation on the model parameters and structure. However, due to time constraints, we were not able to push our research far enough to reach our intended goal.

In [ ]:
# test the model on 10 random images with original, permuted and predicted images

X = []
y = []

indeces = np.random.choice(len(dataset['train']), 10, replace=False)
indeces = indeces.tolist()

for i in indeces:
    im = dataset['train'][i]['image']
    im = preprocess(GRID_SIZE)(im)
    perm = compute_permutation(GRID_SIZE)
    patches = split_tensor(im, GRID_SIZE)
    permuted_patches = permute_patches(patches, perm)
    # transform permuted_im into tensor
    permuted_patches = torch.stack(permuted_patches)
    perm_matrix = permutation_to_matrix(perm, GRID_SIZE)
    X.append(permuted_patches)
    y.append(perm_matrix)

# transform into tensors

X = torch.stack(X)
y = torch.stack(y)

predictions = model(X)

# Displaying the original image, permuted image, and predicted image

for i in range(10):
    fig, ax = plt.subplots(1, 3)
    # original image
    original = dataset['train'][indeces[i]]['image']
    original = preprocess(GRID_SIZE)(original)
    original = tensor_to_image(original)    
    ax[0].imshow(original)
    ax[0].set_title('Original Image')
    ax[0].axis('off')
    # permuted image
    permutation = torch.argmax(y[i], axis=1).tolist()
    permuted_patches = permute_patches(X[i], permutation)
    permuted_im = combine_tensors(permuted_patches, GRID_SIZE)
    ax[1].imshow(tensor_to_image(permuted_im))
    ax[1].set_title('Permuted Image')
    ax[1].axis('off')

    # predicted image
    prediction = predictions[i]
    prediction = torch.argmax(prediction, axis=1).tolist()
    permuted_patches = permute_patches(X[i], prediction)
    predicted_im = combine_tensors(permuted_patches, GRID_SIZE)
    ax[2].imshow(tensor_to_image(predicted_im))
    ax[2].set_title('Predicted Image')
    ax[2].axis('off')
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image

As expected, with images never seen before the performance did not increase.

Part 4: feature extraction with transformer plus MLP head¶

What follows is a modification of the previous model, in which we decided to swap the AlexNet model for feature extraction with an encoder-only Transformer model. Unfortunately, even in this case the results were not satisfactory and the considerations made in the previous part apply also for what follows.

In [ ]:
# Preprocessing function for the images

def preprocess(grid_size):
    return transforms.Compose([
    transforms.Resize(224*grid_size),
    transforms.CenterCrop(224*grid_size),
    transforms.ToTensor()
])
In [ ]:
# Transforming the data and splitting it into training and validation sets

X = []
y = []

GRID_SIZE = 2
TRAIN_SIZE = 100
TEST_SIZE = TRAIN_SIZE // 4

indeces = np.random.choice(len(dataset['train']), TRAIN_SIZE+TEST_SIZE, replace=False)
indeces = indeces.tolist()

for i in indeces:
    im = dataset['train'][i]['image']
    im = preprocess(GRID_SIZE)(im)
    perm = compute_permutation(GRID_SIZE)
    patches = split_tensor(im, GRID_SIZE)
    permuted_patches = permute_patches(patches, perm)
    # transform permuted_im into tensor
    permuted_patches = torch.stack(permuted_patches)
    perm_matrix = permutation_to_matrix(perm, GRID_SIZE)
    X.append(permuted_patches)
    y.append(perm_matrix)

# transform into tensors

X = torch.stack(X)
y = torch.stack(y)

# Split the dataset into training and validation sets (e.g., 80% train, 20% validation)
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# Create Datasets and DataLoaders
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

val_dataset = torch.utils.data.TensorDataset(X_val, y_val)
val_loader  = DataLoader(val_dataset, batch_size=32, shuffle=False)  # No shuffling for validation
In [ ]:
# Defining the model

from transformers import ViTModel

# Load the pre-trained Vision Transformer model
model_name = "google/vit-base-patch16-224-in21k"
vit_model = ViTModel.from_pretrained(model_name)

# freeze the weights of the model
for param in vit_model.parameters():
    param.requires_grad = False

# Define the Sinkhorn layer for transforming the matrix into a doubly stochastic matrix
def sinkhorn_function(X):
    # add a small value to avoid nan values
    X = X + 1e-6
    D1 = torch.diag(1. / torch.sum(X, axis=1))
    D2 = torch.diag(1. / torch.sum(torch.matmul(D1, X), axis=0))
    X = torch.matmul(D1, X)
    X = torch.matmul(X, D2)
    return X

# Define the complete model
class Model(nn.Module):
    def __init__(self, num_patches):
        super().__init__()
        self.encoder = vit_model
        self.layer_stack = nn.Sequential(
            nn.Linear(768*num_patches, 4096),
            nn.ReLU(),
            nn.Linear(4096, 16),
            nn.ReLU(inplace=True)
        )
        self.sinkhorn_layer = torch.func.vmap(sinkhorn_function)
        self.num_patches = num_patches

    def forward(self, batch):
        out_concat = []

        # pass each patch through the transformer
        for x in batch:
            a = self.encoder(x)
            a = a.last_hidden_state[:, 0, :]
            a = torch.reshape(a, (-1,))
            out_concat.append(a)

        
        # concatenate the outputs
        out_concat = torch.stack(out_concat)
        
        # pass the concatenated output through the fully connected layers
        out = self.layer_stack(out_concat)
        
        # reshape the output into a matrix
        out = out.view(-1, self.num_patches, self.num_patches)

        # pass the output through the Sinkhorn layer multiple times to get a doubly stochastic matrix
        for i in range(10):
            out = self.sinkhorn_layer(out)
    
        return out
In [ ]:
# Initialise the model, loss, and optimiser

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Model(num_patches=GRID_SIZE**2)
model.to(device)

# frobenious norm loss

class loss_frobenious(nn.Module):
    def __init__(self):
        super(loss_frobenious, self).__init__()
    
    def forward(self, y_pred, y_true):
        return torch.norm(y_pred - y_true, p='fro')


class loss_class(nn.Module):
    def __init__(self):
        super(loss_class, self).__init__()
    
    def forward(self, y_pred, y_true):
        l = 0
        for row_pred, row_true in zip(y_pred, y_true):
            l += nn.CrossEntropyLoss()(row_pred, row_true)
        return l
    
loss = loss_class()

# define the optimizer
optimizer = optim.Adam(model.parameters(), lr=0.01) # Adam optimiser
In [ ]:
# Training loop
for epoch in range(30):
    print(f'Epoch {epoch + 1}')
    model.train()  # Switch model to training mode
    train_loss = 0.0  # Initialize validation loss accumulator
    
    for x_batch, y_batch in train_loader:
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        y_pred = model(x_batch) # Get the predictions on training batch
        
        l = loss(y_pred, y_batch)  # Calculate the loss
        
        train_loss += l.item()  # Accumulate the loss for the entire dataset
        optimizer.zero_grad()  # Clear previous gradients
        l.backward()        # Compute new gradients
        optimizer.step()       # Update the model weights

    train_loss /= len(train_loader)  # Calculate average training loss
    
    # Validation loop
    model.eval()  # Switch model to evaluation mode (important for layers like dropout or batchnorm)
    with torch.no_grad():  # Disable gradient calculation for efficiency
        
        val_loss = 0.0 
            # Initialize validation loss accumulator
        for x_val, y_val in val_loader:
            x_val = x_val.to(device)
            y_val = y_val.to(device)
            y_pred_val = model(x_val)
            l = loss(y_pred_val, y_val)  # Calculate loss
            val_loss += l.item()  # Accumulate the loss for the entire dataset

        val_loss /= len(val_loader)  # Calculate average validation loss
        print(f'Epoch {epoch + 1:3d}: Train Loss {train_loss:.4f} - Val Loss {val_loss:.4f}')
Epoch 1
Epoch   1: Train Loss 35.0503 - Val Loss 10.2086
Epoch 2
Epoch   2: Train Loss 32.3284 - Val Loss 10.4170
Epoch 3
Epoch   3: Train Loss 31.7873 - Val Loss 10.4511
Epoch 4
Epoch   4: Train Loss 31.3311 - Val Loss 10.5062
Epoch 5
Epoch   5: Train Loss 31.0125 - Val Loss 10.3400
Epoch 6
Epoch   6: Train Loss 30.5945 - Val Loss 10.2514
Epoch 7
Epoch   7: Train Loss 30.0216 - Val Loss 10.2141
Epoch 8
Epoch   8: Train Loss 29.8278 - Val Loss 10.7241
Epoch 9
Epoch   9: Train Loss 28.9653 - Val Loss 10.7352
Epoch 10
Epoch  10: Train Loss 28.7687 - Val Loss 10.5764
Epoch 11
Epoch  11: Train Loss 29.1978 - Val Loss 10.7008
Epoch 12
Epoch  12: Train Loss 29.7156 - Val Loss 10.7162
Epoch 13
Epoch  13: Train Loss 29.8121 - Val Loss 10.7034
Epoch 14
Epoch  14: Train Loss 29.7151 - Val Loss 10.6779
Epoch 15
Epoch  15: Train Loss 29.6557 - Val Loss 10.6616
Epoch 16
Epoch  16: Train Loss 29.6645 - Val Loss 10.8122
Epoch 17
Epoch  17: Train Loss 29.1903 - Val Loss 10.1473
Epoch 18
Epoch  18: Train Loss 29.4905 - Val Loss 10.0249
Epoch 19
Epoch  19: Train Loss 30.2286 - Val Loss 10.0267
Epoch 20
Epoch  20: Train Loss 30.1922 - Val Loss 10.0281
Epoch 21
Epoch  21: Train Loss 30.0436 - Val Loss 10.0295
Epoch 22
Epoch  22: Train Loss 30.1858 - Val Loss 10.0308
Epoch 23
Epoch  23: Train Loss 30.2854 - Val Loss 10.0320
Epoch 24
Epoch  24: Train Loss 30.2605 - Val Loss 10.0348
Epoch 25
Epoch  25: Train Loss 30.1548 - Val Loss 10.0435
Epoch 26
Epoch  26: Train Loss 30.3868 - Val Loss 10.2374
Epoch 27
Epoch  27: Train Loss 30.5202 - Val Loss 10.1327
Epoch 28
Epoch  28: Train Loss 30.5052 - Val Loss 10.1288
Epoch 29
Epoch  29: Train Loss 30.3886 - Val Loss 10.1247
Epoch 30
Epoch  30: Train Loss 30.3301 - Val Loss 10.1220
In [ ]:
# performance on the training set

X = X_train
y = y_train

predictions = model(X)

# Displaying the original image, permuted image, and predicted image

for i in range(20):
    fig, ax = plt.subplots(1, 3)
    # original image
    permuted = X_train[i]
    permutation = y_train[i]
    # reverse the permutation
    permutation = torch.argmax(permutation, axis=1)
    permutation = permutation.tolist()
    reverse_permutation = [permutation.index(i) for i in range(GRID_SIZE**2)]
    permuted_patches = permute_patches(permuted, reverse_permutation)

    original = combine_tensors(permuted_patches, GRID_SIZE)
    ax[0].imshow(tensor_to_image(original))
    ax[0].set_title('Original Image')
    ax[0].axis('off')
    # permuted image
   
    permuted_im = combine_tensors(X[i], GRID_SIZE)
    ax[1].imshow(tensor_to_image(permuted_im))
    ax[1].set_title('Permuted Image')
    ax[1].axis('off')

    # predicted image
    prediction = predictions[i]
    prediction = torch.argmax(prediction, axis=1).tolist()

    permuted_patches = permute_patches(split_tensor(original.squeeze(0), GRID_SIZE), prediction)
    predicted_im = combine_tensors(permuted_patches, GRID_SIZE)
    ax[2].imshow(tensor_to_image(predicted_im))
    ax[2].set_title('Predicted Image')
    ax[2].axis('off')
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
In [ ]:
# test the model on 10 random images with original, permuted and predicted images

X = []
y = []

indeces = np.random.choice(len(dataset['train']), 10, replace=False)
indeces = indeces.tolist()

for i in indeces:
    im = dataset['train'][i]['image']
    im = preprocess(GRID_SIZE)(im)
    perm = compute_permutation(GRID_SIZE)
    patches = split_tensor(im, GRID_SIZE)
    permuted_patches = permute_patches(patches, perm)
    # transform permuted_im into tensor
    permuted_patches = torch.stack(permuted_patches)
    perm_matrix = permutation_to_matrix(perm, GRID_SIZE)
    X.append(permuted_patches)
    y.append(perm_matrix)

# transform into tensors

X = torch.stack(X)
y = torch.stack(y)

predictions = model(X)

# Displaying the original image, permuted image, and predicted image

for i in range(10):
    fig, ax = plt.subplots(1, 3)
    # original image
    original = dataset['train'][indeces[i]]['image']
    original = preprocess(GRID_SIZE)(original)
    original = tensor_to_image(original)    
    ax[0].imshow(original)
    ax[0].set_title('Original Image')
    ax[0].axis('off')
    # permuted image
    permutation = torch.argmax(y[i], axis=1).tolist()
    permuted_patches = permute_patches(X[i], permutation)
    permuted_im = combine_tensors(permuted_patches, GRID_SIZE)
    ax[1].imshow(tensor_to_image(permuted_im))
    ax[1].set_title('Permuted Image')
    ax[1].axis('off')

    # predicted image
    prediction = predictions[i]
    prediction = torch.argmax(prediction, axis=1).tolist()
    permuted_patches = permute_patches(X[i], prediction)
    predicted_im = combine_tensors(permuted_patches, GRID_SIZE)
    ax[2].imshow(tensor_to_image(predicted_im))
    ax[2].set_title('Predicted Image')
    ax[2].axis('off')
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image