137 }
138 return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
139 @Override
140 public ReducingSink makeSink() {
141 return new ReducingSink();
142 }
143 };
144 }
145
146 /**
147 * Constructs a {@code TerminalOp} that implements a mutable reduce on
148 * reference values.
149 *
150 * @param <T> the type of the input elements
151 * @param <I> the type of the intermediate reduction result
152 * @param collector a {@code Collector} defining the reduction
153 * @return a {@code ReduceOp} implementing the reduction
154 */
155 public static <T, I> TerminalOp<T, I>
156 makeRef(Collector<? super T, I, ?> collector) {
157 Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
158 BiConsumer<I, ? super T> accumulator = collector.accumulator();
159 BinaryOperator<I> combiner = collector.combiner();
160 class ReducingSink extends Box<I>
161 implements AccumulatingSink<T, I, ReducingSink> {
162 @Override
163 public void begin(long size) {
164 state = supplier.get();
165 }
166
167 @Override
168 public void accept(T t) {
169 accumulator.accept(state, t);
170 }
171
172 @Override
173 public void combine(ReducingSink other) {
174 state = combiner.apply(state, other.state);
175 }
176 }
177 return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
178 @Override
179 public ReducingSink makeSink() {
180 return new ReducingSink();
181 }
182
183 @Override
184 public int getOpFlags() {
|
137 }
138 return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
139 @Override
140 public ReducingSink makeSink() {
141 return new ReducingSink();
142 }
143 };
144 }
145
146 /**
147 * Constructs a {@code TerminalOp} that implements a mutable reduce on
148 * reference values.
149 *
150 * @param <T> the type of the input elements
151 * @param <I> the type of the intermediate reduction result
152 * @param collector a {@code Collector} defining the reduction
153 * @return a {@code ReduceOp} implementing the reduction
154 */
155 public static <T, I> TerminalOp<T, I>
156 makeRef(Collector<? super T, I, ?> collector) {
157 BiConsumer<I, ? super T> accumulator = Objects.requireNonNull(collector).accumulator();
158 BinaryOperator<I> combiner = collector.combiner();
159 class ReducingSink extends Box<I>
160 implements AccumulatingSink<T, I, ReducingSink> {
161 @Override
162 public void begin(long size) {
163 if (size == -1)
164 state = collector.supplier().get();
165 else
166 state = collector.sizedSupplier().apply((int) size);
167 }
168
169 @Override
170 public void accept(T t) {
171 accumulator.accept(state, t);
172 }
173
174 @Override
175 public void combine(ReducingSink other) {
176 state = combiner.apply(state, other.state);
177 }
178 }
179 return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
180 @Override
181 public ReducingSink makeSink() {
182 return new ReducingSink();
183 }
184
185 @Override
186 public int getOpFlags() {
|