Apprentissage semi-supervisé des étiquettes avec DBN et Label Spreading

Certaines personnes étaient fatiguées du travail d'étiquetage pour classer les documents dans l'apprentissage supervisé, alors j'ai fait une classification de texte d'apprentissage semi-supervisé afin que la classification puisse être faite avec un petit nombre d'étiquettes.

Matériel de référence

Ce que j'ai fait

procédure

Extraction de fonctionnalités avec DBN

Apprentissage semi-supervisé avec Label Spreading

avantage

Avantages de l'extraction de caractéristiques avec DBN

Avantages de la diffusion d'étiquettes

Code de référence

Désolé pour le code assez sale, mais je vais le coller ci-dessous. Le code DBN est une copie ronde du tutoriel Deep Learning. Le site du tutoriel d'apprentissage en profondeur a des explications avec des formules mathématiques, je voudrais donc y donner des explications détaillées.

Code DBN

Après avoir lu les données CSV au format BoW et extrait les fonctionnalités, on a l'impression de cracher le CSV. Veuillez noter qu'il contient certaines méthodes qui ne sont pas utilisées.

python


# coding:utf-8

from __future__ import unicode_literals
import time

import numpy as np
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams


class DBN:
    def __init__(self, numpy_rng, theano_rng=None, n_ins=784,
                 hidden_layers_sizes=[500, 500], n_outs=10):
        self.sigmoid_layers = []
        self.rbm_layers = []
        self.params = []
        self.n_layers = len(hidden_layers_sizes)

        assert self.n_layers > 0

        if not theano_rng:
            theano_rng = RandomStreams(numpy_rng.randint(2**30))

        # allocate symbolic variables for the data
        self.x = T.matrix('x')
        self.y = T.ivector('y')

        for i in xrange(self.n_layers):
            if i==0:
                input_size = n_ins
                layer_input = self.x
            else:
                input_size = hidden_layers_sizes[i - 1]
                layer_input = self.sigmoid_layers[-1].output

            sigmoid_layer = HiddenLayer(rng=numpy_rng,
                                         input=layer_input,
                                         n_in=input_size,
                                         n_out=hidden_layers_sizes[i],
                                         activation=T.nnet.sigmoid)
            self.sigmoid_layers.append( sigmoid_layer )
            self.params.extend(sigmoid_layer.params)

            rbm_layer = RBM(numpy_rng=numpy_rng,
                            theano_rng=theano_rng,
                            input=layer_input,
                            n_visible=input_size,
                            n_hidden=hidden_layers_sizes[i],
                            W=sigmoid_layer.W,
                            hbias=sigmoid_layer.b)
            self.rbm_layers.append(rbm_layer)

            self.logLayer = LogisticRegression(
                input=self.sigmoid_layers[-1].output,
                n_in=hidden_layers_sizes[-1],
                n_out=n_outs)
            self.params.extend(self.logLayer.params)
            self.finetune_cost = self.logLayer.negative_log_likelihood(self.y)
            self.errors = self.logLayer.errors(self.y)

    def pretrainig_functions(self, train_set_x, batch_size, k):
        index = T.lscalar('index')
        learning_rate = T.scalar('lr')  # learning rate to use
        # number of batches
        n_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size
        # begining of a batch, given `index`
        batch_begin = index * batch_size
        # ending of a batch given `index`
        batch_end = batch_begin + batch_size

        pretrain_fns = []
        for rbm in self.rbm_layers:

            cost, updates = rbm.get_cost_updates(learning_rate,
                                                 persistent=None, k=k)

            # compile the theano function
            fn = theano.function(
                inputs=[index, theano.Param(learning_rate, default=0.1)],
                outputs=cost,
                updates=updates,
                givens={
                    self.x: train_set_x[batch_begin:batch_end]
                }
            )
            # append `fn` to the list of functions
            pretrain_fns.append(fn)

        return pretrain_fns


    def build_finetune_functions(self, datasets, batch_size, learning_rate):
        (train_set_x, train_set_y) = datasets[0]
        (valid_set_x, valid_set_y) = datasets[1]
        (test_set_x, test_set_y) = datasets[2]

        # compute number of minibatches for training, validation and testing
        n_valid_batches = valid_set_x.get_value(borrow=True).shape[0]
        n_valid_batches /= batch_size
        n_test_batches = test_set_x.get_value(borrow=True).shape[0]
        n_test_batches /= batch_size

        index = T.lscalar('index')  # index to a [mini]batch

        # compute the gradients with respect to the model parameters
        gparams = T.grad(self.finetune_cost, self.params)

        # compute list of fine-tuning updates
        updates = []
        for param, gparam in zip(self.params, gparams):
            updates.append((param, param - gparam * learning_rate))

        train_fn = theano.function(
            inputs=[index],
            outputs=self.finetune_cost,
            updates=updates,
            givens={
                self.x: train_set_x[
                    index * batch_size: (index + 1) * batch_size
                ],
                self.y: train_set_y[
                    index * batch_size: (index + 1) * batch_size
                ]
            }
        )

        test_score_i = theano.function(
            [index],
            self.errors,
            givens={
                self.x: test_set_x[
                    index * batch_size: (index + 1) * batch_size
                ],
                self.y: test_set_y[
                    index * batch_size: (index + 1) * batch_size
                ]
            }
        )

        valid_score_i = theano.function(
            [index],
            self.errors,
            givens={
                self.x: valid_set_x[
                    index * batch_size: (index + 1) * batch_size
                ],
                self.y: valid_set_y[
                    index * batch_size: (index + 1) * batch_size
                ]
            }
        )

        # Create a function that scans the entire validation set
        def valid_score():
            return [valid_score_i(i) for i in xrange(n_valid_batches)]

        # Create a function that scans the entire test set
        def test_score():
            return [test_score_i(i) for i in xrange(n_test_batches)]

        return train_fn, valid_score, test_score


