[JAVA] Comptage de la fréquence d'apparition des mots dans les phrases par traitement de flux (Apache Apex) Partie 2 Codage

[Comptage de la fréquence d'apparition des mots dans les phrases par traitement de flux \ (Apache Apex ) Part 2 \ -Coding \ - \ - Mauvais modèle de phrase](http://koheikimura.hatenablog.com/entry/2017/06 / 22/19 3000).

Ceci est une suite de Compter la fréquence d'apparition des mots dans les phrases par traitement de flux \ (Apache Apex ) \ -Qiita. La dernière fois, j'ai uniquement construit et exécuté l'environnement, et je n'ai pas touché au contenu de l'exemple de code. Donc, cette fois, je vais lire l'exemple de code pour avoir une idée du développement d'applications de streaming Apex.

Aperçu

Les 7 exemples de fichiers de code suivants sont disponibles.

topnwords - GitHub

référence

ApplicationWordCount

Le premier est l'application elle-même. Tout ce que vous avez à faire est de créer un opérateur et de les diffuser ensemble pour créer un DAG.

ApplicationWordCount.java


package com.example.myapexapp;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;

import org.apache.hadoop.conf.Configuration;

@ApplicationAnnotation(name="SortedWordCount")
public class ApplicationWordCount implements StreamingApplication
{
  private static final Logger LOG = LoggerFactory.getLogger(ApplicationWordCount.class);

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // create operators

    LineReader lineReader            = dag.addOperator("lineReader", new LineReader());
    WordReader wordReader            = dag.addOperator("wordReader", new WordReader());
    WindowWordCount windowWordCount  = dag.addOperator("windowWordCount", new WindowWordCount());
    FileWordCount fileWordCount      = dag.addOperator("fileWordCount", new FileWordCount());
    WordCountWriter wcWriter         = dag.addOperator("wcWriter", new WordCountWriter());

    // create streams

    dag.addStream("lines",   lineReader.output,  wordReader.input);
    dag.addStream("control", lineReader.control, fileWordCount.control);
    dag.addStream("words",   wordReader.output,  windowWordCount.input);
    dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input);
    dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
  }

}

Il semble que vous puissiez définir le nom de l'application avec @ApplicationAnnotation (name =" SortedWordCount "). Cela correspond au nom qui est apparu la dernière fois qu'il a été exécuté.

apex> launch target/myapexapp-1.0-SNAPSHOT.apa
  1. MyFirstApplication
  2. SortedWordCount     #cette
Choose application: 2
{"appId": "application_1496704660177_0001"}
apex (application_1496704660177_0001) > 

La figure ci-dessous montre la relation entre chaque opérateur et le flux. C'est presque direct, mais il y a un flux appelé control qui vole de lineReader à fileWordCount. Dans Apex, une application est créée en connectant des opérateurs de cette manière pour créer un DAG. Le point de connexion de l'opérateur est appelé un port.

image.png

LineReader

Commençons par le premier opérateur lineReader. Cet opérateur lit le fichier, envoie son contenu vers le port ʻoutputet le chemin du fichier verscontrol`.

LineReader.java


package com.example.myapexapp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.Path;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;

// reads lines from input file and returns them; if end-of-file is reached, a control tuple
// is emitted on the control port
//
public class LineReader extends AbstractFileInputOperator<String>
{
  private static final Logger LOG = LoggerFactory.getLogger(LineReader.class);

  public final transient DefaultOutputPort<String> output  = new DefaultOutputPort<>();

  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>();

  private transient BufferedReader br = null;

  private Path path;

  @Override
  protected InputStream openFile(Path curPath) throws IOException
  {
    LOG.info("openFile: curPath = {}", curPath);
    path = curPath;
    InputStream is = super.openFile(path);
    br = new BufferedReader(new InputStreamReader(is));
    return is;
  }

  @Override
  protected void closeFile(InputStream is) throws IOException
  {
    super.closeFile(is);
    br.close();
    br = null;
    path = null;
  }

  // return empty string 
  @Override
  protected String readEntity() throws IOException
  {
    // try to read a line
    final String line = br.readLine();
    if (null != line) {    // common case
      LOG.debug("readEntity: line = {}", line);
      return line;
    }

    // end-of-file; send control tuple, containing only the last component of the path
    // (only file name) on control port
    //
    if (control.isConnected()) {
      LOG.info("readEntity: EOF for {}", path);
      final String name = path.getName();    // final component of path
      control.emit(name);
    }

    return null;
  }

  @Override
  protected void emit(String tuple)
  {
    output.emit(tuple);
  }
}

Variables de membre

Il y a cinq membres de cette classe d'opérateur:

--LOG: enregistreur --output: port de sortie --control: port de sortie --br: tampon --path: chemin du fichier

Aucune explication particulière n'est requise, mais pour le contrôle

@OutputPortFieldAnnotation(optional = true)

Une annotation a été ajoutée. L'option est vraie. Je ne sais pas pourquoi c'est facultatif. Je l'ajouterai quand je comprendrai.

Méthode

Dans LineReader, les méthodes ʻopenFile, closeFile, readEntity, ʻemit sont définies, et le traitement lorsque le fichier est ouvert, le traitement lorsqu'il est fermé et le prochain taple du fichier ouvert. Décrit le processus de lecture et le processus de tapotement.

readEntitiy lit une ligne du tampon et la renvoie. Après avoir lu jusqu'à EOF, le nom du fichier est envoyé au port contorl et le flux est arrêté. Ce qui est retourné comme valeur de retour est le tuple suivant, qui est traité par la méthode ʻemit`. Renvoyer null mettra fin au flux.

