%% Cell type:markdown id: tags:
### **_Deep Learning - BSc Data Science for Responsible Business - Centrale Lyon_**
2024-2025
Emmanuel Dellandréa
%% Cell type:markdown id: tags:
# Practical Session 8 - GANs and cGAN
<p align="center">
<img height=300px src="https://cdn-images-1.medium.com/max/1080/0*tJRy5Chmk4XymxwN.png"/></p>
<p align="center"></p>
%% Cell type:markdown id: tags:
The objective of this tutorial is to discover GANs, understand how they are implemented and then explore one specific GAN architecture that allows us to perform image-to-image translation (which corresponds to the picture that you can see above this text!).
The notebook contains code cells with **"# TO DO"** comments. Your goal is to complete these cells and run the proposed experiments. There are also several questions that you have to answer.
As the computation is heavy, particularly during training, we encourage you to use a GPU. If your laptop is not equipped with one, you may use one of these remote Jupyter servers, where you can select execution on GPU:
1) [jupyter.mi90.ec-lyon.fr](https://jupyter.mi90.ec-lyon.fr/)
This server is accessible within the campus network. If outside, you need to use a VPN. Before executing the notebook, select the kernel "Python PyTorch" to run it on GPU and have access to the PyTorch module.
2) [Google Colaboratory](https://colab.research.google.com/)
Before executing the notebook, select execution on GPU: "Exécution" menu -> "Modifier le type d'exécution" ("Runtime" -> "Change runtime type") and select "T4 GPU".
%% Cell type:markdown id: tags:
# Part 1: DCGAN
%% Cell type:markdown id: tags:
In this part, we aim to learn and understand the basic concepts of **Generative Adversarial Networks** through a DCGAN, and to generate new celebrity faces after training the network on images of real celebrities. For this purpose, please study the tutorial here: https://pytorch.org/tutorials/beginner/dcgan_faces_tutorial.html
%% Cell type:markdown id: tags:
Now we want to generate handwritten digits using the MNIST dataset. It is available in the torchvision package (https://pytorch.org/vision/stable/generated/torchvision.datasets.MNIST.html#torchvision.datasets.MNIST).
%% Cell type:code id: tags:
``` python
# TO DO: your code here to adapt the code from the tutorial to experiment on MNIST dataset
```
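%% Cell type:markdown id: tags:
A possible starting point for the data-loading part is sketched below (the values mirror the DCGAN tutorial, which resizes images to 64x64 and scales them to [-1, 1]; since MNIST is grayscale, the number of channels `nc` in the tutorial code would also have to be set to 1):
%% Cell type:code id: tags:
``` python
import torch
import torchvision.transforms as transforms
from torchvision import datasets
from torch.utils.data import DataLoader

# MNIST digits resized to 64x64 and normalized to [-1, 1], as expected by the DCGAN tutorial
transform = transforms.Compose([
    transforms.Resize(64),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

mnist_train = datasets.MNIST(root="./data", train=True, download=True, transform=transform)
dataloader = DataLoader(mnist_train, batch_size=128, shuffle=True, num_workers=2)
```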
%% Cell type:markdown id: tags:
Please re-train the DCGAN and display some automatically generated handwritten digits.
%% Cell type:markdown id: tags:
# Part 2: Conditional GAN (cGAN)
%% Cell type:markdown id: tags:
Let's take the example of the task illustrated in the picture below.
![Map to satellite picture](https://raw.githubusercontent.com/Neyri/Projet-cGAN/master/BE/img/map_streetview.png)
We have a picture of a map (from Google Maps) and we want to create an image of what the satellite view may look like.
As we are not only trying to generate a random picture but a mapping from one picture to another, we can't use the standard GAN architecture. We will instead use a cGAN, introduced in this [paper](https://arxiv.org/pdf/1611.07004.pdf).
A cGAN is a supervised GAN aiming at mapping a label picture to a real one, or a real picture to a label one. As you can see in the diagram below, the discriminator takes a pair of images as input and tries to predict whether the pair was generated or not. The generator will not only generate an image from noise but will also use an image (label or real) to generate another one (real or label).
![Diagram of how a cGan works](https://raw.githubusercontent.com/Neyri/Projet-cGAN/master/BE/img/cgan_map.png)
%% Cell type:markdown id: tags:
### Generator
In the cGAN architecture, the generator chosen is a U-Net.
![U-Net](https://raw.githubusercontent.com/Neyri/Projet-cGAN/master/BE/img/unet.png)
A U-Net takes an image as input and outputs another image.
It can be divided into 2 subparts: an encoder and a decoder.
* The encoder takes the input image and reduces its dimension to encode the main features into a vector.
* The decoder takes this vector and maps the encoded features back into an image.
A U-Net differs from a classic encoder-decoder in that every layer of the decoder takes as input the previous decoded output as well as the feature map produced by the encoder layer of the same level. This allows the decoder to use the low-frequency information encoded during the descent as well as the high frequencies of the original picture.
![U-Net](https://www.researchgate.net/profile/Baris_Turkbey/publication/315514772/figure/fig2/AS:485824962797569@1492841105670/U-net-architecture-Each-box-corresponds-to-a-multi-channel-features-maps-The-number-of.png)
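%% Cell type:markdown id: tags:
The sketch below illustrates this mechanism on dummy tensors (the shapes and the layer are arbitrary; it only shows the upsampling followed by channel-wise concatenation):
%% Cell type:code id: tags:
``` python
import torch
import torch.nn as nn

enc_feat = torch.randn(1, 128, 64, 64)   # encoder feature map at some level
dec_in = torch.randn(1, 256, 32, 32)     # decoder input coming from the level below
upsample = nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1)

# Skip connection: upsample, then concatenate the encoder features along the channel axis
merged = torch.cat([upsample(dec_in), enc_feat], dim=1)
print(merged.shape)  # torch.Size([1, 256, 64, 64])
```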
%% Cell type:markdown id: tags:
The architecture we will implement is the following (the number in the square is the number of filters used).
![UNet Architecture](https://raw.githubusercontent.com/Neyri/Projet-cGAN/master/BE/img/unet_architecture.png)
The encoder takes a colour picture as input (3 channels: RGB) and passes it through a series of convolution layers to encode its features. The decoder then reconstructs the image using transposed convolution layers; each of these layers takes as input the previously decoded vector AND the encoded features of the same level.
%% Cell type:markdown id: tags:
Now, let's create our cGAN to generate facades from a template image. For this purpose, we will use the CMP Facade dataset, which consists of 506 building facades and their corresponding segmentations, split into train and test subsets.
%% Cell type:markdown id: tags:
Let's first create a few classes describing the layers we will use in the U-Net.
%% Cell type:code id: tags:
``` python
# Importing all the libraries needed
import matplotlib.pyplot as plt
import glob
import random
import os
import numpy as np
import math
import itertools
import time
import datetime
from pathlib import Path
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.utils import save_image, make_grid
from torchvision import datasets
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch
```
%% Cell type:code id: tags:
``` python
# code adapted from https://github.com/milesial/Pytorch-UNet/blob/master/unet/unet_parts.py
# Input layer
class inconv(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(inconv, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size=4, padding=1, stride=2),
            nn.LeakyReLU(negative_slope=0.2, inplace=True)
        )

    def forward(self, x):
        x = self.conv(x)
        return x


# Encoder layer
class down(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(down, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size=4, padding=1, stride=2),
            nn.BatchNorm2d(out_ch),
            nn.LeakyReLU(negative_slope=0.2, inplace=True)
        )

    def forward(self, x):
        x = self.conv(x)
        return x


# Decoder layer
class up(nn.Module):
    def __init__(self, in_ch, out_ch, dropout=False):
        super(up, self).__init__()
        if dropout:
            self.conv = nn.Sequential(
                nn.ConvTranspose2d(in_ch, out_ch, kernel_size=4, padding=1, stride=2),
                nn.BatchNorm2d(out_ch),
                nn.Dropout(0.5, inplace=True),
                nn.ReLU(inplace=True)
            )
        else:
            self.conv = nn.Sequential(
                nn.ConvTranspose2d(in_ch, out_ch, kernel_size=4, padding=1, stride=2),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True)
            )

    def forward(self, x1, x2):
        x1 = self.conv(x1)
        x = torch.cat([x1, x2], dim=1)
        return x


# Output layer
class outconv(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(outconv, self).__init__()
        self.conv = nn.Sequential(
            nn.ConvTranspose2d(in_ch, out_ch, kernel_size=4, padding=1, stride=2),
            nn.Tanh()
        )

    def forward(self, x):
        x = self.conv(x)
        return x
```
%% Cell type:markdown id: tags:
Now let's create the U-Net using the helper classes defined previously.
%% Cell type:code id: tags:
``` python
class U_Net(nn.Module):
    '''
    Ck denotes a Convolution-BatchNorm-ReLU layer with k filters.
    CDk denotes a Convolution-BatchNorm-Dropout-ReLU layer with a dropout rate of 50%.
    Encoder:
        C64 - C128 - C256 - C512 - C512 - C512 - C512 - C512
    Decoder:
        CD512 - CD1024 - CD1024 - C1024 - C1024 - C512 - C256 - C128
    '''
    def __init__(self, n_channels, n_classes):
        super(U_Net, self).__init__()
        # Encoder
        self.inc = inconv(n_channels, 64)  # 64 filters
        # TO DO :
        # Create the 7 encoder layers called "down1" to "down7" following this sequence
        # C64 - C128 - C256 - C512 - C512 - C512 - C512 - C512
        # The first one has already been implemented

        # Decoder
        # TO DO :
        # Create the 7 decoder layers called up1 to up7 following this sequence :
        # CD512 - CD1024 - CD1024 - C1024 - C1024 - C512 - C256 - C128
        # The last layer has already been defined
        self.outc = outconv(128, n_classes)  # 128 filters

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x6 = self.down5(x5)
        x7 = self.down6(x6)
        x8 = self.down7(x7)
        # At this stage x8 is our encoded vector, we will now decode it
        x = self.up7(x8, x7)
        x = self.up6(x, x6)
        x = self.up5(x, x5)
        x = self.up4(x, x4)
        x = self.up3(x, x3)
        x = self.up2(x, x2)
        x = self.up1(x, x1)
        x = self.outc(x)
        return x
```
%% Cell type:code id: tags:
``` python
# We take images with 3 channels (RGB) as input and output an image that also has 3 channels (RGB)
generator = U_Net(3, 3)
# Check that the architecture is as expected
generator
```
%% Cell type:markdown id: tags:
You should now have a working U-Net.
%% Cell type:markdown id: tags:
<font color='red'>**Question 1**</font>
Knowing the input and output images will be 256x256, what will be the dimension of the encoded vector x8 ?
Your answer :
%% Cell type:markdown id: tags:
<font color='red'>**Question 2**</font>
As you can see, U-net has an encoder-decoder architecture with skip connections. Explain why it works better than a traditional encoder-decoder.
Your answer:
%% Cell type:markdown id: tags:
### Discriminator
In the cGAN architecture, the chosen discriminator is a PatchGAN. It is a convolutional discriminator that produces a map over the input pictures, where each output pixel classifies a patch of size NxN of the input.
![patch GAN](https://raw.githubusercontent.com/Neyri/Projet-cGAN/master/BE/img/patchGAN.png)
The size N is determined by the depth of the network, according to the following table:
| Number of layers | N |
| ---- | ---- |
| 1 | 16 |
| 2 | 34 |
| 3 | 70 |
| 4 | 142 |
| 5 | 286 |
| 6 | 574 |
The number of layers refers to the number of layers with `kernel=(4,4)`, `padding=(1,1)` and `stride=(2,2)`. These layers are followed by 2 layers with `kernel=(4,4)`, `padding=(1,1)` and `stride=(1,1)`.
In our case we are going to create a 70x70 PatchGAN.
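%% Cell type:markdown id: tags:
The 70x70 value can be verified with a quick receptive-field computation, sketched below (not part of the exercise): starting from r = 1, each layer grows the receptive field by (k - 1) times the cumulative stride of the layers before it.
%% Cell type:code id: tags:
``` python
# Receptive field of a stack of convolutions, given (kernel, stride) per layer
def receptive_field(layers):
    r, j = 1, 1          # receptive field and cumulative stride ("jump")
    for k, s in layers:
        r = r + (k - 1) * j
        j = j * s
    return r

# Three k=4, s=2 layers followed by two k=4, s=1 layers -> 70x70 patches
print(receptive_field([(4, 2), (4, 2), (4, 2), (4, 1), (4, 1)]))  # 70
```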
%% Cell type:markdown id: tags:
Let's first create a few helper classes.
%% Cell type:code id: tags:
``` python
class conv_block(nn.Module):
    def __init__(self, in_ch, out_ch, use_batchnorm=True, stride=2):
        super(conv_block, self).__init__()
        if use_batchnorm:
            self.conv = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel_size=4, padding=1, stride=stride),
                nn.BatchNorm2d(out_ch),
                nn.LeakyReLU(negative_slope=0.2, inplace=True)
            )
        else:
            self.conv = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel_size=4, padding=1, stride=stride),
                nn.LeakyReLU(negative_slope=0.2, inplace=True)
            )

    def forward(self, x):
        x = self.conv(x)
        return x


class out_block(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(out_block, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size=4, padding=1, stride=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.conv(x)
        return x
```
%% Cell type:markdown id: tags:
Now let's create the Patch GAN discriminator.
As we want a 70x70 Patch GAN, the architecture will be as follows :
```
1. C64 - K4, P1, S2
2. C128 - K4, P1, S2
3. C256 - K4, P1, S2
4. C512 - K4, P1, S1
5. C1 - K4, P1, S1 (output)
```
Where Ck denotes a convolution block with k filters, Kk a kernel of size k, Pk is the padding size and Sk the stride applied.
*Note :* For the first layer, we do not use batchnorm.
%% Cell type:markdown id: tags:
<font color='red'>**Question 3**</font>
Knowing input images will be 256x256 with 3 channels each, how many parameters are there to learn ?
Your answer :
%% Cell type:code id: tags:
``` python
class PatchGAN(nn.Module):
    def __init__(self, n_channels, n_classes):
        super(PatchGAN, self).__init__()
        # TODO :
        # create the 4 first layers named conv1 to conv4
        self.conv1 =
        self.conv2 =
        self.conv3 =
        self.conv4 =
        # output layer
        self.out = out_block(512, n_classes)

    def forward(self, x1, x2):
        x = torch.cat([x2, x1], dim=1)
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.out(x)
        return x
```
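%% Cell type:markdown id: tags:
Once you have attempted the cell above, one possible completion consistent with the 70x70 specification (no batchnorm on the first layer, stride 1 on the last two convolutions) is sketched below for reference; it simply reuses the `conv_block` and `out_block` helpers.
%% Cell type:code id: tags:
``` python
# Reference sketch (one possible completion of the TODO above, not the only valid answer)
class PatchGAN70(nn.Module):
    def __init__(self, n_channels, n_classes):
        super(PatchGAN70, self).__init__()
        self.conv1 = conv_block(n_channels, 64, use_batchnorm=False)  # C64  - K4, P1, S2
        self.conv2 = conv_block(64, 128)                              # C128 - K4, P1, S2
        self.conv3 = conv_block(128, 256)                             # C256 - K4, P1, S2
        self.conv4 = conv_block(256, 512, stride=1)                   # C512 - K4, P1, S1
        self.out = out_block(512, n_classes)                          # C1   - K4, P1, S1

    def forward(self, x1, x2):
        x = torch.cat([x2, x1], dim=1)
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        return self.out(x)
```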
%% Cell type:code id: tags:
``` python
# We have 6 input channels as we concatenate 2 images (with 3 channels each)
discriminator = PatchGAN(6,1)
discriminator
```
%% Cell type:markdown id: tags:
You should now have a working discriminator.
%% Cell type:markdown id: tags:
### Loss functions
As we have seen in the choice of the architectures for this GAN, the challenge is to map both low and high frequencies.
To tackle this problem, the cGAN relies on its architecture to capture the high frequencies (U-Net + PatchGAN) and on its loss function to learn the low-frequency features. The global loss function is indeed made of 2 parts:
* the first part, which handles high frequencies, optimizes the mean squared error of the GAN (the adversarial term);
* the second part, which handles low frequencies, minimizes the $\mathcal{L}_1$ norm between the generated picture and the real one.
So the loss can be defined as $$ G^* = \arg\ \underset{G}{\min}\ \underset{D}{\max}\ \mathcal{L}_{cGAN}(G,D) + \lambda\, \mathcal{L}_{1}(G)$$
%% Cell type:code id: tags:
``` python
# Loss functions
criterion_GAN = torch.nn.MSELoss()
criterion_pixelwise = torch.nn.L1Loss()
# Loss weight of L1 pixel-wise loss between translated image and real image
lambda_pixel = 100
```
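%% Cell type:markdown id: tags:
These two criteria are combined in the generator objective. As a reference for the TO DO in the training loop further below, one possible formulation is sketched here (it assumes `fake_A = generator(real_B)` is the translated image and `valid` is the all-ones PatchGAN target defined in that loop):
%% Cell type:code id: tags:
``` python
# Reference sketch for the generator objective (variable names follow the training loop below)
def generator_loss(fake_A, real_A, real_B, valid):
    loss_GAN = criterion_GAN(discriminator(fake_A, real_B), valid)  # adversarial term (high frequencies)
    loss_pixel = criterion_pixelwise(fake_A, real_A)                # L1 term (low frequencies)
    return loss_GAN + lambda_pixel * loss_pixel
```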
%% Cell type:markdown id: tags:
### Training and evaluating models
%% Cell type:code id: tags:
``` python
# parameters
epoch = 0 # epoch to start training from
n_epoch = 200 # number of epochs of training
batch_size = 10 # size of the batches
lr = 0.0002 # adam: learning rate
b1 = 0.5 # adam: decay of first order momentum of gradient
b2 = 0.999 # adam: decay of second order momentum of gradient
decay_epoch = 100 # epoch from which to start lr decay
img_height = 256 # size of image height
img_width = 256 # size of image width
channels = 3 # number of image channels
sample_interval = 500 # interval between sampling of images from generators
checkpoint_interval = -1 # interval between model checkpoints
cuda = True if torch.cuda.is_available() else False # do you have cuda ?
```
%% Cell type:markdown id: tags:
Download the dataset.
%% Cell type:code id: tags:
``` python
!wget https://partage.liris.cnrs.fr/index.php/s/4L9JpxxHBGc4rRR/download/CMP_facade_DB_base.zip
!wget https://partage.liris.cnrs.fr/index.php/s/SyrJb7mASjyDQR9/download/CMP_facade_DB_extended.zip
```
%% Cell type:code id: tags:
``` python
import os
import zipfile
# Extract in the correct folder
with zipfile.ZipFile("CMP_facade_DB_base.zip", 'r') as zip_ref:
zip_ref.extractall("./facades")
os.rename("./facades/base", "./facades/train")
# Extract in the correct folder
with zipfile.ZipFile("CMP_facade_DB_extended.zip", 'r') as zip_ref:
zip_ref.extractall("./facades")
os.rename("./facades/extended", "./facades/val")
```
%% Cell type:markdown id: tags:
Configure the dataloader
%% Cell type:code id: tags:
``` python
class ImageDataset(Dataset):
    def __init__(self, root, transforms_=None, mode='train'):
        self.transform = transforms.Compose(transforms_)
        self.files_img = sorted(glob.glob(os.path.join(root, mode) + '/*.jpg'))
        self.files_mask = sorted(glob.glob(os.path.join(root, mode) + '/*.png'))
        assert len(self.files_img) == len(self.files_mask)

    def __getitem__(self, index):
        img = Image.open(self.files_img[index % len(self.files_img)])
        mask = Image.open(self.files_mask[index % len(self.files_mask)])
        mask = mask.convert('RGB')
        img = self.transform(img)
        mask = self.transform(mask)
        return img, mask

    def __len__(self):
        return len(self.files_img)


# Configure dataloaders
transforms_ = [transforms.Resize((img_height, img_width), Image.BICUBIC),
               transforms.ToTensor()]  # transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))

dataloader = DataLoader(ImageDataset("facades", transforms_=transforms_),
                        batch_size=16, shuffle=True)
val_dataloader = DataLoader(ImageDataset("facades", transforms_=transforms_, mode='val'),
                            batch_size=8, shuffle=False)

# Tensor type
Tensor = torch.cuda.FloatTensor if cuda else torch.FloatTensor
```
%% Cell type:markdown id: tags:
Check that the loading works and define a few helper functions.
%% Cell type:code id: tags:
``` python
def plot2x2Array(image, mask):
    f, axarr = plt.subplots(1, 2)
    axarr[0].imshow(image)
    axarr[1].imshow(mask)
    axarr[0].set_title('Image')
    axarr[1].set_title('Mask')

def reverse_transform(image):
    image = image.numpy().transpose((1, 2, 0))
    image = np.clip(image, 0, 1)
    image = (image * 255).astype(np.uint8)
    return image

def plot2x3Array(image, mask, predict):
    f, axarr = plt.subplots(1, 3, figsize=(15, 15))
    axarr[0].imshow(image)
    axarr[1].imshow(mask)
    axarr[2].imshow(predict)
    axarr[0].set_title('input')
    axarr[1].set_title('real')
    axarr[2].set_title('fake')
```
%% Cell type:code id: tags:
``` python
image, mask = next(iter(dataloader))
image = reverse_transform(image[0])
mask = reverse_transform(mask[0])
plot2x2Array(image, mask)
```
%% Cell type:markdown id: tags:
Initialize our GAN
%% Cell type:code id: tags:
``` python
# Calculate output of image discriminator (PatchGAN)
patch = (1, img_height//2**3-2, img_width//2**3-2)
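# With 256x256 inputs, the three stride-2 convolutions give a 32x32 map and the two
# stride-1 (kernel 4, padding 1) convolutions reduce it to 30x30, hence a (1, 30, 30) target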
if cuda:
    generator = generator.cuda()
    discriminator = discriminator.cuda()
    criterion_GAN.cuda()
    criterion_pixelwise.cuda()
# Optimizers
optimizer_G = torch.optim.Adam(generator.parameters(), lr=lr, betas=(b1, b2))
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=lr, betas=(b1, b2))
```
%% Cell type:markdown id: tags:
Start training
%% Cell type:code id: tags:
``` python
def save_model(epoch):
    # save your work
    torch.save({
        'epoch': epoch,
        'model_state_dict': generator.state_dict(),
        'optimizer_state_dict': optimizer_G.state_dict(),
        'loss': loss_G,
    }, 'generator_' + str(epoch) + '.pth')
    torch.save({
        'epoch': epoch,
        'model_state_dict': discriminator.state_dict(),
        'optimizer_state_dict': optimizer_D.state_dict(),
        'loss': loss_D,
    }, 'discriminator_' + str(epoch) + '.pth')

def weights_init_normal(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm2d') != -1:
        torch.nn.init.normal_(m.weight.data, 1.0, 0.02)
        torch.nn.init.constant_(m.bias.data, 0.0)
```
%% Cell type:markdown id: tags:
<font color='red'>Complete the loss function </font> in the following training code and train your network:
%% Cell type:code id: tags:
``` python
# ----------
# Training
# ----------
losses = []
num_epochs = 200

# Initialize weights
generator.apply(weights_init_normal)
discriminator.apply(weights_init_normal)

epoch_D = 0
epoch_G = 0

# train the network
discriminator.train()
generator.train()
print_every = 400

for epoch in range(epoch_G, num_epochs):
    for i, batch in enumerate(dataloader):

        # Model inputs
        real_A = Variable(batch[0].type(Tensor))
        real_B = Variable(batch[1].type(Tensor))

        # Adversarial ground truths
        valid = Variable(Tensor(np.ones((real_B.size(0), *patch))), requires_grad=False)
        fake = Variable(Tensor(np.zeros((real_B.size(0), *patch))), requires_grad=False)

        # ------------------
        #  Train Generators
        # ------------------
        optimizer_G.zero_grad()

        # GAN loss
        # TO DO: Put here your GAN loss

        # Pixel-wise loss
        # TO DO: Put here your pixel loss

        # Total loss
        # TO DO: Put here your total loss

        loss_G.backward()
        optimizer_G.step()

        # ---------------------
        #  Train Discriminator
        # ---------------------
        optimizer_D.zero_grad()

        # Real loss
        pred_real = discriminator(real_A, real_B)
        loss_real = criterion_GAN(pred_real, valid)

        # Fake loss
        pred_fake = discriminator(fake_A.detach(), real_B)
        loss_fake = criterion_GAN(pred_fake, fake)

        # Total loss
        loss_D = 0.5 * (loss_real + loss_fake)
        loss_D.backward()
        optimizer_D.step()

        # Print some loss stats
        if i % print_every == 0:
            # print discriminator and generator loss
            print('Epoch [{:5d}/{:5d}] | d_loss: {:6.4f} | g_loss: {:6.4f}'.format(
                epoch + 1, num_epochs, loss_D.item(), loss_G.item()))

    ## AFTER EACH EPOCH ##
    # append discriminator loss and generator loss
    losses.append((loss_D.item(), loss_G.item()))

    if epoch % 100 == 0:
        print('Saving model...')
        save_model(epoch)
```
%% Cell type:markdown id: tags:
Observe the evolution of the losses during training.
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots()
losses = np.array(losses)
plt.plot(losses.T[0], label='Discriminator')
plt.plot(losses.T[1], label='Generator')
plt.title("Training Losses")
plt.legend()
```
%% Cell type:markdown id: tags:
If the training takes too long, you can use a pretrained model in the meantime to evaluate its performance.
It is available at: https://partage.liris.cnrs.fr/index.php/s/xwEFmxn9ANeq4zY
%% Cell type:markdown id: tags:
### Evaluate your cGAN
%% Cell type:code id: tags:
``` python
def load_model(epoch=200):
    if 'generator_' + str(epoch) + '.pth' in os.listdir() and 'discriminator_' + str(epoch) + '.pth' in os.listdir():
        if cuda:
            checkpoint_generator = torch.load('generator_' + str(epoch) + '.pth')
        else:
            checkpoint_generator = torch.load('generator_' + str(epoch) + '.pth', map_location='cpu')
        generator.load_state_dict(checkpoint_generator['model_state_dict'])
        optimizer_G.load_state_dict(checkpoint_generator['optimizer_state_dict'])
        epoch_G = checkpoint_generator['epoch']
        loss_G = checkpoint_generator['loss']

        if cuda:
            checkpoint_discriminator = torch.load('discriminator_' + str(epoch) + '.pth')
        else:
            checkpoint_discriminator = torch.load('discriminator_' + str(epoch) + '.pth', map_location='cpu')
        discriminator.load_state_dict(checkpoint_discriminator['model_state_dict'])
        optimizer_D.load_state_dict(checkpoint_discriminator['optimizer_state_dict'])
        epoch_D = checkpoint_discriminator['epoch']
        loss_D = checkpoint_discriminator['loss']
    else:
        print('There isn\'t a saved model for this number of epochs')
```
%% Cell type:code id: tags:
``` python
load_model(epoch=200)
# switching mode
generator.eval()
```
%% Cell type:code id: tags:
``` python
# show a sample evaluation image on the training set
image, mask = next(iter(dataloader))
output = generator(mask.type(Tensor))
output = output.view(16, 3, 256, 256)
output = output.cpu().detach()

for i in range(8):
    image_plot = reverse_transform(image[i])
    output_plot = reverse_transform(output[i])
    mask_plot = reverse_transform(mask[i])
    plot2x3Array(mask_plot, image_plot, output_plot)
```
%% Cell type:code id: tags:
``` python
# show a sample evaluation image on the validation dataset
image, mask = next(iter(val_dataloader))
output = generator(mask.type(Tensor))
output = output.view(8, 3, 256, 256)
output = output.cpu().detach()

for i in range(8):
    image_plot = reverse_transform(image[i])
    output_plot = reverse_transform(output[i])
    mask_plot = reverse_transform(mask[i])
    plot2x3Array(mask_plot, image_plot, output_plot)
```
%% Cell type:markdown id: tags:
<font color='red'>**Question 4**</font>
Compare the results obtained after 100 and 200 epochs of training.
%% Cell type:code id: tags:
``` python
# TO DO : Your code here to load and evaluate with a few samples
# a model after 100 epochs
```
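%% Cell type:markdown id: tags:
One possible starting point (a sketch, assuming a checkpoint named `generator_100.pth` was saved by `save_model` during training):
%% Cell type:code id: tags:
``` python
# Load the checkpoint saved after 100 epochs and look at a few validation samples
load_model(epoch=100)
generator.eval()

image, mask = next(iter(val_dataloader))
output = generator(mask.type(Tensor)).cpu().detach()

for i in range(4):
    plot2x3Array(reverse_transform(mask[i]),
                 reverse_transform(image[i]),
                 reverse_transform(output[i]))
```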
%% Cell type:markdown id: tags:
### **_Deep Learning - BSc Data Science for Responsible Business - Centrale Lyon_**
2024-2025
Emmanuel Dellandréa
%% Cell type:markdown id: tags:
# Practical Session 9 - Diffusion Models
Subject written by Bruno Machado
<p align="center">
<img height=300px src="https://substackcdn.com/image/fetch/f_auto,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2F2f5d7da7-52db-4104-9742-a0b4555d8dd6_1300x387.png"/></p>
<p align="center"></p>
%% Cell type:markdown id: tags:
The objective of this tutorial is to discover Diffusion Models: probabilistic generative models that learn to generate data by iteratively refining random noise through a reverse diffusion process. Given a data sample, noise is progressively added in small steps until it becomes pure noise. Then, a neural network is trained to reverse this process and generate realistic data from noise.
Diffusion models have gained popularity due to their ability to generate high-quality, diverse, and detailed content, surpassing GANs in the quality of the generated images.
In this assignment we will focus on DDPMs, which were introduced in this [paper](https://arxiv.org/abs/2006.11239) and laid the foundation for generative diffusion models.
The notebook contains code cells with the **"# TO DO"** comments. Your goal is to complete these cells and run the proposed experiments.
As the computation is heavy, particularly during training, we encourage you to use a GPU. If your laptop is not equipped with one, you may use one of these remote Jupyter servers, where you can select execution on GPU:
1) [jupyter.mi90.ec-lyon.fr](https://jupyter.mi90.ec-lyon.fr/)
This server is accessible within the campus network. If outside, you need to use a VPN. Before executing the notebook, select the kernel "Python PyTorch" to run it on GPU and have access to the PyTorch module.
2) [Google Colaboratory](https://colab.research.google.com/)
Before executing the notebook, select execution on GPU: "Exécution" menu -> "Modifier le type d'exécution" ("Runtime" -> "Change runtime type") and select "T4 GPU".
%% Cell type:markdown id: tags:
# Part 1: Diffusion
%% Cell type:code id: tags:
``` python
# First import useful libraries
import torch
import numpy as np
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
```
%% Cell type:code id: tags:
``` python
# device = torch.device("mps")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
```
%% Cell type:markdown id: tags:
As in the previous session, we will use the MNIST dataset.
%% Cell type:code id: tags:
``` python
# TO DO: your code here to load the MNIST dataset. The size of the images should be set to 64x64.
mnist_dataset =
mnist_dataloader =
```
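%% Cell type:markdown id: tags:
One possible way to fill in the cell above (a sketch; the normalization to [-1, 1] matches the visualization code used later in this notebook, and the batch size is an arbitrary choice):
%% Cell type:code id: tags:
``` python
from torch.utils.data import DataLoader
from torchvision import datasets

# MNIST digits resized to 64x64 and scaled to [-1, 1]
transform = transforms.Compose([
    transforms.Resize(64),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

mnist_dataset = datasets.MNIST(root="./data", train=True, download=True, transform=transform)
mnist_dataloader = DataLoader(mnist_dataset, batch_size=64, shuffle=True)
```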
%% Cell type:markdown id: tags:
Auxiliary functions for plotting images
%% Cell type:code id: tags:
``` python
def reverse_transform(image):
    image = image.numpy().transpose((1, 2, 0))
    image = np.clip(image, 0, 1)
    image = (image * 255).astype(np.uint8)
    return image

def plot1xNArray(images, labels):
    f, axarr = plt.subplots(1, len(images))
    for image, ax, label in zip(images, axarr, labels):
        ax.imshow(image, cmap='gray')
        ax.axis('off')
        ax.set_title(label)
```
%% Cell type:markdown id: tags:
In order to train the model with the diffusion process, we will use a noise scheduler, which is in charge of the forward diffusion process. The scheduler takes an image, a sample of random noise and a timestep, and returns a noisy image for the corresponding timestep. Noise is progressively added to the image at each timestep, so a noisy image at timestep 0 has barely any noise, while a noisy image at the maximum timestep is basically just noise.
Let's create a noise scheduler with 1000 max timesteps and visualize some noisy images.
We will use the diffusers library from Hugging Face, which provides several tools for training and using diffusion models.
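%% Cell type:markdown id: tags:
As a reference for what the scheduler's `add_noise` method computes, here is a minimal sketch of the closed-form forward process q(x_t | x_0) with a linear beta schedule (the bounds 1e-4 and 0.02 are the usual DDPM defaults; this is an illustration, not the library implementation):
%% Cell type:code id: tags:
``` python
import torch

def forward_diffuse(x0, t, num_train_timesteps=1000):
    """Sample x_t = sqrt(alpha_bar_t) * x0 + sqrt(1 - alpha_bar_t) * eps, with eps ~ N(0, I)."""
    betas = torch.linspace(1e-4, 0.02, num_train_timesteps)  # linear beta schedule
    alpha_bar = torch.cumprod(1.0 - betas, dim=0)            # cumulative product of (1 - beta)
    eps = torch.randn_like(x0)
    return alpha_bar[t].sqrt() * x0 + (1.0 - alpha_bar[t]).sqrt() * eps

# Example: noising a dummy 64x64 grayscale image at timestep 500 (kept on CPU for simplicity)
x0 = torch.randn(1, 64, 64)
x_t = forward_diffuse(x0, t=500)
```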
%% Cell type:code id: tags:
``` python
from diffusers import DDPMScheduler
# TO DO: Create the scheduler
noise_scheduler =
image, _ = mnist_dataset[0]
# TO DO: Create a noise tensor sampled from a normal distribution with the same shape as the image
noise =
images, labels = [reverse_transform(image)], ["Original"]
for i in [100, 250, 400, 900]:
    timestep = torch.LongTensor([i])
    noisy_image = noise_scheduler.add_noise(image, noise, timestep)
    images.append(reverse_transform(noisy_image))
    labels.append(f"t={i}")

plot1xNArray(images, labels)
```
%% Cell type:markdown id: tags:
For the reverse diffusion process we will use a neural network. Given a noisy image and the corresponding timestep, the goal of the neural network is to predict the noise, which allows for the denoising.
For the model, we will use an architecture similar to the cGAN generator, a 2D U-Net, with a few modifications. The main difference is that we have to indicate to the model which timestep is currently being denoised. For that purpose, a timestep embedding is added, so the model has 2 inputs: the noisy image and the corresponding timestep.
In this exercise, we will use a U-Net implementation from the diffusers library, which already includes the timestep embedding.
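%% Cell type:markdown id: tags:
To see what this two-input interface looks like, here is a tiny, self-contained sketch (the block sizes are deliberately small and illustrative; the cell below asks you to choose the actual parameters for 64x64 MNIST):
%% Cell type:code id: tags:
``` python
import torch
from diffusers import UNet2DModel

# A deliberately tiny UNet just to illustrate the (image, timestep) interface
tiny_unet = UNet2DModel(
    sample_size=64,                      # assumes 64x64 inputs, as in this notebook
    in_channels=1,                       # grayscale input (illustrative)
    out_channels=1,                      # the predicted noise has the same shape as the input
    block_out_channels=(32, 64),
    down_block_types=("DownBlock2D", "DownBlock2D"),
    up_block_types=("UpBlock2D", "UpBlock2D"),
)

x_t = torch.randn(2, 1, 64, 64)          # a batch of noisy images
t = torch.randint(0, 1000, (2,))         # one timestep per image
pred_noise = tiny_unet(x_t, t).sample    # predicted noise, same shape as x_t
print(pred_noise.shape)                  # torch.Size([2, 1, 64, 64])
```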
%% Cell type:code id: tags:
``` python
from diffusers import UNet2DModel

# TO DO: Complete the parameters
diffusion_backbone = UNet2DModel(
    block_out_channels=(64, 128, 256, 512),
    down_block_types=("DownBlock2D", "DownBlock2D", "DownBlock2D", "DownBlock2D"),
    up_block_types=("UpBlock2D", "UpBlock2D", "UpBlock2D", "UpBlock2D"),
    sample_size=,
    in_channels=,
    out_channels=,
).to(device)

# Optimizer
optimizer = torch.optim.AdamW(diffusion_backbone.parameters(), lr=1e-4)

print(diffusion_backbone)
```
%% Cell type:markdown id: tags:
### Now, let's train the model.
%% Cell type:code id: tags:
``` python
# ----------------
# Training Loop
# ----------------
torch.backends.cudnn.deterministic = True

losses = []
num_epochs = 5
print_every = 100

diffusion_backbone.train()
for epoch in range(num_epochs):
    for i, batch in enumerate(mnist_dataloader):
        # Zero the gradients
        optimizer.zero_grad()

        # Send input to device
        images = batch[0].to(device)

        # Generate noisy images, different timestep for each image in the batch
        timesteps = torch.randint(noise_scheduler.config.num_train_timesteps, (images.size(0),), device=device)
        # TO DO: Complete the code
        noise =
        noisy_images =

        # Forward pass
        residual = diffusion_backbone(noisy_images, timesteps).sample

        # TO DO: Compute the loss
        loss =
        loss.backward()
        optimizer.step()

        # Print stats
        if i % print_every == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}][{i}/{len(mnist_dataloader)}] | loss: {loss.item():6.4f}')
            losses.append(loss.item())

    torch.save(diffusion_backbone.state_dict(), f"diffusion_{epoch+1}.pth")
```
%% Cell type:markdown id: tags:
If the training takes too long, you can download an already trained model from [this link](https://partage.liris.cnrs.fr/index.php/s/AP2t6b3w8SM4Bp5) and use it for inference.
%% Cell type:code id: tags:
``` python
# TO DO: Add the path to the model checkpoint for loading the model
diffusion_backbone.load_state_dict(torch.load())
diffusion_backbone.eval()
```
%% Cell type:markdown id: tags:
### Time to generate some images.
During training, for each data sample, we take a random timestep and the corresponding noisy image to give as input to our model. With sufficient training, the model should learn how to predict the noise in a noisy image for all possible timesteps.
During inference, to generate an image, we start from pure noise and, step by step, predict the noise to go from one noisy image to the next, progressively denoising the image until we reach timestep 0, at which point we should have an image without any noise.
%% Cell type:code id: tags:
``` python
from tqdm import tqdm

# Start the image as random noise
image = torch.randn((10, 1, 64, 64)).to(device)

# Create a list of images and labels for visualization
images, labels = [(image / 2 + 0.5).clamp(0, 1).cpu().permute(0, 2, 3, 1).numpy()], ["Noise"]

# Use the scheduler to iterate over timesteps
noise_scheduler.set_timesteps(1000)
for timestep in tqdm(noise_scheduler.timesteps):
    with torch.no_grad():
        residual = diffusion_backbone(image, timestep).sample
    image = noise_scheduler.step(residual, timestep, image).prev_sample
    if timestep.item() % 200 == 0:
        images.append((image / 2 + 0.5).clamp(0, 1).cpu().permute(0, 2, 3, 1).numpy())
        labels.append(f"t={timestep.item()}")

for i in range(images[0].shape[0]):
    plot1xNArray([img[i] for img in images], labels)
```
%% Cell type:markdown id: tags:
The diffusers library also provides *Pipeline* classes, which are wrappers around the model that abstract the inference loop implemented above.
We can create a pipeline, giving it the trained model and the noise scheduler, and use it to generate images. In this case, we will only have access to the final image, generated at the last timestep, but not to the intermediate images from the denoising process.
%% Cell type:code id: tags:
``` python
from diffusers import DDPMPipeline
pipeline = DDPMPipeline(diffusion_backbone, noise_scheduler)
generated_images = pipeline(10, output_type="np")
f, axarr = plt.subplots(1, len(generated_images["images"]))
for image, ax in zip(generated_images["images"], axarr):
    ax.imshow(image, cmap='gray')
    ax.axis('off')
```
%% Cell type:markdown id: tags:
# Part 2: What about those beautiful images?
%% Cell type:markdown id: tags:
<p align="center">
<img height=300px src="https://huggingface.co/stabilityai/stable-diffusion-3.5-large/media/main/sd3.5_large_demo.png"/></p>
<p align="center"></p>
In this exercise we achieved decent results on very simple datasets. But we are quite far from those beautiful AI-generated images we can find online. That is for 2 main reasons:
- Model size: due to computation and time constraints, we can't really train very large models.
- Dataset size: due to the same constraints, we can't use very complex and large datasets, which require larger models and longer training times.
Fortunately, even though we can't train those large models with the available hardware and time, we can at least use them for inference!
The goal of this part is to learn how to retrieve and use a pre-trained diffusion model, and also to get creative and come up with some nice prompts to generate outstanding images.
We are going to use Stable Diffusion 3.5, which is a state-of-the-art open-source text-conditioned model. It takes a prompt in natural language and uses it to guide the diffusion process. This type of model is trained on image-text pairs, but can generalize beyond the pairs seen during training, mixing several different concepts into a single image.
In order to save memory, we will use quantization, which consists of converting the model weights from 16-bit floats to 4-bit values. That simply means that each model weight will be stored using only 4 bits instead of 16. This allows us to run the model on GPUs with less VRAM and get faster inference, at the cost of a small drop in the quality of the results.
%% Cell type:markdown id: tags:
For this part of the assignment, restart the notebook kernel to make sure your GPU memory is empty. Memory usage can be checked with the command `nvidia-smi` in a terminal. If your GPU has 2GB of VRAM or less, the model will probably not fit into memory even with quantization. In that case, use Google Colab for this part or use the smaller model indicated below. If you are not happy with the results and have plenty of VRAM available, feel free to use 8-bit quantization instead or even load the model without quantization.
%% Cell type:code id: tags:
``` python
from diffusers import BitsAndBytesConfig, SD3Transformer2DModel
from diffusers import StableDiffusion3Pipeline

model_id = "stabilityai/stable-diffusion-3.5-medium"

nf4_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16
)
model_nf4 = SD3Transformer2DModel.from_pretrained(
    model_id,
    subfolder="transformer",
    quantization_config=nf4_config,
    torch_dtype=torch.float16
)

pipeline = StableDiffusion3Pipeline.from_pretrained(
    model_id,
    transformer=model_nf4,
    torch_dtype=torch.float16
)
pipeline.enable_model_cpu_offload()
```
%% Cell type:code id: tags:
``` python
# TO DO: test different prompts and visualize the generated images
# Once you are happy with the results, save 3 different images as PNG files, with the corresponding prompts in a text file
prompt =

image = pipeline(
    prompt=prompt,
    num_inference_steps=40,
    guidance_scale=4.5,
    max_sequence_length=512
).images[0]
image.save("generated_image.png")
```
%% Cell type:markdown id: tags:
If even with quantization you still run out of GPU memory and you can't use Google Colab, you can use the following code instead, which relies on a much smaller model (the results won't be nearly as impressive, but it should be able to run even on a CPU, if you have a little patience).
%% Cell type:code id: tags:
``` python
from diffusers import DiffusionPipeline
pipe = DiffusionPipeline.from_pretrained("OFA-Sys/small-stable-diffusion-v0").to(device)
```
%% Cell type:code id: tags:
``` python
# TO DO: test different prompts and visualize the generated images
prompt =
image = pipe(prompt).images[0]
image.save("not_as_good_generated_image.png")
```