[JAVA] Zählen der Häufigkeit des Auftretens von Wörtern in Sätzen durch Stream-Verarbeitung (Apache Apex) Teil 2 Codierung

[Zählen der Häufigkeit des Auftretens von Wörtern in Sätzen durch Stream-Verarbeitung \ (Apache Apex ) Teil 2 \ -Coding \ - \ - Schlechtes Satzmuster](http://koheikimura.hatenablog.com/entry/2017/06 / 22/19 3000).

Dies ist eine Fortsetzung von Zählen der Häufigkeit des Auftretens von Wörtern in Sätzen durch Stream-Verarbeitung \ (Apache Apex ) \ -Qiita. Beim letzten Mal habe ich nur die Umgebung erstellt und ausgeführt und den Inhalt des Beispielcodes nicht berührt. Dieses Mal werde ich den Beispielcode lesen, um ein Gefühl für die Entwicklung von Apex-Streaming-Anwendungen zu bekommen.

Überblick

Die folgenden 7 Beispielcodedateien sind verfügbar.

topnwords - GitHub

Referenz

ApplicationWordCount

Erstens ist die Anwendung selbst. Sie müssen lediglich einen Operator erstellen und diese zusammen streamen, um eine DAG zu erstellen.

ApplicationWordCount.java


package com.example.myapexapp;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;

import org.apache.hadoop.conf.Configuration;

@ApplicationAnnotation(name="SortedWordCount")
public class ApplicationWordCount implements StreamingApplication
{
  private static final Logger LOG = LoggerFactory.getLogger(ApplicationWordCount.class);

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // create operators

    LineReader lineReader            = dag.addOperator("lineReader", new LineReader());
    WordReader wordReader            = dag.addOperator("wordReader", new WordReader());
    WindowWordCount windowWordCount  = dag.addOperator("windowWordCount", new WindowWordCount());
    FileWordCount fileWordCount      = dag.addOperator("fileWordCount", new FileWordCount());
    WordCountWriter wcWriter         = dag.addOperator("wcWriter", new WordCountWriter());

    // create streams

    dag.addStream("lines",   lineReader.output,  wordReader.input);
    dag.addStream("control", lineReader.control, fileWordCount.control);
    dag.addStream("words",   wordReader.output,  windowWordCount.input);
    dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input);
    dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
  }

}

Es scheint, dass Sie den Anwendungsnamen mit "@ApplicationAnnotation (name =" SortedWordCount ")" festlegen können. Dies entspricht dem Namen, der beim letzten Ausführen angezeigt wurde.

apex> launch target/myapexapp-1.0-SNAPSHOT.apa
  1. MyFirstApplication
  2. SortedWordCount     #Dies
Choose application: 2
{"appId": "application_1496704660177_0001"}
apex (application_1496704660177_0001) > 

Die folgende Abbildung zeigt die Beziehung zwischen jedem Operator und dem Stream. Es ist fast gerade, aber es gibt einen Stream namens "control", der von "lineReader" zu "fileWordCount" fliegt. In Apex wird eine Anwendung erstellt, indem Operatoren auf diese Weise verbunden werden, um eine DAG zu erstellen. Der Verbindungspunkt des Bedieners wird als Port bezeichnet.

image.png

LineReader

Beginnen wir mit dem ersten lineReader-Operator. Dieser Operator liest die Datei, sendet ihren Inhalt an den "Ausgabe" -Port und den Dateipfad an "Steuerung".

LineReader.java


package com.example.myapexapp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.Path;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;

// reads lines from input file and returns them; if end-of-file is reached, a control tuple
// is emitted on the control port
//
public class LineReader extends AbstractFileInputOperator<String>
{
  private static final Logger LOG = LoggerFactory.getLogger(LineReader.class);

  public final transient DefaultOutputPort<String> output  = new DefaultOutputPort<>();

  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>();

  private transient BufferedReader br = null;

  private Path path;

  @Override
  protected InputStream openFile(Path curPath) throws IOException
  {
    LOG.info("openFile: curPath = {}", curPath);
    path = curPath;
    InputStream is = super.openFile(path);
    br = new BufferedReader(new InputStreamReader(is));
    return is;
  }

