[Zählen der Häufigkeit des Auftretens von Wörtern in Sätzen durch Stream-Verarbeitung \ (Apache Apex ) Teil 2 \ -Coding \ - \ - Schlechtes Satzmuster](http://koheikimura.hatenablog.com/entry/2017/06 / 22/19 3000).
Dies ist eine Fortsetzung von Zählen der Häufigkeit des Auftretens von Wörtern in Sätzen durch Stream-Verarbeitung \ (Apache Apex ) \ -Qiita. Beim letzten Mal habe ich nur die Umgebung erstellt und ausgeführt und den Inhalt des Beispielcodes nicht berührt. Dieses Mal werde ich den Beispielcode lesen, um ein Gefühl für die Entwicklung von Apex-Streaming-Anwendungen zu bekommen.
Die folgenden 7 Beispielcodedateien sind verfügbar.
ApplicationWordCount
Erstens ist die Anwendung selbst. Sie müssen lediglich einen Operator erstellen und diese zusammen streamen, um eine DAG zu erstellen.
ApplicationWordCount.java
package com.example.myapexapp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import org.apache.hadoop.conf.Configuration;
@ApplicationAnnotation(name="SortedWordCount")
public class ApplicationWordCount implements StreamingApplication
{
private static final Logger LOG = LoggerFactory.getLogger(ApplicationWordCount.class);
@Override
public void populateDAG(DAG dag, Configuration conf)
{
// create operators
LineReader lineReader = dag.addOperator("lineReader", new LineReader());
WordReader wordReader = dag.addOperator("wordReader", new WordReader());
WindowWordCount windowWordCount = dag.addOperator("windowWordCount", new WindowWordCount());
FileWordCount fileWordCount = dag.addOperator("fileWordCount", new FileWordCount());
WordCountWriter wcWriter = dag.addOperator("wcWriter", new WordCountWriter());
// create streams
dag.addStream("lines", lineReader.output, wordReader.input);
dag.addStream("control", lineReader.control, fileWordCount.control);
dag.addStream("words", wordReader.output, windowWordCount.input);
dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input);
dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
}
}
Es scheint, dass Sie den Anwendungsnamen mit "@ApplicationAnnotation (name =" SortedWordCount ")" festlegen können. Dies entspricht dem Namen, der beim letzten Ausführen angezeigt wurde.
apex> launch target/myapexapp-1.0-SNAPSHOT.apa
1. MyFirstApplication
2. SortedWordCount #Dies
Choose application: 2
{"appId": "application_1496704660177_0001"}
apex (application_1496704660177_0001) >
Die folgende Abbildung zeigt die Beziehung zwischen jedem Operator und dem Stream. Es ist fast gerade, aber es gibt einen Stream namens "control", der von "lineReader" zu "fileWordCount" fliegt. In Apex wird eine Anwendung erstellt, indem Operatoren auf diese Weise verbunden werden, um eine DAG zu erstellen. Der Verbindungspunkt des Bedieners wird als Port bezeichnet.
LineReader
Beginnen wir mit dem ersten lineReader-Operator. Dieser Operator liest die Datei, sendet ihren Inhalt an den "Ausgabe" -Port und den Dateipfad an "Steuerung".
LineReader.java
package com.example.myapexapp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
// reads lines from input file and returns them; if end-of-file is reached, a control tuple
// is emitted on the control port
//
public class LineReader extends AbstractFileInputOperator<String>
{
private static final Logger LOG = LoggerFactory.getLogger(LineReader.class);
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>();
private transient BufferedReader br = null;
private Path path;
@Override
protected InputStream openFile(Path curPath) throws IOException
{
LOG.info("openFile: curPath = {}", curPath);
path = curPath;
InputStream is = super.openFile(path);
br = new BufferedReader(new InputStreamReader(is));
return is;
}
@Override
protected void closeFile(InputStream is) throws IOException
{
super.closeFile(is);
br.close();
br = null;
path = null;
}
// return empty string
@Override
protected String readEntity() throws IOException
{
// try to read a line
final String line = br.readLine();
if (null != line) { // common case
LOG.debug("readEntity: line = {}", line);
return line;
}
// end-of-file; send control tuple, containing only the last component of the path
// (only file name) on control port
//
if (control.isConnected()) {
LOG.info("readEntity: EOF for {}", path);
final String name = path.getName(); // final component of path
control.emit(name);
}
return null;
}
@Override
protected void emit(String tuple)
{
output.emit(tuple);
}
}
Es gibt fünf Mitglieder dieser Operatorklasse:
--LOG: Logger --output: Ausgangsport --control: Ausgangsport --br: Puffer --path: Dateipfad
Es ist keine spezielle Erklärung erforderlich, aber zur Kontrolle
@OutputPortFieldAnnotation(optional = true)
Anmerkung wurde hinzugefügt. Die Option ist wahr. Ich bin mir nicht sicher, warum es optional ist. Ich werde es hinzufügen, wenn ich es verstehe.
In LineReader
werden die Methoden openFile
, closeFile
, readEntity
, emit
definiert und die Verarbeitung beim Öffnen der Datei, die Verarbeitung beim Schließen der Datei und das nächste Taple aus der geöffneten Datei. Beschreibt den Prozess des Lesens und des Tippens.
readEntitiy
liest eine Zeile aus dem Puffer und gibt sie zurück. Nach dem Lesen von EOF wird der Dateiname an den contorl-Port gesendet und der Stream beendet. Was als Rückgabewert zurückgegeben wird, ist das nächste Tupel, das von der "emit" -Methode verarbeitet wird. Wenn Sie null zurückgeben, wird der Stream beendet.
LineReader.java
// return empty string
@Override
protected String readEntity() throws IOException
{
// try to read a line
final String line = br.readLine();
if (null != line) { // common case
LOG.debug("readEntity: line = {}", line);
return line;
}
// end-of-file; send control tuple, containing only the last component of the path
// (only file name) on control port
//
if (control.isConnected()) {
LOG.info("readEntity: EOF for {}", path);
final String name = path.getName(); // final component of path
control.emit(name);
}
return null;
}
WordReader
WordReader unterteilt eine von LineReader gesendete Datenzeile in Wörter.
WordReader.java
package com.example.myapexapp;
import java.util.regex.Pattern;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
// extracts words from input line
public class WordReader extends BaseOperator
{
// default pattern for word-separators
private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+");
private String nonWordStr; // configurable regex
private transient Pattern nonWord; // compiled regex
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
public final transient DefaultInputPort<String>
input = new DefaultInputPort<String>() {
@Override
public void process(String line)
{
// line; split it into words and emit them
final String[] words = nonWord.split(line);
for (String word : words) {
if (word.isEmpty()) continue;
output.emit(word);
}
}
};
public String getNonWordStr() {
return nonWordStr;
}
public void setNonWordStr(String regex) {
nonWordStr = regex;
}
@Override
public void setup(OperatorContext context)
{
if (null == nonWordStr) {
nonWord = nonWordDefault;
} else {
nonWord = Pattern.compile(nonWordStr);
}
}
}
--nonWordDefault: Standard-Wortteilungsmuster --nonWordStr: Regulärer Ausdruck für die Wortteilung --nonWord: Wortteilungsmuster --output: Ausgangsport --input: Eingangsport
Ich denke, dass dies fast unnötig zu erklären ist, aber der Eingabeport unterscheidet sich vom Ausgabeport und ist nicht nur neu, sondern es wird auch eine gewisse Verarbeitung beschrieben.
WordReader.java
public final transient DefaultInputPort<String>
input = new DefaultInputPort<String>() {
@Override
public void process(String line)
{
// line; split it into words and emit them
final String[] words = nonWord.split(line);
for (String word : words) {
if (word.isEmpty()) continue;
output.emit(word);
}
}
};
Der Eingabeport beschreibt, wie die Eingabedaten verarbeitet werden. Dies gilt auch für andere Operatoren, und für die Operatoren von WordReader bis FileWordCount ist für jeden Eingabeport eine Prozessmethode definiert. Hier scheint es, dass die Eingabedaten in Wörter mit einem Muster unterteilt und an den Ausgabeport gesendet werden.
getNonWordStr
und setNonWordStr
sind nur Getter und Setter, nicht wahr? setup
wird während des Setups aufgerufen, um nonWord
zu konfigurieren.
WordReader.java
@Override
public void setup(OperatorContext context)
{
if (null == nonWordStr) {
nonWord = nonWordDefault;
} else {
nonWord = Pattern.compile(nonWordStr);
}
}
WindowWordCount WindowWordCount fasst die von WordReader in einem Fenster gesendeten Wörter zu einem Wort / Frequenz-Paar zusammen.
WindowWordCount.java
package com.example.myapexapp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
// Computes word frequency counts per window and emits them at each endWindow. The output is a
// list of pairs (word, frequency).
//
public class WindowWordCount extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class);
// wordMap : word => frequency
protected Map<String, WCPair> wordMap = new HashMap<>();
public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
{
@Override
public void process(String word)
{
WCPair pair = wordMap.get(word);
if (null != pair) { // word seen previously
pair.freq += 1;
return;
}
// new word
pair = new WCPair();
pair.word = word;
pair.freq = 1;
wordMap.put(word, pair);
}
};
// output port which emits the list of word frequencies for current window
// fileName => list of (word, freq) pairs
//
public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>();
@Override
public void endWindow()
{
LOG.info("WindowWordCount: endWindow");
// got EOF; if no words found, do nothing
if (wordMap.isEmpty()) return;
// have some words; emit single map and reset for next file
final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
output.emit(list);
list.clear();
wordMap.clear();
}
}
--LOG: Logger --wordMap: Eine Karte von Wort- und Frequenzpaaren --input: Eingangsport --output: Ausgangsport
Das Wort-Frequenz-Paar "wordMap" ist in "WCPair.java" als "WCPair" definiert.
WCPair.java
package com.example.myapexapp;
// a single (word, frequency) pair
public class WCPair {
public String word;
public int freq;
public WCPair() {}
public WCPair(String w, int f) {
word = w;
freq = f;
}
@Override
public String toString() {
return String.format("(%s, %d)", word, freq);
}
}
Werfen wir einen Blick auf den Inhalt von input
. Da die Ausgabe von "WordReader" ein Wort ist, ist die Eingabe ein Wort. Sucht nach dem eingegebenen Wort aus "wordMap", addiert 1 zur Häufigkeit, falls vorhanden, und erstellt ein neues "WCPair", falls es nicht vorhanden ist, und fügt es der Karte hinzu.
WindowWordCount.java
public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
{
@Override
public void process(String word)
{
WCPair pair = wordMap.get(word);
if (null != pair) { // word seen previously
pair.freq += 1;
return;
}
// new word
pair = new WCPair();
pair.word = word;
pair.freq = 1;
wordMap.put(word, pair);
}
};
Die einzige Methode ist "endWindow". Diese Methode wird aufgerufen, wenn das Fenster geschlossen wird. Die Karte wird aufgelistet und an den Ausgabeport gesendet. Es scheint, dass das Fenster nicht festgelegt ist, daher scheint es einen Standardwert zu geben, aber ich bin mir nicht sicher, wie der Standardwert lautet. Ich werde es hinzufügen, wenn ich es verstehe.
WindowWordCount.java
public void endWindow()
{
LOG.info("WindowWordCount: endWindow");
// got EOF; if no words found, do nothing
if (wordMap.isEmpty()) return;
// have some words; emit single map and reset for next file
final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
output.emit(list);
list.clear();
wordMap.clear();
}
FileWordCount
FileWordCount aggregiert die Häufigkeit von Wortvorkommen aus der gesamten Datei.
FileWordCount.java
public class FileWordCount extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);
// set to true when we get an EOF control tuple
protected boolean eof = false;
// last component of path (i.e. only file name)
// incoming value from control tuple
protected String fileName;
// wordMapFile : {word => frequency} map, current file, all words
protected Map<String, WCPair> wordMapFile = new HashMap<>();
// singleton map of fileName to sorted list of (word, frequency) pairs
protected transient Map<String, Object> resultFileFinal;
// final sorted list of (word,frequency) pairs
protected transient List<WCPair> fileFinalList;
public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
{
@Override
public void process(List<WCPair> list)
{
// blend incoming list into wordMapFile and wordMapGlobal
for (WCPair pair : list) {
final String word = pair.word;
WCPair filePair = wordMapFile.get(word);
if (null != filePair) { // word seen previously in current file
filePair.freq += pair.freq;
continue;
}
// new word in current file
filePair = new WCPair(word, pair.freq);
wordMapFile.put(word, filePair);
}
}
};
public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
{
@Override
public void process(String msg)
{
if (msg.isEmpty()) { // sanity check
throw new RuntimeException("Empty file path");
}
LOG.info("FileWordCount: EOF for {}", msg);
fileName = msg;
eof = true;
// NOTE: current version only supports processing one file at a time.
}
};
// fileOutput -- tuple is singleton map {<fileName> => fileFinalList}; emitted on EOF
public final transient DefaultOutputPort<Map<String, Object>>
fileOutput = new DefaultOutputPort<>();
@Override
public void setup(OperatorContext context)
{
// singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
resultFileFinal = new HashMap<>(1);
fileFinalList = new ArrayList<>();
}
@Override
public void endWindow()
{
LOG.info("FileWordCount: endWindow for {}", fileName);
if (wordMapFile.isEmpty()) { // no words found
if (eof) { // write empty list to fileOutput port
// got EOF, so output empty list to output file
fileFinalList.clear();
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
resultFileFinal.clear();
}
LOG.info("FileWordCount: endWindow for {}, no words", fileName);
return;
}
LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
fileName, wordMapFile.size(), eof);
if (eof) { // got EOF earlier
if (null == fileName) { // need file name to emit topN pairs to file writer
throw new RuntimeException("EOF but no fileName at endWindow");
}
// sort list from wordMapFile into fileFinalList and emit it
getList(wordMapFile);
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
wordMapFile.clear();
resultFileFinal.clear();
}
}
// populate fileFinalList with topN frequencies from argument
// This list is suitable input to WordCountWriter which writes it to a file
// MUST have map.size() > 0 here
//
private void getList(final Map<String, WCPair> map)
{
fileFinalList.clear();
fileFinalList.addAll(map.values());
// sort entries in descending order of frequency
Collections.sort(fileFinalList, new Comparator<WCPair>() {
@Override
public int compare(WCPair o1, WCPair o2) {
return (int)(o2.freq - o1.freq);
}
});
LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
}
}
--LOG: Logger --eof: Flag zum Lesen der Datei --fileName: Dateiname --wordMapFile: Häufigkeit des Auftretens von Wörtern pro Datei --resultFileFinal: Zuordnung zum Speichern aggregierter Ergebnisse --fileFinalList: Liste zum Speichern aggregierter Ergebnisse --input: Eingangsport --control: Eingabeport, Dateiname --fileOutput: Ausgabeport
Schauen wir uns die Eingabe an. Es ist fast dasselbe wie "WindowWordCount", und die Häufigkeit des Auftretens von Wörtern wird aggregiert und in die Karte r eingefügt.
FileWordCount.java
public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
{
@Override
public void process(List<WCPair> list)
{
// blend incoming list into wordMapFile and wordMapGlobal
for (WCPair pair : list) {
final String word = pair.word;
WCPair filePair = wordMapFile.get(word);
if (null != filePair) { // word seen previously in current file
filePair.freq += pair.freq;
continue;
}
// new word in current file
filePair = new WCPair(word, pair.freq);
wordMapFile.put(word, filePair);
}
}
};
Als nächstes kommt "Kontrolle". Der Dateiname stammt von lineReader
. Speichern Sie ihn also und aktivieren Sie das Flag eof
.
FileWordCount.java
public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
{
@Override
public void process(String msg)
{
if (msg.isEmpty()) { // sanity check
throw new RuntimeException("Empty file path");
}
LOG.info("FileWordCount: EOF for {}", msg);
fileName = msg;
eof = true;
// NOTE: current version only supports processing one file at a time.
}
};
Da setup
nur die Karte und Liste initialisiert, wird es weggelassen.
Schauen wir uns endWindow
an. Es ist etwas lang, aber einfach zu bewerkstelligen: Wenn "eof" beim Schließen des Fensters aktiviert ist (dh die Datei wurde vollständig geladen), werden die aggregierten Ergebnisse an den Ausgabeport gesendet. Beim Streaming wird die Verarbeitung wie das Sortieren mit getList
durchgeführt.
FileWordCount.java
public void endWindow()
{
LOG.info("FileWordCount: endWindow for {}", fileName);
if (wordMapFile.isEmpty()) { // no words found
if (eof) { // write empty list to fileOutput port
// got EOF, so output empty list to output file
fileFinalList.clear();
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
resultFileFinal.clear();
}
LOG.info("FileWordCount: endWindow for {}, no words", fileName);
return;
}
LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
fileName, wordMapFile.size(), eof);
if (eof) { // got EOF earlier
if (null == fileName) { // need file name to emit topN pairs to file writer
throw new RuntimeException("EOF but no fileName at endWindow");
}
// sort list from wordMapFile into fileFinalList and emit it
getList(wordMapFile);
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
wordMapFile.clear();
resultFileFinal.clear();
}
}
private void getList(final Map<String, WCPair> map)
{
fileFinalList.clear();
fileFinalList.addAll(map.values());
// sort entries in descending order of frequency
Collections.sort(fileFinalList, new Comparator<WCPair>() {
@Override
public int compare(WCPair o1, WCPair o2) {
return (int)(o2.freq - o1.freq);
}
});
LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
}
WordCountWriter
Der letzte ist WordCountWriter. Ein Operator, der die aggregierten Ergebnisse in eine Datei ausgibt. Dies unterscheidet sich von der Vergangenheit und erbt "AbstractFileOutputOperator", sodass sich die Implementierung geringfügig unterscheidet.
WordCountWriter.java
public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>>
{
private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class);
private static final String charsetName = "UTF-8";
private static final String nl = System.lineSeparator();
private String fileName; // current file name
private transient final StringBuilder sb = new StringBuilder();
@Override
public void endWindow()
{
if (null != fileName) {
requestFinalize(fileName);
}
super.endWindow();
}
// input is a singleton list [M] where M is a singleton map {fileName => L} where L is a
// list of pairs: (word, frequency)
//
@Override
protected String getFileName(Map<String, Object> tuple)
{
LOG.info("getFileName: tuple.size = {}", tuple.size());
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
fileName = entry.getKey();
LOG.info("getFileName: fileName = {}", fileName);
return fileName;
}
@Override
protected byte[] getBytesForTuple(Map<String, Object> tuple)
{
LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());
// get first and only pair; key is the fileName and is ignored here
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
final List<WCPair> list = (List<WCPair>) entry.getValue();
if (sb.length() > 0) { // clear buffer
sb.delete(0, sb.length());
}
for ( WCPair pair : list ) {
sb.append(pair.word); sb.append(" : ");
sb.append(pair.freq); sb.append(nl);
}
final String data = sb.toString();
LOG.info("getBytesForTuple: data = {}", data);
try {
final byte[] result = data.getBytes(charsetName);
return result;
} catch (UnsupportedEncodingException ex) {
throw new RuntimeException("Should never get here", ex);
}
}
}
--LOG: Logger --charsetName: Codierung --nl: Zeilenvorschubcode --fileName: Dateiname
endWindow
ruft requestFinalize
auf, um die Dateiverarbeitung zu beenden.
WordCountWriter.java
public void endWindow()
{
if (null != fileName) {
requestFinalize(fileName);
}
super.endWindow();
}
Beschreiben Sie in getFileName
, wie Sie den Dateinamen erhalten.
WordCountWriter.java
protected String getFileName(Map<String, Object> tuple)
{
LOG.info("getFileName: tuple.size = {}", tuple.size());
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
fileName = entry.getKey();
LOG.info("getFileName: fileName = {}", fileName);
return fileName;
}
getBytesForTuple
beschreibt den Inhalt, der vom Taple in die Datei ausgegeben werden soll. Hier wird die Liste formatiert und als einzelne Zeichenfolge zurückgegeben.
WordCountWriter.java
protected byte[] getBytesForTuple(Map<String, Object> tuple)
{
LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());
// get first and only pair; key is the fileName and is ignored here
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
final List<WCPair> list = (List<WCPair>) entry.getValue();
if (sb.length() > 0) { // clear buffer
sb.delete(0, sb.length());
}
for ( WCPair pair : list ) {
sb.append(pair.word); sb.append(" : ");
sb.append(pair.freq); sb.append(nl);
}
final String data = sb.toString();
LOG.info("getBytesForTuple: data = {}", data);
try {
final byte[] result = data.getBytes(charsetName);
return result;
} catch (UnsupportedEncodingException ex) {
throw new RuntimeException("Should never get here", ex);
}
}
Apex entwirft zunächst die gesamte Anwendung mit der DAG des Bedieners. Sobald die DAG-Konfiguration festgelegt ist, müssen Sie nur noch die Verarbeitung jedes Operators beschreiben. Ich bin mit anderen Stream-Verarbeitungs-Engines nicht vertraut, daher kann ich sie nicht vergleichen, aber es scheint einfach zu schreiben, da ich mich der verteilten Verarbeitung nicht sehr bewusst sein muss. Außerdem werden die Prozesse, die wahrscheinlich verwendet werden, in Malhar als Bibliothek vorbereitet, sodass das Schreiben einfach ist.
Recommended Posts