[JAVA] À propos de l'implémentation du traitement de fusion, y compris la fonction de tri de l'API Stream

introduction

Il a été décidé de fusionner et de produire deux fichiers triés en fonction des besoins des petites entreprises.

Voici un exemple

  1. Le fichier d'entrée est composé de 2 fichiers triés par la première clé.
  2. Le fichier de sortie est trié et émis à partir de toutes les données des deux fichiers.

** Fichier d'entrée 1 **

1,AAA
4,BBB
9,CCC

** Fichier d'entrée 2 **

2,DDD
5,EEE
6,FFF

** Fichier de sortie **

1,AAA
2,DDD
4,BBB
5,EEE
6,FFF
9,CCC

Voilà pourquoi Je voulais lire en utilisant l'API Strem et activer le tri par fusion.

Sensationnel, C'est aussi un sujet trop précis et peu sollicité. (´ ・ ω ・)

Ce que vous souhaitez mettre en œuvre

J'ai décidé de le mettre en œuvre comme ça.

  1. Créez un seul flux avec plusieurs flux comme paramètres.
  2. Passez Comparator pour définir l'ordre de tri.
  3. En principe, chaque flux est trié.
  4. L'apatride est une prémisse majeure.

la mise en oeuvre

Créez StreamUtils pour fusionner les flux.

public class StreamUtils {

	private StreamUtils() {
	}

	/**
	 *Fusionner trier le flux spécifié par l'argument
	 *
	 * @param streamArray Flux trié
	 * @return
	 */
	@SafeVarargs
	public static final <T> Stream<T> merge(final Stream<T>... streamArray) {
		return merge(null, streamArray);
	}

	/**
	 *Fusionner les flux
	 *Contrôlez les éléments acquis avec votre propre Spliterator.
	 *
	 * @condition de tri param comp
	 * @param streamArray Flux trié
	 * @return
	 */
	@SafeVarargs
	public static final <T> Stream<T> merge(final Comparator<T> comp, final Stream<T>... streamArray) {
		try {
			MergedIterator<T> iterator = new MergedIterator<>(streamArray);
			iterator.setComparetor(comp);
			Spliterator<T> spliterator = new SpliteratorAdapter<>(iterator);
			return StreamSupport.stream(spliterator, false).onClose(composedClose(streamArray));
		} catch (Exception exception) {
			for (Stream<?> stream : streamArray) {
				try {
					stream.close();
				} catch (RuntimeException e) {
					try {
						exception.addSuppressed(e);
					} catch (RuntimeException ignore) {
					}
				}
			}
			throw exception;
		}
	}

	@SafeVarargs
	static <T> Runnable composedClose(final Stream<T>... streamArray) {
		return new Runnable() {
			@Override
			public void run() {
				RuntimeException exception = null;
				for (Stream<?> stream : streamArray) {
					try {
						stream.close();
					} catch (RuntimeException e) {
						try {
							if (exception == null) {
								exception = e;
							} else {
								exception.addSuppressed(e);
							}
						} catch (RuntimeException ignore) {
						}
					}
				}
				if (exception != null) {
					throw exception;
				}
			}
		};
	}

}

Comparez la classe qui contient plusieurs flux et sélectionne le contenu à acquérir

public class MergedIterator<T> implements Iterator<T> {

	/**
	 *Carte de l'élément suivant de l'instance Strema que vous détenez
	 */
	private Map<Iterator<T>, T> nextMap;

	/**
	 *Conditions de tri
	 */
	private Comparator<T> comp = null;

	/**
	 * 
	 * @param streamArray Tableau de flux à fusionner
	 */
	@SafeVarargs
	public MergedIterator(final Stream<T>... streamArray) {
		this(Arrays.asList(streamArray).stream().map(stream -> stream.iterator()).collect(Collectors.toList()));
	}

	/**
	 * 
	 * @param itrArray Tableau d'itérateurs à fusionner
	 */
	@SafeVarargs
	public MergedIterator(final Iterator<T>... itrArray) {
		this(Arrays.asList(itrArray));
	}

	/**
	 *
	 * @param itrList Liste des itérateurs à fusionner
	 */
	public MergedIterator(final List<Iterator<T>> itrList) {
		this.nextMap = new HashMap<>();
		for (Iterator<T> itr : itrList) {
			this.nextMap.put(itr, itr.hasNext() ? itr.next() : null);
		}
	}

	/**
	 *Instance de comparaison
	 * @param comp
	 */
	public void setComparetor(final Comparator<T> comp) {
		this.comp = comp;
	}

	@Override
	public boolean hasNext() {
		return this.nextMap.entrySet().stream().filter(entry -> entry.getValue() != null).count() > 0L;
	}

	@Override
	public void remove() {
		throw new UnsupportedOperationException("Not supported.");
	}

