Following up on what I previously did with Keras, I will start with the classic autoencoder. First, the way pytorch-lightning introduces itself: Step 1: add these imports. Step 2: define a LightningModule (an nn.Module subclass). Step 3: train! Without changing a single line of code you can run on GPUs/TPUs, and advanced users can still write their own complex training loops. Let's run it first.
【reference】 ①Step 1: Add these imports @ PyTorchLightning/pytorch-lightning
Assembling these steps gives the code below, which I was able to execute.
import os
import torch
from torch import nn
import torch.nn.functional as F
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
import pytorch_lightning as pl

class LitAutoEncoder(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(nn.Linear(28 * 28, 128), nn.ReLU(), nn.Linear(128, 3))
        self.decoder = nn.Sequential(nn.Linear(3, 128), nn.ReLU(), nn.Linear(128, 28 * 28))

    def forward(self, x):
        # in lightning, forward defines the prediction/inference actions
        embedding = self.encoder(x)
        return embedding

    def training_step(self, batch, batch_idx):
        # training_step defines the train loop; it is independent of forward
        x, y = batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        self.log('train_loss', loss)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer

dataset = MNIST(os.getcwd(), download=True, transform=transforms.ToTensor())
train, val = random_split(dataset, [55000, 5000])

autoencoder = LitAutoEncoder()
#trainer = pl.Trainer()
trainer = pl.Trainer(max_epochs=1, gpus=1)
trainer.fit(autoencoder, DataLoader(train), DataLoader(val))

# torchscript: note this re-instantiates the model, so "model.pt" holds fresh (untrained) weights
autoencoder = LitAutoEncoder()
torch.jit.save(autoencoder.to_torchscript(), "model.pt")
print('training_finished')
>python simple_autoencoder.py
GPU available: True, used: True
TPU available: None, using: 0 TPU cores
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
C:\Users\user\Anaconda3\lib\site-packages\pytorch_lightning\utilities\distributed.py:49: UserWarning: you passed in a val_dataloader but have no validation_step. Skipping validation loop
warnings.warn(*args, **kwargs)
2020-12-29 22:52:28.353096: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'cudart64_110.dll'; dlerror: cudart64_110.dll not found
2020-12-29 22:52:28.353187: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
| Name | Type | Params
---------------------------------------
0 | encoder | Sequential | 100 K
1 | decoder | Sequential | 101 K
---------------------------------------
202 K Trainable params
0 Non-trainable params
202 K Total params
Epoch 0: 100%|███████████████████████████████████████████████████████████████████████████| 55000/55000 [02:57<00:00, 310.28it/s, loss=0.0406, v_num=98]
training_finished
A warning about the missing validation_step and a cudart dlerror are displayed, but you can see that the code runs as written: epoch 0 executes and the loss decreases. That code works without further commentary; the real work starts here.
・Complete the MNIST autoencoder
・Apply it to Cifar10

The concrete steps:
・Start from a main function
・Add validation_step and test_step to class LitAutoEncoder
・Move data reading into class LitAutoEncoder (and split the data into train, val, test)
・Check the initial images
・Display the results
・Use a callback to control the display during training
・Move the networks into their own classes

The main-function pattern below is introduced first because it is the standard way to structure the script.
if __name__ == '__main__':
    start_time = time.time()
    main()
    print('elapsed time: {:.3f} [sec]'.format(time.time() - start_time))
With the additions below, validation_step is executed automatically at each epoch during training, and test_step is executed automatically after training ends (when trainer.test() is called). Also, even if training is interrupted with Ctrl-C partway through, execution continues from the test step onward.
class LitAutoEncoder(pl.LightningModule):
    ...
    def validation_step(self, batch, batch_idx):
        x, y = batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        self.log('test_loss', loss)  # the only change from training_step: log under 'test_loss'
        return loss

    def test_step(self, batch, batch_idx):
        # Here we just reuse the validation_step for testing
        return self.validation_step(batch, batch_idx)
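To see where these hooks fire, here is a minimal sketch of how fit and test are chained (matching the full listing in the bonus section): validation_step runs inside fit(), while test_step runs only once trainer.test() is called.

autoencoder = LitAutoEncoder()
trainer = pl.Trainer(max_epochs=10, gpus=1)
trainer.fit(autoencoder)             # runs training_step + validation_step every epoch
results = trainer.test(autoencoder)  # runs test_step over the test dataloader
print(results)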
Everything else is the same code as the first LitAutoEncoder. Next, data reading is moved into the class:
class LitAutoEncoder(pl.LightningModule):
    ...
    def prepare_data(self):
        # download
        MNIST(self.data_dir, train=True, download=True)
        MNIST(self.data_dir, train=False, download=True)

    def setup(self, stage=None):  # train/val/test data split
        # Assign train/val datasets for use in dataloaders
        mnist_full = MNIST(self.data_dir, train=True, transform=self.transform)
        n_train = int(len(mnist_full) * 0.8)
        n_val = len(mnist_full) - n_train
        self.mnist_train, self.mnist_val = torch.utils.data.random_split(mnist_full, [n_train, n_val])
        self.mnist_test = MNIST(self.data_dir, train=False, transform=self.transform)

    def train_dataloader(self):
        self.trainloader = DataLoader(self.mnist_train, shuffle=True, drop_last=True, batch_size=32, num_workers=0)
        return self.trainloader

    def val_dataloader(self):
        return DataLoader(self.mnist_val, shuffle=False, batch_size=32, num_workers=0)

    def test_dataloader(self):
        self.testloader = DataLoader(self.mnist_test, shuffle=False, batch_size=32, num_workers=0)
        return self.testloader
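As a quick check (a sketch, not in the posted code), you can call the data hooks by hand and confirm the 80/20 split that setup() produces:

autoencoder = LitAutoEncoder()
autoencoder.prepare_data()           # downloads MNIST on first run
autoencoder.setup()                  # builds the train/val/test splits
print(len(autoencoder.mnist_train))  # 48000 (80% of 60000)
print(len(autoencoder.mnist_val))    # 12000
print(len(autoencoder.mnist_test))   # 10000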
Next, add the following image-display helper:
# functions to show an image
def imshow(img):
    img = img / 2 + 0.5  # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.pause(1)
    plt.close()
In __init__(self, data_dir='./') add self.classes = ..., and in train_dataloader() call imshow() to display a sample batch:
class LitAutoEncoder(pl.LightningModule):
    def __init__(self, data_dir='./'):
        super().__init__()
        self.data_dir = data_dir
        # Hardcode some dataset specific attributes
        self.num_classes = 10
        self.classes = ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9')
    ...
    def train_dataloader(self):
        self.trainloader = DataLoader(self.mnist_train, shuffle=True, drop_last=True, batch_size=32, num_workers=0)
        # get some random training images
        dataiter = iter(self.trainloader)
        images, labels = next(dataiter)
        # show images
        imshow(torchvision.utils.make_grid(images))
        # print labels
        print(' '.join('%5s' % self.classes[labels[j]] for j in range(4)))
        return self.trainloader
In addition, I later moved the imshow() call so that it runs after training, as shown below. To make this possible, the loader and label names are kept as attributes (self.trainloader and self.classes).
trainer.fit(autoencoder)
dataiter = iter(autoencoder.trainloader)
images, labels = next(dataiter)
# show images
imshow(torchvision.utils.make_grid(images))
# print labels
print(' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(4)))
Next, save the trained model as shown in Reference ② and load it back.
【reference】 ②Manual saving@SAVING AND LOADING WEIGHTS
trainer.save_checkpoint("example.ckpt")
PATH = 'example.ckpt'
pretrained_model = autoencoder.load_from_checkpoint(PATH)
pretrained_model.freeze()
pretrained_model.eval()
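Incidentally, load_from_checkpoint is a classmethod, so (as a sketch, assuming LitAutoEncoder is importable) the weights can also be restored in a fresh process without the trained instance:

# Sketch: restore weights from the checkpoint alone
pretrained_model = LitAutoEncoder.load_from_checkpoint('example.ckpt')
pretrained_model.freeze()  # puts the model in eval mode and disables gradients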
Let's try autoencoding with this pretrained_model. Before that, let's output the original test images, which were never seen during training.
latent_dim, ver = 3, 1
dataiter = iter(autoencoder.testloader)
images, labels = next(dataiter)
# show images
imshow(torchvision.utils.make_grid(images), 'original_images_{}_{}'.format(latent_dim, ver), text_=' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(8)))
encode_img = pretrained_model.encoder(images[0:32].to('cpu').reshape(32, 28 * 28))
decode_img = pretrained_model.decoder(encode_img)
imshow(torchvision.utils.make_grid(decode_img.cpu().reshape(32, 1, 28, 28)), 'original_autoencode_preds_mnist_{}_{}'.format(latent_dim, ver), text_=' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(8)))
The following is the result after training for just one epoch. The reconstruction is still poor.
If you train for more epochs, the progress display is overwritten from the second epoch onward, which I did not like, so I inserted a line break with a callback.
from pytorch_lightning.callbacks import Callback

class MyPrintingCallback(Callback):
    def on_epoch_end(self, trainer, pl_module):
        print('')
...
def main():
    autoencoder = LitAutoEncoder()
    #trainer = pl.Trainer()
    trainer = pl.Trainer(max_epochs=10, gpus=1, callbacks=[MyPrintingCallback()])
    trainer.fit(autoencoder)
This is the same as before. The difference is that the two networks, encoder() and decoder(), are now separate classes, which means you can plug in as many networks as you need.
from net_encoder_decoder_mnist import Encoder, Decoder

class LitAutoEncoder(pl.LightningModule):
    def __init__(self, data_dir='./'):
        ...
        self.encoder = Encoder()  # nn.Sequential(nn.Linear(28 * 28, 128), nn.ReLU(), nn.Linear(128, 32))
        self.decoder = Decoder()  # nn.Sequential(nn.Linear(32, 128), nn.ReLU(), nn.Linear(128, 28 * 28))
The networks are defined in a separate file, shown below.
net_encoder_decoder_mnist.py
import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28 * 28, 128),
            nn.ReLU(),
            nn.Linear(128, 32)
        )

    def forward(self, x):
        x = self.encoder(x)
        return x

class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.decoder = nn.Sequential(
            nn.Linear(32, 128),
            nn.ReLU(),
            nn.Linear(128, 28 * 28)
        )

    def forward(self, x):
        x = self.decoder(x)
        return x
In this way, the autoencoder has taken its first step. The results after 10 epochs are shown for a middle-layer latent dimension of 3 and for a latent dimension of 32, together with the correct answer (the input images). The entire code is posted in the bonus section.
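Both settings differ only in the width of the bottleneck layer. A minimal sketch (latent_dim is a parameter I introduce here for illustration; the posted net_encoder_decoder_mnist.py hardcodes 32):

import torch.nn as nn

def make_encoder_decoder(latent_dim=32):
    # Only the bottleneck width changes between the latent-3 and latent-32 runs
    encoder = nn.Sequential(nn.Linear(28 * 28, 128), nn.ReLU(), nn.Linear(128, latent_dim))
    decoder = nn.Sequential(nn.Linear(latent_dim, 128), nn.ReLU(), nn.Linear(128, 28 * 28))
    return encoder, decoder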
・Change the data to cifar10
・Make the code compatible with Cifar10 (3-channel input)
・Extend the network to a CNN
This is the model shown in the bonus section "Autoencoder for cifar10 in pytorch-lightning; flatten-model". It can be done simply by matching the dimensions of the data and the input layer, as sketched below.
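Concretely, only the flattened input size changes, from 28*28 = 784 to 3*32*32 = 3072; the same view call in training_step covers both cases:

# MNIST:   flattens (N, 1, 28, 28) -> (N, 784)
# CIFAR10: flattens (N, 3, 32, 32) -> (N, 3072)
x = x.view(x.size(0), -1)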
The result after one epoch with a latent-space dimension of 32 is as follows (figures: original image and its encoded-decoded reconstruction).
>python simple_autoencoder3.py
GPU available: True, used: True
TPU available: None, using: 0 TPU cores
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Files already downloaded and verified
Files already downloaded and verified
2020-12-30 16:44:59.037624: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'cudart64_110.dll'; dlerror: cudart64_110.dll not found
2020-12-30 16:44:59.037743: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
| Name | Type | Params
------------------------------------
0 | encoder | Encoder | 397 K
1 | decoder | Decoder | 400 K
------------------------------------
797 K Trainable params
0 Non-trainable params
797 K Total params
Epoch 0: 100%|████████████████████████████████████████████████████████████████████████████| 1563/1563 [00:12<00:00, 126.63it/s, loss=0.0542, v_num=145]
Epoch 0: 100%|████████████████████████████████████████████████████████████████████████████| 1563/1563 [00:12<00:00, 125.70it/s, loss=0.0542, v_num=145]
training_finished
bird ship car frog
Files already downloaded and verified
Files already downloaded and verified
Testing: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████| 313/313 [00:01<00:00, 185.81it/s]
--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_loss': tensor(0.0541, device='cuda:0')}
--------------------------------------------------------------------------------
[{'test_loss': 0.054066576063632965}]
cat ship ship plane
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
elapsed time: 34.649 [sec]
The network is as follows
net_encoder_decoder_1D_cifar10.py
import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(3 * 32 * 32, 128),
            nn.ReLU(),
            nn.Linear(128, 32)
        )

    def forward(self, x):
        x = self.encoder(x)
        return x

class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.decoder = nn.Sequential(
            nn.Linear(32, 128),
            nn.ReLU(),
            nn.Linear(128, 3 * 32 * 32)
        )

    def forward(self, x):
        x = self.decoder(x)
        return x
You are free to change the network however you like; the following CNN is one possible solution.
import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64,
                      kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.BatchNorm2d(64),
            nn.Conv2d(64, 256, 3, 1, 1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.BatchNorm2d(256)
        )

    def forward(self, x):
        x = self.encoder(x)
        return x

class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(in_channels=256, out_channels=16,
                               kernel_size=2, stride=2, padding=0),
            nn.ConvTranspose2d(in_channels=16, out_channels=3,
                               kernel_size=2, stride=2)
        )

    def forward(self, x):
        x = self.decoder(x)
        return x
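As a quick sanity check (a sketch, not part of the posted scripts), you can verify that the encoder output matches what the decoder expects:

import torch
from net_encoder_decoder2D import Encoder, Decoder

enc, dec = Encoder(), Decoder()
x = torch.randn(1, 3, 32, 32)  # one CIFAR-10-sized image
z = enc(x)                     # latent feature map: (1, 256, 8, 8)
x_hat = dec(z)                 # reconstruction: (1, 3, 32, 32)
print(z.shape, x_hat.shape)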
Below this, let's output the model configuration using torchsummary's summary() and a plain print().
from torchsummary import summary  # same helper imported in the full listing below

summary(autoencoder.encoder, (3, 32, 32))
summary(autoencoder.decoder, (256, 8, 8))
summary(autoencoder, (3, 32, 32))
print(autoencoder)
If you try, you will get the following results.
>python simple_autoencoder4.py
cuda:0
----------------------------------------------------------------
Layer (type) Output Shape Param #
================================================================
Conv2d-1 [-1, 64, 32, 32] 1,792
ReLU-2 [-1, 64, 32, 32] 0
MaxPool2d-3 [-1, 64, 16, 16] 0
BatchNorm2d-4 [-1, 64, 16, 16] 128
Conv2d-5 [-1, 256, 16, 16] 147,712
ReLU-6 [-1, 256, 16, 16] 0
MaxPool2d-7 [-1, 256, 8, 8] 0
BatchNorm2d-8 [-1, 256, 8, 8] 512
================================================================
Total params: 150,144
Trainable params: 150,144
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.01
Forward/backward pass size (MB): 2.50
Params size (MB): 0.57
Estimated Total Size (MB): 3.08
----------------------------------------------------------------
----------------------------------------------------------------
Layer (type) Output Shape Param #
================================================================
ConvTranspose2d-1 [-1, 16, 16, 16] 16,400
ConvTranspose2d-2 [-1, 3, 32, 32] 195
================================================================
Total params: 16,595
Trainable params: 16,595
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.06
Forward/backward pass size (MB): 0.05
Params size (MB): 0.06
Estimated Total Size (MB): 0.18
----------------------------------------------------------------
----------------------------------------------------------------
Layer (type) Output Shape Param #
================================================================
Conv2d-1 [-1, 64, 32, 32] 1,792
ReLU-2 [-1, 64, 32, 32] 0
MaxPool2d-3 [-1, 64, 16, 16] 0
BatchNorm2d-4 [-1, 64, 16, 16] 128
Conv2d-5 [-1, 256, 16, 16] 147,712
ReLU-6 [-1, 256, 16, 16] 0
MaxPool2d-7 [-1, 256, 8, 8] 0
BatchNorm2d-8 [-1, 256, 8, 8] 512
Encoder-9 [-1, 256, 8, 8] 0
ConvTranspose2d-10 [-1, 16, 16, 16] 16,400
ConvTranspose2d-11 [-1, 3, 32, 32] 195
Decoder-12 [-1, 3, 32, 32] 0
================================================================
Total params: 166,739
Trainable params: 166,739
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.01
Forward/backward pass size (MB): 2.70
Params size (MB): 0.64
Estimated Total Size (MB): 3.35
----------------------------------------------------------------
LitAutoEncoder(
  (encoder): Encoder(
    (encoder): Sequential(
      (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (4): Conv2d(64, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (5): ReLU()
      (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (7): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
  )
  (decoder): Decoder(
    (decoder): Sequential(
      (0): ConvTranspose2d(256, 16, kernel_size=(2, 2), stride=(2, 2))
      (1): ConvTranspose2d(16, 3, kernel_size=(2, 2), stride=(2, 2))
    )
  )
)
And the autoencoder results in this case are as follows. A relatively clean image is obtained after just one epoch, and after 10 epochs the reconstruction quality is much better. Incidentally, the total time from training to output is just over 3 minutes.
| Name | Type | Params
------------------------------------
0 | encoder | Encoder | 150 K
1 | decoder | Decoder | 16.6 K
------------------------------------
166 K Trainable params
0 Non-trainable params
166 K Total params
Epoch 0: 100%|█████████████████████████████████████████████████████████████████████████████| 1563/1563 [00:16<00:00, 94.14it/s, loss=0.0133, v_num=150]
Epoch 1: 100%|█████████████████████████████████████████████████████████████████████████████| 1563/1563 [00:16<00:00, 94.88it/s, loss=0.0102, v_num=150]
Epoch 2: 100%|████████████████████████████████████████████████████████████████████████████| 1563/1563 [00:16<00:00, 95.17it/s, loss=0.00861, v_num=150]
Epoch 3: 100%|█████████████████████████████████████████████████████████████████████████████| 1563/1563 [00:16<00:00, 95.22it/s, loss=0.0105, v_num=150]
Epoch 4: 100%|████████████████████████████████████████████████████████████████████████████| 1563/1563 [00:16<00:00, 95.17it/s, loss=0.00785, v_num=150]
Epoch 5: 100%|████████████████████████████████████████████████████████████████████████████| 1563/1563 [00:16<00:00, 94.58it/s, loss=0.00782, v_num=150]
Epoch 6: 100%|█████████████████████████████████████████████████████████████████████████████| 1563/1563 [00:16<00:00, 94.79it/s, loss=0.0073, v_num=150]
Epoch 7: 100%|████████████████████████████████████████████████████████████████████████████| 1563/1563 [00:16<00:00, 94.88it/s, loss=0.00723, v_num=150]
Epoch 8: 100%|████████████████████████████████████████████████████████████████████████████| 1563/1563 [00:16<00:00, 94.70it/s, loss=0.00721, v_num=150]
Epoch 9: 100%|████████████████████████████████████████████████████████████████████████████| 1563/1563 [00:16<00:00, 94.77it/s, loss=0.00689, v_num=150]
Epoch 9: 100%|████████████████████████████████████████████████████████████████████████████| 1563/1563 [00:16<00:00, 94.25it/s, loss=0.00689, v_num=150]
training_finished
bird dog frog horse
Files already downloaded and verified
Files already downloaded and verified
Testing: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████| 313/313 [00:01<00:00, 174.33it/s]
--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_loss': tensor(0.0043, device='cuda:0')}
--------------------------------------------------------------------------------
[{'test_loss': 0.004303526598960161}]
cat ship ship plane
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
elapsed time: 189.072 [sec]
In fact, if you look at the code below, you can see that def forward(self, x): is different: it now runs the full encode-decode pass. The script was also changed to output a network summary. You can train it as before, and compute the predictions the same way we have already seen.
def forward(self, x):
    # in lightning, forward defines the prediction/inference actions
    embedding = self.encoder(x)
    x = self.decoder(embedding)
    return x
The latent space could also be flattened to one dimension, but considering in which cases that would actually be necessary, I think the autoencoder can be left as it is.
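For reference, a hypothetical sketch of what a 1-D latent would look like on top of the CNN encoder (none of this is in the posted code; the layer sizes are my own assumption):

import torch.nn as nn

# Hypothetical bridge layers: flatten the (256, 8, 8) feature map to a
# 32-dim vector and back, inserted between encoder and decoder.
to_vec = nn.Sequential(nn.Flatten(), nn.Linear(256 * 8 * 8, 32))
to_map = nn.Sequential(nn.Linear(32, 256 * 8 * 8), nn.Unflatten(1, (256, 8, 8)))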
・I built an autoencoder for Cifar10 with pytorch-lightning
・The accuracy was as good as in Keras
・The basic code is very similar to the earlier classification code, and I feel it is highly reusable
・Based on this, I will try various extensions such as denoising, colorization, cGAN, etc.
Autoencoder for mnist in pytorch-lightning
import os
import time
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
import torchvision
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
import pytorch_lightning as pl
import matplotlib.pyplot as plt

from net_encoder_decoder_mnist import Encoder, Decoder

# functions to show an image
def imshow(img, file='', text_=''):
    img = img / 2 + 0.5  # unnormalize
    npimg = img.detach().numpy()  # img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.text(x=3, y=2, s=text_, c="red")
    # plt.imshow(npimg)
    plt.pause(3)
    if file != '':
        plt.savefig(file + '.png')
    plt.close()

from pytorch_lightning.callbacks import Callback

class MyPrintingCallback(Callback):
    def on_epoch_end(self, trainer, pl_module):
        print('')

class LitAutoEncoder(pl.LightningModule):
    def __init__(self, data_dir='./'):
        super().__init__()
        self.data_dir = data_dir
        # Hardcode some dataset specific attributes
        self.num_classes = 10
        self.classes = ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9')
        self.dims = (1, 28, 28)
        channels, width, height = self.dims
        self.transform = transforms.Compose([transforms.ToTensor(),
                                             transforms.Normalize((0.1307,), (0.3081,))])
        self.encoder = Encoder()  # nn.Sequential(nn.Linear(28 * 28, 128), nn.ReLU(), nn.Linear(128, 32))
        self.decoder = Decoder()  # nn.Sequential(nn.Linear(32, 128), nn.ReLU(), nn.Linear(128, 28 * 28))

    def forward(self, x):
        # in lightning, forward defines the prediction/inference actions
        embedding = self.encoder(x)
        return embedding

    def training_step(self, batch, batch_idx):
        # training_step defines the train loop; it is independent of forward
        x, y = batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        self.log('test_loss', loss)
        return loss

    def test_step(self, batch, batch_idx):
        # Here we just reuse the validation_step for testing
        return self.validation_step(batch, batch_idx)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer

    def prepare_data(self):
        # download
        MNIST(self.data_dir, train=True, download=True)
        MNIST(self.data_dir, train=False, download=True)

    def setup(self, stage=None):  # train/val/test data split
        # Assign train/val datasets for use in dataloaders
        mnist_full = MNIST(self.data_dir, train=True, transform=self.transform)
        n_train = int(len(mnist_full) * 0.8)
        n_val = len(mnist_full) - n_train
        self.mnist_train, self.mnist_val = torch.utils.data.random_split(mnist_full, [n_train, n_val])
        self.mnist_test = MNIST(self.data_dir, train=False, transform=self.transform)

    def train_dataloader(self):
        self.trainloader = DataLoader(self.mnist_train, shuffle=True, drop_last=True, batch_size=32, num_workers=0)
        # get some random training images
        return self.trainloader

    def val_dataloader(self):
        return DataLoader(self.mnist_val, shuffle=False, batch_size=32, num_workers=0)

    def test_dataloader(self):
        self.testloader = DataLoader(self.mnist_test, shuffle=False, batch_size=32, num_workers=0)
        return self.testloader

def main():
    autoencoder = LitAutoEncoder()
    #trainer = pl.Trainer()
    trainer = pl.Trainer(max_epochs=10, gpus=1, callbacks=[MyPrintingCallback()])
    trainer.fit(autoencoder)  # , DataLoader(train), DataLoader(val))
    print('training_finished')

    dataiter = iter(autoencoder.trainloader)
    images, labels = next(dataiter)
    # show images
    imshow(torchvision.utils.make_grid(images), 'mnist_initial', text_='original')
    # print labels
    print(' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(4)))

    results = trainer.test(autoencoder)
    print(results)

    dataiter = iter(autoencoder.testloader)
    images, labels = next(dataiter)
    # show images
    imshow(torchvision.utils.make_grid(images), 'mnist_results', text_=' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(4)))
    # print labels
    print(' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(4)))

    # torchscript
    torch.jit.save(autoencoder.to_torchscript(), "model.pt")

    trainer.save_checkpoint("example.ckpt")
    PATH = 'example.ckpt'
    pretrained_model = autoencoder.load_from_checkpoint(PATH)
    pretrained_model.freeze()
    pretrained_model.eval()

    latent_dim, ver = 32, 10
    dataiter = iter(autoencoder.testloader)
    images, labels = next(dataiter)
    # show images
    imshow(torchvision.utils.make_grid(images), 'original_images_{}_{}'.format(latent_dim, ver), text_=' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(8)))
    encode_img = pretrained_model.encoder(images[0:32].to('cpu').reshape(32, 28 * 28))
    decode_img = pretrained_model.decoder(encode_img)
    imshow(torchvision.utils.make_grid(decode_img.cpu().reshape(32, 1, 28, 28)), 'original_autoencode_preds_mnist_{}_{}'.format(latent_dim, ver), text_=' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(8)))

if __name__ == '__main__':
    start_time = time.time()
    main()
    print('elapsed time: {:.3f} [sec]'.format(time.time() - start_time))
Execution result
>python simple_autoencoder2.py
GPU available: True, used: True
TPU available: None, using: 0 TPU cores
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
2020-12-30 15:54:08.278635: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'cudart64_110.dll'; dlerror: cudart64_110.dll not found
2020-12-30 15:54:08.278724: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
| Name | Type | Params
------------------------------------
0 | encoder | Encoder | 104 K
1 | decoder | Decoder | 105 K
------------------------------------
209 K Trainable params
0 Non-trainable params
209 K Total params
Epoch 0: 100%|█████████████████████████████████████████████████████████████████████████████| 1875/1875 [00:13<00:00, 142.55it/s, loss=0.185, v_num=142]
Epoch 1: 100%|█████████████████████████████████████████████████████████████████████████████| 1875/1875 [00:13<00:00, 143.58it/s, loss=0.178, v_num=142]
Epoch 2: 100%|█████████████████████████████████████████████████████████████████████████████| 1875/1875 [00:13<00:00, 143.63it/s, loss=0.161, v_num=142]
Epoch 3: 100%|█████████████████████████████████████████████████████████████████████████████| 1875/1875 [00:13<00:00, 143.62it/s, loss=0.156, v_num=142]
Epoch 4: 100%|█████████████████████████████████████████████████████████████████████████████| 1875/1875 [00:13<00:00, 142.94it/s, loss=0.151, v_num=142]
Epoch 5: 100%|█████████████████████████████████████████████████████████████████████████████| 1875/1875 [00:13<00:00, 143.26it/s, loss=0.142, v_num=142]
Epoch 6: 100%|█████████████████████████████████████████████████████████████████████████████| 1875/1875 [00:13<00:00, 143.26it/s, loss=0.136, v_num=142]
Epoch 7: 100%|██████████████████████████████████████████████████████████████████████████████| 1875/1875 [00:13<00:00, 143.29it/s, loss=0.14, v_num=142]
Epoch 8: 100%|█████████████████████████████████████████████████████████████████████████████| 1875/1875 [00:13<00:00, 143.57it/s, loss=0.138, v_num=142]
Epoch 9: 100%|█████████████████████████████████████████████████████████████████████████████| 1875/1875 [00:13<00:00, 143.63it/s, loss=0.132, v_num=142]
Epoch 9: 100%|█████████████████████████████████████████████████████████████████████████████| 1875/1875 [00:13<00:00, 142.42it/s, loss=0.132, v_num=142]
training_finished
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
6 1 1 5
Testing: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████| 313/313 [00:01<00:00, 208.94it/s]
--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_loss': tensor(0.1300, device='cuda:0')}
--------------------------------------------------------------------------------
[{'test_loss': 0.13000936806201935}]
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
7 2 1 0
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
elapsed time: 149.029 [sec]
Autoencoder for cifar10 in pytorch-lightning; flatten-model
simple_autoencoder3.py
import os
import time
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
import torchvision
from torchvision.datasets import CIFAR10  # MNIST
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
import pytorch_lightning as pl
import matplotlib.pyplot as plt

from net_encoder_decoder_1D_cifar10 import Encoder, Decoder
#from net_encoder_decoder_mnist import Encoder, Decoder

# functions to show an image
def imshow(img, file='', text_=''):
    img = img / 2 + 0.5  # unnormalize
    npimg = img.detach().numpy()  # img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.text(x=3, y=2, s=text_, c="red")
    # plt.imshow(npimg)
    plt.pause(3)
    if file != '':
        plt.savefig(file + '.png')
    plt.close()

from pytorch_lightning.callbacks import Callback

class MyPrintingCallback(Callback):
    def on_epoch_end(self, trainer, pl_module):
        print('')

class LitAutoEncoder(pl.LightningModule):
    def __init__(self, data_dir='./'):
        super().__init__()
        self.data_dir = data_dir
        # Hardcode some dataset specific attributes
        self.num_classes = 10
        self.classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
        #self.classes = ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9')
        self.dims = (3, 32, 32)  # (1, 28, 28)
        channels, width, height = self.dims
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])
        #self.transform = transforms.Compose([transforms.ToTensor(),
        #                                     transforms.Normalize((0.1307,), (0.3081,))])
        self.encoder = Encoder()
        self.decoder = Decoder()

    def forward(self, x):
        # in lightning, forward defines the prediction/inference actions
        embedding = self.encoder(x)
        return embedding

    def training_step(self, batch, batch_idx):
        # training_step defines the train loop; it is independent of forward
        x, y = batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        self.log('test_loss', loss)
        return loss

    def test_step(self, batch, batch_idx):
        # Here we just reuse the validation_step for testing
        return self.validation_step(batch, batch_idx)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer

    def prepare_data(self):
        # download
        CIFAR10(self.data_dir, train=True, download=True)
        CIFAR10(self.data_dir, train=False, download=True)

    def setup(self, stage=None):  # train/val/test data split
        # Assign train/val datasets for use in dataloaders
        cifar10_full = CIFAR10(self.data_dir, train=True, transform=self.transform)
        n_train = int(len(cifar10_full) * 0.8)
        n_val = len(cifar10_full) - n_train
        self.cifar10_train, self.cifar10_val = torch.utils.data.random_split(cifar10_full, [n_train, n_val])
        self.cifar10_test = CIFAR10(self.data_dir, train=False, transform=self.transform)

    def train_dataloader(self):
        self.trainloader = DataLoader(self.cifar10_train, shuffle=True, drop_last=True, batch_size=32, num_workers=0)
        # get some random training images
        return self.trainloader

    def val_dataloader(self):
        return DataLoader(self.cifar10_val, shuffle=False, batch_size=32, num_workers=0)

    def test_dataloader(self):
        self.testloader = DataLoader(self.cifar10_test, shuffle=False, batch_size=32, num_workers=0)
        return self.testloader

def main():
    autoencoder = LitAutoEncoder()
    #trainer = pl.Trainer()
    trainer = pl.Trainer(max_epochs=1, gpus=1, callbacks=[MyPrintingCallback()])
    trainer.fit(autoencoder)  # , DataLoader(train), DataLoader(val))
    print('training_finished')

    dataiter = iter(autoencoder.trainloader)
    images, labels = next(dataiter)
    # show images
    imshow(torchvision.utils.make_grid(images), 'cifar10_initial', text_='original')
    # print labels
    print(' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(4)))

    results = trainer.test(autoencoder)
    print(results)

    dataiter = iter(autoencoder.testloader)
    images, labels = next(dataiter)
    # show images
    imshow(torchvision.utils.make_grid(images), 'cifar10_results', text_=' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(4)))
    # print labels
    print(' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(4)))

    # torchscript
    torch.jit.save(autoencoder.to_torchscript(), "model_cifar10.pt")

    trainer.save_checkpoint("example_cifar10.ckpt")
    PATH = 'example_cifar10.ckpt'
    pretrained_model = autoencoder.load_from_checkpoint(PATH)
    pretrained_model.freeze()
    pretrained_model.eval()

    latent_dim, ver = 32, 1
    dataiter = iter(autoencoder.testloader)
    images, labels = next(dataiter)
    # show images
    imshow(torchvision.utils.make_grid(images), 'original_images_cifar10_{}_{}'.format(latent_dim, ver), text_=' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(8)))
    encode_img = pretrained_model.encoder(images[0:32].to('cpu').reshape(32, 3 * 32 * 32))
    decode_img = pretrained_model.decoder(encode_img)
    imshow(torchvision.utils.make_grid(decode_img.cpu().reshape(32, 3, 32, 32)), 'original_autoencode_preds_cifar10_{}_{}'.format(latent_dim, ver), text_=' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(8)))

if __name__ == '__main__':
    start_time = time.time()
    main()
    print('elapsed time: {:.3f} [sec]'.format(time.time() - start_time))
The above code corresponds to the following model definition.
net_encoder_decoder_1D_cifar10.py
import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(3 * 32 * 32, 128),
            nn.ReLU(),
            nn.Linear(128, 32)
        )

    def forward(self, x):
        x = self.encoder(x)
        return x

class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.decoder = nn.Sequential(
            nn.Linear(32, 128),
            nn.ReLU(),
            nn.Linear(128, 3 * 32 * 32)
        )

    def forward(self, x):
        x = self.decoder(x)
        return x
Autoencoder for cifar10 in pytorch-lightning; 2D-model
import os
import time
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
import torchvision
from torchvision.datasets import CIFAR10  # MNIST
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
import pytorch_lightning as pl
import matplotlib.pyplot as plt
from torchsummary import summary

from net_encoder_decoder2D import Encoder, Decoder
#from net_encoder_decoder_1D_cifar10 import Encoder, Decoder
#from net_encoder_decoder_mnist import Encoder, Decoder

# functions to show an image
def imshow(img, file='', text_=''):
    img = img / 2 + 0.5  # unnormalize
    npimg = img.detach().numpy()  # img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.text(x=3, y=2, s=text_, c="red")
    # plt.imshow(npimg)
    plt.pause(3)
    if file != '':
        plt.savefig(file + '.png')
    plt.close()

from pytorch_lightning.callbacks import Callback

class MyPrintingCallback(Callback):
    def on_epoch_end(self, trainer, pl_module):
        print('')

class LitAutoEncoder(pl.LightningModule):
    def __init__(self, data_dir='./'):
        super().__init__()
        self.data_dir = data_dir
        # Hardcode some dataset specific attributes
        self.num_classes = 10
        self.classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
        #self.classes = ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9')
        self.dims = (3, 32, 32)  # (1, 28, 28)
        channels, width, height = self.dims
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])
        #self.transform = transforms.Compose([transforms.ToTensor(),
        #                                     transforms.Normalize((0.1307,), (0.3081,))])
        self.encoder = Encoder()
        self.decoder = Decoder()

    def forward(self, x):
        # in lightning, forward defines the prediction/inference actions
        embedding = self.encoder(x)
        x = self.decoder(embedding)
        return x

    def training_step(self, batch, batch_idx):
        # training_step defines the train loop; it is independent of forward
        x, y = batch
        #x = x.view(x.size(0), -1)  # no flattening: the CNN takes (N, 3, 32, 32) directly
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        #x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        self.log('test_loss', loss)
        return loss

    def test_step(self, batch, batch_idx):
        # Here we just reuse the validation_step for testing
        return self.validation_step(batch, batch_idx)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer

    def prepare_data(self):
        # download
        CIFAR10(self.data_dir, train=True, download=True)
        CIFAR10(self.data_dir, train=False, download=True)

    def setup(self, stage=None):  # train/val/test data split
        # Assign train/val datasets for use in dataloaders
        cifar10_full = CIFAR10(self.data_dir, train=True, transform=self.transform)
        n_train = int(len(cifar10_full) * 0.8)
        n_val = len(cifar10_full) - n_train
        self.cifar10_train, self.cifar10_val = torch.utils.data.random_split(cifar10_full, [n_train, n_val])
        self.cifar10_test = CIFAR10(self.data_dir, train=False, transform=self.transform)

    def train_dataloader(self):
        self.trainloader = DataLoader(self.cifar10_train, shuffle=True, drop_last=True, batch_size=32, num_workers=0)
        # get some random training images
        return self.trainloader

    def val_dataloader(self):
        return DataLoader(self.cifar10_val, shuffle=False, batch_size=32, num_workers=0)

    def test_dataloader(self):
        self.testloader = DataLoader(self.cifar10_test, shuffle=False, batch_size=32, num_workers=0)
        return self.testloader

def main():
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")  # for gpu
    # Assuming that we are on a CUDA machine, this should print a CUDA device:
    print(device)
    pl.seed_everything(0)

    # model
    autoencoder = LitAutoEncoder()
    autoencoder = autoencoder.to(device)  # for gpu
    summary(autoencoder.encoder, (3, 32, 32))
    summary(autoencoder.decoder, (256, 8, 8))
    summary(autoencoder, (3, 32, 32))
    print(autoencoder)

    #trainer = pl.Trainer()
    trainer = pl.Trainer(max_epochs=10, gpus=1, callbacks=[MyPrintingCallback()])
    trainer.fit(autoencoder)  # , DataLoader(train), DataLoader(val))
    print('training_finished')

    dataiter = iter(autoencoder.trainloader)
    images, labels = next(dataiter)
    # show images
    imshow(torchvision.utils.make_grid(images), 'cifar10_initial', text_='original')
    # print labels
    print(' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(4)))

    results = trainer.test(autoencoder)
    print(results)

    dataiter = iter(autoencoder.testloader)
    images, labels = next(dataiter)
    # show images
    imshow(torchvision.utils.make_grid(images), 'cifar10_results', text_=' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(4)))
    # print labels
    print(' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(4)))

    # torchscript
    # torch.jit.save(autoencoder.to_torchscript(), "model_cifar10.pt")

    trainer.save_checkpoint("example_cifar10.ckpt")
    PATH = 'example_cifar10.ckpt'
    pretrained_model = autoencoder.load_from_checkpoint(PATH)
    pretrained_model.freeze()
    pretrained_model.eval()

    latent_dim, ver = 16384, 10
    dataiter = iter(autoencoder.testloader)
    images, labels = next(dataiter)
    # show images
    imshow(torchvision.utils.make_grid(images), 'original_images_cifar10_{}_{}'.format(latent_dim, ver), text_=' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(8)))
    encode_img = pretrained_model.encoder(images[0:32].to('cpu').reshape(32, 3, 32, 32))
    decode_img = pretrained_model.decoder(encode_img)
    imshow(torchvision.utils.make_grid(decode_img.cpu().reshape(32, 3, 32, 32)), 'original_autoencode_preds_cifar10_{}_{}'.format(latent_dim, ver), text_=' '.join('%5s' % autoencoder.classes[labels[j]] for j in range(8)))

if __name__ == '__main__':
    start_time = time.time()
    main()
    print('elapsed time: {:.3f} [sec]'.format(time.time() - start_time))