1 Function interface
A functional interface is any interface that contains only one abstract method. A functional interface may contain one or more default methods or static methods. Because a functional interface contains only one abstract method, you can omit the name of that method when you implement it.
2 Lambda Expressions
// 完整的写法
(int a, int b) -> { int c = a + b;
System.out.prinlnt(c);
}
// 如果可以推断出a,b的类型,可以省略类型声明int。
// 并且,可以将大括号内的语句,改写成表达式,表达式可以省略大括号。
(a, b) -> System.out.prinlnt(a + b);
// 如果一行表达式,java还能自动返回表达式的值,而不需要return
(a, b) -> a + b;
2-1 Accessing Local Variables of the Enclosing Scope
Lambda expressions can capture variables; they have the same access to local variables of the enclosing scope. Lambda expressions are lexically scoped. This means that they do not inherit any names from a supertype or introduce a new level of scoping. Declarations in a lambda expression are interpreted just as they are in the enclosing environment.
3 Standard Functional Interface
Four basic functional interface:
// 接收一个参数,返回boolean值
interface Predicate<T> {
boolean test(T t);
}
// 接收一个参数,而无返回值
interface Consumer<T> {
void accept(T t);
}
// 接收一个参数,返回计算结果
interface Function<T, R> {
R apply(T t);
}
// 不接收参数,返回值
interface Supplier<T> {
T get();
}
4 Stream
使用流操作,可以代替使用循环重复操作。而且,使用流操作,可以利用多核架构,不需要编写任何特定代码。
stream 并不是某种数据结构,它只是数据源的一种视图。这里的数据源可以是一个数组,Java 容器或 I/O channel 等。因此要得到一个 stream 通常不会手动创建,而是调用对应的工具方法,比如:
- 调用
Collection.stream()
或者Collection.parallelStream()
方法 - 调用
Arrays.stream(T[] array)
方法
虽然大部分情况下 stream 是容器调用 Collection.stream()
方法得到的,但 stream 和 collections 有以下不同:
- 无存储。stream 不是一种数据结构,它只是某种数据源的一个视图,数据源可以是一个数组,Java 容器或 I/O channel 等。
- 为函数式编程而生。对 stream 的任何修改都不会修改背后的数据源,比如对 stream 执行过滤操作并不会删除被过滤的元素,而是会产生一个不包含被过滤元素的新 stream。(重新生成+副本)
- 惰式执行。stream 上的操作并不会立即执行,只有等到用户真正需要结果的时候才会执行。(中间操作和结束操作)
- 可消费性。stream 只能被“消费”一次,一旦遍历过就会失效,就像容器的迭代器那样,想要再次遍历必须重新生成。
对 stream 的操作分为为两类,中间操作(intermediate operations)和结束操作(terminal operations),二者特点是:
- 中间操作总是会惰式执行,调用中间操作只会生成一个标记了该操作的新 stream,仅此而已。
- 结束操作会触发实际计算,计算发生时会把所有中间操作积攒的操作以 pipeline 的方式执行,这样可以减少迭代次数。计算完成之后 stream 就会失效。下表汇总了
Stream
接口的部分常见方法:
操作类型 | 接口方法 |
---|---|
中间操作 | concat() distinct() filter() flatMap() limit() map() peek() skip() sorted() parallel() sequential() unordered() |
结束操作 | allMatch() anyMatch() collect() count() findAny() findFirst() forEach() forEachOrdered() max() min() noneMatch() reduce() toArray() |
不要在结束操作后面再调用中间操作。
5 Reactor
5-1 一个重要概念
负压(backpressure):在基本的消息推送模式中,当消息发布者产生数据的速度过快时,会使得消息订阅者的处理速度无法跟上产生的速度,从而给订阅者造成很大的压力。当压力过大时,有可能造成订阅者本身的奔溃,所产生的级联效应甚至可能造成整个系统的瘫痪。负压的作用在于提供一种从订阅者到生产者的反馈渠道。订阅者可以通过 request()方法来声明其一次所能处理的消息数量,而生产者就只会产生相应数量的消息,直到下一次 request()方法调用。这实际上变成了推拉结合的模式。
5-2 两个基本概念
- Flux:0 — N 个元素的异步序列
- Mono:0 — 1个元素的异步序列
5-3 Flux/Mono 中都包含三种不同类型的消息通知
- 正常的包含元素的消息
- 序列结束的消息
- 序列出错的消息
5-4 Flux 生成复杂序列
- generate() -> SynchronousSink 逐一生成消息,即 next()方法只能最多调用一次。
- create() -> FluxSink 同步/异步的消息产生,且可以在一次调用中产生多个元素,即 next()方法可以多次调用。
5-5 一些重要的操作符
5-5-1 buffer / bufferTimeout / bufferUnit / bufferWhile
以 buffer 开头的函数,作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。
5-5-2 filter
对流中包含的元素进行过滤,只留下满足 Predicate 指定条件的元素。
5-5-3 window
类似于 buffer,所不同的是 window 操作符是把当前流中的元素收集到另外的 Flux 序列中,因此返回值类型是 Flux<Flux<T>>。
因为 window 操作符所产生的流中包含的是 UnicastProcessor 类的对象,所以直接输出的话,打印出来的就是 UnicastProcessor 字符。
5-5-4 zipWith
zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。在合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流;也可以通过一个 BiFunction 函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。注意,合并时以元素少的流为长度。
5-5-5 take / takeUntil / takeWhile / tileUntilOther
take 系列操作符用来从当前流中提取元素。
5-5-6 reduce / reduceWith
reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。累积操作是通过一个 BiFunction 来表示的。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。
5-5-7 merge/ mergeSequential
merge 和 mergeSequential 操作符用来把多个流合并成一个 Flux 序列。不同之处在于 merge 按照所有流中元素的实际产生顺序来合并,而 mergeSequential 则按照所有流被订阅的顺序,以流为单位进行合并。
5-5-8 flatMap / flatMapSequential
flatMap 和 flatMapSequential 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。flatMapSequential 和 flatMap 之间的区别与 mergeSequential 和 merge 之间的区别是一样的。
5-5-9 concatMap
concatMap 操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。与 flatMap 不同的是,concatMap 会根据原始流中的元素顺序依次把转换之后的流进行合并;与 flatMapSequential 不同的是,concatMap 对转换之后的流的订阅是动态进行的,而 flatMapSequential 在合并之前就已经订阅了所有的流。
5-5-10 combineLatest
combineLatest 操作符把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。
5-6 消息处理
In a new Reactor version, which means above version 3.0, there are some APIs changed in Flux and Mono error handling.
In that light, all Mono#otherwiseXxx
methods are being deprecated and aligned with their Flux counterparts, and in both classes the switchOnError
flavors are being removed (deprecated in Flux
, removed in Mono
since it was just part of last snapshots).
Mono API deprecations:
otherwise
->onErrorResume
otherwiseReturn
->onErrorReturn
otherwiseIfEmpty
is still being renamed toswitchIfEmpty
mapError
->onErrorMap
- (
switchOnError
removed)
Flux API deprecations:
onErrorResumeWith
->onErrorResume
mapError
->onErrorMap
switchOnError(x)
->onErrorResume(ignore -> x)
(we don’t keep a direct equivalent of the methods but they can be easily replaced by a lambda that ignores the left hand side)
5-7 调度器
通过调度器(Scheduler)可以指定 Reactor 的各种操作执行的方式和所在的线程。有下面几种不同的调度器实现。
- 当前线程,通过 Schedulers.immediate()方法来创建。
- 单一的可复用的线程,通过 Schedulers.single()方法来创建。
- 使用弹性的线程池,通过 Schedulers.elastic()方法来创建。线程池中的线程是可以复用的。当所需要时,新的线程会被创建。如果一个线程闲置太长时间,则会被销毁。该调度器适用于 I/O 操作相关的流的处理。
- 使用对并行操作优化的线程池,通过 Schedulers.parallel()方法来创建。其中的线程数量取决于 CPU 的核的数量。该调度器适用于计算密集型的流的处理。
- 使用支持任务调度的调度器,通过 Schedulers.timer()方法来创建。
- 从已有的 ExecutorService 对象中创建调度器,通过 Schedulers.fromExecutorService()方法来创建。
某些操作符默认就已经使用了特定类型的调度器。比如 interval()方法创建的流就使用了由 Schedulers.timer()创建的调度器。通过 publishOn()和 subscribeOn()方法可以切换执行操作的调度器。其中 publishOn()方法切换的是操作符的执行方式,而 subscribeOn()方法切换的是产生流中元素时的执行方式。