	@Override
	public T next() {
		if (!hasNext()) {
			return null;
		}

		Entry<Iterator<T>, T> nextEntry = this.nextMap.entrySet().stream().filter(entry -> entry.getValue() != null)
				.min(new Comparator<Entry<Iterator<T>, T>>() {

					@Override
					public int compare(final Entry<Iterator<T>, T> o1, final Entry<Iterator<T>, T> o2) {
						return MergedIterator.this.comp != null
								? MergedIterator.this.comp.compare(o1.getValue(), o2.getValue())
								: 0;
					}
				}).orElse(null);

		T returnObject = nextEntry.getValue();
		nextEntry.setValue(nextEntry.getKey().hasNext() ? nextEntry.getKey().next() : null);

		return returnObject;

	}

}

Classe Spliterator (analyse d'élément utilisée dans Stream)

public class SpliteratorAdapter<T> extends Spliterators.AbstractSpliterator<T> {

	private final Iterator<T> iterator;

	/**
	 *
	 * @param iter
	 */
	public SpliteratorAdapter(final Iterator<T> iter) {
		super(Long.MAX_VALUE, 0);
		this.iterator = iter;
	}

	@Override
	public synchronized boolean tryAdvance(final Consumer<? super T> action) {
		if (this.iterator.hasNext()) {
			action.accept(this.iterator.next());
			return true;
		}
		return false;
	}
}

Code d'exécution

Appelez-le comme suit. Les ressources de flux seront fermées après la fusion, mais elles doivent être créées individuellement (streamArray). Je me demande si je peux le rendre intelligent. .. .. ..

Cela peut être plus facile si vous incluez cela dans StreamUtils.

	//tableau streamArray Strema
	//implémentation de comp Comparator
	try (Stream<List<String>> mergeStream = StreamUtils
				.merge(comp, streamArray)
				.onClose(() -> LOG.debug("Traitement de tous les flux terminé"))) {
		/*Le traitement est exécuté ici Dans cet exemple, la sortie standard est sortie et le processus se termine.*/
		mergeStream.forEach(line -> System.out.println(line));

	} catch (final IOException e) {
		//Gestion des exceptions
	}

en conclusion

Je pense qu'il y a pas mal de processus qui couvrent plusieurs flux autres que de les coller en série. DIFF peut également être effectué sans état, Je pense qu'il serait intéressant de créer cette zone.

Dans cet exemple, l'option parallèle ne fonctionne pas, Je pense qu'il y a un traitement qui fonctionne.

Je voudrais l'organiser une fois.

Recommended Posts

À propos de l'implémentation du traitement de fusion, y compris la fonction de tri de l'API Stream
[Rails] À propos de la mise en œuvre de la fonction similaire
Implémentation de la fonction de recherche
Mise en œuvre de la fonction de pagénation
Implémentation de la fonction de recherche séquentielle
Implémentation d'une fonction similaire (Ajax)
Implémentation de la fonction de prévisualisation d'image
[Rails] Implémentation de la fonction de catégorie
Mise en place de la fonction de tri des rails (affichés par ordre de nombre de like)
[Java] Stream API - Traitement de l'arrêt du flux
[Java] Stream API - Traitement intermédiaire de flux
Mise en œuvre de la fonction déroulante de catégorie
[Rails] Implémentation de la fonction tutoriel
[Rails] Implémentation d'une fonction similaire
[Rails] Implémentation de la fonction coupon (avec fonction de suppression automatique par traitement par lots)
[Rails] Implémentation de la fonction d'importation CSV
[Rails] Implémentation asynchrone de la fonction similaire
[Rails] Implémentation de la fonction de prévisualisation d'image
À propos de Lambda, Stream, LocalDate de Java8
[Introduction à Java] À propos de l'API Stream
À propos de la gestion des erreurs de la fonction de commentaire
[Rails] Implémentation de la fonction de retrait utilisateur
[Rails] Implémentation de la fonction d'exportation CSV
À propos de la compilation, de l'API, de l'implémentation, etc. de Gradle
Implémentation du traitement asynchrone dans Tomcat
Flux de traitement de base de Java Stream
Implémentation d'une fonction similaire en Java
Comparaison de la vitesse de traitement entre Stream incluant le cast et Extended for Statement
Traitement des données à l'aide de l'API de flux de Java 8
Implémentation de la fonction d'authentification des utilisateurs à l'aide de devise (2)
Implémentation du traitement asynchrone compatible multi-tenant dans Tomcat
Implémentation de la fonction d'authentification des utilisateurs à l'aide de devise (1)
Rails [Pour les débutants] Implémentation de la fonction de commentaire
Implémentation de la fonction d'authentification des utilisateurs à l'aide de devise (3)
[Ruby on rails] Implémentation d'une fonction similaire