While investigating anomaly detection methods for work, I came across a method called EfficientGAN. The author's source code does not state which library versions it needs and was difficult to run, so I reimplemented it in Keras, partly as a learning exercise. Note that only the network for tabular data is implemented, and the "feature-matching loss" variant of the loss used at inference time is not implemented.
Source code: https://github.com/asm94/EfficientGAN
References:
・ Original paper: https://arxiv.org/abs/1802.06222
・ Author source code: https://github.com/houssamzenati/Efficient-GAN-Anomaly-Detection
・ Commentary article: https://qiita.com/masataka46/items/49dba2790fa59c29126b
Environment:
・ Windows 10 64bit
・ Python 3.8.3
・ Numpy 1.18.5
・ Tensorflow 2.3.1
・ Scikit-learn 0.22.2
This time, I defined the networks and the training and inference functions of EfficientGAN as a single class. The overall structure is as follows; the individual functions are described later.
import numpy as np
import tensorflow as tf
from sklearn.preprocessing import minmax_scale
from tensorflow.keras.layers import Input, Dense, Dropout, Concatenate, LeakyReLU
from tensorflow.keras.models import Model

class EfficientGAN(object):
    def __init__(self, input_dim=0, latent_dim=32):
        self.input_dim = int(input_dim)
        self.latent_dim = int(latent_dim)

    # Train model
    def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
            optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
            verbose=1):
        pass  # See below

    # Test model
    def predict(self, X_test, weight=0.9, degree=1):
        pass  # See below

    ## Encoder
    def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
        pass  # See below

    ## Generator
    def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
        pass  # See below

    ## Discriminator
    def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
        pass  # See below
I implemented the networks as follows, with reference to the paper.
・ In the paper the input dimension is fixed at 121, the number of dimensions of the data used there. Here "input_dim" is a parameter so that it can be set freely.
・ The activation function of the Discriminator's output layer is linear in the paper, but the author's source code applies a sigmoid function when computing the loss, so here the sigmoid is built into the network.
    ## Encoder
    def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
        inputs = Input(shape=(self.input_dim,), name='input')
        net = inputs
        net = Dense(64, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                    name='layer_1')(net)
        outputs = Dense(self.latent_dim, activation='linear', kernel_initializer=initializer,
                        name='output')(net)
        return Model(inputs=inputs, outputs=outputs, name='Encoder')

    ## Generator
    def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
        inputs = Input(shape=(self.latent_dim,), name='input')
        net = inputs
        net = Dense(64, activation='relu', kernel_initializer=initializer,
                    name='layer_1')(net)
        net = Dense(128, activation='relu', kernel_initializer=initializer,
                    name='layer_2')(net)
        outputs = Dense(self.input_dim, activation='linear', kernel_initializer=initializer,
                        name='output')(net)
        return Model(inputs=inputs, outputs=outputs, name='Generator')

    ## Discriminator
    def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
        # D(x)
        inputs1 = Input(shape=(self.input_dim,), name='real')
        net = inputs1
        net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                    name='layer_1')(net)
        dx = Dropout(.2)(net)
        # D(z)
        inputs2 = Input(shape=(self.latent_dim,), name='noise')
        net = inputs2
        net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                    name='layer_2')(net)
        dz = Dropout(.2)(net)
        # Combine D(x) and D(z)
        conet = Concatenate(axis=1)([dx, dz])
        # D(x,z)
        conet = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                      name='layer_3')(conet)
        conet = Dropout(.2)(conet)
        outputs = Dense(1, activation='sigmoid', kernel_initializer=initializer,
                        name='output')(conet)
        return Model(inputs=[inputs1, inputs2], outputs=outputs, name='Discriminator')
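To make the shape flow of the Discriminator easier to follow, here is a minimal NumPy sketch of its forward pass at inference time (Dropout is inactive then, so it is left out). The helper names `leaky_relu`, `sigmoid`, `dense` and `discriminator_forward` are made up for this illustration, and the random weights only stand in for trained parameters; this is not the Keras implementation itself.

```python
import numpy as np

rng = np.random.default_rng(0)

def leaky_relu(a, alpha=0.1):
    # Same negative slope as LeakyReLU(alpha=0.1) used in the networks
    return np.where(a > 0, a, alpha * a)

def sigmoid(a):
    return 1.0 / (1.0 + np.exp(-a))

def dense(n_in, n_out):
    # Hypothetical helper: random weights and zero bias as placeholders
    return rng.normal(0.0, 0.1, (n_in, n_out)), np.zeros(n_out)

def discriminator_forward(x, z, params):
    (w1, b1), (w2, b2), (w3, b3), (w4, b4) = params
    dx = leaky_relu(x @ w1 + b1)               # D(x) branch: (n, 128)
    dz = leaky_relu(z @ w2 + b2)               # D(z) branch: (n, 128)
    joint = np.concatenate([dx, dz], axis=1)   # combined: (n, 256)
    h = leaky_relu(joint @ w3 + b3)            # D(x,z): (n, 128)
    return sigmoid(h @ w4 + b4)                # sigmoid inside the network: (n, 1)

input_dim, latent_dim, n = 121, 32, 5
params = [dense(input_dim, 128), dense(latent_dim, 128), dense(256, 128), dense(128, 1)]
x = rng.normal(size=(n, input_dim))
z = rng.normal(size=(n, latent_dim))
probs = discriminator_forward(x, z, params)
print(probs.shape)  # (5, 1)
```

Because the sigmoid is the last step of the forward pass, the output can be passed directly to a binary cross-entropy loss without any further conversion.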
I implemented training as follows, with reference to the paper.
・ The author's source code applies the sigmoid function immediately before the loss calculation. As described for the network definition above, the sigmoid is already built into the Discriminator here, so no conversion is applied at this point.
・ Each sub-model, such as the Discriminator, is built at training time rather than when the EfficientGAN class is instantiated. If the input dimension has not been set, the number of dimensions of the training data is used as the input dimension at this point.
    # Train model
    def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
            optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
            verbose=1):
        # Convert the training data to a NumPy array
        X_train = np.array(X_train)
        # If "input_dim" is less than 1 (treated as undefined), use the number of dimensions of the training data
        if not self.input_dim >= 1: self.input_dim = X_train.shape[1]
        # Discriminator model definition
        self.dis = self.get_discriminator()
        self.dis.compile(loss=loss, optimizer=optimizer)
        # Keras fixes the trainable state of a model at compile time, so freeze the Discriminator
        # before compiling the combined models; otherwise they would also update its weights
        self.dis.trainable = False
        # Model definition for Encoder training (Encoder -> Discriminator)
        self.enc = self.get_encoder()
        x = Input(shape=(self.input_dim,))
        z_gen = self.enc(x)
        valid = self.dis([x, z_gen])
        enc_dis = Model(inputs=x, outputs=valid, name='enc_to_dis')
        enc_dis.compile(loss=loss, optimizer=optimizer)
        # Model definition for Generator training (Generator -> Discriminator)
        self.gen = self.get_generator()
        z = Input(shape=(self.latent_dim,))
        x_gen = self.gen(z)
        valid = self.dis([x_gen, z])
        gen_dis = Model(inputs=z, outputs=valid, name='gen_to_dis')
        gen_dis.compile(loss=loss, optimizer=optimizer)
        # Training
        min_val_loss = float('inf')
        stop_count = 0
        for i in range(epochs):
            # Enable training of the Discriminator
            self.dis.trainable = True
            # Randomly sample batch_size // 2 rows from the training data
            idx = np.random.randint(0, X_train.shape[0], batch_size//2)
            real_data = X_train[idx]
            # Draw the same number of noise vectors and generate data from each of them
            noise = np.random.normal(0, 1, (len(idx), self.latent_dim))
            gen_data = self.gen.predict(noise)
            # Encode each sampled training row into a latent (noise) vector
            enc_noise = self.enc.predict(real_data)
            # Discriminator training: (x, E(x)) pairs are labeled 1, (G(z), z) pairs are labeled 0
            d_enc_loss = self.dis.train_on_batch([real_data, enc_noise], np.ones((len(real_data), 1)))
            d_gen_loss = self.dis.train_on_batch([gen_data, noise], np.zeros((len(gen_data), 1)))
            d_loss = d_enc_loss + d_gen_loss
            # Disable training of the Discriminator
            self.dis.trainable = False
            # Encoder training (labels are flipped relative to the Discriminator step)
            e_loss = enc_dis.train_on_batch(real_data, np.zeros((len(real_data), 1)))
            # Generator training (labels are flipped relative to the Discriminator step)
            g_loss = gen_dis.train_on_batch(noise, np.ones((len(noise), 1)))
            # If evaluation data is given, compute its loss and check for early stopping
            if len(test)>0:
                # Get the evaluation data
                X_test = test[0]
                y_true = test[1]
                # Score the evaluation data and scale the scores to [0, 1]
                proba = self.predict(X_test)
                proba = minmax_scale(proba)
                # Loss calculation
                val_loss = tf.keras.losses.binary_crossentropy(y_true, proba).numpy()
                # If the evaluation loss improved, update the minimum loss and reset the early stop count
                if min_val_loss > val_loss:
                    min_val_loss = val_loss
                    stop_count = 0
                # If the evaluation loss has not improved for "early_stop_num" consecutive checks, stop training
                elif stop_count >= early_stop_num:
                    break
                else:
                    stop_count += 1
            # Display the training status
            if verbose==1 and i%100==0:
                if len(test)==0: print(f'epoch{i}-> d_loss:{d_loss} e_loss:{e_loss} g_loss:{g_loss}')
                else: print(f'epoch{i}-> d_loss:{d_loss} e_loss:{e_loss} g_loss:{g_loss} val_loss:{val_loss}')
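The early stopping rule in `fit` is easy to misread, so here is a small pure-Python sketch that replays the same counter logic on a list of validation losses. The function name `early_stop_epoch` and the sample losses are made up for this illustration.

```python
def early_stop_epoch(val_losses, early_stop_num):
    """Return the index of the epoch at which training would stop, or None."""
    min_val_loss = float('inf')
    stop_count = 0
    for i, val_loss in enumerate(val_losses):
        if min_val_loss > val_loss:
            # Improvement: remember the new minimum and reset the counter
            min_val_loss = val_loss
            stop_count = 0
        elif stop_count >= early_stop_num:
            # The counter already reached the limit and there is still no improvement
            return i
        else:
            stop_count += 1
    return None

# Illustrative losses: the minimum 0.7 at epoch 1 is not beaten at epochs 2, 3 and 4
losses = [0.9, 0.7, 0.8, 0.75, 0.72, 0.71, 0.6]
print(early_stop_epoch(losses, early_stop_num=2))  # 4
```

With this rule, training tolerates `early_stop_num` non-improving checks and stops on the next one, so the later improvement at epoch 6 in the example is never reached.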
I implemented inference as follows, with reference to the paper. As in the paper, the anomaly score is calculated with the following formulas (the higher the value, the more anomalous).
A(x) = α L_G(x) + (1 - α) L_D(x) ... anomaly score
L_G(x) = ||x - G(E(x))||_1 ... generator loss
L_D(x) = σ(D(x, E(x)), 1) ... discriminator loss
Here σ is the cross-entropy loss, and α corresponds to the "weight" argument of predict (default 0.9).
Incidentally, the author's source code computes the discriminator loss as shown below. I was unsure whether to follow the code or the paper, but this time I implemented the formula above, as in the paper.
L_D(x) = σ(D(G(E(x)), E(x)), 1)
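As a quick check of the score formula, the following NumPy sketch evaluates A(x) by hand for two samples. The function `anomaly_score`, the arguments `x_rec` (standing for G(E(x))) and `d_out` (standing for D(x, E(x))), the clipping constant `eps`, and all numbers are assumptions made for this illustration; no trained model is involved.

```python
import numpy as np

def anomaly_score(x, x_rec, d_out, alpha=0.9, degree=1, eps=1e-7):
    # L_G: norm of the reconstruction error, one value per sample
    l_g = np.linalg.norm(x - x_rec, ord=degree, axis=1)
    # L_D: cross-entropy between the discriminator output and the target 1, i.e. -log(D)
    l_d = -np.log(np.clip(d_out.reshape(-1), eps, 1.0))
    return alpha * l_g + (1.0 - alpha) * l_d

x = np.array([[1.0, 2.0, 3.0],
              [1.0, 2.0, 3.0]])
x_rec = np.array([[1.0, 2.0, 3.0],    # perfect reconstruction
                  [0.0, 4.0, 3.0]])   # L1 error of 1 + 2 = 3
d_out = np.array([[0.9],              # discriminator output close to 1
                  [0.1]])             # discriminator output close to 0
scores = anomaly_score(x, x_rec, d_out)
print(scores)
```

The first sample, which is reconstructed exactly and scored near 1 by the discriminator, gets a score close to 0, while the second sample gets roughly 0.9 * 3 + 0.1 * 2.30, or about 2.93.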
    # Test model
    def predict(self, X_test, weight=0.9, degree=1):
        # Convert the evaluation data to a NumPy array
        X_test = np.array(X_test)
        # Encode the evaluation data into latent (noise) vectors
        z_gen = self.enc.predict(X_test)
        # Regenerate data from the latent vectors obtained from the evaluation data
        reconstructs = self.gen.predict(z_gen)
        # Take the difference between the original and regenerated data for each explanatory variable and aggregate it with a norm
        # If the data resembles the training data, a well-trained Encoder and Generator should be able to regenerate the input
        delta = X_test - reconstructs
        gen_score = tf.norm(delta, ord=degree, axis=1).numpy()
        # Run the Discriminator on the Encoder's input / output pair
        l_encoder = self.dis.predict([X_test, z_gen])
        # Calculate the cross-entropy between the result above and an array of all 1s
        # If the data resembles the training data, the Discriminator output for the Encoder's input / output pair should be close to 1
        dis_score = tf.keras.losses.binary_crossentropy(np.ones((len(X_test), 1)), l_encoder).numpy()
        # Return the anomaly score computed from "gen_score" and "dis_score"
        return weight*gen_score + (1-weight)*dis_score
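When evaluation data is passed to `fit`, the scores returned by `predict` are min-max scaled to [0, 1] and compared with the labels by binary cross-entropy. The NumPy sketch below reproduces that idea without scikit-learn or TensorFlow. The function `scaled_bce`, the clipping constant `eps`, and the sample scores are assumptions for illustration, and the result may differ slightly from the library calls in numerical details.

```python
import numpy as np

def scaled_bce(scores, y_true, eps=1e-7):
    scores = np.asarray(scores, dtype=float)
    y_true = np.asarray(y_true, dtype=float)
    # Min-max scale the scores to [0, 1]; constant scores map to all zeros
    span = scores.max() - scores.min()
    proba = (scores - scores.min()) / span if span > 0 else np.zeros_like(scores)
    # Clip to avoid log(0)
    proba = np.clip(proba, eps, 1.0 - eps)
    # Mean binary cross-entropy between labels and scaled scores
    return float(np.mean(-(y_true * np.log(proba) + (1 - y_true) * np.log(1 - proba))))

scores = [0.2, 0.3, 0.25, 5.0]          # the last sample has a clearly higher score
good = scaled_bce(scores, [0, 0, 0, 1])  # labels agree with the scores
bad = scaled_bce(scores, [1, 0, 0, 0])   # labels disagree with the scores
print(good < bad)  # True
```

One consequence of min-max scaling is that the loss depends only on the relative ordering and spacing of the scores within the evaluation set, not on their absolute scale.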
Thank you for reading. If you notice anything wrong, I would appreciate it if you could point it out.