  @Override
  protected void closeFile(InputStream is) throws IOException
  {
    super.closeFile(is);
    br.close();
    br = null;
    path = null;
  }

  // return empty string 
  @Override
  protected String readEntity() throws IOException
  {
    // try to read a line
    final String line = br.readLine();
    if (null != line) {    // common case
      LOG.debug("readEntity: line = {}", line);
      return line;
    }

    // end-of-file; send control tuple, containing only the last component of the path
    // (only file name) on control port
    //
    if (control.isConnected()) {
      LOG.info("readEntity: EOF for {}", path);
      final String name = path.getName();    // final component of path
      control.emit(name);
    }

    return null;
  }

  @Override
  protected void emit(String tuple)
  {
    output.emit(tuple);
  }
}

Mitgliedsvariablen

Es gibt fünf Mitglieder dieser Operatorklasse:

--LOG: Logger --output: Ausgangsport --control: Ausgangsport --br: Puffer --path: Dateipfad

Es ist keine spezielle Erklärung erforderlich, aber zur Kontrolle

@OutputPortFieldAnnotation(optional = true)

Anmerkung wurde hinzugefügt. Die Option ist wahr. Ich bin mir nicht sicher, warum es optional ist. Ich werde es hinzufügen, wenn ich es verstehe.

Methode

In LineReader werden die Methoden openFile, closeFile, readEntity, emit definiert und die Verarbeitung beim Öffnen der Datei, die Verarbeitung beim Schließen der Datei und das nächste Taple aus der geöffneten Datei. Beschreibt den Prozess des Lesens und des Tippens.

readEntitiy liest eine Zeile aus dem Puffer und gibt sie zurück. Nach dem Lesen von EOF wird der Dateiname an den contorl-Port gesendet und der Stream beendet. Was als Rückgabewert zurückgegeben wird, ist das nächste Tupel, das von der "emit" -Methode verarbeitet wird. Wenn Sie null zurückgeben, wird der Stream beendet.

LineReader.java


  // return empty string 
  @Override
  protected String readEntity() throws IOException
  {
    // try to read a line
    final String line = br.readLine();
    if (null != line) {    // common case
      LOG.debug("readEntity: line = {}", line);
      return line;
    }

    // end-of-file; send control tuple, containing only the last component of the path
    // (only file name) on control port
    //
    if (control.isConnected()) {
      LOG.info("readEntity: EOF for {}", path);
      final String name = path.getName();    // final component of path
      control.emit(name);
    }

    return null;
  }

WordReader

WordReader unterteilt eine von LineReader gesendete Datenzeile in Wörter.

WordReader.java


package com.example.myapexapp;

import java.util.regex.Pattern;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

// extracts words from input line
public class WordReader extends BaseOperator
{
  // default pattern for word-separators
  private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+");

  private String nonWordStr;    // configurable regex
  private transient Pattern nonWord;      // compiled regex

  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

  public final transient DefaultInputPort<String>
    input = new DefaultInputPort<String>() {

    @Override
    public void process(String line)
    {
      // line; split it into words and emit them
      final String[] words = nonWord.split(line);
      for (String word : words) {
        if (word.isEmpty()) continue;
        output.emit(word);
      }
    }
  };

  public String getNonWordStr() {
    return nonWordStr;
  }

  public void setNonWordStr(String regex) {
    nonWordStr = regex;
  }

  @Override
  public void setup(OperatorContext context)
  {
    if (null == nonWordStr) {
      nonWord = nonWordDefault;
    } else {
      nonWord = Pattern.compile(nonWordStr);
    }
  }

}

Mitgliedsvariablen

--nonWordDefault: Standard-Wortteilungsmuster --nonWordStr: Regulärer Ausdruck für die Wortteilung --nonWord: Wortteilungsmuster --output: Ausgangsport --input: Eingangsport

Ich denke, dass dies fast unnötig zu erklären ist, aber der Eingabeport unterscheidet sich vom Ausgabeport und ist nicht nur neu, sondern es wird auch eine gewisse Verarbeitung beschrieben.

