什么是Stream?
Java 8引入了全新的Stream API,我们看看java8 doc里面的定义:
A sequence of elements supporting sequential and parallel aggregate operations
简单翻译过来就是一个支持串行和并行聚合操作的元素序列。有点抽象,我们简单总结一下这里有两个点:
- Stream是一个元素序列,可以理解为是一个数组或列表。
- Stream支持一些列的操作,可以串行,可以并发。
Stream 不是集合,它不是用来保存数据的,应用是计算和算法,它更像一个功能强大的Iterator。
好,我们直接从代码层面上见识一下Stream。我们假设一个需求:我们有一个整数数组,将数组中大于10的数排重后做求和运算。比如:
- 数组里有5个数 1,12,19,12,20
- 选出大于10的数:12,19,12,20
- 排重:12,19,20
- 求和:51
写段代码实现一下:
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
public class StreamApiTest {
/**
* 通过循环的方式实现list列表的计算,实现循环有很多种 下标,迭代,foreach,本例子使用foreach
* @param list 传入的整数列表
* @return 返回大于10的数排重后的和
*/
public static int calcByForLoop(List<Integer> list){
Set<Integer> distinctResult = new HashSet<Integer>();
//过滤排重
for(Integer i : list){
if (i > 10 && !distinctResult.contains(i)){
distinctResult.add(i);
}
}
//求和
int sum = 0;
for (Integer i : distinctResult){
sum += i;
}
return sum;
}
//使用java8 stream api 结合lambda表达式实现
public static int calcByStream(List<Integer> list){
//产生Stream,然后经过变换,最后聚合结果
Optional<Integer> sum = list.stream().filter(i -> i > 10).distinct().reduce((x,y) -> x +y);
return sum.orElse(0);
}
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1,12,19,12,20);
System.out.println(calcByForLoop(list));
System.out.println(calcByStream(list));
}
}
一般青年用一个循环10行代码实现上述需求,使用java8的文艺青年一句话搞定。java8这种写法我看不懂啊?
是不是在故弄玄虚?这有意义吗?实现功能不就行了?我觉得你掌握了它你会喜欢上它的。
Stream API是集合功能的增强扩展,结合使用Lambda 表达式(Lambda不懂的可以看我的另一篇文章《Java8新特性详解-Lambda表达式》),极大的提高编程效率和程序可读性。同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势。如果上面需求输入数据比较大,需要多线程实现。一般青年是不是得调试半天?使用 Stream API 无需编写一行多线程的代码,就可以很方便地写出高性能的并发程序。改成下面这样就行了:
//获取并行Stream
public static int calcByStream(List<Integer> list){
Optional<Integer> sum = list.parallelStream().filter(i -> i > 10).distinct().reduce((x,y) -> x +y);
return sum.orElse(0);
}
我觉得这有点像开车,有人说开手动档可以练技术,但大部分人开车还是代步,Stream就像是开自动档的车,可以让你更舒适,更高效,不容易出错。
有人说java8越来越不像java了,的确java8引入了一些新鲜和流行的特性,这样也使得java8更有活力。假想一下,以后别人写的java代码没用一行循环,你还能看懂吗?现在是不是觉得 Steam挺厉害,挺神奇的?
Stream API编程很简单,简单总结一些:首先你获取一个Stream对象,然后做各种变换,然后聚合,最后出结果,产生一种流水线的效果。让我们来深入了解一下。
获取Stream对象
获取Stream对象很简单也很方便,我们结合代码示例看一下:
//通过Collection接口获得,所有实现Collection接口对象都可以获得Stream
List<Integer> list = Arrays.asList(1,12,19,12,20);
Stream<Integer> stream1 = list.stream();
Stream<Integer> stream2 = list.parallelStream();
//将数组转换为Stream
Stream<String> stream3 = Stream.of("hello stream".split(" "));
Stream<Integer> stream4 = Stream.of(1,12,19,12,20);
//generate 方法传入Supplier接口可以创建Stream,limit可以指定元素最大个数
Stream<Double> randoms = Stream.generate(Math::random).limit(100);
Stream<String> stream5 = Stream.generate(() -> "test");
//创建empty Stream
Stream<String> stream6 = Stream.empty();
//通过正则表达式创建Stream
Stream<String> words = Pattern.compile("\\s").splitAsStream("hello stream test");
//静态文件转成 Stream
try (Stream<String> lines = Files.lines(path)) {
//Do something with lines
}
Stream 可以通过parallel和sequential方法实现串行和并行模式的变换,是不是特方便?
Stream 主要操作
Stream操作主要分为两类:中间状态和最终状态操作。中间状态操作就是Stream元素经过处理或过滤从一种集合状态到另一种集合状态,比如map,filter等等;最终状态操作是遍历所有元素产生一个结果或者其它操作结果,比如sum,max,min,count,forEach 等等。这些操作组合使用可以产生流水线的效果。Stream操作有一下几方面特点,使用时要注意:
- stream操作不会修改原有数据集合,中间状态变换会产生新的元素序列。
- Stream 不能重复使用,只要执行过最终状态操作就不能再执行其操作了,需要重新创建Stream,或者将结果转存到集合对象中,多次使用。
- 惰性计算,所有的中间状态操作不是立即执行的,而是执行最终状态操作的时候才出发执行的。这样对于流水线操作可以进行一些优化,比如减少循环,计算合并等等。
- 流水线方式执行,Stream执行的时候不是整个元素序列都变换完了再对整个元素序列做操作。Stream采用流水线的方式执行,一个元素走完所有操作,再处理下一个,可以简单理解为一次循环,每个元素逐个操作。这样设计对大量数据遍历有很好的效果。
我们先来分析一下文章开始举的那个例子,验证一下我们讲的特性。
List<Integer> list = Arrays.asList(1,12,19,12,20);
//我们分开写,加点日志输出
Stream<Integer> stream1 = list.stream();
//看看有没有立即执行
Stream<Integer> stream2 = stream1.filter(i -> {
System.out.println("filter:" + i);
if(i > 10) {
return true;
}else{
return false;
}
});
Stream<Integer> stream3 = stream2.distinct();
Optional<Integer> sum = stream3.reduce((x,y) -> {
System.out.println("reduce:x=" + x + ",y=" + y);
return x +y;
});
//再调用一次就会抛异常 java.lang.IllegalStateException: stream has already been operated upon or closed
//stream3.reuce
System.out.println(sum.get());
运行结果:
filter:1
filter:12
filter:19
reduce:x=12,y=19
filter:12
filter:20
reduce:x=31,y=20
51
看看是不是一个元素一次走完流程,理解一下上面例举的操作特点。
常用的变换操作:
- map
map 传入处理函数将Stream变换另一个状态,类似还有mapToInt,mapToDouble,mapToLong等方法
//
//将元素转成小写
Stream<String> stream = Stream.of("Hello","Stream","API","java8","map");
stream.map(String::toLowerCase).forEach(System.out::println);
- flatMap
如果是一对多的映射就用flatMap,将中间的层次结构扁平化
//将每个句子按空格分开拆成词
Stream<String> stream = Stream.of("hello java8","hello stream","hello word");
stream.flatMap(s -> Stream.of(s.split(" "))).forEach(System.out::println);
- filter
filter 传入一个谓词函数,对元素逐一测试,通过测试的元素(函数返回true的)产生新的Stream
//产生100个随机数,只保留大于等于0.5的数
Stream.generate(Math::random).limit(100).filter( d -> d >= 0.5).forEach(System.out::println);
- distinct
distinct,将元素序列排重,会调用equals方法
list.stream().distinct().forEach(System.out::println);
- limit/skip
limit 返回 Stream 的前面 n 个元素;skip 则是扔掉前 n 个元素
list.stream().skip(1).limit(2).forEach(System.out::println);
- sorted
sorted 元素序列进行排序,大量数据可以使用并行模式
list.stream().sorted().forEach(System.out::println);
常用的聚合/最终状态操作:
- reduce
给定一个种子元素(也可以不指定),将元素序列所为输入,应用传入accumulator函数进行结果聚合。
reduce 方法有三种定义:
//没有指定种子元素,则返回Optional 类型
Optional<T> reduce(BinaryOperator<T> accumulator);
//算法逻辑等价于:
/*
boolean foundAny = false;
T result = null;
for (T element : this stream) {
if (!foundAny) {
foundAny = true;
result = element;
}
else
result = accumulator.apply(result, element);
}
return foundAny ? Optional.of(result) : Optional.empty();
*/
//指定种子元素
T reduce(T identity, BinaryOperator<T> accumulator);
//算法逻辑等价于:
/*
U result = identity;
for (T element : this stream)
result = accumulator.apply(result, element)
return result;
*/
<U> U reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner);
第三种定义的 combiner 是什么东东?我们看一下jdk源码定义你就知道了。
ReferencePipeline.java
@Override
public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
}
@Override
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
return evaluate(ReduceOps.makeRef(accumulator));
}
@Override
public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
}
可以看出第三种和第二种调用逻辑是一样的,只是第二种定义默认combiner设置成了accumulator。
第三种定义,java doc上的解释
U result = identity;
for (T element : this stream)
result = accumulator.apply(result, element)
return result;The identity value must be an identity for the combiner function. This means that for all u, combiner(identity, u) is equal to u. Additionally, the combiner function must be compatible with the accumulator function; for all u and t, the following must hold:
combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
看不懂,正常的,java doc里讲的不清楚,呵呵。其实细心的朋友会看到 java doc里反复提到一句话:
but is not constrained to execute sequentially.
就是说 Stream操作不限于顺序操作,也可以并发操作。这个combiner 在并行的时候才能看到起作用。parallelStream 采用Fork/Join并发框架实现,combiner 主要完成中间结果汇总,你想一下你自己实现多线程求和是不是需要汇总各个线程的结果才能得到最终结果。
我们举个例子: 用parallelStream.reduce 实现 1到9 求和。
将我们先用stream串行模式执行一下看看
List<Integer> list2 = Arrays.asList(1,2,3,4,5,6,7,8,9);
Integer r = list2.stream().reduce(0,(x,y) -> {
System.out.println("accumulator x:" + x + ",y:" + y);
return x + y;
},(x,y) -> {
System.out.println("combiner x:" + x + ",y:" + y);
return x + y;
});
System.out.println(r);
运行结果:
accumulator x:0,y:1
accumulator x:1,y:2
accumulator x:3,y:3
accumulator x:6,y:4
accumulator x:10,y:5
accumulator x:15,y:6
accumulator x:21,y:7
accumulator x:28,y:8
accumulator x:36,y:9
45
发现combiner 没有打印相关信息,表明combiner 没有用到,combiner只在并行模式下其作用。可以看看串行reduce执行过程
-------------------------
|1 + 2 3 4 5 6 7 8 9 |
-------------------------
||
-------------------------
| 3 + 3 4 5 6 7 8 9 |
-------------------------
||
-------------------------
| 6 + 4 5 6 7 8 9 |
-------------------------
||
-------------------------
| 10 + 5 6 7 8 9 |
-------------------------
.....
-------------------------
| 45 |
-------------------------
我们把stream 改成 parallelStream 看看运行结果
List<Integer> list2 = Arrays.asList(1,2,3,4,5,6,7,8,9);
Integer r = list2.parallelStream().reduce(0,(x,y) -> {
System.out.println("accumulator x:" + x + ",y:" + y);
return x + y;
},(x,y) -> {
System.out.println("combiner x:" + x + ",y:" + y);
return x + y;
});
System.out.println(r);
运行结果:
accumulator x:0,y:6
accumulator x:0,y:3
accumulator x:0,y:2
accumulator x:0,y:1
accumulator x:0,y:8
combiner x:1,y:2
accumulator x:0,y:4
accumulator x:0,y:7
accumulator x:0,y:5
combiner x:3,y:4
combiner x:3,y:7
accumulator x:0,y:9
combiner x:5,y:6
combiner x:8,y:9
combiner x:7,y:17
combiner x:11,y:24
combiner x:10,y:35
45
看结果不一样了,combiner起到作用了。初始元素是 t,中间汇总结果是 u,使用combiner进行最后汇总.你现在应该知道 java doc 里面让你的combiner 函数要满足
combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
这个条件的含义了吧。
细心的朋友可能发现 x =0 被加了好多次,是的 0 是我们设置的 identity,可以想一下如果是非零,那并行模式的结果和串行模式的结果就不一样了,这是个坑,其实 java doc里已经提示了:
The identity value must be an identity for the combiner function. This means that for all u, combiner(identity, u) is equal to u.
可以理解为identity 对combiner 是透明的,不能影响 u的结果,否则就和你想象的不一致了。同时还要保证 accumulator 是结合性运算。
结合性运算符满足:
(a op b) op c == a op (b op c)
a op b op c op d == (a op b) op (c op d) //并行模式可能有这种
像 sum,max,min,concat 都符合结合性运算。
- min/max
min/max 可以理解为一个特定的reduce操作。需要知道一个Comparator。
Optional<Integer> max = list.stream().max((x,y) -> x.compareTo(y));
- collect
collect操作就是将 Stream元素聚合到一个可变容器中,比如将Stream结果转换成 ArrayList,Map等等。其实也是一种特殊的reduce操作。
有两中方法定义:
<R,A> R collect(Collector<? super T,A,R> collector);
<R> R collect(Supplier<R> supplier, BiConsumer<R,? super T> accumulator, BiConsumer<R,R> combiner);
第二种定义是不是有点熟悉,和reduce是类似,collector,帮你封装了一些现成的 supplier,accumulator,combiner组合方便使用,
最后内部都调用reduce操作。
collect操作的算法逻辑就是这样的:
R result = supplier.get(); //构造一个可变集合
for (T element : this stream)
accumulator.accept(result, element); //变换操作,combiner 复杂汇总
return result;
举个例子看看:
List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9);
//将stream转换成List,这两个写法效果一样,Collector更方便,还要好多现成的Collector
List<Integer> l1 = list.stream().collect(Collectors.toList());
List<Integer> l2 = list.stream().collect(ArrayList::new,ArrayList::add,ArrayList::addAll);
//拼接成一个字符串
String concat = stringStream.collect(StringBuilder::new, StringBuilder::append, StringBuilder::append).toString();
//按城市分组,生产Map集合
Map<String, List<Person>> peopleByCity = personStream.collect(Collectors.groupingBy(Person::getCity));
- match/find
Stream支持一些匹配操作:anyMatch,allMatch,noneMatch,findFirst,findAny。这些也叫 short-circuiting terminal operation,就是从无限的流元素序列中,匹配有限几个元素,循环多长也要看数据的特点和测试函数的设计了。
总结
Stream 不是集合,它不是用来保存数据的,主要应用是计算和算法,它更像一个功能强大的Iterator,可以方便的实现操作流水线,结合lambda表达式你也可以体验到函数式编程的优雅;同时极大的提高编程效率和程序可读性;它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势。性能方面也不是问题,你不要去直接和for做性能对比,这么比没意义的。