< prev index next >

src/java.base/share/classes/java/util/stream/ReferencePipeline.java

Print this page
rev 53968 : enable Collector pre-sizing
rev 53969 : added map loadfactor and collector nullchecks

*** 568,589 **** public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) { A container; if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { ! container = collector.supplier().get(); ! BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator(); ! forEach(u -> accumulator.accept(container, u)); } else { container = evaluate(ReduceOps.makeRef(collector)); } return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container); } @Override public final <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super P_OUT> accumulator, BiConsumer<R, R> combiner) { return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner)); --- 568,635 ---- public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) { A container; if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { ! container = evaluate(new UnorderedConcurrentCollectorOp<>(collector)); } else { container = evaluate(ReduceOps.makeRef(collector)); } return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container); } + /** + * A TerminalOp for unordered, concurrent collectors. The semantics are + * very similar to ForEachOp, except presizing is supported and the + * intermediary Collector accumulation type is made accessible. + * + * @param <T> The Collector's input element type + * @param <A> The Collector's mutable accumulation type + */ + static final class UnorderedConcurrentCollectorOp<T, A> implements TerminalOp<T, A>, TerminalSink<T, A> { + final IntFunction<A> sizedSupplier; + final Supplier<A> supplier; + final BiConsumer<A, ? super T> accumulator; + + A res; + + UnorderedConcurrentCollectorOp(Collector<? super T, A, ?> collector) { + this.sizedSupplier = Objects.requireNonNull(collector).sizedSupplier(); + this.supplier = collector.supplier(); + this.accumulator = collector.accumulator(); + } + + @Override + public A get() { + return res; + } + + @Override + public void accept(T t) { + accumulator.accept(res, t); + } + + @Override + public <P_IN> A evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { + throw new IllegalStateException("Cannot evaluate UnorderedConcurrentCollectorOp sequentially"); + } + + @Override + public <P_IN> A evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { + long size = helper.exactOutputSizeIfKnown(spliterator); + if (size < 0 || size > Integer.MAX_VALUE) + res = supplier.get(); + else + res = sizedSupplier.apply((int) size); + new ForEachOps.ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke(); + return res; + } + } + @Override public final <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super P_OUT> accumulator, BiConsumer<R, R> combiner) { return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
< prev index next >