LineReader.java


  // return empty string 
  @Override
  protected String readEntity() throws IOException
  {
    // try to read a line
    final String line = br.readLine();
    if (null != line) {    // common case
      LOG.debug("readEntity: line = {}", line);
      return line;
    }

    // end-of-file; send control tuple, containing only the last component of the path
    // (only file name) on control port
    //
    if (control.isConnected()) {
      LOG.info("readEntity: EOF for {}", path);
      final String name = path.getName();    // final component of path
      control.emit(name);
    }

    return null;
  }

WordReader

WordReader divise une ligne de données envoyées par LineReader en mots.

WordReader.java


package com.example.myapexapp;

import java.util.regex.Pattern;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

// extracts words from input line
public class WordReader extends BaseOperator
{
  // default pattern for word-separators
  private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+");

  private String nonWordStr;    // configurable regex
  private transient Pattern nonWord;      // compiled regex

  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

  public final transient DefaultInputPort<String>
    input = new DefaultInputPort<String>() {

    @Override
    public void process(String line)
    {
      // line; split it into words and emit them
      final String[] words = nonWord.split(line);
      for (String word : words) {
        if (word.isEmpty()) continue;
        output.emit(word);
      }
    }
  };

  public String getNonWordStr() {
    return nonWordStr;
  }

  public void setNonWordStr(String regex) {
    nonWordStr = regex;
  }

  @Override
  public void setup(OperatorContext context)
  {
    if (null == nonWordStr) {
      nonWord = nonWordDefault;
    } else {
      nonWord = Pattern.compile(nonWordStr);
    }
  }

}

Variables de membre

--nonWordDefault: modèle de division de mot par défaut --nonWordStr: Expression régulière pour le fractionnement de mots --nonWord: modèle de partage de mots --output: port de sortie --input: port d'entrée

Je pense que c'est presque inutile d'expliquer, mais le port d'entrée est différent du port de sortie, et ce n'est pas seulement nouveau, mais certains traitements sont décrits.

WordReader.java


  public final transient DefaultInputPort<String>
    input = new DefaultInputPort<String>() {

    @Override
    public void process(String line)
    {
      // line; split it into words and emit them
      final String[] words = nonWord.split(line);
      for (String word : words) {
        if (word.isEmpty()) continue;
        output.emit(word);
      }
    }
  };

Le port d'entrée décrit comment traiter les données d'entrée. C'est la même chose pour les autres opérateurs, et les opérateurs de WordReader à FileWordCount ont également une méthode process définie pour chaque port d'entrée. Ici, il semble que les données d'entrée soient divisées en mots avec un motif et envoyées au port de sortie.

Méthode

getNonWordStr et setNonWordStr ne sont que des getters et des setters, n'est-ce pas? setup est appelé pendant l'installation pour configurer nonWord.

WordReader.java


  @Override
  public void setup(OperatorContext context)
  {
    if (null == nonWordStr) {
      nonWord = nonWordDefault;
    } else {
      nonWord = Pattern.compile(nonWordStr);
    }
  }

WindowWordCount WindowWordCount regroupe les mots envoyés par WordReader dans une fenêtre en une paire mot / fréquence.

WindowWordCount.java


package com.example.myapexapp;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

