553 return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
554 }
555
556 @Override
557 public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
558 return evaluate(ReduceOps.makeRef(accumulator));
559 }
560
561 @Override
562 public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
563 return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
564 }
565
566 @Override
567 @SuppressWarnings("unchecked")
568 public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
569 A container;
570 if (isParallel()
571 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
572 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
573 container = collector.supplier().get();
574 BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
575 forEach(u -> accumulator.accept(container, u));
576 }
577 else {
578 container = evaluate(ReduceOps.makeRef(collector));
579 }
580 return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
581 ? (R) container
582 : collector.finisher().apply(container);
583 }
584
585 @Override
586 public final <R> R collect(Supplier<R> supplier,
587 BiConsumer<R, ? super P_OUT> accumulator,
588 BiConsumer<R, R> combiner) {
589 return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
590 }
591
592 @Override
593 public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
594 return reduce(BinaryOperator.maxBy(comparator));
595 }
596
597 @Override
598 public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
599 return reduce(BinaryOperator.minBy(comparator));
600
601 }
602
603 @Override
604 public final long count() {
|
553 return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
554 }
555
556 @Override
557 public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
558 return evaluate(ReduceOps.makeRef(accumulator));
559 }
560
561 @Override
562 public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
563 return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
564 }
565
566 @Override
567 @SuppressWarnings("unchecked")
568 public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
569 A container;
570 if (isParallel()
571 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
572 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
573 container = evaluate(new UnorderedConcurrentCollectorOp<>(collector));
574 }
575 else {
576 container = evaluate(ReduceOps.makeRef(collector));
577 }
578 return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
579 ? (R) container
580 : collector.finisher().apply(container);
581 }
582
583 /**
584 * A TerminalOp for unordered, concurrent collectors. The semantics are
585 * very similar to ForEachOp, except presizing is supported and the
586 * intermediary Collector accumulation type is made accessible.
587 *
588 * @param <T> The Collector's input element type
589 * @param <A> The Collector's mutable accumulation type
590 */
591 static final class UnorderedConcurrentCollectorOp<T, A> implements TerminalOp<T, A>, TerminalSink<T, A> {
592 final IntFunction<A> sizedSupplier;
593 final Supplier<A> supplier;
594 final BiConsumer<A, ? super T> accumulator;
595
596 A res;
597
598 UnorderedConcurrentCollectorOp(Collector<? super T, A, ?> collector) {
599 this.sizedSupplier = Objects.requireNonNull(collector).sizedSupplier();
600 this.supplier = collector.supplier();
601 this.accumulator = collector.accumulator();
602 }
603
604 @Override
605 public A get() {
606 return res;
607 }
608
609 @Override
610 public void accept(T t) {
611 accumulator.accept(res, t);
612 }
613
614 @Override
615 public <P_IN> A evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
616 throw new IllegalStateException("Cannot evaluate UnorderedConcurrentCollectorOp sequentially");
617 }
618
619 @Override
620 public <P_IN> A evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
621 long size = helper.exactOutputSizeIfKnown(spliterator);
622 if (size < 0 || size > Integer.MAX_VALUE)
623 res = supplier.get();
624 else
625 res = sizedSupplier.apply((int) size);
626 new ForEachOps.ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
627 return res;
628 }
629 }
630
631 @Override
632 public final <R> R collect(Supplier<R> supplier,
633 BiConsumer<R, ? super P_OUT> accumulator,
634 BiConsumer<R, R> combiner) {
635 return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
636 }
637
638 @Override
639 public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
640 return reduce(BinaryOperator.maxBy(comparator));
641 }
642
643 @Override
644 public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
645 return reduce(BinaryOperator.minBy(comparator));
646
647 }
648
649 @Override
650 public final long count() {
|