Dieser Artikel enthält Codebeispiele, die zeigen, wie das MapReduce-Modul von MaxCompute zum Codieren und Berechnen von Bitmaps aktiver Benutzer-IDs mit unterschiedlichen Daten verwendet werden kann.
Von Qu Ning
Bitmap ist ein Datenentwickler ? spm = a2c65.11461447.0.0.50376dabbEsxtJ) ist eine häufig verwendete Technik zum Codieren und Komprimieren von Benutzerdaten. Die schnelle Verarbeitungsgeschwindigkeit von UND-, ODER- und NICHT-Operationen auf Bitmaps ermöglicht es Entwicklern, Benutzer nach Benutzerinformationen wie Profil-Tags zu filtern und wöchentliche Aktivitäten zu analysieren.
Betrachten Sie das folgende Codebeispiel.
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Iterator;
public class bitmapDemo2
{
public static class BitMapper extends MapperBase {
Record key;
Record value;
@Override
public void setup(TaskContext context) throws IOException {
key = context.createMapOutputKeyRecord();
value = context.createMapOutputValueRecord();
}
@Override
public void map(long recordNum, Record record, TaskContext context)
throws IOException
{
RoaringBitmap mrb=new RoaringBitmap();
long AID=0;
{
{
{
{
AID=record.getBigint("id");
mrb.add((int) AID);
//获 Tori-Schlüssel
key.set(new Object[] {record.getString("active_date")});
}
}
}
}
ByteBuffer outbb = ByteBuffer.allocate(mrb.serializedSizeInBytes());
mrb.serialize(new DataOutputStream(new OutputStream(){
ByteBuffer mBB;
OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
public void close() {}
public void flush() {}
public void write(int b) {
mBB.put((byte) b);}
public void write(byte[] b) {mBB.put(b);}
public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
}.init(outbb)));
String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
value.set(new Object[] {serializedstring});
context.write(key, value);
}
}
public static class BitReducer extends ReducerBase {
private Record result = null;
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
}
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
long fcount = 0;
RoaringBitmap rbm=new RoaringBitmap();
while (values.hasNext())
{
Record val = values.next();
ByteBuffer newbb = ByteBuffer.wrap(Base64.getDecoder().decode((String)val.get(0)));
ImmutableRoaringBitmap irb = new ImmutableRoaringBitmap(newbb);
RoaringBitmap p= new RoaringBitmap(irb);
rbm.or(p);
}
ByteBuffer outbb = ByteBuffer.allocate(rbm.serializedSizeInBytes());
rbm.serialize(new DataOutputStream(new OutputStream(){
ByteBuffer mBB;
OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
public void close() {}
public void flush() {}
public void write(int b) {
mBB.put((byte) b);}
public void write(byte[] b) {mBB.put(b);}
public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
}.init(outbb)));
String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
result.set(0, key.get(0));
result.set(1, serializedstring);
context.write(result);
}
}
public static void main( String[] args ) throws OdpsException
{
System.out.println("begin.........");
JobConf job = new JobConf();
job.setMapperClass(BitMapper.class);
job.setReducerClass(BitReducer.class);
job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));
job.setMapOutputValueSchema(SchemaUtils.fromString("id:string"));
InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);
// +------------+-------------+
// | id | active_date |
// +------------+-------------+
// | 1 | 20190729 |
// | 2 | 20190729 |
// | 3 | 20190730 |
// | 4 | 20190801 |
// | 5 | 20190801 |
// +------------+-------------+
OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);
// +-------------+------------+
// | active_date | bit_map |
// +-------------+------------+
// 20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=3D
// 20190730,OjAAAAEAAAAAAAAAEAAAAAMA
// 20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=3D
JobClient.runJob(job);
}
}
Lassen Sie uns nun über diesen Code sprechen. Nach dem Packen der Java-Anwendung und dem Hochladen in das MaxCompute-Projekt kann der Entwickler die Angaben zu diesem MapReduce-Job in MaxCompute aufrufen. Für die Daten in der Eingabetabelle wird die Benutzer-ID unter Verwendung des Datums als Schlüssel codiert, und die ODER-Operation wird für die Benutzer-ID ausgeführt, die durch die Bitmap desselben Datums codiert ist. Alternativ kann, falls erforderlich, die UND-Operation ausgeführt werden, beispielsweise im Fall einer Aufbewahrung. Die verarbeiteten Daten werden dann zur weiteren Verarbeitung in die Zielstrukturtabelle geschrieben.