# Examen Partiel: Introduction to Normalizing Flow 

This is a partial exam for the course: Deep Learning II. 
The exam is divided in two parts:
- Implementing a normalizing flow model
- Training the model to generate samples from a target distribution

The exam consists of several tasks to be implemented sequentially, each task builds on the previous one.
The more tasks you complete, the more points you will get.


The notebook as an `HTML` file has to be sent at 11:40 at the following email address: `alexandre.verine@dauphine.psl.eu`. Make sure I received your email before leaving the room ! 

Before starting, please make sure you disable Copilot and any other AI assistance tool.
On Colab, you can do this by going to `Tools > AI Assistance` and check `Mask AI assistance`.
Every time, the use of AI assistance will be considered as a fraud and will be penalized by 5 points. 




Required modules :

In [None]:
import torch
import torchvision
from torch import nn
import torch.nn.functional as F
import torch.nn.init as init
import torch.optim as optim
from torch.autograd import Variable

import math
import numpy as np
import matplotlib.pyplot as plt

device = torch.device('cuda:0' if torch.cuda.is_available()  else 'cpu')

## 1. Normalizing Flows

A Normalizing Flow is a generative model that learns a mapping from a simple distribution to a more complex one. The model is trained to maximize the log-likelihood of the data. Given a dataset $X = \{x_1, x_2, ..., x_N\}$, a model $F_{\theta}$, and a latent distribution $Q$ with density $q$, the log-likelihood of the data is given by:
$$
\log p(X) = \sum_{i=1}^{N} \log p(x_i) = \sum_{i=1}^{N} \log q(F_{\theta}(x_i)) + \log |det(\frac{\partial F_{\theta}(x_i)}{\partial x_i})|
$$

Once the model is trained, we can sample from the target distribution $P$ by sampling from the latent distribution and applying the inverse transformation of the model:
- Sample $z \sim Q$
- Compute $x = F_{\theta}^{-1}(z)$

The distribution $Q$ is usually chosen to be a simple distribution such as a standard Gaussian distribution. The model $F_{\theta}$ defines a distribution $\widehat{P}$ and the goal is to learn a mapping that makes $\widehat{P}$ as close as possible to the true distribution $P$.

For that reason, Normalizing Flows are a mapping learned by a neural networ that satifies two properties:
- The mapping is invertible
- The determinant of the Jacobian of the mapping can be computed efficiently
To acheive this, the model is usually composed of a sequence of invertible transformations that satisfy these properties. We can write the transformation as a composition of functions:
$$
F_{\theta}(x) = f_K \circ f_{K-1} \circ ... \circ f_1(x)
$$
and thus, the log determinant of the jacobian of the transformation is given by:
$$
\log |det(\frac{\partial F_{\theta}(x)}{\partial x})| = \sum_{i=1}^{K} \log |det(\frac{\partial f_i(x)}{\partial x})|
$$
and the inverse transformation is given by:
$$
F_{\theta}^{-1}(z) = f_1^{-1} \circ f_2^{-1} \circ ... \circ f_K^{-1}(z).
$$

We can define multiple type of atomic transformations that can be used to build a normalizing flow. The most common ones are:
- Affine Coupling Layer
- Planar Flow
- Radial Flow
- Residual Flow

In the following, we will implement two different normalizing flows using Pytorch and train them to generate samples from a 2D Gaussian distribution using:
- Linear transformations
- Affine Coupling Layers

## 2. Coding coupling based Normalizing Flow

In this section, we will implement different types of blocks that can be used to build a normalizing flow. The block will be implemented as a Pytorch module with this backbone:
```python

class Block(nn.Module):
    def __init__(self, dim):
        super(Block, self).__init__()
        self.dim = dim
        
    def forward(self, x, inverse=False):
        raise NotImplementedError
```

The ```__init__``` method will take as input the dimension of the input and output of the block and will serve to initialize the parameters of the block. The ```forward``` method will take as input the input tensor and a boolean that indicates if the forward or inverse transformation should be applied. It should return the transformed tensor and the log determinant of the jacobian of the transformation. 

### 1.1 Coupling Functions

