Java8 Stream

什么是Stream

Java8 API添加了一个新的抽象称为流Stream,可以以一种声明的方式处理数据,给我们操作集合(Collection)提供了极大的便利。
Stream将要处理的元素集合看作一种流,在流的过程中,借助Stream API对流中的元素进行操作,比如:筛选、排序、聚合等。

stream可以由数组或集合创建,对流的操作分为两种

  • 中间操作,每次返回一个新的流,可以有多个,类似MapReduce中的Map
  • 终端操作,每个流只能进行一次终端操作,终端操作结束后流无法再次使用。终端操作会产生一个新的集合或值。类似MapReduce中的Reduce

Stream API可以极大提高Java程序员的生产力,让程序员写出高效率、干净、简洁的代码。

Stream特性

  1. stream不存储数据,而是按照特定的规则对数据进行计算,一般会输出结果。
  2. stream不会改变数据源,通常情况下会产生一个新的集合或一个值。
  3. stream具有延迟执行特性,只有调用终端操作时,中间操作才会执行。

Stream的创建

  1. 通过java.utiil.Collection.stream()方法用集合创建流
List<string> list = Arrays.asList("a", "b", "c");
//&#x521B;&#x5EFA;&#x4E00;&#x4E2A;&#x987A;&#x5E8F;&#x6D41;
Stream<string> stream = list.stream();
//&#x521B;&#x5EFA;&#x4E00;&#x4E2A;&#x5E76;&#x884C;&#x6D41;
Stream<string> parallelStream = list.parallelStream();
</string></string></string>
  1. 使用java.util.Arrays.stream(T[] array)方法用数组创建流
String[] arr = {"a", "b", "c"};
Stream stream = Arrays.stream(arr);
  1. 使用Stream的静态方法: of()、iterate()、generate()
Stream<string> stream1 = Stream.of("a", "b", "c");

Stream<string> stream2 = Stream.iterate("a", (x) -> x + "a").limit(4);
//stream2.forEach(System.out::println);

Stream<double> stream3 = Stream.generate(Math::random).limit(3);
//stream3.forEach(System.out::println);
</double></string></string>

顺序流&并行流

Stream是顺序流,由主线程按顺序对流执行操作,而ParallelStream是并行流,内部以多线程并行执行的方式对流进行操作,但前提是流中的数据处理没有顺序要求。
如果流中的数据量足够大,并行流可以加快处理速度。
除了直接创建并行流,还可以通过parallel()把顺序流转换成并行流:

Optional<integer>&#xA0;findFirst&#xA0;=&#xA0;list.stream().parallel().filter(x->x>6).findFirst();
</integer>

Stream的使用

核心类Optional

Optional类是一个可以为null的容器对象,如果值存在则isPresent()方法会返回true,调用get()方法会返回该对象。

遍历/匹配(foreach/find/match)

List<integer> list = Arrays.asList(7,6,9,3,8,5,2,1,4);

//&#x904D;&#x5386;&#x8F93;&#x51FA;&#x7B26;&#x5408;&#x6761;&#x4EF6;&#x7684;&#x5143;&#x7D20;
list.stream().filter(x -> x > 6).forEach(System.out::println);

//&#x5339;&#x914D;&#x7B2C;&#x4E00;&#x4E2A;
Optional<integer> findFirst = list.stream().filter(x -> x > 6).findFirst();
System.out.println("&#x5339;&#x914D;&#x7B2C;&#x4E00;&#x4E2A;&#x503C;:" + findFirst.get());

//&#x5339;&#x914D;&#x4EFB;&#x610F;(&#x9002;&#x7528;&#x4E8E;&#x5E76;&#x884C;&#x6D41;)
Optional<integer> findAny = list.parallelStream().filter(x -> x > 6).findAny();
System.out.println("&#x5339;&#x914D;&#x4EFB;&#x610F;&#x4E00;&#x4E2A;&#x503C;:" + findAny.get());

//&#x662F;&#x5426;&#x5305;&#x542B;&#x7B26;&#x5408;&#x7279;&#x5B9A;&#x6761;&#x4EF6;&#x7684;&#x5143;&#x7D20;
boolean anyMatch = list.stream().anyMatch(x -> x > 6);
System.out.println("&#x662F;&#x5426;&#x5B58;&#x5728;&#x5927;&#x4E8E;6&#x7684;&#x503C;:" + anyMatch);

&#x7ED3;&#x679C;&#x8F93;&#x51FA;&#xFF1A;
7
9
8
&#x5339;&#x914D;&#x7B2C;&#x4E00;&#x4E2A;&#x503C;:7
&#x5339;&#x914D;&#x4EFB;&#x610F;&#x4E00;&#x4E2A;&#x503C;:8
&#x662F;&#x5426;&#x5B58;&#x5728;&#x5927;&#x4E8E;6&#x7684;&#x503C;:true
</integer></integer></integer>

筛选(filter)

List<integer> list = Arrays.asList(7,6,9,3,8,5,2,1,4);

//&#x904D;&#x5386;&#x8F93;&#x51FA;&#x7B26;&#x5408;&#x6761;&#x4EF6;&#x7684;&#x5143;&#x7D20;
list.stream().filter(x -> x > 6).forEach(System.out::println);

&#x7ED3;&#x679C;&#x8F93;&#x51FA;&#xFF1A;
7
9
8
</integer>

聚合(max/min/count)

List<string> list = Arrays.asList("admin", "winter", "test", "yfdyf", "supermarket");

Optional<string> max = list.stream().max(Comparator.comparing(String::length));
System.out.println("&#x6700;&#x957F;&#x7684;&#x5B57;&#x7B26;&#x4E32;: " + max.get());

Optional<string> min = list.stream().min(Comparator.comparing(String::length));
System.out.println("&#x6700;&#x77ED;&#x7684;&#x5B57;&#x7B26;&#x4E32;: " + min.get());

long count = list.stream().filter(x -> x.length() > 5).count();
System.out.println("&#x5B57;&#x7B26;&#x4E32;&#x957F;&#x5EA6;&#x5927;&#x4E8E;5&#x7684;&#x4E2A;&#x6570;: " + count);

