本文我们分析了 什么 是 “流”,对比了 Java 上几种常见的 “流”库,引入和详细介绍了 Java 22 中的 Stream Gather API 。同时也简单分享了利用 虚拟线程 如何简化 StreammapConcurrent操作符的实现。Java Stream 自从 Java 1.8 引入以来,迅速成为了各位开发者手中信手拈来的工具,大家日常在工作中谈论起来也是如数家珍。但由于 Java Stream 的操作符不足够的丰富,经常会遇到捉襟见肘的情况。对于此,大家可能会继而采用操作符更加丰富的库来作为替代。作为 Java 标准库的一部分,Oracle 的 Java 架构师们也一直以来都在探索如何更好的适应日益增长的用户需求和语言的可维护性,随着 Stream Gather API 的到来,这个情况得到了极大的改善,下面让我结合自身经验,和大家分享 Java Stream Gather API。在编程语言生态中,Stream 是一个抽象的概念,代表了一组连续地对数据的处理的操作及流经其的数据,类似汽车生产流水线一样,下面用 “流”来指代。通常来说,”流“可以分为“有限流”和“无限流”。如 长江(滚滚长江东逝水) 可以看做一个”无限流“,沿途的各种水电站可以看做 “流”中的中间处理节点。通常来说,拆分细致的库可能会将流拆分为 “Source(源)”、“Flow(流处理)”、”Sink(终点)“三个部分,不过在 API 层不一定体现。如下所示:这些 “Source”、“Flow”和 “Sink”的组合,最终形成了一个复杂的处理流图。也就是我们的代码业务逻辑。通常来说,在 Java 中,我们使用的标准库中的 Stream 则可以看做是一个最简单的单向流,如下图所示:现在我们应该对什么 是 “流”有了一个清晰的认识了。通常我们如果不需要自己开发 Java Stream 的操作符,因此几乎接触不到 AbstractPipeline和 Sink这两个类,当然截止 Java 22, Java Stream 也没有足够的扩展点。在目前日常工作中,作为 Java 开发者,我们可能会用到多种面向 “流”的工具包,简单对比如下:从上可以看到,虽然 Java 标准库的 Stream API 开箱即用,但是在很多特定场景下,我们也不得不选用其他的库;为了改善这一点,Viktor Klang 提出了 Stream Gather API,作为除 Stream Collector 之外的另一个扩展点,来弥补 Java Stream 的短板。题外话:Viktor Klang 之前是 Akka 的开发团队的一员。四、为什么需要 Stream Gather API?
Java Stream API 目前主要缺乏丰富的 操作符,而如果像其他的库一样添加很多的操作符到 Stream API,又会有较大的维护负担,如果希望只添加有限的操作符,却可以解决绝大部分的问题。则新的操作符需要支持下面的特性:
支持 “有限流” 和 “无限流”
支持 “有状态” 和 “无状态”操作
支持“提前终止”和 “全量消费”
支持 “单线程顺序处理”和 “多线程并行处理”
Stream Collector & Gather
Java Stream 目前仅提供的了一个扩展点:Collector <T, A, R>, 其包含下面几个部分:基于上面的说明,在处理过程中我们只会用到 accumulator 返回的 BiConsumer和 finisher返回的 Function<A, R>, 所以只能最终产生 1个值 (N :1)。比如我们的 Collectors.toList()只会在流结束的时候产生一个 List,所以 并不能满足我们实现 map、filter、buffer、scan 、zipWithIndex等操作符的要求。为此,在 JEP 461应运而生,API 和 Stream Collector 有点类似,具体的对比如下:上面最大的变化在于,integrator和 finisher都可以产生 1 : N 地产生元素 R。这个可选的 0 ... N个 R 通过DownStream<R>来传递给下游。如果直接在方法的返回值中返回,则仅可以表达 0 ... 1个 R 的情况。DownStream<R>的定义了两个核心方法,比较类似于 Reactor-core 的 Emitter。integrator 主要用于结合当前的状态 A 和 输入的元素 T,利用了 DownStream<R>的 能力来产生 可选的 N 个 R。核心的方法如下:结合目前的信息,我们可以发现整个 Gather API 的设计其实还挺复杂的,因为引入了太多的中间接口名称,同时支持了单线程和多线程等情况,并且有一些隐藏的约束。下面让我们庖丁解牛,细细道来。
结合上面的信息和其他类库中的经验,我们其实知道,“流”的任意中间处理过程都可以看做一个函数, 如下图所示:而上面的整个 GatherAPI 其实是在这个基础上做的变体。具体来说:supplier:产生最初的 State
- integrator + downstream:转换 和 传递值;
finisher主要用于终止信号处理, combiner主要用于并行流。这样一看整个API 就简化很多了。在实现特定的 操作符的时候,我们可以只关心特定部分。比如下面分别实现 map和 filter操作符(无状态)。map操作符无状态, 1 :1 地产生元素
filter 操作符无状态, 1 : 0 ... 1 地产生元素
public static <T, R> Gatherer<T, ?, R> map(final Function<T, R> mapper) {
return Gatherer.of((notused, element, downstream) -> downstream.push(mapper.apply(element)));
}
public static <T> Gatherer<T, ?, T> filter(final Predicate<T> predicate) {
return Gatherer.of((notused, element, downstream) -> {
if (predicate.test(element)) {
return downstream.push(element);
}
return true;
});
}
类似 gather 这样的操作符,也存在其他类库中,比如 reactor-core 、mutiny 和 pekko-stream, 比如以 pekko-stream 为例,和 gather最相近是 statefulMap操作符, 该操作符在别的 函数式流 库中一般叫做 mapAccumulate. 具体的定义如下:
def statefulMap[S, T](
create: function.Creator[S],
f: function.Function2[S, Out, Pair[S, T]],
onComplete: function.Function[S, Optional[T]]): javadsl.Flow[In, T, Mat]
“操作符”地址:
https://nightlies.apache.org/pekko/docs/pekko/1.0.2/docs/stream/operators/Source-or-Flow/statefulMap.html和 Stream Gather 做一个简单的对比:用 statefulMap和 gather分别实现 zipWithIndex如下: public static <T> Source<Pair<T, Integer>, NotUsed> zipWithIndex(final Source<T, NotUsed> source) {
return source.statefulMap(
() -> 0,
(state, element) -> Pair.create(state + 1, Pair.create(element, state)),
notused -> Optional.empty()
);
}
public static <T> Stream<Pair<T, Integer>> zipWithIndex(final Stream<T> stream) {
class State {
int index;
}
return stream.gather(Gatherer.ofSequential(
State::new,
(state, element, downstream) -> downstream.push(Pair.create(element, state.index++))
));
}
需要注意的是,在 GatherAPI 的设计中, 由 initializer返回的 状态 A在整个生命周期中使用,如果我们的操作符是带状态的,那么需要将状态放在一个 可变类 中,如上面实现中的 class State。这个和函数式的(返回新的不可变状态)的方式不太一样。这样做的一个优势就是避免生产小对象 Pair(State, Ouput). 缺点则是会有很多这样的局部类。其他的重要的约束还有:不要将State对象传递给多个线程并发修改、传递给 integrator 的 State 需要是 initializer返回的 State 等。至于并行化的支持这里不再展开,简单来说就是 Java Stream 是通过 combiner合并聚合状态来完成的。无独有偶,在有个 statefulMapAsync的PR 中也有类似的思考。
目前 Stream Gather 提供了几个默认的 Gather实现,比如 fold、scan、windowFixed和 mapConcurrent, 其中 fold比较简单,也有 Collectors.reduce作为对偶实现。最有有意思的是 mapConcurrent,其可以用于指定最大并行度地执行异步转换,类似于 Pekko-Stream 的 mapAsync。其中需要注意的是,在这个实现中,利用到了 虚拟线程、信号量,并且没有用到 CompletableFuture。更加简单的实现了相同的功能。
public static <T, R> Gatherer<T,?,R> mapConcurrent(
final int maxConcurrency,
final Function<? super T, ? extends R> mapper)
整个实现非常的简单:
public static <T, R> Gatherer<T,?,R> mapConcurrent(
final int maxConcurrency,
final Function<? super T, ? extends R> mapper) {
class State {
final ArrayDeque<Future<R>> window =
new ArrayDeque<>(Math.min(maxConcurrency, 16));
final Semaphore windowLock = new Semaphore(maxConcurrency);
final boolean integrate(T element,
Downstream<? super R> downstream) {
if (!downstream.isRejecting())
createTaskFor(element);
return flush(0, downstream);
}
final void createTaskFor(T element) {
windowLock.acquireUninterruptibly();
var task = new FutureTask<R>(() -> {
try {
return mapper.apply(element);
} finally {
windowLock.release();
}
});
var wasAddedToWindow = window.add(task);
Thread.startVirtualThread(task);
}
final boolean flush(long atLeastN,
Downstream<? super R> downstream) {
downstream.push(current.get());
}
}
return Gatherer.ofSequential(
State::new,
Integrator.<State, T, R>ofGreedy(State::integrate),
(state, downstream) -> state.flush(Long.MAX_VALUE, downstream)
);
}
我们可以看到:利用 虚拟线程 来并发的执行 mapper, 并结合 信号量来实现 maxConcurrency的限制。整个实现非常简单,感兴趣的同学可以对比下 Reactor-core 中 flatmap和 Pekko-Stream 中 mapAsync的实现。
相比之下,Gather 的 fold 实现就比较简单,如下所示:
public static <T, R> Gatherer<T, ?, R> fold(
Supplier<R> initial,
BiFunction<? super R, ? super T, ? extends R> folder) {
class State {
R value = initial.get();
State() {}
}
return Gatherer.ofSequential(
State::new,
Integrator.ofGreedy((state, element, downstream) -> {
state.value = folder.apply(state.value, element);
return true;
}),
(state, downstream) -> downstream.push(state.value)
);
}
同样将状态保持在了 局部的 State类中,并且在结束时,调用了 finisher返回的方法,将最终的值推送给了流的下一个处理节点。因为是 fold方法不一定满足 结合律,所以上面使用的是 Gatherer.ofSequential, 来保证串行执行。同时,Stream Gather API 也支持多个 gather 之间组合,相当于其他库中的 fuse ,继而提高性能。
随着 Stream Gather API 、虚拟线程的引入,补齐了 Stream API 在扩展性上的短板,同时降低了复杂度。未来一定会有一个开源的库来扩展各种各样的 Gather 操作符, 比如 zipWithIndex、zipWithNext、mapConcat、throttle等。稍有遗憾的是由于 Java 目前不支持类似 Kotlin 的扩展方法,所以在 Stream API DSL 的最终呈现上还不能是 Stream.zipWithIndex而是 Stream.gather(MyGathers.zipWithIndex)。希望在 下一个LTS来临之前 Java 能够补齐扩展方法这个短板,方便我们更优雅的设计 DSL 扩展。在本文中我们分析了 什么 是 “流”,对比了 Java 上几种常见的 “流”库,引入和详细介绍了 Java 22 中的 Stream Gather API 。同时也简单分享了利用虚拟线程 如何简化 Stream map Concurrent操作符的实现。希望抛砖引玉和大家分享新的特性,共同进步。同时也希望大家都可以升级到新版本的 JDK,更好的赋能业务。我们是淘天集团 - 终端平台团队,我们支撑淘宝、天猫核心电商以及闲鱼、点淘等创新业务,服务数亿消费者,并作为核心技术团队,为每一年的双十一购物狂欢节提供基础技术保障。在底层技术上,我们具备深厚的Android和iOS底层技术积累,拥有丰富的编译器、链接器、解释器技术应用实践。
在研发模式上,我们负责原生研发模式DX演进,服务数千开发者、承载数百亿日PV,深耕系统原生渲染技术,致力于建立下一代终端研发模式。
在网络技术上,我们在终端网络、传输和超大规模网关有深厚技术积累,负责开源方案XQUIC/Tengine,承载亿级长连和千万级QPS;在国际IETF标准、顶会SIGCOMM均有建树。
在终端技术上,我们打造领先行业的移动技术产品,涵盖多端架构、性能体验、组件框架、用户增长等关键领域,致力于移动端系统及厂商特性前沿探索。
在后端技术上,我们负责移动基础设施,有百万级QPS API网关、消息/推送、Serverless平台、自适应流控等柔性高可用解决方案。打造覆盖移动App全生命周期工程技术平台。
在跨端技术上,我们负责Weex2.0和核心Web容器,研究领域涉及W3C标准、WebKit内核、脚本引擎和自绘渲染引擎,面向Web标准提供一流跨端能力。通过卡片级小部件和小游戏技术,丰富创意供给,提供差异化的购物体验。
在前端技术上,我们在前端框架、工程、低代码领域长期深耕,支撑大促营销ProCode、LowCode、NoCode跨端页面研发;配套前沿的页面托管;负责ICE、微前端等开源方案,致力于提供简单友好的研发体系。
在这里,我们拥有最前沿的技术以及最优秀的人才;在这里,我们给你提供平台,师兄助你快速成长;在这里,校园化的办公环境,更有不定期团建high不停; 加入我们,让淘天成为你梦想起航的地方~