Some people were getting tired of the labeling work needed to classify documents with supervised learning, so I put together a semi-supervised text classification pipeline that gets by with only a small number of labels.
Apologies for the rather messy code, which I paste below. The DBN code is copied almost verbatim from the Deep Learning Tutorial; the tutorial site explains it together with the mathematical formulas, so I leave the detailed explanations to it.
The first script reads the BoW-format CSV data, extracts features with the DBN, and writes the result back out as CSV. Note that it contains some methods that are not used.
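If you just want to exercise the script without real data, here is a minimal sketch (the number of documents, the vocabulary size, and the random binary content are all made-up assumptions) that writes a 0/1 BoW matrix to bow_data.csv in the comma-separated format the script expects. The script below picks n_ins up from data.shape[1], so nothing else needs to change.

```python
# coding:utf-8
# Toy generator for bow_data.csv -- assumption: any 0/1 presence matrix of
# shape (n_documents, vocabulary_size) is a valid input for the DBN script.
import numpy as np

n_docs, vocab_size = 100, 200                            # made-up sizes
rng = np.random.RandomState(0)
bow = (rng.rand(n_docs, vocab_size) < 0.1).astype(int)   # sparse 0/1 BoW
np.savetxt('bow_data.csv', bow, fmt='%d', delimiter=',')
```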
```python
# coding:utf-8
from __future__ import unicode_literals
import time
import numpy as np
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams
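# Deep Belief Network: a stack of RBMs trained greedily layer by layer; each RBM
# shares its weights with a sigmoid HiddenLayer, and a LogisticRegression layer
# on top is used for supervised fine-tuning.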
class DBN:
def __init__(self, numpy_rng, theano_rng=None, n_ins=784,
hidden_layers_sizes=[500, 500], n_outs=10):
self.sigmoid_layers = []
self.rbm_layers = []
self.params = []
self.n_layers = len(hidden_layers_sizes)
assert self.n_layers > 0
if not theano_rng:
theano_rng = RandomStreams(numpy_rng.randint(2**30))
# allocate symbolic variables for the data
self.x = T.matrix('x')
self.y = T.ivector('y')
for i in xrange(self.n_layers):
if i==0:
input_size = n_ins
layer_input = self.x
else:
input_size = hidden_layers_sizes[i - 1]
layer_input = self.sigmoid_layers[-1].output
sigmoid_layer = HiddenLayer(rng=numpy_rng,
input=layer_input,
n_in=input_size,
n_out=hidden_layers_sizes[i],
activation=T.nnet.sigmoid)
self.sigmoid_layers.append( sigmoid_layer )
self.params.extend(sigmoid_layer.params)
rbm_layer = RBM(numpy_rng=numpy_rng,
theano_rng=theano_rng,
input=layer_input,
n_visible=input_size,
n_hidden=hidden_layers_sizes[i],
W=sigmoid_layer.W,
hbias=sigmoid_layer.b)
self.rbm_layers.append(rbm_layer)
self.logLayer = LogisticRegression(
input=self.sigmoid_layers[-1].output,
n_in=hidden_layers_sizes[-1],
n_out=n_outs)
self.params.extend(self.logLayer.params)
self.finetune_cost = self.logLayer.negative_log_likelihood(self.y)
self.errors = self.logLayer.errors(self.y)
    def pretraining_functions(self, train_set_x, batch_size, k):
index = T.lscalar('index')
learning_rate = T.scalar('lr') # learning rate to use
# number of batches
n_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size
# begining of a batch, given `index`
batch_begin = index * batch_size
# ending of a batch given `index`
batch_end = batch_begin + batch_size
pretrain_fns = []
for rbm in self.rbm_layers:
cost, updates = rbm.get_cost_updates(learning_rate,
persistent=None, k=k)
# compile the theano function
fn = theano.function(
inputs=[index, theano.Param(learning_rate, default=0.1)],
outputs=cost,
updates=updates,
givens={
self.x: train_set_x[batch_begin:batch_end]
}
)
# append `fn` to the list of functions
pretrain_fns.append(fn)
return pretrain_fns
def build_finetune_functions(self, datasets, batch_size, learning_rate):
(train_set_x, train_set_y) = datasets[0]
(valid_set_x, valid_set_y) = datasets[1]
(test_set_x, test_set_y) = datasets[2]
# compute number of minibatches for training, validation and testing
n_valid_batches = valid_set_x.get_value(borrow=True).shape[0]
n_valid_batches /= batch_size
n_test_batches = test_set_x.get_value(borrow=True).shape[0]
n_test_batches /= batch_size
index = T.lscalar('index') # index to a [mini]batch
# compute the gradients with respect to the model parameters
gparams = T.grad(self.finetune_cost, self.params)
# compute list of fine-tuning updates
updates = []
for param, gparam in zip(self.params, gparams):
updates.append((param, param - gparam * learning_rate))
train_fn = theano.function(
inputs=[index],
outputs=self.finetune_cost,
updates=updates,
givens={
self.x: train_set_x[
index * batch_size: (index + 1) * batch_size
],
self.y: train_set_y[
index * batch_size: (index + 1) * batch_size
]
}
)
test_score_i = theano.function(
[index],
self.errors,
givens={
self.x: test_set_x[
index * batch_size: (index + 1) * batch_size
],
self.y: test_set_y[
index * batch_size: (index + 1) * batch_size
]
}
)
valid_score_i = theano.function(
[index],
self.errors,
givens={
self.x: valid_set_x[
index * batch_size: (index + 1) * batch_size
],
self.y: valid_set_y[
index * batch_size: (index + 1) * batch_size
]
}
)
# Create a function that scans the entire validation set
def valid_score():
return [valid_score_i(i) for i in xrange(n_valid_batches)]
# Create a function that scans the entire test set
def test_score():
return [test_score_i(i) for i in xrange(n_test_batches)]
return train_fn, valid_score, test_score
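# Fully-connected sigmoid layer; its W and b are shared with the RBM at the same
# depth, so RBM pretraining directly initializes the feed-forward network.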
class HiddenLayer:
def __init__(self, rng, input, n_in, n_out, W=None, b=None,
activation=T.tanh):
self.input = input
if W is None:
W_values = np.asarray(
rng.uniform(
low=-np.sqrt(6. / (n_in+n_out)),
high=np.sqrt(6. / (n_in+n_out)),
size=(n_in, n_out)
),
dtype=theano.config.floatX
)
if activation == theano.tensor.nnet.sigmoid:
W_values *=4
W = theano.shared(value=W_values, name='W', borrow=True)
if b is None:
b_values = np.zeros((n_out,), dtype=theano.config.floatX)
b = theano.shared(value=b_values, name='b', borrow=True)
self.W = W
self.b = b
lin_output = T.dot(input, self.W) + self.b
self.output = (
lin_output if activation is None
else activation(lin_output)
)
self.params = [self.W, self.b]
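# Softmax output layer used as the supervised head during fine-tuning.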
class LogisticRegression:
def __init__(self, input, n_in, n_out):
self.W = theano.shared(
value=np.zeros(
(n_in, n_out),
dtype=theano.config.floatX
),
name='W',
borrow=True
)
# initialize the baises b as a vector of n_out 0s
self.b = theano.shared(
value=np.zeros(
(n_out,),
dtype=theano.config.floatX
),
name='b',
borrow=True
)
self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
self.y_pred = T.argmax(self.p_y_given_x, axis=1)
self.params = [self.W, self.b]
def negative_log_likelihood(self, y):
return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])
def errors(self, y):
# check if y has same dimension of y_pred
if y.ndim != self.y_pred.ndim:
raise TypeError(
'y should have the same shape as self.y_pred',
('y', y.type, 'y_pred', self.y_pred.type)
)
# check if y is of the correct datatype
if y.dtype.startswith('int'):
return T.mean(T.neq(self.y_pred, y))
else:
raise NotImplementedError()
class RBM(object):
"""Restricted Boltzmann Machine (RBM) """
def __init__(
self,
input=None,
n_visible=784,
n_hidden=500,
W=None,
hbias=None,
vbias=None,
numpy_rng=None,
theano_rng=None
):
self.n_visible = n_visible
self.n_hidden = n_hidden
if numpy_rng is None:
# create a number generator
numpy_rng = np.random.RandomState(1234)
if theano_rng is None:
theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))
if W is None:
initial_W = np.asarray(
numpy_rng.uniform(
low=-4 * np.sqrt(6. / (n_hidden + n_visible)),
high=4 * np.sqrt(6. / (n_hidden + n_visible)),
size=(n_visible, n_hidden)
),
dtype=theano.config.floatX
)
# theano shared variables for weights and biases
W = theano.shared(value=initial_W, name='W', borrow=True)
if hbias is None:
# create shared variable for hidden units bias
hbias = theano.shared(
value=np.zeros(
n_hidden,
dtype=theano.config.floatX
),
name='hbias',
borrow=True
)
if vbias is None:
# create shared variable for visible units bias
vbias = theano.shared(
value=np.zeros(
n_visible,
dtype=theano.config.floatX
),
name='vbias',
borrow=True
)
# initialize input layer for standalone RBM or layer0 of DBN
self.input = input
if not input:
self.input = T.matrix('input')
self.W = W
self.hbias = hbias
self.vbias = vbias
self.theano_rng = theano_rng
self.params = [self.W, self.hbias, self.vbias]
def propup(self, vis):
pre_sigmoid_activation = T.dot(vis, self.W) + self.hbias
return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]
def sample_h_given_v(self, v0_sample):
''' This function infers state of hidden units given visible units '''
pre_sigmoid_h1, h1_mean = self.propup(v0_sample)
h1_sample = self.theano_rng.binomial(size=h1_mean.shape,
n=1, p=h1_mean,
dtype=theano.config.floatX)
return [pre_sigmoid_h1, h1_mean, h1_sample]
def propdown(self, hid):
pre_sigmoid_activation = T.dot(hid, self.W.T) + self.vbias
return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]
def sample_v_given_h(self, h0_sample):
''' This function infers state of visible units given hidden units '''
# compute the activation of the visible given the hidden sample
pre_sigmoid_v1, v1_mean = self.propdown(h0_sample)
v1_sample = self.theano_rng.binomial(size=v1_mean.shape,
n=1, p=v1_mean,
dtype=theano.config.floatX)
return [pre_sigmoid_v1, v1_mean, v1_sample]
def gibbs_hvh(self, h0_sample):
''' This function implements one step of Gibbs sampling,
starting from the hidden state'''
pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h0_sample)
pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v1_sample)
return [pre_sigmoid_v1, v1_mean, v1_sample,
pre_sigmoid_h1, h1_mean, h1_sample]
def gibbs_vhv(self, v0_sample):
''' This function implements one step of Gibbs sampling,
starting from the visible state'''
pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v0_sample)
pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h1_sample)
return [pre_sigmoid_h1, h1_mean, h1_sample,
pre_sigmoid_v1, v1_mean, v1_sample]
def free_energy(self, v_sample):
''' Function to compute the free energy '''
wx_b = T.dot(v_sample, self.W) + self.hbias
vbias_term = T.dot(v_sample, self.vbias)
hidden_term = T.sum(T.log(1 + T.exp(wx_b)), axis=1)
return -hidden_term - vbias_term
def get_cost_updates(self, lr=0.1, persistent=None, k=1):
# compute positive phase
pre_sigmoid_ph, ph_mean, ph_sample = self.sample_h_given_v(self.input)
if persistent is None:
chain_start = ph_sample
else:
chain_start = persistent
(
[
pre_sigmoid_nvs,
nv_means,
nv_samples,
pre_sigmoid_nhs,
nh_means,
nh_samples
],
updates
) = theano.scan(
self.gibbs_hvh,
outputs_info=[None, None, None, None, None, chain_start],
n_steps=k
)
chain_end = nv_samples[-1]
cost = T.mean(self.free_energy(self.input)) - T.mean(
self.free_energy(chain_end))
# We must not compute the gradient through the gibbs sampling
gparams = T.grad(cost, self.params, consider_constant=[chain_end])
for gparam, param in zip(gparams, self.params):
# make sure that the learning rate is of the right dtype
updates[param] = param - gparam * T.cast(
lr,
dtype=theano.config.floatX
)
if persistent:
# Note that this works only if persistent is a shared variable
updates[persistent] = nh_samples[-1]
# pseudo-likelihood is a better proxy for PCD
monitoring_cost = self.get_pseudo_likelihood_cost(updates)
else:
# reconstruction cross-entropy is a better proxy for CD
monitoring_cost = self.get_reconstruction_cost(updates,
pre_sigmoid_nvs[-1])
return monitoring_cost, updates
def get_pseudo_likelihood_cost(self, updates):
"""Stochastic approximation to the pseudo-likelihood"""
# index of bit i in expression p(x_i | x_{\i})
bit_i_idx = theano.shared(value=0, name='bit_i_idx')
# binarize the input image by rounding to nearest integer
xi = T.round(self.input)
# calculate free energy for the given bit configuration
fe_xi = self.free_energy(xi)
# flip bit x_i of matrix xi and preserve all other bits x_{\i}
# Equivalent to xi[:,bit_i_idx] = 1-xi[:, bit_i_idx], but assigns
# the result to xi_flip, instead of working in place on xi.
xi_flip = T.set_subtensor(xi[:, bit_i_idx], 1 - xi[:, bit_i_idx])
# calculate free energy with bit flipped
fe_xi_flip = self.free_energy(xi_flip)
# equivalent to e^(-FE(x_i)) / (e^(-FE(x_i)) + e^(-FE(x_{\i})))
cost = T.mean(self.n_visible * T.log(T.nnet.sigmoid(fe_xi_flip -
fe_xi)))
# increment bit_i_idx % number as part of updates
updates[bit_i_idx] = (bit_i_idx + 1) % self.n_visible
return cost
def get_reconstruction_cost(self, updates, pre_sigmoid_nv):
cross_entropy = T.mean(
T.sum(
self.input * T.log(T.nnet.sigmoid(pre_sigmoid_nv)) +
(1 - self.input) * T.log(1 - T.nnet.sigmoid(pre_sigmoid_nv)),
axis=1
)
)
return cross_entropy
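# NumPy forward pass through one pretrained layer, sigmoid(x.W + b); used below
# to turn the BoW input into the features written to DBN_features.csv.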
def output(input_data, w, b):
x = np.dot(input_data,w)+np.kron( np.ones((input_data.shape[0],1)),b)
return 1/(1+np.exp(-x))
if __name__=='__main__':
numpy_rng = np.random.RandomState(123)
print '... building the model'
ifname = 'bow_data.csv'
data = np.loadtxt(ifname, delimiter=',')
    train_set_x = theano.shared(np.asarray(data, dtype=theano.config.floatX))
dbn = DBN(numpy_rng=numpy_rng, n_ins=data.shape[1],
hidden_layers_sizes=[2000, 1000, 100],
n_outs=10)
#########################
# PRETRAINING THE MODEL #
#########################
print '... getting the pretraining functions'
batch_size=10
k = 5
    pretraining_fns = dbn.pretraining_functions(train_set_x=train_set_x,
batch_size=batch_size,
k=k)
print '... pre-training the model'
pretraining_epochs = 100
n_train_batches = 10
pretrain_lr = 0.1
## Pre-train layer-wise
for i in xrange(dbn.n_layers):
# go through pretraining epochs
for epoch in xrange(pretraining_epochs):
# go through the training set
c = []
for batch_index in xrange(n_train_batches):
c.append(pretraining_fns[i](index=batch_index,
lr=pretrain_lr))
print 'Pre-training layer %i, epoch %d, cost ' % (i, epoch),
print np.mean(c)
layer_output =[]
for i in xrange(dbn.n_layers):
w = dbn.rbm_layers[i].W.get_value()
hbias = dbn.rbm_layers[i].hbias.get_value()
if i==0:
layer_output.append( train_set_x.get_value() )
layer_output.append( output(layer_output[-1],w, hbias) )
else:
layer_output.append( output(layer_output[-1],w, hbias) )
print layer_output[-1]
np.savetxt('DBN_features.csv',layer_output[-1], delimiter=',')
```
Each line of original_data.csv looks like "0\t1 0 1 0 0 0 0\txxxx". From left to right it holds a flag indicating whether the sample spans multiple classes, a 0/1 indicator for each class, and the raw text. In the code below, samples with a single label are used as the teacher (labeled) data as much as possible during training. I also added a function that runs the method on the iris data, so use it if you want to try label spreading but do not have suitable data at hand.
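As a purely hypothetical illustration (the class indicators and texts below are made up, assuming the six classes used in the call to main), such a file could be produced like this; get_labels_info only looks at the first two tab-separated fields:

```python
# Hypothetical original_data.csv content (tab-separated): multi-class flag,
# per-class 0/1 indicators, raw text (the text is ignored by get_labels_info).
with open('original_data.csv', 'w') as f:
    f.write('0\t1 0 0 0 0 0\tsingle-label document text\n')
    f.write('1\t0 1 0 1 0 0\tdocument that belongs to two classes\n')
```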
```python
# coding: utf-8
from sklearn import datasets
from sklearn.semi_supervised import LabelSpreading
import numpy as np
from numpy.random import seed
seed(555)
from collections import defaultdict
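# Sanity check on the iris data: hide roughly half of the labels (-1) and let
# LabelSpreading recover them.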
def iris():
iris = datasets.load_iris()
random_unlabeled_points = np.where(np.random.random_integers(0, 1, size=len(iris.target)))
labels = np.copy(iris.target)
labels[random_unlabeled_points] = -1
label_prop_model = LabelSpreading()
label_prop_model.fit(iris.data, labels) # unlabeled as -1
pred_prop = label_prop_model.predict_proba(iris.data)
pred_label = label_prop_model.predict(iris.data)
for pp, pl, label, trgt in zip(pred_prop,pred_label,labels,iris.target):
print pp, pl, label, trgt
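# Label spreading on the DBN features: keep at most min_number labeled samples per
# class (single-label samples only), mark everything else as unlabeled (-1), then
# report per-class TP/FP/FN/TN and accuracy/precision/recall/F-measure.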
def main(X, labels_info, min_number=20, label_num=6, n_neighbors=7, alpha=0.3, typ='knn', threshold=0.5):
target = get_target(labels_info)
random_unlabeled_points = np.where(np.random.random_integers(0, 1, size=len(target)))[0]
cnt_dict = defaultdict(int)
for i, t in enumerate(target):
if len(t)==1 and (i in random_unlabeled_points):
target[i] = -1
cnt_dict[-1] += 1
elif len(t)>=2:
target[i] = -1
cnt_dict[-1] += 1
elif cnt_dict[target[i][0]]<min_number:
target[i] = target[i][0]
cnt_dict[target[i]] += 1
elif cnt_dict[target[i][0]]>=min_number:
target[i] = -1
cnt_dict[target[i]] += 1
print cnt_dict
if typ=='knn':
label_prop_model = LabelSpreading(kernel=typ, n_neighbors=n_neighbors)
else:
label_prop_model = LabelSpreading(kernel=typ, alpha=alpha)
label_prop_model.fit(X, target) # unlabeled as -1
pred_prop = label_prop_model.predict_proba(X)
pred_label = label_prop_model.predict(X)
    res_dict = defaultdict(dict)  # per-label store for TP, FP, FN, TN counts
for label in ('TP', 'FP', 'FN', 'TN'):
res_dict[label] = defaultdict(int)
label_dict = defaultdict(int)
for pp, pl, labels, trgt in zip(pred_prop,pred_label,get_target(labels_info),target):
        # labels holds the correct (ground-truth) labels for this sample
print pp, np.where(pp>=threshold)[0]+1, labels, trgt
        # labels produced by the prediction
        # normalize the probabilities (softmax-like) before thresholding
predicted_labels = np.where(pp/np.sum(pp)>=threshold)[0]+1
# predicted_labels = [int(pl)]
        # labels not present in the ground truth
        F_labels = set([l+1 for l in xrange(label_num)]).difference(labels)
        # labels not present in the prediction
predicted_F_labels = \
set([l+1 for l in xrange(label_num)]).difference(predicted_labels)
        # store TPs in the dictionary
print 'TP labels:'
print set(labels).intersection(predicted_labels)
for tp_l in set(labels).intersection(predicted_labels):
res_dict['TP'][tp_l] += 1
        # store FPs in the dictionary
print 'FP labels:'
print set(predicted_labels).difference(labels)
for fp_l in set(predicted_labels).difference(labels):
res_dict['FP'][fp_l] += 1
        # store FNs in the dictionary
print 'FN labels'
print set(labels).difference(predicted_labels)
for fn_l in set(labels).difference(predicted_labels):
res_dict['FN'][fn_l] += 1
        # store TNs in the dictionary
print 'TN labels'
print set(F_labels).intersection(predicted_F_labels)
for tn_l in set(F_labels).intersection(predicted_F_labels):
res_dict['TN'][tn_l] += 1
        # count the occurrences of each correct label
for l in labels:
label_dict[l] += 1
for i_label in xrange(label_num):
print "label=",i_label+1
print 'TP:', res_dict['TP'][i_label+1], 'FP:',res_dict['FP'][i_label+1], 'FN:', res_dict['FN'][i_label+1], 'TN:',res_dict['TN'][i_label+1]
print float(res_dict['TP'][i_label+1])/label_dict[i_label+1], float(res_dict['FP'][i_label+1])/label_dict[i_label+1], float(res_dict['FN'][i_label+1])/label_dict[i_label+1], float(res_dict['TN'][i_label+1])/label_dict[i_label+1]
accuracy = float(res_dict['TP'][i_label+1]+res_dict['TN'][i_label+1])/(res_dict['TP'][i_label+1]+res_dict['FP'][i_label+1]+res_dict['FN'][i_label+1]+res_dict['TN'][i_label+1])
precision = float(res_dict['TP'][i_label+1])/(res_dict['TP'][i_label+1]+res_dict['FP'][i_label+1])
recall = float(res_dict['TP'][i_label+1])/(res_dict['TP'][i_label+1]+res_dict['FN'][i_label+1])
f_measure = (2*recall*precision)/(recall+precision)
print 'Accuracy:', accuracy, 'Precision:', precision, 'Recall:', recall, 'F-measure:', f_measure
# Convert the correct-label indicators into class numbers from 1 to n
def get_target(labels_info):
result = []
raw_target = labels_info[:,1:]
for line in raw_target:
result.append( np.where(line==1)[0]+1 )
return result
def get_labels():
pass
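# Parse original_data.csv: column 0 is the multi-class flag, the remaining columns
# are the per-class 0/1 indicators (the text field is not used here).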
def get_labels_info(label_fname):
label_flag = []
label_flag_apd = label_flag.append
labels_info = []
labels_info_apd = labels_info.append
with open(label_fname, 'r') as f:
for line in f:
data = line.strip().split('\t')
label_flag_apd(int(data[0]))
labels_info_apd(
np.array(data[1].strip().split(' '), dtype=np.int32 )
)
return np.hstack( (np.array(label_flag).reshape((len(label_flag), 1)), np.array(labels_info)) )
if __name__=='__main__':
ifname = 'DBN_features.csv'
label_fname = 'original_data.csv'
X =np.loadtxt(ifname, delimiter=',')
labels_info = get_labels_info(label_fname)
    ## typ: choose 'knn' or 'rbf'
main(X, labels_info, 50, label_num=6, n_neighbors=7, alpha=0.2, typ='knn', threshold=0.5)
```
Apologies for the trouble, but I would be grateful if you could point out any mistakes.