&#x7ED3;&#x679C;&#x8F93;&#x51FA;&#xFF1A;
&#x6700;&#x957F;&#x7684;&#x5B57;&#x7B26;&#x4E32;: supermarket
&#x6700;&#x77ED;&#x7684;&#x5B57;&#x7B26;&#x4E32;: test
&#x5B57;&#x7B26;&#x4E32;&#x957F;&#x5EA6;&#x5927;&#x4E8E;5&#x7684;&#x4E2A;&#x6570;: 2
</string></string></string>

映射(map/flatMap)

映射,可以将一个流的元素按照一定的映射规则映射到另一个流中。分为map和flatMap

  • map:接收一个函数作为参数,该函数会被应用到每个元素上,并将其映射成一个新的元素。
  • flatMap:接收一个函数作为参数,将流中的每个值都换成另一个流,然后把所有流连接成一个流。
List<string> list = Arrays.asList("admin", "winter", "test", "yfdyf", "supermarket");
List<string> strList = list.stream().map(String::toUpperCase).collect(Collectors.toList());
System.out.println("&#x6BCF;&#x4E2A;&#x5143;&#x7D20;&#x5927;&#x5199;: " + strList);

List<string> list2 = Arrays.asList("a-d-m-i-n", "w-i-n-t-e-r", "t-e-s-t");
List<string> strList2 = list2.stream().flatMap(s -> {
    //&#x5C06;&#x6BCF;&#x4E2A;&#x5143;&#x7D20;&#x8F6C;&#x6362;&#x6210;&#x4E00;&#x4E2A;stream
    String[] split = s.split("-");
    Stream<string> s2 = Arrays.stream(split);
    return s2;
}).collect(Collectors.toList());

System.out.println("&#x5904;&#x7406;&#x524D;&#x96C6;&#x5408;: " + list2);
System.out.println("&#x5904;&#x7406;&#x540E;&#x96C6;&#x5408;:" +strList2);

&#x7ED3;&#x679C;&#x8F93;&#x51FA;&#xFF1A;
&#x6BCF;&#x4E2A;&#x5143;&#x7D20;&#x5927;&#x5199;: [ADMIN, WINTER, TEST, YFDYF, SUPERMARKET]
&#x5904;&#x7406;&#x524D;&#x96C6;&#x5408;: [a-d-m-i-n, w-i-n-t-e-r, t-e-s-t]
&#x5904;&#x7406;&#x540E;&#x96C6;&#x5408;:[a, d, m, i, n, w, i, n, t, e, r, t, e, s, t]
</string></string></string></string></string>

归约(reduce)

归约,也称缩减,是把一个流缩减成一个值,能实现对集合求和,求乘积和求最值操作。

List<integer> list = Arrays.asList(1, 3, 2, 8, 11, 4);
//&#x6C42;&#x548C;&#x65B9;&#x5F0F;1
Optional<integer> sum = list.stream().reduce((x, y) -> x + y);
//&#x6C42;&#x548C;&#x65B9;&#x5F0F;2
Optional<integer> sum2 = list.stream().reduce(Integer::sum);
//&#x6C42;&#x548C;&#x65B9;&#x5F0F;3
Integer sum3 = list.stream().reduce(0, Integer::sum);
System.out.println("list&#x6C42;&#x548C;: " + sum.get() + "," + sum2.get() + "," + sum3);

//&#x6C42;&#x4E58;&#x79EF;
Optional<integer> product = list.stream().reduce((x, y) -> x * y);
System.out.println("list&#x6C42;&#x79EF;: " + product.get());

//&#x6C42;&#x6700;&#x5927;&#x503C;&#x65B9;&#x5F0F;1
Optional<integer> max = list.stream().reduce((x, y) -> x > y ? x : y);
//&#x6C42;&#x6700;&#x5927;&#x503C;&#x65B9;&#x5F0F;2
Integer max2 = list.stream().reduce(1, Integer::max);
System.out.println("list&#x6C42;&#x6700;&#x5927;&#x503C;: " + max.get() + "," + max2);

&#x7ED3;&#x679C;&#x8F93;&#x51FA;:
list&#x6C42;&#x548C;: 29,29,29
list&#x6C42;&#x79EF;: 2112
list&#x6C42;&#x6700;&#x5927;&#x503C;: 11,11
</integer></integer></integer></integer></integer>

收集(collect)

//demo &#x5458;&#x5DE5;&#x7C7B;
@Data
public class Person{
    private String name; //&#x59D3;&#x540D;
    private int salary; //&#x85AA;&#x8D44;
    private int age; //&#x5E74;&#x9F84;
    private String sex; //&#x6027;&#x522B;
    private String area; //&#x5730;&#x533A;

    public Person(String name, int salary, int age, String sex, String area) {
        this.name = name;
        this.salary = salary;
        this.age = age;
        this.sex = sex;
        this.area = area;
    }
}

归集(toList/toSet/toMap)

List<person> personList = new ArrayList<>();

personList.add(new Person("Spring", 9999, 28, "male", "Chang Sha"));
personList.add(new Person("Summer", 7777, 25, "female", "Hang Zhou"));
personList.add(new Person("Autumn", 6666, 23, "female", "Shang Hai"));
personList.add(new Person("Winter", 8888, 26, "male", "Chang Sha"));

List<string> list = personList.stream().filter(p -> p.getAge() >  24).map(Person::getArea).collect(Collectors.toList());
System.out.println("age > 24 return area toList: " + list);

Set<string> set = personList.stream().filter(p -> p.getAge() >  24).map(Person::getArea).collect(Collectors.toSet());
System.out.println("age > 24 return area toSet: " + set);

Map<?, Person> map = personList.stream().filter(p -> p.getSalary() > 8000).collect(Collectors.toMap(Person::getName, p -> p));
System.out.println("salary >8000 return personMap: " + map);

&#x7ED3;&#x679C;&#x8F93;&#x51FA;&#xFF1A;
age > 24 return area toList: [Chang Sha, Hang Zhou, Chang Sha]
age > 24 return area toSet: [Chang Sha, Hang Zhou]
salary >8000 return personMap: {Winter=Person(name=Winter, salary=8888, age=26, sex=male, area=Chang Sha), Spring=Person(name=Spring, salary=9999, age=28, sex=male, area=Chang Sha)}
</string></string></person>

