1 /*
   2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Objects;
  28 import java.util.Optional;
  29 import java.util.OptionalDouble;
  30 import java.util.OptionalInt;
  31 import java.util.OptionalLong;
  32 import java.util.Spliterator;
  33 import java.util.concurrent.CountedCompleter;
  34 import java.util.function.*;
  35 
  36 /**
  37  * Factory for creating instances of {@code TerminalOp} that implement
  38  * reductions.
  39  *
  40  * @since 1.8
  41  */
  42 final class ReduceOps {
  43 
  44     private ReduceOps() { }
  45 
  46     /**
  47      * Constructs a {@code TerminalOp} that implements a functional reduce on
  48      * reference values.
  49      *
  50      * @param <T> the type of the input elements
  51      * @param <U> the type of the result
  52      * @param seed the identity element for the reduction
  53      * @param reducer the accumulating function that incorporates an additional
  54      *        input element into the result
  55      * @param combiner the combining function that combines two intermediate
  56      *        results
  57      * @return a {@code TerminalOp} implementing the reduction
  58      */
  59     public static <T, U> TerminalOp<T, U>
  60     makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
  61         Objects.requireNonNull(reducer);
  62         Objects.requireNonNull(combiner);
  63         class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
  64             @Override
  65             public void begin(long size) {
  66                 state = seed;
  67             }
  68 
  69             @Override
  70             public void accept(T t) {
  71                 state = reducer.apply(state, t);
  72             }
  73 
  74             @Override
  75             public void combine(ReducingSink other) {
  76                 state = combiner.apply(state, other.state);
  77             }
  78         }
  79         return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
  80             @Override
  81             public ReducingSink makeSink() {
  82                 return new ReducingSink();
  83             }
  84         };
  85     }
  86 
  87     /**
  88      * Constructs a {@code TerminalOp} that implements a functional reduce on
  89      * reference values producing an optional reference result.
  90      *
  91      * @param <T> The type of the input elements, and the type of the result
  92      * @param operator The reducing function
  93      * @return A {@code TerminalOp} implementing the reduction
  94      */
  95     public static <T> TerminalOp<T, Optional<T>>
  96     makeRef(BinaryOperator<T> operator) {
  97         Objects.requireNonNull(operator);
  98         class ReducingSink
  99                 implements AccumulatingSink<T, Optional<T>, ReducingSink> {
 100             private boolean empty;
 101             private T state;
 102 
 103             public void begin(long size) {
 104                 empty = true;
 105                 state = null;
 106             }
 107 
 108             @Override
 109             public void accept(T t) {
 110                 if (empty) {
 111                     empty = false;
 112                     state = t;
 113                 } else {
 114                     state = operator.apply(state, t);
 115                 }
 116             }
 117 
 118             @Override
 119             public Optional<T> get() {
 120                 return empty ? Optional.empty() : Optional.of(state);
 121             }
 122 
 123             @Override
 124             public void combine(ReducingSink other) {
 125                 if (!other.empty)
 126                     accept(other.state);
 127             }
 128         }
 129         return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
 130             @Override
 131             public ReducingSink makeSink() {
 132                 return new ReducingSink();
 133             }
 134         };
 135     }
 136 
 137     /**
 138      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 139      * reference values.
 140      *
 141      * @param <T> the type of the input elements
 142      * @param <I> the type of the intermediate reduction result
 143      * @param collector a {@code Collector} defining the reduction
 144      * @return a {@code ReduceOp} implementing the reduction
 145      */
 146     public static <T, I> TerminalOp<T, I>
 147     makeRef(Collector<? super T, I, ?> collector) {
 148         Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
 149         IntFunction<I> sizedSupplier = collector.sizedSupplier();
 150         BiConsumer<I, ? super T> accumulator = collector.accumulator();
 151         BinaryOperator<I> combiner = collector.combiner();
 152         class ReducingSink extends Box<I>
 153                 implements AccumulatingSink<T, I, ReducingSink> {
 154             @Override
 155             public void begin(long size) {
 156                 if (size < 0 || size > Integer.MAX_VALUE)
 157                     state = supplier.get();
 158                 else
 159                     state = sizedSupplier.apply((int) size);
 160             }
 161 
 162             @Override
 163             public void accept(T t) {
 164                 accumulator.accept(state, t);
 165             }
 166 
 167             @Override
 168             public void combine(ReducingSink other) {
 169                 state = combiner.apply(state, other.state);
 170             }
 171         }
 172         return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
 173             @Override
 174             public ReducingSink makeSink() {
 175                 return new ReducingSink();
 176             }
 177 
 178             @Override
 179             public int getOpFlags() {
 180                 return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
 181                        ? StreamOpFlag.NOT_ORDERED
 182                        : 0;
 183             }
 184         };
 185     }
 186 
 187     /**
 188      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 189      * reference values.
 190      *
 191      * @param <T> the type of the input elements
 192      * @param <R> the type of the result
 193      * @param seedFactory a factory to produce a new base accumulator
 194      * @param accumulator a function to incorporate an element into an
 195      *        accumulator
 196      * @param reducer a function to combine an accumulator into another
 197      * @return a {@code TerminalOp} implementing the reduction
 198      */
 199     public static <T, R> TerminalOp<T, R>
 200     makeRef(Supplier<R> seedFactory,
 201             BiConsumer<R, ? super T> accumulator,
 202             BiConsumer<R,R> reducer) {
 203         Objects.requireNonNull(seedFactory);
 204         Objects.requireNonNull(accumulator);
 205         Objects.requireNonNull(reducer);
 206         class ReducingSink extends Box<R>
 207                 implements AccumulatingSink<T, R, ReducingSink> {
 208             @Override
 209             public void begin(long size) {
 210                 state = seedFactory.get();
 211             }
 212 
 213             @Override
 214             public void accept(T t) {
 215                 accumulator.accept(state, t);
 216             }
 217 
 218             @Override
 219             public void combine(ReducingSink other) {
 220                 reducer.accept(state, other.state);
 221             }
 222         }
 223         return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
 224             @Override
 225             public ReducingSink makeSink() {
 226                 return new ReducingSink();
 227             }
 228         };
 229     }
 230 
 231     /**
 232      * Constructs a {@code TerminalOp} that counts the number of stream
 233      * elements.  If the size of the pipeline is known then count is the size
 234      * and there is no need to evaluate the pipeline.  If the size of the
 235      * pipeline is non known then count is produced, via reduction, using a
 236      * {@link CountingSink}.
 237      *
 238      * @param <T> the type of the input elements
 239      * @return a {@code TerminalOp} implementing the counting
 240      */
 241     public static <T> TerminalOp<T, Long>
 242     makeRefCounting() {
 243         return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) {
 244             @Override
 245             public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); }
 246 
 247             @Override
 248             public <P_IN> Long evaluateSequential(PipelineHelper<T> helper,
 249                                                   Spliterator<P_IN> spliterator) {
 250                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 251                     return spliterator.getExactSizeIfKnown();
 252                 return super.evaluateSequential(helper, spliterator);
 253             }
 254 
 255             @Override
 256             public <P_IN> Long evaluateParallel(PipelineHelper<T> helper,
 257                                                 Spliterator<P_IN> spliterator) {
 258                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 259                     return spliterator.getExactSizeIfKnown();
 260                 return super.evaluateParallel(helper, spliterator);
 261             }
 262 
 263             @Override
 264             public int getOpFlags() {
 265                 return StreamOpFlag.NOT_ORDERED;
 266             }
 267         };
 268     }
 269 
 270     /**
 271      * Constructs a {@code TerminalOp} that implements a functional reduce on
 272      * {@code int} values.
 273      *
 274      * @param identity the identity for the combining function
 275      * @param operator the combining function
 276      * @return a {@code TerminalOp} implementing the reduction
 277      */
 278     public static TerminalOp<Integer, Integer>
 279     makeInt(int identity, IntBinaryOperator operator) {
 280         Objects.requireNonNull(operator);
 281         class ReducingSink
 282                 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
 283             private int state;
 284 
 285             @Override
 286             public void begin(long size) {
 287                 state = identity;
 288             }
 289 
 290             @Override
 291             public void accept(int t) {
 292                 state = operator.applyAsInt(state, t);
 293             }
 294 
 295             @Override
 296             public Integer get() {
 297                 return state;
 298             }
 299 
 300             @Override
 301             public void combine(ReducingSink other) {
 302                 accept(other.state);
 303             }
 304         }
 305         return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
 306             @Override
 307             public ReducingSink makeSink() {
 308                 return new ReducingSink();
 309             }
 310         };
 311     }
 312 
 313     /**
 314      * Constructs a {@code TerminalOp} that implements a functional reduce on
 315      * {@code int} values, producing an optional integer result.
 316      *
 317      * @param operator the combining function
 318      * @return a {@code TerminalOp} implementing the reduction
 319      */
 320     public static TerminalOp<Integer, OptionalInt>
 321     makeInt(IntBinaryOperator operator) {
 322         Objects.requireNonNull(operator);
 323         class ReducingSink
 324                 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
 325             private boolean empty;
 326             private int state;
 327 
 328             public void begin(long size) {
 329                 empty = true;
 330                 state = 0;
 331             }
 332 
 333             @Override
 334             public void accept(int t) {
 335                 if (empty) {
 336                     empty = false;
 337                     state = t;
 338                 }
 339                 else {
 340                     state = operator.applyAsInt(state, t);
 341                 }
 342             }
 343 
 344             @Override
 345             public OptionalInt get() {
 346                 return empty ? OptionalInt.empty() : OptionalInt.of(state);
 347             }
 348 
 349             @Override
 350             public void combine(ReducingSink other) {
 351                 if (!other.empty)
 352                     accept(other.state);
 353             }
 354         }
 355         return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
 356             @Override
 357             public ReducingSink makeSink() {
 358                 return new ReducingSink();
 359             }
 360         };
 361     }
 362 
 363     /**
 364      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 365      * {@code int} values.
 366      *
 367      * @param <R> The type of the result
 368      * @param supplier a factory to produce a new accumulator of the result type
 369      * @param accumulator a function to incorporate an int into an
 370      *        accumulator
 371      * @param combiner a function to combine an accumulator into another
 372      * @return A {@code ReduceOp} implementing the reduction
 373      */
 374     public static <R> TerminalOp<Integer, R>
 375     makeInt(Supplier<R> supplier,
 376             ObjIntConsumer<R> accumulator,
 377             BinaryOperator<R> combiner) {
 378         Objects.requireNonNull(supplier);
 379         Objects.requireNonNull(accumulator);
 380         Objects.requireNonNull(combiner);
 381         class ReducingSink extends Box<R>
 382                 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
 383             @Override
 384             public void begin(long size) {
 385                 state = supplier.get();
 386             }
 387 
 388             @Override
 389             public void accept(int t) {
 390                 accumulator.accept(state, t);
 391             }
 392 
 393             @Override
 394             public void combine(ReducingSink other) {
 395                 state = combiner.apply(state, other.state);
 396             }
 397         }
 398         return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
 399             @Override
 400             public ReducingSink makeSink() {
 401                 return new ReducingSink();
 402             }
 403         };
 404     }
 405 
 406     /**
 407      * Constructs a {@code TerminalOp} that counts the number of stream
 408      * elements.  If the size of the pipeline is known then count is the size
 409      * and there is no need to evaluate the pipeline.  If the size of the
 410      * pipeline is non known then count is produced, via reduction, using a
 411      * {@link CountingSink}.
 412      *
 413      * @return a {@code TerminalOp} implementing the counting
 414      */
 415     public static TerminalOp<Integer, Long>
 416     makeIntCounting() {
 417         return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.INT_VALUE) {
 418             @Override
 419             public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); }
 420 
 421             @Override
 422             public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper,
 423                                                   Spliterator<P_IN> spliterator) {
 424                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 425                     return spliterator.getExactSizeIfKnown();
 426                 return super.evaluateSequential(helper, spliterator);
 427             }
 428 
 429             @Override
 430             public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper,
 431                                                 Spliterator<P_IN> spliterator) {
 432                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 433                     return spliterator.getExactSizeIfKnown();
 434                 return super.evaluateParallel(helper, spliterator);
 435             }
 436 
 437             @Override
 438             public int getOpFlags() {
 439                 return StreamOpFlag.NOT_ORDERED;
 440             }
 441         };
 442     }
 443 
 444     /**
 445      * Constructs a {@code TerminalOp} that implements a functional reduce on
 446      * {@code long} values.
 447      *
 448      * @param identity the identity for the combining function
 449      * @param operator the combining function
 450      * @return a {@code TerminalOp} implementing the reduction
 451      */
 452     public static TerminalOp<Long, Long>
 453     makeLong(long identity, LongBinaryOperator operator) {
 454         Objects.requireNonNull(operator);
 455         class ReducingSink
 456                 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
 457             private long state;
 458 
 459             @Override
 460             public void begin(long size) {
 461                 state = identity;
 462             }
 463 
 464             @Override
 465             public void accept(long t) {
 466                 state = operator.applyAsLong(state, t);
 467             }
 468 
 469             @Override
 470             public Long get() {
 471                 return state;
 472             }
 473 
 474             @Override
 475             public void combine(ReducingSink other) {
 476                 accept(other.state);
 477             }
 478         }
 479         return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
 480             @Override
 481             public ReducingSink makeSink() {
 482                 return new ReducingSink();
 483             }
 484         };
 485     }
 486 
 487     /**
 488      * Constructs a {@code TerminalOp} that implements a functional reduce on
 489      * {@code long} values, producing an optional long result.
 490      *
 491      * @param operator the combining function
 492      * @return a {@code TerminalOp} implementing the reduction
 493      */
 494     public static TerminalOp<Long, OptionalLong>
 495     makeLong(LongBinaryOperator operator) {
 496         Objects.requireNonNull(operator);
 497         class ReducingSink
 498                 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
 499             private boolean empty;
 500             private long state;
 501 
 502             public void begin(long size) {
 503                 empty = true;
 504                 state = 0;
 505             }
 506 
 507             @Override
 508             public void accept(long t) {
 509                 if (empty) {
 510                     empty = false;
 511                     state = t;
 512                 }
 513                 else {
 514                     state = operator.applyAsLong(state, t);
 515                 }
 516             }
 517 
 518             @Override
 519             public OptionalLong get() {
 520                 return empty ? OptionalLong.empty() : OptionalLong.of(state);
 521             }
 522 
 523             @Override
 524             public void combine(ReducingSink other) {
 525                 if (!other.empty)
 526                     accept(other.state);
 527             }
 528         }
 529         return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) {
 530             @Override
 531             public ReducingSink makeSink() {
 532                 return new ReducingSink();
 533             }
 534         };
 535     }
 536 
 537     /**
 538      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 539      * {@code long} values.
 540      *
 541      * @param <R> the type of the result
 542      * @param supplier a factory to produce a new accumulator of the result type
 543      * @param accumulator a function to incorporate an int into an
 544      *        accumulator
 545      * @param combiner a function to combine an accumulator into another
 546      * @return a {@code TerminalOp} implementing the reduction
 547      */
 548     public static <R> TerminalOp<Long, R>
 549     makeLong(Supplier<R> supplier,
 550              ObjLongConsumer<R> accumulator,
 551              BinaryOperator<R> combiner) {
 552         Objects.requireNonNull(supplier);
 553         Objects.requireNonNull(accumulator);
 554         Objects.requireNonNull(combiner);
 555         class ReducingSink extends Box<R>
 556                 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
 557             @Override
 558             public void begin(long size) {
 559                 state = supplier.get();
 560             }
 561 
 562             @Override
 563             public void accept(long t) {
 564                 accumulator.accept(state, t);
 565             }
 566 
 567             @Override
 568             public void combine(ReducingSink other) {
 569                 state = combiner.apply(state, other.state);
 570             }
 571         }
 572         return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) {
 573             @Override
 574             public ReducingSink makeSink() {
 575                 return new ReducingSink();
 576             }
 577         };
 578     }
 579 
 580     /**
 581      * Constructs a {@code TerminalOp} that counts the number of stream
 582      * elements.  If the size of the pipeline is known then count is the size
 583      * and there is no need to evaluate the pipeline.  If the size of the
 584      * pipeline is non known then count is produced, via reduction, using a
 585      * {@link CountingSink}.
 586      *
 587      * @return a {@code TerminalOp} implementing the counting
 588      */
 589     public static TerminalOp<Long, Long>
 590     makeLongCounting() {
 591         return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.LONG_VALUE) {
 592             @Override
 593             public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); }
 594 
 595             @Override
 596             public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper,
 597                                                   Spliterator<P_IN> spliterator) {
 598                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 599                     return spliterator.getExactSizeIfKnown();
 600                 return super.evaluateSequential(helper, spliterator);
 601             }
 602 
 603             @Override
 604             public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper,
 605                                                 Spliterator<P_IN> spliterator) {
 606                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 607                     return spliterator.getExactSizeIfKnown();
 608                 return super.evaluateParallel(helper, spliterator);
 609             }
 610 
 611             @Override
 612             public int getOpFlags() {
 613                 return StreamOpFlag.NOT_ORDERED;
 614             }
 615         };
 616     }
 617 
 618     /**
 619      * Constructs a {@code TerminalOp} that implements a functional reduce on
 620      * {@code double} values.
 621      *
 622      * @param identity the identity for the combining function
 623      * @param operator the combining function
 624      * @return a {@code TerminalOp} implementing the reduction
 625      */
 626     public static TerminalOp<Double, Double>
 627     makeDouble(double identity, DoubleBinaryOperator operator) {
 628         Objects.requireNonNull(operator);
 629         class ReducingSink
 630                 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
 631             private double state;
 632 
 633             @Override
 634             public void begin(long size) {
 635                 state = identity;
 636             }
 637 
 638             @Override
 639             public void accept(double t) {
 640                 state = operator.applyAsDouble(state, t);
 641             }
 642 
 643             @Override
 644             public Double get() {
 645                 return state;
 646             }
 647 
 648             @Override
 649             public void combine(ReducingSink other) {
 650                 accept(other.state);
 651             }
 652         }
 653         return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 654             @Override
 655             public ReducingSink makeSink() {
 656                 return new ReducingSink();
 657             }
 658         };
 659     }
 660 
 661     /**
 662      * Constructs a {@code TerminalOp} that implements a functional reduce on
 663      * {@code double} values, producing an optional double result.
 664      *
 665      * @param operator the combining function
 666      * @return a {@code TerminalOp} implementing the reduction
 667      */
 668     public static TerminalOp<Double, OptionalDouble>
 669     makeDouble(DoubleBinaryOperator operator) {
 670         Objects.requireNonNull(operator);
 671         class ReducingSink
 672                 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
 673             private boolean empty;
 674             private double state;
 675 
 676             public void begin(long size) {
 677                 empty = true;
 678                 state = 0;
 679             }
 680 
 681             @Override
 682             public void accept(double t) {
 683                 if (empty) {
 684                     empty = false;
 685                     state = t;
 686                 }
 687                 else {
 688                     state = operator.applyAsDouble(state, t);
 689                 }
 690             }
 691 
 692             @Override
 693             public OptionalDouble get() {
 694                 return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
 695             }
 696 
 697             @Override
 698             public void combine(ReducingSink other) {
 699                 if (!other.empty)
 700                     accept(other.state);
 701             }
 702         }
 703         return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 704             @Override
 705             public ReducingSink makeSink() {
 706                 return new ReducingSink();
 707             }
 708         };
 709     }
 710 
 711     /**
 712      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 713      * {@code double} values.
 714      *
 715      * @param <R> the type of the result
 716      * @param supplier a factory to produce a new accumulator of the result type
 717      * @param accumulator a function to incorporate an int into an
 718      *        accumulator
 719      * @param combiner a function to combine an accumulator into another
 720      * @return a {@code TerminalOp} implementing the reduction
 721      */
 722     public static <R> TerminalOp<Double, R>
 723     makeDouble(Supplier<R> supplier,
 724                ObjDoubleConsumer<R> accumulator,
 725                BinaryOperator<R> combiner) {
 726         Objects.requireNonNull(supplier);
 727         Objects.requireNonNull(accumulator);
 728         Objects.requireNonNull(combiner);
 729         class ReducingSink extends Box<R>
 730                 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
 731             @Override
 732             public void begin(long size) {
 733                 state = supplier.get();
 734             }
 735 
 736             @Override
 737             public void accept(double t) {
 738                 accumulator.accept(state, t);
 739             }
 740 
 741             @Override
 742             public void combine(ReducingSink other) {
 743                 state = combiner.apply(state, other.state);
 744             }
 745         }
 746         return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 747             @Override
 748             public ReducingSink makeSink() {
 749                 return new ReducingSink();
 750             }
 751         };
 752     }
 753 
 754     /**
 755      * Constructs a {@code TerminalOp} that counts the number of stream
 756      * elements.  If the size of the pipeline is known then count is the size
 757      * and there is no need to evaluate the pipeline.  If the size of the
 758      * pipeline is non known then count is produced, via reduction, using a
 759      * {@link CountingSink}.
 760      *
 761      * @return a {@code TerminalOp} implementing the counting
 762      */
 763     public static TerminalOp<Double, Long>
 764     makeDoubleCounting() {
 765         return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.DOUBLE_VALUE) {
 766             @Override
 767             public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); }
 768 
 769             @Override
 770             public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper,
 771                                                   Spliterator<P_IN> spliterator) {
 772                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 773                     return spliterator.getExactSizeIfKnown();
 774                 return super.evaluateSequential(helper, spliterator);
 775             }
 776 
 777             @Override
 778             public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper,
 779                                                 Spliterator<P_IN> spliterator) {
 780                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 781                     return spliterator.getExactSizeIfKnown();
 782                 return super.evaluateParallel(helper, spliterator);
 783             }
 784 
 785             @Override
 786             public int getOpFlags() {
 787                 return StreamOpFlag.NOT_ORDERED;
 788             }
 789         };
 790     }
 791 
 792     /**
 793      * A sink that counts elements
 794      */
 795     abstract static class CountingSink<T>
 796             extends Box<Long>
 797             implements AccumulatingSink<T, Long, CountingSink<T>> {
 798         long count;
 799 
 800         @Override
 801         public void begin(long size) {
 802             count = 0L;
 803         }
 804 
 805         @Override
 806         public Long get() {
 807             return count;
 808         }
 809 
 810         @Override
 811         public void combine(CountingSink<T> other) {
 812             count += other.count;
 813         }
 814 
 815         static final class OfRef<T> extends CountingSink<T> {
 816             @Override
 817             public void accept(T t) {
 818                 count++;
 819             }
 820         }
 821 
 822         static final class OfInt extends CountingSink<Integer> implements Sink.OfInt {
 823             @Override
 824             public void accept(int t) {
 825                 count++;
 826             }
 827         }
 828 
 829         static final class OfLong extends CountingSink<Long> implements Sink.OfLong {
 830             @Override
 831             public void accept(long t) {
 832                 count++;
 833             }
 834         }
 835 
 836         static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble {
 837             @Override
 838             public void accept(double t) {
 839                 count++;
 840             }
 841         }
 842     }
 843 
 844     /**
 845      * A type of {@code TerminalSink} that implements an associative reducing
 846      * operation on elements of type {@code T} and producing a result of type
 847      * {@code R}.
 848      *
 849      * @param <T> the type of input element to the combining operation
 850      * @param <R> the result type
 851      * @param <K> the type of the {@code AccumulatingSink}.
 852      */
 853     private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
 854             extends TerminalSink<T, R> {
 855         void combine(K other);
 856     }
 857 
 858     /**
 859      * State box for a single state element, used as a base class for
 860      * {@code AccumulatingSink} instances
 861      *
 862      * @param <U> The type of the state element
 863      */
 864     private abstract static class Box<U> {
 865         U state;
 866 
 867         Box() {} // Avoid creation of special accessor
 868 
 869         public U get() {
 870             return state;
 871         }
 872     }
 873 
 874     /**
 875      * A {@code TerminalOp} that evaluates a stream pipeline and sends the
 876      * output into an {@code AccumulatingSink}, which performs a reduce
 877      * operation. The {@code AccumulatingSink} must represent an associative
 878      * reducing operation.
 879      *
 880      * @param <T> the output type of the stream pipeline
 881      * @param <R> the result type of the reducing operation
 882      * @param <S> the type of the {@code AccumulatingSink}
 883      */
 884     private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
 885             implements TerminalOp<T, R> {
 886         private final StreamShape inputShape;
 887 
 888         /**
 889          * Create a {@code ReduceOp} of the specified stream shape which uses
 890          * the specified {@code Supplier} to create accumulating sinks.
 891          *
 892          * @param shape The shape of the stream pipeline
 893          */
 894         ReduceOp(StreamShape shape) {
 895             inputShape = shape;
 896         }
 897 
 898         public abstract S makeSink();
 899 
 900         @Override
 901         public StreamShape inputShape() {
 902             return inputShape;
 903         }
 904 
 905         @Override
 906         public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
 907                                            Spliterator<P_IN> spliterator) {
 908             return helper.wrapAndCopyInto(makeSink(), spliterator).get();
 909         }
 910 
 911         @Override
 912         public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
 913                                          Spliterator<P_IN> spliterator) {
 914             return new ReduceTask<>(this, helper, spliterator).invoke().get();
 915         }
 916     }
 917 
 918     /**
 919      * A {@code ForkJoinTask} for performing a parallel reduce operation.
 920      */
 921     @SuppressWarnings("serial")
 922     private static final class ReduceTask<P_IN, P_OUT, R,
 923                                           S extends AccumulatingSink<P_OUT, R, S>>
 924             extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
 925         private final ReduceOp<P_OUT, R, S> op;
 926 
 927         ReduceTask(ReduceOp<P_OUT, R, S> op,
 928                    PipelineHelper<P_OUT> helper,
 929                    Spliterator<P_IN> spliterator) {
 930             super(helper, spliterator);
 931             this.op = op;
 932         }
 933 
 934         ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
 935                    Spliterator<P_IN> spliterator) {
 936             super(parent, spliterator);
 937             this.op = parent.op;
 938         }
 939 
 940         @Override
 941         protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
 942             return new ReduceTask<>(this, spliterator);
 943         }
 944 
 945         @Override
 946         protected S doLeaf() {
 947             return helper.wrapAndCopyInto(op.makeSink(), spliterator);
 948         }
 949 
 950         @Override
 951         public void onCompletion(CountedCompleter<?> caller) {
 952             if (!isLeaf()) {
 953                 S leftResult = leftChild.getLocalResult();
 954                 leftResult.combine(rightChild.getLocalResult());
 955                 setLocalResult(leftResult);
 956             }
 957             // GC spliterator, left and right child
 958             super.onCompletion(caller);
 959         }
 960     }
 961 }