< prev index next >

src/java.base/share/classes/java/util/stream/ReferencePipeline.java

Print this page
rev 53968 : enable Collector pre-sizing
rev 53969 : added map loadfactor and collector nullchecks

@@ -568,22 +568,68 @@
     public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
         A container;
         if (isParallel()
                 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
-            container = collector.supplier().get();
-            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
-            forEach(u -> accumulator.accept(container, u));
+            container = evaluate(new UnorderedConcurrentCollectorOp<>(collector));
         }
         else {
             container = evaluate(ReduceOps.makeRef(collector));
         }
         return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
                ? (R) container
                : collector.finisher().apply(container);
     }
 
+    /**
+     * A TerminalOp for unordered, concurrent collectors. The semantics are
+     * very similar to ForEachOp, except presizing is supported and the
+     * intermediary Collector accumulation type is made accessible.
+     *
+     * @param <T> The Collector's input element type
+     * @param <A> The Collector's mutable accumulation type
+     */
+    static final class UnorderedConcurrentCollectorOp<T, A> implements TerminalOp<T, A>, TerminalSink<T, A> {
+        final IntFunction<A> sizedSupplier;
+        final Supplier<A> supplier;
+        final BiConsumer<A, ? super T> accumulator;
+
+        A res;
+
+        UnorderedConcurrentCollectorOp(Collector<? super T, A, ?> collector) {
+            this.sizedSupplier = Objects.requireNonNull(collector).sizedSupplier();
+            this.supplier = collector.supplier();
+            this.accumulator = collector.accumulator();
+        }
+
+        @Override
+        public A get() {
+            return res;
+        }
+
+        @Override
+        public void accept(T t) {
+            accumulator.accept(res, t);
+        }
+
+        @Override
+        public <P_IN> A evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
+            throw new IllegalStateException("Cannot evaluate UnorderedConcurrentCollectorOp sequentially");
+        }
+
+        @Override
+        public <P_IN> A evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
+            long size = helper.exactOutputSizeIfKnown(spliterator);
+            if (size < 0 || size > Integer.MAX_VALUE)
+                res = supplier.get();
+            else
+                res = sizedSupplier.apply((int) size);
+            new ForEachOps.ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
+            return res;
+        }
+    }
+
     @Override
     public final <R> R collect(Supplier<R> supplier,
                                BiConsumer<R, ? super P_OUT> accumulator,
                                BiConsumer<R, R> combiner) {
         return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
< prev index next >