统计(count/averaging)

Collectors提供了一系列用于数据统计的静态方法

  • 计数:count
  • 平均值:averagingInt、averagingLong、averagingDouble
  • 最值:maxBy、minBy
  • 求和:summingInt、summingLong、summingDouble
  • 统计以上所有:summarizingInt、summarizingLong、summarizingDouble
List<person> personList = new ArrayList<>();

personList.add(new Person("Spring", 9999, 28, "male", "Chang Sha"));
personList.add(new Person("Summer", 7777, 25, "female", "Hang Zhou"));
personList.add(new Person("Autumn", 6666, 23, "female", "Shang Hai"));
personList.add(new Person("Winter", 8888, 26, "male", "Chang Sha"));

//&#x6C42;&#x603B;&#x6570;
Long count = personList.stream().collect(Collectors.counting());
//&#x6C42;&#x5E73;&#x5747;&#x5DE5;&#x8D44;
Double average = personList.stream().collect(Collectors.averagingDouble(Person::getSalary));
//&#x6C42;&#x6700;&#x9AD8;&#x5DE5;&#x8D44;
Optional<integer> max = personList.stream().map(Person::getSalary).collect(Collectors.maxBy(Integer::compare));
//&#x6C42;&#x5DE5;&#x8D44;&#x4E4B;&#x548C;
Integer sum = personList.stream().collect(Collectors.summingInt(Person::getSalary));
// &#x4E00;&#x6B21;&#x6027;&#x7EDF;&#x8BA1;&#x6240;&#x6709;&#x4FE1;&#x606F;
DoubleSummaryStatistics collect = personList.stream().collect(Collectors.summarizingDouble(Person::getSalary));

System.out.println("&#x5458;&#x5DE5;&#x603B;&#x6570;: " + count);
System.out.println("&#x5458;&#x5DE5;&#x5E73;&#x5747;&#x5DE5;&#x8D44;: " + average);
System.out.println("&#x5458;&#x5DE5;&#x6700;&#x9AD8;&#x5DE5;&#x8D44;: " + max.get());
System.out.println("&#x5458;&#x5DE5;&#x5DE5;&#x8D44;&#x603B;&#x548C;: " + sum);
System.out.println("&#x5458;&#x5DE5;&#x5DE5;&#x8D44;&#x6240;&#x6709;&#x7EDF;&#x8BA1;: " + collect);

&#x7ED3;&#x679C;&#x8F93;&#x51FA;:
&#x5458;&#x5DE5;&#x603B;&#x6570;: 4
&#x5458;&#x5DE5;&#x5E73;&#x5747;&#x5DE5;&#x8D44;: 8332.5
&#x5458;&#x5DE5;&#x6700;&#x9AD8;&#x5DE5;&#x8D44;: 9999
&#x5458;&#x5DE5;&#x5DE5;&#x8D44;&#x603B;&#x548C;: 33330
&#x5458;&#x5DE5;&#x5DE5;&#x8D44;&#x6240;&#x6709;&#x7EDF;&#x8BA1;: DoubleSummaryStatistics{count=4, sum=33330.000000, min=6666.000000, average=8332.500000, max=9999.000000}
</integer></person>

分组(partitioningBy/groupingBy)

List<person> personList = new ArrayList<>();

personList.add(new Person("Spring", 9999, 28, "male", "Chang Sha"));
personList.add(new Person("Summer", 7777, 25, "female", "Hang Zhou"));
personList.add(new Person("Autumn", 6666, 23, "female", "Shang Hai"));
personList.add(new Person("Winter", 8888, 26, "male", "Chang Sha"));

//&#x5C06;&#x5458;&#x5DE5;&#x6309;&#x85AA;&#x8D44;&#x662F;&#x5426;&#x9AD8;&#x4E8E;8000&#x5206;&#x7EC4;
Map<boolean, list<person>> part = personList.stream().collect(Collectors.partitioningBy(x -> x.getSalary() > 8000));
System.out.println("&#x5458;&#x5DE5;&#x6309;&#x85AA;&#x8D44;&#x662F;&#x5426;&#x5927;&#x4E8E;8000 &#x5206;&#x7EC4;&#x60C5;&#x51B5;:" + part);

//&#x5C06;&#x5458;&#x5DE5;&#x6309;&#x6027;&#x522B;&#x5206;&#x7EC4;
Map<string, list<person>> group = personList.stream().collect(Collectors.groupingBy(Person::getArea));
System.out.println("&#x5458;&#x5DE5;&#x6309;&#x6027;&#x522B; &#x5206;&#x7EC4;&#x60C5;&#x51B5;:" + group);

// &#x5C06;&#x5458;&#x5DE5;&#x5148;&#x6309;&#x6027;&#x522B;&#x5206;&#x7EC4;&#xFF0C;&#x518D;&#x6309;&#x5730;&#x533A;&#x5206;&#x7EC4;
Map<string, map<string, list<person>>> group2 = personList.stream().collect(Collectors.groupingBy(Person::getSex, Collectors.groupingBy(Person::getArea)));
System.out.println("&#x5458;&#x5DE5;&#x6309;&#x6027;&#x522B;&#x3001;&#x5730;&#x533A; &#x5206;&#x7EC4;&#x60C5;&#x51B5;: "+ group2);

