[JAVA] Comment encoder et calculer les bitmaps des ID utilisateur actifs avec des dates différentes dans MaxCompute

Cet article fournit des exemples de code qui montrent comment le module MapReduce de MaxCompute peut être utilisé pour coder et calculer des bitmaps d'ID utilisateur actifs avec des dates différentes.

Depuis Qu Ning

Bitmap est un Data Developer ? spm = a2c65.11461447.0.0.50376dabbEsxtJ) est une technique couramment utilisée pour encoder et compresser les données utilisateur. La vitesse de traitement rapide des opérations AND, OR et NOT sur les bitmaps permet aux développeurs de filtrer les utilisateurs en fonction des informations utilisateur telles que les balises de profil et d'analyser l'activité hebdomadaire.

Considérez l'exemple de code suivant.

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Iterator;

public class bitmapDemo2
{

    public static class BitMapper extends MapperBase {

        Record key;
        Record value;
        @Override
        public void setup(TaskContext context) throws IOException {
            key = context.createMapOutputKeyRecord();
            value = context.createMapOutputValueRecord();
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException
        {
            RoaringBitmap mrb=new RoaringBitmap();
            long AID=0;
            {
                {
                    {
                        {
                            AID=record.getBigint("id");
                            mrb.add((int) AID);
                            //获 clé tori
                            key.set(new Object[] {record.getString("active_date")});

                        }
                    }
                }
            }
            ByteBuffer outbb = ByteBuffer.allocate(mrb.serializedSizeInBytes());
            mrb.serialize(new DataOutputStream(new OutputStream(){
                ByteBuffer mBB;
                OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
                public void close() {}
                public void flush() {}
                public void write(int b) {
                    mBB.put((byte) b);}
                public void write(byte[] b) {mBB.put(b);}
                public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
            }.init(outbb)));
            String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
            value.set(new Object[] {serializedstring});
            context.write(key, value);
        }
    }

    public static class BitReducer extends ReducerBase {
        private Record result = null;

        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }

        public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
            long fcount = 0;
            RoaringBitmap rbm=new RoaringBitmap();
            while (values.hasNext())
            {
                Record val = values.next();
                ByteBuffer newbb = ByteBuffer.wrap(Base64.getDecoder().decode((String)val.get(0)));
                ImmutableRoaringBitmap irb = new ImmutableRoaringBitmap(newbb);
                RoaringBitmap p= new RoaringBitmap(irb);
                rbm.or(p);
            }
            ByteBuffer outbb = ByteBuffer.allocate(rbm.serializedSizeInBytes());
            rbm.serialize(new DataOutputStream(new OutputStream(){
                ByteBuffer mBB;
                OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
                public void close() {}
                public void flush() {}
                public void write(int b) {
                    mBB.put((byte) b);}
                public void write(byte[] b) {mBB.put(b);}
                public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
            }.init(outbb)));
            String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
            result.set(0, key.get(0));
            result.set(1, serializedstring);
            context.write(result);
        }
    }
    public static void main( String[] args ) throws OdpsException
    {

        System.out.println("begin.........");
        JobConf job = new JobConf();
        
        job.setMapperClass(BitMapper.class);
        job.setReducerClass(BitReducer.class);

        job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("id:string"));

        InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);
//        +------------+-------------+
//        | id         | active_date |
//        +------------+-------------+
//        | 1          | 20190729    |
//        | 2          | 20190729    |
//        | 3          | 20190730    |
//        | 4          | 20190801    |
//        | 5          | 20190801    |
//        +------------+-------------+
        OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);
//        +-------------+------------+
//        | active_date | bit_map    |
//        +-------------+------------+
//        20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=3D
//        20190730,OjAAAAEAAAAAAAAAEAAAAAMA
//        20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=3D

        JobClient.runJob(job);
}
}

Parlons maintenant de ce code. Après avoir empaqueté l'application Java et l'avoir téléchargée dans le projet MaxCompute, le développeur peut appeler ce qui est donné sur ce travail MapReduce dans MaxCompute. Pour les données de la table d'entrée, l'ID utilisateur est codé en utilisant la date comme clé et l'opération OR est effectuée sur l'ID utilisateur codé par le bitmap de la même date. Alternativement, si nécessaire, l'opération ET peut être effectuée, par exemple, en cas de rétention. Les données traitées sont ensuite écrites dans la table de structure cible pour un traitement ultérieur.

Recommended Posts

Comment encoder et calculer les bitmaps des ID utilisateur actifs avec des dates différentes dans MaxCompute
Comment gérer différentes versions de rbenv et Ruby
[Webpacker] Résumé de l'installation de Bootstrap et jQuery dans Rails 6.0
Comment supprimer de grandes quantités de données dans Rails et problèmes
Comment exécuter avec des commandes de langage de développement normales dans l'environnement de développement Docker
Comment modifier le nombre maximum et maximum de données POST dans Spark
[Rails] Comment obtenir les informations sur l'utilisateur actuellement connecté avec devise
Comment insérer un traitement avec n'importe quel nombre d'éléments dans le traitement itératif dans Ruby
Comment envoyer des métriques et des événements personnalisés à datadog avec laravel dans l'environnement docker-compose
Comment convertir une valeur d'un type différent et l'affecter à une autre variable
Comment utiliser Eclipse sur mon PC avec 32 bits et 2 Go de mémoire
Résumé de la sélection des éléments dans Selenium
Promesse JDBC et exemple d'écriture
Comment utiliser JQuery dans Rails 6 js.erb
Comment créer une API avec GraphQL et Rails
Comment rediriger après la connexion de l'utilisateur avec Spring-security
Comment implémenter le processus d'authentification en spécifiant le nom d'utilisateur et le mot de passe dans Spring Boot
Comment utiliser git avec la puissance de jgit dans un environnement sans commandes git
Comment définir une limite de relance pour sidekiq et notifier les files d'attente mortes avec Slack
[Explication approximative] Comment séparer le fonctionnement de l'environnement de production et de l'environnement de développement avec Rails
Résumé de l'utilisation du jeu de proxy dans IE lors de la connexion avec Java