1 /* 2 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Objects; 28 import java.util.Optional; 29 import java.util.OptionalDouble; 30 import java.util.OptionalInt; 31 import java.util.OptionalLong; 32 import java.util.Spliterator; 33 import java.util.concurrent.CountedCompleter; 34 import java.util.function.BiConsumer; 35 import java.util.function.BiFunction; 36 import java.util.function.BinaryOperator; 37 import java.util.function.DoubleBinaryOperator; 38 import java.util.function.IntBinaryOperator; 39 import java.util.function.LongBinaryOperator; 40 import java.util.function.ObjDoubleConsumer; 41 import java.util.function.ObjIntConsumer; 42 import java.util.function.ObjLongConsumer; 43 import java.util.function.Supplier; 44 45 /** 46 * Factory for creating instances of {@code TerminalOp} that implement 47 * reductions. 48 * 49 * @since 1.8 50 */ 51 final class ReduceOps { 52 53 private ReduceOps() { } 54 55 /** 56 * Constructs a {@code TerminalOp} that implements a functional reduce on 57 * reference values. 58 * 59 * @param <T> the type of the input elements 60 * @param <U> the type of the result 61 * @param seed the identity element for the reduction 62 * @param reducer the accumulating function that incorporates an additional 63 * input element into the result 64 * @param combiner the combining function that combines two intermediate 65 * results 66 * @return a {@code TerminalOp} implementing the reduction 67 */ 68 public static <T, U> TerminalOp<T, U> 69 makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { 70 Objects.requireNonNull(reducer); 71 Objects.requireNonNull(combiner); 72 class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> { 73 @Override 74 public void begin(long size) { 75 state = seed; 76 } 77 78 @Override 79 public void accept(T t) { 80 state = reducer.apply(state, t); 81 } 82 83 @Override 84 public void combine(ReducingSink other) { 85 state = combiner.apply(state, other.state); 86 } 87 } 88 return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) { 89 @Override 90 public ReducingSink makeSink() { 91 return new ReducingSink(); 92 } 93 }; 94 } 95 96 /** 97 * Constructs a {@code TerminalOp} that implements a functional reduce on 98 * reference values producing an optional reference result. 99 * 100 * @param <T> The type of the input elements, and the type of the result 101 * @param operator The reducing function 102 * @return A {@code TerminalOp} implementing the reduction 103 */ 104 public static <T> TerminalOp<T, Optional<T>> 105 makeRef(BinaryOperator<T> operator) { 106 Objects.requireNonNull(operator); 107 class ReducingSink 108 implements AccumulatingSink<T, Optional<T>, ReducingSink> { 109 private boolean empty; 110 private T state; 111 112 public void begin(long size) { 113 empty = true; 114 state = null; 115 } 116 117 @Override 118 public void accept(T t) { 119 if (empty) { 120 empty = false; 121 state = t; 122 } else { 123 state = operator.apply(state, t); 124 } 125 } 126 127 @Override 128 public Optional<T> get() { 129 return empty ? Optional.empty() : Optional.of(state); 130 } 131 132 @Override 133 public void combine(ReducingSink other) { 134 if (!other.empty) 135 accept(other.state); 136 } 137 } 138 return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) { 139 @Override 140 public ReducingSink makeSink() { 141 return new ReducingSink(); 142 } 143 }; 144 } 145 146 /** 147 * Constructs a {@code TerminalOp} that implements a mutable reduce on 148 * reference values. 149 * 150 * @param <T> the type of the input elements 151 * @param <I> the type of the intermediate reduction result 152 * @param collector a {@code Collector} defining the reduction 153 * @return a {@code ReduceOp} implementing the reduction 154 */ 155 public static <T, I> TerminalOp<T, I> 156 makeRef(Collector<? super T, I, ?> collector) { 157 BiConsumer<I, ? super T> accumulator = Objects.requireNonNull(collector).accumulator(); 158 BinaryOperator<I> combiner = collector.combiner(); 159 class ReducingSink extends Box<I> 160 implements AccumulatingSink<T, I, ReducingSink> { 161 @Override 162 public void begin(long size) { 163 if (size == -1) 164 state = collector.supplier().get(); 165 else 166 state = collector.sizedSupplier().apply((int) size); 167 } 168 169 @Override 170 public void accept(T t) { 171 accumulator.accept(state, t); 172 } 173 174 @Override 175 public void combine(ReducingSink other) { 176 state = combiner.apply(state, other.state); 177 } 178 } 179 return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) { 180 @Override 181 public ReducingSink makeSink() { 182 return new ReducingSink(); 183 } 184 185 @Override 186 public int getOpFlags() { 187 return collector.characteristics().contains(Collector.Characteristics.UNORDERED) 188 ? StreamOpFlag.NOT_ORDERED 189 : 0; 190 } 191 }; 192 } 193 194 /** 195 * Constructs a {@code TerminalOp} that implements a mutable reduce on 196 * reference values. 197 * 198 * @param <T> the type of the input elements 199 * @param <R> the type of the result 200 * @param seedFactory a factory to produce a new base accumulator 201 * @param accumulator a function to incorporate an element into an 202 * accumulator 203 * @param reducer a function to combine an accumulator into another 204 * @return a {@code TerminalOp} implementing the reduction 205 */ 206 public static <T, R> TerminalOp<T, R> 207 makeRef(Supplier<R> seedFactory, 208 BiConsumer<R, ? super T> accumulator, 209 BiConsumer<R,R> reducer) { 210 Objects.requireNonNull(seedFactory); 211 Objects.requireNonNull(accumulator); 212 Objects.requireNonNull(reducer); 213 class ReducingSink extends Box<R> 214 implements AccumulatingSink<T, R, ReducingSink> { 215 @Override 216 public void begin(long size) { 217 state = seedFactory.get(); 218 } 219 220 @Override 221 public void accept(T t) { 222 accumulator.accept(state, t); 223 } 224 225 @Override 226 public void combine(ReducingSink other) { 227 reducer.accept(state, other.state); 228 } 229 } 230 return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) { 231 @Override 232 public ReducingSink makeSink() { 233 return new ReducingSink(); 234 } 235 }; 236 } 237 238 /** 239 * Constructs a {@code TerminalOp} that counts the number of stream 240 * elements. If the size of the pipeline is known then count is the size 241 * and there is no need to evaluate the pipeline. If the size of the 242 * pipeline is non known then count is produced, via reduction, using a 243 * {@link CountingSink}. 244 * 245 * @param <T> the type of the input elements 246 * @return a {@code TerminalOp} implementing the counting 247 */ 248 public static <T> TerminalOp<T, Long> 249 makeRefCounting() { 250 return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) { 251 @Override 252 public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); } 253 254 @Override 255 public <P_IN> Long evaluateSequential(PipelineHelper<T> helper, 256 Spliterator<P_IN> spliterator) { 257 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 258 return spliterator.getExactSizeIfKnown(); 259 return super.evaluateSequential(helper, spliterator); 260 } 261 262 @Override 263 public <P_IN> Long evaluateParallel(PipelineHelper<T> helper, 264 Spliterator<P_IN> spliterator) { 265 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 266 return spliterator.getExactSizeIfKnown(); 267 return super.evaluateParallel(helper, spliterator); 268 } 269 270 @Override 271 public int getOpFlags() { 272 return StreamOpFlag.NOT_ORDERED; 273 } 274 }; 275 } 276 277 /** 278 * Constructs a {@code TerminalOp} that implements a functional reduce on 279 * {@code int} values. 280 * 281 * @param identity the identity for the combining function 282 * @param operator the combining function 283 * @return a {@code TerminalOp} implementing the reduction 284 */ 285 public static TerminalOp<Integer, Integer> 286 makeInt(int identity, IntBinaryOperator operator) { 287 Objects.requireNonNull(operator); 288 class ReducingSink 289 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt { 290 private int state; 291 292 @Override 293 public void begin(long size) { 294 state = identity; 295 } 296 297 @Override 298 public void accept(int t) { 299 state = operator.applyAsInt(state, t); 300 } 301 302 @Override 303 public Integer get() { 304 return state; 305 } 306 307 @Override 308 public void combine(ReducingSink other) { 309 accept(other.state); 310 } 311 } 312 return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) { 313 @Override 314 public ReducingSink makeSink() { 315 return new ReducingSink(); 316 } 317 }; 318 } 319 320 /** 321 * Constructs a {@code TerminalOp} that implements a functional reduce on 322 * {@code int} values, producing an optional integer result. 323 * 324 * @param operator the combining function 325 * @return a {@code TerminalOp} implementing the reduction 326 */ 327 public static TerminalOp<Integer, OptionalInt> 328 makeInt(IntBinaryOperator operator) { 329 Objects.requireNonNull(operator); 330 class ReducingSink 331 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt { 332 private boolean empty; 333 private int state; 334 335 public void begin(long size) { 336 empty = true; 337 state = 0; 338 } 339 340 @Override 341 public void accept(int t) { 342 if (empty) { 343 empty = false; 344 state = t; 345 } 346 else { 347 state = operator.applyAsInt(state, t); 348 } 349 } 350 351 @Override 352 public OptionalInt get() { 353 return empty ? OptionalInt.empty() : OptionalInt.of(state); 354 } 355 356 @Override 357 public void combine(ReducingSink other) { 358 if (!other.empty) 359 accept(other.state); 360 } 361 } 362 return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) { 363 @Override 364 public ReducingSink makeSink() { 365 return new ReducingSink(); 366 } 367 }; 368 } 369 370 /** 371 * Constructs a {@code TerminalOp} that implements a mutable reduce on 372 * {@code int} values. 373 * 374 * @param <R> The type of the result 375 * @param supplier a factory to produce a new accumulator of the result type 376 * @param accumulator a function to incorporate an int into an 377 * accumulator 378 * @param combiner a function to combine an accumulator into another 379 * @return A {@code ReduceOp} implementing the reduction 380 */ 381 public static <R> TerminalOp<Integer, R> 382 makeInt(Supplier<R> supplier, 383 ObjIntConsumer<R> accumulator, 384 BinaryOperator<R> combiner) { 385 Objects.requireNonNull(supplier); 386 Objects.requireNonNull(accumulator); 387 Objects.requireNonNull(combiner); 388 class ReducingSink extends Box<R> 389 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt { 390 @Override 391 public void begin(long size) { 392 state = supplier.get(); 393 } 394 395 @Override 396 public void accept(int t) { 397 accumulator.accept(state, t); 398 } 399 400 @Override 401 public void combine(ReducingSink other) { 402 state = combiner.apply(state, other.state); 403 } 404 } 405 return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) { 406 @Override 407 public ReducingSink makeSink() { 408 return new ReducingSink(); 409 } 410 }; 411 } 412 413 /** 414 * Constructs a {@code TerminalOp} that counts the number of stream 415 * elements. If the size of the pipeline is known then count is the size 416 * and there is no need to evaluate the pipeline. If the size of the 417 * pipeline is non known then count is produced, via reduction, using a 418 * {@link CountingSink}. 419 * 420 * @return a {@code TerminalOp} implementing the counting 421 */ 422 public static TerminalOp<Integer, Long> 423 makeIntCounting() { 424 return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.INT_VALUE) { 425 @Override 426 public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); } 427 428 @Override 429 public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper, 430 Spliterator<P_IN> spliterator) { 431 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 432 return spliterator.getExactSizeIfKnown(); 433 return super.evaluateSequential(helper, spliterator); 434 } 435 436 @Override 437 public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper, 438 Spliterator<P_IN> spliterator) { 439 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 440 return spliterator.getExactSizeIfKnown(); 441 return super.evaluateParallel(helper, spliterator); 442 } 443 444 @Override 445 public int getOpFlags() { 446 return StreamOpFlag.NOT_ORDERED; 447 } 448 }; 449 } 450 451 /** 452 * Constructs a {@code TerminalOp} that implements a functional reduce on 453 * {@code long} values. 454 * 455 * @param identity the identity for the combining function 456 * @param operator the combining function 457 * @return a {@code TerminalOp} implementing the reduction 458 */ 459 public static TerminalOp<Long, Long> 460 makeLong(long identity, LongBinaryOperator operator) { 461 Objects.requireNonNull(operator); 462 class ReducingSink 463 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong { 464 private long state; 465 466 @Override 467 public void begin(long size) { 468 state = identity; 469 } 470 471 @Override 472 public void accept(long t) { 473 state = operator.applyAsLong(state, t); 474 } 475 476 @Override 477 public Long get() { 478 return state; 479 } 480 481 @Override 482 public void combine(ReducingSink other) { 483 accept(other.state); 484 } 485 } 486 return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) { 487 @Override 488 public ReducingSink makeSink() { 489 return new ReducingSink(); 490 } 491 }; 492 } 493 494 /** 495 * Constructs a {@code TerminalOp} that implements a functional reduce on 496 * {@code long} values, producing an optional long result. 497 * 498 * @param operator the combining function 499 * @return a {@code TerminalOp} implementing the reduction 500 */ 501 public static TerminalOp<Long, OptionalLong> 502 makeLong(LongBinaryOperator operator) { 503 Objects.requireNonNull(operator); 504 class ReducingSink 505 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong { 506 private boolean empty; 507 private long state; 508 509 public void begin(long size) { 510 empty = true; 511 state = 0; 512 } 513 514 @Override 515 public void accept(long t) { 516 if (empty) { 517 empty = false; 518 state = t; 519 } 520 else { 521 state = operator.applyAsLong(state, t); 522 } 523 } 524 525 @Override 526 public OptionalLong get() { 527 return empty ? OptionalLong.empty() : OptionalLong.of(state); 528 } 529 530 @Override 531 public void combine(ReducingSink other) { 532 if (!other.empty) 533 accept(other.state); 534 } 535 } 536 return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) { 537 @Override 538 public ReducingSink makeSink() { 539 return new ReducingSink(); 540 } 541 }; 542 } 543 544 /** 545 * Constructs a {@code TerminalOp} that implements a mutable reduce on 546 * {@code long} values. 547 * 548 * @param <R> the type of the result 549 * @param supplier a factory to produce a new accumulator of the result type 550 * @param accumulator a function to incorporate an int into an 551 * accumulator 552 * @param combiner a function to combine an accumulator into another 553 * @return a {@code TerminalOp} implementing the reduction 554 */ 555 public static <R> TerminalOp<Long, R> 556 makeLong(Supplier<R> supplier, 557 ObjLongConsumer<R> accumulator, 558 BinaryOperator<R> combiner) { 559 Objects.requireNonNull(supplier); 560 Objects.requireNonNull(accumulator); 561 Objects.requireNonNull(combiner); 562 class ReducingSink extends Box<R> 563 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong { 564 @Override 565 public void begin(long size) { 566 state = supplier.get(); 567 } 568 569 @Override 570 public void accept(long t) { 571 accumulator.accept(state, t); 572 } 573 574 @Override 575 public void combine(ReducingSink other) { 576 state = combiner.apply(state, other.state); 577 } 578 } 579 return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) { 580 @Override 581 public ReducingSink makeSink() { 582 return new ReducingSink(); 583 } 584 }; 585 } 586 587 /** 588 * Constructs a {@code TerminalOp} that counts the number of stream 589 * elements. If the size of the pipeline is known then count is the size 590 * and there is no need to evaluate the pipeline. If the size of the 591 * pipeline is non known then count is produced, via reduction, using a 592 * {@link CountingSink}. 593 * 594 * @return a {@code TerminalOp} implementing the counting 595 */ 596 public static TerminalOp<Long, Long> 597 makeLongCounting() { 598 return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.LONG_VALUE) { 599 @Override 600 public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); } 601 602 @Override 603 public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper, 604 Spliterator<P_IN> spliterator) { 605 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 606 return spliterator.getExactSizeIfKnown(); 607 return super.evaluateSequential(helper, spliterator); 608 } 609 610 @Override 611 public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper, 612 Spliterator<P_IN> spliterator) { 613 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 614 return spliterator.getExactSizeIfKnown(); 615 return super.evaluateParallel(helper, spliterator); 616 } 617 618 @Override 619 public int getOpFlags() { 620 return StreamOpFlag.NOT_ORDERED; 621 } 622 }; 623 } 624 625 /** 626 * Constructs a {@code TerminalOp} that implements a functional reduce on 627 * {@code double} values. 628 * 629 * @param identity the identity for the combining function 630 * @param operator the combining function 631 * @return a {@code TerminalOp} implementing the reduction 632 */ 633 public static TerminalOp<Double, Double> 634 makeDouble(double identity, DoubleBinaryOperator operator) { 635 Objects.requireNonNull(operator); 636 class ReducingSink 637 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble { 638 private double state; 639 640 @Override 641 public void begin(long size) { 642 state = identity; 643 } 644 645 @Override 646 public void accept(double t) { 647 state = operator.applyAsDouble(state, t); 648 } 649 650 @Override 651 public Double get() { 652 return state; 653 } 654 655 @Override 656 public void combine(ReducingSink other) { 657 accept(other.state); 658 } 659 } 660 return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) { 661 @Override 662 public ReducingSink makeSink() { 663 return new ReducingSink(); 664 } 665 }; 666 } 667 668 /** 669 * Constructs a {@code TerminalOp} that implements a functional reduce on 670 * {@code double} values, producing an optional double result. 671 * 672 * @param operator the combining function 673 * @return a {@code TerminalOp} implementing the reduction 674 */ 675 public static TerminalOp<Double, OptionalDouble> 676 makeDouble(DoubleBinaryOperator operator) { 677 Objects.requireNonNull(operator); 678 class ReducingSink 679 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble { 680 private boolean empty; 681 private double state; 682 683 public void begin(long size) { 684 empty = true; 685 state = 0; 686 } 687 688 @Override 689 public void accept(double t) { 690 if (empty) { 691 empty = false; 692 state = t; 693 } 694 else { 695 state = operator.applyAsDouble(state, t); 696 } 697 } 698 699 @Override 700 public OptionalDouble get() { 701 return empty ? OptionalDouble.empty() : OptionalDouble.of(state); 702 } 703 704 @Override 705 public void combine(ReducingSink other) { 706 if (!other.empty) 707 accept(other.state); 708 } 709 } 710 return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) { 711 @Override 712 public ReducingSink makeSink() { 713 return new ReducingSink(); 714 } 715 }; 716 } 717 718 /** 719 * Constructs a {@code TerminalOp} that implements a mutable reduce on 720 * {@code double} values. 721 * 722 * @param <R> the type of the result 723 * @param supplier a factory to produce a new accumulator of the result type 724 * @param accumulator a function to incorporate an int into an 725 * accumulator 726 * @param combiner a function to combine an accumulator into another 727 * @return a {@code TerminalOp} implementing the reduction 728 */ 729 public static <R> TerminalOp<Double, R> 730 makeDouble(Supplier<R> supplier, 731 ObjDoubleConsumer<R> accumulator, 732 BinaryOperator<R> combiner) { 733 Objects.requireNonNull(supplier); 734 Objects.requireNonNull(accumulator); 735 Objects.requireNonNull(combiner); 736 class ReducingSink extends Box<R> 737 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble { 738 @Override 739 public void begin(long size) { 740 state = supplier.get(); 741 } 742 743 @Override 744 public void accept(double t) { 745 accumulator.accept(state, t); 746 } 747 748 @Override 749 public void combine(ReducingSink other) { 750 state = combiner.apply(state, other.state); 751 } 752 } 753 return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) { 754 @Override 755 public ReducingSink makeSink() { 756 return new ReducingSink(); 757 } 758 }; 759 } 760 761 /** 762 * Constructs a {@code TerminalOp} that counts the number of stream 763 * elements. If the size of the pipeline is known then count is the size 764 * and there is no need to evaluate the pipeline. If the size of the 765 * pipeline is non known then count is produced, via reduction, using a 766 * {@link CountingSink}. 767 * 768 * @return a {@code TerminalOp} implementing the counting 769 */ 770 public static TerminalOp<Double, Long> 771 makeDoubleCounting() { 772 return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.DOUBLE_VALUE) { 773 @Override 774 public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); } 775 776 @Override 777 public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper, 778 Spliterator<P_IN> spliterator) { 779 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 780 return spliterator.getExactSizeIfKnown(); 781 return super.evaluateSequential(helper, spliterator); 782 } 783 784 @Override 785 public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper, 786 Spliterator<P_IN> spliterator) { 787 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 788 return spliterator.getExactSizeIfKnown(); 789 return super.evaluateParallel(helper, spliterator); 790 } 791 792 @Override 793 public int getOpFlags() { 794 return StreamOpFlag.NOT_ORDERED; 795 } 796 }; 797 } 798 799 /** 800 * A sink that counts elements 801 */ 802 abstract static class CountingSink<T> 803 extends Box<Long> 804 implements AccumulatingSink<T, Long, CountingSink<T>> { 805 long count; 806 807 @Override 808 public void begin(long size) { 809 count = 0L; 810 } 811 812 @Override 813 public Long get() { 814 return count; 815 } 816 817 @Override 818 public void combine(CountingSink<T> other) { 819 count += other.count; 820 } 821 822 static final class OfRef<T> extends CountingSink<T> { 823 @Override 824 public void accept(T t) { 825 count++; 826 } 827 } 828 829 static final class OfInt extends CountingSink<Integer> implements Sink.OfInt { 830 @Override 831 public void accept(int t) { 832 count++; 833 } 834 } 835 836 static final class OfLong extends CountingSink<Long> implements Sink.OfLong { 837 @Override 838 public void accept(long t) { 839 count++; 840 } 841 } 842 843 static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble { 844 @Override 845 public void accept(double t) { 846 count++; 847 } 848 } 849 } 850 851 /** 852 * A type of {@code TerminalSink} that implements an associative reducing 853 * operation on elements of type {@code T} and producing a result of type 854 * {@code R}. 855 * 856 * @param <T> the type of input element to the combining operation 857 * @param <R> the result type 858 * @param <K> the type of the {@code AccumulatingSink}. 859 */ 860 private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> 861 extends TerminalSink<T, R> { 862 void combine(K other); 863 } 864 865 /** 866 * State box for a single state element, used as a base class for 867 * {@code AccumulatingSink} instances 868 * 869 * @param <U> The type of the state element 870 */ 871 private abstract static class Box<U> { 872 U state; 873 874 Box() {} // Avoid creation of special accessor 875 876 public U get() { 877 return state; 878 } 879 } 880 881 /** 882 * A {@code TerminalOp} that evaluates a stream pipeline and sends the 883 * output into an {@code AccumulatingSink}, which performs a reduce 884 * operation. The {@code AccumulatingSink} must represent an associative 885 * reducing operation. 886 * 887 * @param <T> the output type of the stream pipeline 888 * @param <R> the result type of the reducing operation 889 * @param <S> the type of the {@code AccumulatingSink} 890 */ 891 private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> 892 implements TerminalOp<T, R> { 893 private final StreamShape inputShape; 894 895 /** 896 * Create a {@code ReduceOp} of the specified stream shape which uses 897 * the specified {@code Supplier} to create accumulating sinks. 898 * 899 * @param shape The shape of the stream pipeline 900 */ 901 ReduceOp(StreamShape shape) { 902 inputShape = shape; 903 } 904 905 public abstract S makeSink(); 906 907 @Override 908 public StreamShape inputShape() { 909 return inputShape; 910 } 911 912 @Override 913 public <P_IN> R evaluateSequential(PipelineHelper<T> helper, 914 Spliterator<P_IN> spliterator) { 915 return helper.wrapAndCopyInto(makeSink(), spliterator).get(); 916 } 917 918 @Override 919 public <P_IN> R evaluateParallel(PipelineHelper<T> helper, 920 Spliterator<P_IN> spliterator) { 921 return new ReduceTask<>(this, helper, spliterator).invoke().get(); 922 } 923 } 924 925 /** 926 * A {@code ForkJoinTask} for performing a parallel reduce operation. 927 */ 928 @SuppressWarnings("serial") 929 private static final class ReduceTask<P_IN, P_OUT, R, 930 S extends AccumulatingSink<P_OUT, R, S>> 931 extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> { 932 private final ReduceOp<P_OUT, R, S> op; 933 934 ReduceTask(ReduceOp<P_OUT, R, S> op, 935 PipelineHelper<P_OUT> helper, 936 Spliterator<P_IN> spliterator) { 937 super(helper, spliterator); 938 this.op = op; 939 } 940 941 ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, 942 Spliterator<P_IN> spliterator) { 943 super(parent, spliterator); 944 this.op = parent.op; 945 } 946 947 @Override 948 protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) { 949 return new ReduceTask<>(this, spliterator); 950 } 951 952 @Override 953 protected S doLeaf() { 954 return helper.wrapAndCopyInto(op.makeSink(), spliterator); 955 } 956 957 @Override 958 public void onCompletion(CountedCompleter<?> caller) { 959 if (!isLeaf()) { 960 S leftResult = leftChild.getLocalResult(); 961 leftResult.combine(rightChild.getLocalResult()); 962 setLocalResult(leftResult); 963 } 964 // GC spliterator, left and right child 965 super.onCompletion(caller); 966 } 967 } 968 }