1 /*
   2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Objects;
  28 import java.util.Optional;
  29 import java.util.OptionalDouble;
  30 import java.util.OptionalInt;
  31 import java.util.OptionalLong;
  32 import java.util.Spliterator;
  33 import java.util.concurrent.CountedCompleter;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.BiFunction;
  36 import java.util.function.BinaryOperator;
  37 import java.util.function.DoubleBinaryOperator;
  38 import java.util.function.IntBinaryOperator;
  39 import java.util.function.LongBinaryOperator;
  40 import java.util.function.ObjDoubleConsumer;
  41 import java.util.function.ObjIntConsumer;
  42 import java.util.function.ObjLongConsumer;
  43 import java.util.function.Supplier;
  44 
  45 /**
  46  * Factory for creating instances of {@code TerminalOp} that implement
  47  * reductions.
  48  *
  49  * @since 1.8
  50  */
  51 final class ReduceOps {
  52 
  53     private ReduceOps() { }
  54 
  55     /**
  56      * Constructs a {@code TerminalOp} that implements a functional reduce on
  57      * reference values.
  58      *
  59      * @param <T> the type of the input elements
  60      * @param <U> the type of the result
  61      * @param seed the identity element for the reduction
  62      * @param reducer the accumulating function that incorporates an additional
  63      *        input element into the result
  64      * @param combiner the combining function that combines two intermediate
  65      *        results
  66      * @return a {@code TerminalOp} implementing the reduction
  67      */
  68     public static <T, U> TerminalOp<T, U>
  69     makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
  70         Objects.requireNonNull(reducer);
  71         Objects.requireNonNull(combiner);
  72         class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
  73             @Override
  74             public void begin(long size) {
  75                 state = seed;
  76             }
  77 
  78             @Override
  79             public void accept(T t) {
  80                 state = reducer.apply(state, t);
  81             }
  82 
  83             @Override
  84             public void combine(ReducingSink other) {
  85                 state = combiner.apply(state, other.state);
  86             }
  87         }
  88         return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
  89             @Override
  90             public ReducingSink makeSink() {
  91                 return new ReducingSink();
  92             }
  93         };
  94     }
  95 
  96     /**
  97      * Constructs a {@code TerminalOp} that implements a functional reduce on
  98      * reference values producing an optional reference result.
  99      *
 100      * @param <T> The type of the input elements, and the type of the result
 101      * @param operator The reducing function
 102      * @return A {@code TerminalOp} implementing the reduction
 103      */
 104     public static <T> TerminalOp<T, Optional<T>>
 105     makeRef(BinaryOperator<T> operator) {
 106         Objects.requireNonNull(operator);
 107         class ReducingSink
 108                 implements AccumulatingSink<T, Optional<T>, ReducingSink> {
 109             private boolean empty;
 110             private T state;
 111 
 112             public void begin(long size) {
 113                 empty = true;
 114                 state = null;
 115             }
 116 
 117             @Override
 118             public void accept(T t) {
 119                 if (empty) {
 120                     empty = false;
 121                     state = t;
 122                 } else {
 123                     state = operator.apply(state, t);
 124                 }
 125             }
 126 
 127             @Override
 128             public Optional<T> get() {
 129                 return empty ? Optional.empty() : Optional.of(state);
 130             }
 131 
 132             @Override
 133             public void combine(ReducingSink other) {
 134                 if (!other.empty)
 135                     accept(other.state);
 136             }
 137         }
 138         return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
 139             @Override
 140             public ReducingSink makeSink() {
 141                 return new ReducingSink();
 142             }
 143         };
 144     }
 145 
 146     /**
 147      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 148      * reference values.
 149      *
 150      * @param <T> the type of the input elements
 151      * @param <I> the type of the intermediate reduction result
 152      * @param collector a {@code Collector} defining the reduction
 153      * @return a {@code ReduceOp} implementing the reduction
 154      */
 155     public static <T, I> TerminalOp<T, I>
 156     makeRef(Collector<? super T, I, ?> collector) {
 157         BiConsumer<I, ? super T> accumulator = Objects.requireNonNull(collector).accumulator();
 158         BinaryOperator<I> combiner = collector.combiner();
 159         class ReducingSink extends Box<I>
 160                 implements AccumulatingSink<T, I, ReducingSink> {
 161             @Override
 162             public void begin(long size) {
 163                 if (size == -1)
 164                     state = collector.supplier().get();
 165                 else
 166                     state = collector.sizedSupplier().apply((int) size);
 167             }
 168 
 169             @Override
 170             public void accept(T t) {
 171                 accumulator.accept(state, t);
 172             }
 173 
 174             @Override
 175             public void combine(ReducingSink other) {
 176                 state = combiner.apply(state, other.state);
 177             }
 178         }
 179         return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
 180             @Override
 181             public ReducingSink makeSink() {
 182                 return new ReducingSink();
 183             }
 184 
 185             @Override
 186             public int getOpFlags() {
 187                 return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
 188                        ? StreamOpFlag.NOT_ORDERED
 189                        : 0;
 190             }
 191         };
 192     }
 193 
 194     /**
 195      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 196      * reference values.
 197      *
 198      * @param <T> the type of the input elements
 199      * @param <R> the type of the result
 200      * @param seedFactory a factory to produce a new base accumulator
 201      * @param accumulator a function to incorporate an element into an
 202      *        accumulator
 203      * @param reducer a function to combine an accumulator into another
 204      * @return a {@code TerminalOp} implementing the reduction
 205      */
 206     public static <T, R> TerminalOp<T, R>
 207     makeRef(Supplier<R> seedFactory,
 208             BiConsumer<R, ? super T> accumulator,
 209             BiConsumer<R,R> reducer) {
 210         Objects.requireNonNull(seedFactory);
 211         Objects.requireNonNull(accumulator);
 212         Objects.requireNonNull(reducer);
 213         class ReducingSink extends Box<R>
 214                 implements AccumulatingSink<T, R, ReducingSink> {
 215             @Override
 216             public void begin(long size) {
 217                 state = seedFactory.get();
 218             }
 219 
 220             @Override
 221             public void accept(T t) {
 222                 accumulator.accept(state, t);
 223             }
 224 
 225             @Override
 226             public void combine(ReducingSink other) {
 227                 reducer.accept(state, other.state);
 228             }
 229         }
 230         return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
 231             @Override
 232             public ReducingSink makeSink() {
 233                 return new ReducingSink();
 234             }
 235         };
 236     }
 237 
 238     /**
 239      * Constructs a {@code TerminalOp} that counts the number of stream
 240      * elements.  If the size of the pipeline is known then count is the size
 241      * and there is no need to evaluate the pipeline.  If the size of the
 242      * pipeline is non known then count is produced, via reduction, using a
 243      * {@link CountingSink}.
 244      *
 245      * @param <T> the type of the input elements
 246      * @return a {@code TerminalOp} implementing the counting
 247      */
 248     public static <T> TerminalOp<T, Long>
 249     makeRefCounting() {
 250         return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) {
 251             @Override
 252             public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); }
 253 
 254             @Override
 255             public <P_IN> Long evaluateSequential(PipelineHelper<T> helper,
 256                                                   Spliterator<P_IN> spliterator) {
 257                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 258                     return spliterator.getExactSizeIfKnown();
 259                 return super.evaluateSequential(helper, spliterator);
 260             }
 261 
 262             @Override
 263             public <P_IN> Long evaluateParallel(PipelineHelper<T> helper,
 264                                                 Spliterator<P_IN> spliterator) {
 265                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 266                     return spliterator.getExactSizeIfKnown();
 267                 return super.evaluateParallel(helper, spliterator);
 268             }
 269 
 270             @Override
 271             public int getOpFlags() {
 272                 return StreamOpFlag.NOT_ORDERED;
 273             }
 274         };
 275     }
 276 
 277     /**
 278      * Constructs a {@code TerminalOp} that implements a functional reduce on
 279      * {@code int} values.
 280      *
 281      * @param identity the identity for the combining function
 282      * @param operator the combining function
 283      * @return a {@code TerminalOp} implementing the reduction
 284      */
 285     public static TerminalOp<Integer, Integer>
 286     makeInt(int identity, IntBinaryOperator operator) {
 287         Objects.requireNonNull(operator);
 288         class ReducingSink
 289                 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
 290             private int state;
 291 
 292             @Override
 293             public void begin(long size) {
 294                 state = identity;
 295             }
 296 
 297             @Override
 298             public void accept(int t) {
 299                 state = operator.applyAsInt(state, t);
 300             }
 301 
 302             @Override
 303             public Integer get() {
 304                 return state;
 305             }
 306 
 307             @Override
 308             public void combine(ReducingSink other) {
 309                 accept(other.state);
 310             }
 311         }
 312         return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
 313             @Override
 314             public ReducingSink makeSink() {
 315                 return new ReducingSink();
 316             }
 317         };
 318     }
 319 
 320     /**
 321      * Constructs a {@code TerminalOp} that implements a functional reduce on
 322      * {@code int} values, producing an optional integer result.
 323      *
 324      * @param operator the combining function
 325      * @return a {@code TerminalOp} implementing the reduction
 326      */
 327     public static TerminalOp<Integer, OptionalInt>
 328     makeInt(IntBinaryOperator operator) {
 329         Objects.requireNonNull(operator);
 330         class ReducingSink
 331                 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
 332             private boolean empty;
 333             private int state;
 334 
 335             public void begin(long size) {
 336                 empty = true;
 337                 state = 0;
 338             }
 339 
 340             @Override
 341             public void accept(int t) {
 342                 if (empty) {
 343                     empty = false;
 344                     state = t;
 345                 }
 346                 else {
 347                     state = operator.applyAsInt(state, t);
 348                 }
 349             }
 350 
 351             @Override
 352             public OptionalInt get() {
 353                 return empty ? OptionalInt.empty() : OptionalInt.of(state);
 354             }
 355 
 356             @Override
 357             public void combine(ReducingSink other) {
 358                 if (!other.empty)
 359                     accept(other.state);
 360             }
 361         }
 362         return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
 363             @Override
 364             public ReducingSink makeSink() {
 365                 return new ReducingSink();
 366             }
 367         };
 368     }
 369 
 370     /**
 371      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 372      * {@code int} values.
 373      *
 374      * @param <R> The type of the result
 375      * @param supplier a factory to produce a new accumulator of the result type
 376      * @param accumulator a function to incorporate an int into an
 377      *        accumulator
 378      * @param combiner a function to combine an accumulator into another
 379      * @return A {@code ReduceOp} implementing the reduction
 380      */
 381     public static <R> TerminalOp<Integer, R>
 382     makeInt(Supplier<R> supplier,
 383             ObjIntConsumer<R> accumulator,
 384             BinaryOperator<R> combiner) {
 385         Objects.requireNonNull(supplier);
 386         Objects.requireNonNull(accumulator);
 387         Objects.requireNonNull(combiner);
 388         class ReducingSink extends Box<R>
 389                 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
 390             @Override
 391             public void begin(long size) {
 392                 state = supplier.get();
 393             }
 394 
 395             @Override
 396             public void accept(int t) {
 397                 accumulator.accept(state, t);
 398             }
 399 
 400             @Override
 401             public void combine(ReducingSink other) {
 402                 state = combiner.apply(state, other.state);
 403             }
 404         }
 405         return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
 406             @Override
 407             public ReducingSink makeSink() {
 408                 return new ReducingSink();
 409             }
 410         };
 411     }
 412 
 413     /**
 414      * Constructs a {@code TerminalOp} that counts the number of stream
 415      * elements.  If the size of the pipeline is known then count is the size
 416      * and there is no need to evaluate the pipeline.  If the size of the
 417      * pipeline is non known then count is produced, via reduction, using a
 418      * {@link CountingSink}.
 419      *
 420      * @return a {@code TerminalOp} implementing the counting
 421      */
 422     public static TerminalOp<Integer, Long>
 423     makeIntCounting() {
 424         return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.INT_VALUE) {
 425             @Override
 426             public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); }
 427 
 428             @Override
 429             public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper,
 430                                                   Spliterator<P_IN> spliterator) {
 431                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 432                     return spliterator.getExactSizeIfKnown();
 433                 return super.evaluateSequential(helper, spliterator);
 434             }
 435 
 436             @Override
 437             public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper,
 438                                                 Spliterator<P_IN> spliterator) {
 439                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 440                     return spliterator.getExactSizeIfKnown();
 441                 return super.evaluateParallel(helper, spliterator);
 442             }
 443 
 444             @Override
 445             public int getOpFlags() {
 446                 return StreamOpFlag.NOT_ORDERED;
 447             }
 448         };
 449     }
 450 
 451     /**
 452      * Constructs a {@code TerminalOp} that implements a functional reduce on
 453      * {@code long} values.
 454      *
 455      * @param identity the identity for the combining function
 456      * @param operator the combining function
 457      * @return a {@code TerminalOp} implementing the reduction
 458      */
 459     public static TerminalOp<Long, Long>
 460     makeLong(long identity, LongBinaryOperator operator) {
 461         Objects.requireNonNull(operator);
 462         class ReducingSink
 463                 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
 464             private long state;
 465 
 466             @Override
 467             public void begin(long size) {
 468                 state = identity;
 469             }
 470 
 471             @Override
 472             public void accept(long t) {
 473                 state = operator.applyAsLong(state, t);
 474             }
 475 
 476             @Override
 477             public Long get() {
 478                 return state;
 479             }
 480 
 481             @Override
 482             public void combine(ReducingSink other) {
 483                 accept(other.state);
 484             }
 485         }
 486         return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
 487             @Override
 488             public ReducingSink makeSink() {
 489                 return new ReducingSink();
 490             }
 491         };
 492     }
 493 
 494     /**
 495      * Constructs a {@code TerminalOp} that implements a functional reduce on
 496      * {@code long} values, producing an optional long result.
 497      *
 498      * @param operator the combining function
 499      * @return a {@code TerminalOp} implementing the reduction
 500      */
 501     public static TerminalOp<Long, OptionalLong>
 502     makeLong(LongBinaryOperator operator) {
 503         Objects.requireNonNull(operator);
 504         class ReducingSink
 505                 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
 506             private boolean empty;
 507             private long state;
 508 
 509             public void begin(long size) {
 510                 empty = true;
 511                 state = 0;
 512             }
 513 
 514             @Override
 515             public void accept(long t) {
 516                 if (empty) {
 517                     empty = false;
 518                     state = t;
 519                 }
 520                 else {
 521                     state = operator.applyAsLong(state, t);
 522                 }
 523             }
 524 
 525             @Override
 526             public OptionalLong get() {
 527                 return empty ? OptionalLong.empty() : OptionalLong.of(state);
 528             }
 529 
 530             @Override
 531             public void combine(ReducingSink other) {
 532                 if (!other.empty)
 533                     accept(other.state);
 534             }
 535         }
 536         return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) {
 537             @Override
 538             public ReducingSink makeSink() {
 539                 return new ReducingSink();
 540             }
 541         };
 542     }
 543 
 544     /**
 545      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 546      * {@code long} values.
 547      *
 548      * @param <R> the type of the result
 549      * @param supplier a factory to produce a new accumulator of the result type
 550      * @param accumulator a function to incorporate an int into an
 551      *        accumulator
 552      * @param combiner a function to combine an accumulator into another
 553      * @return a {@code TerminalOp} implementing the reduction
 554      */
 555     public static <R> TerminalOp<Long, R>
 556     makeLong(Supplier<R> supplier,
 557              ObjLongConsumer<R> accumulator,
 558              BinaryOperator<R> combiner) {
 559         Objects.requireNonNull(supplier);
 560         Objects.requireNonNull(accumulator);
 561         Objects.requireNonNull(combiner);
 562         class ReducingSink extends Box<R>
 563                 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
 564             @Override
 565             public void begin(long size) {
 566                 state = supplier.get();
 567             }
 568 
 569             @Override
 570             public void accept(long t) {
 571                 accumulator.accept(state, t);
 572             }
 573 
 574             @Override
 575             public void combine(ReducingSink other) {
 576                 state = combiner.apply(state, other.state);
 577             }
 578         }
 579         return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) {
 580             @Override
 581             public ReducingSink makeSink() {
 582                 return new ReducingSink();
 583             }
 584         };
 585     }
 586 
 587     /**
 588      * Constructs a {@code TerminalOp} that counts the number of stream
 589      * elements.  If the size of the pipeline is known then count is the size
 590      * and there is no need to evaluate the pipeline.  If the size of the
 591      * pipeline is non known then count is produced, via reduction, using a
 592      * {@link CountingSink}.
 593      *
 594      * @return a {@code TerminalOp} implementing the counting
 595      */
 596     public static TerminalOp<Long, Long>
 597     makeLongCounting() {
 598         return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.LONG_VALUE) {
 599             @Override
 600             public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); }
 601 
 602             @Override
 603             public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper,
 604                                                   Spliterator<P_IN> spliterator) {
 605                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 606                     return spliterator.getExactSizeIfKnown();
 607                 return super.evaluateSequential(helper, spliterator);
 608             }
 609 
 610             @Override
 611             public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper,
 612                                                 Spliterator<P_IN> spliterator) {
 613                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 614                     return spliterator.getExactSizeIfKnown();
 615                 return super.evaluateParallel(helper, spliterator);
 616             }
 617 
 618             @Override
 619             public int getOpFlags() {
 620                 return StreamOpFlag.NOT_ORDERED;
 621             }
 622         };
 623     }
 624 
 625     /**
 626      * Constructs a {@code TerminalOp} that implements a functional reduce on
 627      * {@code double} values.
 628      *
 629      * @param identity the identity for the combining function
 630      * @param operator the combining function
 631      * @return a {@code TerminalOp} implementing the reduction
 632      */
 633     public static TerminalOp<Double, Double>
 634     makeDouble(double identity, DoubleBinaryOperator operator) {
 635         Objects.requireNonNull(operator);
 636         class ReducingSink
 637                 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
 638             private double state;
 639 
 640             @Override
 641             public void begin(long size) {
 642                 state = identity;
 643             }
 644 
 645             @Override
 646             public void accept(double t) {
 647                 state = operator.applyAsDouble(state, t);
 648             }
 649 
 650             @Override
 651             public Double get() {
 652                 return state;
 653             }
 654 
 655             @Override
 656             public void combine(ReducingSink other) {
 657                 accept(other.state);
 658             }
 659         }
 660         return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 661             @Override
 662             public ReducingSink makeSink() {
 663                 return new ReducingSink();
 664             }
 665         };
 666     }
 667 
 668     /**
 669      * Constructs a {@code TerminalOp} that implements a functional reduce on
 670      * {@code double} values, producing an optional double result.
 671      *
 672      * @param operator the combining function
 673      * @return a {@code TerminalOp} implementing the reduction
 674      */
 675     public static TerminalOp<Double, OptionalDouble>
 676     makeDouble(DoubleBinaryOperator operator) {
 677         Objects.requireNonNull(operator);
 678         class ReducingSink
 679                 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
 680             private boolean empty;
 681             private double state;
 682 
 683             public void begin(long size) {
 684                 empty = true;
 685                 state = 0;
 686             }
 687 
 688             @Override
 689             public void accept(double t) {
 690                 if (empty) {
 691                     empty = false;
 692                     state = t;
 693                 }
 694                 else {
 695                     state = operator.applyAsDouble(state, t);
 696                 }
 697             }
 698 
 699             @Override
 700             public OptionalDouble get() {
 701                 return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
 702             }
 703 
 704             @Override
 705             public void combine(ReducingSink other) {
 706                 if (!other.empty)
 707                     accept(other.state);
 708             }
 709         }
 710         return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 711             @Override
 712             public ReducingSink makeSink() {
 713                 return new ReducingSink();
 714             }
 715         };
 716     }
 717 
 718     /**
 719      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 720      * {@code double} values.
 721      *
 722      * @param <R> the type of the result
 723      * @param supplier a factory to produce a new accumulator of the result type
 724      * @param accumulator a function to incorporate an int into an
 725      *        accumulator
 726      * @param combiner a function to combine an accumulator into another
 727      * @return a {@code TerminalOp} implementing the reduction
 728      */
 729     public static <R> TerminalOp<Double, R>
 730     makeDouble(Supplier<R> supplier,
 731                ObjDoubleConsumer<R> accumulator,
 732                BinaryOperator<R> combiner) {
 733         Objects.requireNonNull(supplier);
 734         Objects.requireNonNull(accumulator);
 735         Objects.requireNonNull(combiner);
 736         class ReducingSink extends Box<R>
 737                 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
 738             @Override
 739             public void begin(long size) {
 740                 state = supplier.get();
 741             }
 742 
 743             @Override
 744             public void accept(double t) {
 745                 accumulator.accept(state, t);
 746             }
 747 
 748             @Override
 749             public void combine(ReducingSink other) {
 750                 state = combiner.apply(state, other.state);
 751             }
 752         }
 753         return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 754             @Override
 755             public ReducingSink makeSink() {
 756                 return new ReducingSink();
 757             }
 758         };
 759     }
 760 
 761     /**
 762      * Constructs a {@code TerminalOp} that counts the number of stream
 763      * elements.  If the size of the pipeline is known then count is the size
 764      * and there is no need to evaluate the pipeline.  If the size of the
 765      * pipeline is non known then count is produced, via reduction, using a
 766      * {@link CountingSink}.
 767      *
 768      * @return a {@code TerminalOp} implementing the counting
 769      */
 770     public static TerminalOp<Double, Long>
 771     makeDoubleCounting() {
 772         return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.DOUBLE_VALUE) {
 773             @Override
 774             public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); }
 775 
 776             @Override
 777             public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper,
 778                                                   Spliterator<P_IN> spliterator) {
 779                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 780                     return spliterator.getExactSizeIfKnown();
 781                 return super.evaluateSequential(helper, spliterator);
 782             }
 783 
 784             @Override
 785             public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper,
 786                                                 Spliterator<P_IN> spliterator) {
 787                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 788                     return spliterator.getExactSizeIfKnown();
 789                 return super.evaluateParallel(helper, spliterator);
 790             }
 791 
 792             @Override
 793             public int getOpFlags() {
 794                 return StreamOpFlag.NOT_ORDERED;
 795             }
 796         };
 797     }
 798 
 799     /**
 800      * A sink that counts elements
 801      */
 802     abstract static class CountingSink<T>
 803             extends Box<Long>
 804             implements AccumulatingSink<T, Long, CountingSink<T>> {
 805         long count;
 806 
 807         @Override
 808         public void begin(long size) {
 809             count = 0L;
 810         }
 811 
 812         @Override
 813         public Long get() {
 814             return count;
 815         }
 816 
 817         @Override
 818         public void combine(CountingSink<T> other) {
 819             count += other.count;
 820         }
 821 
 822         static final class OfRef<T> extends CountingSink<T> {
 823             @Override
 824             public void accept(T t) {
 825                 count++;
 826             }
 827         }
 828 
 829         static final class OfInt extends CountingSink<Integer> implements Sink.OfInt {
 830             @Override
 831             public void accept(int t) {
 832                 count++;
 833             }
 834         }
 835 
 836         static final class OfLong extends CountingSink<Long> implements Sink.OfLong {
 837             @Override
 838             public void accept(long t) {
 839                 count++;
 840             }
 841         }
 842 
 843         static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble {
 844             @Override
 845             public void accept(double t) {
 846                 count++;
 847             }
 848         }
 849     }
 850 
 851     /**
 852      * A type of {@code TerminalSink} that implements an associative reducing
 853      * operation on elements of type {@code T} and producing a result of type
 854      * {@code R}.
 855      *
 856      * @param <T> the type of input element to the combining operation
 857      * @param <R> the result type
 858      * @param <K> the type of the {@code AccumulatingSink}.
 859      */
 860     private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
 861             extends TerminalSink<T, R> {
 862         void combine(K other);
 863     }
 864 
 865     /**
 866      * State box for a single state element, used as a base class for
 867      * {@code AccumulatingSink} instances
 868      *
 869      * @param <U> The type of the state element
 870      */
 871     private abstract static class Box<U> {
 872         U state;
 873 
 874         Box() {} // Avoid creation of special accessor
 875 
 876         public U get() {
 877             return state;
 878         }
 879     }
 880 
 881     /**
 882      * A {@code TerminalOp} that evaluates a stream pipeline and sends the
 883      * output into an {@code AccumulatingSink}, which performs a reduce
 884      * operation. The {@code AccumulatingSink} must represent an associative
 885      * reducing operation.
 886      *
 887      * @param <T> the output type of the stream pipeline
 888      * @param <R> the result type of the reducing operation
 889      * @param <S> the type of the {@code AccumulatingSink}
 890      */
 891     private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
 892             implements TerminalOp<T, R> {
 893         private final StreamShape inputShape;
 894 
 895         /**
 896          * Create a {@code ReduceOp} of the specified stream shape which uses
 897          * the specified {@code Supplier} to create accumulating sinks.
 898          *
 899          * @param shape The shape of the stream pipeline
 900          */
 901         ReduceOp(StreamShape shape) {
 902             inputShape = shape;
 903         }
 904 
 905         public abstract S makeSink();
 906 
 907         @Override
 908         public StreamShape inputShape() {
 909             return inputShape;
 910         }
 911 
 912         @Override
 913         public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
 914                                            Spliterator<P_IN> spliterator) {
 915             return helper.wrapAndCopyInto(makeSink(), spliterator).get();
 916         }
 917 
 918         @Override
 919         public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
 920                                          Spliterator<P_IN> spliterator) {
 921             return new ReduceTask<>(this, helper, spliterator).invoke().get();
 922         }
 923     }
 924 
 925     /**
 926      * A {@code ForkJoinTask} for performing a parallel reduce operation.
 927      */
 928     @SuppressWarnings("serial")
 929     private static final class ReduceTask<P_IN, P_OUT, R,
 930                                           S extends AccumulatingSink<P_OUT, R, S>>
 931             extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
 932         private final ReduceOp<P_OUT, R, S> op;
 933 
 934         ReduceTask(ReduceOp<P_OUT, R, S> op,
 935                    PipelineHelper<P_OUT> helper,
 936                    Spliterator<P_IN> spliterator) {
 937             super(helper, spliterator);
 938             this.op = op;
 939         }
 940 
 941         ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
 942                    Spliterator<P_IN> spliterator) {
 943             super(parent, spliterator);
 944             this.op = parent.op;
 945         }
 946 
 947         @Override
 948         protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
 949             return new ReduceTask<>(this, spliterator);
 950         }
 951 
 952         @Override
 953         protected S doLeaf() {
 954             return helper.wrapAndCopyInto(op.makeSink(), spliterator);
 955         }
 956 
 957         @Override
 958         public void onCompletion(CountedCompleter<?> caller) {
 959             if (!isLeaf()) {
 960                 S leftResult = leftChild.getLocalResult();
 961                 leftResult.combine(rightChild.getLocalResult());
 962                 setLocalResult(leftResult);
 963             }
 964             // GC spliterator, left and right child
 965             super.onCompletion(caller);
 966         }
 967     }
 968 }