&#x7ED3;&#x679C;&#x8F93;&#x51FA;:
&#x5458;&#x5DE5;&#x6309;&#x85AA;&#x8D44;&#x662F;&#x5426;&#x5927;&#x4E8E;8000 &#x5206;&#x7EC4;&#x60C5;&#x51B5;:{false=[Person(name=Summer, salary=7777, age=25, sex=female, area=Hang Zhou), Person(name=Autumn, salary=6666, age=23, sex=female, area=Shang Hai)], true=[Person(name=Spring, salary=9999, age=28, sex=male, area=Chang Sha), Person(name=Winter, salary=8888, age=26, sex=male, area=Chang Sha)]}
&#x5458;&#x5DE5;&#x6309;&#x6027;&#x522B; &#x5206;&#x7EC4;&#x60C5;&#x51B5;:{Chang Sha=[Person(name=Spring, salary=9999, age=28, sex=male, area=Chang Sha), Person(name=Winter, salary=8888, age=26, sex=male, area=Chang Sha)], Shang Hai=[Person(name=Autumn, salary=6666, age=23, sex=female, area=Shang Hai)], Hang Zhou=[Person(name=Summer, salary=7777, age=25, sex=female, area=Hang Zhou)]}
&#x5458;&#x5DE5;&#x6309;&#x6027;&#x522B;&#x3001;&#x5730;&#x533A; &#x5206;&#x7EC4;&#x60C5;&#x51B5;: {female={Shang Hai=[Person(name=Autumn, salary=6666, age=23, sex=female, area=Shang Hai)], Hang Zhou=[Person(name=Summer, salary=7777, age=25, sex=female, area=Hang Zhou)]}, male={Chang Sha=[Person(name=Spring, salary=9999, age=28, sex=male, area=Chang Sha), Person(name=Winter, salary=8888, age=26, sex=male, area=Chang Sha)]}}
</string,></string,></boolean,></person>

连接(joining)