class HiddenLayer:
    def __init__(self, rng, input, n_in, n_out, W=None, b=None,
                 activation=T.tanh):
        self.input = input

        if W is None:
            W_values = np.asarray(
                rng.uniform(
                    low=-np.sqrt(6. / (n_in+n_out)),
                    high=np.sqrt(6. / (n_in+n_out)),
                    size=(n_in, n_out)
                ),
                dtype=theano.config.floatX
            )
            if activation == theano.tensor.nnet.sigmoid:
                W_values *=4
            W = theano.shared(value=W_values, name='W', borrow=True)

        if b is None:
            b_values = np.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=b_values, name='b', borrow=True)
        self.W = W
        self.b = b

        lin_output = T.dot(input, self.W) + self.b
        self.output = (
              lin_output if activation is None
              else activation(lin_output)
        )
        self.params = [self.W, self.b]

class LogisticRegression:
    def __init__(self, input, n_in, n_out):
        self.W = theano.shared(
            value=np.zeros(
                (n_in, n_out),
                dtype=theano.config.floatX
            ),
            name='W',
            borrow=True
        )
        # initialize the baises b as a vector of n_out 0s
        self.b = theano.shared(
            value=np.zeros(
                (n_out,),
                dtype=theano.config.floatX
            ),
            name='b',
            borrow=True
        )
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)
        self.params = [self.W, self.b]

    def negative_log_likelihood(self, y):
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])

    def errors(self, y):
        # check if y has same dimension of y_pred
        if y.ndim != self.y_pred.ndim:
            raise TypeError(
                'y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type)
            )
        # check if y is of the correct datatype
        if y.dtype.startswith('int'):
            return T.mean(T.neq(self.y_pred, y))
        else:
            raise NotImplementedError()


