The previous article is here. We create the CNN experiment code by adding to and modifying the experiment code built there. Due to execution time concerns, the experiment uses the scikit-learn MNIST dataset. The differences from the regular MNIST dataset are:

- The image size is $(8, 8)$.
- The total number of images in the dataset is 1797.

Thanks to this, training takes only tens of seconds (in my environment). For reference, experiment code for the full Keras dataset is also included, but it would take several hours in my environment, so I gave up on running it...
Change history:

- Changed the implementation of the `forward` function of the `Trainer` class: roughly speaking, if more than 10MB of data would flow through at once, it is now split and streamed in chunks.
- Added a link to Google Colaboratory to the README.md on GitHub.

Table of contents:

- [Change _TypeManager class](#change-_typemanager-class)
- [Add Trainer class](#add-trainer-class)
- [Change LayerManager class](#change-layermanager-class)
- [CNN experiment code body](#cnn-experiment-code-body)
  - [For Keras MNIST dataset](#for-keras-mnist-dataset)
  - [For scikit-learn MNIST dataset](#for-scikit-learn-mnist-dataset)
  - [CNN learning body](#cnn-learning-body)
  - [Display of misjudged data](#display-of-misjudged-data)
- [Conclusion](#conclusion)
## Change _TypeManager class

First, we add to the `_TypeManager` class so that the `LayerManager` class can handle `ConvLayer` and `PoolingLayer`.
`_type_manager.py`

```python
class _TypeManager():
    """
    Manager class for layer types
    """
    N_TYPE = 4  # Number of layer types

    BASE = -1
    MIDDLE = 0  # Middle-layer numbering
    OUTPUT = 1  # Output-layer numbering
    CONV = 2    # Convolution-layer numbering
    POOL = 3    # Pooling-layer numbering

    # Maps a layer name to the corresponding layer class.
    REGULATED_DIC = {"Middle": MiddleLayer,
                     "Output": OutputLayer,
                     "Conv": ConvLayer,
                     "Pool": PoolingLayer,
                     "BaseLayer": None}


    @property
    def reg_keys(self):
        return list(self.REGULATED_DIC.keys())


    def name_rule(self, name):
        name = name.lower()
        if "middle" in name or name == "mid" or name == "m":
            name = self.reg_keys[self.MIDDLE]
        elif "output" in name or name == "out" or name == "o":
            name = self.reg_keys[self.OUTPUT]
        elif "conv" in name or name == "c":
            name = self.reg_keys[self.CONV]
        elif "pool" in name or name == "p":
            name = self.reg_keys[self.POOL]
        else:
            raise UndefinedLayerError(name)

        return name
```
We added `CONV` and `POOL` as constants, and `REGULATED_DIC` now lets us get the layer class from the layer name. In addition, since the `keys` list of `REGULATED_DIC` was needed in many places, we turned it into the `reg_keys` property, and we added the convolution and pooling layers to the naming rules.
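As a quick check of the naming rule, here is how abbreviated layer names resolve (a hypothetical usage sketch; `lm` is assumed to be an instance of a class that mixes in `_TypeManager`, such as the `LayerManager` shown later):

```python
# Hypothetical usage of _TypeManager.name_rule and reg_keys;
# 'lm' is assumed to mix in _TypeManager (e.g. the LayerManager below).
print(lm.name_rule("c"))     # -> "Conv"
print(lm.name_rule("pool"))  # -> "Pool"
print(lm.name_rule("mid"))   # -> "Middle"
print(lm.reg_keys)           # -> ['Middle', 'Output', 'Conv', 'Pool', 'BaseLayer']
# An unregistered name raises UndefinedLayerError:
# lm.name_rule("foo")
```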
## Add Trainer class

The functions for learning and prediction have been separated out of the `LayerManager` class into the `Trainer` class.
`trainer.py`

```python
import time

import numpy as np
import matplotlib.pyplot as plt

# get_act (from the activation-function part of this series) returns an
# activation-function object; type() extracts its class for isinstance checks.
softmax = type(get_act("softmax"))
sigmoid = type(get_act("sigmoid"))


class Trainer():
    def __init__(self, x, y):
        self.x_train, self.x_test = x
        self.y_train, self.y_test = y

        self.make_anim = False


    def forward(self, x, lim_memory=10):
        def propagate(x):
            x_in = x
            n_batch = x.shape[0]

            switch = True
            for ll in self.layer_list:
                if switch and not self.is_CNN(ll.name):
                    x_in = x_in.reshape(n_batch, -1)
                    switch = False
                x_in = ll.forward(x_in)

        # Because the forward propagation method is also used for error
        # calculation and for prediction on unknown data, the memory
        # footprint can become large.
        # 'lm' refers to the global LayerManager instance.
        if np.prod(x.shape)*8/2**20 >= lim_memory:
            # If the data, as double-precision floats (8 bytes each), would
            # occupy lim_memory MB (default 10MB = 10*2**20 bytes) or more,
            # split it into chunks of about half that size and execute.
            n_batch = int(lim_memory/2*2**20/(8*np.prod(x.shape[1:])))
            y = np.zeros((x.shape[0], lm[-1].n))
            n_loop = int(np.ceil(x.shape[0]/n_batch))
            for i in range(n_loop):
                propagate(x[i*n_batch : (i+1)*n_batch])
                y[i*n_batch : (i+1)*n_batch] = lm[-1].y.copy()
            lm[-1].y = y
        else:
            # Otherwise run normally.
            propagate(x)


    def backward(self, t):
        y_in = t
        n_batch = t.shape[0]

        switch = True
        for ll in self.layer_list[::-1]:
            if switch and self.is_CNN(ll.name):
                y_in = y_in.reshape(n_batch, *ll.O_shape)
                switch = False
            y_in = ll.backward(y_in)


    def update(self, **kwds):
        for ll in self.layer_list:
            ll.update(**kwds)


    def training(self, epoch, n_batch=16, threshold=1e-8,
                 show_error=True, show_train_error=False, **kwds):
        if show_error:
            self.error_list = []
        if show_train_error:
            self.train_error_list = []
        if self.make_anim:
            self.images = []
        self.n_batch = n_batch

        n_train = self.x_train.shape[0]//n_batch
        n_test = self.x_test.shape[0]

        # Start learning.
        start_time = time.time()
        lap_time = -1
        error = 0
        error_prev = 0
        rand_index = np.arange(self.x_train.shape[0])
        for t in range(1, epoch+1):
            # Scene creation.
            if self.make_anim:
                self.make_scene(t, epoch)

            # Training error calculation.
            if show_train_error:
                self.forward(self.x_train)
                error = lm[-1].get_error(self.y_train)
                self.train_error_list.append(error)

            # Test error calculation.
            self.forward(self.x_test)
            error = lm[-1].get_error(self.y_test)
            if show_error:
                self.error_list.append(error)

            # Convergence test.
            if np.isnan(error):
                print("fail training...")
                break
            if abs(error - error_prev) < threshold:
                print("end learning...")
                break
            else:
                error_prev = error

            t_percent = int(50*t/epoch)
            np.random.shuffle(rand_index)
            for i in range(n_train):
                i_percent = int(50*(i+1)/n_train)
                if i_percent <= t_percent:
                    time_stamp = ("progress:[" + "X"*i_percent
                                  + "\\"*(t_percent-i_percent)
                                  + " "*(50-t_percent) + "]")
                else:
                    time_stamp = ("progress:[" + "X"*t_percent
                                  + "/"*(i_percent-t_percent)
                                  + " "*(50-i_percent) + "]")
                elapsed_time = time.time() - start_time
                print("\r" + time_stamp
                      + "{}s/{}s".format(
                          int(elapsed_time),
                          int(lap_time*epoch) if lap_time > 0 else "?"),
                      end="")

                rand = rand_index[i*n_batch : (i+1)*n_batch]
                self.forward(self.x_train[rand])
                self.backward(self.y_train[rand])
                self.update(**kwds)
            if lap_time < 0:
                lap_time = time.time() - start_time
        print()

        if show_error:
            # Error transition display.
            self.show_errors(show_train_error, **kwds)


    def pred_func(self, y, threshold=0.5):
        if isinstance(self[-1].act, softmax):
            return np.argmax(y, axis=1)
        elif isinstance(self[-1].act, sigmoid):
            return np.where(y > threshold, 1, 0)
        else:
            raise NotImplementedError


    def predict(self, x=None, y=None, threshold=0.5):
        if x is None:
            x = self.x_test
        if y is None:
            y = self.y_test

        self.forward(x)
        self.y_pred = self.pred_func(self[-1].y, threshold=threshold)
        y = self.pred_func(y, threshold=threshold)
        print("correct:", y[:min(16, int(y.shape[0]*0.1))])
        print("predict:", self.y_pred[:min(16, int(y.shape[0]*0.1))])
        count = np.sum(self.y_pred == y, dtype=int)
        print("accuracy rate:", count/y.shape[0]*100, "%",
              "({}/{})".format(count, y.shape[0]))

        return self.y_pred


    def show_errors(self, show_train_error=False, title="error transition",
                    xlabel="epoch", ylabel="error", fname="error_transition.png",
                    log_scale=True, **kwds):
        fig, ax = plt.subplots(1)
        fig.suptitle(title)
        if log_scale:
            ax.set_yscale("log")
        ax.set_xlabel(xlabel)
        ax.set_ylabel(ylabel)
        ax.grid()
        if show_train_error:
            ax.plot(self.train_error_list, label="train error")
        ax.plot(self.error_list, label="test error")
        ax.legend(loc="best")
        #fig.show()
        if len(fname) != 0:
            fig.savefig(fname)


    def ready_anim(self, n_image, x, y, title="animation",
                   xlabel="x", ylabel="y", ex_color="r", color="b",
                   x_left=0, x_right=0, y_down=1, y_up=1):
        self.n_image = n_image
        self.x = x
        self.color = color
        self.make_anim = True

        self.anim_fig, self.anim_ax = plt.subplots(1)
        self.anim_fig.suptitle(title)
        self.anim_ax.set_xlabel(xlabel)
        self.anim_ax.set_ylabel(ylabel)
        self.anim_ax.set_xlim(np.min(x) - x_left, np.max(x) + x_right)
        self.anim_ax.set_ylim(np.min(y) - y_down, np.max(y) + y_up)
        self.anim_ax.grid()
        self.anim_ax.plot(x, y, color=ex_color)

        return self.anim_fig, self.anim_ax


    def make_scene(self, t, epoch):
        # Scene creation.
        if t % (epoch/self.n_image) == 1:
            x_in = self.x.reshape(-1, 1)
            for ll in self.layer_list:
                x_in = ll.forward(x_in)
            im, = self.anim_ax.plot(self.x, ll.y, color=self.color)
            self.images.append([im])
```
The reason the `forward`, `backward`, and `update` functions are separated out as individual methods is that, if you want to do something custom, you only have to swap in the method you want forward propagation (or the other steps) to execute. I think there is still a little room for ingenuity here...
Also, since the `forward` function is used for error calculation and prediction as well, a huge amount of data can flow through it at once. Therefore, assuming double-precision floating-point (8-byte) data, if the estimated memory use is 10MB or more, the data is now split into chunks of about 5MB each.
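As a rough check of the chunking arithmetic, here is a standalone sketch (the shape is that of the full Keras MNIST training set; the numbers mirror the defaults of the `forward` function above):

```python
import numpy as np

# Full Keras MNIST training set as (B, C, I_h, I_w) double-precision data.
x_shape = (60000, 1, 28, 28)
mb = np.prod(x_shape) * 8 / 2**20   # estimated memory in MB (8 bytes per value)
print(round(mb, 1))                 # -> 358.9, well over the 10MB limit

# Chunk size chosen so each chunk stays at about 5MB or less:
n_batch = int(5 * 2**20 / (8 * np.prod(x_shape[1:])))
n_loop = int(np.ceil(x_shape[0] / n_batch))
print(n_batch, n_loop)              # -> 835 samples per chunk, 72 chunks
```

So a full-dataset forward pass is executed as 72 smaller passes instead of one huge one.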
The `training` function describes the learning flow. I thought it would be good to see the error transition on the training data as well, so I added that as an option.
The convergence test also checks for `NaN`, so training stops immediately if learning fails.
Also, whereas the progress display previously relied on the `tqdm` module, I wrote my own this time: "X" shows progress through the epochs, and "/" shows how far the current epoch's batches have been digested.
The `predict` function literally makes predictions, on the test data by default. Its arguments are optional; if they are not specified, the test data held by the layer manager is used.
After the data has been run through the network, the output format is converted by `pred_func` and the accuracy rate is calculated. This part also needs some more work: as written, it can only compute an accuracy rate for classification problems...
## Change LayerManager class

With the addition of the `ConvLayer` and `PoolingLayer` classes, some minor changes were required.
`layer_manager.py`

```python
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation


class LayerManager(_TypeManager, Trainer):
    """
    Manager class for managing layers
    """
    def __init__(self, x, y):
        super().__init__(x, y)

        self.__layer_list = []  # List of layers
        self.__name_list = []   # Name list for each layer
        self.__ntype = np.zeros(self.N_TYPE, dtype=int)  # Number of layers by type


    def __repr__(self):
        layerRepr = "layer_list: " + repr(self.__layer_list)
        nameRepr = "name_list: " + repr(self.__name_list)
        ntypeRepr = "ntype: " + repr(self.__ntype)
        return (layerRepr + "\n"
                + nameRepr + "\n"
                + ntypeRepr)


    def __str__(self):
        layerStr = "layer_list: " + str(self.__layer_list)
        nameStr = "name_list: " + str(self.__name_list)
        ntypeStr = "ntype: " + str(self.__ntype)
        return (layerStr + "\n"
                + nameStr + "\n"
                + ntypeStr)


    def __len__(self):
        """
        Describes the behavior when called from Python's built-in
        `len` function.
        Returns the sum of the number of layers by type.
        """
        return int(np.sum(self.__ntype))


    def __getitem__(self, key):
        """
        Called when an element is accessed like an element of a list
        or array, for example:
            lm = LayerManager()
            +----------------+
            | (add elements) |
            +----------------+
            x = lm[3].~~
        Only access via slice, str, and int is allowed.
        """
        if isinstance(key, slice):
            # If the key is a slice, refer to the layer list with the slice.
            # If an abnormal value (e.g. an index out of range) is entered,
            # Python raises an error for us.
            return self.__layer_list[key]
        elif isinstance(key, str):
            # If the key is a string, get the index from the name list
            # and return the corresponding layer.
            if key in self.__name_list:
                index = self.__name_list.index(key)
                return self.__layer_list[index]
            else:
                # If the key does not exist, raise a KeyError.
                raise KeyError("{}: No such item".format(key))
        elif isinstance(key, int):
            # If the key is an integer, return the corresponding element
            # of the layer list.
            # If an abnormal value (e.g. an index out of range) is entered,
            # Python raises an error for us.
            return self.__layer_list[key]
        else:
            raise KeyError(key, ": Undefined such key type.")


    def __setitem__(self, key, value):
        """
        Called when an element is assigned like an element of a list
        or array, for example:
            lm = LayerManager()
            +----------------+
            | (add elements) |
            +----------------+
            lm[1] = x
        Only overwriting existing elements is allowed;
        adding new elements is prohibited.
        """
        value_type = ""
        if isinstance(value, list):
            # If the 'value' on the right-hand side is a 'list',
            # raise an error unless every element is a 'BaseLayer'
            # or inherits from it.
            if not all(isinstance(v, BaseLayer) for v in value):
                raise AssignError()
            value_type = "list"
        elif not isinstance(value, BaseLayer):
            # Raise an error if the 'value' on the right-hand side is not
            # a 'BaseLayer' and does not inherit from it.
            raise AssignError(type(value))
        if value_type == "":
            value_type = self.reg_keys[self.BASE]

        if isinstance(key, slice):
            # If the key is a slice, overwrite the elements of the layer list.
            # An error occurs unless 'value_type' is 'list'.
            # If an abnormal value (e.g. an index out of range) is entered,
            # Python raises an error for us.
            if value_type != "list":
                raise AssignError(value_type)
            self.__layer_list[key] = value
        elif isinstance(key, str):
            # If the key is a string, get the index from the name list
            # and overwrite the corresponding layer.
            # An error occurs unless 'value_type' is 'BaseLayer'.
            if value_type != self.reg_keys[self.BASE]:
                raise AssignError(value_type)
            if key in self.__name_list:
                index = self.__name_list.index(key)
                self.__layer_list[index] = value
            else:
                # If the key does not exist, raise a KeyError.
                raise KeyError("{}: No such item".format(key))
        elif isinstance(key, int):
            # If the key is an integer, overwrite the corresponding element
            # of the layer list.
            # An error occurs unless 'value_type' is 'BaseLayer'.
            # If an abnormal value (e.g. an index out of range) is entered,
            # Python raises an error for us.
            if value_type != self.reg_keys[self.BASE]:
                raise AssignError(value_type)
            self.__layer_list[key] = value
        else:
            raise KeyError(key, ": Undefined such key type.")


    def __delitem__(self, key):
        """
        Called when an element is deleted with a del statement,
        for example:
            lm = LayerManager()
            +----------------+
            | (add elements) |
            +----------------+
            del lm[2]
        If the specified element exists, it is deleted and the
        remaining layers are renamed.
        """
        if isinstance(key, slice):
            # If the key is a slice, delete the specified elements as-is.
            # If an abnormal value (e.g. an index out of range) is entered,
            # Python raises an error for us.
            del self.__layer_list[key]
            del self.__name_list[key]
        elif isinstance(key, str):
            # If the key is a string, get the index from the name list
            # and delete the corresponding element.
            if key in self.__name_list:
                index = self.__name_list.index(key)
                del self.__layer_list[index]
                del self.__name_list[index]
            else:
                # If the key does not exist, raise a KeyError.
                raise KeyError("{}: No such item".format(key))
        elif isinstance(key, int):
            # If the key is an integer, delete the corresponding element
            # of the layer list (and its name).
            # If an abnormal value (e.g. an index out of range) is entered,
            # Python raises an error for us.
            del self.__layer_list[key]
            del self.__name_list[key]
        else:
            raise KeyError(key, ": Undefined such key type.")

        # Rename
        self._rename()


    def _rename(self):
        """
        When the naming of the name list violates the rules because of
        list operations, rename the name list and each layer so that
        they satisfy the rules again.

        The naming rule is [layer type][number]. The layer type is
        abbreviated: Middle for middle layers, Output for output
        layers, and so on. The number is counted by type.

        The __ntype counter is also recounted here.
        """
        # Initialize the number of layers by type.
        self.__ntype = np.zeros(self.N_TYPE, dtype=int)

        # Recount and rename each layer.
        for i in range(len(self.__name_list)):
            for j, reg_name in enumerate(self.REGULATED_DIC):
                if reg_name in self.__name_list[i]:
                    self.__ntype[j] += 1
                    self.__name_list[i] = (self.reg_keys[j]
                                           + str(self.__ntype[j]))
                    self.__layer_list[i].name = (self.reg_keys[j]
                                                 + str(self.__ntype[j]))
                    break
            else:
                raise UndefinedLayerType(self.__name_list[i])


    def append(self, *, name="Middle", **kwds):
        """
        Implementation of the familiar append method,
        which adds an element to the list of layers.
        """
        if "prev" in kwds:
            # If 'prev' is included in the keywords, the number of units
            # of the previous layer is specified.
            # Basically this should only happen when inserting the first
            # layer; otherwise it is determined automatically and need
            # not be specified.
            if len(self) != 0:
                if kwds["prev"] != self.__layer_list[-1].n:
                    # Error if it does not match the number of units
                    # at the end.
                    raise UnmatchUnitError(self.__layer_list[-1].n,
                                           kwds["prev"])
        elif not self.is_CNN(name):
            if len(self) == 0:
                # The first DNN layer must always specify the number
                # of input units.
                raise UnmatchUnitError("Input units", "Unspecified")
            else:
                # Add the number of units of the last layer to 'kwds'.
                kwds["prev"] = self.__layer_list[-1].n

        # Read the layer type and rename it according to the naming rule.
        name = self.name_rule(name)

        # Add the layer.
        for i, reg_name in enumerate(self.REGULATED_DIC):
            if name in reg_name:
                # Increment the per-type layer counter.
                self.__ntype[i] += 1
                # Add the number to the name.
                name += str(self.__ntype[i])
                # Add it to the name list.
                self.__name_list.append(name)
                # Finally, create the layer and add it to the layer list.
                self.__layer_list.append(
                    self.REGULATED_DIC[reg_name](name=name, **kwds))


    def extend(self, lm):
        """
        The extend method adds all the elements of another,
        already existing layer manager 'lm'.
        """
        if not isinstance(lm, LayerManager):
            # Error if 'lm' is not an instance of LayerManager.
            raise TypeError(type(lm), ": Unexpected type.")
        if len(self) != 0:
            if self.__layer_list[-1].n != lm[0].prev:
                # Error if the number of units of your last layer does not
                # match the number of inputs of the first layer of 'lm'.
                raise UnmatchUnitError(self.__layer_list[-1].n,
                                       lm[0].prev)

        # Add each with the 'extend' method.
        self.__layer_list.extend(lm.layer_list)
        self.__name_list.extend(lm.name_list)

        # Rename
        self._rename()


    def insert(self, prev_name, name="Middle", **kwds):
        """
        The insert method specifies the name of the previous layer and
        adds an element connected to that layer.
        """
        # Error if 'prev_name' does not exist.
        if not prev_name in self.__name_list:
            raise KeyError(prev_name, ": No such key.")
        # Error if 'prev' is included in the keywords and does not match
        # the number of units of the layer specified by 'prev_name'.
        if "prev" in kwds:
            if kwds["prev"] \
                != self.__layer_list[self.index(prev_name)].n:
                raise UnmatchUnitError(
                    kwds["prev"],
                    self.__layer_list[self.index(prev_name)].n)
        # If 'n' is included in the keywords...
        if "n" in kwds:
            # ...and 'prev_name' is not the last layer...
            if prev_name != self.__name_list[-1]:
                # ...error if it does not match the number of units
                # of the next layer.
                if kwds["n"] != self.__layer_list[
                        self.index(prev_name)+1].prev:
                    raise UnmatchUnitError(
                        kwds["n"],
                        self.__layer_list[self.index(prev_name)+1].prev)
        # If there are no elements yet, raise an error telling the user
        # to use the 'append' method instead.
        if len(self) == 0:
            raise RuntimeError(
                "You have to use 'append' method instead.")

        # Get the index of the insertion location.
        index = self.index(prev_name) + 1

        # Read the layer type and rename it according to the naming rule.
        name = self.name_rule(name)

        # Insert the element.
        for i, reg_name in enumerate(self.REGULATED_DIC):
            if reg_name in name:
                self.__layer_list.insert(
                    index, self.REGULATED_DIC[reg_name](name=name, **kwds))
                self.__name_list.insert(index, name)

        # Rename
        self._rename()


    def extend_insert(self, prev_name, lm):
        """
        This is an original function that behaves like a combination
        of the extend and insert methods.
        Simply put, it inserts another layer manager.
        """
        if not isinstance(lm, LayerManager):
            # Error if 'lm' is not an instance of LayerManager.
            raise TypeError(type(lm), ": Unexpected type.")
        # Error if 'prev_name' does not exist.
        if not prev_name in self.__name_list:
            raise KeyError(prev_name, ": No such key.")
        # Error if the number of units of the layers before and after
        # the specified location does not match the first and last
        # layers of 'lm'.
        if len(self) != 0:
            if self.__layer_list[self.index(prev_name)].n \
                != lm.layer_list[0].prev:
                # Error if the number of units at the specified location
                # does not match the number of inputs of the first layer
                # of 'lm'.
                raise UnmatchUnitError(
                    self.__layer_list[self.index(prev_name)].n,
                    lm.layer_list[0].prev)
            if prev_name != self.__name_list[-1]:
                # If 'prev_name' is not your own last layer...
                if lm.layer_list[-1].n \
                    != self.__layer_list[self.index(prev_name)+1].prev:
                    # ...error if the number of units at the end of 'lm'
                    # does not match the 'prev' of the layer next to
                    # the specified location.
                    raise UnmatchUnitError(
                        lm.layer_list[-1].n,
                        self.__layer_list[self.index(prev_name)+1].prev)
        else:
            # If you have no elements, raise an error telling the user
            # to use the 'extend' method instead.
            raise RuntimeError(
                "You have to use 'extend' method instead.")

        # Get the index of the insertion location.
        index = self.index(prev_name) + 1

        # After saving the elements after the insertion location to 'buf',
        # remove them once and add the elements with the extend method.
        layer_buf = self.__layer_list[index:]
        name_buf = self.__name_list[index:]
        del self.__layer_list[index:]
        del self.__name_list[index:]
        self.extend(lm)

        # Add back the elements that were saved.
        self.__layer_list.extend(layer_buf)
        self.__name_list.extend(name_buf)

        # Rename
        self._rename()


    def remove(self, key):
        """
        The remove method removes the element with the specified name.
        Specifying it by index is also allowed.
        """
        # The already implemented 'del' statement is enough.
        del self[key]


    def index(self, target):
        return self.__name_list.index(target)


    def name(self, indices):
        return self.__name_list[indices]


    @property
    def layer_list(self):
        return self.__layer_list


    @property
    def name_list(self):
        return self.__name_list


    @property
    def ntype(self):
        return self.__ntype


    def is_CNN(self, name=None):
        if name is None:
            if self.__ntype[self.CONV] > 0 \
                or self.__ntype[self.POOL] > 0:
                return True
            else:
                return False
        else:
            name = self.name_rule(name)
            if self.reg_keys[self.CONV] in name \
                or self.reg_keys[self.POOL] in name:
                return True
            else:
                return False
```
I will skip the small changes that do not matter much; they mostly follow from the enhancement of the `_TypeManager` class, the main one being the use of the `reg_keys` property.
The big change is that adding another conditional branch every time a new layer type is introduced would be too wasteful, so I made it work with a loop instead. As an example, let's look at the relevant part of the `append` method.
`layer_manager.py`

```python
# Add the layer.
for i, reg_name in enumerate(self.REGULATED_DIC):
    if name in reg_name:
        # Increment the per-type layer counter.
        self.__ntype[i] += 1
        # Add the number to the name.
        name += str(self.__ntype[i])
        # Add it to the name list.
        self.__name_list.append(name)
        # Finally, create the layer and add it to the layer list.
        self.__layer_list.append(
            self.REGULATED_DIC[reg_name](name=name, **kwds))
```
`REGULATED_DIC` is looped over with the `enumerate` function, and when the layer name is contained in `reg_name`, the layer number `i` is used for the processing.
**Therefore, the layer constants of the `_TypeManager` class and the registration order of `REGULATED_DIC` must be aligned.**
The other parts are similar.
Finally, we prepared the `is_CNN` function. If the argument `name` is not given, it returns whether the network held by the `LayerManager` class is a CNN. If a layer name is given as `name`, it returns whether that layer name counts as a CNN layer (that is, whether it is a convolution or pooling layer). It is used in the forward and backward propagation of the `Trainer` class.
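For instance, with the network built in the experiment code below (a hypothetical check; `lm` holds Conv -> Pool -> Middle -> Output):

```python
# Hypothetical checks of is_CNN against the experiment's LayerManager 'lm'.
print(lm.is_CNN())          # True: the network contains Conv/Pool layers
print(lm.is_CNN("conv"))    # True: "conv" resolves to the "Conv" layer type
print(lm.is_CNN("middle"))  # False: a middle layer is not a CNN-specific layer
```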
## CNN experiment code body

Now, let's move on to the CNN experiment. The entire code can be found [here](https://github.com/kuroitu/DNN_test). Feel free to clone/copy it and experiment.

### For Keras MNIST dataset

Let's start with the Keras dataset. Keras' MNIST dataset consists of 60,000 training images and 10,000 test images of size $(28, 28)$, so although it counts as small among machine learning datasets, it is fairly large if you are training on a laptop or similar.
`keras_data.py`

```python
import numpy as np
from keras.datasets import mnist
#from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


# Dataset acquisition
n_class = 10
(x_train, y_train), (x_test, y_test) = mnist.load_data()
C, B, I_h, I_w = 1, *x_train.shape
B_test = x_test.shape[0]

# Standardization
sc = StandardScaler()
x_train = sc.fit_transform(x_train.reshape(B, -1)).reshape(B, C, I_h, I_w)
x_test = sc.fit_transform(x_test.reshape(B_test, -1)).reshape(B_test, C, I_h, I_w)

# Convert to one-hot labels
def to_one_hot(data, n_class):
    vec = np.zeros((len(data), n_class))
    for i in range(len(data)):
        vec[i, data[i]] = 1.
    return vec

t_train = to_one_hot(y_train, n_class)
t_test = to_one_hot(y_test, n_class)
```
This time we will not create validation data. If you want it, split the training data with scikit-learn's `train_test_split` function or similar.
Next, the data is standardized with scikit-learn's `StandardScaler` class. It does nothing complicated, so you could also write that code yourself. And since this is image recognition, simple normalization would also be fine.
Note that scikit-learn's `StandardScaler` class and its relatives only support input data of shape $(B, N)$, which is why the images are flattened before scaling and reshaped afterwards.
Finally, the correct labels are numerical one-dimensional arrays of shape $(60000,)$ and $(10000,)$, so we convert them to the so-called one-hot representation. The one-hot representation is a data format that takes the value $1$ only at the position corresponding to the numerical label: for example, in 10-class classification the label $3$ becomes $[0, 0, 0, 1, 0, 0, 0, 0, 0, 0]$.
As a result, the correct labels become arrays of shape $(60000, 10)$ and $(10000, 10)$.
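Incidentally, the same conversion can be written without an explicit Python loop by indexing rows of an identity matrix (an alternative sketch, not the code used in this article):

```python
import numpy as np

def to_one_hot_vectorized(data, n_class):
    # Row i of the identity matrix is exactly the one-hot vector for label i.
    return np.eye(n_class)[data]

y = np.array([3, 0, 9])
print(to_one_hot_vectorized(y, 10))
# [[0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]
#  [1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
#  [0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]]
```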
Data processing is complete.
### For scikit-learn MNIST dataset

Next, I will introduce the scikit-learn MNIST dataset. As mentioned at the beginning, this is a fairly small dataset, so you can try machine learning on it casually.
`scikit_learn_data.py`

```python
import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


# Dataset acquisition
n_class = 10
C, I_h, I_w = 1, 8, 8
digits = datasets.load_digits()
x = digits.data
t = digits.target
n_data = len(x)

# Standardization
sc = StandardScaler()
x = sc.fit_transform(x).reshape(n_data, I_h, I_w)
x_train, x_test, y_train, y_test = train_test_split(x, t, test_size=0.2, shuffle=True)

# Convert to one-hot labels
def to_one_hot(data, n_class):
    vec = np.zeros((len(data), n_class))
    for i in range(len(data)):
        vec[i, data[i]] = 1.
    return vec

t_train = to_one_hot(y_train, n_class)
t_test = to_one_hot(y_test, n_class)
```
What we are doing is almost the same as in the Keras case. The difference is that the dataset arrives in the shape $(1797, 64)$. Therefore, after standardizing the data, we `reshape` it and then split it with the `train_test_split` function.

### CNN learning body

Once the dataset is ready, it's time to train.
`cnn_test.py`

```python
# Create convolution, pooling, middle, and output layers
M, F_h, F_w = 10, 3, 3

lm = LayerManager((x_train, x_test), (t_train, t_test))
lm.append(name="c", I_shape=(C, I_h, I_w), F_shape=(M, F_h, F_w), pad=1,
          wb_width=0.1, opt="AdaDelta", opt_dic={"eta": 1e-2})
lm.append(name="p", I_shape=lm[-1].O_shape, pool=2)
lm.append(name="m", n=100, wb_width=0.1,
          opt="AdaDelta", opt_dic={"eta": 1e-2})
lm.append(name="o", n=n_class, act="softmax", err_func="Cross", wb_width=0.1,
          opt="AdaDelta", opt_dic={"eta": 1e-2})

# Train
epoch = 50
threshold = 1e-8
n_batch = 8
lm.training(epoch, threshold=threshold, n_batch=n_batch, show_train_error=True)

# Predict
print("training dataset")
lm.predict(x=lm.x_train, y=lm.y_train)
print("test dataset")
lm.predict()
```
This time we are building a very simple CNN. The number of training epochs is 50 and the mini-batch size is 8; the rest is left to the layer manager lol. The structure of the CNN is as shown in the figure above, and the execution result on the scikit-learn dataset comes out as in the figure below.
### Display of misjudged data

By the way, let's visualize what kind of data the network got wrong.
`cnn_test.py`

```python
# Display incorrectly judged data
col = 4
dpi = 125

y = lm.pred_func(lm.y_test)
y_pred = lm.y_pred  # test-set predictions stored by the last lm.predict() call
fail_index = np.where(y_pred != y)[0]
print("incorrect index:", fail_index)
if fail_index.size:
    row = int(np.ceil(fail_index.size/col))
    if row * dpi >= 2 ** 16:
        # matplotlib cannot render figures taller than 2^16 pixels
        row = int(np.ceil((2 ** 16 // dpi - 1)/col))
    fig, ax = plt.subplots(row, col, figsize=(col, row + 1), dpi=dpi, facecolor="w")
    if row != 1:
        for i, f in enumerate(fail_index):
            ax[i // col, i % col].imshow(lm.x_test[f], interpolation='nearest', cmap='gray')
            ax[i // col, i % col].tick_params(labelbottom=False, labelleft=False,
                                              labelright=False, labeltop=False)
            ax[i // col, i % col].set_title(str(y[f]) + " => " + str(y_pred[f]))
            if i >= row * col:
                break
    else:
        for i, f in enumerate(fail_index):
            ax[i % col].imshow(lm.x_test[f], interpolation='nearest', cmap='gray')
            ax[i % col].tick_params(labelbottom=False, labelleft=False,
                                    labelright=False, labeltop=False)
            ax[i % col].set_title(str(y[f]) + ' => ' + str(y_pred[f]))
            if i >= row * col:
                break
    fig.tight_layout()
```
When this is executed, it looks like the figure below. Note that it differs from the earlier experimental result. Some of these are barely legible even to a human... no wonder they get misjudged.

## Conclusion

During the experiment, when I made the batch size larger than 1, learning did not proceed well and I had a hard time. The cause turned out to be that the activation functions did not support batches. Ordinary activation functions handle batches for free thanks to `numpy` broadcasting, but some exceptional functions, such as the `softmax` function, have to be made batch-aware explicitly.
If anyone is struggling with the same problem, please be careful.
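For reference, here is what a batch-aware softmax looks like (a minimal sketch, not necessarily the implementation used in this series; the max subtraction is the usual numerical-stability trick):

```python
import numpy as np

def softmax_batch(x):
    # x has shape (n_batch, n_class); reducing along axis 1 with keepdims=True
    # lets broadcasting apply each sample's max/sum to its own row.
    x = x - np.max(x, axis=1, keepdims=True)   # numerical stability
    exp_x = np.exp(x)
    return exp_x / np.sum(exp_x, axis=1, keepdims=True)

x = np.array([[1.0, 2.0, 3.0],
              [1.0, 1.0, 1.0]])
print(softmax_batch(x))   # each row sums to 1
```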
- Introduction to Deep Learning ~ Basics ~
- Introduction to Deep Learning ~ Coding Preparation ~
- Introduction to Deep Learning ~ Forward Propagation ~
- Introduction to Deep Learning ~ Backpropagation ~
- Introduction to Deep Learning ~ Learning Rules ~
- Introduction to Deep Learning ~ Localization and Loss Functions ~
- Introduction to Deep Learning ~ Function Approximation ~
- Introduction to Deep Learning ~ Convolution and Pooling ~
- Introduction to Deep Learning ~ CNN Experiment ~
- List of activation functions (2020)
- Gradient descent method list (2020)
- See and understand! Comparison of optimization methods (2020)
- Thorough understanding of im2col
- Thorough understanding of col2im
- Complete understanding of the numpy.pad function