WordReader.java


  public final transient DefaultInputPort<String>
    input = new DefaultInputPort<String>() {

    @Override
    public void process(String line)
    {
      // line; split it into words and emit them
      final String[] words = nonWord.split(line);
      for (String word : words) {
        if (word.isEmpty()) continue;
        output.emit(word);
      }
    }
  };

Der Eingabeport beschreibt, wie die Eingabedaten verarbeitet werden. Dies gilt auch für andere Operatoren, und für die Operatoren von WordReader bis FileWordCount ist für jeden Eingabeport eine Prozessmethode definiert. Hier scheint es, dass die Eingabedaten in Wörter mit einem Muster unterteilt und an den Ausgabeport gesendet werden.

Methode

getNonWordStr und setNonWordStr sind nur Getter und Setter, nicht wahr? setup wird während des Setups aufgerufen, um nonWord zu konfigurieren.

WordReader.java


  @Override
  public void setup(OperatorContext context)
  {
    if (null == nonWordStr) {
      nonWord = nonWordDefault;
    } else {
      nonWord = Pattern.compile(nonWordStr);
    }
  }

WindowWordCount WindowWordCount fasst die von WordReader in einem Fenster gesendeten Wörter zu einem Wort / Frequenz-Paar zusammen.

WindowWordCount.java


package com.example.myapexapp;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

// Computes word frequency counts per window and emits them at each endWindow. The output is a
// list of pairs (word, frequency).
//
public class WindowWordCount extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class);

  // wordMap : word => frequency
  protected Map<String, WCPair> wordMap = new HashMap<>();

  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String word)
    {
      WCPair pair = wordMap.get(word);
      if (null != pair) {    // word seen previously
        pair.freq += 1;
        return;
      }

      // new word
      pair = new WCPair();
      pair.word = word;
      pair.freq = 1;
      wordMap.put(word, pair);
    }
  };

  // output port which emits the list of word frequencies for current window
  // fileName => list of (word, freq) pairs
  //
  public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>();

  @Override
  public void endWindow()
  {
    LOG.info("WindowWordCount: endWindow");

    // got EOF; if no words found, do nothing
    if (wordMap.isEmpty()) return;

    // have some words; emit single map and reset for next file
    final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
    output.emit(list);
    list.clear();
    wordMap.clear();
  }

}

Mitgliedsvariablen

--LOG: Logger --wordMap: Eine Karte von Wort- und Frequenzpaaren --input: Eingangsport --output: Ausgangsport

Das Wort-Frequenz-Paar "wordMap" ist in "WCPair.java" als "WCPair" definiert.

WCPair.java


package com.example.myapexapp;

// a single (word, frequency) pair
public class WCPair {
  public String word;
  public int freq;

  public WCPair() {}

  public WCPair(String w, int f) {
    word = w;
    freq = f;
  }
  
  @Override
  public String toString() {
    return String.format("(%s, %d)", word, freq);
  }
}

Werfen wir einen Blick auf den Inhalt von input. Da die Ausgabe von "WordReader" ein Wort ist, ist die Eingabe ein Wort. Sucht nach dem eingegebenen Wort aus "wordMap", addiert 1 zur Häufigkeit, falls vorhanden, und erstellt ein neues "WCPair", falls es nicht vorhanden ist, und fügt es der Karte hinzu.

WindowWordCount.java


  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String word)
    {
      WCPair pair = wordMap.get(word);
      if (null != pair) {    // word seen previously
        pair.freq += 1;
        return;
      }

      // new word
      pair = new WCPair();
      pair.word = word;
      pair.freq = 1;
      wordMap.put(word, pair);
    }
  };

Methode

Die einzige Methode ist "endWindow". Diese Methode wird aufgerufen, wenn das Fenster geschlossen wird. Die Karte wird aufgelistet und an den Ausgabeport gesendet. Es scheint, dass das Fenster nicht festgelegt ist, daher scheint es einen Standardwert zu geben, aber ich bin mir nicht sicher, wie der Standardwert lautet. Ich werde es hinzufügen, wenn ich es verstehe.