// Computes word frequency counts per window and emits them at each endWindow. The output is a
// list of pairs (word, frequency).
//
public class WindowWordCount extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class);

  // wordMap : word => frequency
  protected Map<String, WCPair> wordMap = new HashMap<>();

  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String word)
    {
      WCPair pair = wordMap.get(word);
      if (null != pair) {    // word seen previously
        pair.freq += 1;
        return;
      }

      // new word
      pair = new WCPair();
      pair.word = word;
      pair.freq = 1;
      wordMap.put(word, pair);
    }
  };

  // output port which emits the list of word frequencies for current window
  // fileName => list of (word, freq) pairs
  //
  public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>();

  @Override
  public void endWindow()
  {
    LOG.info("WindowWordCount: endWindow");

    // got EOF; if no words found, do nothing
    if (wordMap.isEmpty()) return;

    // have some words; emit single map and reset for next file
    final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
    output.emit(list);
    list.clear();
    wordMap.clear();
  }

}

Variables de membre

--LOG: enregistreur --wordMap: une carte des paires de mots et de fréquences --input: port d'entrée --output: port de sortie

Le mot et la paire de fréquences «wordMap» sont définis comme «WCPair» dans «WCPair.java».

WCPair.java


package com.example.myapexapp;

// a single (word, frequency) pair
public class WCPair {
  public String word;
  public int freq;

  public WCPair() {}

  public WCPair(String w, int f) {
    word = w;
    freq = f;
  }
  
  @Override
  public String toString() {
    return String.format("(%s, %d)", word, freq);
  }
}

Jetons un coup d'œil au contenu de ʻinput. Puisque la sortie de WordReaderest un mot, l'entrée est un mot. Recherche le mot entré danswordMap, ajoute 1 à la fréquence s'il existe et crée un nouveau WCPair` s'il n'existe pas et l'ajoute à la carte.

WindowWordCount.java


  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String word)
    {
      WCPair pair = wordMap.get(word);
      if (null != pair) {    // word seen previously
        pair.freq += 1;
        return;
      }

      // new word
      pair = new WCPair();
      pair.word = word;
      pair.freq = 1;
      wordMap.put(word, pair);
    }
  };

Méthode

La seule méthode est ʻendWindow`. Cette méthode est appelée lorsque la fenêtre est fermée. La carte est répertoriée et envoyée au port de sortie. Il semble que la fenêtre n'est pas définie, il semble donc qu'il existe une valeur par défaut, mais je ne suis pas sûr de la valeur par défaut. Je l'ajouterai quand je comprendrai.

WindowWordCount.java


  public void endWindow()
  {
    LOG.info("WindowWordCount: endWindow");

    // got EOF; if no words found, do nothing
    if (wordMap.isEmpty()) return;

    // have some words; emit single map and reset for next file
    final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
    output.emit(list);
    list.clear();
    wordMap.clear();
  }

FileWordCount

FileWordCount regroupe la fréquence des occurrences de mots de l'ensemble du fichier.

FileWordCount.java


public class FileWordCount extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);

  // set to true when we get an EOF control tuple
  protected boolean eof = false;

  // last component of path (i.e. only file name)
  // incoming value from control tuple
  protected String fileName;

  // wordMapFile   : {word => frequency} map, current file, all words
  protected Map<String, WCPair> wordMapFile = new HashMap<>();

  // singleton map of fileName to sorted list of (word, frequency) pairs
  protected transient Map<String, Object> resultFileFinal;

  // final sorted list of (word,frequency) pairs
  protected transient List<WCPair> fileFinalList;

  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
  {
    @Override
    public void process(List<WCPair> list)
    {
      // blend incoming list into wordMapFile and wordMapGlobal
      for (WCPair pair : list) {
        final String word = pair.word;
        WCPair filePair = wordMapFile.get(word);
        if (null != filePair) {    // word seen previously in current file
          filePair.freq += pair.freq;
          continue;
        }

        // new word in current file
        filePair = new WCPair(word, pair.freq);
        wordMapFile.put(word, filePair);
      }
    }
  };

  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
  {
    @Override
    public void process(String msg)
    {
      if (msg.isEmpty()) {    // sanity check
        throw new RuntimeException("Empty file path");
      }
      LOG.info("FileWordCount: EOF for {}", msg);
      fileName = msg;
      eof = true;
      // NOTE: current version only supports processing one file at a time.
    }
  };

  // fileOutput -- tuple is singleton map {<fileName> => fileFinalList}; emitted on EOF
  public final transient DefaultOutputPort<Map<String, Object>>
    fileOutput = new DefaultOutputPort<>();

  @Override
  public void setup(OperatorContext context)
  {
    // singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
    resultFileFinal = new HashMap<>(1);
    fileFinalList = new ArrayList<>();
  }

  @Override
  public void endWindow()
  {
    LOG.info("FileWordCount: endWindow for {}", fileName);

    if (wordMapFile.isEmpty()) {    // no words found
      if (eof) {                    // write empty list to fileOutput port
        // got EOF, so output empty list to output file
        fileFinalList.clear();
        resultFileFinal.put(fileName, fileFinalList);
        fileOutput.emit(resultFileFinal);

        // reset for next file
        eof = false;
        fileName = null;
        resultFileFinal.clear();
      }
      LOG.info("FileWordCount: endWindow for {}, no words", fileName);
      return;
    }

    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
             fileName, wordMapFile.size(), eof);

    if (eof) {                     // got EOF earlier
      if (null == fileName) {      // need file name to emit topN pairs to file writer
        throw new RuntimeException("EOF but no fileName at endWindow");
      }

      // sort list from wordMapFile into fileFinalList and emit it
      getList(wordMapFile);
      resultFileFinal.put(fileName, fileFinalList);
      fileOutput.emit(resultFileFinal);

      // reset for next file
      eof = false;
      fileName = null;
      wordMapFile.clear();
      resultFileFinal.clear();
    }
  }

  // populate fileFinalList with topN frequencies from argument
  // This list is suitable input to WordCountWriter which writes it to a file
  // MUST have map.size() > 0 here
  //
  private void getList(final Map<String, WCPair> map)
  {
    fileFinalList.clear();
    fileFinalList.addAll(map.values());

    // sort entries in descending order of frequency
    Collections.sort(fileFinalList, new Comparator<WCPair>() {
        @Override
        public int compare(WCPair o1, WCPair o2) {
          return (int)(o2.freq - o1.freq);
        }
    });
  
    LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
  }
}

Variables de membre

--LOG: enregistreur --eof: indicateur de fin de lecture du fichier --fileName: nom de fichier --wordMapFile: fréquence d'occurrence de mot par fichier --resultFileFinal: mapper pour stocker les résultats agrégés --fileFinalList: Liste pour stocker les résultats agrégés --input: port d'entrée --control: port d'entrée, nom de fichier --fileOutput: port de sortie

Jetons un œil à ʻinput`. C'est presque le même que "WindowWordCount", et la fréquence d'occurrence des mots est agrégée et placée dans la carte r.