class RBM(object):
    """Restricted Boltzmann Machine (RBM)  """
    def __init__(
        self,
        input=None,
        n_visible=784,
        n_hidden=500,
        W=None,
        hbias=None,
        vbias=None,
        numpy_rng=None,
        theano_rng=None
    ):

        self.n_visible = n_visible
        self.n_hidden = n_hidden

        if numpy_rng is None:
            # create a number generator
            numpy_rng = np.random.RandomState(1234)

        if theano_rng is None:
            theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))

        if W is None:
            initial_W = np.asarray(
                numpy_rng.uniform(
                    low=-4 * np.sqrt(6. / (n_hidden + n_visible)),
                    high=4 * np.sqrt(6. / (n_hidden + n_visible)),
                    size=(n_visible, n_hidden)
                ),
                dtype=theano.config.floatX
            )
            # theano shared variables for weights and biases
            W = theano.shared(value=initial_W, name='W', borrow=True)

        if hbias is None:
            # create shared variable for hidden units bias
            hbias = theano.shared(
                value=np.zeros(
                    n_hidden,
                    dtype=theano.config.floatX
                ),
                name='hbias',
                borrow=True
            )

        if vbias is None:
            # create shared variable for visible units bias
            vbias = theano.shared(
                value=np.zeros(
                    n_visible,
                    dtype=theano.config.floatX
                ),
                name='vbias',
                borrow=True
            )

        # initialize input layer for standalone RBM or layer0 of DBN
        self.input = input
        if not input:
            self.input = T.matrix('input')

        self.W = W
        self.hbias = hbias
        self.vbias = vbias
        self.theano_rng = theano_rng
        self.params = [self.W, self.hbias, self.vbias]

    def propup(self, vis):
        pre_sigmoid_activation = T.dot(vis, self.W) + self.hbias
        return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]


    def sample_h_given_v(self, v0_sample):
        ''' This function infers state of hidden units given visible units '''
        pre_sigmoid_h1, h1_mean = self.propup(v0_sample)
        h1_sample = self.theano_rng.binomial(size=h1_mean.shape,
                                             n=1, p=h1_mean,
                                             dtype=theano.config.floatX)
        return [pre_sigmoid_h1, h1_mean, h1_sample]


    def propdown(self, hid):
        pre_sigmoid_activation = T.dot(hid, self.W.T) + self.vbias
        return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]


    def sample_v_given_h(self, h0_sample):
        ''' This function infers state of visible units given hidden units '''
        # compute the activation of the visible given the hidden sample
        pre_sigmoid_v1, v1_mean = self.propdown(h0_sample)
        v1_sample = self.theano_rng.binomial(size=v1_mean.shape,
                                             n=1, p=v1_mean,
                                             dtype=theano.config.floatX)
        return [pre_sigmoid_v1, v1_mean, v1_sample]


    def gibbs_hvh(self, h0_sample):
        ''' This function implements one step of Gibbs sampling,
            starting from the hidden state'''
        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h0_sample)
        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v1_sample)
        return [pre_sigmoid_v1, v1_mean, v1_sample,
                pre_sigmoid_h1, h1_mean, h1_sample]


    def gibbs_vhv(self, v0_sample):
        ''' This function implements one step of Gibbs sampling,
            starting from the visible state'''
        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v0_sample)
        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h1_sample)
        return [pre_sigmoid_h1, h1_mean, h1_sample,
                pre_sigmoid_v1, v1_mean, v1_sample]


    def free_energy(self, v_sample):
        ''' Function to compute the free energy '''
        wx_b = T.dot(v_sample, self.W) + self.hbias
        vbias_term = T.dot(v_sample, self.vbias)
        hidden_term = T.sum(T.log(1 + T.exp(wx_b)), axis=1)
        return -hidden_term - vbias_term


    def get_cost_updates(self, lr=0.1, persistent=None, k=1):
        # compute positive phase
        pre_sigmoid_ph, ph_mean, ph_sample = self.sample_h_given_v(self.input)

        if persistent is None:
            chain_start = ph_sample
        else:
            chain_start = persistent
        (
            [
                pre_sigmoid_nvs,
                nv_means,
                nv_samples,
                pre_sigmoid_nhs,
                nh_means,
                nh_samples
            ],
            updates
        ) = theano.scan(
            self.gibbs_hvh,
            outputs_info=[None, None, None, None, None, chain_start],
            n_steps=k
        )

        chain_end = nv_samples[-1]
        cost = T.mean(self.free_energy(self.input)) - T.mean(
            self.free_energy(chain_end))
        # We must not compute the gradient through the gibbs sampling
        gparams = T.grad(cost, self.params, consider_constant=[chain_end])


        for gparam, param in zip(gparams, self.params):
            # make sure that the learning rate is of the right dtype
            updates[param] = param - gparam * T.cast(
                lr,
                dtype=theano.config.floatX
            )
        if persistent:
            # Note that this works only if persistent is a shared variable
            updates[persistent] = nh_samples[-1]
            # pseudo-likelihood is a better proxy for PCD
            monitoring_cost = self.get_pseudo_likelihood_cost(updates)
        else:
            # reconstruction cross-entropy is a better proxy for CD
            monitoring_cost = self.get_reconstruction_cost(updates,
                                                           pre_sigmoid_nvs[-1])

        return monitoring_cost, updates

    def get_pseudo_likelihood_cost(self, updates):
        """Stochastic approximation to the pseudo-likelihood"""

        # index of bit i in expression p(x_i | x_{\i})
        bit_i_idx = theano.shared(value=0, name='bit_i_idx')

        # binarize the input image by rounding to nearest integer
        xi = T.round(self.input)

        # calculate free energy for the given bit configuration
        fe_xi = self.free_energy(xi)

        # flip bit x_i of matrix xi and preserve all other bits x_{\i}
        # Equivalent to xi[:,bit_i_idx] = 1-xi[:, bit_i_idx], but assigns
        # the result to xi_flip, instead of working in place on xi.
        xi_flip = T.set_subtensor(xi[:, bit_i_idx], 1 - xi[:, bit_i_idx])

        # calculate free energy with bit flipped
        fe_xi_flip = self.free_energy(xi_flip)

        # equivalent to e^(-FE(x_i)) / (e^(-FE(x_i)) + e^(-FE(x_{\i})))
        cost = T.mean(self.n_visible * T.log(T.nnet.sigmoid(fe_xi_flip -
                                                            fe_xi)))

        # increment bit_i_idx % number as part of updates
        updates[bit_i_idx] = (bit_i_idx + 1) % self.n_visible

        return cost


    def get_reconstruction_cost(self, updates, pre_sigmoid_nv):
        cross_entropy = T.mean(
             T.sum(
                 self.input * T.log(T.nnet.sigmoid(pre_sigmoid_nv)) +
                 (1 - self.input) * T.log(1 - T.nnet.sigmoid(pre_sigmoid_nv)),
                 axis=1
             )
        )

        return cross_entropy

