1 /*
   2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Objects;
  28 import java.util.Optional;
  29 import java.util.OptionalDouble;
  30 import java.util.OptionalInt;
  31 import java.util.OptionalLong;
  32 import java.util.Spliterator;
  33 import java.util.concurrent.CountedCompleter;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.BiFunction;
  36 import java.util.function.BinaryOperator;
  37 import java.util.function.DoubleBinaryOperator;
  38 import java.util.function.IntBinaryOperator;
  39 import java.util.function.LongBinaryOperator;
  40 import java.util.function.ObjDoubleConsumer;
  41 import java.util.function.ObjIntConsumer;
  42 import java.util.function.ObjLongConsumer;
  43 import java.util.function.Supplier;
  44 
  45 /**
  46  * Factory for creating instances of {@code TerminalOp} that implement
  47  * reductions.
  48  *
  49  * @since 1.8
  50  */
  51 final class ReduceOps {
  52 
  53     private ReduceOps() { }
  54 
  55     /**
  56      * Constructs a {@code TerminalOp} that implements a functional reduce on
  57      * reference values.
  58      *
  59      * @param <T> the type of the input elements
  60      * @param <U> the type of the result
  61      * @param seed the identity element for the reduction
  62      * @param reducer the accumulating function that incorporates an additional
  63      *        input element into the result
  64      * @param combiner the combining function that combines two intermediate
  65      *        results
  66      * @return a {@code TerminalOp} implementing the reduction
  67      */
  68     public static <T, U> TerminalOp<T, U>
  69     makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
  70         Objects.requireNonNull(reducer);
  71         Objects.requireNonNull(combiner);
  72         class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
  73             @Override
  74             public void begin(long size) {
  75                 state = seed;
  76             }
  77 
  78             @Override
  79             public void accept(T t) {
  80                 state = reducer.apply(state, t);
  81             }
  82 
  83             @Override
  84             public void combine(ReducingSink other) {
  85                 state = combiner.apply(state, other.state);
  86             }
  87         }
  88         return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
  89             @Override
  90             public ReducingSink makeSink() {
  91                 return new ReducingSink();
  92             }
  93         };
  94     }
  95 
  96     /**
  97      * Constructs a {@code TerminalOp} that implements a functional reduce on
  98      * reference values producing an optional reference result.
  99      *
 100      * @param <T> The type of the input elements, and the type of the result
 101      * @param operator The reducing function
 102      * @return A {@code TerminalOp} implementing the reduction
 103      */
 104     public static <T> TerminalOp<T, Optional<T>>
 105     makeRef(BinaryOperator<T> operator) {
 106         Objects.requireNonNull(operator);
 107         class ReducingSink
 108                 implements AccumulatingSink<T, Optional<T>, ReducingSink> {
 109             private boolean empty;
 110             private T state;
 111 
 112             public void begin(long size) {
 113                 empty = true;
 114                 state = null;
 115             }
 116 
 117             @Override
 118             public void accept(T t) {
 119                 if (empty) {
 120                     empty = false;
 121                     state = t;
 122                 } else {
 123                     state = operator.apply(state, t);
 124                 }
 125             }
 126 
 127             @Override
 128             public Optional<T> get() {
 129                 return empty ? Optional.empty() : Optional.of(state);
 130             }
 131 
 132             @Override
 133             public void combine(ReducingSink other) {
 134                 if (!other.empty)
 135                     accept(other.state);
 136             }
 137         }
 138         return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
 139             @Override
 140             public ReducingSink makeSink() {
 141                 return new ReducingSink();
 142             }
 143         };
 144     }
 145 
 146     /**
 147      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 148      * reference values.
 149      *
 150      * @param <T> the type of the input elements
 151      * @param <I> the type of the intermediate reduction result
 152      * @param collector a {@code Collector} defining the reduction
 153      * @return a {@code ReduceOp} implementing the reduction
 154      */
 155     public static <T, I> TerminalOp<T, I>
 156     makeRef(Collector<? super T, I, ?> collector) {
 157         Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
 158         BiConsumer<I, ? super T> accumulator = collector.accumulator();
 159         BinaryOperator<I> combiner = collector.combiner();
 160         class ReducingSink extends Box<I>
 161                 implements AccumulatingSink<T, I, ReducingSink> {
 162             @Override
 163             public void begin(long size) {
 164                 state = supplier.get();
 165             }
 166 
 167             @Override
 168             public void accept(T t) {
 169                 accumulator.accept(state, t);
 170             }
 171 
 172             @Override
 173             public void combine(ReducingSink other) {
 174                 state = combiner.apply(state, other.state);
 175             }
 176         }
 177         return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
 178             @Override
 179             public ReducingSink makeSink() {
 180                 return new ReducingSink();
 181             }
 182 
 183             @Override
 184             public int getOpFlags() {
 185                 return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
 186                        ? StreamOpFlag.NOT_ORDERED
 187                        : 0;
 188             }
 189         };
 190     }
 191 
 192     /**
 193      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 194      * reference values.
 195      *
 196      * @param <T> the type of the input elements
 197      * @param <R> the type of the result
 198      * @param seedFactory a factory to produce a new base accumulator
 199      * @param accumulator a function to incorporate an element into an
 200      *        accumulator
 201      * @param reducer a function to combine an accumulator into another
 202      * @return a {@code TerminalOp} implementing the reduction
 203      */
 204     public static <T, R> TerminalOp<T, R>
 205     makeRef(Supplier<R> seedFactory,
 206             BiConsumer<R, ? super T> accumulator,
 207             BiConsumer<R,R> reducer) {
 208         Objects.requireNonNull(seedFactory);
 209         Objects.requireNonNull(accumulator);
 210         Objects.requireNonNull(reducer);
 211         class ReducingSink extends Box<R>
 212                 implements AccumulatingSink<T, R, ReducingSink> {
 213             @Override
 214             public void begin(long size) {
 215                 state = seedFactory.get();
 216             }
 217 
 218             @Override
 219             public void accept(T t) {
 220                 accumulator.accept(state, t);
 221             }
 222 
 223             @Override
 224             public void combine(ReducingSink other) {
 225                 reducer.accept(state, other.state);
 226             }
 227         }
 228         return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
 229             @Override
 230             public ReducingSink makeSink() {
 231                 return new ReducingSink();
 232             }
 233         };
 234     }
 235 
 236     /**
 237      * Constructs a {@code TerminalOp} that counts the number of stream
 238      * elements.  If the size of the pipeline is known then count is the size
 239      * and there is no need to evaluate the pipeline.  If the size of the
 240      * pipeline is non known then count is produced, via reduction, using a
 241      * {@link CountingSink}.
 242      *
 243      * @param <T> the type of the input elements
 244      * @return a {@code TerminalOp} implementing the counting
 245      */
 246     public static <T> TerminalOp<T, Long>
 247     makeRefCounting() {
 248         return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) {
 249             @Override
 250             public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); }
 251 
 252             @Override
 253             public <P_IN> Long evaluateSequential(PipelineHelper<T> helper,
 254                                                   Spliterator<P_IN> spliterator) {
 255                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 256                     return spliterator.getExactSizeIfKnown();
 257                 return super.evaluateSequential(helper, spliterator);
 258             }
 259 
 260             @Override
 261             public <P_IN> Long evaluateParallel(PipelineHelper<T> helper,
 262                                                 Spliterator<P_IN> spliterator) {
 263                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 264                     return spliterator.getExactSizeIfKnown();
 265                 return super.evaluateParallel(helper, spliterator);
 266             }
 267 
 268             @Override
 269             public int getOpFlags() {
 270                 return StreamOpFlag.NOT_ORDERED;
 271             }
 272         };
 273     }
 274 
 275     /**
 276      * Constructs a {@code TerminalOp} that implements a functional reduce on
 277      * {@code int} values.
 278      *
 279      * @param identity the identity for the combining function
 280      * @param operator the combining function
 281      * @return a {@code TerminalOp} implementing the reduction
 282      */
 283     public static TerminalOp<Integer, Integer>
 284     makeInt(int identity, IntBinaryOperator operator) {
 285         Objects.requireNonNull(operator);
 286         class ReducingSink
 287                 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
 288             private int state;
 289 
 290             @Override
 291             public void begin(long size) {
 292                 state = identity;
 293             }
 294 
 295             @Override
 296             public void accept(int t) {
 297                 state = operator.applyAsInt(state, t);
 298             }
 299 
 300             @Override
 301             public Integer get() {
 302                 return state;
 303             }
 304 
 305             @Override
 306             public void combine(ReducingSink other) {
 307                 accept(other.state);
 308             }
 309         }
 310         return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
 311             @Override
 312             public ReducingSink makeSink() {
 313                 return new ReducingSink();
 314             }
 315         };
 316     }
 317 
 318     /**
 319      * Constructs a {@code TerminalOp} that implements a functional reduce on
 320      * {@code int} values, producing an optional integer result.
 321      *
 322      * @param operator the combining function
 323      * @return a {@code TerminalOp} implementing the reduction
 324      */
 325     public static TerminalOp<Integer, OptionalInt>
 326     makeInt(IntBinaryOperator operator) {
 327         Objects.requireNonNull(operator);
 328         class ReducingSink
 329                 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
 330             private boolean empty;
 331             private int state;
 332 
 333             public void begin(long size) {
 334                 empty = true;
 335                 state = 0;
 336             }
 337 
 338             @Override
 339             public void accept(int t) {
 340                 if (empty) {
 341                     empty = false;
 342                     state = t;
 343                 }
 344                 else {
 345                     state = operator.applyAsInt(state, t);
 346                 }
 347             }
 348 
 349             @Override
 350             public OptionalInt get() {
 351                 return empty ? OptionalInt.empty() : OptionalInt.of(state);
 352             }
 353 
 354             @Override
 355             public void combine(ReducingSink other) {
 356                 if (!other.empty)
 357                     accept(other.state);
 358             }
 359         }
 360         return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
 361             @Override
 362             public ReducingSink makeSink() {
 363                 return new ReducingSink();
 364             }
 365         };
 366     }
 367 
 368     /**
 369      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 370      * {@code int} values.
 371      *
 372      * @param <R> The type of the result
 373      * @param supplier a factory to produce a new accumulator of the result type
 374      * @param accumulator a function to incorporate an int into an
 375      *        accumulator
 376      * @param combiner a function to combine an accumulator into another
 377      * @return A {@code ReduceOp} implementing the reduction
 378      */
 379     public static <R> TerminalOp<Integer, R>
 380     makeInt(Supplier<R> supplier,
 381             ObjIntConsumer<R> accumulator,
 382             BinaryOperator<R> combiner) {
 383         Objects.requireNonNull(supplier);
 384         Objects.requireNonNull(accumulator);
 385         Objects.requireNonNull(combiner);
 386         class ReducingSink extends Box<R>
 387                 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
 388             @Override
 389             public void begin(long size) {
 390                 state = supplier.get();
 391             }
 392 
 393             @Override
 394             public void accept(int t) {
 395                 accumulator.accept(state, t);
 396             }
 397 
 398             @Override
 399             public void combine(ReducingSink other) {
 400                 state = combiner.apply(state, other.state);
 401             }
 402         }
 403         return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
 404             @Override
 405             public ReducingSink makeSink() {
 406                 return new ReducingSink();
 407             }
 408         };
 409     }
 410 
 411     /**
 412      * Constructs a {@code TerminalOp} that counts the number of stream
 413      * elements.  If the size of the pipeline is known then count is the size
 414      * and there is no need to evaluate the pipeline.  If the size of the
 415      * pipeline is non known then count is produced, via reduction, using a
 416      * {@link CountingSink}.
 417      *
 418      * @return a {@code TerminalOp} implementing the counting
 419      */
 420     public static TerminalOp<Integer, Long>
 421     makeIntCounting() {
 422         return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.INT_VALUE) {
 423             @Override
 424             public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); }
 425 
 426             @Override
 427             public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper,
 428                                                   Spliterator<P_IN> spliterator) {
 429                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 430                     return spliterator.getExactSizeIfKnown();
 431                 return super.evaluateSequential(helper, spliterator);
 432             }
 433 
 434             @Override
 435             public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper,
 436                                                 Spliterator<P_IN> spliterator) {
 437                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 438                     return spliterator.getExactSizeIfKnown();
 439                 return super.evaluateParallel(helper, spliterator);
 440             }
 441 
 442             @Override
 443             public int getOpFlags() {
 444                 return StreamOpFlag.NOT_ORDERED;
 445             }
 446         };
 447     }
 448 
 449     /**
 450      * Constructs a {@code TerminalOp} that implements a functional reduce on
 451      * {@code long} values.
 452      *
 453      * @param identity the identity for the combining function
 454      * @param operator the combining function
 455      * @return a {@code TerminalOp} implementing the reduction
 456      */
 457     public static TerminalOp<Long, Long>
 458     makeLong(long identity, LongBinaryOperator operator) {
 459         Objects.requireNonNull(operator);
 460         class ReducingSink
 461                 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
 462             private long state;
 463 
 464             @Override
 465             public void begin(long size) {
 466                 state = identity;
 467             }
 468 
 469             @Override
 470             public void accept(long t) {
 471                 state = operator.applyAsLong(state, t);
 472             }
 473 
 474             @Override
 475             public Long get() {
 476                 return state;
 477             }
 478 
 479             @Override
 480             public void combine(ReducingSink other) {
 481                 accept(other.state);
 482             }
 483         }
 484         return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
 485             @Override
 486             public ReducingSink makeSink() {
 487                 return new ReducingSink();
 488             }
 489         };
 490     }
 491 
 492     /**
 493      * Constructs a {@code TerminalOp} that implements a functional reduce on
 494      * {@code long} values, producing an optional long result.
 495      *
 496      * @param operator the combining function
 497      * @return a {@code TerminalOp} implementing the reduction
 498      */
 499     public static TerminalOp<Long, OptionalLong>
 500     makeLong(LongBinaryOperator operator) {
 501         Objects.requireNonNull(operator);
 502         class ReducingSink
 503                 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
 504             private boolean empty;
 505             private long state;
 506 
 507             public void begin(long size) {
 508                 empty = true;
 509                 state = 0;
 510             }
 511 
 512             @Override
 513             public void accept(long t) {
 514                 if (empty) {
 515                     empty = false;
 516                     state = t;
 517                 }
 518                 else {
 519                     state = operator.applyAsLong(state, t);
 520                 }
 521             }
 522 
 523             @Override
 524             public OptionalLong get() {
 525                 return empty ? OptionalLong.empty() : OptionalLong.of(state);
 526             }
 527 
 528             @Override
 529             public void combine(ReducingSink other) {
 530                 if (!other.empty)
 531                     accept(other.state);
 532             }
 533         }
 534         return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) {
 535             @Override
 536             public ReducingSink makeSink() {
 537                 return new ReducingSink();
 538             }
 539         };
 540     }
 541 
 542     /**
 543      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 544      * {@code long} values.
 545      *
 546      * @param <R> the type of the result
 547      * @param supplier a factory to produce a new accumulator of the result type
 548      * @param accumulator a function to incorporate an int into an
 549      *        accumulator
 550      * @param combiner a function to combine an accumulator into another
 551      * @return a {@code TerminalOp} implementing the reduction
 552      */
 553     public static <R> TerminalOp<Long, R>
 554     makeLong(Supplier<R> supplier,
 555              ObjLongConsumer<R> accumulator,
 556              BinaryOperator<R> combiner) {
 557         Objects.requireNonNull(supplier);
 558         Objects.requireNonNull(accumulator);
 559         Objects.requireNonNull(combiner);
 560         class ReducingSink extends Box<R>
 561                 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
 562             @Override
 563             public void begin(long size) {
 564                 state = supplier.get();
 565             }
 566 
 567             @Override
 568             public void accept(long t) {
 569                 accumulator.accept(state, t);
 570             }
 571 
 572             @Override
 573             public void combine(ReducingSink other) {
 574                 state = combiner.apply(state, other.state);
 575             }
 576         }
 577         return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) {
 578             @Override
 579             public ReducingSink makeSink() {
 580                 return new ReducingSink();
 581             }
 582         };
 583     }
 584 
 585     /**
 586      * Constructs a {@code TerminalOp} that counts the number of stream
 587      * elements.  If the size of the pipeline is known then count is the size
 588      * and there is no need to evaluate the pipeline.  If the size of the
 589      * pipeline is non known then count is produced, via reduction, using a
 590      * {@link CountingSink}.
 591      *
 592      * @return a {@code TerminalOp} implementing the counting
 593      */
 594     public static TerminalOp<Long, Long>
 595     makeLongCounting() {
 596         return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.LONG_VALUE) {
 597             @Override
 598             public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); }
 599 
 600             @Override
 601             public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper,
 602                                                   Spliterator<P_IN> spliterator) {
 603                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 604                     return spliterator.getExactSizeIfKnown();
 605                 return super.evaluateSequential(helper, spliterator);
 606             }
 607 
 608             @Override
 609             public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper,
 610                                                 Spliterator<P_IN> spliterator) {
 611                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 612                     return spliterator.getExactSizeIfKnown();
 613                 return super.evaluateParallel(helper, spliterator);
 614             }
 615 
 616             @Override
 617             public int getOpFlags() {
 618                 return StreamOpFlag.NOT_ORDERED;
 619             }
 620         };
 621     }
 622 
 623     /**
 624      * Constructs a {@code TerminalOp} that implements a functional reduce on
 625      * {@code double} values.
 626      *
 627      * @param identity the identity for the combining function
 628      * @param operator the combining function
 629      * @return a {@code TerminalOp} implementing the reduction
 630      */
 631     public static TerminalOp<Double, Double>
 632     makeDouble(double identity, DoubleBinaryOperator operator) {
 633         Objects.requireNonNull(operator);
 634         class ReducingSink
 635                 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
 636             private double state;
 637 
 638             @Override
 639             public void begin(long size) {
 640                 state = identity;
 641             }
 642 
 643             @Override
 644             public void accept(double t) {
 645                 state = operator.applyAsDouble(state, t);
 646             }
 647 
 648             @Override
 649             public Double get() {
 650                 return state;
 651             }
 652 
 653             @Override
 654             public void combine(ReducingSink other) {
 655                 accept(other.state);
 656             }
 657         }
 658         return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 659             @Override
 660             public ReducingSink makeSink() {
 661                 return new ReducingSink();
 662             }
 663         };
 664     }
 665 
 666     /**
 667      * Constructs a {@code TerminalOp} that implements a functional reduce on
 668      * {@code double} values, producing an optional double result.
 669      *
 670      * @param operator the combining function
 671      * @return a {@code TerminalOp} implementing the reduction
 672      */
 673     public static TerminalOp<Double, OptionalDouble>
 674     makeDouble(DoubleBinaryOperator operator) {
 675         Objects.requireNonNull(operator);
 676         class ReducingSink
 677                 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
 678             private boolean empty;
 679             private double state;
 680 
 681             public void begin(long size) {
 682                 empty = true;
 683                 state = 0;
 684             }
 685 
 686             @Override
 687             public void accept(double t) {
 688                 if (empty) {
 689                     empty = false;
 690                     state = t;
 691                 }
 692                 else {
 693                     state = operator.applyAsDouble(state, t);
 694                 }
 695             }
 696 
 697             @Override
 698             public OptionalDouble get() {
 699                 return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
 700             }
 701 
 702             @Override
 703             public void combine(ReducingSink other) {
 704                 if (!other.empty)
 705                     accept(other.state);
 706             }
 707         }
 708         return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 709             @Override
 710             public ReducingSink makeSink() {
 711                 return new ReducingSink();
 712             }
 713         };
 714     }
 715 
 716     /**
 717      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 718      * {@code double} values.
 719      *
 720      * @param <R> the type of the result
 721      * @param supplier a factory to produce a new accumulator of the result type
 722      * @param accumulator a function to incorporate an int into an
 723      *        accumulator
 724      * @param combiner a function to combine an accumulator into another
 725      * @return a {@code TerminalOp} implementing the reduction
 726      */
 727     public static <R> TerminalOp<Double, R>
 728     makeDouble(Supplier<R> supplier,
 729                ObjDoubleConsumer<R> accumulator,
 730                BinaryOperator<R> combiner) {
 731         Objects.requireNonNull(supplier);
 732         Objects.requireNonNull(accumulator);
 733         Objects.requireNonNull(combiner);
 734         class ReducingSink extends Box<R>
 735                 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
 736             @Override
 737             public void begin(long size) {
 738                 state = supplier.get();
 739             }
 740 
 741             @Override
 742             public void accept(double t) {
 743                 accumulator.accept(state, t);
 744             }
 745 
 746             @Override
 747             public void combine(ReducingSink other) {
 748                 state = combiner.apply(state, other.state);
 749             }
 750         }
 751         return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 752             @Override
 753             public ReducingSink makeSink() {
 754                 return new ReducingSink();
 755             }
 756         };
 757     }
 758 
 759     /**
 760      * Constructs a {@code TerminalOp} that counts the number of stream
 761      * elements.  If the size of the pipeline is known then count is the size
 762      * and there is no need to evaluate the pipeline.  If the size of the
 763      * pipeline is non known then count is produced, via reduction, using a
 764      * {@link CountingSink}.
 765      *
 766      * @return a {@code TerminalOp} implementing the counting
 767      */
 768     public static TerminalOp<Double, Long>
 769     makeDoubleCounting() {
 770         return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.DOUBLE_VALUE) {
 771             @Override
 772             public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); }
 773 
 774             @Override
 775             public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper,
 776                                                   Spliterator<P_IN> spliterator) {
 777                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 778                     return spliterator.getExactSizeIfKnown();
 779                 return super.evaluateSequential(helper, spliterator);
 780             }
 781 
 782             @Override
 783             public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper,
 784                                                 Spliterator<P_IN> spliterator) {
 785                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 786                     return spliterator.getExactSizeIfKnown();
 787                 return super.evaluateParallel(helper, spliterator);
 788             }
 789 
 790             @Override
 791             public int getOpFlags() {
 792                 return StreamOpFlag.NOT_ORDERED;
 793             }
 794         };
 795     }
 796 
 797     /**
 798      * Sink whose result is the number of elements it was handed; one nested
 799      * subclass exists per element kind (reference, int, long, double).
 800      */
 801     abstract static class CountingSink<T> extends Box<Long>
 802             implements AccumulatingSink<T, Long, CountingSink<T>> {
 803         long count;   // running tally; reset by begin, grown by accept/combine
 804 
 805         @Override
 806         public void begin(long size) {
 807             this.count = 0L;   // the size argument is not consulted
 808         }
 809 
 810         @Override
 811         public Long get() {
 812             return this.count;
 813         }
 814 
 815         @Override
 816         public void combine(CountingSink<T> other) {
 817             this.count = this.count + other.count;
 818         }
 819 
 820         static final class OfRef<T> extends CountingSink<T> {
 821             @Override
 822             public void accept(T t) {
 823                 count += 1;
 824             }
 825         }
 826 
 827         static final class OfInt extends CountingSink<Integer> implements Sink.OfInt {
 828             @Override
 829             public void accept(int t) {
 830                 count += 1;
 831             }
 832         }
 833 
 834         static final class OfLong extends CountingSink<Long> implements Sink.OfLong {
 835             @Override
 836             public void accept(long t) {
 837                 count += 1;
 838             }
 839         }
 840 
 841         static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble {
 842             @Override
 843             public void accept(double t) {
 844                 count += 1;
 845             }
 846         }
 847     }
 848 
 849     /**
 850      * A {@code TerminalSink} for an associative reducing operation: it takes
 851      * elements of type {@code T}, produces a result of type {@code R}, and
 852      * can be combined with another sink of the same concrete type.
 853      *
 854      * @param <T> the type of input element to the combining operation
 855      * @param <R> the result type
 856      * @param <K> the type of the {@code AccumulatingSink}
 857      */
 858     private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
 859             extends TerminalSink<T, R> {
 860         void combine(K other);   // implementations fold other's state into this sink
 861     }
 862 
 863     /**
 864      * Holder for one piece of mutable state; serves as the superclass of
 865      * {@code AccumulatingSink} implementations.
 866      *
 867      * @param <U> the type of the held state
 868      */
 869     private abstract static class Box<U> {
 870         U state;
 871 
 872         // Declared explicitly to avoid creation of a special accessor.
 873         Box() {
 874         }
 875 
 876         public U get() { return this.state; }
 877     }
 878 
 879     /**
 880      * {@code TerminalOp} that evaluates a stream pipeline into an
 881      * {@code AccumulatingSink} obtained from {@link #makeSink()}; the
 882      * reduction performed by that sink must be associative.
 883      *
 884      * @param <T> the output type of the stream pipeline
 885      * @param <R> the result type of the reducing operation
 886      * @param <S> the type of the {@code AccumulatingSink}
 887      */
 888     private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
 889             implements TerminalOp<T, R> {
 890         private final StreamShape inputShape;
 891 
 892         /**
 893          * Creates a {@code ReduceOp} for pipelines of the given shape.
 894          *
 895          * @param shape the shape of the stream pipeline
 896          */
 897         ReduceOp(StreamShape shape) {
 898             this.inputShape = shape;
 899         }
 900 
 901         public abstract S makeSink();
 902 
 903         @Override
 904         public StreamShape inputShape() {
 905             return this.inputShape;
 906         }
 907 
 908         @Override
 909         public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
 910                                            Spliterator<P_IN> spliterator) {
 911             S sink = helper.wrapAndCopyInto(makeSink(), spliterator);
 912             return sink.get();
 913         }
 914 
 915         @Override
 916         public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
 917                                          Spliterator<P_IN> spliterator) {
 918             ReduceTask<P_IN, T, R, S> root = new ReduceTask<>(this, helper, spliterator);
 919             return root.invoke().get();
 920         }
 921     }
 922 
 923     /**
 924      * {@code ForkJoinTask} that carries out a {@code ReduceOp} in parallel: a
 925      * leaf copies its input into a sink; a parent combines its children's sinks.
 926      */
 927     @SuppressWarnings("serial")
 928     private static final class ReduceTask<P_IN, P_OUT, R,
 929                                           S extends AccumulatingSink<P_OUT, R, S>>
 930             extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
 931         private final ReduceOp<P_OUT, R, S> op;
 932 
 933         ReduceTask(ReduceOp<P_OUT, R, S> op, PipelineHelper<P_OUT> helper,
 934                    Spliterator<P_IN> spliterator) {
 935             super(helper, spliterator);
 936             this.op = op;
 937         }
 938 
 939         ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, Spliterator<P_IN> spliterator) {
 940             super(parent, spliterator);
 941             this.op = parent.op;
 942         }
 943 
 944         @Override
 945         protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
 946             return new ReduceTask<>(this, spliterator);
 947         }
 948 
 949         @Override
 950         protected S doLeaf() {
 951             S sink = op.makeSink();
 952             return helper.wrapAndCopyInto(sink, spliterator);
 953         }
 954 
 955         @Override
 956         public void onCompletion(CountedCompleter<?> caller) {
 957             if (!isLeaf()) {
 958                 S merged = leftChild.getLocalResult();
 959                 S right = rightChild.getLocalResult();
 960                 merged.combine(right);   // left absorbs right, in that order
 961                 setLocalResult(merged);
 962             }
 963             super.onCompletion(caller);   // GC spliterator, left and right child
 964         }
 965     }
 966 }