List<string>&#xA0;list&#xA0;=&#xA0;Arrays.asList("A",&#xA0;"B",&#xA0;"C");
String&#xA0;string&#xA0;=&#xA0;list.stream().collect(Collectors.joining("-"));
System.out.println("&#x62FC;&#x63A5;&#x540E;&#x7684;&#x5B57;&#x7B26;&#x4E32;&#xFF1A;"&#xA0;+&#xA0;string);

&#x7ED3;&#x679C;&#x8F93;&#x51FA;&#xFF1A;
&#x62FC;&#x63A5;&#x540E;&#x7684;&#x5B57;&#x7B26;&#x4E32;&#xFF1A;A-B-C
</string>

排序(sorted)

  • sorted():自然排序,流中的元素需实现Comparable接口
  • sorted(Comparator com):Comparator排序器自定义排序
List<person> personList = new ArrayList<>();

personList.add(new Person("Spring", 9999, 28, "male", "Chang Sha"));
personList.add(new Person("Summer", 7777, 25, "female", "Hang Zhou"));
personList.add(new Person("Autumn", 6666, 23, "female", "Shang Hai"));
personList.add(new Person("Winter", 8888, 26, "male", "Chang Sha"));

//&#x6309;&#x5DE5;&#x8D44;&#x5347;&#x5E8F;(&#x81EA;&#x7136;&#x6392;&#x5E8F;)
List<string> newList = personList.stream().sorted(Comparator.comparing(Person::getSalary)).map(Person::getName).collect(Collectors.toList());
System.out.println("&#x6309;&#x5DE5;&#x8D44;&#x5347;&#x5E8F;&#x6392;&#x5E8F;: " + newList);

//&#x6309;&#x5DE5;&#x8D44;&#x964D;&#x5E8F;
List<string> newList2 = personList.stream().sorted(Comparator.comparing(Person::getSalary).reversed()).map(Person::getName).collect(Collectors.toList());
System.out.println("&#x6309;&#x5DE5;&#x8D44;&#x964D;&#x5E8F;&#x6392;&#x5E8F;: " + newList2);

//&#x5148;&#x6309;&#x5DE5;&#x8D44;&#x518D;&#x6309;&#x5E74;&#x9F84;&#x5347;&#x5E8F;&#x6392;&#x5E8F;
List<string> newList3 = personList.stream().sorted(Comparator.comparing(Person::getSalary).thenComparing(Person::getAge)).map(Person::getName).collect(Collectors.toList());
System.out.println("&#x5148;&#x6309;&#x5DE5;&#x8D44;&#x518D;&#x6309;&#x5E74;&#x9F84;&#x5347;&#x5E8F;&#x6392;&#x5E8F;: " + newList3);

//&#x5148;&#x6309;&#x5DE5;&#x8D44;&#x518D;&#x6309;&#x5E74;&#x9F84;&#x964D;&#x5E8F;&#x6392;&#x5E8F;(&#x81EA;&#x5B9A;&#x4E49;&#x6392;&#x5E8F;)
List<string> newList4 = personList.stream().sorted((p1, p2) -> {
    if(p1.getSalary() == p2.getSalary()) {
        return p2.getAge() - p1.getAge();
    } else {
        return p2.getSalary() - p1.getSalary();
    }
}).map(Person::getName).collect(Collectors.toList());
System.out.println("&#x5148;&#x6309;&#x5DE5;&#x8D44;&#x518D;&#x6309;&#x5E74;&#x9F84;&#x964D;&#x5E8F;&#x6392;&#x5E8F;: " + newList4);

&#x7ED3;&#x679C;&#x8F93;&#x51FA;&#xFF1A;
&#x6309;&#x5DE5;&#x8D44;&#x5347;&#x5E8F;&#x6392;&#x5E8F;: [Autumn, Summer, Winter, Spring]
&#x6309;&#x5DE5;&#x8D44;&#x964D;&#x5E8F;&#x6392;&#x5E8F;: [Spring, Winter, Summer, Autumn]
&#x5148;&#x6309;&#x5DE5;&#x8D44;&#x518D;&#x6309;&#x5E74;&#x9F84;&#x5347;&#x5E8F;&#x6392;&#x5E8F;: [Autumn, Summer, Winter, Spring]
&#x5148;&#x6309;&#x5DE5;&#x8D44;&#x518D;&#x6309;&#x5E74;&#x9F84;&#x964D;&#x5E8F;&#x6392;&#x5E8F;: [Spring, Winter, Summer, Autumn]
</string></string></string></string></person>

提取/组合(concat/distinct/limit/skip)

String[] arr1 = {"a", "b", "c", "d"};
String[] arr2 = {"d", "e", "f", "g"};

Stream<string> stream1 = Stream.of(arr1);
Stream<string> stream2 = Stream.of(arr2);

//&#x5408;&#x5E76;&#x4E24;&#x4E2A;&#x6D41;&#x5E76;&#x53BB;&#x91CD;
List<string> newList = Stream.concat(stream1, stream2).distinct().collect(Collectors.toList());
System.out.println("&#x6D41;&#x5408;&#x5E76;: " + newList);

//&#x9650;&#x5236;&#x4ECE;&#x6D41;&#x4E2D;&#x83B7;&#x53D6;&#x524D;5&#x4E2A;&#x6570;&#x636E;
List<string> collect = newList.stream().limit(5).collect(Collectors.toList());
System.out.println("&#x4ECE;&#x5408;&#x5E76;&#x7684;&#x6D41;&#x4E2D;&#x53D6;&#x51FA;&#x524D;5&#x4E2A;&#x6570;&#x636E;: " + collect);

//&#x8DF3;&#x8FC7;&#x524D;5&#x4E2A;&#x6570;&#x636E;
List<string> collect2 = newList.stream().skip(5).collect(Collectors.toList());
System.out.println("&#x4ECE;&#x5408;&#x5E76;&#x7684;&#x6D41;&#x4E2D;&#x8DF3;&#x8FC7;&#x524D;5&#x4E2A;&#x6570;&#x636E;:" + collect2);

&#x7ED3;&#x679C;&#x8F93;&#x51FA;:
&#x6D41;&#x5408;&#x5E76;: [a, b, c, d, e, f, g]
&#x4ECE;&#x5408;&#x5E76;&#x7684;&#x6D41;&#x4E2D;&#x53D6;&#x51FA;&#x524D;5&#x4E2A;&#x6570;&#x636E;: [a, b, c, d, e]
&#x4ECE;&#x5408;&#x5E76;&#x7684;&#x6D41;&#x4E2D;&#x8DF3;&#x8FC7;&#x524D;5&#x4E2A;&#x6570;&#x636E;:[f, g]
</string></string></string></string></string>

Stream源码解析

1. 基本介绍

Java8 Stream

Stream中的操作可以分为两大类: 中间操作(Intermediate operations)与 结束操作(Terminal operations),中间操作只是对操作进行了记录,只有结束操作才会触发实际的计算(即惰性求值),这也是Stream在迭代大集合时高效的原因之一。中间操作又可以分为 无状态(Stateless)操作与 有状态(Stateful)操作,前者是指元素的处理不受之前元素的影响;后者是指该操作只有拿到所有元素之后才能继续下去。结束操作又可以分为 短路(short-circuiting)与 非短路操作,前者是指遇到某些符合条件的元素就可以得到最终结果;而后者是指必须处理所有元素才能得到最终结果。

之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。

Java8 Stream

BaseStream:定义了流的迭代、并行、串行等基本特性

Stream:定义了map、filter、flatmap等用户关注的常用操作

PipelineHelper用于执行管道流中的操作以及捕获输出类型、并行度等信息

Head、StatelessOp、StatefulOp为ReferencePipeline中的内部子类,用于描述流的操作阶段

2. Stream()

public static <t> Stream<t> stream(Spliterator<t> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    //&#x8FD4;&#x56DE;&#x4E86;&#x4E00;&#x4E2A;&#x7531;Head&#x5B9E;&#x73B0;&#x7684;Stream&#xFF0C;&#x4E09;&#x4E2A;&#x53C2;&#x6570;&#x5206;&#x522B;&#x4EE3;&#x8868;&#x6D41;&#x7684;&#x6570;&#x636E;&#x6E90;&#x3001;&#x7279;&#x6027;&#x7EC4;&#x5408;&#x3001;&#x662F;&#x5426;&#x5E76;&#x884C;
    return new ReferencePipeline.Head<>(spliterator,                     StreamOpFlag.fromCharacteristics(spliterator),parallel);
}

Head(Spliterator<?> source,
     int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}

ReferencePipeline(Spliterator<?> source,
                  int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}
</t></t></t>

ReferencePipeline.Head的构造方法为调用父类ReferencePipeline的构造方法,ReferencePipeline的构造方法又调用了父类AbstractPipeline的构造方法
AbstractPipeline

AbstractPipeline(Spliterator<?> source,
                 int sourceFlags, boolean parallel) {
    this.previousStage = null;//&#x4E0A;&#x4E00;&#x4E2A;stage&#x6307;&#x5411;null
    this.sourceSpliterator = source;
    this.sourceStage = this;//&#x6E90;&#x5934;stage&#x6307;&#x5411;&#x81EA;&#x5DF1;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

此处构造函数,构造出了一个前一个节点为空,头节点指向自己,后一个节点暂未指定的双端链表。
即, stream函数返回了一个由类实现的管道流,且该管道流为一个双端链表,的头节点。

3. 无状态的中间操作(filter、map、flatmap等)

以filter为例

public final Stream<p_out> filter(Predicate<? super P_OUT> predicate) {
    //&#x5165;&#x53C2;&#x4E0D;&#x80FD;&#x4E3A;&#x7A7A;
    Objects.requireNonNull(predicate);
    //&#x6784;&#x5EFA;&#x4E86;&#x4E00;&#x4E2A;StatelessOp&#x5BF9;&#x8C61;,&#x5373;&#x65E0;&#x72B6;&#x6001;&#x7684;&#x4E2D;&#x95F4;&#x64CD;&#x4F5C;
    return new StatelessOp<p_out, p_out>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        @Override
        //&#x8986;&#x5199;&#x4E86;&#x7236;&#x7C7B;&#x7684;opWrapSink&#x65B9;&#x6CD5;
        Sink<p_out> opWrapSink(int flags, Sink<p_out> sink) {
            return new Sink.ChainedReference<p_out, p_out>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}
</p_out,></p_out></p_out></p_out,></p_out>

StatelessOp最终调用的构造方法和ReferencePipeline.Head调用的构造方法一致,都是调用的AbstractPipeline的构造方法,不过第一个参数传入的是this,也就是将上一步创建的对象传入,作为该构造对象的previousStage。

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    //previousStage&#x7684;&#x6307;&#x9488;&#x6307;&#x5411;&#x8BE5;&#x521B;&#x5EFA;&#x5BF9;&#x8C61;
    previousStage.nextStage = this;
    //&#x4E0A;&#x4E00;&#x4E2A;stage&#x6307;&#x5411;&#x4E0A;&#x4E00;&#x6B65;&#x521B;&#x5EFA;&#x7684;&#x5BF9;&#x8C61;
    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;//&#x6E90;&#x5934;stage&#x4E0E;previousStage&#x4FDD;&#x6301;&#x4E00;&#x81F4;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}

再来看看map操作

public final <r> Stream<r> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<p_out, r>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<p_out> opWrapSink(int flags, Sink<r> sink) {
            return new Sink.ChainedReference<p_out, r>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}
</p_out,></r></p_out></p_out,></r></r>

可以看到与filter方法一样,都是创建了一个StagellessOp对象, 重写了opWrapSink方法
调用一系列中间操作后会形成如下所示的双链表结构:

Java8 Stream

4. 终结操作(collect等)

以collect为例

public final <r, a> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    //&#x5E76;&#x884C;&#x6A21;&#x5F0F;
    if (isParallel()
            && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
            && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<a, ? super p_out> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    //&#x4E32;&#x884C;&#x6A21;&#x5F0F;
    else {
        container = evaluate(ReduceOps.makeRef(collector));
    }
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
           ? (R) container
           : collector.finisher().apply(container);
}

&#x4EE5;&#x4E32;&#x884C;&#x6A21;&#x5F0F;&#x4E3A;&#x4F8B;&#xFF0C;&#x5F80;&#x4E0B;&#x8D70;

public static <t, i> TerminalOp<t, i>
makeRef(Collector<? super T, I, ?> collector) {
    Supplier<i> supplier = Objects.requireNonNull(collector).supplier();
    BiConsumer<i, ? super t> accumulator = collector.accumulator();
    BinaryOperator<i> combiner = collector.combiner();
    class ReducingSink extends Box<i>
            implements AccumulatingSink<t, i, reducingsink> {
        @Override
        public void begin(long size) {
            state = supplier.get();
        }

        @Override
        public void accept(T t) {
            accumulator.accept(state, t);
        }

        @Override
        public void combine(ReducingSink other) {
            state = combiner.apply(state, other.state);
        }
    }
    return new ReduceOp<t, i, reducingsink>(StreamShape.REFERENCE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }

        @Override
        public int getOpFlags() {
            return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                   ? StreamOpFlag.NOT_ORDERED
                   : 0;
        }
    };
}
</t,></t,></i></i></i,></i></t,></t,></a,></r,>

ReduceOps.makeRef(collector)会构造一个TerminalOp对象,传入evaluate方法。
以串行模式为例,evaluate方法会调用TerminalOp.evaluateSequential方法,再调用
PipelineHelper.wrapAndCopyInto方法,最终调用AbstarctPipeline中的copyInto方法,最终实现流水线的启动。

final <p_in, s extends sink<e_out>> S wrapAndCopyInto(S sink, Spliterator<p_in> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

final <p_in> void copyInto(Sink<p_in> wrappedSink, Spliterator<p_in> spliterator) {
    Objects.requireNonNull(wrappedSink);
    //&#x65E0;&#x77ED;&#x8DEF;&#x64CD;&#x4F5C;
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());//&#x901A;&#x77E5;&#x5F00;&#x59CB;&#x904D;&#x5386;
        spliterator.forEachRemaining(wrappedSink);//&#x4F9D;&#x6B21;&#x5904;&#x7406;&#x6BCF;&#x4E2A;&#x5143;&#x7D20;
        wrappedSink.end();//&#x901A;&#x77E5;&#x7ED3;&#x675F;&#x904D;&#x5386;
    }
    //&#x6709;&#x77ED;&#x8DEF;&#x64CD;&#x4F5C;
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}
</p_in></p_in></p_in></p_in></p_in,>

该方法从数据源Spliterator中获取元素,推入Sink进行处理,如果有短路操作,在每个元素处理后会通过Sink.cancellationRequested()判断是否立即返回。

总结:
前面的中间操作只是做了一系列的准备工作,并没有真正执行,真正的迭代是由结束操作来触发的。

5. Sink

Stream中使用Stage的概念来描述一个完整的操作,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。

很多Stream操作会需要一个回调函数(Lambda表达式),因此一个完整的操作是

stage只是解决了操作记录的问题,要想让流水线起到应有的作用我们需要一种将所有操作叠加到一起的方案。你可能会觉得这很简单,只需要从流水线的head开始依次执行每一步的操作(包括回调函数)就行了。这听起来似乎是可行的,但是你忽略了前面的Stage并不知道后面Stage到底执行了哪种操作,以及回调函数是哪种形式。换句话说,只有当前Stage本身才知道该如何执行自己包含的动作。这就需要有某种协议来协调相邻Stage之间的调用关系。
而通过上文的collect源码,可以推测,Sink将在Stream中扮演该角色。

interface&#xA0;Sink<t>&#xA0;extends&#xA0;Consumer<t>&#xA0;{

//&#x5F00;&#x59CB;&#x904D;&#x5386;&#x5143;&#x7D20;&#x4E4B;&#x524D;&#x8C03;&#x7528;&#x8BE5;&#x65B9;&#x6CD5;&#xFF0C;&#x901A;&#x77E5;Sink&#x505A;&#x597D;&#x51C6;&#x5907;&#xFF0C;size&#x4EE3;&#x8868;&#x8981;&#x5904;&#x7406;&#x7684;&#x5143;&#x7D20;&#x603B;&#x6570;&#xFF0C;&#x5982;&#x679C;&#x4F20;&#x5165;-1&#x4EE3;&#x8868;&#x603B;&#x6570;&#x672A;&#x77E5;&#x6216;&#x8005;&#x65E0;&#x9650;
default&#xA0;void&#xA0;begin(long&#xA0;size) {}

//&#x6240;&#x6709;&#x5143;&#x7D20;&#x904D;&#x5386;&#x5B8C;&#x6210;&#x4E4B;&#x540E;&#x8C03;&#x7528;&#xFF0C;&#x901A;&#x77E5;Sink&#x6CA1;&#x6709;&#x66F4;&#x591A;&#x7684;&#x5143;&#x7D20;&#x4E86;&#x3002;
default&#xA0;void&#xA0;end() {}

//&#x5982;&#x679C;&#x8FD4;&#x56DE;true&#xFF0C;&#x4EE3;&#x8868;&#x8FD9;&#x4E2A;Sink&#x4E0D;&#x518D;&#x63A5;&#x6536;&#x4EFB;&#x4F55;&#x6570;&#x636E;
default&#xA0;boolean&#xA0;cancellationRequested() {
return&#xA0;false;
}