WindowWordCount.java


  public void endWindow()
  {
    LOG.info("WindowWordCount: endWindow");

    // got EOF; if no words found, do nothing
    if (wordMap.isEmpty()) return;

    // have some words; emit single map and reset for next file
    final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
    output.emit(list);
    list.clear();
    wordMap.clear();
  }

FileWordCount

FileWordCount aggregiert die Häufigkeit von Wortvorkommen aus der gesamten Datei.

FileWordCount.java


public class FileWordCount extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);

  // set to true when we get an EOF control tuple
  protected boolean eof = false;

  // last component of path (i.e. only file name)
  // incoming value from control tuple
  protected String fileName;

  // wordMapFile   : {word => frequency} map, current file, all words
  protected Map<String, WCPair> wordMapFile = new HashMap<>();

  // singleton map of fileName to sorted list of (word, frequency) pairs
  protected transient Map<String, Object> resultFileFinal;

  // final sorted list of (word,frequency) pairs
  protected transient List<WCPair> fileFinalList;

  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
  {
    @Override
    public void process(List<WCPair> list)
    {
      // blend incoming list into wordMapFile and wordMapGlobal
      for (WCPair pair : list) {
        final String word = pair.word;
        WCPair filePair = wordMapFile.get(word);
        if (null != filePair) {    // word seen previously in current file
          filePair.freq += pair.freq;
          continue;
        }

        // new word in current file
        filePair = new WCPair(word, pair.freq);
        wordMapFile.put(word, filePair);
      }
    }
  };

  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
  {
    @Override
    public void process(String msg)
    {
      if (msg.isEmpty()) {    // sanity check
        throw new RuntimeException("Empty file path");
      }
      LOG.info("FileWordCount: EOF for {}", msg);
      fileName = msg;
      eof = true;
      // NOTE: current version only supports processing one file at a time.
    }
  };

  // fileOutput -- tuple is singleton map {<fileName> => fileFinalList}; emitted on EOF
  public final transient DefaultOutputPort<Map<String, Object>>
    fileOutput = new DefaultOutputPort<>();

  @Override
  public void setup(OperatorContext context)
  {
    // singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
    resultFileFinal = new HashMap<>(1);
    fileFinalList = new ArrayList<>();
  }

  @Override
  public void endWindow()
  {
    LOG.info("FileWordCount: endWindow for {}", fileName);

    if (wordMapFile.isEmpty()) {    // no words found
      if (eof) {                    // write empty list to fileOutput port
        // got EOF, so output empty list to output file
        fileFinalList.clear();
        resultFileFinal.put(fileName, fileFinalList);
        fileOutput.emit(resultFileFinal);

        // reset for next file
        eof = false;
        fileName = null;
        resultFileFinal.clear();
      }
      LOG.info("FileWordCount: endWindow for {}, no words", fileName);
      return;
    }

    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
             fileName, wordMapFile.size(), eof);

    if (eof) {                     // got EOF earlier
      if (null == fileName) {      // need file name to emit topN pairs to file writer
        throw new RuntimeException("EOF but no fileName at endWindow");
      }

      // sort list from wordMapFile into fileFinalList and emit it
      getList(wordMapFile);
      resultFileFinal.put(fileName, fileFinalList);
      fileOutput.emit(resultFileFinal);

      // reset for next file
      eof = false;
      fileName = null;
      wordMapFile.clear();
      resultFileFinal.clear();
    }
  }

  // populate fileFinalList with topN frequencies from argument
  // This list is suitable input to WordCountWriter which writes it to a file
  // MUST have map.size() > 0 here
  //
  private void getList(final Map<String, WCPair> map)
  {
    fileFinalList.clear();
    fileFinalList.addAll(map.values());

    // sort entries in descending order of frequency
    Collections.sort(fileFinalList, new Comparator<WCPair>() {
        @Override
        public int compare(WCPair o1, WCPair o2) {
          return (int)(o2.freq - o1.freq);
        }
    });
  
    LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
  }
}

Mitgliedsvariablen