def output(input_data, w, b):
    x = np.dot(input_data,w)+np.kron( np.ones((input_data.shape[0],1)),b)
    return 1/(1+np.exp(-x))

if __name__=='__main__':
    numpy_rng = np.random.RandomState(123)
    print '... building the model'

    ifname = 'bow_data.csv'
    data = np.loadtxt(ifname, delimiter=',')
    train_set_x = theano.shared(np.asarray(data, np.float64))

    dbn = DBN(numpy_rng=numpy_rng, n_ins=data.shape[1],
              hidden_layers_sizes=[2000, 1000, 100],
              n_outs=10)
    #########################
    # PRETRAINING THE MODEL #
    #########################
    print '... getting the pretraining functions'
    batch_size=10
    k = 5
    pretraining_fns = dbn.pretrainig_functions(train_set_x=train_set_x,
                                                batch_size=batch_size,
                                                k=k)

    print '... pre-training the model'
    pretraining_epochs = 100
    n_train_batches = 10
    pretrain_lr = 0.1
    ## Pre-train layer-wise
    for i in xrange(dbn.n_layers):
        # go through pretraining epochs
        for epoch in xrange(pretraining_epochs):
            # go through the training set
            c = []
            for batch_index in xrange(n_train_batches):
                c.append(pretraining_fns[i](index=batch_index,
                                            lr=pretrain_lr))
            print 'Pre-training layer %i, epoch %d, cost ' % (i, epoch),
            print np.mean(c)

    layer_output =[]
    for i in xrange(dbn.n_layers):
        w = dbn.rbm_layers[i].W.get_value()
        hbias = dbn.rbm_layers[i].hbias.get_value()
        if i==0:
            layer_output.append( train_set_x.get_value() )
            layer_output.append( output(layer_output[-1],w, hbias) )
        else:
            layer_output.append( output(layer_output[-1],w, hbias) )
    print layer_output[-1]
    np.savetxt('DBN_features.csv',layer_output[-1], delimiter=',')