//&#x8FD8;&#x6709;&#x4E00;&#x4E2A;&#x7EE7;&#x627F;&#x81EA;Consumer&#x7684;&#x65B9;&#x6CD5;&#xFF0C;&#x7528;&#x4E8E;&#x63A5;&#x6536;&#x7BA1;&#x9053;&#x6D41;&#x4E2D;&#x7684;&#x6570;&#x636E;
//void accept(T t);

...

}
</t></t>

注意上文collect源码中,collect操作在调用copyInto方法时,传入了一个名为wrappedSink的参数,就是一个Sink对象,由AbstractPipeline.wrapSink方法构造而来。

@Override
@SuppressWarnings("unchecked")
final&#xA0;<p_in>&#xA0;Sink<p_in>&#xA0;wrapSink(Sink<e_out>&#xA0;sink) {
Objects.requireNonNull(sink);

for&#xA0;(@SuppressWarnings("rawtypes")
AbstractPipeline&#xA0;p&#xA0;=&#xA0;AbstractPipeline.this;&#xA0;p.depth&#xA0;>&#xA0;0;&#xA0;p&#xA0;=&#xA0;p.previousStage) {
// &#x81EA;&#x672C;&#x8EAB;stage&#x5F00;&#x59CB;&#xFF0C;&#x4E0D;&#x65AD;&#x8C03;&#x7528;&#x524D;&#x4E00;&#x4E2A;stage&#x7684;opWrapSink&#xFF0C;&#x76F4;&#x5230;&#x5934;&#x8282;&#x70B9;
sink&#xA0;=&#xA0;p.opWrapSink(p.previousStage.combinedFlags,&#xA0;sink);
}
return&#xA0;(Sink<p_in>)&#xA0;sink;
}
</p_in></e_out></p_in></p_in>

