streams

集合的数据处理方式比较

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
* prints all the reading task titles, sorted by their title length
*/
public class Example {
public static void main(String[] args) {
java7Processing();
java8Processing();
}

/**
* Data processing in Java 8
*/
private static void java8Processing() {
List<Task> tasks = getTasks();

List<String> readingTasks = tasks.stream()
.filter(task -> task.getType() == TaskType.READING)
.sorted((t1, t2) -> t1.getTitle().length() - t2.getTitle().length())
.map(Task::getTitle)
.collect(Collectors.toList());

readingTasks.forEach(System.out::println);
}

/**
* Data processing before Java 8
*/
private static void java7Processing() {
List<Task> tasks = getTasks();

List<Task> readingTasks = new ArrayList<>();
for (Task task : tasks) {
if (task.getType() == TaskType.READING) {
readingTasks.add(task);
}
}
Collections.sort(readingTasks, new Comparator<Task>() {
@Override
public int compare(Task t1, Task t2) {
return t1.getTitle().length() - t2.getTitle().length();
}
});
for (Task readingTask : readingTasks) {
System.out.println(readingTask.getTitle());
}
}

private static List<Task> getTasks() {
return Arrays.asList(new Task(TaskType.READING, "a reading book"),
new Task(TaskType.WRITING, "write for her"),
new Task(TaskType.LISTENING, "listen to me"),
new Task(TaskType.READING, "read haha"),
new Task(TaskType.READING, "read for u"));
}

}

/**
 * Mutable value holder describing a task by its type and its title.
 * Both properties are plain Strings and may be changed after construction.
 */
class Task {
    private String type;
    private String title;

    /**
     * Creates a task.
     *
     * @param type  the task type
     * @param title the task title
     */
    public Task(String type, String title) {
        setType(type);
        setTitle(title);
    }

    /** @return the current task type */
    public String getType() {
        return this.type;
    }

    /** @param type the new task type */
    public final void setType(String type) {
        this.type = type;
    }

    /** @return the current task title */
    public String getTitle() {
        return this.title;
    }

    /** @param title the new task title */
    public final void setTitle(String title) {
        this.title = title;
    }

}

/**
 * String constants naming the supported task types.
 * This class only holds constants, so it is final and cannot be instantiated.
 */
final class TaskType {
    public static final String READING = "READING";
    public static final String WRITING = "WRITING";
    public static final String LISTENING = "LISTENING";

    /** Not instantiable: constants holder only. */
    private TaskType() {
    }
}

流操作说明

  1. stream() 通过stream()方法创建一个stream管道
  2. filter(Predicate) 从stream中提取符合条件的元素
  3. sorted(Comparator) 对提取后的元素进行排序
  4. map(Function<T,R>) 对排序后的每一个元素调用apply方法并将结果放到新的stream中返回
  5. collect(toList) 将新的stream中的所有元素转为List

为什么Java8代码更好?

  1. Java8代码清晰地表达了开发者的意图:条件过滤、排序等
  2. 开发者使用Stream API的更高抽象形式时是为了表达他们要做什么而不是如何去做
  3. Stream API提供了统一的数据处理语言【语法,保证大家都懂且理解一致】
  4. 不需要再显式地编写for循环,也不需要创建临时集合来存储中间数据或结果数据
  5. Stream操作不会修改原有的数据集合,而是基于它产生新的结果,因此源数据保持不变

Stream是什么?

Stream是一些数据的抽象视图。这些数据可以是一个List,也可以是一个文件中的每一行数据,或者任何元素的序列。
Stream API提供了一些可以顺序执行或者并行执行的操作。
最重要的一点是:Stream是一个较高层级的抽象【一连串的行为调用】,而不是一个数据结构,Stream不存储数据。
Stream操作是延迟计算的,只有被访问的时候才会执行计算。

Collection vs Stream

image

External iteration vs internal iteration

  1. External iteration: iterator的客户端(使用者)来控制遍历 – Collection API
  2. Internal iteration: iterator自身控制遍历 – Stream API

延迟计算Lazy evaluation

尝试运行下面代码,查看会否抛出异常 => 不会抛出异常

