Continuing from last time.
The difference from the Perceptron covered last time is that the weights are updated using a linear activation function instead of a unit step function.
Because training proceeds toward the minimum of a cost function, the learning can converge.
ADALINE uses a differentiable activation function and takes the sum of squared errors as its cost function, which makes the cost convex; gradient descent can therefore find the weights that minimize it.
A diagram of the principle of gradient descent
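To update the weights with gradient descent, take a step in the direction opposite to the gradient $\nabla J(\mathbf{w})$ of the cost function $J(\mathbf{w})$.
So the update is $\mathbf{w} := \mathbf{w} + \Delta\mathbf{w}$ with $\Delta\mathbf{w} = -\eta \nabla J(\mathbf{w})$, where $\eta$ is the learning rate.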
ADALINE's learning rule looks very similar to the Perceptron's. It is implemented below.
import numpy as np
class AdalineGD(object):
    """ADAptive LInear NEuron classifier trained with batch gradient descent.

    Parameters
    ----------
    eta : float
        Learning rate (greater than 0.0 and at most 1.0).
    n_iter : int
        Number of passes (epochs) over the training data.
    random_state : int
        Seed of the random generator used to initialize the weights.

    Attributes
    ----------
    w_ : 1d-array
        Weights after fitting; ``w_[0]`` is the bias term.
    cost_ : list
        Sum of squared errors divided by 2, recorded once per epoch.
    """

    def __init__(self, eta=0.01, n_iter=50, random_state=1):
        self.eta = eta
        self.n_iter = n_iter
        self.random_state = random_state

    def fit(self, X, y):
        """Fit the weights to the training data.

        Parameters
        ----------
        X : array-like, shape = [n_samples, n_features]
            Training data.
        y : array-like, shape = [n_samples]
            Target values.

        Returns
        -------
        self : object
        """
        # Coerce to arrays so that plain lists (array-likes) work as
        # documented; ``.shape`` and ``.T`` below need real ndarrays.
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)

        rgen = np.random.RandomState(self.random_state)
        self.w_ = rgen.normal(loc=0.0, scale=0.01, size=1 + X.shape[1])
        self.cost_ = []

        for _ in range(self.n_iter):
            # The whole data set is processed at once; no per-sample loop
            # is needed because every update uses the full batch.
            net_input = self.net_input(X)
            # The activation is the identity, so this call has no effect.
            # It is kept as the hook where another activation (e.g. a
            # sigmoid for logistic regression) would be plugged in.
            output = self.activation(net_input)
            # Errors y^(i) - phi(z^(i)) for all samples.
            errors = y - output
            # Update w_1, ..., w_m, then the bias w_0, from the same errors.
            self.w_[1:] += self.eta * X.T.dot(errors)
            self.w_[0] += self.eta * errors.sum()
            # Record this epoch's cost.
            self.cost_.append((errors ** 2).sum() / 2.0)
        return self

    def net_input(self, X):
        """Return the net input ``X . w_[1:] + w_[0]`` for all rows of X."""
        return np.dot(X, self.w_[1:]) + self.w_[0]

    def activation(self, X):
        """Return the output of the linear (identity) activation."""
        return X

    def predict(self, X):
        """Return class labels: 1 where the activation is >= 0.0, else -1."""
        return np.where(self.activation(self.net_input(X)) >= 0.0, 1, -1)
Try it on the data that the Perceptron classified well.
import numpy as np
from sklearn.datasets import load_iris
import pandas as pd
# Load iris into a DataFrame with one column per feature plus 'target'.
iris = load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names)
df['target'] = iris.target

# Drop class 1, leaving labels 0 and 2, then shift them to -1 and +1.
df2 = df[df['target'] != 1].copy()
df2['target'] = df2['target'] - 1

# Two features are used as inputs; the shifted labels are the targets.
X = df2[['petal width (cm)', 'sepal width (cm)']].to_numpy()
Y = df2['target'].to_numpy()
import matplotlib.pyplot as plt
# Two panels side by side, one per learning rate.
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(10, 4))

# Left panel: eta = 0.01, cost shown on a log10 scale.
ada1 = AdalineGD(n_iter=10, eta=0.01)
ada1.fit(X, Y)
ax[0].plot(range(1, len(ada1.cost_) + 1), np.log10(ada1.cost_), marker='o')
ax[0].set(xlabel='Epochs',
          ylabel='log(Sum-squared-error)',
          title='Adaline - Learning rate 0.01')

# Right panel: eta = 0.0001, raw cost.
ada2 = AdalineGD(n_iter=10, eta=0.0001)
ada2.fit(X, Y)
ax[1].plot(range(1, len(ada2.cost_) + 1), ada2.cost_, marker='o')
ax[1].set(xlabel='Epochs',
          ylabel='Sum-squared-error',
          title='Adaline - Learning rate 0.0001')
Left: the log of the sum of squared errors per epoch with a learning rate of 0.01. Right: the sum of squared errors per epoch with a learning rate of 0.0001.
It turns out that training does not converge well unless η is chosen well.
If the learning rate is too high, each step aims for the minimum as shown in the figure but overshoots it and climbs up the opposite side.
Scale the features so that the machine learning algorithm performs optimally.
This time the features are standardized: $\mathbf{x}'_j = \frac{\mathbf{x}_j - \mu_j}{\sigma_j}$
This gives each feature zero mean and unit variance, like a standard normal distribution, and allows the learning to converge quickly.
# Standardization: for each of the two feature columns, subtract its mean
# and divide by its standard deviation.
X_std = np.copy(X)
for col in (0, 1):
    X_std[:, col] = (X[:, col] - X[:, col].mean()) / X[:, col].std()

# Cross-check against scikit-learn's StandardScaler.
from sklearn import preprocessing
ss = preprocessing.StandardScaler()
X_std2 = ss.fit_transform(X)

# The first three rows of both results should be the same.
print(X_std[:3])
print(X_std2[:3])
[[-1.02461719 0.71907625]
[-1.02461719 -0.4833924 ]
[-1.02461719 -0.00240494]]
[[-1.02461719 0.71907625]
[-1.02461719 -0.4833924 ]
[-1.02461719 -0.00240494]]
#Boundary plot function implemented earlier
from matplotlib.colors import ListedColormap
def plot_decision_regions(X, y, classifier, resolution=0.02):
    """Plot a classifier's decision regions together with the samples.

    Parameters
    ----------
    X : array, shape = [n_samples, 2]
        Samples; column 0 goes on the x-axis and column 1 on the y-axis.
    y : array, shape = [n_samples]
        Class label of each sample.
    classifier : object
        Fitted model exposing ``predict``.
    resolution : float
        Step between neighbouring grid points.
    """
    markers = ('s', 'x', 'o', '^', 'v')
    colors = ('red', 'blue', 'lightgreen', 'gray', 'cyan')
    # One colour per distinct class label.
    cmap = ListedColormap(colors[:len(np.unique(y))])

    # Plot area: the data range padded by 1 on every side.
    x1_min = X[:, 0].min() - 1
    x1_max = X[:, 0].max() + 1
    x2_min = X[:, 1].min() - 1
    x2_max = X[:, 1].max() + 1

    # Grid covering the plot area.
    xx1, xx2 = np.meshgrid(np.arange(x1_min, x1_max, resolution),
                           np.arange(x2_min, x2_max, resolution))

    # Predict every grid point (flattened to one row per point), then
    # reshape the predictions back to the grid's shape.
    grid_points = np.array([xx1.ravel(), xx2.ravel()]).T
    Z = classifier.predict(grid_points).reshape(xx1.shape)

    # Filled contours show the predicted class regions.
    plt.contourf(xx1, xx2, Z, alpha=0.3, cmap=cmap)
    plt.xlim(xx1.min(), xx1.max())
    plt.ylim(xx2.min(), xx2.max())

    # Overlay the samples, one scatter series per class.
    for idx, cl in enumerate(np.unique(y)):
        members = y == cl
        plt.scatter(x=X[members, 0],
                    y=X[members, 1],
                    alpha=0.8,
                    c=colors[idx],
                    marker=markers[idx],
                    label=cl,
                    edgecolor='black')
# Train on the standardized features (fit returns the model itself).
ada = AdalineGD(n_iter=15, eta=0.01).fit(X_std, Y)

# Decision regions.
plot_decision_regions(X_std, Y, classifier=ada)
plt.title('Adaline - Gradient Descent')
plt.xlabel('petal width [standardized]')
plt.ylabel('sepal width [standardized]')
plt.legend(loc='upper left')
plt.tight_layout()
plt.show()

# Cost per epoch.
plt.plot(range(1, len(ada.cost_) + 1), ada.cost_, marker='o')
plt.xlabel('Epochs')
plt.ylabel('Sum-squared-error')
plt.tight_layout()
plt.show()
It can be seen that training has converged and that the decision boundary separates the classes cleanly.
Standardization is confirmed to be effective, because without it training did not converge well at the same learning rate.
# Same model and hyperparameters as the standardized run, but trained on
# the raw (unstandardized) features for comparison.
ada1 = AdalineGD(n_iter=15, eta=0.01)
ada1.fit(X, Y)

# Decision regions. The axes show the raw features, so they are labelled
# in cm rather than "[standardized]".
plot_decision_regions(X, Y, classifier=ada1)
plt.title('Adaline - Gradient Descent')
plt.xlabel('petal width [cm]')
plt.ylabel('sepal width [cm]')
plt.legend(loc='upper left')
plt.tight_layout()
plt.show()

# Cost per epoch.
plt.plot(range(1, len(ada1.cost_) + 1), ada1.cost_, marker='o')
plt.xlabel('Epochs')
plt.ylabel('Sum-squared-error')
plt.tight_layout()
plt.show()
↑ As expected, training on the unstandardized data did not work.
Next, I will try the data that could not be classified last time.
import numpy as np
import matplotlib.pyplot as plt
# Drop class 0, keeping classes 1 and 2.
df3 = df[df['target'] != 0].copy()

# Column 4 holds the labels; map label 1 to -1 and everything else
# (label 2) to +1.
y = np.where(df3.iloc[:, 4].values == 1, -1, 1)

# Optional raw-data scatter plot, left disabled:
# plt.scatter(df3.iloc[:50, 1], df3.iloc[:50, 0], color='orange', marker='o', label='versicolor')
# plt.scatter(df3.iloc[50:, 1], df3.iloc[50:, 0], color='green', marker='o', label='virginica')
# plt.xlabel('sepal width [cm]')
# plt.ylabel('sepal length [cm]')
# plt.legend(loc='upper left')
# plt.show()

# Features: sepal width and sepal length, standardized with scikit-learn.
X2 = df3[['sepal width (cm)', 'sepal length (cm)']].values
from sklearn import preprocessing
sc = preprocessing.StandardScaler()
X2_std = sc.fit_transform(X2)

# Train (fit returns the model itself).
ada2 = AdalineGD(n_iter=15, eta=0.01).fit(X2_std, y)

# Decision regions.
plot_decision_regions(X2_std, y, classifier=ada2)
plt.title('Adaline - Gradient Descent')
plt.xlabel('sepal width [standardized]')
plt.ylabel('sepal length [standardized]')
plt.legend(loc='upper left')
plt.tight_layout()
plt.show()

# Cost per epoch.
plt.plot(range(1, len(ada2.cost_) + 1), ada2.cost_, marker='o')
plt.xlabel('Epochs')
plt.ylabel('Sum-squared-error')
plt.tight_layout()
plt.show()
Of course, the data is not perfectly linearly separable, but since the boundary is drawn at a reasonable position, training may well have converged to the minimum cost.
When the data set is large, the computational cost of the method used so far (batch gradient descent) becomes high.
→ Use stochastic gradient descent
Instead of updating from the whole data set at once, the weights are updated incrementally, one sample at a time.
It is important to present the samples in random order. The advantages of this method are that it converges quickly, that it escapes shallow local minima more easily (the error surface is noisier), and that it can be used for online learning.
Online learning: when new training data arrives, the model is trained on it on the spot and updated, so it can adapt to changes quickly.
ADALINE is implemented with stochastic gradient descent below.
import numpy as np
from numpy.random import seed
class AdalineSGD(object):
    """ADAptive LInear NEuron classifier trained with stochastic gradient descent.

    Parameters
    ----------
    eta : float
        Learning rate (greater than 0.0 and at most 1.0).
    n_iter : int
        Number of passes (epochs) over the training data.
    shuffle : bool (default: True)
        If True, shuffle the training data every epoch to avoid cycles.
    random_state : int
        Seed of the random generator used for weight initialization and
        for shuffling.

    Attributes
    ----------
    w_ : 1d-array
        Weights after fitting; ``w_[0]`` is the bias term.
    cost_ : list
        Per-epoch average over all training samples of the squared-error
        cost (half the squared error of each sample).
    """

    def __init__(self, eta=0.01, n_iter=10, shuffle=True, random_state=None):
        self.eta = eta
        self.n_iter = n_iter
        self.w_initialized = False  # set once the weights exist
        self.shuffle = shuffle
        self.random_state = random_state

    def fit(self, X, y):
        """Fit the weights to the training data, re-initializing them first.

        Parameters
        ----------
        X : array-like, shape = [n_samples, n_features]
            Training data.
        y : array-like, shape = [n_samples]
            Target values.

        Returns
        -------
        self : object
        """
        # Coerce to arrays so that plain lists work as documented; the
        # shuffle below relies on fancy indexing.
        X = np.asarray(X, dtype=float)
        y = np.asarray(y)

        self._initialize_weights(X.shape[1])
        self.cost_ = []
        for _ in range(self.n_iter):
            if self.shuffle:
                X, y = self._shuffle(X, y)
            # One weight update per sample; collect each sample's cost.
            cost = []
            for xi, target in zip(X, y):
                cost.append(self._update_weights(xi, target))
            # Store the epoch's average cost.
            self.cost_.append(sum(cost) / len(y))
        return self

    def partial_fit(self, X, y):
        """Update the weights from new data without re-initializing them.

        Accepts either several samples (``X`` of shape
        [n_samples, n_features]) or a single sample given as a 1-D feature
        vector or a one-row 2-D array, with ``y`` holding one target per
        sample (a scalar is allowed for a single sample).

        Returns
        -------
        self : object
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y).ravel()

        # Normalize to one row per sample so every call takes the same
        # path. A 1-D X with a single target is one sample; a 1-D X with
        # several targets is several single-feature samples.
        if X.ndim == 1:
            X = X.reshape(1, -1) if y.shape[0] == 1 else X.reshape(-1, 1)

        if not self.w_initialized:
            self._initialize_weights(X.shape[1])

        for xi, target in zip(X, y):
            self._update_weights(xi, target)
        return self

    def _shuffle(self, X, y):
        """Return X and y reordered by one shared random permutation."""
        r = self.rgen.permutation(len(y))
        # Indexing with the permutation array reorders both consistently.
        return X[r], y[r]

    def _initialize_weights(self, m):
        """Create the random generator and set m + 1 small random weights."""
        self.rgen = np.random.RandomState(self.random_state)
        self.w_ = self.rgen.normal(loc=0.0, scale=0.01, size=1 + m)
        self.w_initialized = True

    def _update_weights(self, xi, target):
        """Apply the ADALINE rule for one sample and return its cost."""
        output = self.activation(self.net_input(xi))
        error = target - output
        self.w_[1:] += self.eta * xi.dot(error)
        self.w_[0] += self.eta * error
        return 0.5 * error ** 2

    def net_input(self, X):
        """Return the net input ``X . w_[1:] + w_[0]``."""
        return np.dot(X, self.w_[1:]) + self.w_[0]

    def activation(self, X):
        """Return the output of the linear (identity) activation."""
        return X

    def predict(self, X):
        """Return class labels: 1 where the activation is >= 0.0, else -1."""
        return np.where(self.activation(self.net_input(X)) >= 0.0, 1, -1)
# Train on the standardized features with the labels that belong to
# them (Y). The previous code passed y, which was derived from a different
# subset of the data and only matched Y's values by coincidence.
ada = AdalineSGD(n_iter=15, eta=0.01, random_state=1)
ada.fit(X_std, Y)

# Decision regions.
plot_decision_regions(X_std, Y, classifier=ada)
plt.title('Adaline - Stochastic Gradient Descent')
plt.xlabel('petal width [standardized]')
plt.ylabel('sepal width [standardized]')
plt.legend(loc='upper left')
plt.tight_layout()
plt.show()

# Average cost per epoch.
plt.plot(range(1, len(ada.cost_) + 1), ada.cost_, marker='o')
plt.xlabel('Epochs')
plt.ylabel('Average Cost')
plt.show()
The average cost decreases quickly. The boundary is the same as with batch gradient descent.
For online learning, you can update the model by calling partial_fit.
Having studied the Perceptron and ADALINE, my understanding has deepened considerably, and I expect other machine learning algorithms to be easier to understand from now on, so I am glad I studied them.
Until now I had used other algorithms as black boxes, so it was good to learn what goes on inside.
I have a notebook on Gist.
(Conceptual diagrams, etc. are quoted from Chapter 2 of "Theory and Practice by Python Machine Learning Programming Expert Data Scientists" as before.)
Recommended Posts