FileWordCount.java


  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
  {
    @Override
    public void process(List<WCPair> list)
    {
      // blend incoming list into wordMapFile and wordMapGlobal
      for (WCPair pair : list) {
        final String word = pair.word;
        WCPair filePair = wordMapFile.get(word);
        if (null != filePair) {    // word seen previously in current file
          filePair.freq += pair.freq;
          continue;
        }

        // new word in current file
        filePair = new WCPair(word, pair.freq);
        wordMapFile.put(word, filePair);
      }
    }
  };

Vient ensuite control. Le nom du fichier vient de lineReader, donc enregistrez-le et activez le drapeau ʻeof`.

FileWordCount.java


  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
  {
    @Override
    public void process(String msg)
    {
      if (msg.isEmpty()) {    // sanity check
        throw new RuntimeException("Empty file path");
      }
      LOG.info("FileWordCount: EOF for {}", msg);
      fileName = msg;
      eof = true;
      // NOTE: current version only supports processing one file at a time.
    }
  };

Méthode

Puisque setup initialise uniquement la carte et la liste, il est omis.

Jetons un œil à ʻendWindow. C'est un peu long, mais c'est simple à faire: si ʻeof est activé (c'est-à-dire que le fichier a fini de se charger) lorsque la fenêtre est fermée, les résultats agrégés seront envoyés au port de sortie. Lors du streaming, un traitement tel que le tri est effectué avec getList.

FileWordCount.java


  public void endWindow()
  {
    LOG.info("FileWordCount: endWindow for {}", fileName);

    if (wordMapFile.isEmpty()) {    // no words found
      if (eof) {                    // write empty list to fileOutput port
        // got EOF, so output empty list to output file
        fileFinalList.clear();
        resultFileFinal.put(fileName, fileFinalList);
        fileOutput.emit(resultFileFinal);

        // reset for next file
        eof = false;
        fileName = null;
        resultFileFinal.clear();
      }
      LOG.info("FileWordCount: endWindow for {}, no words", fileName);
      return;
    }

    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
             fileName, wordMapFile.size(), eof);

    if (eof) {                     // got EOF earlier
      if (null == fileName) {      // need file name to emit topN pairs to file writer
        throw new RuntimeException("EOF but no fileName at endWindow");
      }

      // sort list from wordMapFile into fileFinalList and emit it
      getList(wordMapFile);
      resultFileFinal.put(fileName, fileFinalList);
      fileOutput.emit(resultFileFinal);

      // reset for next file
      eof = false;
      fileName = null;
      wordMapFile.clear();
      resultFileFinal.clear();
    }
  }

  private void getList(final Map<String, WCPair> map)
  {
    fileFinalList.clear();
    fileFinalList.addAll(map.values());

    // sort entries in descending order of frequency
    Collections.sort(fileFinalList, new Comparator<WCPair>() {
        @Override
        public int compare(WCPair o1, WCPair o2) {
          return (int)(o2.freq - o1.freq);
        }
    });
  
    LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
  }

WordCountWriter

Le dernier est WordCountWriter. Un opérateur qui génère les résultats agrégés dans un fichier. Ceci est différent du précédent et hérite de ʻAbstractFileOutputOperator`, donc l'implémentation est légèrement différente.

