import pandas as pd
import numpy as np
import math
import random
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM
import matplotlib.pyplot as plt
# Seed Python's random module so the generated noise is reproducible.
random.seed(0)
# Scale factor applied to the uniform noise that is added to the sine argument.
random_factor = 0.05
# Number of samples in one sine period.
steps_per_cycle = 80
# Number of sine periods to generate.
number_of_cycles = 50
# Number of consecutive samples given to the model as one input window.
length_of_sequences = 100
# Feature dimension of each input sample and of the model output.
in_out_neurons = 1
# Number of units in the LSTM hidden layer.
hidden_neurons = 300
# Re-seed with the same value right before the data is generated.
random.seed(0)
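The seed fixes the starting state of the random number generator. Calling random.seed() with the same value always reproduces the same sequence of random numbers: a seed of 0 always yields one particular sequence, and a seed of 100 always yields another.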
# Integer step indices from 0 to steps_per_cycle * number_of_cycles, inclusive.
np_ary = np.arange(steps_per_cycle * number_of_cycles + 1)
# Single-column frame holding the step index under the column name "x".
df = pd.DataFrame(np_ary, columns=["x"])
Create a data frame with a single column named "x". We want one row per step, so multiplying the number of steps per cycle by the number of cycles gives 4000 steps. Because the row index starts at 0, adding +1 at the end gives 4001 rows, and the last row is labelled 4000. np.arange(n) is a NumPy function that generates the integers from 0 to n - 1 as an array.
# Phase advance in radians per step: one full period spans steps_per_cycle steps.
pi_t = 2 * math.pi / steps_per_cycle
# Noisy sine: uniform noise in [-random_factor, +random_factor] is added to the
# phase before taking the sine. One random draw is made per row, in row order.
df["sin(x)"] = df.x.apply(lambda t: math.sin(t * pi_t + random.uniform(-1.0, +1.0) * random_factor))
pi_t is the angle of a full circle (2π) divided by steps_per_cycle, i.e. the angle advanced per step. Assigning to df["..."] creates the column if it does not already exist, so it does not have to be created in advance. The new column holds the sine value corresponding to the step number stored in each row.
- "df.x.apply(f)" means "call f on each cell of column "x" of the data frame". - "lambda t:" defines an anonymous function whose argument is t and whose result is the expression after the colon.
In other words, when these two are combined, each cell of column "x" is passed in as t. This value, plus some noise, becomes θ in sin θ. random.uniform(A, B) generates a random floating-point number between A and B. Multiplying it by random_factor adjusts its influence, that is, the magnitude of the noise.
# Plot the first two periods of the noisy sine wave.
df[["sin(x)"]].head(steps_per_cycle * 2).plot()
# Display the figure.
plt.show()
df.head(n) returns only the first n rows. Here n is "number of steps per cycle" × 2, so the sine-wave data for two cycles is taken. .plot() draws the graph of those values; plt.show() is required to actually show the graph on the display.
def Input_Ans_Extract(data, input_num=100):
    """Slice a time series into fixed-length input windows and next-step targets.

    Each window holds ``input_num`` consecutive rows of ``data``; its target
    is the row immediately following the window.

    Args:
        data: pandas DataFrame (or Series) of samples ordered in time.
        input_num: Number of consecutive rows in each input window.

    Returns:
        A tuple ``(inputs, answers)`` of NumPy arrays. For a DataFrame with
        ``c`` columns and ``n = len(data) - input_num`` windows, ``inputs``
        has shape ``(n, input_num, c)`` and ``answers`` has shape ``(n, c)``.
        Both arrays are empty when ``data`` has ``input_num`` rows or fewer.
    """
    # ``.values`` replaces ``as_matrix()``, which was removed in pandas 1.0.
    # Converting once up front also avoids one pandas slice per window.
    samples = data.values
    window_count = len(data) - input_num

    InputList = [samples[i:i + input_num] for i in range(window_count)]
    AnsList = [samples[i + input_num] for i in range(window_count)]

    # Stack the per-window pieces into single arrays.
    return np.array(InputList), np.array(AnsList)
Append each window to the prepared empty lists as NumPy data. as_matrix() converts the pandas data into a NumPy array. (as_matrix() was removed in pandas 1.0; .values or .to_numpy() performs the same conversion.)
def Data_Split(df, test_size=0.1, input_num=100):
    """Split a time series chronologically into training and test window sets.

    The first ``round(len(df) * (1 - test_size))`` rows are used for training
    and the remaining rows for testing. Each part is windowed independently
    by ``Input_Ans_Extract``, so no window spans the train/test boundary.

    Args:
        df: pandas DataFrame of samples ordered in time.
        test_size: Fraction of the rows reserved for testing.
        input_num: Number of consecutive rows in each input window.

    Returns:
        ``((Input_train, Ans_train), (Input_test, Ans_test))`` as produced by
        ``Input_Ans_Extract`` for the training rows and the test rows.
    """
    # int() keeps the slice bound an integer on Python 2, where round()
    # returns a float. For 4001 rows and test_size=0.1 this is 3601.
    train_size = int(round(len(df) * (1 - test_size)))
    # Training data: rows 0 .. train_size - 1.
    Input_train, Ans_train = Input_Ans_Extract(df.iloc[0:train_size], input_num)
    # Test data: rows train_size .. end.
    Input_test, Ans_test = Input_Ans_Extract(df.iloc[train_size:], input_num)
    return (Input_train, Ans_train), (Input_test, Ans_test)
round() rounds the value to the nearest integer. len() returns the number of rows in df. "test_size=0.1" sets aside 10% of the data for testing, so "1 - test_size" is 90%. 90% of 4001 rows is 3600.9, which rounds to 3601, so the training data is the first 3601 rows (rows 0 to 3600). The test data is everything from row 3601 to the end.
# Build the training and test windows from the noisy sine column.
(Input_train, Ans_train), (Input_test, Ans_test) = Data_Split(df[["sin(x)"]], input_num=length_of_sequences)

# Sequential stacks the layers in the order they are added.
model = Sequential()
# batch_input_shape is (batch size, time steps, features per step). None
# leaves the batch size unconstrained. return_sequences=False makes the LSTM
# emit only its output for the last time step.
model.add(LSTM(hidden_neurons, batch_input_shape=(None, length_of_sequences, in_out_neurons), return_sequences=False))
# Project the LSTM output down to in_out_neurons value(s) per window.
model.add(Dense(in_out_neurons))
# Linear (identity) activation: the output is an unbounded regression value.
model.add(Activation("linear"))
# Mean squared error loss, minimized with the RMSprop optimizer.
model.compile(loss="mean_squared_error", optimizer="rmsprop")
# `epochs` is the Keras 2 name of the argument formerly called `nb_epoch`.
# 5% of the training windows are held out for validation.
model.fit(Input_train, Ans_train, batch_size=60, epochs=3, validation_split=0.05)
predicted = model.predict(Input_test)
batch_input_shape = (batch size, i.e. the number of input sequences per batch; number of time steps in each input sequence; number of features per time step). mean_squared_error is the loss function: the mean of the squared differences between the predictions and the answers. rmsprop is a gradient-based optimizer. Input_train and Ans_train are the data created earlier. batch_size=60 is the number of samples per gradient update, which means 60 windows are processed at a time. nb_epoch=3 is the number of passes made over the training data (this argument is named epochs in Keras 2 and later). fit also needs validation data; validation_split=0.05 holds out 0.05 (5%) of the training data for validation.
# Saving part: architecture as JSON, weights as HDF5.
json_string = model.to_json()
with open('model.json', 'w') as model_file:
    model_file.write(json_string)
model.save_weights('weights.hdf5')

# Reading part: rebuild the model from the files named by the user.
from keras.models import model_from_json

modelname = input('model file = "model.json" ')
paramname = input('learning file = "weights.hdf5" ')
with open(modelname) as model_file:
    json_string = model_file.read()
model = model_from_json(json_string)
model.load_weights(paramname)

predicted = model.predict(Input_test)

# View results: the first 200 predictions, then predictions against answers.
dataf = pd.DataFrame(predicted[:200])
dataf.columns = ["predict"]
dataf.plot()
dataf["answer"] = Ans_test[:200]
dataf.plot()
plt.show()
#Whole code
import pandas as pd
import numpy as np
import math
import random
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM
% matplotlib inline
import matplotlib.pyplot as plt
# Seed Python's random module so the generated noise is reproducible.
random.seed(0)
# Scale factor applied to the uniform noise added to the sine argument.
random_factor = 0.05
# Number of samples in one sine period.
steps_per_cycle = 80
# Number of sine periods to generate.
number_of_cycles = 50
# Number of consecutive samples given to the model as one input window.
length_of_sequences = 100
# Feature dimension of each input sample and of the model output.
in_out_neurons = 1
# Number of units in the LSTM hidden layer.
hidden_neurons = 300

# Integer step indices from 0 to steps_per_cycle * number_of_cycles, inclusive.
np_ary = np.arange(steps_per_cycle * number_of_cycles + 1)
df = pd.DataFrame(np_ary, columns=["x"])
# Phase advance in radians per step.
pi_t = 2 * math.pi / steps_per_cycle
# Noisy sine: uniform noise in [-random_factor, +random_factor] is added to the phase.
df["sin(x)"] = df.x.apply(lambda t: math.sin(t * pi_t + random.uniform(-1.0, +1.0) * random_factor))
# Plot the first two periods of the noisy sine wave.
df[["sin(x)"]].head(steps_per_cycle * 2).plot()
def Input_Ans_Extract(data, input_num=100):
    """Slice a time series into fixed-length input windows and next-step targets.

    Each window holds ``input_num`` consecutive rows of ``data``; its target
    is the row immediately following the window.

    Args:
        data: pandas DataFrame (or Series) of samples ordered in time.
        input_num: Number of consecutive rows in each input window.

    Returns:
        A tuple ``(inputs, answers)`` of NumPy arrays. For a DataFrame with
        ``c`` columns and ``n = len(data) - input_num`` windows, ``inputs``
        has shape ``(n, input_num, c)`` and ``answers`` has shape ``(n, c)``.
        Both arrays are empty when ``data`` has ``input_num`` rows or fewer.
    """
    # ``.values`` replaces ``as_matrix()``, which was removed in pandas 1.0.
    # Converting once up front also avoids one pandas slice per window.
    samples = data.values
    window_count = len(data) - input_num

    InputList = [samples[i:i + input_num] for i in range(window_count)]
    AnsList = [samples[i + input_num] for i in range(window_count)]

    # Stack the per-window pieces into single arrays.
    return np.array(InputList), np.array(AnsList)
def Data_Split(df, test_size=0.1, input_num=100):
    """Split a time series chronologically into training and test window sets.

    The first ``round(len(df) * (1 - test_size))`` rows are used for training
    and the remaining rows for testing. Each part is windowed independently
    by ``Input_Ans_Extract``, so no window spans the train/test boundary.

    Args:
        df: pandas DataFrame of samples ordered in time.
        test_size: Fraction of the rows reserved for testing.
        input_num: Number of consecutive rows in each input window.

    Returns:
        ``((Input_train, Ans_train), (Input_test, Ans_test))`` as produced by
        ``Input_Ans_Extract`` for the training rows and the test rows.
    """
    # int() keeps the slice bound an integer on Python 2, where round()
    # returns a float.
    train_size = int(round(len(df) * (1 - test_size)))
    Input_train, Ans_train = Input_Ans_Extract(df.iloc[0:train_size], input_num)
    Input_test, Ans_test = Input_Ans_Extract(df.iloc[train_size:], input_num)
    return (Input_train, Ans_train), (Input_test, Ans_test)
# Build the training and test windows from the noisy sine column.
(Input_train, Ans_train), (Input_test, Ans_test) = Data_Split(df[["sin(x)"]], input_num=length_of_sequences)

model = Sequential()
# batch_input_shape is (batch size, time steps, features per step). None
# leaves the batch size unconstrained.
model.add(LSTM(hidden_neurons, batch_input_shape=(None, length_of_sequences, in_out_neurons), return_sequences=False))
model.add(Dense(in_out_neurons))
model.add(Activation("linear"))
model.compile(loss="mean_squared_error", optimizer="rmsprop")
# `epochs` is the Keras 2 name of the argument formerly called `nb_epoch`.
model.fit(Input_train, Ans_train, batch_size=60, epochs=3, validation_split=0.05)
predicted = model.predict(Input_test)

# View results: the first 200 predictions, then predictions against answers.
dataf = pd.DataFrame(predicted[:200])
dataf.columns = ["predict"]
dataf.plot()
dataf["answer"] = Ans_test[:200]
dataf.plot()
plt.show()
Recommended Posts