--LOG: Logger --eof: Flag zum Lesen der Datei --fileName: Dateiname --wordMapFile: Häufigkeit des Auftretens von Wörtern pro Datei --resultFileFinal: Zuordnung zum Speichern aggregierter Ergebnisse --fileFinalList: Liste zum Speichern aggregierter Ergebnisse --input: Eingangsport --control: Eingabeport, Dateiname --fileOutput: Ausgabeport

Schauen wir uns die Eingabe an. Es ist fast dasselbe wie "WindowWordCount", und die Häufigkeit des Auftretens von Wörtern wird aggregiert und in die Karte r eingefügt.

FileWordCount.java


  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
  {
    @Override
    public void process(List<WCPair> list)
    {
      // blend incoming list into wordMapFile and wordMapGlobal
      for (WCPair pair : list) {
        final String word = pair.word;
        WCPair filePair = wordMapFile.get(word);
        if (null != filePair) {    // word seen previously in current file
          filePair.freq += pair.freq;
          continue;
        }

        // new word in current file
        filePair = new WCPair(word, pair.freq);
        wordMapFile.put(word, filePair);
      }
    }
  };

Als nächstes kommt "Kontrolle". Der Dateiname stammt von lineReader. Speichern Sie ihn also und aktivieren Sie das Flag eof.

FileWordCount.java


  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
  {
    @Override
    public void process(String msg)
    {
      if (msg.isEmpty()) {    // sanity check
        throw new RuntimeException("Empty file path");
      }
      LOG.info("FileWordCount: EOF for {}", msg);
      fileName = msg;
      eof = true;
      // NOTE: current version only supports processing one file at a time.
    }
  };

Methode

Da setup nur die Karte und Liste initialisiert, wird es weggelassen.

Schauen wir uns endWindow an. Es ist etwas lang, aber einfach zu bewerkstelligen: Wenn "eof" beim Schließen des Fensters aktiviert ist (dh die Datei wurde vollständig geladen), werden die aggregierten Ergebnisse an den Ausgabeport gesendet. Beim Streaming wird die Verarbeitung wie das Sortieren mit getList durchgeführt.

FileWordCount.java


  public void endWindow()
  {
    LOG.info("FileWordCount: endWindow for {}", fileName);

    if (wordMapFile.isEmpty()) {    // no words found
      if (eof) {                    // write empty list to fileOutput port
        // got EOF, so output empty list to output file
        fileFinalList.clear();
        resultFileFinal.put(fileName, fileFinalList);
        fileOutput.emit(resultFileFinal);

        // reset for next file
        eof = false;
        fileName = null;
        resultFileFinal.clear();
      }
      LOG.info("FileWordCount: endWindow for {}, no words", fileName);
      return;
    }

    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
             fileName, wordMapFile.size(), eof);

    if (eof) {                     // got EOF earlier
      if (null == fileName) {      // need file name to emit topN pairs to file writer
        throw new RuntimeException("EOF but no fileName at endWindow");
      }

      // sort list from wordMapFile into fileFinalList and emit it
      getList(wordMapFile);
      resultFileFinal.put(fileName, fileFinalList);
      fileOutput.emit(resultFileFinal);

      // reset for next file
      eof = false;
      fileName = null;
      wordMapFile.clear();
      resultFileFinal.clear();
    }
  }

  private void getList(final Map<String, WCPair> map)
  {
    fileFinalList.clear();
    fileFinalList.addAll(map.values());

    // sort entries in descending order of frequency
    Collections.sort(fileFinalList, new Comparator<WCPair>() {
        @Override
        public int compare(WCPair o1, WCPair o2) {
          return (int)(o2.freq - o1.freq);
        }
    });
  
    LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
  }

WordCountWriter

Der letzte ist WordCountWriter. Ein Operator, der die aggregierten Ergebnisse in eine Datei ausgibt. Dies unterscheidet sich von der Vergangenheit und erbt "AbstractFileOutputOperator", sodass sich die Implementierung geringfügig unterscheidet.

WordCountWriter.java


