Il a été décidé de fusionner et de produire deux fichiers triés en fonction des besoins des petites entreprises.
Voici un exemple
** Fichier d'entrée 1 **
1,AAA
4,BBB
9,CCC
** Fichier d'entrée 2 **
2,DDD
5,EEE
6,FFF
** Fichier de sortie **
1,AAA
2,DDD
4,BBB
5,EEE
6,FFF
9,CCC
Voilà pourquoi Je voulais lire en utilisant l'API Strem et activer le tri par fusion.
Sensationnel, C'est aussi un sujet trop précis et peu sollicité. (´ ・ ω ・)
J'ai décidé de le mettre en œuvre comme ça.
Créez StreamUtils pour fusionner les flux.
public class StreamUtils {
private StreamUtils() {
}
/**
*Fusionner trier le flux spécifié par l'argument
*
* @param streamArray Flux trié
* @return
*/
@SafeVarargs
public static final <T> Stream<T> merge(final Stream<T>... streamArray) {
return merge(null, streamArray);
}
/**
*Fusionner les flux
*Contrôlez les éléments acquis avec votre propre Spliterator.
*
* @condition de tri param comp
* @param streamArray Flux trié
* @return
*/
@SafeVarargs
public static final <T> Stream<T> merge(final Comparator<T> comp, final Stream<T>... streamArray) {
try {
MergedIterator<T> iterator = new MergedIterator<>(streamArray);
iterator.setComparetor(comp);
Spliterator<T> spliterator = new SpliteratorAdapter<>(iterator);
return StreamSupport.stream(spliterator, false).onClose(composedClose(streamArray));
} catch (Exception exception) {
for (Stream<?> stream : streamArray) {
try {
stream.close();
} catch (RuntimeException e) {
try {
exception.addSuppressed(e);
} catch (RuntimeException ignore) {
}
}
}
throw exception;
}
}
@SafeVarargs
static <T> Runnable composedClose(final Stream<T>... streamArray) {
return new Runnable() {
@Override
public void run() {
RuntimeException exception = null;
for (Stream<?> stream : streamArray) {
try {
stream.close();
} catch (RuntimeException e) {
try {
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
} catch (RuntimeException ignore) {
}
}
}
if (exception != null) {
throw exception;
}
}
};
}
}
Comparez la classe qui contient plusieurs flux et sélectionne le contenu à acquérir
public class MergedIterator<T> implements Iterator<T> {
/**
*Carte de l'élément suivant de l'instance Strema que vous détenez
*/
private Map<Iterator<T>, T> nextMap;
/**
*Conditions de tri
*/
private Comparator<T> comp = null;
/**
*
* @param streamArray Tableau de flux à fusionner
*/
@SafeVarargs
public MergedIterator(final Stream<T>... streamArray) {
this(Arrays.asList(streamArray).stream().map(stream -> stream.iterator()).collect(Collectors.toList()));
}
/**
*
* @param itrArray Tableau d'itérateurs à fusionner
*/
@SafeVarargs
public MergedIterator(final Iterator<T>... itrArray) {
this(Arrays.asList(itrArray));
}
/**
*
* @param itrList Liste des itérateurs à fusionner
*/
public MergedIterator(final List<Iterator<T>> itrList) {
this.nextMap = new HashMap<>();
for (Iterator<T> itr : itrList) {
this.nextMap.put(itr, itr.hasNext() ? itr.next() : null);
}
}
/**
*Instance de comparaison
* @param comp
*/
public void setComparetor(final Comparator<T> comp) {
this.comp = comp;
}
@Override
public boolean hasNext() {
return this.nextMap.entrySet().stream().filter(entry -> entry.getValue() != null).count() > 0L;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public T next() {
if (!hasNext()) {
return null;
}
Entry<Iterator<T>, T> nextEntry = this.nextMap.entrySet().stream().filter(entry -> entry.getValue() != null)
.min(new Comparator<Entry<Iterator<T>, T>>() {
@Override
public int compare(final Entry<Iterator<T>, T> o1, final Entry<Iterator<T>, T> o2) {
return MergedIterator.this.comp != null
? MergedIterator.this.comp.compare(o1.getValue(), o2.getValue())
: 0;
}
}).orElse(null);
T returnObject = nextEntry.getValue();
nextEntry.setValue(nextEntry.getKey().hasNext() ? nextEntry.getKey().next() : null);
return returnObject;
}
}
Classe Spliterator (analyse d'élément utilisée dans Stream)
public class SpliteratorAdapter<T> extends Spliterators.AbstractSpliterator<T> {
private final Iterator<T> iterator;
/**
*
* @param iter
*/
public SpliteratorAdapter(final Iterator<T> iter) {
super(Long.MAX_VALUE, 0);
this.iterator = iter;
}
@Override
public synchronized boolean tryAdvance(final Consumer<? super T> action) {
if (this.iterator.hasNext()) {
action.accept(this.iterator.next());
return true;
}
return false;
}
}
Appelez-le comme suit. Les ressources de flux seront fermées après la fusion, mais elles doivent être créées individuellement (streamArray). Je me demande si je peux le rendre intelligent. .. .. ..
Cela peut être plus facile si vous incluez cela dans StreamUtils.
//tableau streamArray Strema
//implémentation de comp Comparator
try (Stream<List<String>> mergeStream = StreamUtils
.merge(comp, streamArray)
.onClose(() -> LOG.debug("Traitement de tous les flux terminé"))) {
/*Le traitement est exécuté ici Dans cet exemple, la sortie standard est sortie et le processus se termine.*/
mergeStream.forEach(line -> System.out.println(line));
} catch (final IOException e) {
//Gestion des exceptions
}
Je pense qu'il y a pas mal de processus qui couvrent plusieurs flux autres que de les coller en série. DIFF peut également être effectué sans état, Je pense qu'il serait intéressant de créer cette zone.
Dans cet exemple, l'option parallèle ne fonctionne pas, Je pense qu'il y a un traitement qui fonctionne.
Je voudrais l'organiser une fois.
Recommended Posts