The Coupling Layer is a type of block that is used to build a normalizing flow. The idea is to split the input tensor $\boldsymbol{x}$ into two parts $\boldsymbol{x}_1$ and $\boldsymbol{x}_2$ and apply a transformation to $\boldsymbol{x}_1$ that depends on $\boldsymbol{x}_2$. The transformation is defined as:
$$
\begin{align}
\boldsymbol{y}_1 &= \boldsymbol{x}_1 \\
\boldsymbol{y}_2 &= \boldsymbol{x}_2 \odot \exp(s(\boldsymbol{x}_1)) + t(\boldsymbol{x}_1)
\end{align}
$$
where $s$ and $t$ are neural networks that take $\boldsymbol{x}_1$ as input and output the scale and translation parameters of the transformation. $\odot$ denotes the element-wise product.
The log determinant of the jacobian of the transformation is given by:
$$
\log \left| \det \left(\frac{\partial f(\boldsymbol{x})}{\partial \boldsymbol{x}} \right) \right| = \sum_{i=1}^{D} s_i(\boldsymbol{x}_1)
$$
where $D$ is the dimension of $\boldsymbol{x}_2$. Finally, the inverse transformation is given by:
$$
\begin{align}
\boldsymbol{x}_1 &= \boldsymbol{y}_1 \\
\boldsymbol{x}_2 &= (\boldsymbol{y}_2 - t(\boldsymbol{y}_1)) \odot \exp(-s(\boldsymbol{y}_1))
\end{align}
$$

To build a Normalizing Flow, we can stack multiple coupling layers to build a deep neural network. The coupling layer is usually applied to half of the dimensions of the input tensor and the other half is left unchanged. But first, we need to implement a neural network that will be used to compute the scale and translation parameters of the transformation.


<font color='blue'>**TODO:**</font>
- Complete the forward method of the NN class. It should take as input the input tensor, apply 4 linear layers with ReLU activations and return the output tensor and no output activation.

In [None]:
class NN(nn.Module):
  """Small neural network used to compute scale or translate factors.
  Args:
      input_dimension (int): Number of dimensions in the input.
      hidden_dimension (int): Number of dimensions in the hidden layers.
      output_dimension (int): Number of dimensions in the output.
  """
  def __init__(self, input_dimension, hidden_dimension, output_dimension):
    super(NN, self).__init__()

    self.in_layer = nn.Linear(in_features=input_dimension, out_features=hidden_dimension)
    self.mid_layer1 = nn.Linear(in_features=hidden_dimension, out_features=hidden_dimension)
    self.mid_layer2 = nn.Linear(in_features=hidden_dimension, out_features=hidden_dimension)
    self.out_layer = nn.Linear(in_features=hidden_dimension, out_features=output_dimension)
    self.activation = nn.ReLU()

  def forward(self, x):
    ### YOUR CODE HERE
    return x


<font color='blue'>**TODO:**</font>
- Complete the AffineCouplingBlock class. Implement the forward and inverse transformations using the equations above. 
- Run the next cell to check if your code works.