Code de diffusion d'étiquettes

Le contenu de original_data.csv est (0 \ t1 0 1 0 0 0 0 \ txxxx) et se compose d'une ligne. Ce qu'il représente est de la gauche Indicateur de données couvrant plusieurs classes, indicateur de chaque classe, texte Il est devenu. Dans le code ci-dessous, lors de la formation, les données avec une seule étiquette sont utilisées autant que possible comme données d'enseignant. J'ai ajouté une fonction pour que vous puissiez la vérifier avec les données d'iris, veuillez donc l'utiliser si vous souhaitez essayer la diffusion d'étiquettes lorsque vous ne disposez pas de données appropriées.

python


# coding: utf-8

from sklearn import datasets
from sklearn.semi_supervised import LabelSpreading
import numpy as np
from numpy.random import seed
seed(555)
from collections import defaultdict

def iris():
    iris = datasets.load_iris()
    random_unlabeled_points = np.where(np.random.random_integers(0, 1, size=len(iris.target)))
    labels = np.copy(iris.target)
    labels[random_unlabeled_points] = -1

    label_prop_model = LabelSpreading()
    label_prop_model.fit(iris.data, labels) # unlabeled as -1
    pred_prop = label_prop_model.predict_proba(iris.data)
    pred_label = label_prop_model.predict(iris.data)

    for pp, pl, label, trgt in zip(pred_prop,pred_label,labels,iris.target):
        print pp, pl, label, trgt

