# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license.
# Adapted from:
# https://github.com/Microsoft/CNTK/blob/master/Examples/Image/Classification/ConvNet/Python/ConvNet_MNIST.py
# ====================================================================
"""Train a CNN model on the MNIST dataset via distributed training."""
from __future__ import print_function

import argparse
import os

import numpy as np

import cntk as C
from cntk.train.training_session import CheckpointConfig, TestConfig


def create_reader(path, is_training, input_dim, label_dim, total_number_of_samples):
    """Define the reader for both the training and evaluation actions."""
    return C.io.MinibatchSource(C.io.CTFDeserializer(path, C.io.StreamDefs(
        features=C.io.StreamDef(field='features', shape=input_dim),
        labels=C.io.StreamDef(field='labels', shape=label_dim)
    )), randomize=is_training, max_samples=total_number_of_samples)
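
# The CTF text files read above store one sample per line; abbreviated example
# (the real 'features' field holds 784 pixel values per 28x28 image):
#   |labels 0 0 0 0 0 0 0 1 0 0 |features 0 0 27 252 ... 0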


def convnet_mnist(max_epochs, output_dir, data_dir, debug_output=False, epoch_size=60000, minibatch_size=64):
    """Creates and trains a feedforward classification model for MNIST images."""
    image_height = 28
    image_width = 28
    num_channels = 1
    input_dim = image_height * image_width * num_channels
    num_output_classes = 10

    # Input variables denoting the features and label data
    input_var = C.ops.input_variable((num_channels, image_height, image_width), np.float32)
    label_var = C.ops.input_variable(num_output_classes, np.float32)

    # Instantiate the feedforward classification model
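    # (0.00390625 == 1/256, so byte-valued pixels are scaled into [0, 1))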
    scaled_input = C.ops.element_times(C.ops.constant(0.00390625), input_var)
    with C.layers.default_options(activation=C.ops.relu, pad=False):
        conv1 = C.layers.Convolution2D((5, 5), 32, pad=True)(scaled_input)
        pool1 = C.layers.MaxPooling((3, 3), (2, 2))(conv1)
        conv2 = C.layers.Convolution2D((3, 3), 48)(pool1)
        pool2 = C.layers.MaxPooling((3, 3), (2, 2))(conv2)
        conv3 = C.layers.Convolution2D((3, 3), 64)(pool2)
        f4 = C.layers.Dense(96)(conv3)
        drop4 = C.layers.Dropout(0.5)(f4)
        z = C.layers.Dense(num_output_classes, activation=None)(drop4)

    ce = C.losses.cross_entropy_with_softmax(z, label_var)
    pe = C.metrics.classification_error(z, label_var)
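    # ce is the training criterion (softmax cross-entropy); pe is the top-1
    # classification error reported during progress logging and testing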

    # Load train data
    reader_train = create_reader(os.path.join(data_dir, 'Train-28x28_cntk_text.txt'), True,
                                 input_dim, num_output_classes, max_epochs * epoch_size)
    # Load test data
    reader_test = create_reader(os.path.join(data_dir, 'Test-28x28_cntk_text.txt'), False,
                                input_dim, num_output_classes, C.io.FULL_DATA_SWEEP)

    # Set learning parameters
    lr_per_sample = [0.001] * 10 + [0.0005] * 10 + [0.0001]
    lr_schedule = C.learning_parameter_schedule_per_sample(lr_per_sample, epoch_size=epoch_size)
    mms = [0] * 5 + [0.9990239141819757]
    mm_schedule = C.learners.momentum_schedule_per_sample(mms, epoch_size=epoch_size)
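    # With epoch_size given, each schedule entry stays in effect for one epoch
    # worth of samples: the per-sample learning rate is 1e-3 for epochs 1-10,
    # 5e-4 for epochs 11-20, then 1e-4; momentum is off for the first 5 epochs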

    # Instantiate the trainer object to drive the model training
    local_learner = C.learners.momentum_sgd(z.parameters, lr_schedule, mm_schedule)
    # Passing the worker's rank restricts progress logging to the main worker
    progress_printer = C.logging.ProgressPrinter(
        tag='Training',
        rank=C.train.distributed.Communicator.rank(),
        num_epochs=max_epochs)
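    # Wrap the local learner for data-parallel training: each MPI worker
    # computes gradients on its own shard of the minibatch, and the gradients
    # are aggregated across workers before each parameter update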
    learner = C.train.distributed.data_parallel_distributed_learner(local_learner)
    trainer = C.Trainer(z, (ce, pe), learner, progress_printer)

    # Define mapping from reader streams to network inputs
    input_map_train = {
        input_var: reader_train.streams.features,
        label_var: reader_train.streams.labels
    }
    input_map_test = {
        input_var: reader_test.streams.features,
        label_var: reader_test.streams.labels
    }

    C.logging.log_number_of_parameters(z)
    print()
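
    # The training session feeds minibatches from reader_train, logs progress
    # and writes a checkpoint once per epoch (both frequencies are given in
    # samples), and runs a test sweep over reader_test when training finishes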
    C.train.training_session(
        trainer=trainer,
        mb_source=reader_train,
        model_inputs_to_streams=input_map_train,
        mb_size=minibatch_size,
        progress_frequency=epoch_size,
        checkpoint_config=CheckpointConfig(frequency=epoch_size,
                                           filename=os.path.join(output_dir, "ConvNet_MNIST")),
        test_config=TestConfig(reader_test, minibatch_size=minibatch_size,
                               model_inputs_to_streams=input_map_test)
    ).train()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--num_epochs', help='Total number of epochs to train', type=int, default=40)
    parser.add_argument('--output_dir', help='Output directory', required=False, default='outputs')
    parser.add_argument('--data_dir', help='Directory with training data')
    args = parser.parse_args()

    os.makedirs(args.output_dir, exist_ok=True)
    convnet_mnist(args.num_epochs, args.output_dir, args.data_dir)

    # MPI finalize must be called when the process exits without exceptions
    C.train.distributed.Communicator.finalize()
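
# A minimal launch sketch, assuming an MPI runtime is installed and the MNIST
# CTF files live under ./mnist (hypothetical path):
#   mpiexec -n 2 python cntk_distr_mnist.py --num_epochs 40 --data_dir ./mnist
# Each MPI process becomes one worker of the data-parallel learner above.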