# -*- coding: utf-8 -*-
"""DCGAN_CapsuleGAN
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1HQ9f0REbOdKA1DXLzA671hnBSLUj7xGQ
# **UNSUPERVISED LEARNING | GANZ00 (The Art of Programming) _ Capsule & Deep Convolutional GAN**
***Powered by:***
![uniXerr logo](https://drive.google.com/uc?id=1TXJwfJsTJzU2M7LrIQgx2Tx4cfUzcQuX)
**Deep Convolutional GAN brings several improvements:**
* Utilizing strided convolution layers instead of a pooling function in the Discriminator
model to reduce dimensionality. This way, the network itself learns how to reduce dimensionality (see the sketch below). In the Generator model, we use deconvolution to upsample the dimensions of the feature maps.
* Adding batch normalization. This is used to increase the stability of a neural network. In essence, batch normalization normalizes the output of a previous layer by subtracting the batch mean and dividing by the batch standard deviation.
* Removing fully connected layers from the convolutional neural network.
* Using ReLU and Leaky ReLU activation functions.
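A minimal sketch of the first point (illustrative shapes only, not part of the training code below): the pooling layer has nothing to learn, while the strided convolution learns its own downsampling kernel.

```python
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D

x = Input(shape=(64, 64, 3))                                       # hypothetical image batch
pooled  = MaxPooling2D(pool_size=2)(x)                             # fixed rule   -> (32, 32, 3)
strided = Conv2D(32, kernel_size=5, strides=2, padding='same')(x)  # learned rule -> (32, 32, 32)
```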
![DCGAN job](https://drive.google.com/uc?id=1Ind08ydejfh6IYYl6Gw_jYfLEGN4Eiph)
![DCGAN process](https://drive.google.com/uc?id=1wZufkk6jq22l15a8VUFEQ1MT5mfERMS_)
**Discriminator Process**
> Strided convolution, instead of max-pooling, downsamples the image.
![D_process](https://drive.google.com/uc?id=1mQSjU2KVzOEQwx5qdp7VTglx5AvUhd_3)
**Generator Process**
> Upsampling is used instead of fractionally-strided (transposed) convolution.
![G_process](https://drive.google.com/uc?id=1AWKUP8dGW8xdXVX8JENavoBR_JSR01WA)
**Adversarial Network**
> The adversarial model is simply the generator with its output connected to the input of the discriminator. Also shown is the training process, wherein the generator labels its fake image output with 1.0 to try to fool the discriminator.
![G_process](https://drive.google.com/uc?id=1jMhMV5kiaCqNa9x1E-YDdTTB4CWB8XOD)
**Loss Function**
> The discriminator in a GAN uses a cross-entropy loss, since the discriminator's job is to classify; cross-entropy is the natural loss for classification.
![gan loss](https://drive.google.com/uc?id=1TZlEihIaUqK4v8_MFYb8Ilf9o0Rjw2PR)
> This formula represents the cross-entropy loss between `p`, the true distribution, and `q`, the estimated distribution.
`p` and `q` are vectors of `m` dimensions, where `m` is the number of classes.
![cross entropy](https://drive.google.com/uc?id=1BJSC-RUODhllXGDR6TnzuRkYBwKjg0xF)
> In a GAN, the discriminator is a binary classifier: it needs to classify whether the data is real or fake, which means `m = 2`. The true distribution is a one-hot vector consisting of only 2 terms.
For `n` samples, we can sum over the per-sample losses.
The equation shown above is the binary cross-entropy loss, where `y` can take the two values 0 and 1.
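As a quick worked example with made-up predictions (values are illustrative only):

```python
import numpy as np

def binary_cross_entropy(y, p, eps=1e-12):
    # mean over samples of -[y*log(p) + (1-y)*log(1-p)]
    p = np.clip(p, eps, 1 - eps)  # guard against log(0)
    return -np.mean(y * np.log(p) + (1 - y) * np.log(1 - p))

y_true = np.array([1.0, 0.0, 1.0])  # real, fake, real
y_pred = np.array([0.9, 0.2, 0.6])  # discriminator outputs
print(binary_cross_entropy(y_true, y_pred))  # ~0.28
```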
GANs have a latent vector `z` out of which the image `G(z)` is generated. We apply the discriminator function `D` to both the real image `x` and the generated image `G(z)`.
The intention of the loss function is to push the predictions for real images towards 1 and for fake images towards 0. We do so via the log-probability terms.
![minmax formula](https://drive.google.com/uc?id=1Ky3cfOdWT1tRNk3SLT7Luscko1e3J0NT)
**Note:** the `~` sign means "is distributed as", and `E` denotes expectation: since we don't know exactly which samples are fed into the discriminator, we write the objective as expectations rather than sums.
Looking at the joint loss function, the discriminator maximizes its term, which means `log D(x)` should inch closer to zero (i.e. `D(x)` towards 1) while `D(G(z))` is pushed towards 0. The generator, in turn, tries to make `D(G(z))` inch closer to 1 while the discriminator tries to do the opposite.
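A minimal numpy sketch of the two objectives, assuming `d_real = D(x)` and `d_fake = D(G(z))` are the sigmoid outputs; the generator side uses the non-saturating form, which is what `fit()` below implements by labelling generated samples as real:

```python
import numpy as np

def discriminator_loss(d_real, d_fake, eps=1e-12):
    # D maximizes log D(x) + log(1 - D(G(z))); we minimize the negative
    return -np.mean(np.log(d_real + eps) + np.log(1 - d_fake + eps))

def generator_loss(d_fake, eps=1e-12):
    # non-saturating trick: G maximizes log D(G(z))
    return -np.mean(np.log(d_fake + eps))
```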
# **Mount Google Drive**
"""
from google.colab import drive
drive.mount('/gdrive')
"""# **Requirements**"""
from __future__ import print_function, division
from PIL import Image
import pprint
import time
import random
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import sys
import imageio
import tensorflow as tf
import plotly.graph_objects as go
import cv2
import asyncio
import math
import seaborn as sns
import tensorflow.keras.backend as K
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Reshape
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import ZeroPadding2D
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import UpSampling2D
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Lambda
from tensorflow.keras.layers import Concatenate
from tensorflow.keras.layers import Multiply
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.datasets import mnist, fashion_mnist, cifar10
# Confirm that we're using Python 3
assert sys.version_info.major == 3, 'Oops, not running Python 3. Use Runtime > Change runtime type'
print("[...] Installing dependencies for Colab environment")
!pip install -Uq grpcio==1.26.0
"""# **Enable TPU for Training**"""
assert 'COLAB_TPU_ADDR' in os.environ, 'Did you forget to switch to TPU?'
tpu_address = 'grpc://' + os.environ['COLAB_TPU_ADDR'] # Colab exposes its TPU workers over gRPC
print(f"Found TPU at {tpu_address}")
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu=tpu_address)
tf.config.experimental_connect_to_cluster(resolver)
tf.tpu.experimental.initialize_tpu_system(resolver)
strategy = tf.distribute.experimental.TPUStrategy(resolver)
"""# **The Image Processing Kit**"""
class IPKit:
def __init__(self):
self.drivePath = '/gdrive/My Drive/'
def saveGANIMG(self, generated, epoch, dataset_name):
if not os.path.isdir('generated'): os.mkdir('generated')
fig, axs = plt.subplots(5, 5) # (5 , 5) images for 25 noises
batch_count = 0
for row in range(5):
for col in range(5):
# plot image on each of 25 axis of figure object in range [0, 255]
if dataset_name == 'paint_art' or dataset_name == 'cifar10':
axs[row, col].imshow((generated[batch_count, :, :, :] * 127.5 + 127.5).astype(np.uint8))
else:
axs[row, col].imshow(generated[batch_count, :, :, 0] * 127.5 + 127.5, cmap='gray')
axs[row, col].axis('off') # hide the related axis
batch_count += 1 # get ready for next data row
fig.savefig(f"generated/{epoch}.png")
plt.close()
def MakeGif(self):
filenames = [ fname for fname in np.sort(os.listdir('generated')) if ".png" in fname]
with imageio.get_writer('generated/gan.gif', mode="I") as writer: # open a writer object for writing images on it to export a gif
for filename in filenames: # for every file in filenames list read them
image = imageio.imread('generated/'+filename)
writer.append_data(image) # append opened image into writer object for making gif
# call below function whenever you have new images in gdrive art folder
# turn all images into a numpy array of pixels
def buildPaint(self):
training_data = []
filenames = os.listdir(self.drivePath+'Art-Dataset/')
for fname in filenames:
image_path = os.path.join(self.drivePath+'Art-Dataset/'+fname)
image = Image.open(image_path).resize((128,128), Image.ANTIALIAS)
if np.asarray(image).shape != (128, 128, 3):
os.remove(image_path)
else:
training_data.append(np.asarray(image))
# plt.imshow(image)
# print(image)
np.save(self.drivePath+'Art-Dataset/paint_art.npy', training_data)
def loadPaint(self):
return tf.data.Dataset.from_tensor_slices(np.load(os.path.join(self.drivePath+'Art-Dataset/paint_art.npy'))) # create the data pipeline from hard disk
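# Hypothetical usage sketch of IPKit (paths follow the gdrive layout above); kept
# commented out because buildPaint() rewrites the .npy file and deletes images
# whose shape is not (128, 128, 3):
# ipkit = IPKit()
# ipkit.buildPaint()            # run only when new images land in Art-Dataset/
# paint_ds = ipkit.loadPaint()  # tf.data pipeline of (128, 128, 3) arrays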
"""# **Tooling Classes - Setting up Hyper-Parameters**"""
class HPARAM:
loss = 'binary_crossentropy'
optimizer = lambda name : Adam(learning_rate=0.0002, beta_1=0.5) if name == 'Adam' else RMSprop(learning_rate=0.0008, rho=1.0, decay=6e-8)
batch_size = 64
buffer_size = 10000
epochs = 30000
capsules = 8
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)
dropout = 0.4
momentum = 0.9
depth = 256
metrics = ['accuracy']
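# Note (illustrative): HPARAM.optimizer is a selector, not an optimizer instance;
# any name other than 'Adam' falls through to the RMSprop branch.
# opt = HPARAM.optimizer('Adam')     # Adam(learning_rate=0.0002, beta_1=0.5)
# opt = HPARAM.optimizer('RMSprop')  # RMSprop(learning_rate=0.0008, rho=1.0, decay=6e-8)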
"""# **GAN Class**
**BUG**: fix the black-pixel generation for the paint_art dataset!
"""
class GAN:
'''
we convert training images to float32 type
then normalize and scale the pixel data by half of the 255:
the activation function of the output layer of the generator is tanh,
which returns a value between -1 and 1. To scale that to 0 and 255
(which are the values you expect for an image), we have to multiply it
by 127.5 (so that -1 becomes -127.5, and 1 becomes 127.5), and then
add 127.5 (so that -127.5 becomes 0, and 127.5 becomes 255). We then
have to do the inverse of this when feeding an image into the
discriminator (which will expect a value between -1 and 1).
Leaky ReLUs are one attempt to fix the “dying ReLU” problem.
Instead of the function being zero when x < 0, a leaky ReLU
will instead have a small negative slope (of 0.01, or so).
That is, the function computes f(x) = αx for x < 0 and f(x) = x for x >= 0, where α is a small constant.
NOTE : the None in models' summary is the batch dimension.
NOTE : "same" results in padding the input such that the output has the same length as the original input.
NOTE : all kernels or filters in each Conv2D layers has the depth of the image channels.
NOTE : BatchNormalization layer normalize the activations of the previous layer at each batch,
i.e. applies a transformation that maintains the mean activation close to 0 and the activation standard deviation close to 1.
'''
def __init__(self, dataset_name='paint_art', channels=3, generator_input_features=100, discNetwork='dcgan'):
self.channels = channels # the default is 3 because the paint dataset has color images
self.dataset_name = dataset_name
self.ipkit = IPKit()
self.dcganFlag, self.capsuleFlag = False, False
if self.dataset_name == 'mnist' or self.dataset_name == 'fashion_mnist':
self.h = self.w = 28
self.mDep = int(self.h/4)
if discNetwork == 'dcgan':
self.dcganFlag = True
if discNetwork == 'cgan':
self.capsuleFlag = True
if self.dataset_name == 'cifar10':
self.h = self.w = 32
self.mDep = int(self.h/8) # for generator : image length and width start from 4 up to 32 by each Conv2D layer strides and for discriminator start from 32 down to flatten 2*2*256 neurons
(self.x_train, _), (_, _) = cifar10.load_data()
self.x_train = np.reshape(self.x_train, (-1, self.h, self.w, self.channels)) # shape : (50000, 32, 32, 3)
elif self.dataset_name == 'mnist':
(self.x_train, _), (_, _) = mnist.load_data()
self.x_train = np.expand_dims(self.x_train, axis=3) # shape : (60000, 28, 28, 1) ; you can also use self.x_train.reshape(self.x_train.shape[0], self.h, self.w, 1)
elif self.dataset_name == 'fashion_mnist':
(self.x_train, _), (_, _) = fashion_mnist.load_data()
self.x_train = np.expand_dims(self.x_train, axis=3) # shape : (60000, 28, 28, 1) ; you can also use self.x_train.reshape(self.x_train.shape[0], self.h, self.w, 1)
elif self.dataset_name == 'paint_art':
self.h = self.w = 128
self.mDep = int(self.h/32) # the beginning spatial size of the generator's feature maps
self.x_train = self.ipkit.loadPaint() # returns a tf.data dataset
self.x_train = self.x_train.cache() # cache elements of the dataset for future reuse; cached data is stored in memory by default
self.x_train = self.x_train.shuffle(HPARAM.buffer_size, reshuffle_each_iteration=True) # whenever data is needed it is taken from the buffer, which is then refilled with the newest elements up to the given buffer size
self.x_train = self.x_train.map(lambda x : (tf.cast(x, tf.float32) - 127.5)/127.5, num_parallel_calls=tf.data.experimental.AUTOTUNE) # scale to [-1, 1]; num_parallel_calls is the number of processes used for the transformation, and tf.data.experimental.AUTOTUNE picks an appropriate number of free workers
self.x_train = self.x_train.batch(HPARAM.batch_size, drop_remainder=True) # drop the last batch because it doesn't fit the batch size - 8320 images divided into 8320/HPARAM.batch_size batches, each of size HPARAM.batch_size
self.x_train = self.x_train.prefetch(tf.data.experimental.AUTOTUNE) # prefetch an appropriate number of batches for the next iteration - prevents the CPU from standing idle
self.x_train = list(self.x_train.as_numpy_iterator()) # len(self.x_train) iterations, each containing HPARAM.batch_size samples, complete one epoch
self.discriminator_input = (128, 128, 3)
if self.dataset_name != 'paint_art':
self.x_train = (self.x_train.astype(np.float32) - 127.5)/127.5 # normalize the images to [-1, 1] - because the output of our generator is squashed by a tanh activation function, which gives a number in range [-1, 1]
self.discriminator_input = self.x_train[0].shape # e.g. (28, 28, 1) for mnist
self.generator_input_features = generator_input_features
self.__create_networks()
def __MakeGeneratorModel(self):
'''
creating generator layers activated by tanh.
basically this model generates noisy images for first rounds and real images at the end of total epochs.
The generator model is typically implemented using a deep convolutional neural network
with specialized layers that learn to fill in features in an image
rather than extract features from an input image, because we want to produce a realistic image
from a noisy one by learning the feature maps (a deconvolutional process).
the GAN generator is required to upsample its input data in order to synthesize realistic full-size images.
fractional stride (deconvolutional layers) can be used in the generator for upsampling.
The upsampling layer is a simple layer with no weights that will double the dimensions of
input and can be used in a generative model when followed by a traditional convolutional layer.
NOTE : in order to understand the architecture of the generator model see its summary and the shape of training images!
NOTE : the output shape of Conv2DTranspose with padding same is : output = input * stride
NOTE : the Conv2DTranspose layers in the loop below use strides=2 to double the spatial size, while the later
Conv2DTranspose layers keep the default strides=1 and so preserve the output shape.
NOTE : the commented-out UpSampling2D layers are the classical alternative: you can uncomment them and set the
strides of the corresponding Conv2DTranspose layers to 1 to get the same output shapes, but transposed
convolutions are more flexible than classical upsampling methods, hence the choice made here.
'''
print("\n\n (+(+(+(+(+(+ GENERATOR SUMMARY - FEATURES/NEURONS/INPUTS STRUCTURE +)+)+)+)+)+) \n\n")
generator_input_features = Input(shape=(self.generator_input_features,), name='generator_input_features') # create a Input layer with size for example 100 (first layer neurons)
self.generator = Sequential() # create sequential model object - generator/decoder
self.generator.add(Dense(HPARAM.depth * self.mDep * self.mDep, input_dim=self.generator_input_features)) # size of next layer (hidden) is (None, HPARAM.depth * self.mDep * self.mDep) with the input : (None, 100) - weights matrix size : (100, HPARAM.depth * self.mDep * self.mDep)
self.generator.add(BatchNormalization(momentum=HPARAM.momentum))
self.generator.add(LeakyReLU())
self.generator.add(Reshape((self.mDep, self.mDep, HPARAM.depth))) # reshape to (None, self.mDep, self.mDep, 256) - None is the batch size dim
self.generator.add(Dropout(HPARAM.dropout)) # apply dropout with a 40% chance of setting the previous layer's input features to zero, to prevent over-fitting
# self.generator.add(UpSampling2D()) # opposite of pooling layer - doubles the dimensions of the last layer output ; output size : (None, 2*self.mDep, 2*self.mDep, 256)
for n_layer in range(int(math.log2(self.h/self.mDep))):
self.generator.add(Conv2DTranspose(filters=int(HPARAM.depth), kernel_size=5, strides=2, padding='same')) # output size : doubles the previous layer's output in every iteration via the strides, with 256 filters
self.generator.add(BatchNormalization(momentum=HPARAM.momentum))
self.generator.add(LeakyReLU())
# self.generator.add(UpSampling2D()) # double the last output size, not the filter! before use it remove strides in Conv2DTranspose layer
self.generator.add(Conv2DTranspose(filters=int(HPARAM.depth/2), kernel_size=5, padding='same')) # output size : (None, self.h, self.w, 128) with padding "same" after convolutional ops >>>> input_width & input_height = self.h * strides with 128 filters
self.generator.add(BatchNormalization(momentum=HPARAM.momentum))
self.generator.add(LeakyReLU())
# self.generator.add(UpSampling2D()) # double the last output size, not the filter!
self.generator.add(Conv2DTranspose(filters=int(HPARAM.depth/4), kernel_size=5, padding='same')) # output size : (None, self.h, self.w, 64) with padding "same" after convolutional ops >>>> input_width & input_height = self.h * strides with 64 filters
self.generator.add(BatchNormalization(momentum=HPARAM.momentum))
self.generator.add(LeakyReLU())
# self.generator.add(UpSampling2D()) # double the last output size, not the filter!
self.generator.add(Conv2DTranspose(filters=int(HPARAM.depth/8), kernel_size=5, padding='same')) # output size : (None, self.h, self.w, 32) with padding "same" after convolutional ops >>>> input_width & input_height = self.h * strides with 32 filters
self.generator.add(BatchNormalization(momentum=HPARAM.momentum))
self.generator.add(LeakyReLU())
# self.generator.add(UpSampling2D()) # double the last output size, not the filter!
self.generator.add(Conv2DTranspose(filters=int(HPARAM.depth/16), kernel_size=5, padding='same')) # output size : (None, self.h, self.w, 16) with padding "same" after convolutional ops >>>> input_width & input_height = self.h * strides with 16 filters
self.generator.add(BatchNormalization(momentum=HPARAM.momentum))
self.generator.add(LeakyReLU())
self.generator.add(Conv2DTranspose(filters=self.channels, kernel_size=5, padding="same")) # image channels as the number of filters of the last layer - output size : (None, self.h, self.w, self.channels) with padding "same" after convolutional ops >>>> input_width & input_height = self.h * strides with self.channels filters
self.generator.add(Activation("tanh")) # -1 < output < 1
self.generator.summary()
print("\n\n (+(+(+(+(+(+ GENERATOR MODEL SUMMARY AFTER TURNING IT INTO A TENSOR +)+)+)+)+)+) \n\n")
generator_output_tensor = self.generator(generator_input_features) # turn our generator sequential model object into a tensor with input layer for example 100 neurons - output size : (None, self.h, self.w, self.channels)
self.generator_model = Model(generator_input_features, generator_output_tensor) # create the generator model with for example 100 inputs and (None, self.h, self.w, self.channels) output
self.generator_model.compile(loss=HPARAM.loss, optimizer=HPARAM.optimizer('Adam'), metrics=HPARAM.metrics)
self.generator_model.summary()
print(f"\n\n\t\t [======Generator Tensor======] \n\n\t\t {generator_output_tensor}\n\n")
def __MakeDiscriminatorModel(self):
'''
The discriminator model takes an example from the domain as input (real or generated)
and predicts a binary class label of real or fake (generated). This is essentially a supervised classification task.
we use downsampling in the discriminator model to reduce dimensionality.
In GANs, the recommendation is to not use pooling layers,
and instead use the stride in convolutional layers to
perform downsampling in the discriminator model.
for the output layer we'll use a sigmoid activation function to
squash the output to a range between 0 and 1 for discriminating images.
NOTE : in order to understand the architecture of the discriminator model see its summary and the shape of training images!
NOTE : Conv2D(8, kernel_size=(3, 3), activation='relu', padding='same', strides=2)
if padding == 'same':
output_length = input_length
elif padding == 'valid':
output_length = input_length - filter_size
return (output_length + stride - 1) // stride
so for an input of 400 with stride 2 and 'same' padding : (400 + 2 - 1) // 2 = 200, giving (200, 200) as (H, W) respectively, and including the filters (200, 200, filters)
NOTE : you can remove the strides argument from each Conv2D layer and use MaxPooling2D with pool_size=2 layer to half the size of the width and height of the input features.
just remember to use a MaxPooling2D layer as the first layer of the discriminator using functional model api to half the size of the input features : maxpooling((None, self.h, self.w, self.channels)) -> Conv2D(32, 5, "same") -> (None, self.h/2, self.w/2, 32)
'''
discriminator_input = Input(shape=self.discriminator_input, name='discriminator_input_features') # create the input layer with size for example (self.h, self.w, self.channels)
if self.dcganFlag:
print("\n\n (+(+(+(+(+(+ DEEP CONVOLUTIONAL DISCRIMINATOR SUMMARY - FEATURES/NEURONS/INPUTS STRUCTURE +)+)+)+)+)+) \n\n")
self.discriminator = Sequential() # discriminator/encoder
self.discriminator.add(Conv2D(filters=int(HPARAM.depth/16), kernel_size=5, strides=2, input_shape=self.discriminator_input, padding="same")) # output size : (None, self.h/2, self.w/2, 16) of first hidden layer - input size : (None, self.h, self.w, self.channels) >>> input_width & input_height = self.h / strides with 16 filters
self.discriminator.add(LeakyReLU(0.2)) # fix the “dying ReLU” problem with alpha = 0.2
self.discriminator.add(Dropout(HPARAM.dropout)) # apply dropout with a 40% chance of setting the previous layer's input features to zero, for each element or cell within the feature maps
self.discriminator.add(Conv2D(filters=int(HPARAM.depth/8), kernel_size=5, strides=2, padding="same")) # output size : half the previous layer's output via the strides, with 32 filters
if self.h == 28:
self.discriminator.add(ZeroPadding2D(padding=((0,1),(0,1)))) # add rows and columns of zeros at the top, bottom, left and right side of an image tensor - output size : (None, (self.h/4)+1, (self.w/4)+1, 32)
if self.h == 128:
for n_layer in range(2):
self.discriminator.add(Conv2D(filters=int(HPARAM.depth/8), kernel_size=5, strides=2, padding="same")) # output size : half the previous layer's output in every iteration via the strides, with 32 filters
self.discriminator.add(BatchNormalization(momentum=HPARAM.momentum))
self.discriminator.add(LeakyReLU(0.2))
self.discriminator.add(Dropout(HPARAM.dropout)) # apply dropout with a 40% chance of setting the previous layer's input features to zero, for each element or cell within the feature maps, to prevent over-fitting
self.discriminator.add(Conv2D(filters=int(HPARAM.depth/4), kernel_size=5, strides=2, padding="same")) # output size : (None, 4, 4, 64)
self.discriminator.add(BatchNormalization(momentum=HPARAM.momentum))
self.discriminator.add(LeakyReLU(0.2))
self.discriminator.add(Dropout(HPARAM.dropout))
self.discriminator.add(Conv2D(filters=int(HPARAM.depth/4), kernel_size=5, strides=2, padding="same")) # output size : (None, 2, 2, 64)
self.discriminator.add(BatchNormalization(momentum=HPARAM.momentum))
self.discriminator.add(LeakyReLU(0.2))
self.discriminator.add(Dropout(HPARAM.dropout))
self.discriminator.add(Conv2D(filters=int(HPARAM.depth/2), kernel_size=5, strides=1, padding="same")) # output size : (None, 2, 2, 128)
self.discriminator.add(BatchNormalization(momentum=HPARAM.momentum))
self.discriminator.add(LeakyReLU(0.2))
self.discriminator.add(Dropout(HPARAM.dropout))
self.discriminator.add(Conv2D(filters=HPARAM.depth, kernel_size=5, strides=1, padding="same")) # output size : (None, 2, 2, 256) >>>
self.discriminator.add(BatchNormalization(momentum=HPARAM.momentum))
self.discriminator.add(LeakyReLU(0.2))
self.discriminator.add(Dropout(HPARAM.dropout)) # apply dropout with a 40% chance of setting the previous layer's input features to zero, for each element or cell within the feature maps
self.discriminator.add(Flatten()) # turn the last layer by flatten it into a fully dense connected for prediction in the next - output size : (None, 1024)
self.discriminator.add(Dense(1, activation='sigmoid')) # one neuron (single scalar) at the output ; means the image is real or fake, 1 for real (if the sigmoid neuron's output is larger than or equal to 0.5) and 0 for fake (if the output is smaller than 0.5) - output size : (None, 1) | weights matrix size : (1024, 1)
self.discriminator.summary()
discriminator_tensor = self.discriminator(discriminator_input) # turn our discriminator sequential model object into a tensor with an input layer for example (None, self.h, self.w, self.channels) inputs or neurons - output size : (None , 1)
print("\n\n (+(+(+(+(+(+ DISCRIMINATOR MODEL SUMMARY AFTER TURNING IT INTO A TENSOR +)+)+)+)+)+) \n\n")
if self.capsuleFlag:
# =========================================================================================
# idea borrowed from : https://github.com/gusgad/capsule-GAN/blob/master/capsule_gan.ipynb
# =========================================================================================
print("\n\n (+(+(+(+(+(+ DISCRIMINATOR SUMMARY WITH CAPSULE LAYER ARCHITECTURE - FEATURES/NEURONS/INPUTS STRUCTURE +)+)+)+)+)+) \n\n")
if self.dataset_name == 'paint_art':
# change the dimension of input (width and height of image) to 32 for primary caps layers with 256 channels
self.discriminator = Conv2D(filters=HPARAM.depth, kernel_size=5, strides=2, padding="same", name="conv0_64")(discriminator_input) # output size : (None, 64, 64, 256)
self.discriminator = Conv2D(filters=HPARAM.depth, kernel_size=5, strides=2, padding="same", name="conv0_32")(self.discriminator) # output size : (None, 32, 32, 256)
self.discriminator = Conv2D(filters=HPARAM.depth, kernel_size=9 , strides=1, padding='valid', name='conv1')(self.discriminator) # output size : (None, 24, 24, 256)
if self.dataset_name =='mnist' or self.dataset_name == 'fashion_mnist':
self.discriminator = ZeroPadding2D(padding=((4,0),(0,4)), name='zeropadded_to_32')(discriminator_input) # output size : (None, 32, 32, 256)
self.discriminator = Conv2D(filters=HPARAM.depth, kernel_size=9 , strides=1, padding='valid', name='conv1')(self.discriminator) # output size : (None, 24, 24, 256)
if self.dataset_name == 'cifar10':
self.discriminator = Conv2D(filters=HPARAM.depth, kernel_size=9 , strides=1, padding='valid', name='conv1')(discriminator_input) # output size : (None, 24, 24, 256)
self.discriminator = LeakyReLU(0.2)(self.discriminator)
self.discriminator = BatchNormalization(momentum=HPARAM.momentum)(self.discriminator)
self.discriminator = Conv2D(filters=HPARAM.capsules * int(HPARAM.depth/HPARAM.capsules), kernel_size=9, strides=2, padding='valid', name='primarycap_conv2')(self.discriminator)
self.discriminator = Reshape((HPARAM.capsules * HPARAM.depth, 8), name='primarycap_reshape')(self.discriminator) # reshape to 8D vector (capsule) for each 32 capsule layers
self.discriminator = Lambda(self.__squash, name='primarycap_squash')(self.discriminator) # the output of each capsule must be [0, 1]
self.discriminator = BatchNormalization(momentum=HPARAM.momentum)(self.discriminator)
self.discriminator = Flatten()(self.discriminator) # output size : (None, 16384) - digit caps
uhat = Dense(160, kernel_initializer='he_normal', bias_initializer='zeros', name='uhat_digitcaps')(self.discriminator) # output size : (None, 160) - weights of previous and this layer is : (16384, 160)
c = Activation('softmax', name='softmax_digitcaps1')(uhat)
c = Dense(160)(c)
self.discriminator = Multiply()([uhat, c])
s_j = LeakyReLU(0.2)(self.discriminator)
for i in range(2): # dynamic routing - agreement
c = Activation('softmax', name=f'softmax_digitcaps{i+2}')(s_j)
c = Dense(160)(c)
self.discriminator = Multiply()([uhat, c])
s_j = LeakyReLU(0.2)(self.discriminator)
self.discriminator = Dense(1, activation='sigmoid')(s_j) # our last tensor object which is not callable
discriminator_tensor = self.discriminator # the discriminator in this architecture is a tensor
self.discriminator_model = Model(discriminator_input, discriminator_tensor) # create the discriminator model with for example (None, self.h, self.w, self.channels) inputs and (None, 1) output - one input and one sequential object
self.discriminator_model.compile(loss=HPARAM.loss, optimizer=HPARAM.optimizer('Adam'), metrics=HPARAM.metrics) # binary crossentropy between an output (predicted y) tensor and a target (real y) tensor since the output of the discriminator is sigmoid
self.discriminator_model.summary()
print(f"\n\n\t\t [======Discriminator Tensor======] \n\n\t\t {discriminator_tensor}\n\n")
def __create_networks(self):
'''
We now create the GAN where we combine the Generator and discriminator.
When we train the generator we will freeze the discriminator model.
We will input the noised image of shape for example 100 units to the generator.
The output generated from the generator will be fed to the discriminator.
'''
with strategy.scope(): # blazing the speed!
self.__MakeGeneratorModel()
self.__MakeDiscriminatorModel()
print("\n\n (+(+(+(+(+(+ GAN SUMMARY +)+)+)+)+)+) \n\n")
self.discriminator_model.trainable = False # freeze the model because at first, we will train only generator model.
real_input = Input(shape=(self.generator_input_features,)) # the real input features of our gan model
generator_output_tensor = self.generator_model(real_input) # pass input of shape for example 100 neurons to generator model input - output size : (None, self.h, self.w, self.channels)
discriminator_output_tensor = self.discriminator_model(generator_output_tensor) # this is the output tensor of our discriminator model which is the result of passing the output of generator model to it for discriminating - output size : (None , 1)
self.gan = Model(inputs=real_input, outputs=discriminator_output_tensor) # input size : (None, 100) - output size : (None, self.h, self.w, self.channels) and (None, 1) for two model objects
self.gan.compile(loss=HPARAM.loss, optimizer=HPARAM.optimizer('Adam')) # use Adam optimizer to prevent nan loss from happening!
self.gan.summary() # the structure is : one input layer and 2 model objects | data -> generator -> discriminator .... gan(x) = discriminator(generator(x))
def __squash(self, vectors, axis=-1):
'''
It drives the length of a large vector to near 1 and small vector to 0.
is used to normalize the magnitude of vectors, rather than the scalar elements themselves.
the epsilon is a small floating point number used to generally avoid mistakes like divide by zero.
v_j = (||s_j||^2 / (1 + ||s_j||^2)) * (s_j / ||s_j||)
'''
s_squared_sum = K.sum(K.square(vectors), axis, keepdims=True)
scale = s_squared_sum / (1 + s_squared_sum) / K.sqrt(s_squared_sum + K.epsilon())
return scale * vectors # a tensor with same shape as input vectors because of keepdims flag
def predictNoise(self, b_size):
# noise = tf.random.normal([b_size, self.generator_input_features])
# return self.generator_model.predict(noise, steps=b_size)
noise = np.random.normal(0, 1, (b_size, self.generator_input_features)) # output shape : (25,100) - to match the first layer matrix we suppose a (25, 100) matrix ; cause our first layer has 100 features or neurons | random vector from the latent space
generated_noise = self.generator_model.predict(noise) # input shape : (b_size, 100) to the generator model with 12544 neurons for first hidden layer
# print(f"\n\n[======NON-SCALED GENERATED NOISE======]\n\n{generated_noise}") # they are between [-1, 1]
generated_noise = 0.5 * generated_noise + 0.5 # scale the image which is between -1 and 1 to 0 and 1 - because the output of discriminator is [0, 1] and we have to scale our input data for the network
# print(f"\n\n[======SCALED GENERATED NOISE======]\n\n{generated_noise}")
return generated_noise
def __plotLoss(self, analysis):
analysis_df = pd.DataFrame(analysis)
# print(f"\n\n[=========ANALYSIS DATAFRAME=========]\n\n\t{analysis}\n\n")
plt.figure(figsize=(20,5))
for col in analysis_df.columns:
plt.plot(analysis_df[col], label=col)
plt.legend()
plt.ylabel("loss")
plt.xlabel("epoch")
plt.show()
def saveModels(self):
self.discriminator_model.save('/gdrive/My Drive/GAN-models/dc-capsule__GAN/disc.h5')
self.generator_model.save('/gdrive/My Drive/GAN-models/dc-capsule__GAN/gen.h5')
self.gan.save('/gdrive/My Drive/GAN-models/dc-capsule__GAN/gan.h5')
def __saveImages(self, epoch):
generated_noise = self.predictNoise(b_size=25) # predict for 25 noisy images or 25 batch size - output size : (25, self.h, self.w, self.channels)
self.ipkit.saveGANIMG(generated_noise, epoch, self.dataset_name)
def fit(self):
'''
G(Z) : generated_noise & D(G(Z)) : discriminating generated_noise.
since we are only training the generator here, we do not want to adjust the weights of the discriminator.
this is what "Adversarial" in Adversarial Network really means; if we do not freeze them,
the generator will get its weights adjusted so it gets better at fooling the discriminator,
but it will also adjust the weights of the discriminator to make it better at being fooled.
we don't want this, so we have to train them separately and let them fight against each other.
NOTE : for the weights matrices of e.g. the discriminator model, you might want to look at self.discriminator_model.trainable_weights
'''
real, fake, analysis = np.ones((HPARAM.batch_size, 1)), np.zeros((HPARAM.batch_size, 1)), []
for epoch in range(HPARAM.epochs):
# train the discriminator
if self.dataset_name == 'paint_art':
batch_indices = np.random.randint(0, len(self.x_train))
else:
batch_indices = np.random.randint(0, self.x_train.shape[0], HPARAM.batch_size) # select a random batch index in every epoch - from 0 to 60000 select HPARAM.batch_size numbers (all in a vector) randomly
batch = self.x_train[batch_indices] # get a random batch of real images - shape for all datasets except paint_art : (HPARAM.batch_size, self.h, self.w, self.channels)
batch = 0.5 * batch + 0.5 # rescale to [0, 1] - the training images are in [-1, 1]; note that inside self.gan the generator's tanh output in [-1, 1] reaches the discriminator unscaled, so this rescaling is a possible culprit for the black-pixel BUG noted above
generated_noise = self.predictNoise(HPARAM.batch_size) # produce HPARAM.batch_size noisy images using our generator model, shape (HPARAM.batch_size, self.h, self.w, self.channels)
self.discriminator_model.trainable = True # pre train discriminator on fake and real data before starting the gan to let the discriminator model weights update
real_metric_loss = self.discriminator_model.train_on_batch(batch, real) # runs a single gradient update on a single batch of data and returns scalar training loss for real images - how much they are real! train to get the 1s.
fake_metric_loss = self.discriminator_model.train_on_batch(generated_noise, fake) # runs a single gradient update on a single batch of data and returns scalar training loss for fake images - how much they are fake! train to get the 0s.
discriminator_loss = 0.5 * np.add(real_metric_loss, fake_metric_loss) # gan binaryCrossEntropy : realLoss + fakeLoss as we saw in above pictures - in practice, we divide the objective by 2 while optimizing discriminator, which slows down the rate at which discriminator learns relatively to generator.
# train the generator
self.discriminator_model.trainable = False # during the training of gan, the weights of discriminator should be fixed and we can enforce that by setting the trainable flag
noise = np.random.normal(0, 1, (HPARAM.batch_size, self.generator_input_features)) # we'll feed this generated noise into our gan model to produce real images from noisy by training our gen model
generator_metric_loss = self.gan.train_on_batch(noise, real) # train the gan by alternating: the discriminator is trained above, then the chained gan model is trained with the discriminator's weights frozen, pushing the generator's output towards the real labels - runs a single gradient update on a single batch and returns the scalar training loss; takes a (batch_size, 100) matrix as input and a (batch_size, 1) matrix filled with 1s as the real output value; our gan model has the (batch_size, 100) -> (batch_size, self.h, self.w, self.channels) -> (batch_size, 1) architecture
print(f"[*************EPOCH - {epoch + 1}*************]")
print(f"DISCRIMINATOR LOSS ⏎\n\t{discriminator_loss[0]}\n")
print(f"DISCRIMINATOR ACC ⏎\n\t{discriminator_loss[1]*100}\n")
print(f"GENERATOR LOSS ⏎\n\t{generator_metric_loss}\n")
print("_________________________________________________________________________________________________________\n")
analysis.append({"D": discriminator_loss[0], "G": generator_metric_loss})
if epoch % 10 == 0:
self.__saveImages(epoch)
self.__plotLoss(analysis)
self.ipkit.MakeGif()
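# Sanity-check sketch (illustrative values only) of the __squash nonlinearity used in
# the capsule discriminator: a long vector keeps its direction but gets a norm close
# to 1, while a short vector is driven towards 0.
for _v in (np.array([3.0, 4.0]), np.array([0.1, 0.0])):
    _s_sq = np.sum(np.square(_v))
    print((_s_sq / (1 + _s_sq)) * _v / np.sqrt(_s_sq))  # printed vectors have norms ~0.96 and ~0.0099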
"""# **Testing our GAN on Different Datasets using Two Different Discriminator Architectures**"""
# discNetwork : discriminator network => dcgan or cgan | deep convolutional or capsule respectively
# dataset_name : paint_art, fashion_mnist, mnist, cifar10
# channels : 1 for mnist and fashion_mnist, 3 for paint_art and cifar10
# generator_input_features : features or number of first layer columns (neurons) for generator network
gan = GAN(dataset_name='paint_art', channels=3, generator_input_features=100, discNetwork='cgan')
gan.fit() # start training
gan.saveModels() # save trained models
"""**Deep Convolutional GAN Generated Noise - Trained on TPU After 30K Epochs on paint_art Dataset**
![dcgan gif paint_art]()
**Capsule GAN Generated Noise - Trained on TPU After 30K Epochs On paint_art Dataset**
![cgan gif paint_art]()
**GAN Model Prediction API - Tensorflow Serving**
"""
# TODO : https://www.tensorflow.org/tfx/tutorials/serving/rest_simple
gen = load_model('/gdrive/My Drive/GAN-models/gen.h5')
noise = np.random.normal(0, 1, (25, gan.generator_input_features))
predicted_noise = gen.predict(noise) # it should give us realistic images!
if gan.dataset_name == 'cifar10' or gan.dataset_name == 'paint_art':
plt.imshow((predicted_noise[0, :, :, :] * 127.5 + 127.5).astype(np.uint8)) # plot the 0th predicted noise - because of generator output we have to scale the prediction to [0, 255], so we multiply by 127.5 and add 127.5
else:
plt.imshow(predicted_noise[0, :, :, 0] * 127.5 + 127.5, cmap='gray')
predicted_noise = 0.5 * predicted_noise + 0.5 # because the discriminator output is in range [0, 1] we have to scale the generated noise
print("\n\n |=> 0th GENERATED NOISE FROM GENERATOR <=|")
disc = load_model('/gdrive/My Drive/GAN-models/disc.h5')
print("\n\n |=> DISCRIMINATING 25 GENERATED NOISE BATCHES <=|\n\n{}".format(disc.predict(predicted_noise)))
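# Hedged sketch for the TF Serving TODO above (path is an assumption mirroring the
# ones used earlier): TF Serving consumes the SavedModel format, so saving without
# the .h5 suffix writes a servable directory instead of a single HDF5 file.
# gen.save('/gdrive/My Drive/GAN-models/gen_savedmodel/1')  # '1' = model version dir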