def main(X, labels_info, min_number=20, label_num=6, n_neighbors=7, alpha=0.3, typ='knn', threshold=0.5):
    target = get_target(labels_info)
    random_unlabeled_points = np.where(np.random.random_integers(0, 1, size=len(target)))[0]
    cnt_dict = defaultdict(int)
    for i, t in enumerate(target):
        if len(t)==1 and (i in random_unlabeled_points):
            target[i] = -1
            cnt_dict[-1] += 1
        elif len(t)>=2:
            target[i] = -1
            cnt_dict[-1] += 1
        elif cnt_dict[target[i][0]]<min_number:
            target[i] = target[i][0]
            cnt_dict[target[i]] += 1
        elif cnt_dict[target[i][0]]>=min_number:
            target[i] = -1
            cnt_dict[target[i]] += 1
    print cnt_dict

    if typ=='knn':
        label_prop_model = LabelSpreading(kernel=typ, n_neighbors=n_neighbors)
    else:
        label_prop_model = LabelSpreading(kernel=typ, alpha=alpha)
    label_prop_model.fit(X, target) # unlabeled as -1
    pred_prop = label_prop_model.predict_proba(X)
    pred_label = label_prop_model.predict(X)

    res_dict = defaultdict(dict)  # TP, FP, FN,Magasin TN
    for label in ('TP', 'FP', 'FN', 'TN'):
        res_dict[label] = defaultdict(int)
    label_dict = defaultdict(int)

    for pp, pl, labels, trgt in zip(pred_prop,pred_label,get_target(labels_info),target):
        #étiquette est l'étiquette de réponse correcte
        print pp, np.where(pp>=threshold)[0]+1, labels, trgt
        #L'étiquette qui est sortie dans la prédiction
        #Faites-le softmax
        predicted_labels = np.where(pp/np.sum(pp)>=threshold)[0]+1
        # predicted_labels = [int(pl)]
        #Étiquettes non incluses dans la bonne réponse
        F_labels = set([l+1 for l in xrange(label_num)]).difference(label)
        #Étiquettes non incluses dans les prévisions
        predicted_F_labels = \
                    set([l+1 for l in xrange(label_num)]).difference(predicted_labels)

        #Stocker TP dans le dictionnaire
        print 'TP labels:'
        print set(labels).intersection(predicted_labels)
        for tp_l in set(labels).intersection(predicted_labels):
            res_dict['TP'][tp_l] += 1
        #Stocker FP dans le dictionnaire
        print 'FP labels:'
        print set(predicted_labels).difference(labels)
        for fp_l in set(predicted_labels).difference(labels):
            res_dict['FP'][fp_l] += 1
        #Stocker FN dans le dictionnaire
        print 'FN labels'
        print set(labels).difference(predicted_labels)
        for fn_l in set(labels).difference(predicted_labels):
            res_dict['FN'][fn_l] += 1
        #Stocker TN dans le dictionnaire
        print 'TN labels'
        print set(F_labels).intersection(predicted_F_labels)
        for tn_l in set(F_labels).intersection(predicted_F_labels):
            res_dict['TN'][tn_l] += 1
        #Comptez le nombre de chaque étiquette correcte
        for l in labels:
            label_dict[l] += 1

    for i_label in xrange(label_num):
        print "label=",i_label+1
        print 'TP:', res_dict['TP'][i_label+1], 'FP:',res_dict['FP'][i_label+1], 'FN:', res_dict['FN'][i_label+1], 'TN:',res_dict['TN'][i_label+1]
        print float(res_dict['TP'][i_label+1])/label_dict[i_label+1], float(res_dict['FP'][i_label+1])/label_dict[i_label+1], float(res_dict['FN'][i_label+1])/label_dict[i_label+1], float(res_dict['TN'][i_label+1])/label_dict[i_label+1]
        accuracy = float(res_dict['TP'][i_label+1]+res_dict['TN'][i_label+1])/(res_dict['TP'][i_label+1]+res_dict['FP'][i_label+1]+res_dict['FN'][i_label+1]+res_dict['TN'][i_label+1])
        precision = float(res_dict['TP'][i_label+1])/(res_dict['TP'][i_label+1]+res_dict['FP'][i_label+1])
        recall = float(res_dict['TP'][i_label+1])/(res_dict['TP'][i_label+1]+res_dict['FN'][i_label+1])
        f_measure = (2*recall*precision)/(recall+precision)
        print 'Accuracy:', accuracy, 'Precision:', precision, 'Recall:', recall, 'F-measure:', f_measure

#Corrigez l'étiquette de données correcte en un nombre compris entre 1 et n
def get_target(labels_info):
    result = []
    raw_target = labels_info[:,1:]
    for line in raw_target:
        result.append( np.where(line==1)[0]+1 )
    return result

def get_labels():
    pass

def get_labels_info(label_fname):
    label_flag = []
    label_flag_apd = label_flag.append
    labels_info = []
    labels_info_apd = labels_info.append
    with open(label_fname, 'r') as f:
        for line in f:
            data = line.strip().split('\t')
            label_flag_apd(int(data[0]))
            labels_info_apd(
                            np.array(data[1].strip().split(' '), dtype=np.int32 )
                            )
    return np.hstack( (np.array(label_flag).reshape((len(label_flag), 1)), np.array(labels_info)) )

if __name__=='__main__':
    ifname = 'DBN_features.csv'
    label_fname = 'original_data.csv'

    X =np.loadtxt(ifname, delimiter=',')
    labels_info = get_labels_info(label_fname)

    ##typ est{knn,rbf}Choisissez parmi
    main(X, labels_info, 50, label_num=6, n_neighbors=7, alpha=0.2, typ='knn', threshold=0.5)

Nous vous prions de nous excuser pour la gêne occasionnée, mais nous vous serions reconnaissants de bien vouloir signaler toute erreur.

Recommended Posts

Apprentissage semi-supervisé des étiquettes avec DBN et Label Spreading
Collecte et automatisation d'images érotiques à l'aide du deep learning
Examen de la méthode de prévision des échanges utilisant le Deep Learning et la conversion en ondelettes - Partie 2
Ce que j'ai appris sur l'IA / l'apprentissage automatique avec Python (4)