**An RNN (Recurrent Neural Network) is a type of neural network that takes time-series data as input and computes its output from the current input together with the "state" carried over from the previous time step.** LSTM (Long Short-Term Memory) is probably its best-known variant. Time-series data is data whose meaning lies in the sequence as a whole, such as video or text. An ordinary neural network takes input in a fixed format, such as a single image or character, but when handling video or text, the order in which the individual frames or characters appear carries meaning as well as the items themselves. RNNs are the structure for handling this kind of data.
That said, unlike ordinary fully connected layers, I honestly find RNNs hard to understand. I suspect I'm not alone.
So, first I want to understand what an RNN does, and then be able to build my own RNN that uses the "state" from the previous time step.
Concretely, I would like to resolve the following points (because they actually bothered me):
- I don't really understand the difference between `RNN`, `SimpleRNN`, and `SimpleRNNCell`
- I want to understand what layers such as `LSTM` do internally
- I want to write a non-standard RNN layer myself, for example to reproduce an experiment from a paper
[^basiclstm]: The basics of LSTM that you can no longer ask about - HELLO CYBERNETICS
My hope is that, given a network structure (diagram) and formulas like those on the reference page[^basiclstm], you will be able to build the corresponding RNN yourself in Keras.
Conversely, the following topics are not covered on this page. I may write about them on another occasion.
- How to choose a network structure for the problem you want to solve
- How to handle variable-length input (sequences of different lengths per sample)
  - [Try basic RNN (LSTM) with Keras - Qiita](https://qiita.com/everylittle/items/ba821e93d275a421ca2b#%E5%8F%AF%E5%A4%89%E9%95%B7%E3%81%AE%E7%B3%BB%E5%88%97%E3%82%92%E5%85%A5%E5%8A%9B%E3%81%99%E3%82%8B%E5%A0%B4%E5%90%88-1)
- How to use `return_state` and `stateful`
- How to use bidirectional RNNs (`Bidirectional`)
The essence of an RNN is that **the output depends on the current input and the "state" from the previous time step**. In a normal fully connected or convolutional layer the output depends only on the current input; in an RNN, information from previous inputs can also be used. You are free to decide what "state" to carry over to the next time step.
As shown in the figure below (left), an RNN can be drawn as a network (cell) with a recursive structure, but the actual computation is easier to understand if you unroll the loop as in the figure on the right (source[^1]).
Given inputs $x_1, x_2, \dots, x_t, \dots$, the outputs $o_1, o_2, \dots, o_t, \dots$ and states $s_1, s_2, \dots, s_t, \dots$ are determined by
\begin{align}
s_t &= f(Ux_t + Ws_{t-1} + b) \\
o_t &= h(Vs_t)
\end{align}
Here $U$, $V$, and $W$ are matrices and $b$ is a column vector; these are the layer's weights (the parameters to be trained). $f$ and $h$ are activation functions. The input $x_t$, output $o_t$, and state $s_t$ are also column vectors.
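As a toy illustration, this recurrence can be unrolled directly in NumPy; the dimensions, random weights, and activation choices below are my own, not anything taken from Keras:

```python
import numpy as np

rng = np.random.default_rng(0)
d_in, d_state, d_out = 3, 4, 2                # dimensions chosen arbitrarily
U = rng.normal(size=(d_state, d_in))          # input-to-state weights
W = rng.normal(size=(d_state, d_state))       # state-to-state weights
V = rng.normal(size=(d_out, d_state))         # state-to-output weights
b = rng.normal(size=d_state)

f, h = np.tanh, lambda z: z                   # activation functions f and h

s = np.zeros(d_state)                         # initial state s_0 = 0
for x_t in rng.normal(size=(5, d_in)):        # a sequence of 5 inputs
    s = f(U @ x_t + W @ s + b)                # s_t = f(U x_t + W s_{t-1} + b)
    o = h(V @ s)                              # o_t = h(V s_t)
    print(o)
```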
First, let's try using an RNN. As a simple example, consider a network that outputs the running sum of its input sequence (the sum of all values from the start up to the current step). We define this running sum as the "state" and output the state as-is. For example, the output and state evolve for a given input as in the table below.
| $t$ | 1 | 2 | 3 | 4 | 5 | 6 | 7 | ... |
|---|---|---|---|---|---|---|---|---|
| Input $x_t$ | 1 | 3 | 2 | 4 | 1 | 0 | 1 | |
| State $s_t$ | 1 | 4 | 6 | 10 | 11 | 11 | 12 | |
| Output $o_t$ | 1 | 4 | 6 | 10 | 11 | 11 | 12 | |
In TensorFlow + Keras, the layer `tf.keras.layers.SimpleRNN` lets you define a network of the form
\begin{align}
o_t = s_t = f(Ux_t + Ws_{t-1} + b) \tag{1}
\end{align}
If we set $f(x) = x$, the network computes
\begin{align}
o_t = s_t = Ux_t + Ws_{t-1} + b
\end{align}
and if we train it on sequences of numbers paired with their running sums, we expect the weights to approach $U = W = 1,\ b = 0$ (since the values are one-dimensional here, you can think of $U$, $W$, and $b$ as scalars).
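As a quick check of this expectation, plugging $U = W = 1$, $b = 0$ into the recurrence and feeding it the input row from the table above does reproduce the running sums (a small sketch of my own):

```python
# With f(x) = x, U = W = 1, b = 0, the recurrence reduces to s_t = x_t + s_{t-1}.
U, W, b = 1.0, 1.0, 0.0
s = 0.0
for x_t in [1, 3, 2, 4, 1, 0, 1]:      # the input row from the table above
    s = U * x_t + W * s + b            # o_t = s_t
    print(s, end=" ")
# 1.0 4.0 6.0 10.0 11.0 11.0 12.0
```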
Let's train it with the code below. The training data consists of random sequences of length 30 and the running sums computed from them.
first.py
import tensorflow as tf
import numpy as np
from tensorflow.keras import Sequential
from tensorflow.keras.layers import SimpleRNN
from tensorflow.keras.optimizers import SGD
tf.random.set_seed(111)
np.random.seed(111)
model = Sequential([
SimpleRNN(1, activation=None, input_shape=(None, 1), return_sequences=True)
])
model.compile(optimizer=SGD(lr=0.0001), loss="mean_squared_error")
n = 51200
x = np.random.random((n, 30, 1))  # n random sequences of length 30 (1-dimensional values)
y = x.cumsum(axis=1)              # labels: running sums along the time axis
model.fit(x, y, batch_size=512, epochs=100)
model.layers[0].weights
# [<tf.Variable 'simple_rnn/kernel:0' shape=(1, 1) dtype=float32, numpy=array([[0.6021545]], dtype=float32)>,
# <tf.Variable 'simple_rnn/recurrent_kernel:0' shape=(1, 1) dtype=float32, numpy=array([[1.0050855]], dtype=float32)>,
# <tf.Variable 'simple_rnn/bias:0' shape=(1,) dtype=float32, numpy=array([0.20719269], dtype=float32)>]
model.predict(np.ones((1, 30, 1)) * 0.5).flatten()
# array([ 0.5082699, 1.0191246, 1.5325773, 2.0486412, 2.5673294,
# 3.0886555, 3.6126328, 4.1392746, 4.6685944, 5.2006063,
# 5.7353234, 6.27276 , 6.8129296, 7.3558464, 7.901524 ,
# 8.449977 , 9.00122 , 9.555265 , 10.112128 , 10.6718235,
# 11.2343645, 11.799767 , 12.368044 , 12.939212 , 13.513284 ,
# 14.090276 , 14.670201 , 15.253077 , 15.838916 , 16.427734 ],
# dtype=float32)
The error looks fairly large, but this is only a sample to get a feel for how things work (we are not chasing accuracy here). As a result of training, we obtain
\begin{align}
o_t = s_t = 0.6022x_t + 1.0051s_{t-1} + 0.2072
\end{align}
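As a sanity check, we can step this learned recurrence by hand (copying the weight values printed above) and compare it with the first few values of `model.predict`:

```python
# Hand-rolled check of the learned recurrence; the weights are copied from the output above.
U, W, b = 0.6021545, 1.0050855, 0.20719269
s = 0.0
for _ in range(3):
    s = U * 0.5 + W * s + b    # constant input x_t = 0.5; o_t = s_t
    print(s)
# ~0.508, ~1.019, ~1.533 -- matching the start of the prediction above
```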
The arguments passed to `SimpleRNN` have the following meanings.
SimpleRNN(1, activation=None, input_shape=(None, 1), return_sequences=True)
- The first argument, `1`, is the dimensionality of $o_t, s_t$. They are scalars this time, so we specify 1.
- `activation` corresponds to $f$ in equation (1). We want the identity function here, so we pass `None`. Note that while the default for `Dense` and similar layers is the identity, the default for the RNN family is `tanh`.
- `input_shape` has the form `(None, dimension)`. The first element (`None`) corresponds to the length of each input sequence; it is `None` so that variable-length input is accepted. The second is the dimensionality of $x_t$ (1 this time).
- `return_sequences=True` means the output at every time step is returned as the layer output. With this setting, the output shape for a sequence of length 30 is `(batch_size, 30, 1)`. If it is `False`, the layer outputs only the last time step ($o_{30}$ here), with shape `(batch_size, 1)`. Choose whichever fits your problem setting and the format of your training data (see the shape check after this list).
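Here is a small shape check of `return_sequences` (my own snippet; the shapes are the only point):

```python
import numpy as np
from tensorflow.keras.layers import SimpleRNN

x = np.random.random((8, 30, 1)).astype("float32")          # (batch, time, features)

print(SimpleRNN(1, return_sequences=True)(x).shape)         # (8, 30, 1): one output per time step
print(SimpleRNN(1, return_sequences=False)(x).shape)        # (8, 1): only the last time step
```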
See the official documentation for details. tf.keras.layers.SimpleRNN | TensorFlow Core v2.1.0
The code that creates the model above is equivalent to:
from tensorflow.keras.layers import RNN, SimpleRNN, SimpleRNNCell
model = Sequential([
#SimpleRNN(1, activation=None, input_shape=(None, 1), return_sequences=True)
RNN(SimpleRNNCell(1, activation=None), input_shape=(None, 1), return_sequences=True)
])
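To convince yourself of the equivalence, you can copy the weights from one formulation into the other and compare the outputs (a quick check of my own):

```python
import numpy as np
from tensorflow.keras.layers import RNN, SimpleRNN, SimpleRNNCell

x = np.random.random((2, 5, 1)).astype("float32")

layer_a = SimpleRNN(1, activation=None, return_sequences=True)
layer_b = RNN(SimpleRNNCell(1, activation=None), return_sequences=True)
layer_a(x), layer_b(x)                       # call once so both layers build their weights
layer_b.set_weights(layer_a.get_weights())   # share the same kernel / recurrent_kernel / bias

print(np.allclose(layer_a(x), layer_b(x)))   # True
```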
> The cell is the inside of the for loop of a RNN layer. Wrapping a cell inside a tf.keras.layers.RNN layer gives you a layer capable of processing batches of sequences, e.g. RNN(LSTMCell(10)).

(from Recurrent Neural Networks (RNN) with Keras | TensorFlow Core)
In other words, `SimpleRNNCell` defines the computation (the cell) for a single time step, and wrapping it in `RNN()` turns it into a layer that processes whole sequences in batches. So you should be able to define an RNN with whatever structure you like by writing your own per-step processing, equivalent to `SimpleRNNCell`, and wrapping it in `RNN()`.
This settles the first point raised at the beginning: the difference between `RNN`, `SimpleRNN`, and `SimpleRNNCell`.
In preparation for writing your own RNN cell, let's first look at what the existing `SimpleRNNCell` does. Imitating the existing implementation first is a good shortcut when writing your own.
(The example in tf.keras.layers.RNN | TensorFlow Core v2.1.0 is also a useful reference.)
The source code for `SimpleRNNCell` can be found here:
tensorflow/recurrent.py at v2.1.0 · tensorflow/tensorflow · GitHub
Let's look at a few excerpts from it.
First, the class definition: it inherits from `Layer`. It also inherits `DropoutRNNCellMixin` to support dropout, but that is off-topic here, so I won't cover it.
recurrent.py
class SimpleRNNCell(DropoutRNNCellMixin, Layer):
`build()` defines the weights the layer needs via `add_weight()`; `Dense()` and other layers do the same. In terms of equation (1), `kernel` is $U$, `recurrent_kernel` is $W$, and `bias` is $b$.
The actual processing is then defined in `call()`. This is the most important part.
recurrent.py
def call(self, inputs, states, training=None):
prev_output = states[0]
dp_mask = self.get_dropout_mask_for_cell(inputs, training)
rec_dp_mask = self.get_recurrent_dropout_mask_for_cell(
prev_output, training)
if dp_mask is not None:
h = K.dot(inputs * dp_mask, self.kernel)
else:
h = K.dot(inputs, self.kernel)
if self.bias is not None:
h = K.bias_add(h, self.bias)
if rec_dp_mask is not None:
prev_output = prev_output * rec_dp_mask
output = h + K.dot(prev_output, self.recurrent_kernel)
if self.activation is not None:
output = self.activation(output)
return output, [output]
`inputs` receives the input $x_t$, and `states` receives the state $s_{t-1}$ produced at the previous time step. `states` is passed as a list of variables so that a cell can hold more than one state, which is why the code first extracts `states[0]`.
Setting the dropout-related processing aside, the main part boils down to:
h = K.dot(inputs, self.kernel)
if self.bias is not None:
h = K.bias_add(h, self.bias)
output = h + K.dot(prev_output, self.recurrent_kernel)
if self.activation is not None:
output = self.activation(output)
return output, [output]
That's it.
In TensorFlow, each input/output sample is represented as a row vector, so the order of the matrix products is reversed, but you can see that this corresponds to equation (1).
The final `return` returns the layer output $o_t$ together with the state $s_t$ to carry over to the next time step. The state returned here is what `call()` receives at the next time step. Like the argument, the state is returned as a list; if you return multiple states here, you will receive multiple states at the next step.
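Before moving on, here is a toy cell of my own that follows this contract: it has no weights at all and simply accumulates its input, reproducing the running-sum example from the beginning.

```python
import numpy as np
import tensorflow as tf

# A weight-free toy cell: s_t = s_{t-1} + x_t, o_t = s_t (my own example, not TensorFlow code).
class CumulativeSumCell(tf.keras.layers.Layer):
    def __init__(self, units, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.state_size = units        # one state vector of size `units`
        self.output_size = units

    def call(self, inputs, states):
        s = states[0] + inputs         # update the state
        return s, [s]                  # return (output o_t, list of states [s_t])

layer = tf.keras.layers.RNN(CumulativeSumCell(1), return_sequences=True)
x = np.array([[[1.], [3.], [2.], [4.], [1.], [0.], [1.]]], dtype="float32")
print(layer(x).numpy().flatten())      # [ 1.  4.  6. 10. 11. 11. 12.] -- the table from earlier
```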
Next, let's look at the LSTM layer as a slightly more complicated example.
First, let's use `LSTM` instead of `SimpleRNN` on the same problem setting as before.
lstm.py
import tensorflow as tf
import numpy as np
from tensorflow.keras import Sequential
from tensorflow.keras.layers import LSTM
from tensorflow.keras.optimizers import SGD
tf.random.set_seed(111)
np.random.seed(111)
model = Sequential([
LSTM(1, activation=None, input_shape=(None, 1), return_sequences=True)
])
model.compile(optimizer=SGD(lr=0.0001), loss="mean_squared_error")
n = 51200
x = np.random.random((n, 30, 1))
y = x.cumsum(axis=1)
model.fit(x, y, batch_size=512, epochs=100)
model.layers[0].weights
# [<tf.Variable 'lstm/kernel:0' shape=(1, 4) dtype=float32, numpy=
# array([[ 0.11471224, -0.15296884, 0.82662594, -0.14256166]],
# dtype=float32)>,
# <tf.Variable 'lstm/recurrent_kernel:0' shape=(1, 4) dtype=float32, numpy=
# array([[ 0.10575113, 0.16468772, -0.05777477, 0.20210776]],
# dtype=float32)>,
# <tf.Variable 'lstm/bias:0' shape=(4,) dtype=float32, numpy=array([0.4812489, 1.6566612, 1.1815464, 0.4349145], dtype=float32)>]
model.predict(np.ones((1, 30, 1)) * 0.5).flatten()
# array([ 0.59412843, 1.1486205 , 1.6723596 , 2.1724625 , 2.6546886 ,
# 3.1237347 , 3.5834525 , 4.0370073 , 4.486994 , 4.93552 ,
# 5.38427 , 5.8345466 , 6.2873073 , 6.7431927 , 7.20255 ,
# 7.6654577 , 8.131752 , 8.601054 , 9.072805 , 9.546291 ,
# 10.0206785 , 10.495057 , 10.968457 , 11.439891 , 11.908364 ,
# 12.372919 , 12.832628 , 13.286626 , 13.734106 , 14.174344 ],
# dtype=float32)
If you only want to use it as-is, I already did that in a previous article with the same problem setting: Try return_sequences=True on Keras RNN (LSTM) - Qiita. This time, let's dig a little deeper into the implementation.
As with `SimpleRNN`, this can be split into `RNN` plus a cell while keeping the processing equivalent[^2].
[^2]: In practice, using `LSTM` directly allows a fast cuDNN implementation to be used (in some cases), so if you only want to use an LSTM there is no benefit in separating it.
from tensorflow.keras.layers import LSTM, RNN, LSTMCell
model = Sequential([
# LSTM(1, activation=None, input_shape=(None, 1), return_sequences=True)
RNN(LSTMCell(1, activation=None), input_shape=(None, 1), return_sequences=True)
])
From here on, we focus on the cell part, `LSTMCell`.
Before looking at the implementation, let's review what an LSTM computes. The theoretical meaning of each gate is not discussed here. (The formulas and figures are quoted from [^basiclstm].)
\begin{align}
o_t &= σ \left( W_ox_t + R_oh_{t-1} + b_o \right) \tag{2.1}\\
f_t &= σ \left( W_fx_t + R_fh_{t-1} + b_f \right) \tag{2.2}\\
i_t &= σ \left( W_ix_t + R_ih_{t-1} + b_i \right) \tag{2.3}\\
z_t &= \tanh \left( W_zx_t + R_zh_{t-1} + b_z \right) \tag{2.4}\\
c_t &= i_t \otimes z_t+c_{t-1} \otimes f_t \tag{2.5}\\
h_t &= o_t \otimes \tanh(c_t) \tag{2.6}
\end{align}
Here $\otimes$ denotes the element-wise product, $\sigma$ the sigmoid function, and $\tanh$ the hyperbolic tangent.
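To make the correspondence with the code easier to see, here is a direct NumPy transcription of equations (2.1)-(2.6) for one time step, using the column-vector convention of the formulas (names and structure are my own):

```python
import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def lstm_step(x_t, h_prev, c_prev, W, R, b):
    """One LSTM step; W, R, b are dicts keyed by gate name ('o', 'f', 'i', 'z')."""
    o = sigmoid(W["o"] @ x_t + R["o"] @ h_prev + b["o"])   # (2.1) output gate
    f = sigmoid(W["f"] @ x_t + R["f"] @ h_prev + b["f"])   # (2.2) forget gate
    i = sigmoid(W["i"] @ x_t + R["i"] @ h_prev + b["i"])   # (2.3) input gate
    z = np.tanh(W["z"] @ x_t + R["z"] @ h_prev + b["z"])   # (2.4) candidate values
    c_t = i * z + c_prev * f                               # (2.5) new cell state
    h_t = o * np.tanh(c_t)                                 # (2.6) new hidden state / output
    return h_t, c_t
```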
Let's read the implementation of `LSTMCell` against equations (2.1)-(2.6).
tensorflow/recurrent.py at v2.1.0 · tensorflow/tensorflow · GitHub
The class declaration is the same as for `SimpleRNNCell`.
recurrent.py
class LSTMCell(DropoutRNNCellMixin, Layer):
The weights are defined in `build()`.
recurrent.py
def build(self, input_shape):
default_caching_device = _caching_device(self)
input_dim = input_shape[-1]
self.kernel = self.add_weight(
shape=(input_dim, self.units * 4),
name='kernel',
initializer=self.kernel_initializer,
regularizer=self.kernel_regularizer,
constraint=self.kernel_constraint,
caching_device=default_caching_device)
self.recurrent_kernel = self.add_weight(
shape=(self.units, self.units * 4),
name='recurrent_kernel',
initializer=self.recurrent_initializer,
regularizer=self.recurrent_regularizer,
constraint=self.recurrent_constraint,
caching_device=default_caching_device)
if self.use_bias:
if self.unit_forget_bias:
def bias_initializer(_, *args, **kwargs):
return K.concatenate([
self.bias_initializer((self.units,), *args, **kwargs),
initializers.Ones()((self.units,), *args, **kwargs),
self.bias_initializer((self.units * 2,), *args, **kwargs),
])
else:
bias_initializer = self.bias_initializer
self.bias = self.add_weight(
shape=(self.units * 4,),
name='bias',
initializer=bias_initializer,
regularizer=self.bias_regularizer,
constraint=self.bias_constraint,
caching_device=default_caching_device)
else:
self.bias = None
self.built = True
Leaving the details aside for now, note that `self.units * 4` appears in several places.
In fact, `kernel` holds a concatenation of the four matrices $W_o, W_f, W_i, W_z$[^4]. Similarly, `recurrent_kernel` packs together $R_o, R_f, R_i, R_z$, and `bias` packs together $b_o, b_f, b_i, b_z$. Of course, keeping them as four separate variables each (twelve in total) would not be wrong either.
As usual, rows and columns are swapped compared with the formulas, so where the formulas suggest quadrupling the number of rows, in the code it is the number of columns that is quadrupled.
[^4]: Besides reducing the number of variables, this has the advantage that the pre-activation terms of equations (2.1)-(2.4) can be computed in one go. If you pass `implementation=2` when creating `LSTMCell()`, an implementation that computes them all at once is used.
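As a hypothetical illustration (`model` here is the LSTM model trained above), the packed weights can be split back into per-gate blocks; the order of the blocks along the last axis is i, f, c, o, as the `array_ops.split` calls in `call()` below show:

```python
import numpy as np

# Split the packed LSTM weights of the model trained above into the four gate blocks.
kernel, recurrent_kernel, bias = [w.numpy() for w in model.layers[0].weights]

W_i, W_f, W_c, W_o = np.split(kernel, 4, axis=1)            # each block has shape (1, 1) here
R_i, R_f, R_c, R_o = np.split(recurrent_kernel, 4, axis=1)
b_i, b_f, b_c, b_o = np.split(bias, 4)
```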
`call()` is the main body. For simplicity, only the processing for `implementation=1` is shown.
recurrent.py
def call(self, inputs, states, training=None):
h_tm1 = states[0] # previous memory state
c_tm1 = states[1] # previous carry state
(Abbreviation)
if 0 < self.dropout < 1.:
(Abbreviation)
else:
inputs_i = inputs
inputs_f = inputs
inputs_c = inputs
inputs_o = inputs
k_i, k_f, k_c, k_o = array_ops.split(
self.kernel, num_or_size_splits=4, axis=1)
x_i = K.dot(inputs_i, k_i)
x_f = K.dot(inputs_f, k_f)
x_c = K.dot(inputs_c, k_c)
x_o = K.dot(inputs_o, k_o)
if self.use_bias:
b_i, b_f, b_c, b_o = array_ops.split(
self.bias, num_or_size_splits=4, axis=0)
x_i = K.bias_add(x_i, b_i)
x_f = K.bias_add(x_f, b_f)
x_c = K.bias_add(x_c, b_c)
x_o = K.bias_add(x_o, b_o)
if 0 < self.recurrent_dropout < 1.:
(Abbreviation)
else:
h_tm1_i = h_tm1
h_tm1_f = h_tm1
h_tm1_c = h_tm1
h_tm1_o = h_tm1
x = (x_i, x_f, x_c, x_o)
h_tm1 = (h_tm1_i, h_tm1_f, h_tm1_c, h_tm1_o)
c, o = self._compute_carry_and_output(x, h_tm1, c_tm1)
(Abbreviation)
h = o * self.activation(c)
return h, [h, c]
In equations (2.1)-(2.6), both $h_{t-1}$ and $c_{t-1}$ carry information from the previous time step, **so both of them must be kept as states**. As mentioned earlier, multiple states are handled by passing them around as a list.
The first half computes the four values $W_ox_t + b_o$, $W_fx_t + b_f$, $W_ix_t + b_i$, $W_zx_t + b_z$. (Note that $W_z$ and $b_z$ of equation (2.4) correspond to `k_c` and `b_c` in the code, which use a different letter.)
The second half uses `_compute_carry_and_output()` to compute $c_t$ and $o_t$.
Finally, $h_t$ is computed. $h_t$ itself is returned as the output, and $h_t$ and $c_t$ are returned as the states to be used in the computation at the next time step. The default `activation` is `tanh`, matching equation (2.6).
`_compute_carry_and_output()` is defined as follows.
recurrent.py
def _compute_carry_and_output(self, x, h_tm1, c_tm1):
"""Computes carry and output using split kernels."""
x_i, x_f, x_c, x_o = x
h_tm1_i, h_tm1_f, h_tm1_c, h_tm1_o = h_tm1
i = self.recurrent_activation(
x_i + K.dot(h_tm1_i, self.recurrent_kernel[:, :self.units]))
f = self.recurrent_activation(x_f + K.dot(
h_tm1_f, self.recurrent_kernel[:, self.units:self.units * 2]))
c = f * c_tm1 + i * self.activation(x_c + K.dot(
h_tm1_c, self.recurrent_kernel[:, self.units * 2:self.units * 3]))
o = self.recurrent_activation(
x_o + K.dot(h_tm1_o, self.recurrent_kernel[:, self.units * 3:]))
return c, o
Each product uses only a slice of `recurrent_kernel`; the matrix products are essentially $R_ih_{t-1}$, $R_fh_{t-1}$, $R_zh_{t-1}$, $R_oh_{t-1}$.
Since `x_i`, `x_f`, `x_c`, `x_o` already hold the computed values $W_ix_t + b_i$, $W_fx_t + b_f$, $W_zx_t + b_z$, $W_ox_t + b_o$, this completes the arguments of the activation functions.
The activation function `recurrent_activation` corresponds to the sigmoid in equations (2.1)-(2.3), but note that its default value is `hard_sigmoid`[^5]. The rest is exactly as in the formulas.
[^5]: Keras's hard_sigmoid is max(0, min(1, 0.2*x + 0.5)) - Qiita
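For reference, you can compare `hard_sigmoid` with the ordinary sigmoid directly (a small check of my own):

```python
import tensorflow as tf

x = tf.constant([-3.0, -1.0, 0.0, 1.0, 3.0])
print(tf.keras.activations.hard_sigmoid(x).numpy())  # piecewise linear: clip(0.2*x + 0.5, 0, 1)
print(tf.keras.activations.sigmoid(x).numpy())       # smooth sigmoid, for comparison
```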
Building on all of the above, here is an example of the workflow for implementing a derived form of LSTM proposed in a paper yourself.
I want a simple example, so let's try the Simplified LSTM (S-LSTM) proposed in Wu (2016)[^6].
First, the formulas from the original paper. To match the notation used elsewhere on this page, I have moved the subscripts and replaced the generalized $\delta$ and $g$ with $\sigma$ and $\tanh$.
\begin{align}
f_t &=\sigma(W_fx_t+R_fh_{t−1}+b_f) \\
c_t &=f_t \otimes c_{t−1}+ (1−f_t) \otimes \tanh (W_c x_t+R_ch_{t−1}+b_c) \\
h_t &=\tanh (c_t)
\end{align}
First, work out from the formulas what needs to be kept as state. **Any variable referenced with the subscript $t-1$ (i.e., whose previous-time value is used) must be kept as a state; so this time we keep $h_t$ and $c_t$ as states.**
The weights (the parameters to train) are $W_f, R_f, b_f, W_c, R_c, b_c$: half as many as in an ordinary LSTM.
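Before touching Keras, here is the S-LSTM step written directly in NumPy, parallel to the LSTM sketch above (my own transcription of the formulas):

```python
import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def slstm_step(x_t, h_prev, c_prev, W_f, R_f, b_f, W_c, R_c, b_c):
    f = sigmoid(W_f @ x_t + R_f @ h_prev + b_f)                             # forget gate
    c_t = f * c_prev + (1.0 - f) * np.tanh(W_c @ x_t + R_c @ h_prev + b_c)  # new cell state
    h_t = np.tanh(c_t)                                                      # output
    return h_t, c_t
```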
`LSTMCell` inherits from `Layer`, but when writing your own cell it seems better to inherit from `tf.keras.layers.AbstractRNNCell`.
tf.keras.layers.AbstractRNNCell | TensorFlow Core v2.1.0
This is the base class for implementing RNN cells with custom behavior.
For `build()`, modifying the LSTM implementation gives something like the following. `* 4` becomes `* 2`, and parts that are not directly relevant here (dropout and so on) are removed.
def build(self, input_shape):
input_dim = input_shape[-1]
self.kernel = self.add_weight(
shape=(input_dim, self.units * 2),
name='kernel',
initializer=self.kernel_initializer,
regularizer=self.kernel_regularizer,
constraint=self.kernel_constraint)
self.recurrent_kernel = self.add_weight(
shape=(self.units, self.units * 2),
name='recurrent_kernel',
initializer=self.recurrent_initializer,
regularizer=self.recurrent_regularizer,
constraint=self.recurrent_constraint)
if self.use_bias:
self.bias = self.add_weight(
shape=(self.units * 2,),
name='bias',
initializer=self.bias_initializer,
regularizer=self.bias_regularizer,
constraint=self.bias_constraint)
else:
self.bias = None
self.built = True
For `call()`, only the processing equivalent to `implementation=1` is implemented. Something like the following should do.
Note that `inputs` and `states` are `tf.Tensor` objects, so do not use functions meant for `ndarray` such as `np.dot`; use functions that operate on `Tensor`s instead, for example those in `tf.math`, `tf.linalg`, and `tf.keras.backend`.
I want to get used to the Tensor object of TensorFlow - Qiita
def call(self, inputs, states, training=None):
h_tm1 = states[0] # previous memory state
c_tm1 = states[1] # previous carry state
k_f, k_c = array_ops.split(
self.kernel, num_or_size_splits=2, axis=1)
x_f = K.dot(inputs, k_f)
x_c = K.dot(inputs, k_c)
if self.use_bias:
b_f, b_c = array_ops.split(
self.bias, num_or_size_splits=2, axis=0)
x_f = K.bias_add(x_f, b_f)
x_c = K.bias_add(x_c, b_c)
f = self.recurrent_activation(x_f + K.dot(
h_tm1, self.recurrent_kernel[:, :self.units]))
c = f * c_tm1 + (1 - f) * self.activation(x_c + K.dot(
h_tm1, self.recurrent_kernel[:, self.units:]))
h = self.activation(c)
return h, [h, c]
slstm.py
import tensorflow as tf
import numpy as np
from tensorflow.keras import Sequential
from tensorflow.keras.layers import RNN, AbstractRNNCell
from tensorflow.keras.optimizers import SGD
from tensorflow.python.keras import activations, constraints, initializers, regularizers
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.utils import tf_utils
from tensorflow.python.ops import array_ops
class SLSTMCell(AbstractRNNCell):
def __init__(self,
units,
activation='tanh',
recurrent_activation='hard_sigmoid',
use_bias=True,
kernel_initializer='glorot_uniform',
recurrent_initializer='orthogonal',
bias_initializer='zeros',
kernel_regularizer=None,
recurrent_regularizer=None,
bias_regularizer=None,
kernel_constraint=None,
recurrent_constraint=None,
bias_constraint=None,
**kwargs):
super(SLSTMCell, self).__init__(**kwargs)
self.units = units
self.activation = activations.get(activation)
self.recurrent_activation = activations.get(recurrent_activation)
self.use_bias = use_bias
self.kernel_initializer = initializers.get(kernel_initializer)
self.recurrent_initializer = initializers.get(recurrent_initializer)
self.bias_initializer = initializers.get(bias_initializer)
self.kernel_regularizer = regularizers.get(kernel_regularizer)
self.recurrent_regularizer = regularizers.get(recurrent_regularizer)
self.bias_regularizer = regularizers.get(bias_regularizer)
self.kernel_constraint = constraints.get(kernel_constraint)
self.recurrent_constraint = constraints.get(recurrent_constraint)
self.bias_constraint = constraints.get(bias_constraint)
@property
def state_size(self):
return [self.units, self.units]
def build(self, input_shape):
input_dim = input_shape[-1]
self.kernel = self.add_weight(
shape=(input_dim, self.units * 2),
name='kernel',
initializer=self.kernel_initializer,
regularizer=self.kernel_regularizer,
constraint=self.kernel_constraint)
self.recurrent_kernel = self.add_weight(
shape=(self.units, self.units * 2),
name='recurrent_kernel',
initializer=self.recurrent_initializer,
regularizer=self.recurrent_regularizer,
constraint=self.recurrent_constraint)
if self.use_bias:
self.bias = self.add_weight(
shape=(self.units * 2,),
name='bias',
initializer=self.bias_initializer,
regularizer=self.bias_regularizer,
constraint=self.bias_constraint)
else:
self.bias = None
self.built = True
def call(self, inputs, states, training=None):
h_tm1 = states[0] # previous memory state
c_tm1 = states[1] # previous carry state
k_f, k_c = array_ops.split(
self.kernel, num_or_size_splits=2, axis=1)
x_f = K.dot(inputs, k_f)
x_c = K.dot(inputs, k_c)
if self.use_bias:
b_f, b_c = array_ops.split(
self.bias, num_or_size_splits=2, axis=0)
x_f = K.bias_add(x_f, b_f)
x_c = K.bias_add(x_c, b_c)
f = self.recurrent_activation(x_f + K.dot(
h_tm1, self.recurrent_kernel[:, :self.units]))
c = f * c_tm1 + (1 - f) * self.activation(x_c + K.dot(
h_tm1, self.recurrent_kernel[:, self.units:]))
h = self.activation(c)
return h, [h, c]
tf.random.set_seed(111)
np.random.seed(111)
model = Sequential([
RNN(SLSTMCell(1, activation=None), input_shape=(None, 1), return_sequences=True)
])
model.compile(optimizer=SGD(lr=0.0001), loss="mean_squared_error")
n = 51200
x = np.random.random((n, 30, 1))
y = x.cumsum(axis=1)
model.fit(x, y, batch_size=512, epochs=100)
model.layers[0].weights
# [<tf.Variable 'rnn/kernel:0' shape=(1, 2) dtype=float32, numpy=array([[-0.79614836, 0.03041089]], dtype=float32)>,
# <tf.Variable 'rnn/recurrent_kernel:0' shape=(1, 2) dtype=float32, numpy=array([[0.08143749, 1.0668359 ]], dtype=float32)>,
# <tf.Variable 'rnn/bias:0' shape=(2,) dtype=float32, numpy=array([0.6330045, 1.0431471], dtype=float32)>]
model.predict(np.ones((1, 30, 1)) * 0.5).flatten()
# array([ 0.47944844, 0.96489847, 1.4559155 , 1.9520411 , 2.4527955 ,
# 2.9576783 , 3.466171 , 3.9777386 , 4.4918313 , 5.007888 ,
# 5.5253367 , 6.0435996 , 6.5620937 , 7.0802336 , 7.597435 ,
# 8.113117 , 8.626705 , 9.13763 , 9.645338 , 10.149284 ,
# 10.648943 , 11.143805 , 11.633378 , 12.117197 , 12.594816 ,
# 13.065814 , 13.529797 , 13.986397 , 14.435274 , 14.876117 ],
# dtype=float32)
I'm not entirely sure the implementation is correct, but it runs and behaves plausibly, so let's call it OK.
By the way, the initial state at the start of an input sequence is $h_0 = c_0 = 0$. This is defined in `AbstractRNNCell`.
recurrent.py
def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
return _generate_zero_filled_state_for_cell(self, inputs, batch_size, dtype)
That is fine for this problem, but if you want a different initial state, you can override `get_initial_state()` in your subclass. For example, to start from $h_0 = 1$:
def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
h_0 = tf.ones([batch_size, self.units], dtype)
c_0 = tf.zeros([batch_size, self.units], dtype)
return [h_0, c_0]
Let's also change the training labels to "running sum + 1".
n = 51200
x = np.random.random((n, 30, 1))
y = x.cumsum(axis=1) + 1
model.fit(x, y, batch_size=512, epochs=100)
model.predict(np.ones((1, 30, 1)) * 0.5).flatten()
# array([ 1.0134857, 1.5777774, 2.140834 , 2.702304 , 3.2618384,
# 3.8190937, 4.3737316, 4.92542 , 5.473833 , 6.018653 ,
# 6.5595713, 7.0962873, 7.62851 , 8.15596 , 8.678368 ,
# 9.195474 , 9.707033 , 10.2128105, 10.712584 , 11.206142 ,
# 11.693292 , 12.173846 , 12.647637 , 13.114506 , 13.574308 ,
# 14.026913 , 14.472202 , 14.91007 , 15.3404255, 15.763187 ],
# dtype=float32)
The error still looks sizable, but we do get roughly the expected result.
In this article I covered how to use RNNs in TensorFlow + Keras and how to customize an RNN, for example to re-examine results from papers. Using RNN and LSTM as black boxes is not difficult, but when you try to understand the internal processing, reference material (especially in Japanese) is surprisingly scarce. I hope this article lowers the hurdle for working with RNNs and LSTMs.