[Comptage de la fréquence d'apparition des mots dans les phrases par traitement de flux \ (Apache Apex ) Part 2 \ -Coding \ - \ - Mauvais modèle de phrase](http://koheikimura.hatenablog.com/entry/2017/06 / 22/19 3000).
Ceci est une suite de Compter la fréquence d'apparition des mots dans les phrases par traitement de flux \ (Apache Apex ) \ -Qiita. La dernière fois, j'ai uniquement construit et exécuté l'environnement, et je n'ai pas touché au contenu de l'exemple de code. Donc, cette fois, je vais lire l'exemple de code pour avoir une idée du développement d'applications de streaming Apex.
Les 7 exemples de fichiers de code suivants sont disponibles.
ApplicationWordCount
Le premier est l'application elle-même. Tout ce que vous avez à faire est de créer un opérateur et de les diffuser ensemble pour créer un DAG.
ApplicationWordCount.java
package com.example.myapexapp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import org.apache.hadoop.conf.Configuration;
@ApplicationAnnotation(name="SortedWordCount")
public class ApplicationWordCount implements StreamingApplication
{
private static final Logger LOG = LoggerFactory.getLogger(ApplicationWordCount.class);
@Override
public void populateDAG(DAG dag, Configuration conf)
{
// create operators
LineReader lineReader = dag.addOperator("lineReader", new LineReader());
WordReader wordReader = dag.addOperator("wordReader", new WordReader());
WindowWordCount windowWordCount = dag.addOperator("windowWordCount", new WindowWordCount());
FileWordCount fileWordCount = dag.addOperator("fileWordCount", new FileWordCount());
WordCountWriter wcWriter = dag.addOperator("wcWriter", new WordCountWriter());
// create streams
dag.addStream("lines", lineReader.output, wordReader.input);
dag.addStream("control", lineReader.control, fileWordCount.control);
dag.addStream("words", wordReader.output, windowWordCount.input);
dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input);
dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
}
}
Il semble que vous puissiez définir le nom de l'application avec @ApplicationAnnotation (name =" SortedWordCount ")
. Cela correspond au nom qui est apparu la dernière fois qu'il a été exécuté.
apex> launch target/myapexapp-1.0-SNAPSHOT.apa
1. MyFirstApplication
2. SortedWordCount #cette
Choose application: 2
{"appId": "application_1496704660177_0001"}
apex (application_1496704660177_0001) >
La figure ci-dessous montre la relation entre chaque opérateur et le flux. C'est presque direct, mais il y a un flux appelé control
qui vole de lineReader
à fileWordCount
. Dans Apex, une application est créée en connectant des opérateurs de cette manière pour créer un DAG. Le point de connexion de l'opérateur est appelé un port.
LineReader
Commençons par le premier opérateur lineReader
. Cet opérateur lit le fichier, envoie son contenu vers le port ʻoutputet le chemin du fichier vers
control`.
LineReader.java
package com.example.myapexapp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
// reads lines from input file and returns them; if end-of-file is reached, a control tuple
// is emitted on the control port
//
public class LineReader extends AbstractFileInputOperator<String>
{
private static final Logger LOG = LoggerFactory.getLogger(LineReader.class);
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>();
private transient BufferedReader br = null;
private Path path;
@Override
protected InputStream openFile(Path curPath) throws IOException
{
LOG.info("openFile: curPath = {}", curPath);
path = curPath;
InputStream is = super.openFile(path);
br = new BufferedReader(new InputStreamReader(is));
return is;
}
@Override
protected void closeFile(InputStream is) throws IOException
{
super.closeFile(is);
br.close();
br = null;
path = null;
}
// return empty string
@Override
protected String readEntity() throws IOException
{
// try to read a line
final String line = br.readLine();
if (null != line) { // common case
LOG.debug("readEntity: line = {}", line);
return line;
}
// end-of-file; send control tuple, containing only the last component of the path
// (only file name) on control port
//
if (control.isConnected()) {
LOG.info("readEntity: EOF for {}", path);
final String name = path.getName(); // final component of path
control.emit(name);
}
return null;
}
@Override
protected void emit(String tuple)
{
output.emit(tuple);
}
}
Il y a cinq membres de cette classe d'opérateur:
--LOG: enregistreur --output: port de sortie --control: port de sortie --br: tampon --path: chemin du fichier
Aucune explication particulière n'est requise, mais pour le contrôle
@OutputPortFieldAnnotation(optional = true)
Une annotation a été ajoutée. L'option est vraie. Je ne sais pas pourquoi c'est facultatif. Je l'ajouterai quand je comprendrai.
Dans LineReader
, les méthodes ʻopenFile,
closeFile,
readEntity, ʻemit
sont définies, et le traitement lorsque le fichier est ouvert, le traitement lorsqu'il est fermé et le prochain taple du fichier ouvert. Décrit le processus de lecture et le processus de tapotement.
readEntitiy
lit une ligne du tampon et la renvoie. Après avoir lu jusqu'à EOF, le nom du fichier est envoyé au port contorl et le flux est arrêté. Ce qui est retourné comme valeur de retour est le tuple suivant, qui est traité par la méthode ʻemit`. Renvoyer null mettra fin au flux.
LineReader.java
// return empty string
@Override
protected String readEntity() throws IOException
{
// try to read a line
final String line = br.readLine();
if (null != line) { // common case
LOG.debug("readEntity: line = {}", line);
return line;
}
// end-of-file; send control tuple, containing only the last component of the path
// (only file name) on control port
//
if (control.isConnected()) {
LOG.info("readEntity: EOF for {}", path);
final String name = path.getName(); // final component of path
control.emit(name);
}
return null;
}
WordReader
WordReader divise une ligne de données envoyées par LineReader en mots.
WordReader.java
package com.example.myapexapp;
import java.util.regex.Pattern;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
// extracts words from input line
public class WordReader extends BaseOperator
{
// default pattern for word-separators
private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+");
private String nonWordStr; // configurable regex
private transient Pattern nonWord; // compiled regex
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
public final transient DefaultInputPort<String>
input = new DefaultInputPort<String>() {
@Override
public void process(String line)
{
// line; split it into words and emit them
final String[] words = nonWord.split(line);
for (String word : words) {
if (word.isEmpty()) continue;
output.emit(word);
}
}
};
public String getNonWordStr() {
return nonWordStr;
}
public void setNonWordStr(String regex) {
nonWordStr = regex;
}
@Override
public void setup(OperatorContext context)
{
if (null == nonWordStr) {
nonWord = nonWordDefault;
} else {
nonWord = Pattern.compile(nonWordStr);
}
}
}
--nonWordDefault: modèle de division de mot par défaut --nonWordStr: Expression régulière pour le fractionnement de mots --nonWord: modèle de partage de mots --output: port de sortie --input: port d'entrée
Je pense que c'est presque inutile d'expliquer, mais le port d'entrée est différent du port de sortie, et ce n'est pas seulement nouveau, mais certains traitements sont décrits.
WordReader.java
public final transient DefaultInputPort<String>
input = new DefaultInputPort<String>() {
@Override
public void process(String line)
{
// line; split it into words and emit them
final String[] words = nonWord.split(line);
for (String word : words) {
if (word.isEmpty()) continue;
output.emit(word);
}
}
};
Le port d'entrée décrit comment traiter les données d'entrée. C'est la même chose pour les autres opérateurs, et les opérateurs de WordReader à FileWordCount ont également une méthode process
définie pour chaque port d'entrée. Ici, il semble que les données d'entrée soient divisées en mots avec un motif et envoyées au port de sortie.
getNonWordStr
et setNonWordStr
ne sont que des getters et des setters, n'est-ce pas? setup
est appelé pendant l'installation pour configurer nonWord
.
WordReader.java
@Override
public void setup(OperatorContext context)
{
if (null == nonWordStr) {
nonWord = nonWordDefault;
} else {
nonWord = Pattern.compile(nonWordStr);
}
}
WindowWordCount WindowWordCount regroupe les mots envoyés par WordReader dans une fenêtre en une paire mot / fréquence.
WindowWordCount.java
package com.example.myapexapp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
// Computes word frequency counts per window and emits them at each endWindow. The output is a
// list of pairs (word, frequency).
//
public class WindowWordCount extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class);
// wordMap : word => frequency
protected Map<String, WCPair> wordMap = new HashMap<>();
public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
{
@Override
public void process(String word)
{
WCPair pair = wordMap.get(word);
if (null != pair) { // word seen previously
pair.freq += 1;
return;
}
// new word
pair = new WCPair();
pair.word = word;
pair.freq = 1;
wordMap.put(word, pair);
}
};
// output port which emits the list of word frequencies for current window
// fileName => list of (word, freq) pairs
//
public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>();
@Override
public void endWindow()
{
LOG.info("WindowWordCount: endWindow");
// got EOF; if no words found, do nothing
if (wordMap.isEmpty()) return;
// have some words; emit single map and reset for next file
final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
output.emit(list);
list.clear();
wordMap.clear();
}
}
--LOG: enregistreur --wordMap: une carte des paires de mots et de fréquences --input: port d'entrée --output: port de sortie
Le mot et la paire de fréquences «wordMap» sont définis comme «WCPair» dans «WCPair.java».
WCPair.java
package com.example.myapexapp;
// a single (word, frequency) pair
public class WCPair {
public String word;
public int freq;
public WCPair() {}
public WCPair(String w, int f) {
word = w;
freq = f;
}
@Override
public String toString() {
return String.format("(%s, %d)", word, freq);
}
}
Jetons un coup d'œil au contenu de ʻinput. Puisque la sortie de
WordReaderest un mot, l'entrée est un mot. Recherche le mot entré dans
wordMap, ajoute 1 à la fréquence s'il existe et crée un nouveau
WCPair` s'il n'existe pas et l'ajoute à la carte.
WindowWordCount.java
public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
{
@Override
public void process(String word)
{
WCPair pair = wordMap.get(word);
if (null != pair) { // word seen previously
pair.freq += 1;
return;
}
// new word
pair = new WCPair();
pair.word = word;
pair.freq = 1;
wordMap.put(word, pair);
}
};
La seule méthode est ʻendWindow`. Cette méthode est appelée lorsque la fenêtre est fermée. La carte est répertoriée et envoyée au port de sortie. Il semble que la fenêtre n'est pas définie, il semble donc qu'il existe une valeur par défaut, mais je ne suis pas sûr de la valeur par défaut. Je l'ajouterai quand je comprendrai.
WindowWordCount.java
public void endWindow()
{
LOG.info("WindowWordCount: endWindow");
// got EOF; if no words found, do nothing
if (wordMap.isEmpty()) return;
// have some words; emit single map and reset for next file
final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
output.emit(list);
list.clear();
wordMap.clear();
}
FileWordCount
FileWordCount regroupe la fréquence des occurrences de mots de l'ensemble du fichier.
FileWordCount.java
public class FileWordCount extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);
// set to true when we get an EOF control tuple
protected boolean eof = false;
// last component of path (i.e. only file name)
// incoming value from control tuple
protected String fileName;
// wordMapFile : {word => frequency} map, current file, all words
protected Map<String, WCPair> wordMapFile = new HashMap<>();
// singleton map of fileName to sorted list of (word, frequency) pairs
protected transient Map<String, Object> resultFileFinal;
// final sorted list of (word,frequency) pairs
protected transient List<WCPair> fileFinalList;
public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
{
@Override
public void process(List<WCPair> list)
{
// blend incoming list into wordMapFile and wordMapGlobal
for (WCPair pair : list) {
final String word = pair.word;
WCPair filePair = wordMapFile.get(word);
if (null != filePair) { // word seen previously in current file
filePair.freq += pair.freq;
continue;
}
// new word in current file
filePair = new WCPair(word, pair.freq);
wordMapFile.put(word, filePair);
}
}
};
public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
{
@Override
public void process(String msg)
{
if (msg.isEmpty()) { // sanity check
throw new RuntimeException("Empty file path");
}
LOG.info("FileWordCount: EOF for {}", msg);
fileName = msg;
eof = true;
// NOTE: current version only supports processing one file at a time.
}
};
// fileOutput -- tuple is singleton map {<fileName> => fileFinalList}; emitted on EOF
public final transient DefaultOutputPort<Map<String, Object>>
fileOutput = new DefaultOutputPort<>();
@Override
public void setup(OperatorContext context)
{
// singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
resultFileFinal = new HashMap<>(1);
fileFinalList = new ArrayList<>();
}
@Override
public void endWindow()
{
LOG.info("FileWordCount: endWindow for {}", fileName);
if (wordMapFile.isEmpty()) { // no words found
if (eof) { // write empty list to fileOutput port
// got EOF, so output empty list to output file
fileFinalList.clear();
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
resultFileFinal.clear();
}
LOG.info("FileWordCount: endWindow for {}, no words", fileName);
return;
}
LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
fileName, wordMapFile.size(), eof);
if (eof) { // got EOF earlier
if (null == fileName) { // need file name to emit topN pairs to file writer
throw new RuntimeException("EOF but no fileName at endWindow");
}
// sort list from wordMapFile into fileFinalList and emit it
getList(wordMapFile);
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
wordMapFile.clear();
resultFileFinal.clear();
}
}
// populate fileFinalList with topN frequencies from argument
// This list is suitable input to WordCountWriter which writes it to a file
// MUST have map.size() > 0 here
//
private void getList(final Map<String, WCPair> map)
{
fileFinalList.clear();
fileFinalList.addAll(map.values());
// sort entries in descending order of frequency
Collections.sort(fileFinalList, new Comparator<WCPair>() {
@Override
public int compare(WCPair o1, WCPair o2) {
return (int)(o2.freq - o1.freq);
}
});
LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
}
}
--LOG: enregistreur --eof: indicateur de fin de lecture du fichier --fileName: nom de fichier --wordMapFile: fréquence d'occurrence de mot par fichier --resultFileFinal: mapper pour stocker les résultats agrégés --fileFinalList: Liste pour stocker les résultats agrégés --input: port d'entrée --control: port d'entrée, nom de fichier --fileOutput: port de sortie
Jetons un œil à ʻinput`. C'est presque le même que "WindowWordCount", et la fréquence d'occurrence des mots est agrégée et placée dans la carte r.
FileWordCount.java
public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
{
@Override
public void process(List<WCPair> list)
{
// blend incoming list into wordMapFile and wordMapGlobal
for (WCPair pair : list) {
final String word = pair.word;
WCPair filePair = wordMapFile.get(word);
if (null != filePair) { // word seen previously in current file
filePair.freq += pair.freq;
continue;
}
// new word in current file
filePair = new WCPair(word, pair.freq);
wordMapFile.put(word, filePair);
}
}
};
Vient ensuite control
. Le nom du fichier vient de lineReader
, donc enregistrez-le et activez le drapeau ʻeof`.
FileWordCount.java
public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
{
@Override
public void process(String msg)
{
if (msg.isEmpty()) { // sanity check
throw new RuntimeException("Empty file path");
}
LOG.info("FileWordCount: EOF for {}", msg);
fileName = msg;
eof = true;
// NOTE: current version only supports processing one file at a time.
}
};
Puisque setup
initialise uniquement la carte et la liste, il est omis.
Jetons un œil à ʻendWindow. C'est un peu long, mais c'est simple à faire: si ʻeof
est activé (c'est-à-dire que le fichier a fini de se charger) lorsque la fenêtre est fermée, les résultats agrégés seront envoyés au port de sortie. Lors du streaming, un traitement tel que le tri est effectué avec getList
.
FileWordCount.java
public void endWindow()
{
LOG.info("FileWordCount: endWindow for {}", fileName);
if (wordMapFile.isEmpty()) { // no words found
if (eof) { // write empty list to fileOutput port
// got EOF, so output empty list to output file
fileFinalList.clear();
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
resultFileFinal.clear();
}
LOG.info("FileWordCount: endWindow for {}, no words", fileName);
return;
}
LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
fileName, wordMapFile.size(), eof);
if (eof) { // got EOF earlier
if (null == fileName) { // need file name to emit topN pairs to file writer
throw new RuntimeException("EOF but no fileName at endWindow");
}
// sort list from wordMapFile into fileFinalList and emit it
getList(wordMapFile);
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
wordMapFile.clear();
resultFileFinal.clear();
}
}
private void getList(final Map<String, WCPair> map)
{
fileFinalList.clear();
fileFinalList.addAll(map.values());
// sort entries in descending order of frequency
Collections.sort(fileFinalList, new Comparator<WCPair>() {
@Override
public int compare(WCPair o1, WCPair o2) {
return (int)(o2.freq - o1.freq);
}
});
LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
}
WordCountWriter
Le dernier est WordCountWriter. Un opérateur qui génère les résultats agrégés dans un fichier. Ceci est différent du précédent et hérite de ʻAbstractFileOutputOperator`, donc l'implémentation est légèrement différente.
WordCountWriter.java
public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>>
{
private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class);
private static final String charsetName = "UTF-8";
private static final String nl = System.lineSeparator();
private String fileName; // current file name
private transient final StringBuilder sb = new StringBuilder();
@Override
public void endWindow()
{
if (null != fileName) {
requestFinalize(fileName);
}
super.endWindow();
}
// input is a singleton list [M] where M is a singleton map {fileName => L} where L is a
// list of pairs: (word, frequency)
//
@Override
protected String getFileName(Map<String, Object> tuple)
{
LOG.info("getFileName: tuple.size = {}", tuple.size());
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
fileName = entry.getKey();
LOG.info("getFileName: fileName = {}", fileName);
return fileName;
}
@Override
protected byte[] getBytesForTuple(Map<String, Object> tuple)
{
LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());
// get first and only pair; key is the fileName and is ignored here
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
final List<WCPair> list = (List<WCPair>) entry.getValue();
if (sb.length() > 0) { // clear buffer
sb.delete(0, sb.length());
}
for ( WCPair pair : list ) {
sb.append(pair.word); sb.append(" : ");
sb.append(pair.freq); sb.append(nl);
}
final String data = sb.toString();
LOG.info("getBytesForTuple: data = {}", data);
try {
final byte[] result = data.getBytes(charsetName);
return result;
} catch (UnsupportedEncodingException ex) {
throw new RuntimeException("Should never get here", ex);
}
}
}
--LOG: enregistreur --charsetName: encodage --nl: code de saut de ligne --fileName: nom de fichier
ʻEndWindow appelle
requestFinalize` pour terminer le traitement du fichier.
WordCountWriter.java
public void endWindow()
{
if (null != fileName) {
requestFinalize(fileName);
}
super.endWindow();
}
Dans getFileName
, décrivez comment obtenir le nom du fichier.
WordCountWriter.java
protected String getFileName(Map<String, Object> tuple)
{
LOG.info("getFileName: tuple.size = {}", tuple.size());
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
fileName = entry.getKey();
LOG.info("getFileName: fileName = {}", fileName);
return fileName;
}
getBytesForTuple
décrit le contenu à sortir du taple vers le fichier. Ici, la liste est formatée et renvoyée sous la forme d'une chaîne de caractères unique.
WordCountWriter.java
protected byte[] getBytesForTuple(Map<String, Object> tuple)
{
LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());
// get first and only pair; key is the fileName and is ignored here
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
final List<WCPair> list = (List<WCPair>) entry.getValue();
if (sb.length() > 0) { // clear buffer
sb.delete(0, sb.length());
}
for ( WCPair pair : list ) {
sb.append(pair.word); sb.append(" : ");
sb.append(pair.freq); sb.append(nl);
}
final String data = sb.toString();
LOG.info("getBytesForTuple: data = {}", data);
try {
final byte[] result = data.getBytes(charsetName);
return result;
} catch (UnsupportedEncodingException ex) {
throw new RuntimeException("Should never get here", ex);
}
}
Apex conçoit d'abord l'ensemble de l'application avec le DAG de l'opérateur. Une fois la configuration du DAG décidée, il ne vous reste plus qu'à décrire le traitement de chaque opérateur. Je ne suis pas familier avec les autres moteurs de traitement de flux, donc je ne peux pas les comparer, mais cela semble facile à écrire car je n'ai pas à être très conscient du traitement distribué. De plus, les processus susceptibles d'être utilisés sont préparés sous forme de bibliothèque dans Malhar, il est donc facile à écrire.
Recommended Posts