onWrapSink()方法的作用是将当前操作与下游Sink结合成新的Sink,只要从流水线的最后一个Stage开始,不断调用上一个Stage的onWrapSink()方法直到头节点,就可以得到一个代表了流水线上所有操作的Sink。
而onWrapSink()方法,正是在上文中间操作中,重写的方法。

每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的accept()方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作,Sink的begin()和end()方法也是必须实现的。比如Stream.sorted()
是一个有状态的中间操作,其对应的Sink.begin()方法可能会创建一个盛放结果的容器,而accept()方法负责将元素添加到该容器,最后end()负责对容器进行排序。对于短路操作,Sink.cancellationRequested()也是必须实现的,比如Stream.findFirst()是短路操作,只要找到一个元素,cancellationRequested()就应该返回true,以便调用者尽快结束查找。Sink的四个接口方法常常相互协作,共同完成计算任务。 实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法。

Java8 Stream

有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。

以sorted方法为例,sorted一种可能封装的Sink代码如下:

// Stream.sort()&#x65B9;&#x6CD5;&#x7528;&#x5230;&#x7684;Sink&#x5B9E;&#x73B0;
class&#xA0;RefSortingSink<t>&#xA0;extends&#xA0;AbstractRefSortingSink<t>&#xA0;{
private&#xA0;ArrayList<t>&#xA0;list;// &#x5B58;&#x653E;&#x7528;&#x4E8E;&#x6392;&#x5E8F;&#x7684;&#x5143;&#x7D20;

RefSortingSink(Sink<? super T>&#xA0;downstream,&#xA0;Comparator<? super T>&#xA0;comparator) {
super(downstream,&#xA0;comparator);
}

@Override
public&#xA0;void&#xA0;begin(long&#xA0;size) {
...

// &#x521B;&#x5EFA;&#x4E00;&#x4E2A;&#x5B58;&#x653E;&#x6392;&#x5E8F;&#x5143;&#x7D20;&#x7684;&#x5217;&#x8868;
list&#xA0;=&#xA0;(size&#xA0;>=&#xA0;0)&#xA0;?&#xA0;new&#xA0;ArrayList<t>((int)&#xA0;size) :&#xA0;new&#xA0;ArrayList<t>();
}

@Override
public&#xA0;void&#xA0;end() {
list.sort(comparator);// &#x53EA;&#x6709;&#x5143;&#x7D20;&#x5168;&#x90E8;&#x63A5;&#x6536;&#x4E4B;&#x540E;&#x624D;&#x80FD;&#x5F00;&#x59CB;&#x6392;&#x5E8F;
downstream.begin(list.size());
if&#xA0;(!cancellationWasRequested) {// &#x4E0B;&#x6E38;Sink&#x4E0D;&#x5305;&#x542B;&#x77ED;&#x8DEF;&#x64CD;&#x4F5C;
list.forEach(downstream::accept);// 2. &#x5C06;&#x5904;&#x7406;&#x7ED3;&#x679C;&#x4F20;&#x9012;&#x7ED9;&#x6D41;&#x6C34;&#x7EBF;&#x4E0B;&#x6E38;&#x7684;Sink
}&#xA0;else&#xA0;{// &#x4E0B;&#x6E38;Sink&#x5305;&#x542B;&#x77ED;&#x8DEF;&#x64CD;&#x4F5C;
for&#xA0;(T&#xA0;t&#xA0;:&#xA0;list) {// &#x6BCF;&#x6B21;&#x90FD;&#x8C03;&#x7528;cancellationRequested()&#x8BE2;&#x95EE;&#x662F;&#x5426;&#x53EF;&#x4EE5;&#x7ED3;&#x675F;&#x5904;&#x7406;&#x3002;
if&#xA0;(downstream.cancellationRequested())
break;
downstream.accept(t);// 2. &#x5C06;&#x5904;&#x7406;&#x7ED3;&#x679C;&#x4F20;&#x9012;&#x7ED9;&#x6D41;&#x6C34;&#x7EBF;&#x4E0B;&#x6E38;&#x7684;Sink
}
}
downstream.end();
list&#xA0;=&#xA0;null;
}

