1 /* 2 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Objects; 28 import java.util.Optional; 29 import java.util.OptionalDouble; 30 import java.util.OptionalInt; 31 import java.util.OptionalLong; 32 import java.util.Spliterator; 33 import java.util.concurrent.CountedCompleter; 34 import java.util.function.BiConsumer; 35 import java.util.function.BiFunction; 36 import java.util.function.BinaryOperator; 37 import java.util.function.DoubleBinaryOperator; 38 import java.util.function.IntBinaryOperator; 39 import java.util.function.LongBinaryOperator; 40 import java.util.function.ObjDoubleConsumer; 41 import java.util.function.ObjIntConsumer; 42 import java.util.function.ObjLongConsumer; 43 import java.util.function.Supplier; 44 45 /** 46 * Factory for creating instances of {@code TerminalOp} that implement 47 * reductions. 48 * 49 * @since 1.8 50 */ 51 final class ReduceOps { 52 53 private ReduceOps() { } 54 55 /** 56 * Constructs a {@code TerminalOp} that implements a functional reduce on 57 * reference values. 58 * 59 * @param <T> the type of the input elements 60 * @param <U> the type of the result 61 * @param seed the identity element for the reduction 62 * @param reducer the accumulating function that incorporates an additional 63 * input element into the result 64 * @param combiner the combining function that combines two intermediate 65 * results 66 * @return a {@code TerminalOp} implementing the reduction 67 */ 68 public static <T, U> TerminalOp<T, U> 69 makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { 70 Objects.requireNonNull(reducer); 71 Objects.requireNonNull(combiner); 72 class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> { 73 @Override 74 public void begin(long size) { 75 state = seed; 76 } 77 78 @Override 79 public void accept(T t) { 80 state = reducer.apply(state, t); 81 } 82 83 @Override 84 public void combine(ReducingSink other) { 85 state = combiner.apply(state, other.state); 86 } 87 } 88 return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) { 89 @Override 90 public ReducingSink makeSink() { 91 return new ReducingSink(); 92 } 93 }; 94 } 95 96 /** 97 * Constructs a {@code TerminalOp} that implements a functional reduce on 98 * reference values producing an optional reference result. 99 * 100 * @param <T> The type of the input elements, and the type of the result 101 * @param operator The reducing function 102 * @return A {@code TerminalOp} implementing the reduction 103 */ 104 public static <T> TerminalOp<T, Optional<T>> 105 makeRef(BinaryOperator<T> operator) { 106 Objects.requireNonNull(operator); 107 class ReducingSink 108 implements AccumulatingSink<T, Optional<T>, ReducingSink> { 109 private boolean empty; 110 private T state; 111 112 public void begin(long size) { 113 empty = true; 114 state = null; 115 } 116 117 @Override 118 public void accept(T t) { 119 if (empty) { 120 empty = false; 121 state = t; 122 } else { 123 state = operator.apply(state, t); 124 } 125 } 126 127 @Override 128 public Optional<T> get() { 129 return empty ? Optional.empty() : Optional.of(state); 130 } 131 132 @Override 133 public void combine(ReducingSink other) { 134 if (!other.empty) 135 accept(other.state); 136 } 137 } 138 return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) { 139 @Override 140 public ReducingSink makeSink() { 141 return new ReducingSink(); 142 } 143 }; 144 } 145 146 /** 147 * Constructs a {@code TerminalOp} that implements a mutable reduce on 148 * reference values. 149 * 150 * @param <T> the type of the input elements 151 * @param <I> the type of the intermediate reduction result 152 * @param collector a {@code Collector} defining the reduction 153 * @return a {@code ReduceOp} implementing the reduction 154 */ 155 public static <T, I> TerminalOp<T, I> 156 makeRef(Collector<? super T, I, ?> collector) { 157 Supplier<I> supplier = Objects.requireNonNull(collector).supplier(); 158 BiConsumer<I, ? super T> accumulator = collector.accumulator(); 159 BinaryOperator<I> combiner = collector.combiner(); 160 class ReducingSink extends Box<I> 161 implements AccumulatingSink<T, I, ReducingSink> { 162 @Override 163 public void begin(long size) { 164 state = supplier.get(); 165 } 166 167 @Override 168 public void accept(T t) { 169 accumulator.accept(state, t); 170 } 171 172 @Override 173 public void combine(ReducingSink other) { 174 state = combiner.apply(state, other.state); 175 } 176 } 177 return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) { 178 @Override 179 public ReducingSink makeSink() { 180 return new ReducingSink(); 181 } 182 183 @Override 184 public int getOpFlags() { 185 return collector.characteristics().contains(Collector.Characteristics.UNORDERED) 186 ? StreamOpFlag.NOT_ORDERED 187 : 0; 188 } 189 }; 190 } 191 192 /** 193 * Constructs a {@code TerminalOp} that implements a mutable reduce on 194 * reference values. 195 * 196 * @param <T> the type of the input elements 197 * @param <R> the type of the result 198 * @param seedFactory a factory to produce a new base accumulator 199 * @param accumulator a function to incorporate an element into an 200 * accumulator 201 * @param reducer a function to combine an accumulator into another 202 * @return a {@code TerminalOp} implementing the reduction 203 */ 204 public static <T, R> TerminalOp<T, R> 205 makeRef(Supplier<R> seedFactory, 206 BiConsumer<R, ? super T> accumulator, 207 BiConsumer<R,R> reducer) { 208 Objects.requireNonNull(seedFactory); 209 Objects.requireNonNull(accumulator); 210 Objects.requireNonNull(reducer); 211 class ReducingSink extends Box<R> 212 implements AccumulatingSink<T, R, ReducingSink> { 213 @Override 214 public void begin(long size) { 215 state = seedFactory.get(); 216 } 217 218 @Override 219 public void accept(T t) { 220 accumulator.accept(state, t); 221 } 222 223 @Override 224 public void combine(ReducingSink other) { 225 reducer.accept(state, other.state); 226 } 227 } 228 return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) { 229 @Override 230 public ReducingSink makeSink() { 231 return new ReducingSink(); 232 } 233 }; 234 } 235 236 /** 237 * Constructs a {@code TerminalOp} that counts the number of stream 238 * elements. If the size of the pipeline is known then count is the size 239 * and there is no need to evaluate the pipeline. If the size of the 240 * pipeline is non known then count is produced, via reduction, using a 241 * {@link CountingSink}. 242 * 243 * @param <T> the type of the input elements 244 * @return a {@code TerminalOp} implementing the counting 245 */ 246 public static <T> TerminalOp<T, Long> 247 makeRefCounting() { 248 return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) { 249 @Override 250 public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); } 251 252 @Override 253 public <P_IN> Long evaluateSequential(PipelineHelper<T> helper, 254 Spliterator<P_IN> spliterator) { 255 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 256 return spliterator.getExactSizeIfKnown(); 257 return super.evaluateSequential(helper, spliterator); 258 } 259 260 @Override 261 public <P_IN> Long evaluateParallel(PipelineHelper<T> helper, 262 Spliterator<P_IN> spliterator) { 263 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 264 return spliterator.getExactSizeIfKnown(); 265 return super.evaluateParallel(helper, spliterator); 266 } 267 268 @Override 269 public int getOpFlags() { 270 return StreamOpFlag.NOT_ORDERED; 271 } 272 }; 273 } 274 275 /** 276 * Constructs a {@code TerminalOp} that implements a functional reduce on 277 * {@code int} values. 278 * 279 * @param identity the identity for the combining function 280 * @param operator the combining function 281 * @return a {@code TerminalOp} implementing the reduction 282 */ 283 public static TerminalOp<Integer, Integer> 284 makeInt(int identity, IntBinaryOperator operator) { 285 Objects.requireNonNull(operator); 286 class ReducingSink 287 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt { 288 private int state; 289 290 @Override 291 public void begin(long size) { 292 state = identity; 293 } 294 295 @Override 296 public void accept(int t) { 297 state = operator.applyAsInt(state, t); 298 } 299 300 @Override 301 public Integer get() { 302 return state; 303 } 304 305 @Override 306 public void combine(ReducingSink other) { 307 accept(other.state); 308 } 309 } 310 return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) { 311 @Override 312 public ReducingSink makeSink() { 313 return new ReducingSink(); 314 } 315 }; 316 } 317 318 /** 319 * Constructs a {@code TerminalOp} that implements a functional reduce on 320 * {@code int} values, producing an optional integer result. 321 * 322 * @param operator the combining function 323 * @return a {@code TerminalOp} implementing the reduction 324 */ 325 public static TerminalOp<Integer, OptionalInt> 326 makeInt(IntBinaryOperator operator) { 327 Objects.requireNonNull(operator); 328 class ReducingSink 329 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt { 330 private boolean empty; 331 private int state; 332 333 public void begin(long size) { 334 empty = true; 335 state = 0; 336 } 337 338 @Override 339 public void accept(int t) { 340 if (empty) { 341 empty = false; 342 state = t; 343 } 344 else { 345 state = operator.applyAsInt(state, t); 346 } 347 } 348 349 @Override 350 public OptionalInt get() { 351 return empty ? OptionalInt.empty() : OptionalInt.of(state); 352 } 353 354 @Override 355 public void combine(ReducingSink other) { 356 if (!other.empty) 357 accept(other.state); 358 } 359 } 360 return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) { 361 @Override 362 public ReducingSink makeSink() { 363 return new ReducingSink(); 364 } 365 }; 366 } 367 368 /** 369 * Constructs a {@code TerminalOp} that implements a mutable reduce on 370 * {@code int} values. 371 * 372 * @param <R> The type of the result 373 * @param supplier a factory to produce a new accumulator of the result type 374 * @param accumulator a function to incorporate an int into an 375 * accumulator 376 * @param combiner a function to combine an accumulator into another 377 * @return A {@code ReduceOp} implementing the reduction 378 */ 379 public static <R> TerminalOp<Integer, R> 380 makeInt(Supplier<R> supplier, 381 ObjIntConsumer<R> accumulator, 382 BinaryOperator<R> combiner) { 383 Objects.requireNonNull(supplier); 384 Objects.requireNonNull(accumulator); 385 Objects.requireNonNull(combiner); 386 class ReducingSink extends Box<R> 387 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt { 388 @Override 389 public void begin(long size) { 390 state = supplier.get(); 391 } 392 393 @Override 394 public void accept(int t) { 395 accumulator.accept(state, t); 396 } 397 398 @Override 399 public void combine(ReducingSink other) { 400 state = combiner.apply(state, other.state); 401 } 402 } 403 return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) { 404 @Override 405 public ReducingSink makeSink() { 406 return new ReducingSink(); 407 } 408 }; 409 } 410 411 /** 412 * Constructs a {@code TerminalOp} that counts the number of stream 413 * elements. If the size of the pipeline is known then count is the size 414 * and there is no need to evaluate the pipeline. If the size of the 415 * pipeline is non known then count is produced, via reduction, using a 416 * {@link CountingSink}. 417 * 418 * @return a {@code TerminalOp} implementing the counting 419 */ 420 public static TerminalOp<Integer, Long> 421 makeIntCounting() { 422 return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.INT_VALUE) { 423 @Override 424 public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); } 425 426 @Override 427 public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper, 428 Spliterator<P_IN> spliterator) { 429 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 430 return spliterator.getExactSizeIfKnown(); 431 return super.evaluateSequential(helper, spliterator); 432 } 433 434 @Override 435 public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper, 436 Spliterator<P_IN> spliterator) { 437 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 438 return spliterator.getExactSizeIfKnown(); 439 return super.evaluateParallel(helper, spliterator); 440 } 441 442 @Override 443 public int getOpFlags() { 444 return StreamOpFlag.NOT_ORDERED; 445 } 446 }; 447 } 448 449 /** 450 * Constructs a {@code TerminalOp} that implements a functional reduce on 451 * {@code long} values. 452 * 453 * @param identity the identity for the combining function 454 * @param operator the combining function 455 * @return a {@code TerminalOp} implementing the reduction 456 */ 457 public static TerminalOp<Long, Long> 458 makeLong(long identity, LongBinaryOperator operator) { 459 Objects.requireNonNull(operator); 460 class ReducingSink 461 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong { 462 private long state; 463 464 @Override 465 public void begin(long size) { 466 state = identity; 467 } 468 469 @Override 470 public void accept(long t) { 471 state = operator.applyAsLong(state, t); 472 } 473 474 @Override 475 public Long get() { 476 return state; 477 } 478 479 @Override 480 public void combine(ReducingSink other) { 481 accept(other.state); 482 } 483 } 484 return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) { 485 @Override 486 public ReducingSink makeSink() { 487 return new ReducingSink(); 488 } 489 }; 490 } 491 492 /** 493 * Constructs a {@code TerminalOp} that implements a functional reduce on 494 * {@code long} values, producing an optional long result. 495 * 496 * @param operator the combining function 497 * @return a {@code TerminalOp} implementing the reduction 498 */ 499 public static TerminalOp<Long, OptionalLong> 500 makeLong(LongBinaryOperator operator) { 501 Objects.requireNonNull(operator); 502 class ReducingSink 503 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong { 504 private boolean empty; 505 private long state; 506 507 public void begin(long size) { 508 empty = true; 509 state = 0; 510 } 511 512 @Override 513 public void accept(long t) { 514 if (empty) { 515 empty = false; 516 state = t; 517 } 518 else { 519 state = operator.applyAsLong(state, t); 520 } 521 } 522 523 @Override 524 public OptionalLong get() { 525 return empty ? OptionalLong.empty() : OptionalLong.of(state); 526 } 527 528 @Override 529 public void combine(ReducingSink other) { 530 if (!other.empty) 531 accept(other.state); 532 } 533 } 534 return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) { 535 @Override 536 public ReducingSink makeSink() { 537 return new ReducingSink(); 538 } 539 }; 540 } 541 542 /** 543 * Constructs a {@code TerminalOp} that implements a mutable reduce on 544 * {@code long} values. 545 * 546 * @param <R> the type of the result 547 * @param supplier a factory to produce a new accumulator of the result type 548 * @param accumulator a function to incorporate an int into an 549 * accumulator 550 * @param combiner a function to combine an accumulator into another 551 * @return a {@code TerminalOp} implementing the reduction 552 */ 553 public static <R> TerminalOp<Long, R> 554 makeLong(Supplier<R> supplier, 555 ObjLongConsumer<R> accumulator, 556 BinaryOperator<R> combiner) { 557 Objects.requireNonNull(supplier); 558 Objects.requireNonNull(accumulator); 559 Objects.requireNonNull(combiner); 560 class ReducingSink extends Box<R> 561 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong { 562 @Override 563 public void begin(long size) { 564 state = supplier.get(); 565 } 566 567 @Override 568 public void accept(long t) { 569 accumulator.accept(state, t); 570 } 571 572 @Override 573 public void combine(ReducingSink other) { 574 state = combiner.apply(state, other.state); 575 } 576 } 577 return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) { 578 @Override 579 public ReducingSink makeSink() { 580 return new ReducingSink(); 581 } 582 }; 583 } 584 585 /** 586 * Constructs a {@code TerminalOp} that counts the number of stream 587 * elements. If the size of the pipeline is known then count is the size 588 * and there is no need to evaluate the pipeline. If the size of the 589 * pipeline is non known then count is produced, via reduction, using a 590 * {@link CountingSink}. 591 * 592 * @return a {@code TerminalOp} implementing the counting 593 */ 594 public static TerminalOp<Long, Long> 595 makeLongCounting() { 596 return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.LONG_VALUE) { 597 @Override 598 public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); } 599 600 @Override 601 public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper, 602 Spliterator<P_IN> spliterator) { 603 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 604 return spliterator.getExactSizeIfKnown(); 605 return super.evaluateSequential(helper, spliterator); 606 } 607 608 @Override 609 public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper, 610 Spliterator<P_IN> spliterator) { 611 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 612 return spliterator.getExactSizeIfKnown(); 613 return super.evaluateParallel(helper, spliterator); 614 } 615 616 @Override 617 public int getOpFlags() { 618 return StreamOpFlag.NOT_ORDERED; 619 } 620 }; 621 } 622 623 /** 624 * Constructs a {@code TerminalOp} that implements a functional reduce on 625 * {@code double} values. 626 * 627 * @param identity the identity for the combining function 628 * @param operator the combining function 629 * @return a {@code TerminalOp} implementing the reduction 630 */ 631 public static TerminalOp<Double, Double> 632 makeDouble(double identity, DoubleBinaryOperator operator) { 633 Objects.requireNonNull(operator); 634 class ReducingSink 635 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble { 636 private double state; 637 638 @Override 639 public void begin(long size) { 640 state = identity; 641 } 642 643 @Override 644 public void accept(double t) { 645 state = operator.applyAsDouble(state, t); 646 } 647 648 @Override 649 public Double get() { 650 return state; 651 } 652 653 @Override 654 public void combine(ReducingSink other) { 655 accept(other.state); 656 } 657 } 658 return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) { 659 @Override 660 public ReducingSink makeSink() { 661 return new ReducingSink(); 662 } 663 }; 664 } 665 666 /** 667 * Constructs a {@code TerminalOp} that implements a functional reduce on 668 * {@code double} values, producing an optional double result. 669 * 670 * @param operator the combining function 671 * @return a {@code TerminalOp} implementing the reduction 672 */ 673 public static TerminalOp<Double, OptionalDouble> 674 makeDouble(DoubleBinaryOperator operator) { 675 Objects.requireNonNull(operator); 676 class ReducingSink 677 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble { 678 private boolean empty; 679 private double state; 680 681 public void begin(long size) { 682 empty = true; 683 state = 0; 684 } 685 686 @Override 687 public void accept(double t) { 688 if (empty) { 689 empty = false; 690 state = t; 691 } 692 else { 693 state = operator.applyAsDouble(state, t); 694 } 695 } 696 697 @Override 698 public OptionalDouble get() { 699 return empty ? OptionalDouble.empty() : OptionalDouble.of(state); 700 } 701 702 @Override 703 public void combine(ReducingSink other) { 704 if (!other.empty) 705 accept(other.state); 706 } 707 } 708 return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) { 709 @Override 710 public ReducingSink makeSink() { 711 return new ReducingSink(); 712 } 713 }; 714 } 715 716 /** 717 * Constructs a {@code TerminalOp} that implements a mutable reduce on 718 * {@code double} values. 719 * 720 * @param <R> the type of the result 721 * @param supplier a factory to produce a new accumulator of the result type 722 * @param accumulator a function to incorporate an int into an 723 * accumulator 724 * @param combiner a function to combine an accumulator into another 725 * @return a {@code TerminalOp} implementing the reduction 726 */ 727 public static <R> TerminalOp<Double, R> 728 makeDouble(Supplier<R> supplier, 729 ObjDoubleConsumer<R> accumulator, 730 BinaryOperator<R> combiner) { 731 Objects.requireNonNull(supplier); 732 Objects.requireNonNull(accumulator); 733 Objects.requireNonNull(combiner); 734 class ReducingSink extends Box<R> 735 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble { 736 @Override 737 public void begin(long size) { 738 state = supplier.get(); 739 } 740 741 @Override 742 public void accept(double t) { 743 accumulator.accept(state, t); 744 } 745 746 @Override 747 public void combine(ReducingSink other) { 748 state = combiner.apply(state, other.state); 749 } 750 } 751 return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) { 752 @Override 753 public ReducingSink makeSink() { 754 return new ReducingSink(); 755 } 756 }; 757 } 758 759 /** 760 * Constructs a {@code TerminalOp} that counts the number of stream 761 * elements. If the size of the pipeline is known then count is the size 762 * and there is no need to evaluate the pipeline. If the size of the 763 * pipeline is non known then count is produced, via reduction, using a 764 * {@link CountingSink}. 765 * 766 * @return a {@code TerminalOp} implementing the counting 767 */ 768 public static TerminalOp<Double, Long> 769 makeDoubleCounting() { 770 return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.DOUBLE_VALUE) { 771 @Override 772 public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); } 773 774 @Override 775 public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper, 776 Spliterator<P_IN> spliterator) { 777 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 778 return spliterator.getExactSizeIfKnown(); 779 return super.evaluateSequential(helper, spliterator); 780 } 781 782 @Override 783 public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper, 784 Spliterator<P_IN> spliterator) { 785 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 786 return spliterator.getExactSizeIfKnown(); 787 return super.evaluateParallel(helper, spliterator); 788 } 789 790 @Override 791 public int getOpFlags() { 792 return StreamOpFlag.NOT_ORDERED; 793 } 794 }; 795 } 796 797 /** 798 * A sink that counts elements 799 */ 800 abstract static class CountingSink<T> 801 extends Box<Long> 802 implements AccumulatingSink<T, Long, CountingSink<T>> { 803 long count; 804 805 @Override 806 public void begin(long size) { 807 count = 0L; 808 } 809 810 @Override 811 public Long get() { 812 return count; 813 } 814 815 @Override 816 public void combine(CountingSink<T> other) { 817 count += other.count; 818 } 819 820 static final class OfRef<T> extends CountingSink<T> { 821 @Override 822 public void accept(T t) { 823 count++; 824 } 825 } 826 827 static final class OfInt extends CountingSink<Integer> implements Sink.OfInt { 828 @Override 829 public void accept(int t) { 830 count++; 831 } 832 } 833 834 static final class OfLong extends CountingSink<Long> implements Sink.OfLong { 835 @Override 836 public void accept(long t) { 837 count++; 838 } 839 } 840 841 static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble { 842 @Override 843 public void accept(double t) { 844 count++; 845 } 846 } 847 } 848 849 /** 850 * A type of {@code TerminalSink} that implements an associative reducing 851 * operation on elements of type {@code T} and producing a result of type 852 * {@code R}. 853 * 854 * @param <T> the type of input element to the combining operation 855 * @param <R> the result type 856 * @param <K> the type of the {@code AccumulatingSink}. 857 */ 858 private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> 859 extends TerminalSink<T, R> { 860 void combine(K other); 861 } 862 863 /** 864 * State box for a single state element, used as a base class for 865 * {@code AccumulatingSink} instances 866 * 867 * @param <U> The type of the state element 868 */ 869 private abstract static class Box<U> { 870 U state; 871 872 Box() {} // Avoid creation of special accessor 873 874 public U get() { 875 return state; 876 } 877 } 878 879 /** 880 * A {@code TerminalOp} that evaluates a stream pipeline and sends the 881 * output into an {@code AccumulatingSink}, which performs a reduce 882 * operation. The {@code AccumulatingSink} must represent an associative 883 * reducing operation. 884 * 885 * @param <T> the output type of the stream pipeline 886 * @param <R> the result type of the reducing operation 887 * @param <S> the type of the {@code AccumulatingSink} 888 */ 889 private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> 890 implements TerminalOp<T, R> { 891 private final StreamShape inputShape; 892 893 /** 894 * Create a {@code ReduceOp} of the specified stream shape which uses 895 * the specified {@code Supplier} to create accumulating sinks. 896 * 897 * @param shape The shape of the stream pipeline 898 */ 899 ReduceOp(StreamShape shape) { 900 inputShape = shape; 901 } 902 903 public abstract S makeSink(); 904 905 @Override 906 public StreamShape inputShape() { 907 return inputShape; 908 } 909 910 @Override 911 public <P_IN> R evaluateSequential(PipelineHelper<T> helper, 912 Spliterator<P_IN> spliterator) { 913 return helper.wrapAndCopyInto(makeSink(), spliterator).get(); 914 } 915 916 @Override 917 public <P_IN> R evaluateParallel(PipelineHelper<T> helper, 918 Spliterator<P_IN> spliterator) { 919 return new ReduceTask<>(this, helper, spliterator).invoke().get(); 920 } 921 } 922 923 /** 924 * A {@code ForkJoinTask} for performing a parallel reduce operation. 925 */ 926 @SuppressWarnings("serial") 927 private static final class ReduceTask<P_IN, P_OUT, R, 928 S extends AccumulatingSink<P_OUT, R, S>> 929 extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> { 930 private final ReduceOp<P_OUT, R, S> op; 931 932 ReduceTask(ReduceOp<P_OUT, R, S> op, 933 PipelineHelper<P_OUT> helper, 934 Spliterator<P_IN> spliterator) { 935 super(helper, spliterator); 936 this.op = op; 937 } 938 939 ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, 940 Spliterator<P_IN> spliterator) { 941 super(parent, spliterator); 942 this.op = parent.op; 943 } 944 945 @Override 946 protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) { 947 return new ReduceTask<>(this, spliterator); 948 } 949 950 @Override 951 protected S doLeaf() { 952 return helper.wrapAndCopyInto(op.makeSink(), spliterator); 953 } 954 955 @Override 956 public void onCompletion(CountedCompleter<?> caller) { 957 if (!isLeaf()) { 958 S leftResult = leftChild.getLocalResult(); 959 leftResult.combine(rightChild.getLocalResult()); 960 setLocalResult(leftResult); 961 } 962 // GC spliterator, left and right child 963 super.onCompletion(caller); 964 } 965 } 966 }