public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>>
{
  private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class);
  private static final String charsetName = "UTF-8";
  private static final String nl = System.lineSeparator();

  private String fileName;    // current file name
  private transient final StringBuilder sb = new StringBuilder();

  @Override
  public void endWindow()
  {
    if (null != fileName) {
      requestFinalize(fileName);
    }
    super.endWindow();
  }

  // input is a singleton list [M] where M is a singleton map {fileName => L} where L is a
  // list of pairs: (word, frequency)
  //
  @Override
  protected String getFileName(Map<String, Object> tuple)
  {
    LOG.info("getFileName: tuple.size = {}", tuple.size());

    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    fileName = entry.getKey();
    LOG.info("getFileName: fileName = {}", fileName);
    return fileName;
  }

  @Override
  protected byte[] getBytesForTuple(Map<String, Object> tuple)
  {
    LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());

    // get first and only pair; key is the fileName and is ignored here
    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    final List<WCPair> list = (List<WCPair>) entry.getValue();

    if (sb.length() > 0) {        // clear buffer
      sb.delete(0, sb.length());
    }

    for ( WCPair pair : list ) {
      sb.append(pair.word); sb.append(" : ");
      sb.append(pair.freq); sb.append(nl);
    }

    final String data = sb.toString();
    LOG.info("getBytesForTuple: data = {}", data);
    try {
      final byte[] result = data.getBytes(charsetName);
      return result;
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("Should never get here", ex);
    }
  }

}

Mitgliedsvariablen

--LOG: Logger --charsetName: Codierung --nl: Zeilenvorschubcode --fileName: Dateiname

Methode

endWindow ruft requestFinalize auf, um die Dateiverarbeitung zu beenden.

WordCountWriter.java


  public void endWindow()
  {
    if (null != fileName) {
      requestFinalize(fileName);
    }
    super.endWindow();
  }

Beschreiben Sie in getFileName, wie Sie den Dateinamen erhalten.

WordCountWriter.java


  protected String getFileName(Map<String, Object> tuple)
  {
    LOG.info("getFileName: tuple.size = {}", tuple.size());

    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    fileName = entry.getKey();
    LOG.info("getFileName: fileName = {}", fileName);
    return fileName;
  }

getBytesForTuple beschreibt den Inhalt, der vom Taple in die Datei ausgegeben werden soll. Hier wird die Liste formatiert und als einzelne Zeichenfolge zurückgegeben.

WordCountWriter.java


  protected byte[] getBytesForTuple(Map<String, Object> tuple)
  {
    LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());

    // get first and only pair; key is the fileName and is ignored here
    final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
    final List<WCPair> list = (List<WCPair>) entry.getValue();

    if (sb.length() > 0) {        // clear buffer
      sb.delete(0, sb.length());
    }

    for ( WCPair pair : list ) {
      sb.append(pair.word); sb.append(" : ");
      sb.append(pair.freq); sb.append(nl);
    }

    final String data = sb.toString();
    LOG.info("getBytesForTuple: data = {}", data);
    try {
      final byte[] result = data.getBytes(charsetName);
      return result;
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("Should never get here", ex);
    }
  }

Zusammenfassung

Apex entwirft zunächst die gesamte Anwendung mit der DAG des Bedieners. Sobald die DAG-Konfiguration festgelegt ist, müssen Sie nur noch die Verarbeitung jedes Operators beschreiben. Ich bin mit anderen Stream-Verarbeitungs-Engines nicht vertraut, daher kann ich sie nicht vergleichen, aber es scheint einfach zu schreiben, da ich mich der verteilten Verarbeitung nicht sehr bewusst sein muss. Außerdem werden die Prozesse, die wahrscheinlich verwendet werden, in Malhar als Bibliothek vorbereitet, sodass das Schreiben einfach ist.

Recommended Posts

Zählen der Häufigkeit des Auftretens von Wörtern in Sätzen durch Stream-Verarbeitung (Apache Apex) Teil 2 Codierung
Zählen Sie die Häufigkeit des Auftretens von Wörtern in einem Satz durch Stream-Verarbeitung (Apache Apex).
Reihenfolge der Verarbeitung im Programm
Herbst 2017 Sicherheitsspezialist Ich habe die Häufigkeit der Wörter überprüft, die am Morgen 2 erschienen sind