1
2
// Only intermediate operations (map, filter) are chained here and no terminal operation
// is invoked, so the n / 0 lambda is not evaluated by these two lines.
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Stream<Integer> stream = numbers.stream().map(n -> n / 0).filter(n -> n % 2 == 0);

将stream转为List => 抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Stream<Integer> stream = numbers.stream().map(n -> n / 0).filter(n -> n % 2 == 0);
// collect is a terminal operation: it runs the pipeline, so the n / 0 lambda is now
// evaluated and throws java.lang.ArithmeticException (see the stack trace below).
stream.collect(Collectors.toList());

Exception in thread "main" java.lang.ArithmeticException: / by zero
at a_03_streams.Example.lambda$main$0(Example.java:14)
at a_03_streams.Example$$Lambda$1/1480010240.apply(Unknown Source)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:502)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at a_03_streams.Example.main(Example.java:15)

使用Stream API,两类操作

  1. Intermediate operations(中间操作):从已经存在的stream生成新的stream如 filter、map、sorted等
  2. Terminal operations(最后终止操作):从stream生成一个非Stream结果如 collect(Collectors.toList())、forEach、count等
  3. Terminal operations被执行时会创建Stream管道去执行Intermediate operations
  4. Stream functions
    image

实例

Task和getTasks()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
 * A task with a generated id, a title, a type, a creation date and a set of tags.
 * Everything except the tag set is fixed at construction time; tags are added via
 * {@link #addTag(String)}.
 */
class Task {
    private final String id;
    private final String title;
    private final TaskType type;
    private final LocalDate createdOn;
    // No mutator for 'done' is defined in this class, so it keeps its initial value.
    private boolean done = false;
    // final: the set instance is never replaced, it is only added to through addTag.
    private final Set<String> tags = new HashSet<>();
    // Never assigned in this class, so getDueOn() returns null.
    private LocalDate dueOn;

    /**
     * Creates a task and assigns it a random UUID string as its id.
     *
     * @param title     the task title
     * @param type      the task type
     * @param createdOn the date the task was created
     */
    public Task(String title, TaskType type, LocalDate createdOn) {
        this.id = UUID.randomUUID().toString();
        this.title = title;
        this.type = type;
        this.createdOn = createdOn;
    }

    /**
     * Adds a tag to this task.
     *
     * @param tag the tag to add
     * @return this task, so calls can be chained
     */
    public Task addTag(String tag) {
        this.tags.add(tag);
        return this;
    }

    /** @return the generated id of this task */
    public String getId() {
        return id;
    }

    /** @return the task title */
    public String getTitle() {
        return title;
    }

    /** @return the task type */
    public TaskType getType() {
        return type;
    }

    /** @return the creation date */
    public LocalDate getCreatedOn() {
        return createdOn;
    }

    /** @return whether the task is done */
    public boolean isDone() {
        return done;
    }

    /**
     * Returns a read-only view of the tags, so callers cannot modify this task's
     * internal set behind its back; use {@link #addTag(String)} to add tags.
     *
     * @return an unmodifiable view of the tags; it reflects later addTag calls
     */
    public Set<String> getTags() {
        // Fully qualified so this snippet does not need an extra import.
        return java.util.Collections.unmodifiableSet(tags);
    }

    /** @return the due date, or null when none has been set */
    public LocalDate getDueOn() {
        return dueOn;
    }
}

/** The kinds of task used by the stream examples. */
enum TaskType {
    READING,
    WRITING,
    LISTENING,
    CODING
}

/**
 * Builds the five sample tasks used by the examples below: three READING tasks, one
 * CODING task and one WRITING task, created on 2015-07-01 through 2015-07-05.
 *
 * @return a fixed-size list holding the tasks in the order task1..task5
 */
private static List<Task> getTasks() {
    Task gitBook = new Task("Read Version Control with Git book", TaskType.READING, LocalDate.of(2015, Month.JULY, 1))
            .addTag("git")
            .addTag("reading")
            .addTag("books");
    Task lambdasBook = new Task("Read Java 8 Lambdas book", TaskType.READING, LocalDate.of(2015, Month.JULY, 2))
            .addTag("java8")
            .addTag("reading")
            .addTag("books");
    Task mobileApp = new Task("Write a mobile application to store my tasks", TaskType.CODING, LocalDate.of(2015, Month.JULY, 3))
            .addTag("coding")
            .addTag("mobile");
    Task streamsBlog = new Task("Write a blog on Java 8 Streams", TaskType.WRITING, LocalDate.of(2015, Month.JULY, 4))
            .addTag("blogging")
            .addTag("writing")
            .addTag("streams");
    Task dddBook = new Task("Read Domain Driven Design book", TaskType.READING, LocalDate.of(2015, Month.JULY, 5))
            .addTag("ddd")
            .addTag("books")
            .addTag("reading");

    return Arrays.asList(gitBook, lambdasBook, mobileApp, streamsBlog, dddBook);
}
根据创建日期排序,查找所有类型为READING的Task标题
  1. 过滤所有类型为READING的task
  2. 根据createdOn排序
  3. 获取每个task的标题
  4. 收集task标题并放入List中
1
2
3
4
5
6
7
8
/**
 * Collects the titles of all READING tasks, ordered by creation date (earliest first).
 * Written with explicit lambdas throughout.
 *
 * @param tasks the tasks to search
 * @return the titles of the READING tasks in ascending creation-date order
 */
private static List<String> allReadingTasks(List<Task> tasks) {
    return tasks.stream()
            .filter(candidate -> candidate.getType() == TaskType.READING)
            .sorted((left, right) -> left.getCreatedOn().compareTo(right.getCreatedOn()))
            .map(reading -> reading.getTitle())
            .collect(Collectors.toList());
}

使用method references

1
2
3
4
5
6
7
8
/**
 * Collects the titles of all READING tasks using method references.
 * Note the {@code reversed()} comparator: titles come back in descending creation-date
 * order (most recently created first).
 *
 * @param tasks the tasks to search
 * @return the titles of the READING tasks, newest first
 */
private static List<String> allReadingTasks(List<Task> tasks) {
    Comparator<Task> newestFirst = Comparator.comparing(Task::getCreatedOn).reversed();
    return tasks.stream()
            .filter(candidate -> candidate.getType() == TaskType.READING)
            .sorted(newestFirst)
            .map(Task::getTitle)
            .collect(Collectors.toList());
}

调用

1
// Print every returned title on its own line.
allReadingTasks(getTasks()).forEach(System.out::println);
distinct-结果集去重
1
2
3
/**
 * Returns the tasks with duplicates removed, keeping encounter order.
 * {@code distinct()} decides equality through {@code equals()}.
 *
 * @param tasks the tasks to de-duplicate
 * @return a new list without duplicate tasks
 */
private static List<Task> allDistinctTasks(List<Task> tasks) {
    return tasks
            .stream()
            .distinct()
            .collect(Collectors.toList());
}
topN-limit和skip分页 page从0开始
1
2
3
4
5
6
7
8
9
/**
 * Returns one page of READING task titles, ordered by creation date (earliest first).
 *
 * @param tasks the tasks to page through
 * @param n     the page size, i.e. the maximum number of titles returned
 * @param page  the zero-based page index
 * @return at most {@code n} titles, starting after the first {@code page * n} matches
 * @throws IllegalArgumentException if {@code n} or {@code page * n} is negative
 *                                  (raised by {@code limit} / {@code skip})
 */
private static List<String> topN(List<Task> tasks, int n, int page) {
    // Widen before multiplying: page * n in int arithmetic can overflow and wrap to a
    // wrong (possibly negative) offset before it is passed to skip(long).
    long offset = (long) page * n;
    return tasks.stream().
            filter(task -> task.getType() == TaskType.READING).
            sorted(comparing(Task::getCreatedOn)).
            map(Task::getTitle).
            skip(offset).
            limit(n).
            collect(Collectors.toList());
}
count计数
1
2
3
4
5
/**
 * Counts the tasks whose type is READING.
 *
 * @param tasks the tasks to inspect
 * @return the number of READING tasks
 */
private static long countAllReadingTasks(List<Task> tasks) {
    return tasks
            .stream()
            .filter(candidate -> TaskType.READING == candidate.getType())
            .count();
}
flatMap展开(扁平化)所有子集合中的元素
1
2
3
/**
 * Flattens the tag sets of all tasks into one stream and returns each tag once.
 *
 * @param tasks the tasks whose tags are collected
 * @return the distinct tags across all tasks
 */
private static List<String> allDistinctTags(List<Task> tasks) {
    return tasks.stream()
            .map(Task::getTags)
            .flatMap(tagSet -> tagSet.stream())
            .distinct()
            .collect(Collectors.toList());
}
allMatch要求所有元素都匹配-anyMatch只需其中任意一个元素匹配
1
2
3
4
5
/**
 * Checks whether every READING task carries the tag "books".
 * Returns true when there are no READING tasks at all.
 *
 * @param tasks the tasks to inspect
 * @return true if no READING task lacks the "books" tag
 */
private static boolean isAllReadingTasksWithTagBooks(List<Task> tasks) {
    // "all match" expressed as "none fails to match"; both are true for an empty stream.
    return tasks.stream()
            .filter(candidate -> candidate.getType() == TaskType.READING)
            .noneMatch(reading -> !reading.getTags().contains("books"));
}
reduce聚合汇总JOIN
1
2
3
4
5
6
/**
 * Joins all task titles into one string, separated by " *** ", using reduce.
 *
 * @param tasks the tasks whose titles are joined
 * @return the joined titles, or an empty string when {@code tasks} is empty
 */
private static String joinAllTaskTitles(List<Task> tasks) {
    return tasks.stream().
            map(Task::getTitle).
            reduce((first, second) -> first + " *** " + second).
            // reduce yields an empty Optional for an empty stream; get() would throw
            // NoSuchElementException there, so fall back to "" instead.
            orElse("");
}
支持原生数据类型
  1. [0,10)

    1
    // Prints 0 through 9: the upper bound of range is exclusive.
    IntStream.range(0, 10).forEach(System.out::println);
  2. [1,10]

    1
    // Prints 1 through 10: rangeClosed includes the upper bound.
    IntStream.rangeClosed(1, 10).forEach(System.out::println);
  3. 无限数据流

    1
    // Unbounded stream 1, 2, 3, ...: each element is the previous one plus 1.
    LongStream infiniteStream = LongStream.iterate(1, el -> el + 1);
  4. 从无限数据流中取前100个偶数

    1
    // limit(100) truncates the unbounded stream, so this prints the first 100 even
    // numbers (2 through 200) and then terminates.
    infiniteStream.filter(el -> el % 2 == 0).limit(100).forEach(System.out::println);
Arrays创建Streams-startInclusive endExclusive
1
2
3
4
String[] tags = {"java", "git", "lambdas", "machine-learning"};
// Stream over the whole array and print every tag in upper case.
Arrays.stream(tags).map(String::toUpperCase).forEach(System.out::println);

// Stream over the index range [1, 2) only (start inclusive, end exclusive),
// i.e. just the element "git", which is printed as GIT.
Arrays.stream(tags, 1, 2).map(String::toUpperCase).forEach(System.out::println);

Parallel Streams

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Groups the numbers 1..160 by the name of the thread that processed them in a parallel
 * stream and prints one "threadName >> [numbers]" line per thread.
 */
public class ParallelStreamExample {
    public static void main(String[] args) {
        Map<String, List<Integer>> byThreadName = IntStream.rangeClosed(1, 160)
                .parallel()
                .boxed()
                // The classifier runs on whichever thread handles the element.
                .collect(groupingBy(number -> Thread.currentThread().getName()));

        for (Map.Entry<String, List<Integer>> entry : byThreadName.entrySet()) {
            System.out.printf("%s >> %s%n", entry.getKey(), entry.getValue());
        }
    }
}

ForkJoinPool.commonPool-worker-1 >> [41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120]
ForkJoinPool.commonPool-worker-2 >> [81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160]
main >> [101, 102, 103, 104, 105, 106, 107, 108, 109, 110]
ForkJoinPool.commonPool-worker-3 >> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%