A business requirement called for merging two sorted files and writing the result as a single sorted output.
Here is an example:
** Input file 1 **
1,AAA
4,BBB
9,CCC
** Input file 2 **
2,DDD
5,EEE
6,FFF
** Output file **
1,AAA
2,DDD
4,BBB
5,EEE
6,FFF
9,CCC
That is why I wanted to read the files with the Stream API and merge-sort them.
Admittedly, this topic is very niche and there is little demand for it. (´・ω・)
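Before the Stream version, the merge step itself can be sketched on plain Iterators. This is only a minimal illustration of the idea, not the implementation used in this post: the class name TwoWayMergeSketch and the helpers key and advance are made up for the example, and it assumes the inputs contain no null elements (null is used to mark an exhausted input).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch class; not part of the code in this post.
final class TwoWayMergeSketch {

    private TwoWayMergeSketch() {
    }

    /** Merges two already-sorted inputs by repeatedly taking the smaller head element. */
    static <T> List<T> merge(Iterator<T> left, Iterator<T> right, Comparator<? super T> comp) {
        List<T> out = new ArrayList<>();
        T l = advance(left);
        T r = advance(right);
        while (l != null || r != null) {
            // Take from the left when the right is exhausted or the left head is not larger.
            boolean takeLeft = r == null || (l != null && comp.compare(l, r) <= 0);
            if (takeLeft) {
                out.add(l);
                l = advance(left);
            } else {
                out.add(r);
                r = advance(right);
            }
        }
        return out;
    }

    /** Returns the next element, or null when the input is exhausted (assumes no null elements). */
    private static <T> T advance(Iterator<T> it) {
        return it.hasNext() ? it.next() : null;
    }

    /** Hypothetical key extractor: the integer before the first comma of a line. */
    static int key(String line) {
        return Integer.parseInt(line.substring(0, line.indexOf(',')));
    }

    public static void main(String[] args) {
        List<String> file1 = Arrays.asList("1,AAA", "4,BBB", "9,CCC");
        List<String> file2 = Arrays.asList("2,DDD", "5,EEE", "6,FFF");
        Comparator<String> byKey = Comparator.comparingInt(TwoWayMergeSketch::key);
        merge(file1.iterator(), file2.iterator(), byKey).forEach(System.out::println);
    }
}
```

Running main prints the six lines of the output file shown above, in the same order. Only one head element per input is held at a time, which is what makes the approach suitable for lazily read files.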
I decided to implement it like this.
Create StreamUtils to merge Streams.
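import java.util.Comparator;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamUtils {

    private StreamUtils() {
    }

    /**
     * Merge-sorts the streams given as arguments, using the natural ordering of the elements.
     *
     * @param streamArray streams that are each already sorted
     * @return a stream over the merged elements
     */
    @SafeVarargs
    public static final <T extends Comparable<? super T>> Stream<T> merge(final Stream<T>... streamArray) {
        // Passing null here would leave the merge order undefined, so use natural ordering.
        return merge(Comparator.<T>naturalOrder(), streamArray);
    }

    /**
     * Merges streams.
     * The order in which elements are taken is controlled by a custom Spliterator.
     *
     * @param comp        sort condition
     * @param streamArray streams that are each already sorted by comp
     * @return a stream over the merged elements; closing it closes every source stream
     */
    @SafeVarargs
    public static final <T> Stream<T> merge(final Comparator<T> comp, final Stream<T>... streamArray) {
        try {
            MergedIterator<T> iterator = new MergedIterator<>(streamArray);
            iterator.setComparetor(comp);
            Spliterator<T> spliterator = new SpliteratorAdapter<>(iterator);
            return StreamSupport.stream(spliterator, false).onClose(composedClose(streamArray));
        } catch (Exception exception) {
            // Setup failed: close every source stream before rethrowing.
            for (Stream<?> stream : streamArray) {
                try {
                    stream.close();
                } catch (RuntimeException e) {
                    try {
                        exception.addSuppressed(e);
                    } catch (RuntimeException ignore) {
                        // addSuppressed itself may reject the exception; nothing more to do.
                    }
                }
            }
            throw exception;
        }
    }

    /**
     * Builds a close handler that closes every source stream.
     * The first failure is rethrown after all streams were tried; later failures are attached as suppressed.
     */
    @SafeVarargs
    static <T> Runnable composedClose(final Stream<T>... streamArray) {
        return () -> {
            RuntimeException exception = null;
            for (Stream<?> stream : streamArray) {
                try {
                    stream.close();
                } catch (RuntimeException e) {
                    try {
                        if (exception == null) {
                            exception = e;
                        } else {
                            exception.addSuppressed(e);
                        }
                    } catch (RuntimeException ignore) {
                        // addSuppressed itself may reject the exception; nothing more to do.
                    }
                }
            }
            if (exception != null) {
                throw exception;
            }
        };
    }
}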
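The close handling is the easiest part to get wrong, so here is a small stand-alone illustration of it. It is a sketch under assumptions, not the StreamUtils code itself: the class CloseChainSketch and the method closingSources are hypothetical names, and the derived stream is just a placeholder for the merged one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical sketch class; it mirrors the idea of composedClose in a stand-alone form.
final class CloseChainSketch {

    private CloseChainSketch() {
    }

    /** Returns derived with a close handler that also closes every source stream. */
    static <T> Stream<T> closingSources(Stream<T> derived, List<? extends Stream<?>> sources) {
        return derived.onClose(() -> {
            RuntimeException first = null;
            for (Stream<?> source : sources) {
                try {
                    source.close();
                } catch (RuntimeException e) {
                    if (first == null) {
                        first = e; // remember the first failure, keep closing the rest
                    } else if (e != first) {
                        first.addSuppressed(e);
                    }
                }
            }
            if (first != null) {
                throw first;
            }
        });
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Stream<String> a = Stream.of("1,AAA").onClose(() -> log.add("closed a"));
        Stream<String> b = Stream.of("2,DDD").onClose(() -> log.add("closed b"));
        // try-with-resources closes the derived stream, which in turn closes a and b.
        try (Stream<String> derived = closingSources(Stream.of("placeholder"), Arrays.asList(a, b))) {
            derived.forEach(System.out::println);
        }
        System.out.println(log);
    }
}
```

The point of the loop is that one failing source does not prevent the remaining sources from being closed.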
A class that holds multiple Streams, compares their next elements, and selects which one to return
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MergedIterator<T> implements Iterator<T> {

    /**
     * Map from each held Iterator to its next (buffered) element.
     * A null value marks an exhausted source, so the sources must not contain null elements.
     */
    private Map<Iterator<T>, T> nextMap;

    /**
     * Sort condition
     */
    private Comparator<T> comp = null;

    /**
     * @param streamArray Array of Streams to merge
     */
    @SafeVarargs
    public MergedIterator(final Stream<T>... streamArray) {
        this(Arrays.asList(streamArray).stream().map(stream -> stream.iterator()).collect(Collectors.toList()));
    }

    /**
     * @param itrArray Array of Iterators to merge
     */
    @SafeVarargs
    public MergedIterator(final Iterator<T>... itrArray) {
        this(Arrays.asList(itrArray));
    }

    /**
     * @param itrList List of Iterators to merge
     */
    public MergedIterator(final List<Iterator<T>> itrList) {
        // LinkedHashMap keeps the sources in the order they were passed in.
        this.nextMap = new LinkedHashMap<>();
        for (Iterator<T> itr : itrList) {
            this.nextMap.put(itr, itr.hasNext() ? itr.next() : null);
        }
    }

    /**
     * Sets the instance used for comparison.
     *
     * @param comp sort condition; if null, the natural ordering of the elements is used
     */
    public void setComparetor(final Comparator<T> comp) {
        this.comp = comp;
    }

    @Override
    public boolean hasNext() {
        return this.nextMap.values().stream().anyMatch(value -> value != null);
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not supported.");
    }

    @Override
    public T next() {
        // Pick the source whose buffered element is the smallest.
        Entry<Iterator<T>, T> nextEntry = this.nextMap.entrySet().stream()
                .filter(entry -> entry.getValue() != null)
                .min((o1, o2) -> compareValues(o1.getValue(), o2.getValue()))
                .orElseThrow(NoSuchElementException::new);
        T returnObject = nextEntry.getValue();
        // Refill the buffer of that source; null marks it as exhausted.
        nextEntry.setValue(nextEntry.getKey().hasNext() ? nextEntry.getKey().next() : null);
        return returnObject;
    }

    @SuppressWarnings("unchecked")
    private int compareValues(final T a, final T b) {
        if (this.comp != null) {
            return this.comp.compare(a, b);
        }
        // No comparator given: fall back to natural ordering (the elements must be Comparable).
        return ((Comparable<? super T>) a).compareTo(b);
    }
}
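MergedIterator scans every source on each call to next(), which is fine for two files. For many sources, a heap-based variant is a common alternative. The following is a sketch of that different technique (a k-way merge with java.util.PriorityQueue), not the class used in this post; HeapMergeIterator and Head are hypothetical names.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

// Hypothetical alternative to MergedIterator: keeps the head of each source in a heap.
final class HeapMergeIterator<T> implements Iterator<T> {

    /** One source together with its current head element and its position in the input list. */
    private static final class Head<E> {
        final E value;
        final Iterator<E> source;
        final int index;

        Head(E value, Iterator<E> source, int index) {
            this.value = value;
            this.source = source;
            this.index = index;
        }
    }

    private final PriorityQueue<Head<T>> heap;

    HeapMergeIterator(Comparator<? super T> comp, List<Iterator<T>> sources) {
        // Order by head value; on ties, prefer the source that was listed first.
        Comparator<Head<T>> byValue = (a, b) -> comp.compare(a.value, b.value);
        this.heap = new PriorityQueue<>(Math.max(1, sources.size()),
                byValue.thenComparingInt(h -> h.index));
        for (int i = 0; i < sources.size(); i++) {
            Iterator<T> source = sources.get(i);
            if (source.hasNext()) {
                this.heap.add(new Head<>(source.next(), source, i));
            }
        }
    }

    @Override
    public boolean hasNext() {
        return !this.heap.isEmpty();
    }

    @Override
    public T next() {
        Head<T> head = this.heap.poll();
        if (head == null) {
            throw new NoSuchElementException();
        }
        // Replace the consumed head with the next element of the same source, if any.
        if (head.source.hasNext()) {
            this.heap.add(new Head<>(head.source.next(), head.source, head.index));
        }
        return head.value;
    }
}
```

Because exhausted sources simply drop out of the heap, this variant does not need null as an end marker.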
The Spliterator class (Spliterator is the element traversal mechanism used inside Stream)
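import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

public class SpliteratorAdapter<T> extends Spliterators.AbstractSpliterator<T> {

    private final Iterator<T> iterator;

    /**
     * @param iter Iterator to expose as a Spliterator
     */
    public SpliteratorAdapter(final Iterator<T> iter) {
        // The size is unknown, so report Long.MAX_VALUE; the elements come out in merge order.
        super(Long.MAX_VALUE, Spliterator.ORDERED);
        this.iterator = iter;
    }

    @Override
    public synchronized boolean tryAdvance(final Consumer<? super T> action) {
        if (this.iterator.hasNext()) {
            action.accept(this.iterator.next());
            return true;
        }
        return false;
    }
}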
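For comparison, the JDK can wrap an Iterator without a hand-written adapter, via Spliterators.spliteratorUnknownSize and StreamSupport.stream. The sketch below shows that route; the class name IteratorStreams is hypothetical, and whether it fully replaces SpliteratorAdapter here is an assumption.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper; uses only JDK factory methods instead of a custom Spliterator class.
final class IteratorStreams {

    private IteratorStreams() {
    }

    /** Wraps an Iterator in a sequential Stream that reports its elements as ordered. */
    static <T> Stream<T> stream(Iterator<T> iterator) {
        Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        // false = sequential, the same setting StreamUtils passes to StreamSupport.stream.
        return StreamSupport.stream(spliterator, false);
    }
}
```

A call such as IteratorStreams.stream(list.iterator()) yields the elements in iteration order.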
Call it as follows. The source Streams are closed when the merged Stream is closed, but they still have to be created individually beforehand (streamArray). I wonder whether that part could be made smarter...
It might be easier to move that step into StreamUtils as well.
// streamArray: array of Streams to merge
// comp: Comparator implementation
try (Stream<List<String>> mergeStream = StreamUtils
        .merge(comp, streamArray)
        .onClose(() -> LOG.debug("All Stream processing completed"))) {
    /* Processing goes here. In this example each element is printed to standard output and the process ends. */
    mergeStream.forEach(line -> System.out.println(line));
} catch (final UncheckedIOException e) {
    // Exception handling. A checked IOException cannot be thrown from this block;
    // lazily read file streams (e.g. from Files.lines) report I/O failures as UncheckedIOException.
}
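The call above leaves open how streamArray and comp are built. One possible way is sketched below, assuming each input is a comma-separated text file whose first column is an integer key, as in the example at the top. The class MergeInputsSketch and its methods openAll and byFirstColumn are hypothetical names, not part of the code in this post.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical helper for preparing the inputs of the merge.
final class MergeInputsSketch {

    private MergeInputsSketch() {
    }

    /** Opens one lazily read stream of parsed rows per file. */
    static List<Stream<List<String>>> openAll(List<Path> paths) throws IOException {
        List<Stream<List<String>>> opened = new ArrayList<>();
        try {
            for (Path path : paths) {
                // Files.lines reads lazily; each line is split into its columns.
                opened.add(Files.lines(path).map(line -> Arrays.asList(line.split(","))));
            }
            return opened;
        } catch (IOException | RuntimeException e) {
            // A later file could not be opened: release the ones opened so far.
            opened.forEach(Stream::close);
            throw e;
        }
    }

    /** Assumed sort condition: compare rows by their first column, read as an integer. */
    static Comparator<List<String>> byFirstColumn() {
        return Comparator.comparingInt(row -> Integer.parseInt(row.get(0)));
    }
}
```

The returned list can be turned into the varargs array with opened.toArray(new Stream[0]), at the cost of an unchecked warning, because Java does not allow creating generic arrays directly.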
I think there are quite a few operations that span multiple Streams besides simply concatenating them in series. A diff can also be computed statelessly, so I think it would be interesting to build out that area.
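As a rough illustration of the diff idea, two sorted inputs can be compared with the same head-to-head walk as the merge. This is only a sketch under assumptions: SortedDiffSketch is a hypothetical name, the "- " and "+ " prefixes are an arbitrary output format, and the inputs are assumed to be sorted by the given comparator and free of null elements.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of a diff over two sorted inputs; only the two head elements are held.
final class SortedDiffSketch {

    private SortedDiffSketch() {
    }

    /** Reports elements found only in left ("- x") or only in right ("+ x"). */
    static <T> List<String> diff(Iterator<T> left, Iterator<T> right, Comparator<? super T> comp) {
        List<String> out = new ArrayList<>();
        T l = advance(left);
        T r = advance(right);
        while (l != null || r != null) {
            // An exhausted side counts as "larger" so the other side gets drained.
            int c = (l == null) ? 1 : (r == null) ? -1 : comp.compare(l, r);
            if (c < 0) {
                out.add("- " + l);
                l = advance(left);
            } else if (c > 0) {
                out.add("+ " + r);
                r = advance(right);
            } else {
                // Present on both sides: not part of the diff.
                l = advance(left);
                r = advance(right);
            }
        }
        return out;
    }

    /** Returns the next element, or null when the input is exhausted (assumes no null elements). */
    private static <T> T advance(Iterator<T> it) {
        return it.hasNext() ? it.next() : null;
    }
}
```

For the inputs 1, 2, 4 and 2, 3, 4, 5 with natural ordering, the result is "- 1", "+ 3", "+ 5".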
In this example parallel execution is not enabled (the Stream is created with the parallel flag set to false), but I think there is some processing where it would work.
I would like to sort all this out at some point.