/*
 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.function.*;

/**
 * Factory for creating instances of {@code TerminalOp} that implement
 * reductions.
 *
 * <p>Every factory method follows the same pattern: it declares a local
 * {@code ReducingSink} class holding the running reduction state, and returns
 * an anonymous {@code ReduceOp} whose {@code makeSink()} creates a fresh
 * instance of that sink.
 *
 * @since 1.8
 */
final class ReduceOps {

    private ReduceOps() { }

    /**
     * Constructs a {@code TerminalOp} that implements a functional reduce on
     * reference values.
     *
     * @param <T> the type of the input elements
     * @param <U> the type of the result
     * @param seed the identity element for the reduction
     * @param reducer the accumulating function that incorporates an additional
     *        input element into the result
     * @param combiner the combining function that combines two intermediate
     *        results
     * @return a {@code TerminalOp} implementing the reduction
     * @throws NullPointerException if {@code reducer} or {@code combiner} is
     *         null
     */
    public static <T, U> TerminalOp<T, U>
    makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
        Objects.requireNonNull(reducer);
        Objects.requireNonNull(combiner);
        class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
            @Override
            public void begin(long size) {
                // Each sink starts from the seed
                state = seed;
            }

            @Override
            public void accept(T t) {
                state = reducer.apply(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that implements a functional reduce on
     * reference values producing an optional reference result.
     *
     * @param <T> the type of the input elements, and the type of the result
     * @param operator the reducing function
     * @return a {@code TerminalOp} implementing the reduction
     * @throws NullPointerException if {@code operator} is null
     */
    public static <T> TerminalOp<T, Optional<T>>
    makeRef(BinaryOperator<T> operator) {
        Objects.requireNonNull(operator);
        class ReducingSink
                implements AccumulatingSink<T, Optional<T>, ReducingSink> {
            // True until the first element has been accepted
            private boolean empty;
            private T state;

            @Override
            public void begin(long size) {
                empty = true;
                state = null;
            }

            @Override
            public void accept(T t) {
                if (empty) {
                    // The first element becomes the initial state
                    empty = false;
                    state = t;
                } else {
                    state = operator.apply(state, t);
                }
            }

            @Override
            public Optional<T> get() {
                return empty ? Optional.empty() : Optional.of(state);
            }

            @Override
            public void combine(ReducingSink other) {
                // An empty partial result contributes nothing
                if (!other.empty)
                    accept(other.state);
            }
        }
        return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that implements a mutable reduce on
     * reference values.
     *
     * <p>When a sink is told an element count that is non-negative and no
     * larger than {@code Integer.MAX_VALUE}, its container is obtained from
     * the collector's sized supplier; otherwise the plain supplier is used.
     * The returned operation reports {@code StreamOpFlag.NOT_ORDERED} when the
     * collector declares the {@code UNORDERED} characteristic.
     *
     * @param <T> the type of the input elements
     * @param <I> the type of the intermediate reduction result
     * @param collector a {@code Collector} defining the reduction
     * @return a {@code TerminalOp} implementing the reduction
     * @throws NullPointerException if {@code collector} is null
     */
    public static <T, I> TerminalOp<T, I>
    makeRef(Collector<? super T, I, ?> collector) {
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        IntFunction<I> sizedSupplier = collector.sizedSupplier();
        BiConsumer<I, ? super T> accumulator = collector.accumulator();
        BinaryOperator<I> combiner = collector.combiner();
        class ReducingSink extends Box<I>
                implements AccumulatingSink<T, I, ReducingSink> {
            @Override
            public void begin(long size) {
                // Use the sized supplier only when the size fits in an int
                if (size < 0 || size > Integer.MAX_VALUE)
                    state = supplier.get();
                else
                    state = sizedSupplier.apply((int) size);
            }

            @Override
            public void accept(T t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }

            @Override
            public int getOpFlags() {
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                       ? StreamOpFlag.NOT_ORDERED
                       : 0;
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that implements a mutable reduce on
     * reference values.
     *
     * @param <T> the type of the input elements
     * @param <R> the type of the result
     * @param seedFactory a factory to produce a new base accumulator
     * @param accumulator a function to incorporate an element into an
     *        accumulator
     * @param reducer a function to combine an accumulator into another
     * @return a {@code TerminalOp} implementing the reduction
     * @throws NullPointerException if any argument is null
     */
    public static <T, R> TerminalOp<T, R>
    makeRef(Supplier<R> seedFactory,
            BiConsumer<R, ? super T> accumulator,
            BiConsumer<R,R> reducer) {
        Objects.requireNonNull(seedFactory);
        Objects.requireNonNull(accumulator);
        Objects.requireNonNull(reducer);
        class ReducingSink extends Box<R>
                implements AccumulatingSink<T, R, ReducingSink> {
            @Override
            public void begin(long size) {
                state = seedFactory.get();
            }

            @Override
            public void accept(T t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                // The reducer folds the other accumulator into this one in place
                reducer.accept(state, other.state);
            }
        }
        return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that counts the number of stream
     * elements.  If the size of the pipeline is known then count is the size
     * and there is no need to evaluate the pipeline.  If the size of the
     * pipeline is not known then count is produced, via reduction, using a
     * {@link CountingSink}.
     *
     * @param <T> the type of the input elements
     * @return a {@code TerminalOp} implementing the counting
     */
    public static <T> TerminalOp<T, Long>
    makeRefCounting() {
        return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) {
            @Override
            public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); }

            @Override
            public <P_IN> Long evaluateSequential(PipelineHelper<T> helper,
                                                  Spliterator<P_IN> spliterator) {
                // Known size: answer without traversing the source
                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
                    return spliterator.getExactSizeIfKnown();
                return super.evaluateSequential(helper, spliterator);
            }

            @Override
            public <P_IN> Long evaluateParallel(PipelineHelper<T> helper,
                                                Spliterator<P_IN> spliterator) {
                // Known size: answer without traversing the source
                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
                    return spliterator.getExactSizeIfKnown();
                return super.evaluateParallel(helper, spliterator);
            }

            @Override
            public int getOpFlags() {
                return StreamOpFlag.NOT_ORDERED;
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that implements a functional reduce on
     * {@code int} values.
     *
     * @param identity the identity for the combining function
     * @param operator the combining function
     * @return a {@code TerminalOp} implementing the reduction
     * @throws NullPointerException if {@code operator} is null
     */
    public static TerminalOp<Integer, Integer>
    makeInt(int identity, IntBinaryOperator operator) {
        Objects.requireNonNull(operator);
        class ReducingSink
                implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
            private int state;

            @Override
            public void begin(long size) {
                state = identity;
            }

            @Override
            public void accept(int t) {
                state = operator.applyAsInt(state, t);
            }

            @Override
            public Integer get() {
                return state;
            }

            @Override
            public void combine(ReducingSink other) {
                // The same operator merges partial results
                accept(other.state);
            }
        }
        return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that implements a functional reduce on
     * {@code int} values, producing an optional integer result.
     *
     * @param operator the combining function
     * @return a {@code TerminalOp} implementing the reduction
     * @throws NullPointerException if {@code operator} is null
     */
    public static TerminalOp<Integer, OptionalInt>
    makeInt(IntBinaryOperator operator) {
        Objects.requireNonNull(operator);
        class ReducingSink
                implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
            // True until the first element has been accepted
            private boolean empty;
            private int state;

            @Override
            public void begin(long size) {
                empty = true;
                state = 0;
            }

            @Override
            public void accept(int t) {
                if (empty) {
                    // The first element becomes the initial state
                    empty = false;
                    state = t;
                }
                else {
                    state = operator.applyAsInt(state, t);
                }
            }

            @Override
            public OptionalInt get() {
                return empty ? OptionalInt.empty() : OptionalInt.of(state);
            }

            @Override
            public void combine(ReducingSink other) {
                // An empty partial result contributes nothing
                if (!other.empty)
                    accept(other.state);
            }
        }
        return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that implements a mutable reduce on
     * {@code int} values.
     *
     * @param <R> the type of the result
     * @param supplier a factory to produce a new accumulator of the result type
     * @param accumulator a function to incorporate an int into an
     *        accumulator
     * @param combiner a function to combine an accumulator into another
     * @return a {@code TerminalOp} implementing the reduction
     * @throws NullPointerException if any argument is null
     */
    public static <R> TerminalOp<Integer, R>
    makeInt(Supplier<R> supplier,
            ObjIntConsumer<R> accumulator,
            BinaryOperator<R> combiner) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(accumulator);
        Objects.requireNonNull(combiner);
        class ReducingSink extends Box<R>
                implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
            @Override
            public void begin(long size) {
                state = supplier.get();
            }

            @Override
            public void accept(int t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that counts the number of stream
     * elements.  If the size of the pipeline is known then count is the size
     * and there is no need to evaluate the pipeline.  If the size of the
     * pipeline is not known then count is produced, via reduction, using a
     * {@link CountingSink}.
     *
     * @return a {@code TerminalOp} implementing the counting
     */
    public static TerminalOp<Integer, Long>
    makeIntCounting() {
        return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.INT_VALUE) {
            @Override
            public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); }

            @Override
            public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper,
                                                  Spliterator<P_IN> spliterator) {
                // Known size: answer without traversing the source
                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
                    return spliterator.getExactSizeIfKnown();
                return super.evaluateSequential(helper, spliterator);
            }

            @Override
            public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper,
                                                Spliterator<P_IN> spliterator) {
                // Known size: answer without traversing the source
                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
                    return spliterator.getExactSizeIfKnown();
                return super.evaluateParallel(helper, spliterator);
            }

            @Override
            public int getOpFlags() {
                return StreamOpFlag.NOT_ORDERED;
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that implements a functional reduce on
     * {@code long} values.
     *
     * @param identity the identity for the combining function
     * @param operator the combining function
     * @return a {@code TerminalOp} implementing the reduction
     * @throws NullPointerException if {@code operator} is null
     */
    public static TerminalOp<Long, Long>
    makeLong(long identity, LongBinaryOperator operator) {
        Objects.requireNonNull(operator);
        class ReducingSink
                implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
            private long state;

            @Override
            public void begin(long size) {
                state = identity;
            }

            @Override
            public void accept(long t) {
                state = operator.applyAsLong(state, t);
            }

            @Override
            public Long get() {
                return state;
            }

            @Override
            public void combine(ReducingSink other) {
                // The same operator merges partial results
                accept(other.state);
            }
        }
        return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that implements a functional reduce on
     * {@code long} values, producing an optional long result.
     *
     * @param operator the combining function
     * @return a {@code TerminalOp} implementing the reduction
     * @throws NullPointerException if {@code operator} is null
     */
    public static TerminalOp<Long, OptionalLong>
    makeLong(LongBinaryOperator operator) {
        Objects.requireNonNull(operator);
        class ReducingSink
                implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
            // True until the first element has been accepted
            private boolean empty;
            private long state;

            @Override
            public void begin(long size) {
                empty = true;
                state = 0;
            }

            @Override
            public void accept(long t) {
                if (empty) {
                    // The first element becomes the initial state
                    empty = false;
                    state = t;
                }
                else {
                    state = operator.applyAsLong(state, t);
                }
            }

            @Override
            public OptionalLong get() {
                return empty ? OptionalLong.empty() : OptionalLong.of(state);
            }

            @Override
            public void combine(ReducingSink other) {
                // An empty partial result contributes nothing
                if (!other.empty)
                    accept(other.state);
            }
        }
        return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that implements a mutable reduce on
     * {@code long} values.
     *
     * @param <R> the type of the result
     * @param supplier a factory to produce a new accumulator of the result type
     * @param accumulator a function to incorporate a long into an
     *        accumulator
     * @param combiner a function to combine an accumulator into another
     * @return a {@code TerminalOp} implementing the reduction
     * @throws NullPointerException if any argument is null
     */
    public static <R> TerminalOp<Long, R>
    makeLong(Supplier<R> supplier,
             ObjLongConsumer<R> accumulator,
             BinaryOperator<R> combiner) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(accumulator);
        Objects.requireNonNull(combiner);
        class ReducingSink extends Box<R>
                implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
            @Override
            public void begin(long size) {
                state = supplier.get();
            }

            @Override
            public void accept(long t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that counts the number of stream
     * elements.  If the size of the pipeline is known then count is the size
     * and there is no need to evaluate the pipeline.  If the size of the
     * pipeline is not known then count is produced, via reduction, using a
     * {@link CountingSink}.
     *
     * @return a {@code TerminalOp} implementing the counting
     */
    public static TerminalOp<Long, Long>
    makeLongCounting() {
        return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.LONG_VALUE) {
            @Override
            public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); }

            @Override
            public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper,
                                                  Spliterator<P_IN> spliterator) {
                // Known size: answer without traversing the source
                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
                    return spliterator.getExactSizeIfKnown();
                return super.evaluateSequential(helper, spliterator);
            }

            @Override
            public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper,
                                                Spliterator<P_IN> spliterator) {
                // Known size: answer without traversing the source
                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
                    return spliterator.getExactSizeIfKnown();
                return super.evaluateParallel(helper, spliterator);
            }

            @Override
            public int getOpFlags() {
                return StreamOpFlag.NOT_ORDERED;
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that implements a functional reduce on
     * {@code double} values.
     *
     * @param identity the identity for the combining function
     * @param operator the combining function
     * @return a {@code TerminalOp} implementing the reduction
     * @throws NullPointerException if {@code operator} is null
     */
    public static TerminalOp<Double, Double>
    makeDouble(double identity, DoubleBinaryOperator operator) {
        Objects.requireNonNull(operator);
        class ReducingSink
                implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
            private double state;

            @Override
            public void begin(long size) {
                state = identity;
            }

            @Override
            public void accept(double t) {
                state = operator.applyAsDouble(state, t);
            }

            @Override
            public Double get() {
                return state;
            }

            @Override
            public void combine(ReducingSink other) {
                // The same operator merges partial results
                accept(other.state);
            }
        }
        return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that implements a functional reduce on
     * {@code double} values, producing an optional double result.
     *
     * @param operator the combining function
     * @return a {@code TerminalOp} implementing the reduction
     * @throws NullPointerException if {@code operator} is null
     */
    public static TerminalOp<Double, OptionalDouble>
    makeDouble(DoubleBinaryOperator operator) {
        Objects.requireNonNull(operator);
        class ReducingSink
                implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
            // True until the first element has been accepted
            private boolean empty;
            private double state;

            @Override
            public void begin(long size) {
                empty = true;
                state = 0;
            }

            @Override
            public void accept(double t) {
                if (empty) {
                    // The first element becomes the initial state
                    empty = false;
                    state = t;
                }
                else {
                    state = operator.applyAsDouble(state, t);
                }
            }

            @Override
            public OptionalDouble get() {
                return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
            }

            @Override
            public void combine(ReducingSink other) {
                // An empty partial result contributes nothing
                if (!other.empty)
                    accept(other.state);
            }
        }
        return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that implements a mutable reduce on
     * {@code double} values.
     *
     * @param <R> the type of the result
     * @param supplier a factory to produce a new accumulator of the result type
     * @param accumulator a function to incorporate a double into an
     *        accumulator
     * @param combiner a function to combine an accumulator into another
     * @return a {@code TerminalOp} implementing the reduction
     * @throws NullPointerException if any argument is null
     */
    public static <R> TerminalOp<Double, R>
    makeDouble(Supplier<R> supplier,
               ObjDoubleConsumer<R> accumulator,
               BinaryOperator<R> combiner) {
        Objects.requireNonNull(supplier);
        Objects.requireNonNull(accumulator);
        Objects.requireNonNull(combiner);
        class ReducingSink extends Box<R>
                implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
            @Override
            public void begin(long size) {
                state = supplier.get();
            }

            @Override
            public void accept(double t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * Constructs a {@code TerminalOp} that counts the number of stream
     * elements.  If the size of the pipeline is known then count is the size
     * and there is no need to evaluate the pipeline.  If the size of the
     * pipeline is not known then count is produced, via reduction, using a
     * {@link CountingSink}.
     *
     * @return a {@code TerminalOp} implementing the counting
     */
    public static TerminalOp<Double, Long>
    makeDoubleCounting() {
        return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.DOUBLE_VALUE) {
            @Override
            public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); }

            @Override
            public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper,
                                                  Spliterator<P_IN> spliterator) {
                // Known size: answer without traversing the source
                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
                    return spliterator.getExactSizeIfKnown();
                return super.evaluateSequential(helper, spliterator);
            }

            @Override
            public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper,
                                                Spliterator<P_IN> spliterator) {
                // Known size: answer without traversing the source
                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
                    return spliterator.getExactSizeIfKnown();
                return super.evaluateParallel(helper, spliterator);
            }

            @Override
            public int getOpFlags() {
                return StreamOpFlag.NOT_ORDERED;
            }
        };
    }

    /**
     * A sink that counts elements.
     *
     * <p>The running total is kept in the primitive {@code count} field; the
     * {@code state} field inherited from {@code Box} is not used by this
     * class, and {@link #get()} is overridden to return {@code count}.
     *
     * @param <T> the type of the counted elements
     */
    abstract static class CountingSink<T>
            extends Box<Long>
            implements AccumulatingSink<T, Long, CountingSink<T>> {
        long count;

        @Override
        public void begin(long size) {
            count = 0L;
        }

        @Override
        public Long get() {
            return count;
        }

        @Override
        public void combine(CountingSink<T> other) {
            count += other.count;
        }

        /** Counting sink for reference elements. */
        static final class OfRef<T> extends CountingSink<T> {
            @Override
            public void accept(T t) {
                count++;
            }
        }

        /** Counting sink for {@code int} elements. */
        static final class OfInt extends CountingSink<Integer> implements Sink.OfInt {
            @Override
            public void accept(int t) {
                count++;
            }
        }

        /** Counting sink for {@code long} elements. */
        static final class OfLong extends CountingSink<Long> implements Sink.OfLong {
            @Override
            public void accept(long t) {
                count++;
            }
        }

        /** Counting sink for {@code double} elements. */
        static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble {
            @Override
            public void accept(double t) {
                count++;
            }
        }
    }

    /**
     * A type of {@code TerminalSink} that implements an associative reducing
     * operation on elements of type {@code T} and producing a result of type
     * {@code R}.
     *
     * @param <T> the type of input element to the combining operation
     * @param <R> the result type
     * @param <K> the type of the {@code AccumulatingSink}.
     */
    private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
            extends TerminalSink<T, R> {
        /**
         * Merges the state of another sink of the same type into this sink.
         *
         * @param other the sink whose partial result is folded into this one
         */
        void combine(K other);
    }

    /**
     * State box for a single state element, used as a base class for
     * {@code AccumulatingSink} instances
     *
     * @param <U> The type of the state element
     */
    private abstract static class Box<U> {
        U state;

        Box() {} // Avoid creation of special accessor

        public U get() {
            return state;
        }
    }

    /**
     * A {@code TerminalOp} that evaluates a stream pipeline and sends the
     * output into an {@code AccumulatingSink}, which performs a reduce
     * operation. The {@code AccumulatingSink} must represent an associative
     * reducing operation.
     *
     * @param <T> the output type of the stream pipeline
     * @param <R> the result type of the reducing operation
     * @param <S> the type of the {@code AccumulatingSink}
     */
    private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
            implements TerminalOp<T, R> {
        private final StreamShape inputShape;

        /**
         * Create a {@code ReduceOp} of the specified stream shape.
         * Accumulating sinks are created by {@link #makeSink()}.
         *
         * @param shape The shape of the stream pipeline
         */
        ReduceOp(StreamShape shape) {
            inputShape = shape;
        }

        /**
         * Creates a new accumulating sink for one evaluation (sequential) or
         * one leaf task (parallel).
         *
         * @return a new sink
         */
        public abstract S makeSink();

        @Override
        public StreamShape inputShape() {
            return inputShape;
        }

        @Override
        public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }

        @Override
        public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, helper, spliterator).invoke().get();
        }
    }

    /**
     * A {@code ForkJoinTask} for performing a parallel reduce operation.
     *
     * <p>Each leaf copies its portion of the input into a sink made by the
     * op; each non-leaf combines the right child's sink into the left child's
     * sink on completion.
     */
    @SuppressWarnings("serial")
    private static final class ReduceTask<P_IN, P_OUT, R,
                                          S extends AccumulatingSink<P_OUT, R, S>>
            extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
        private final ReduceOp<P_OUT, R, S> op;

        ReduceTask(ReduceOp<P_OUT, R, S> op,
                   PipelineHelper<P_OUT> helper,
                   Spliterator<P_IN> spliterator) {
            super(helper, spliterator);
            this.op = op;
        }

        ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
                   Spliterator<P_IN> spliterator) {
            super(parent, spliterator);
            this.op = parent.op;
        }

        @Override
        protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, spliterator);
        }

        @Override
        protected S doLeaf() {
            return helper.wrapAndCopyInto(op.makeSink(), spliterator);
        }

        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            if (!isLeaf()) {
                // Fold right into left; the left sink becomes this node's result
                S leftResult = leftChild.getLocalResult();
                leftResult.combine(rightChild.getLocalResult());
                setLocalResult(leftResult);
            }
            // GC spliterator, left and right child
            super.onCompletion(caller);
        }
    }
}