WordCountWriter.java


public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>>
{
  private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class);
  private static final String charsetName = "UTF-8";
  private static final String nl = System.lineSeparator();

  private String fileName;    // current file name
  private transient final StringBuilder sb = new StringBuilder();

  @Override
  public void endWindow()
  {
    if (null != fileName) {
      requestFinalize(fileName);
    }
    super.endWindow();
  }

  // input is a singleton list [M] where M is a singleton map {fileName => L} where L is a
  // list of pairs: (word, frequency)
  //
  @Override
  protected String getFileName(Map<String, Object> tuple)
  {
    LOG.info("getFileName: tuple.size = {}", tuple.size());

    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    fileName = entry.getKey();
    LOG.info("getFileName: fileName = {}", fileName);
    return fileName;
  }

  @Override
  protected byte[] getBytesForTuple(Map<String, Object> tuple)
  {
    LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());

    // get first and only pair; key is the fileName and is ignored here
    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    final List<WCPair> list = (List<WCPair>) entry.getValue();

    if (sb.length() > 0) {        // clear buffer
      sb.delete(0, sb.length());
    }

    for ( WCPair pair : list ) {
      sb.append(pair.word); sb.append(" : ");
      sb.append(pair.freq); sb.append(nl);
    }

    final String data = sb.toString();
    LOG.info("getBytesForTuple: data = {}", data);
    try {
      final byte[] result = data.getBytes(charsetName);
      return result;
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("Should never get here", ex);
    }
  }

}

Variables de membre

--LOG: enregistreur --charsetName: encodage --nl: code de saut de ligne --fileName: nom de fichier

Méthode

ʻEndWindow appelle requestFinalize` pour terminer le traitement du fichier.

WordCountWriter.java


  public void endWindow()
  {
    if (null != fileName) {
      requestFinalize(fileName);
    }
    super.endWindow();
  }

Dans getFileName, décrivez comment obtenir le nom du fichier.

WordCountWriter.java


  protected String getFileName(Map<String, Object> tuple)
  {
    LOG.info("getFileName: tuple.size = {}", tuple.size());

    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    fileName = entry.getKey();
    LOG.info("getFileName: fileName = {}", fileName);
    return fileName;
  }

getBytesForTuple décrit le contenu à sortir du taple vers le fichier. Ici, la liste est formatée et renvoyée sous la forme d'une chaîne de caractères unique.

WordCountWriter.java


  protected byte[] getBytesForTuple(Map<String, Object> tuple)
  {
    LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());

    // get first and only pair; key is the fileName and is ignored here
    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    final List<WCPair> list = (List<WCPair>) entry.getValue();

    if (sb.length() > 0) {        // clear buffer
      sb.delete(0, sb.length());
    }

    for ( WCPair pair : list ) {
      sb.append(pair.word); sb.append(" : ");
      sb.append(pair.freq); sb.append(nl);
    }

    final String data = sb.toString();
    LOG.info("getBytesForTuple: data = {}", data);
    try {
      final byte[] result = data.getBytes(charsetName);
      return result;
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("Should never get here", ex);
    }
  }

Résumé

Apex conçoit d'abord l'ensemble de l'application avec le DAG de l'opérateur. Une fois la configuration du DAG décidée, il ne vous reste plus qu'à décrire le traitement de chaque opérateur. Je ne suis pas familier avec les autres moteurs de traitement de flux, donc je ne peux pas les comparer, mais cela semble facile à écrire car je n'ai pas à être très conscient du traitement distribué. De plus, les processus susceptibles d'être utilisés sont préparés sous forme de bibliothèque dans Malhar, il est donc facile à écrire.

Recommended Posts

Comptage de la fréquence d'apparition des mots dans les phrases par traitement de flux (Apache Apex) Partie 2 Codage
Compter la fréquence d'apparition des mots dans une phrase par traitement de flux (Apache Apex)
Ordre de traitement dans le programme
Spécialiste de la sécurité de l'automne 2017 J'ai vérifié la fréquence des mots qui apparaissaient le matin 2