In [None]:
class AffineCouplingBlock(nn.Module):
    """Affine Coupling Layer. 
    Split x into xA, xB and return (s(xB)*xA+t(xB), xB). 
    Args:
      input_dimension (int): Number of dimensions in the input.
      hidden_dimension (int): Number of dimensions in the hidden layers.
      alternate (bool): reverse xA and xB.
      activation (bool): use activation in t.
    """
    def __init__(self, input_dimension, 
                 hidden_dimension, alternate):
        super(AffineCouplingBlock, self).__init__()
        assert input_dimension%2 == 0
        self.st = NN(input_dimension//2, hidden_dimension,
                            input_dimension)
        self.scale = nn.Parameter(torch.ones(input_dimension//2))
        self.alternate = alternate
        
    def forward(self, x, ldj, reverse=False):

        if self.alternate:
            xB, xA = x.chunk(2, dim=1)
        else:
            xA, xB = x.chunk(2, dim=1)

        st = self.st(xB)
        s, t = st[:, 0::2], st[:, 1::2]
        s = self.scale * torch.tanh(s)

        if reverse:
            ldj = ldj - torch.sum(s, dim=1).view(-1, 1)
            s = torch.exp(-s)
            yA = ### YOUR CODE HERE
        else:
            ldj = ldj + torch.sum(s, dim=1).view(-1, 1)
            s = torch.exp(s)
            yA = ### YOUR CODE HERE

        if self.alternate:
            y = torch.cat((xB, yA), dim=1)
        else:
            y = torch.cat((yA, xB), dim=1)
        
        return y, ldj



In [None]:
f = AffineCouplingBlock(2, 64, True)                                       # Define the invertible block
if torch.cuda.is_available():                                              # Move the block to the GPU
    f = f.cuda()
x = torch.randn((10,2)).to(device)                                         # Random input
ldj = torch.zeros((x.size(0), 1)).to(device)                               # Log-determinant Jacobian

y, ldjo = f(x, ldj.clone())                                                # Forward pass
xi, ldji  = f(y, ldj.clone(), True)                                        # Inverse pass
torch.testing.assert_close(xi, x, atol=0.01, rtol=0.01)                    # Check if the inverse is correct
torch.testing.assert_close(ldji+ ldjo, ldj, atol=0.01, rtol=0.01)          # Check if the log-determinant Jacobian is correct

del f, x, xi, y, ldjo, ldj, ldji

As previously explained, a Normalizing Flow is a composition of multiple transformations. To build a normalizing flow, we can stack multiple coupling layers to build a deep neural network. The coupling layer is usually applied to half of the dimensions of the input tensor and the other half is left unchanged, alternatively. We can build a normalizing flow by stacking multiple coupling layers:

In [None]:
class NormalizingFlow(nn.Module):
    """Normalizing Flow class.
    Takes x as input and return F(x) and sldj. If F(y, reverse=True) is 
    called then it returns the inverse of F. 
    Args:
        input_dimension (int): Number of dimensions in the input.
        hidden_dimension (int): Number of dimensions in the hidden layers.
    """
    def __init__(self, dimension=1, hidden_dimension=256, num_steps=10):
        super(NormalizingFlow, self).__init__()
        self.dimension = dimension
        self.flows = [AffineCouplingBlock(input_dimension=dimension,
                                            hidden_dimension=hidden_dimension,
                                            alternate = (depth%2==0))
                                            for depth in range(num_steps)]
        self.flows = nn.ModuleList(self.flows)

    def forward(self, x, reverse=False):
        sldj = torch.zeros((x.size(0), 1)).to(device)
        if reverse:
            flows = reversed(self.flows)
        else:
            flows = self.flows
        for block in flows:
            x, sldj = block(x, sldj, reverse)
        return x, sldj 

<font color='blue'>**TODO:**</font>
- Run the next cell to check if your code works.

In [None]:
f = NormalizingFlow(dimension=10, num_steps=10)
if torch.cuda.is_available():
    f = f.cuda()
x = torch.randn((5,10)).to(device)
ldj = torch.zeros((x.size(0), 1)).to(device)
y, ldjo = f(x)
xi, ldji  = f(y, True)
torch.testing.assert_close(xi, x, atol=0.01, rtol=0.01)
torch.testing.assert_close(ldji+ldjo, ldj, atol=0.1, rtol=0.1)
del f, x, xi, y, ldjo, ldj, ldji

## 3. Training Functions

In this section, we will implement the training loop for the normalizing flow. The training loop will be implemented as a function that takes as input the model ```model```, the optimizer ```optimizer```, the number of epochs ```n_epochs```, the initial learning rate ```lr```, the list of epochs to decay the learning rate ```epochs_to_update_lr```, the frequence of epoch to print the loss ```freq_verbose```, the frequence to plot the samples ```freq_plot``` and the function used to plot the samples ```plot_function```.

We will consider that the loaders for the training and validation datasets ```loader_train``` and ```loader_test``` are given. 
First we will define the loss function. As previously explained, the log-likelihood of the data is given by:
$$
\log p(X) = \sum_{i=1}^{N} \log q(F_{\theta}(x_i)) + \log |det(\frac{\partial F_{\theta}(x_i)}{\partial x_i})|
$$
where $q$ is the density of the latent distribution. The loss function is the negative log-likelihood of the data.
Therefore, we will define a function ```loss_function``` that takes as input a latent vector ```z``` and the log determinant of the jacobian of the transformation ```logdetjac``` and returns the  average negative log-likelihood of the data:
$$
\text{loss} = -\frac{1}{N} \sum_{i=1}^{N}\left[ \log q(z_i) + \text{logdetjac}_i \right].
$$
As the latent distribution is a standard normal distribution, the density $q$ is given by:
$$
q(z) = \frac{1}{\sqrt{2\pi}^D } \exp\left(-\frac{1}{2}\Vert z \Vert_2^2\right),
$$
where $D$ is the dimension of the latent vector. Therefore, the log density is given by:
$$
\log q(z) = -\frac{1}{2} D \log(2\pi) - \frac{1}{2} \Vert z \Vert_2^2.
$$
It can be split into two terms:
- The constant term $-\frac{1}{2} D \log(2\pi)$
- The term $-\frac{1}{2} \Vert z \Vert_2^2$

<font color='blue'>**TODO:**</font>
- Complete the loss_function function. It should return the average negative log-likelihood of the data.
- Run the next cell to check if your code works.

In [None]:
def loss_function(z, logdetjac):
    """
    Computes the Negative Log-Likelihood (NLL) Loss for Normalizing Flows.

    Parameters:
    - z: The transformed data in latent space, with shape (batch_size, dimensions).
         This represents the result after applying the flow transformation to the input data.
    - logdetjac: The log determinant of the Jacobian of the transformation, 
                 with shape (batch_size, 1).

    Returns:
    - The average negative log-likelihood loss for the batch.
    """

    # Step 1: Calculate the likelihood contribution from z
    likelihood_term = ### YOUR CODE HERE

    # Step 2: Calculate the constant term
    constant_term = ### YOUR CODE HERE

    # Step 3: Add the log determinant of the Jacobian (logdetjac)

    jacobian_term = logdetjac.view(-1)  # Reshape to ensure it's a 1D vector

    # Step 4: Combine all terms to get the negative log-likelihood (NLL) for each sample
    loss = ### YOUR CODE HERE

    # Step 5: Take the mean over all samples in the batch to get the average NLL
    # Multiplying by -1 here because we minimize the loss during training
    loss = ### YOUR CODE HERE

    return loss

In [None]:
x = torch.Tensor([[-0.5689,  1.2247,  1.4252, -2.1294, -1.3356,  0.6384,  0.8114,  0.9037,
         -0.7982, -0.0422],
        [ 1.0852, -0.7060, -0.7232, -0.4275, -0.2482, -0.1510, -1.5881, -1.1483,
         -1.2538,  0.3152],
        [ 1.5991,  0.8852,  0.9818, -0.2948,  0.0666, -0.8725,  1.3663, -0.9406,
         -0.4303,  0.4287],
        [-1.7749,  0.2848,  0.1286, -1.1444,  1.3227, -0.8872,  0.2311, -0.8701,
         -1.0925, -1.2644],
        [ 2.9917, -1.7241,  0.3513, -0.1070, -2.3848,  0.3795,  1.1048,  1.1429,
         -0.7369,  0.5604]]).to(device)
logdetjac = torch.Tensor([[ 0.1145],
        [ 1.0377],
        [ 2.8035],
        [-0.0696],
        [ 0.0674]]).to(device)

assert torch.isclose(loss_function(x, logdetjac), torch.tensor(14.4907).to(device), atol=1e-5)

We give the function to evaluate the model on the test set. The function ```evaluate``` takes as input the model ```model``` returns the average negative log-likelihood of the data.

In [None]:
def eval_model(model):
    model.eval()
    losses = []
    with torch.no_grad():
        for batch_idx, x in enumerate(loader_test):
            x = x.to(device)
            z, logdetjac = model(x)
            loss = loss_function(z, logdetjac)
            losses.append(loss.item())
    model.train()
    return np.mean(losses)     

<font color='blue'>**TODO:**</font>
- Complent the function train to perform the backpropagation and optimization of the model.

In [None]:
def train(model, optimizer,  lr, n_epochs = 20, freq_verbose=2, freq_plot=2, plot_function=None):
    train_loss = [] 
    test_loss = []
    for epoch in range(0, n_epochs+1):
        losses = []
        for batch_idx, x in enumerate(loader_train):

            ### YOUR CODE HERE
            loss = ### YOUR CODE HERE
            ### YOUR CODE HERE

            losses.append(loss.item())
        train_loss.append(np.mean(losses))
        test_loss.append(eval_model(model))
      
        if epoch %freq_verbose==0:
            print('[%d/%d]: \tloss: %.3f \tloss eval: %.5f' % ((epoch), n_epochs, train_loss[-1], test_loss[-1]))
        if epoch %freq_plot==0 and epoch>0:
            if plot_function is not None:
                plot_function(model, epoch)
            plt.plot(train_loss, label='train')
            plt.plot(test_loss, label='test')
            plt.legend()
            plt.xlabel('Epoch')
            plt.ylabel('NLL')
            plt.show() 

<font color='blue'>**TODO:**</font>
- Run the next cell to check if your code works.
Run this cellule to check if your code works :

In [None]:
batch_size = 256
z = torch.randn((batch_size, 25)).to(device)
logdetjac = torch.rand((batch_size, 1)).to(device)
loss1 = loss_function(z, logdetjac)
loss2 = loss_function(z*2+1, 2*logdetjac)
assert loss1 < loss2, "Loss1 should be lower than Loss2"
del batch_size, z, logdetjac, loss1, loss2

## 4. Low Dimensional Dataset

We train the Normalizing Flow to generate samples from a 2D distribution (A Gaussian Mixure Model). We will use the following dataset:

In [None]:
def Gaussians(batch_size):
    scale = 4.
    centers = [(-1, 0.5),(-0.35, -0.5), (0.35, -0.5), (1, 0.5)]
    centers = [(scale * x, scale * y) for x, y in centers]

    dataset = []
    for i in range(batch_size):
        point = np.random.randn(2)*1.2
        idx = np.random.randint(4)
        center = centers[idx]
        point[0] += center[0]
        point[1] += center[1]
        dataset.append(point)
    dataset = np.array(dataset, dtype="float32")
    dataset /= 1.414
    return torch.Tensor(dataset)

plt.figure(figsize=(5,5))
x  = Gaussians(5000).numpy()
plt.hist2d(x[:,0], x[:,1],
               range=[[-5, 5], [-5, 5]], bins=100, cmap='inferno')
plt.xticks([])
plt.yticks([])
plt.xlim(-5, 5)
plt.ylim(-5, 5)
plt.title("Dataset $X_r\sim P$")
plt.show()
del x

We define a function to plot the dataset $X_r$ sampled from the target distribution, the samples $X_g=F_{\theta}^{-1}(Z_g)\sim \widehat{P}$ generated by the model by applying the reverse transformation to samples $Z_g\sim Q$. We also plot the samples $Z_r=F_{\theta}(X_r)$ and $Z_g\sim Q$.

In [None]:
def plot_2D(model,  epoch, sample=Gaussians, device=device):
    plt.clf()
    plt.figure(1, figsize=(7,10))
    plt.suptitle("Epoch "+str(epoch), y=0.95)
    plt.subplot(2,2,2)
    plt.title("$X_g = F^{-1}(Z_g)$")
    z = Variable(torch.randn((1000, 2)).to(device))
    x_gen, ldj = model(z,True)
    x_gen = x_gen.detach().cpu().numpy()
    plt.hist2d(x_gen[:,0], x_gen[:,1],
               range=[[-5, 5], [-5, 5]], bins=100, cmap='inferno')
    plt.xticks(ticks=[-5, -2.5, 0, 2.5, 5], labels=["","","","",""])
    plt.yticks(ticks=[-5, -2.5, 0, 2.5, 5], labels=["","","","",""])
    plt.xlim(-5, 5)
    plt.ylim(-5, 5)
    plt.subplot(2,2,1)
    plt.title("$X_r$")
    x = sample(1000)
    plt.hist2d(x[:,0].numpy(), x[:,1].numpy(),
               range=[[-5, 5], [-5, 5]], bins=100, cmap='inferno')
    plt.xticks(ticks=[-5, -2.5, 0, 2.5, 5], labels=["","","","",""])
    plt.yticks(ticks=[-5, -2.5, 0, 2.5, 5], labels=["","","","",""])
    plt.xlim(-5, 5)
    plt.ylim(-5, 5)
    plt.subplot(2,2,3)
    plt.title('$Z_r=F(X_r)$')
    x = Variable(x.to(device))
    z, ldj = model(x)
    z = z.detach().cpu().numpy()
    plt.hist2d(z[:,0],z[:,1] , 
               range=[[-5, 5], [-5, 5]], bins=100, cmap='inferno')
    plt.xlim(-5, 5)
    plt.ylim(-5, 5)
    plt.xticks(ticks=[-5, -2.5, 0, 2.5, 5], labels=["","","","",""])
    plt.yticks(ticks=[-5, -2.5, 0, 2.5, 5], labels=["","","","",""])
    plt.subplot(2,2,4)
    plt.title('$Z_g$')
    z = torch.randn((1000, 2))
    plt.hist2d(z[:,0].numpy(),z[:,1].numpy(), 
               range=[[-5, 5], [-5, 5]], bins=100, cmap='inferno')
    plt.xlim(-5, 5)
    plt.ylim(-5, 5)
    plt.xticks(ticks=[-5, -2.5, 0, 2.5, 5], labels=["","","","",""])
    plt.yticks(ticks=[-5, -2.5, 0, 2.5, 5], labels=["","","","",""])
    plt.tight_layout()
    plt.show()
    plt.close()

model = NormalizingFlow(dimension=2, hidden_dimension=10, num_steps=20).to(device)
plot_2D(model, 0)



To interpret the results:
- The samples $X_r$ should be close to the samples $X_g$.
- The samples $Z_r$ should be close to the samples $Z_g$.


We will now define the dataset and the loaders for the training and validation datasets.
<font color='blue'>**TODO:**</font>
- Define the loaders for the training and validation datasets with a batch size of 1024. Shuffle the training dataset and do not shuffle the validation dataset.

In [None]:
dataset_train = Gaussians(102400)
dataset_test = Gaussians(10240)
loader_train = torch.utils.data.DataLoader(dataset_train, batch_size=1024, shuffle=True)
loader_test = torch.utils.data.DataLoader(dataset_test, batch_size=1024, shuffle=False)

print(f'Number of training samples: {len(dataset_train)}')
print(f'Number of test samples: {len(dataset_test)}')
print(f'Number of training batches: {len(loader_train)}')
print(f'Number of test batches: {len(loader_test)}')


<font color='blue'>**TODO:**</font>
- Train the model with the constraints given below.

Train the model with the following parameters:
- The dimension of the latent space is 2
- The number of coupling layers is 2
- The hidden dimension of the neural networks is 10

Using the following optimizer:
- Adam optimizer with a learning rate of between 0.01 and 0.0001.
- The number of epochs is 10
- Print the loss every epoch
- Plot the samples/losses at the end of the training

Don't forget to copy the model on the desired device (CPU or GPU).



In [None]:
model = ### YOUR CODE HERE
lr = ### YOUR CODE HERE
optimizer = ### YOUR CODE HERE
train(model, optimizer, lr, n_epochs=10, freq_verbose=1, freq_plot=10)
assert eval_model(model) < 3.9, "The model should have a test loss lower than 3.9"

<font color='blue'>**TODO:**</font>
- Train a deeper model with 10 coupling layers and hidden dimension of 10. Use the same optimizer and hyperparameters as before.

In [None]:
model = ### YOUR CODE HERE
lr = ### YOUR CODE HERE
optimizer = ### YOUR CODE HERE
train(model, optimizer, lr, n_epochs=10, freq_verbose=1, freq_plot=10)

assert eval_model(model) < 3.75, "The model should have a test loss lower than 3.75"

<font color='blue'>**TODO:**</font>
- Finally, train a model with 10 coupling layers and hidden dimension of 100. (You might need to reduce the learning rate).

In [None]:
model = ### YOUR CODE HERE
lr = ### YOUR CODE HERE
optimizer = ### YOUR CODE HERE
train(model, optimizer, lr, n_epochs=10, freq_verbose=1, freq_plot=10)
assert eval_model(model) < 3.7, "The model should have a test loss lower than 3.7"

## 5. High Dimensional Dataset

We will now train the Normalizing Flow to generate samples from a higher dimensional distribution. We will use the small version of the MNIST dataset. To do so, we used a rescaled version of the dataset with a size of $size \times size$ pixels of 0 and 1.

In [None]:
import torchvision.transforms as transforms

size = 12
# Transform to resize, convert to tensor, and flatten to a vector of size (size)
transform = transforms.Compose([
    transforms.Resize((size, size)),         # Resize to sizexsize pixels
    transforms.ToTensor(),             # Convert to a tensor with shape (1, size, size)
    transforms.Lambda(lambda x: x.view(size*size)),  # Flatten to (size*size,) and normalize
    transforms.Lambda(lambda x: (x+5e-2)/(1+1e-1)) # Normalize to [0.05, 0.95] to avoid overflow
])

# Custom dataset to filter only labels 0 and 1
class FilteredMNIST(torchvision.datasets.MNIST):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Filter to keep only images with labels 0 and 1
        indices = [i for i, label in enumerate(self.targets) if label in [0, 1]]
        self.data = self.data[indices]
        self.targets = self.targets[indices]

    def __getitem__(self, index):
        # Return only the image, not the label
        image, _ = super().__getitem__(index)
        return image

# Create datasets and loaders
dataset_train = FilteredMNIST(root='./data', train=True, download=True, transform=transform)
dataset_test = FilteredMNIST(root='./data', train=False, download=True, transform=transform)

loader_train = torch.utils.data.DataLoader(dataset_train, batch_size=128, shuffle=True)
loader_test = torch.utils.data.DataLoader(dataset_test, batch_size=128, shuffle=True)

first_batch = next(iter(loader_test))

plt.figure(1, figsize=(10,10))
plt.suptitle("Test Set", y=0.6)
for i in range(10):
    plt.subplot(1, 10, i + 1)
    plt.imshow(first_batch[i].view(size, size).numpy(), cmap='gray')
    plt.axis('off')
plt.show()

The image are slightly scaled to be between 0.05 and 0.95 approximately. Moreover, they are directly flattened to be of size $size^2$:

In [None]:
print(f'Size of the images: {size}x{size}')
print(f'First batch shape: {first_batch.shape}')
print(f'Min/Max pixel value: {first_batch.min():.2f}, {first_batch.max():.2f}')

We will use the function to plot the generated samples:

In [None]:
def plot_mnist(model, epoch):
    plt.clf()
    plt.figure(1, figsize=(10,10))
    plt.suptitle("Epoch "+str(epoch), y=0.6)
    z = torch.randn((10, model.dimension)).to(device)
    x_gen, ldj = model(z,True)
    x_gen = x_gen.detach().cpu().numpy().reshape(-1, np.sqrt(model.dimension).astype(int), np.sqrt(model.dimension).astype(int))
    for i in range(10):
        plt.subplot(1,10,i+1)
        plt.imshow(x_gen[i], cmap='gray')
        plt.axis('off')
    plt.show()

model = NormalizingFlow(num_steps=10,  dimension=size*size).to(device)
plot_mnist(model, 0)


The image generated should be similat to the images from the dataset. However, the image are now completly random since the model is not trained yet.

<font color='blue'>**TODO:**</font>
- Train a model with 10 coupling layers and hidden dimension of 256 using Adam optimizer with a learning rate of 0.003. The number of epochs is 20. Print the loss every epoch and plot the samples/losses at the end of the training.

In [None]:
model = ### YOUR CODE HERE
lr = ### YOUR CODE HERE
optimizer = ### YOUR CODE HERE
train(model, optimizer, lr, n_epochs=20, freq_verbose=1, freq_plot=20, plot_function=plot_mnist)

We can see that the model is generating samples in a larger domain than expected:

In [None]:
z = torch.randn((10, size*size)).to(device)
x_gen, ldj = model(z,True)
print(f'Min/Max pixel value: {x_gen.min()}, {x_gen.max()}')

We can therefore help the model by making sure that the last operation made by the reversed transformation is a sigmoid function that will make sure that the output is between 0 and 1. To do that, we need to implement a block that will apply the sigmoid function to the output of the model. However, the block must be implemented as a ReverseSigmoid to be applied as the first block of the model. 

The sigmoid function is defined as follows:
$$
\text{sigmoid}(x) = \frac{1}{1+\exp(-x)}.
$$
The derivative of the sigmoid function is given by:
$$
\partial_x \text{sigmoid}(x) = \text{sigmoid}(x)\times(1-\text{sigmoid}(x)).
$$
Since the function is element-wise, the log determinant of the jacobian of the transformation is given by:
$$
\log \left| \det \left(\frac{\partial (\text{sigmoid}\boldsymbol{x})}{\partial \boldsymbol{x}} \right) \right| = \sum_{i=1}^{D} \log \left( \partial_{x_i} \text{sigmoid}(x_i) \right).
$$

The inverse transformation is given by:
$$
\text{sigmoid}^{-1}(x) = \log\left(\frac{x}{1-x}\right).
$$
and its derivative is given by:
$$
\partial_x \text{sigmoid}^{-1}(x) = \frac{1}{x(1-x)}.
$$

<font color='blue'>**TODO:**</font>
- Complete the InvertibleReverseSigmoid class. Implement the forward and inverse transformations using the equations above.
- Run the next cell to check if your code works.


In [None]:
class InvertibleReverseSigmoid(nn.Module):
    def __init__(self, dim):
        super(InvertibleReverseSigmoid, self).__init__()
        self.dim = dim

    def forward(self, x, ldj, inverse=False):
        if inverse:
            # Apply the sigmoid function
            y = ### YOUR CODE HERE
            # Compute the log-determinant of the Jacobian for sigmoid
            jacobian_diag = ### YOUR CODE HERE
            ldj += ### YOUR CODE HERE
            return y, ldj
        else:
            # Apply the logit (inverse sigmoid) function
            y = ### YOUR CODE HERE
            # Compute the log-determinant of the Jacobian for the logit
            jacobian_diag = ### YOUR CODE HERE
            ldj += ### YOUR CODE HERE
            return y, ldj

In [None]:
f = InvertibleReverseSigmoid(10)
x = torch.rand((10, 10))
ldj = torch.zeros((10, 1))
y, ldjo = f(x, ldj.clone())
xi, ldji = f(y, ldj.clone(), True)
torch.testing.assert_close(xi, x, atol=0.01, rtol=0.01)
torch.testing.assert_close(ldji+ldjo, ldj, atol=0.01, rtol=0.01)
del f, x, xi, y, ldjo, ldj, ldji

<font color='blue'>**TODO:**</font>
- Complete the code for the NormalizingFlowWithSigmoid Class by adding the ReverseSigmoid block at the beginning of the model.
- Run the next cell to check if your code works.

In [None]:
class NormalizingFlowWithSigmoid(nn.Module):
    def __init__(self, dimension=1, hidden_dimension=256, num_steps=10):
        super(NormalizingFlowWithSigmoid, self).__init__()
        self.dimension = dimension
        self.activation = ### YOUR CODE HERE
        self.flows_blocks = [AffineCouplingBlock(input_dimension=dimension,
                                                    hidden_dimension=hidden_dimension,
                                                    alternate=(depth % 2 == 0))
                                 for depth in range(num_steps)]
        self.flows = ### YOUR CODE HERE

    def forward(self, x, reverse=False):
        sldj = torch.zeros((x.size(0), 1)).to(device)
        if reverse:
            flows = reversed(self.flows)
        else:
            flows = self.flows
        for block in flows:
            x, sldj = block(x, sldj, reverse)
        return x, sldj

In [None]:
f = NormalizingFlowWithSigmoid(dimension=10, num_steps=10)
if torch.cuda.is_available():
    f = f.cuda()
x = torch.rand((5, 10)).to(device)
ldj = torch.zeros((x.size(0), 1)).to(device)
y, ldjo = f(x)
xi, ldji = f(y, True)
torch.testing.assert_close(xi, x, atol=0.01, rtol=0.01)
torch.testing.assert_close(ldji+ldjo, ldj, atol=0.01, rtol=0.01)
torch.testing.assert_close(xi, xi.clamp(0, 1), atol=0.01, rtol=0.01)
del f, x, xi, y, ldjo, ldj, ldji

<font color='blue'>**TODO:**</font>
- Train the model with the same hyperparameters as before. (You might consider a smaller learning rate).
- Observe the results and compare them with the previous model.

In [None]:
model = ### YOUR CODE HERE
lr = ### YOUR CODE HERE
optimizer = ### YOUR CODE HERE
train(model, optimizer, lr, n_epochs=20, freq_verbose=1, freq_plot=20, plot_function=plot_mnist)

In [None]:
z = torch.randn((10, size*size)).to(device)
x_gen, ldj = model(z,True)
print(f'Min/Max pixel value: {x_gen.min()}, {x_gen.max()}')
plt.figure(1, figsize=(10,10))
plt.suptitle("Generated Images", y=0.6)
for i in range(10):
    plt.subplot(1, 10, i + 1)
    plt.imshow(x_gen[i].view(size, size).detach().cpu().numpy(), cmap='gray')
    plt.axis('off')
plt.show()