< prev index next >
src/java.base/share/classes/java/util/stream/ReferencePipeline.java
Print this page
rev 53968 : enable Collector pre-sizing
*** 568,589 ****
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
// Terminal operation: accumulates the stream's elements with the given
// Collector and returns the collector's result.
A container;
// Concurrent path: taken only when the stream is parallel, the collector is
// CONCURRENT, and ordering does not matter (either the stream is not ordered
// or the collector is UNORDERED).
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
// A single container is created from the supplier and every element is
// accumulated into that one instance via forEach.
! container = collector.supplier().get();
! BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
! forEach(u -> accumulator.accept(container, u));
}
else {
// All other cases: evaluate the reduction op built from the collector.
container = evaluate(ReduceOps.makeRef(collector));
}
// With IDENTITY_FINISH the container itself is returned (unchecked cast to R)
// and the finisher is not invoked; otherwise the finisher converts it.
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
@Override
public final <R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super P_OUT> accumulator,
BiConsumer<R, R> combiner) {
return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
--- 568,633 ----
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
// Terminal operation: accumulates the stream's elements with the given
// Collector and returns the collector's result.
A container;
// Concurrent path: taken only when the stream is parallel, the collector is
// CONCURRENT, and ordering does not matter (either the stream is not ordered
// or the collector is UNORDERED).
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
// Evaluated through UnorderedConcurrentCollectorOp, which creates one
// container (presized when the exact output size is known) and
// accumulates every element into it.
! container = evaluate(new UnorderedConcurrentCollectorOp<>(collector));
}
else {
// All other cases: evaluate the reduction op built from the collector.
container = evaluate(ReduceOps.makeRef(collector));
}
// With IDENTITY_FINISH the container itself is returned (unchecked cast to R)
// and the finisher is not invoked; otherwise the finisher converts it.
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
+ /**
+  * A TerminalOp for unordered, concurrent collectors. The semantics are
+  * very similar to ForEachOp, except that presizing is supported and the
+  * intermediary Collector accumulation type is made accessible.
+  *
+  * <p>An instance is intended for a single evaluation:
+  * {@code evaluateParallel} creates the one shared container, stores it in
+  * {@code res}, and every element passed to {@code accept} is accumulated
+  * into that same container.
+  *
+  * @param <T> The Collector's input element type
+  * @param <A> The Collector's mutable accumulation type
+  */
+ static final class UnorderedConcurrentCollectorOp<T, A> implements TerminalOp<T, A>, TerminalSink<T, A> {
+     final Collector<? super T, A, ?> collector;
+     final BiConsumer<A, ? super T> accumulator;
+
+     // The shared accumulation container; assigned in evaluateParallel
+     // before any element is accepted.
+     A res;
+
+     /**
+      * @param collector the collector whose supplier (or sized supplier)
+      *        and accumulator drive this operation
+      */
+     UnorderedConcurrentCollectorOp(Collector<? super T, A, ?> collector) {
+         this.collector = collector;
+         this.accumulator = collector.accumulator();
+     }
+
+     /** Returns the shared container that elements were accumulated into. */
+     @Override
+     public A get() {
+         return res;
+     }
+
+     /** Accumulates one element into the shared container. */
+     @Override
+     public void accept(T t) {
+         accumulator.accept(res, t);
+     }
+
+     /**
+      * Not supported: this op is only created on the parallel, concurrent
+      * collection path.
+      *
+      * @throws IllegalStateException always
+      */
+     @Override
+     public <P_IN> A evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
+         throw new IllegalStateException("Cannot evaluate UnorderedConcurrentCollectorOp sequentially");
+     }
+
+     /**
+      * Creates the shared container, then runs a ForEachTask that feeds
+      * every element of the pipeline into it.
+      *
+      * @return the container holding the accumulated elements
+      */
+     @Override
+     public <P_IN> A evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
+         long size = helper.exactOutputSizeIfKnown(spliterator);
+         // Presize only when the exact size is known AND representable as
+         // an int. A blind (int) cast of a larger long would wrap to a
+         // negative or unrelated capacity, so fall back to the plain
+         // supplier in that case.
+         if (size >= 0 && size <= Integer.MAX_VALUE) {
+             res = collector.sizedSupplier().apply((int) size);
+         } else {
+             res = collector.supplier().get();
+         }
+         // NOTE(review): unlike the forEach-based code this replaces, this
+         // op does not override getOpFlags(); confirm whether it should
+         // advertise itself as not ordered, as an unordered ForEachOp does.
+         new ForEachOps.ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
+         return res;
+     }
+ }
+
@Override
public final <R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super P_OUT> accumulator,
BiConsumer<R, R> combiner) {
return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
< prev index next >