@Override
public&#xA0;void&#xA0;accept(T&#xA0;t) {
list.add(t);// 1. &#x4F7F;&#x7528;&#x5F53;&#x524D;Sink&#x5305;&#x88C5;&#x52A8;&#x4F5C;&#x5904;&#x7406;t&#xFF0C;&#x53EA;&#x662F;&#x7B80;&#x5355;&#x7684;&#x5C06;&#x5143;&#x7D20;&#x6DFB;&#x52A0;&#x5230;&#x4E2D;&#x95F4;&#x5217;&#x8868;&#x5F53;&#x4E2D;
}
}
</t></t></t></t></t>

上述代码完美的展现了Sink的四个接口方法是如何协同工作的:
begin():告诉Sink参与排序的元素个数,方便确定中间结果容器的大小

accept():将元素添加到中间结果当中,最终执行时调用者会不断调用该方法,直到遍历所有元素。

end():告诉Sink所有元素遍历完毕,启动排序步骤,排序完成后将结果传递给下游的Sink

如果下游Sink是短路操作,将结果传递给下游时不断询问下游cancellationRequested()是否可以结束处理。

6. 结果收集

流水线上所有操作都执行后,用户所需要的结果(如果有)在哪里?
首先要说明的是不是所有的Stream结束操作都需要返回结果,有些操作只是为了使用其副作用(Side-effects),比如使用Stream.forEach()方法将结果打印出来就是常见的使用副作用的场景(事实上,除了打印之外其他场景都应避免使用副作用),对于真正需要返回结果的结束操作结果存在哪里呢?这种需要分情况讨论:
对于返回boolean或者Optional的操作的操作,由于值返回一个值,只需要在对应的Sink中记录这个值,等到执行结束时返回就可以了。
对于归约操作,最终结果放在用户调用时指定的容器中(容器类型通过收集器指定)。collect(),reduce(),max(),min()都是归约操作,虽然max()和min()也是返回一个Optional,但事实上底层是通过调用reduce()方法实现的。
对于返回是数组的情况,在最终返回数组之前,结果其实是存储在一种叫做Node的数据结构中的。Node是一种多叉树结构,元素存储在树的叶子当中,并且一个叶子节点可以存放多个元素。这样做是为了并行执行方便。

7. 并行流

由上文可知,可通过parallel()方法,将顺序流转换成并行流。parallel()方法的实现很简单,只是将源stage的并行标记值设为true。在结束操作通过evaluate方法启动管道流时,会根据并行标记来判断。如果并行标记为true则会通过ReduceTask来执行并发任务。

public&#xA0;<p_in>&#xA0;R&#xA0;evaluateParallel(PipelineHelper<t>&#xA0;helper,&#xA0;Spliterator<p_in>&#xA0;spliterator) {
return&#xA0;new&#xA0;ReduceTask<>(this,&#xA0;helper,&#xA0;spliterator).invoke().get();
}
</p_in></t></p_in>

ReduceTask是ForkJoinTask的子类,其实Stream的并行处理都是基于Fork/Join框架的,相关类与接口的结构如下图所示:

Java8 Stream

fork/join框架是jdk1.7引入的,可以以递归方式将并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配线程池(ForkJoinPool)中的工作线程。要把任务提交到这个线程池,必须创建RecursiveTask的一个子类,如果任务不返回结果则是RecursiveAction的子类。(本文不过多赘述fork/join框架)
对于ReduceTask来说,任务分解的实现定义在其父类AbstractTask的compute()方法当中:

public void compute() {
    Spliterator<p_in> rs = spliterator, ls; // right, left spliterators
    long sizeEstimate = rs.estimateSize();
    long sizeThreshold = getTargetSize(sizeEstimate);
    boolean forkRight = false;
    @SuppressWarnings("unchecked") K task = (K) this;
    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
        K leftChild, rightChild, taskToFork;
        task.leftChild  = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        task.setPendingCount(1);
        if (forkRight) {
            forkRight = false;
            rs = ls;
            task = leftChild;
            taskToFork = rightChild;
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild;
        }
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();
    }
    task.setLocalResult(task.doLeaf());
    task.tryComplete();
}
</p_in>

该方法先调用当前splititerator 方法的estimateSize 方法,预估这个分片中的数据量,根据预估的数据量获取最小处理单元的阈值,即当数据量已经小于这个阈值的时候进行计算,否则进行fork 将任务划分成更小的数据块,进行求解。

这里面有个很重要的参数LEAF_TARGET,用来判断是否需要继续分割成更小的子任务,默认为parallelism*4(ForkJoinPool.getCommonPoolParallelism() << 2),parallelism是并发度的意思,默认值为cpu 数 – 1,可以通过java.util.concurrent.ForkJoinPool.common.parallelism设置, 如果当前分片大小仍然大于处理数据单元的阈值,且分片继续尝试切分成功,那么就继续切分,分别将左右分片的任务创建为新的Task,并且将当前的任务关联为两个新任务的父级任务(逻辑在makeChild 里面)。

先后对左右子节点的任务进行fork,对另外的分区进行分解。同时设定pending 为1,这代表一个task 实际上只会有一个等待的子节点(被fork)。当任务已经分解到足够小的时候退出循环,尝试进行结束。调用子类实现的doLeaf方法,完成最小计算单元的计算任务,并设置到当前任务的localResult中。

然后调用tryComplete方法进行最终任务的扫尾工作,如果该任务pending值不等于0,则原子的减1,如果已经等于0,说明任务都已经完成,则调用onCompletion回调,如果该任务是叶子任务,则直接销毁中间数据结束;如果是中间节点会将左右子节点的结果进行合并。

最后检查这个任务是否还有父级任务了,如果没有则将该任务置为正常结束,如果还有则尝试递归的去调用父级节点的onCompletion回调,逐级进行任务的合并。

并行流的实现本质上就是在ForkJoin上进行了一层封装,将Stream 不断尝试分解成更小的split,然后使用fork/join 框架分而治之。

参考资料

Java8 Stream:2万字20个实例,玩转集合的筛选、归约、分组、聚合
好文推荐:JAVA进阶之Stream实现原理

Original: https://www.cnblogs.com/winter0730/p/16143943.html
Author: cos晓风残月
Title: Java8 Stream

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/576240/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球