1 /* 2 * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Comparator; 28 import java.util.Iterator; 29 import java.util.Objects; 30 import java.util.Optional; 31 import java.util.Spliterator; 32 import java.util.Spliterators; 33 import java.util.function.BiConsumer; 34 import java.util.function.BiFunction; 35 import java.util.function.BinaryOperator; 36 import java.util.function.Consumer; 37 import java.util.function.DoubleConsumer; 38 import java.util.function.Function; 39 import java.util.function.IntConsumer; 40 import java.util.function.IntFunction; 41 import java.util.function.LongConsumer; 42 import java.util.function.Predicate; 43 import java.util.function.Supplier; 44 import java.util.function.ToDoubleFunction; 45 import java.util.function.ToIntFunction; 46 import java.util.function.ToLongFunction; 47 48 /** 49 * Abstract base class for an intermediate pipeline stage or pipeline source 50 * stage implementing whose elements are of type {@code U}. 51 * 52 * @param <P_IN> type of elements in the upstream source 53 * @param <P_OUT> type of elements in produced by this stage 54 * 55 * @since 1.8 56 */ 57 abstract class ReferencePipeline<P_IN, P_OUT> 58 extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>> 59 implements Stream<P_OUT> { 60 61 /** 62 * Constructor for the head of a stream pipeline. 63 * 64 * @param source {@code Supplier<Spliterator>} describing the stream source 65 * @param sourceFlags the source flags for the stream source, described in 66 * {@link StreamOpFlag} 67 * @param parallel {@code true} if the pipeline is parallel 68 */ 69 ReferencePipeline(Supplier<? extends Spliterator<?>> source, 70 int sourceFlags, boolean parallel) { 71 super(source, sourceFlags, parallel); 72 } 73 74 /** 75 * Constructor for the head of a stream pipeline. 76 * 77 * @param source {@code Spliterator} describing the stream source 78 * @param sourceFlags The source flags for the stream source, described in 79 * {@link StreamOpFlag} 80 * @param parallel {@code true} if the pipeline is parallel 81 */ 82 ReferencePipeline(Spliterator<?> source, 83 int sourceFlags, boolean parallel) { 84 super(source, sourceFlags, parallel); 85 } 86 87 /** 88 * Constructor for appending an intermediate operation onto an existing 89 * pipeline. 90 * 91 * @param upstream the upstream element source. 92 */ 93 ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) { 94 super(upstream, opFlags); 95 } 96 97 // Shape-specific methods 98 99 @Override 100 final StreamShape getOutputShape() { 101 return StreamShape.REFERENCE; 102 } 103 104 @Override 105 final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper, 106 Spliterator<P_IN> spliterator, 107 boolean flattenTree, 108 IntFunction<P_OUT[]> generator) { 109 return Nodes.collect(helper, spliterator, flattenTree, generator); 110 } 111 112 @Override 113 final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph, 114 Supplier<Spliterator<P_IN>> supplier, 115 boolean isParallel) { 116 return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel); 117 } 118 119 @Override 120 final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) { 121 return new StreamSpliterators.DelegatingSpliterator<>(supplier); 122 } 123 124 @Override 125 final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) { 126 boolean cancelled; 127 do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink)); 128 return cancelled; 129 } 130 131 @Override 132 final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) { 133 return Nodes.builder(exactSizeIfKnown, generator); 134 } 135 136 137 // BaseStream 138 139 @Override 140 public final Iterator<P_OUT> iterator() { 141 return Spliterators.iterator(spliterator()); 142 } 143 144 145 // Stream 146 147 // Stateless intermediate operations from Stream 148 149 @Override 150 public Stream<P_OUT> unordered() { 151 if (!isOrdered()) 152 return this; 153 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) { 154 @Override 155 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { 156 return sink; 157 } 158 }; 159 } 160 161 @Override 162 public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { 163 Objects.requireNonNull(predicate); 164 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, 165 StreamOpFlag.NOT_SIZED) { 166 @Override 167 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { 168 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { 169 @Override 170 public void begin(long size) { 171 downstream.begin(-1); 172 } 173 174 @Override 175 public void accept(P_OUT u) { 176 if (predicate.test(u)) 177 downstream.accept(u); 178 } 179 }; 180 } 181 }; 182 } 183 184 @Override 185 @SuppressWarnings("unchecked") 186 public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { 187 Objects.requireNonNull(mapper); 188 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, 189 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 190 @Override 191 Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { 192 return new Sink.ChainedReference<P_OUT, R>(sink) { 193 @Override 194 public void accept(P_OUT u) { 195 downstream.accept(mapper.apply(u)); 196 } 197 }; 198 } 199 }; 200 } 201 202 @Override 203 public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) { 204 Objects.requireNonNull(mapper); 205 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 206 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 207 @Override 208 Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { 209 return new Sink.ChainedReference<P_OUT, Integer>(sink) { 210 @Override 211 public void accept(P_OUT u) { 212 downstream.accept(mapper.applyAsInt(u)); 213 } 214 }; 215 } 216 }; 217 } 218 219 @Override 220 public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) { 221 Objects.requireNonNull(mapper); 222 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 223 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 224 @Override 225 Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { 226 return new Sink.ChainedReference<P_OUT, Long>(sink) { 227 @Override 228 public void accept(P_OUT u) { 229 downstream.accept(mapper.applyAsLong(u)); 230 } 231 }; 232 } 233 }; 234 } 235 236 @Override 237 public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) { 238 Objects.requireNonNull(mapper); 239 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 240 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 241 @Override 242 Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { 243 return new Sink.ChainedReference<P_OUT, Double>(sink) { 244 @Override 245 public void accept(P_OUT u) { 246 downstream.accept(mapper.applyAsDouble(u)); 247 } 248 }; 249 } 250 }; 251 } 252 253 @Override 254 public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) { 255 Objects.requireNonNull(mapper); 256 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, 257 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 258 @Override 259 Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { 260 return new Sink.ChainedReference<P_OUT, R>(sink) { 261 // true if cancellationRequested() has been called 262 boolean cancellationRequestedCalled; 263 264 @Override 265 public void begin(long size) { 266 downstream.begin(-1); 267 } 268 269 @Override 270 public void accept(P_OUT u) { 271 try (Stream<? extends R> result = mapper.apply(u)) { 272 if (result != null) { 273 if (!cancellationRequestedCalled) { 274 result.sequential().forEach(downstream); 275 } 276 else { 277 var s = result.sequential().spliterator(); 278 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream)); 279 } 280 } 281 } 282 } 283 284 @Override 285 public boolean cancellationRequested() { 286 // If this method is called then an operation within the stream 287 // pipeline is short-circuiting (see AbstractPipeline.copyInto). 288 // Note that we cannot differentiate between an upstream or 289 // downstream operation 290 cancellationRequestedCalled = true; 291 return downstream.cancellationRequested(); 292 } 293 }; 294 } 295 }; 296 } 297 298 @Override 299 public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) { 300 Objects.requireNonNull(mapper); 301 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 302 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 303 @Override 304 Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { 305 return new Sink.ChainedReference<P_OUT, Integer>(sink) { 306 // true if cancellationRequested() has been called 307 boolean cancellationRequestedCalled; 308 309 // cache the consumer to avoid creation on every accepted element 310 IntConsumer downstreamAsInt = downstream::accept; 311 312 @Override 313 public void begin(long size) { 314 downstream.begin(-1); 315 } 316 317 @Override 318 public void accept(P_OUT u) { 319 try (IntStream result = mapper.apply(u)) { 320 if (result != null) { 321 if (!cancellationRequestedCalled) { 322 result.sequential().forEach(downstreamAsInt); 323 } 324 else { 325 var s = result.sequential().spliterator(); 326 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt)); 327 } 328 } 329 } 330 } 331 332 @Override 333 public boolean cancellationRequested() { 334 cancellationRequestedCalled = true; 335 return downstream.cancellationRequested(); 336 } 337 }; 338 } 339 }; 340 } 341 342 @Override 343 public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) { 344 Objects.requireNonNull(mapper); 345 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 346 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 347 @Override 348 Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { 349 return new Sink.ChainedReference<P_OUT, Double>(sink) { 350 // true if cancellationRequested() has been called 351 boolean cancellationRequestedCalled; 352 353 // cache the consumer to avoid creation on every accepted element 354 DoubleConsumer downstreamAsDouble = downstream::accept; 355 356 @Override 357 public void begin(long size) { 358 downstream.begin(-1); 359 } 360 361 @Override 362 public void accept(P_OUT u) { 363 try (DoubleStream result = mapper.apply(u)) { 364 if (result != null) { 365 if (!cancellationRequestedCalled) { 366 result.sequential().forEach(downstreamAsDouble); 367 } 368 else { 369 var s = result.sequential().spliterator(); 370 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble)); 371 } 372 } 373 } 374 } 375 376 @Override 377 public boolean cancellationRequested() { 378 cancellationRequestedCalled = true; 379 return downstream.cancellationRequested(); 380 } 381 }; 382 } 383 }; 384 } 385 386 @Override 387 public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) { 388 Objects.requireNonNull(mapper); 389 // We can do better than this, by polling cancellationRequested when stream is infinite 390 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 391 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 392 @Override 393 Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { 394 return new Sink.ChainedReference<P_OUT, Long>(sink) { 395 // true if cancellationRequested() has been called 396 boolean cancellationRequestedCalled; 397 398 // cache the consumer to avoid creation on every accepted element 399 LongConsumer downstreamAsLong = downstream::accept; 400 401 @Override 402 public void begin(long size) { 403 downstream.begin(-1); 404 } 405 406 @Override 407 public void accept(P_OUT u) { 408 try (LongStream result = mapper.apply(u)) { 409 if (result != null) { 410 if (!cancellationRequestedCalled) { 411 result.sequential().forEach(downstreamAsLong); 412 } 413 else { 414 var s = result.sequential().spliterator(); 415 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong)); 416 } 417 } 418 } 419 } 420 421 @Override 422 public boolean cancellationRequested() { 423 cancellationRequestedCalled = true; 424 return downstream.cancellationRequested(); 425 } 426 }; 427 } 428 }; 429 } 430 431 @Override 432 public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) { 433 Objects.requireNonNull(action); 434 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, 435 0) { 436 @Override 437 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { 438 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { 439 @Override 440 public void accept(P_OUT u) { 441 action.accept(u); 442 downstream.accept(u); 443 } 444 }; 445 } 446 }; 447 } 448 449 // Stateful intermediate operations from Stream 450 451 @Override 452 public final Stream<P_OUT> distinct() { 453 return DistinctOps.makeRef(this); 454 } 455 456 @Override 457 public final Stream<P_OUT> sorted() { 458 return SortedOps.makeRef(this); 459 } 460 461 @Override 462 public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) { 463 return SortedOps.makeRef(this, comparator); 464 } 465 466 @Override 467 public final Stream<P_OUT> limit(long maxSize) { 468 if (maxSize < 0) 469 throw new IllegalArgumentException(Long.toString(maxSize)); 470 return SliceOps.makeRef(this, 0, maxSize); 471 } 472 473 @Override 474 public final Stream<P_OUT> skip(long n) { 475 if (n < 0) 476 throw new IllegalArgumentException(Long.toString(n)); 477 if (n == 0) 478 return this; 479 else 480 return SliceOps.makeRef(this, n, -1); 481 } 482 483 @Override 484 public final Stream<P_OUT> takeWhile(Predicate<? super P_OUT> predicate) { 485 return WhileOps.makeTakeWhileRef(this, predicate); 486 } 487 488 @Override 489 public final Stream<P_OUT> dropWhile(Predicate<? super P_OUT> predicate) { 490 return WhileOps.makeDropWhileRef(this, predicate); 491 } 492 493 // Terminal operations from Stream 494 495 @Override 496 public void forEach(Consumer<? super P_OUT> action) { 497 evaluate(ForEachOps.makeRef(action, false)); 498 } 499 500 @Override 501 public void forEachOrdered(Consumer<? super P_OUT> action) { 502 evaluate(ForEachOps.makeRef(action, true)); 503 } 504 505 @Override 506 @SuppressWarnings("unchecked") 507 public final <A> A[] toArray(IntFunction<A[]> generator) { 508 // Since A has no relation to U (not possible to declare that A is an upper bound of U) 509 // there will be no static type checking. 510 // Therefore use a raw type and assume A == U rather than propagating the separation of A and U 511 // throughout the code-base. 512 // The runtime type of U is never checked for equality with the component type of the runtime type of A[]. 513 // Runtime checking will be performed when an element is stored in A[], thus if A is not a 514 // super type of U an ArrayStoreException will be thrown. 515 @SuppressWarnings("rawtypes") 516 IntFunction rawGenerator = (IntFunction) generator; 517 return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator) 518 .asArray(rawGenerator); 519 } 520 521 @Override 522 public final Object[] toArray() { 523 return toArray(Object[]::new); 524 } 525 526 @Override 527 public final boolean anyMatch(Predicate<? super P_OUT> predicate) { 528 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY)); 529 } 530 531 @Override 532 public final boolean allMatch(Predicate<? super P_OUT> predicate) { 533 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL)); 534 } 535 536 @Override 537 public final boolean noneMatch(Predicate<? super P_OUT> predicate) { 538 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE)); 539 } 540 541 @Override 542 public final Optional<P_OUT> findFirst() { 543 return evaluate(FindOps.makeRef(true)); 544 } 545 546 @Override 547 public final Optional<P_OUT> findAny() { 548 return evaluate(FindOps.makeRef(false)); 549 } 550 551 @Override 552 public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) { 553 return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator)); 554 } 555 556 @Override 557 public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) { 558 return evaluate(ReduceOps.makeRef(accumulator)); 559 } 560 561 @Override 562 public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) { 563 return evaluate(ReduceOps.makeRef(identity, accumulator, combiner)); 564 } 565 566 @Override 567 @SuppressWarnings("unchecked") 568 public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) { 569 A container; 570 if (isParallel() 571 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) 572 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { 573 container = evaluate(new UnorderedConcurrentCollectorOp<>(collector)); 574 } 575 else { 576 container = evaluate(ReduceOps.makeRef(collector)); 577 } 578 return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) 579 ? (R) container 580 : collector.finisher().apply(container); 581 } 582 583 /** 584 * A TerminalOp for unordered, concurrent collectors. The semantics are 585 * very similar to ForEachOp, except presizing is supported and the 586 * intermediary Collector accumulation type is made accessible. 587 * 588 * @param <T> The Collector's input element type 589 * @param <A> The Collector's mutable accumulation type 590 */ 591 static final class UnorderedConcurrentCollectorOp<T, A> implements TerminalOp<T, A>, TerminalSink<T, A> { 592 final IntFunction<A> sizedSupplier; 593 final Supplier<A> supplier; 594 final BiConsumer<A, ? super T> accumulator; 595 596 A res; 597 598 UnorderedConcurrentCollectorOp(Collector<? super T, A, ?> collector) { 599 this.sizedSupplier = Objects.requireNonNull(collector).sizedSupplier(); 600 this.supplier = collector.supplier(); 601 this.accumulator = collector.accumulator(); 602 } 603 604 @Override 605 public A get() { 606 return res; 607 } 608 609 @Override 610 public void accept(T t) { 611 accumulator.accept(res, t); 612 } 613 614 @Override 615 public <P_IN> A evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { 616 throw new IllegalStateException("Cannot evaluate UnorderedConcurrentCollectorOp sequentially"); 617 } 618 619 @Override 620 public <P_IN> A evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { 621 long size = helper.exactOutputSizeIfKnown(spliterator); 622 if (size < 0 || size > Integer.MAX_VALUE) 623 res = supplier.get(); 624 else 625 res = sizedSupplier.apply((int) size); 626 new ForEachOps.ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke(); 627 return res; 628 } 629 } 630 631 @Override 632 public final <R> R collect(Supplier<R> supplier, 633 BiConsumer<R, ? super P_OUT> accumulator, 634 BiConsumer<R, R> combiner) { 635 return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner)); 636 } 637 638 @Override 639 public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) { 640 return reduce(BinaryOperator.maxBy(comparator)); 641 } 642 643 @Override 644 public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) { 645 return reduce(BinaryOperator.minBy(comparator)); 646 647 } 648 649 @Override 650 public final long count() { 651 return evaluate(ReduceOps.makeRefCounting()); 652 } 653 654 // 655 656 /** 657 * Source stage of a ReferencePipeline. 658 * 659 * @param <E_IN> type of elements in the upstream source 660 * @param <E_OUT> type of elements in produced by this stage 661 * @since 1.8 662 */ 663 static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { 664 /** 665 * Constructor for the source stage of a Stream. 666 * 667 * @param source {@code Supplier<Spliterator>} describing the stream 668 * source 669 * @param sourceFlags the source flags for the stream source, described 670 * in {@link StreamOpFlag} 671 */ 672 Head(Supplier<? extends Spliterator<?>> source, 673 int sourceFlags, boolean parallel) { 674 super(source, sourceFlags, parallel); 675 } 676 677 /** 678 * Constructor for the source stage of a Stream. 679 * 680 * @param source {@code Spliterator} describing the stream source 681 * @param sourceFlags the source flags for the stream source, described 682 * in {@link StreamOpFlag} 683 */ 684 Head(Spliterator<?> source, 685 int sourceFlags, boolean parallel) { 686 super(source, sourceFlags, parallel); 687 } 688 689 @Override 690 final boolean opIsStateful() { 691 throw new UnsupportedOperationException(); 692 } 693 694 @Override 695 final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) { 696 throw new UnsupportedOperationException(); 697 } 698 699 // Optimized sequential terminal operations for the head of the pipeline 700 701 @Override 702 public void forEach(Consumer<? super E_OUT> action) { 703 if (!isParallel()) { 704 sourceStageSpliterator().forEachRemaining(action); 705 } 706 else { 707 super.forEach(action); 708 } 709 } 710 711 @Override 712 public void forEachOrdered(Consumer<? super E_OUT> action) { 713 if (!isParallel()) { 714 sourceStageSpliterator().forEachRemaining(action); 715 } 716 else { 717 super.forEachOrdered(action); 718 } 719 } 720 } 721 722 /** 723 * Base class for a stateless intermediate stage of a Stream. 724 * 725 * @param <E_IN> type of elements in the upstream source 726 * @param <E_OUT> type of elements in produced by this stage 727 * @since 1.8 728 */ 729 abstract static class StatelessOp<E_IN, E_OUT> 730 extends ReferencePipeline<E_IN, E_OUT> { 731 /** 732 * Construct a new Stream by appending a stateless intermediate 733 * operation to an existing stream. 734 * 735 * @param upstream The upstream pipeline stage 736 * @param inputShape The stream shape for the upstream pipeline stage 737 * @param opFlags Operation flags for the new stage 738 */ 739 StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 740 StreamShape inputShape, 741 int opFlags) { 742 super(upstream, opFlags); 743 assert upstream.getOutputShape() == inputShape; 744 } 745 746 @Override 747 final boolean opIsStateful() { 748 return false; 749 } 750 } 751 752 /** 753 * Base class for a stateful intermediate stage of a Stream. 754 * 755 * @param <E_IN> type of elements in the upstream source 756 * @param <E_OUT> type of elements in produced by this stage 757 * @since 1.8 758 */ 759 abstract static class StatefulOp<E_IN, E_OUT> 760 extends ReferencePipeline<E_IN, E_OUT> { 761 /** 762 * Construct a new Stream by appending a stateful intermediate operation 763 * to an existing stream. 764 * @param upstream The upstream pipeline stage 765 * @param inputShape The stream shape for the upstream pipeline stage 766 * @param opFlags Operation flags for the new stage 767 */ 768 StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 769 StreamShape inputShape, 770 int opFlags) { 771 super(upstream, opFlags); 772 assert upstream.getOutputShape() == inputShape; 773 } 774 775 @Override 776 final boolean opIsStateful() { 777 return true; 778 } 779 780 @Override 781 abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper, 782 Spliterator<P_IN> spliterator, 783 IntFunction<E_OUT[]> generator); 784 } 785 }