快速了解 SkipList

快速的 SkipList 实现教程

在计算机科学领域,跳跃链表是一种数据结构,允许快速查询一个有序连续元素的数据链表。快速查询是通过维护一个多层次的链表,且每一层链表中的元素是前一层链表元素的子集。

基于并联的链表,其效率可比拟于二叉查找树(对于大多数操作需要O(log n)平均时间)。

基本上,跳跃列表是对有序的链表增加上附加的前进链接,增加是以随机化的方式进行的,所以在列表中的查找可以快速的跳过部分列表,因此得名。所有操作都以对数随机化的时间进行。
(图:SkipList 多层链表结构示意)

要查找一个目标元素,起步于头元素和顶层列表,并沿着每个链表搜索,直到到达小于或者等于目标的最后一个元素。通过跟踪起自目标直到到达在更高列表中出现的元素的反向查找路径,在每个链表中预期的步数显而易见是 1/p。所以查找的总体代价是 O((log_{1/p} n) / p),当 p 是常数时是 O(log n)。通过选择不同 p 值,就可以在查找代价和存储代价之间作出权衡。

插入和删除的实现非常像相应的链表操作,除了"高层"元素必须在多个链表中插入或删除之外。

跳跃列表不像某些传统平衡树数据结构那样提供绝对的最坏情况性能保证,因为用来建造跳跃列表的扔硬币方法总有可能(尽管概率很小)生成一个糟糕的不平衡结构。但是在实际中它工作的很好,随机化平衡方案比在平衡二叉查找树中用的确定性平衡方案容易实现。跳跃列表在并行计算中也很有用,这里的插入可以在跳跃列表不同的部分并行的进行,而不用全局的数据结构重新平衡。

​ —— Wikipedia

以上就是 Wikipedia 中对 SkipList 的描述。结合描述和以往的了解可以得知,SkipList 是对普通链表的一种加强:通过把某些 Node 的层次"拔高"来达到快速搜索的目的,有点类似于一棵"躺平"的二叉搜索树。这套 快速实现教程 的目的,就是截取其中讨论的重点部分,通过聚焦精要来达到快速实现的目的。

搜索方法

了解了 SkipList 的结构,我们也就知道应该怎样对它进行搜索:从最上层开始,根据 Key 的比较结果判断向哪个方向继续搜索:

private SkipListNode<K, V> findNodeByKey(K key) {
    SkipListNode<K, V> head = headNode;

    while (true) {
        // 右侧节点不为空,且要查找的 key 不小于右侧节点的 key ===> 继续向右查找
        while (head.right.key != null && key.compareTo(head.right.key) >= 0) {
            head = head.right;
        }
        // 向下查找 ===> 直到最下一层
        if (head.down != null) {
            head = head.down;
        } else {
            break;
        }
    }

    return head;
}

按照 从左到右、从上到下 的顺序,我们就能查找到对应的节点;如果 List 中没有对应的节点,我们会得到 不大于所搜索 Key 的最大节点(即前驱节点),这样无论是存放还是搜索都很方便。

private SkipListNode<K, V> search(K key) {
    SkipListNode<K, V> p = findNodeByKey(key);

    if (key.equals(p.key)) {
        return p;
    } else {
        return null;
    }
}

暴露在外层的 search 方法,根据拿到的 Node 是 所找的节点 还是 接近的前驱节点 来决定返回结果。
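下面是一段示意性的用法草图,基于文末给出的完整实现(其中的 key、value 均为随意假设的示例数据),用来说明"命中返回节点、未命中返回 null"的行为:

// 示意:基于文末完整实现的一段用法草图,key / value 均为随意假设的示例数据
SkipList<Integer, String> list = new SkipList<>();
list.put(2, "two");
list.put(4, "four");
list.put(9, "nine");

String hit  = list.get(4);   // 命中:search 比较 key 相等,返回 "four"
String miss = list.get(5);   // 未命中:findNodeByKey 停在前驱节点 4 上,search 返回 null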

存放方法

普通的存放流程和普通的 LinkedList 是类似的:无论有多少层 Level,数据最终都存储在最下面一层,所以存储的起点和普通链表没有区别;但由于有 层次 的设定,每个 Node 都持有 上下左右 四个方向的引用:

static class SkipListNode<K, V> implements Entry<K, V> {

    /**
     * 上下左右四个方向的引用
     */
    SkipListNode<K, V> up, down, left, right;

    K key;
    V value;
}

我们可以先来简单的实现普通的存储方法:

@Override
public V put(K key, V value) {
    if (key == null) {
        throw new UnsupportedOperationException("key could not be null");
    }
    // 找到对应的 Key-Node 或者最近的前驱节点
    SkipListNode<K, V> p = findNodeByKey(key);
    // 如果已存在这个 key,只需要替换 Value 就可以了
    if (key.equals(p.key)) {
        p.value = value;
        return value;
    }
    // 把新节点放在 p 节点后面
    SkipListNode<K, V> q = new SkipListNode<>(key, value);
    // 水平方向的链接
    backLink(p, q);
    nodeCounts++;
    // ...
}

这部分只是普通链表式的存取,再更新一些计数而已。从前面对 SkipList 的介绍可以得出,SkipList 是一个概率型的数据结构:每次存储时 以一定概率随机把新节点的 level 拔高。

// current level
int currentLevel = 0;
while (random.nextDouble() < PROBABILITY) {
    // 当前拔高的层次超过已有的层次时,新建一层
    if (currentLevel >= listLevels) {
        createNewLevel();
    }

    // 向左搜索 =====> 直到一个有上层的节点
    while (p.up == null) {
        // corner case:此时还没有更上面的层次
        if (p.left == null) {
            // p 即 header 节点
            createNewLevel();
            break;
        }
        p = p.left;
    }

    // 向上走一层
    p = p.up;

    // 索引节点只保存 key
    SkipListNode<K, V> e = new SkipListNode<>(key, null);

    // 水平、垂直方向的链接
    backLink(p, e);
    verticalLink(e, q);
    // 交换节点,便于继续向上逐层链接
    q = e;
    currentLevel++;
}

新建层次:

private void createNewLevel() {
    listLevels++;

    SkipListNode<K, V> p1 = new SkipListNode<>(null, null);
    SkipListNode<K, V> p2 = new SkipListNode<>(null, null);

    horizontalLink(p1, p2);

    verticalLink(p1, headNode);
    verticalLink(p2, tailNode);

    headNode = p1;
    tailNode = p2;
}

删除方法

删除操作肯定也是和 Search 操作紧密衔接的,我们 先找到最底层的节点,然后从下到上逐层删除索引

@Override
public V remove(Object key) {
    // 最接近的节点(命中节点或前驱节点)
    SkipListNode<K, V> node = findNodeByKey((K) key);

    if (node == null) {
        return null;
        // 找到了对应 key 的节点
    } else if (node.key.equals(key)) {

        // 从下到上逐层摘除节点,把两侧节点直接相连
        while (node != null) {
            SkipListNode<K, V> left = node.left;
            SkipListNode<K, V> right = node.right;

            if (left != null && right != null) {
                horizontalLink(left, right);
            }

            node = node.up;
        }

        node = headNode;
        // 从上到下删除空的层次 —— 这个操作其实不是必要的,
        // 很多实现仅仅是删除最上层的空层
        while (node != null && node.right.equals(tailNode)) {
            SkipListNode<K, V> oldHeadNode = headNode;
            SkipListNode<K, V> oldTailNode = tailNode;

            this.headNode = oldHeadNode.down;
            this.tailNode = oldTailNode.down;
            this.listLevels--;

            node = headNode;

            oldHeadNode = null;
            oldTailNode = null;
        }
    }

    return null;
}

删除的内容就比较简单了:

  • 获取最下层节点
  • 从下到上不断读取节点合并两侧的节点
  • 删除空层

你学到了什么

  • 一种新的搜索和存储的数据结构
  • 快速构建 Skip-List 数据结构的思路和组成方法

全部代码

package set;

import java.util.*;

/**
* Created by liufengkai on 2017/9/11.
*/

public class SkipList<K extends Comparable<K>, V> implements Map<K, V> {

/**
* Probability to flow one node up
*/

private static final double PROBABILITY = 0.5;

/**
* head / tail
*/

private SkipListNode<K, V> headNode, tailNode;

/**
* all node counts
*/

private int nodeCounts;

/**
* all list level
*/

private int listLevels;

/**
* random util
*/

private Random random;

/**
* Key Set
*/

private Set<K> keySet = new HashSet<>();

/**
* Value Set
*/

private Set<V> valueSet = new HashSet<>();

public SkipList() {
this.random = new Random();
this.clear();
}

@Override
public int size() {
return nodeCounts;
}

@Override
public boolean isEmpty() {
return nodeCounts == 0;
}

@Override
public boolean containsKey(Object key) {
return get(key) != null;
}

@Override
public boolean containsValue(Object value) {
for (SkipListNode node = headNode; node != null; node = node.right) {
if (node.value.equals(value)) {
return true;
}
}

return false;
}

@Override
public V get(Object key) {
if (key == null) {
throw new UnsupportedOperationException("key could not be null");
}

SkipListNode<K, V> node = search((K) key);

return node == null ? null : node.value;
}

@Override
public V put(K key, V value) {
if (key == null) {
throw new UnsupportedOperationException("key could not be null");
}

SkipListNode<K, V> p = findNodeByKey(key);
// change value | equal key
if (key.equals(p.key)) {
p.value = value;
return value;
}

SkipListNode<K, V> q = new SkipListNode<>(key, value);
backLink(p, q);
nodeCounts++;

// current level
int currentLevel = 0;
while (random.nextDouble() < PROBABILITY) {
if (currentLevel >= listLevels) {
createNewLevel();
}

// find up level node to bind it
while (p.up == null) {
if (p.left == null) {
// p equal header node
createNewLevel();
break;
}
p = p.left;
}

// upper level node
p = p.up;

// save key
SkipListNode<K, V> e = new SkipListNode<>(key, null);

backLink(p, e);
verticalLink(e, q);
q = e;
currentLevel++;
}

keySet.add(key);
valueSet.add(value);

return value;
}

private void createNewLevel() {
listLevels++;

SkipListNode<K, V> p1 = new SkipListNode<>(null, null);
SkipListNode<K, V> p2 = new SkipListNode<>(null, null);

horizontalLink(p1, p2);

verticalLink(p1, headNode);
verticalLink(p2, tailNode);

headNode = p1;
tailNode = p2;
}

@Override
public V remove(Object key) {
SkipListNode<K, V> node = findNodeByKey((K) key);

if (node == null) {
return null;
} else if (node.key.equals(key)) {

while (node != null) {
SkipListNode<K, V> left = node.left;
SkipListNode<K, V> right = node.right;

if (left != null && right != null) {
horizontalLink(left, right);
}

node = node.up;
}

node = headNode;

while (node != null && node.right.equals(tailNode)) {
SkipListNode<K, V> oldHeadNode = headNode;
SkipListNode<K, V> oldTailNode = tailNode;

this.headNode = oldHeadNode.down;
this.tailNode = oldTailNode.down;
this.listLevels--;

node = headNode;

oldHeadNode = null;
oldTailNode = null;
}
}

return null;
}


@Override
public boolean remove(Object key, Object value) {
return remove(key) != null;
}

@Override
public void putAll(Map<? extends K, ? extends V> m) {
for (Entry<? extends K, ? extends V> entry : m.entrySet()) {
put(entry.getKey(), entry.getValue());
}
}

@Override
public void clear() {
this.headNode = new SkipListNode<>(null, null);
this.tailNode = new SkipListNode<>(null, null);
this.nodeCounts = 0;
this.listLevels = 0;

// horizontal link head === tail nodes
this.horizontalLink(headNode, tailNode);
}

@Override
public Set<K> keySet() {
return keySet;
}

@Override
public Collection<V> values() {
return valueSet;
}

@Override
public Set<Entry<K, V>> entrySet() {
Set<Entry<K, V>> set = new HashSet<>();
for (SkipListNode<K, V> node = headNode; node != null; node = node.right) {
set.add(node);
}

return set;
}

/**
* add front link after node
*
* @param front front-node
* @param back back-node
*/

private void backLink(SkipListNode<K, V> front, SkipListNode<K, V> back) {
back.left = front;
back.right = front.right;
front.right.left = back;
front.right = back;
}


private SkipListNode<K, V> findNodeByKey(K key) {
// System.out.println("Start Search: ");
SkipListNode<K, V> head = headNode;

while (true) {
while (head.right.key != null && key.compareTo(head.right.key) >= 0) {
head = head.right;
// System.out.println(head);
}

if (head.down != null) {
head = head.down;
// System.out.println(head);
} else {
break;
}

}

return head;
}

private SkipListNode<K, V> search(K key) {
SkipListNode<K, V> p = findNodeByKey(key);

if (key.equals(p.key)) {
return p;
} else {
return null;
}
}

/**
* Horizontal Link
*
* @param left left node
* @param right right node
*/

private void horizontalLink(SkipListNode<K, V> left, SkipListNode<K, V> right) {
left.right = right;
right.left = left;
}

/**
* Vertical Link
*
* @param up up node
* @param down down node
*/

private void verticalLink(SkipListNode<K, V> up, SkipListNode<K, V> down) {
up.down = down;
down.up = up;
}

String debugStructure() {
List<List<SkipListNode<K, V>>> list = new LinkedList<>();
int currentLevel = listLevels;
SkipListNode<K, V> node = headNode;
while (true) {
if (currentLevel == -1) break;

List<SkipListNode<K, V>> levelList = new LinkedList<>();

while (node != null) {
levelList.add(node);
node = node.right;
}

currentLevel--;

int times = listLevels - currentLevel;
node = headNode;
while (times != 0) {
node = node.down;
times--;
}

list.add(levelList);
}

StringBuilder builder = new StringBuilder();
for (int i = 0; i < list.size(); i++) {
List<SkipListNode<K, V>> level = list.get(i);

builder.append("Level : ")
.append(String.valueOf(listLevels - i))
.append(" > \t");

for (SkipListNode<K, V> kvSkipListNode : level) {

builder.append(kvSkipListNode.key)
.append("\t");
}

builder.append("\n");

if (i == list.size() - 1) {
builder.append(" \t");
for (SkipListNode<K, V> kvSkipListNode : level) {

builder.append(kvSkipListNode.value)
.append("\t");
}
}
}

return builder.toString();
}

/**
* Node in Skip List
*
* @param <V> Type
*/

static class SkipListNode<K, V> implements Entry<K, V> {

/**
* point to =>
*/

SkipListNode<K, V> up, down, left, right;

K key;
V value;

SkipListNode(K key, V value) {
this.key = key;
this.value = value;
}

@Override
public K getKey() {
return key;
}

@Override
public V getValue() {
return value;
}

@Override
public V setValue(V value) {
return this.value = value;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null) {
return false;
}

if (!(o instanceof SkipListNode<?, ?>)) {
return false;
}

SkipListNode<K, V> ent;
try {
ent = (SkipListNode<K, V>) o;
} catch (ClassCastException ex) {
return false;
}

return (ent.key == key) && (ent.value == value);
}

@Override
public String toString() {
return "SkipListNode{" +
"key=" + key +
", value=" + value +
'}';
}
}
}

测试代码和结果

class SkipListTest {
@Test
void testSkip() {
SkipList<Integer, String> list = new SkipList<>();
Random random = new Random();
for (int i = 0; i < 5; i++) {
list.put(Math.abs(random.nextInt() % 10), i + "lfk");
}
System.out.println(list.debugStructure());

for (Integer key : list.keySet()) {
System.out.println("========> remove key :" + key + " <========= ");
list.remove(key);
System.out.println(list.debugStructure());
}
}
}

测试结果:

Level : 2 > 	null	4	null	
Level : 1 > null 4 null
Level : 0 > null 2 3 4 9 null
null 3lfk 4lfk 2lfk 1lfk null
========> remove key :2 <=========
Level : 2 > null 4 null
Level : 1 > null 4 null
Level : 0 > null 3 4 9 null
null 4lfk 2lfk 1lfk null
========> remove key :3 <=========
Level : 2 > null 4 null
Level : 1 > null 4 null
Level : 0 > null 4 9 null
null 2lfk 1lfk null
========> remove key :4 <=========
Level : 0 > null 9 null
null 1lfk null
========> remove key :9 <=========

Rxjava 2 源码分析 (2)

线程切换的实现

我们这里再来看一个 Demo。为了观察线程上的变化,我们使用了 doOnNext / doAfterNext 这类操作符,并打印了很多日志:

Observable.just("lfkdsk", "ffff", "zzzz")
.subscribeOn(Schedulers.io())
.doOnNext(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Logger.i("doOnNext !" + s);
Log.d("", Thread.currentThread().getName());
}
})
.observeOn(AndroidSchedulers.mainThread())
.doAfterNext(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Logger.i("doAfterNext !" + s);
Log.d("", Thread.currentThread().getName());
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Logger.i("onSubscribe !");
Log.d("", Thread.currentThread().getName());
}

@Override
public void onNext(@NonNull String s) {
Logger.i("onNext !" + s);
Log.d("", Thread.currentThread().getName());
}

@Override
public void onError(@NonNull Throwable e) {

}

@Override
public void onComplete() {
Logger.i("onComplete !");
Log.d("", Thread.currentThread().getName());
}
});

这里面有很多的结果我们只能依赖日志来判断,我们现在可以给出这个结论:

  • 首先是 onSubscribe 被调用,这个好理解,我们的 subscribe 链的绑定是倒着来的,这里的线程就是主线程。
  • 之后是 doOnNext 被调用,也是因为我们是倒置调用的,因此在整个管道的构建链接中就被调用了,这里会针对每个数据进行一次调用,这里还是数据线程。
  • 之后就是 onNext 被逐个调用,然后就是 doAfterNext 被调用,这两个都在主线程中,再之后是 onComplete 被调用就还是主线程。

subscribeOn 原理分析

这里有一个很重要的能力是进行线程的切换和调度,subscribeOn 和 observeOn 这两个 API 分别负责指定订阅(上游发射)线程和观察(下游回调)线程:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

这里面就是 SubscribeOn 的包装类:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

s.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
// ...
}

我们还是按照链路把下游的 Observer 包装成了一个相应的内部类 SubscribeOnObserver。接下来是重点:我们把 Scheduler.scheduleDirect 返回的 Disposable 设置给了 SubscribeOnObserver 对象(这里为了保障原子性做了不少工作);同时传入了一个 Runnable,在它的 run 里完成对上游的订阅,这样订阅过程就直接发生在对应的线程中。

final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
source.subscribe(parent);
}
}

我们可以看出,source 和 parent 相绑定的位置实际出现在这个对应的 Task 中。
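如果撇开 RxJava 的细节,subscribeOn 的核心思想可以用一小段普通的 Java 代码来示意:把"向上游订阅"这个动作包装成 Runnable,交给某个调度器(这里假设用一个普通的线程池代替 Scheduler),订阅也就发生在了那个线程上:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// 示意:与 RxJava 无关的极简模型,只为说明"把订阅动作包成任务交给调度器"的思路
public class SubscribeOnSketch {
    interface Source { void subscribe(Runnable downstream); }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor();

        Source source = downstream -> {
            // 对应 source.subscribe(parent):订阅发生在 io 线程上
            System.out.println("subscribe on: " + Thread.currentThread().getName());
            downstream.run();
        };
        Runnable downstream = () ->
                System.out.println("onNext on: " + Thread.currentThread().getName());

        // 对应 scheduler.scheduleDirect(new SubscribeTask(parent))
        io.submit(() -> source.subscribe(downstream));
        io.shutdown();
    }
}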

subscribeOn 流程分析

我们可以简单分析一下在线程切换过程这个位置的这个过程都进行了哪些的流程:

  1. 首先对 Observer 进行 Wrapper 操作:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

s.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

我们先把 下游 的 Observer 包装成一个 SubscribeOnObserver,它保存了当前 下游的 Observer,并调用了下游的 onSubscribe,把 parent 作为 Disposable 传了进去;parent 内部通过下面的方式持有下游的 Disposable:

@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}

@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}

在取消事件的时候,要把两个 管子的部分 的数据流通都取消掉。

  2. 之后我们为这个 Observer Wrapper 设置了一个 Disposable :

这个 Disposable 的目的是用来干什么的呢?因为这个 Observer Wrapper 本身就是一个 AtomicReference 。

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

这里我们又保存了一个来自另一个线程调度的 Disposable。

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 调度工作类 Worker
final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 这个 Task 就是把这个 Runnable 跑完之后把这个 Worker 消耗掉
DisposeTask task = new DisposeTask(decoratedRun, w);

// 分配给线程池调度
w.schedule(task, delay, unit);

return task;
}

这个 schedule 我们随便举一个实现来看,在 NewThreadWorker 中我们的 Runnable 会被分给线程池调用来实现:

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}

Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}

return sr;
}
  3. DisposeTask 被调用的地方:
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;

Thread runner;

DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}

@Override
public void run() {
runner = Thread.currentThread();
try {
// 真正的切换线程的调用过程
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}

@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}

@Override
public boolean isDisposed() {
return w.isDisposed();
}
}

observeOn 原理分析

observeOn 从原理上想想其实也差不多:新生成一个 Observer Wrapper,保存之前的 Disposable,再切换到指定线程,在那个线程中完成后续的回调流程。这里我们还是来看这个 API 的链路:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

之后使用了 ObservableObserveOn 这个包装类进行封装,我们可以再来看看这个部分的代码:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// ...
}

一些和线程相关的东西都被封装在内部类 ObserveOnObserver 中。这里并没有直接调用这个内部类的 onSubscribe 方法去把上下游连接起来,而是在订阅(Subscribe)走到头的时候(比如遇到了数据的发射源 Just 或者 Create),由上游的 Observable 调用它的 onSubscribe 方法:

   @Override
public void onSubscribe(Disposable s) {
// 验证一下 Disposable
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
// Queue 类型的 Disapoable
// 这里我们能看到都是用来合并数据流相关的东西
// 比如说 Concat 操作符之类的
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;

int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
// 判断类型 如果是同步的话
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
// 直接调用
actual.onSubscribe(this);
schedule();
return;
}
// 判断类型 如果是异步的话
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
// schedule 放在后面了
return;
}
}

queue = new SpscLinkedArrayQueue<T>(bufferSize);

actual.onSubscribe(this);
}
}

这部分是怎么被调用的呢,比如我们这个里面是用了 Create 操作符,里面的 subscribeActual 方法是这样的:

@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 这部分就是上个部分的调用
observer.onSubscribe(parent);

try {
// 然后在这个部分调用的 就是进入 Create 操作符的部分的回调
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("lfkdsk");
e.onNext("ffff");
e.onNext("zzzz");

if (e.isDisposed()) e.onComplete();
}
})

在这个部分我们可以在回调之中调用 onNext 的方法我们可以继续分析一下 这部分的 onNext 部分的调用链路:

@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}

这里我们进入了对 observer 的 onNext 的调用之中:

@Override
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}

在非 ASYNC 融合的情况下,我们把要发送的数据 offer 进队列之中,然后在 onNext 里调用 schedule 进行调度:

void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}

这里所谓的 schedule,就是把自己(一个 Runnable)交给对应的 Worker 去调度,而且用的是观察者线程的 Worker。在 Android 平台上可以看到,它通过自带的 Handler 把数据发回 Android Main Thread,完成数据回到主线程显示的过程:

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");

if (disposed) {
return Disposables.disposed();
}

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.

handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}

return scheduled;
}

线程切换的顺序性

我们本身对线程切换的调用可能会出现在我们整个 Stream 流程的任意部分,到底哪个线程会被正确的应用呢?这个结论我们都知道:

subscribeOn:链路上写在最上面(最靠近数据源)的那一次设置,决定了数据真正的发射线程。

而 observeOn:每一次设置都会实实在在地作用于链路中它下游的 Observer,也就是说每次 observeOn 的设置都是有效的。
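用一个带注释的小片段来示意这条规则(调度器的写法沿用上文 Demo,各处的线程标注是按照该规则推断的典型结果,仅作示意):

Observable.create(emitter -> { /* 运行在 io 线程:由最靠上的 subscribeOn 决定 */ })
        .subscribeOn(Schedulers.io())              // 生效:决定数据的发射线程
        .subscribeOn(Schedulers.computation())     // 对发射线程不再生效
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(s -> { /* 运行在主线程:由上面最近的一次 observeOn 决定 */ })
        .observeOn(Schedulers.single())
        .subscribe(s -> { /* 运行在 single 线程:由最近的一次 observeOn 决定 */ });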

我们来看一张图:

(图:整条链路从下游到上游绑定、再从上游向下游发送数据的示意图)

这张图描述了整条链路从下游到上游的绑定过程,以及从上游到下游的发送过程。图中描述的程序可以简单写成这样的代码:

Observable
    .create(...)
    .observeOn(Schedulers.io())
    .doOnNext(...)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.computation())
    .subscribeOn(Schedulers.newThread())
    .subscribe(... Observer)

从图中我们能看到之前流程分析中提到的每个步骤:无论是 observeOn 还是 subscribeOn,都真的开了对应的线程来运行。我们可以再这样分析一下:

  • subscribeOn 确实会开启新线程来继续进行下一个绑定的 onSubscribe 方法,但是数据的发射靠的是最后一次线程切换(也就是写在链路最上面的那个 subscribeOn ),因此数据的准备过程没法被后面的 subscribeOn 改变,所以说如非必要不要写很多的 subscribeOn
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
// ...
@Override
public void onNext(T t) {
actual.onNext(t);
}
}

能看出来,当 onNext 沿链路向下游传递时,SubscribeOnObserver 没有做任何线程切换,而是直接把 onNext 转发下去。

  • observeOn 的处理则是不一样的:
@Override
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}

这里会把数据入队列,然后通过 schedule 进行线程调度发送,因而每次向下游传递都会经过一次线程切换,链路中它下游的 Observer 也就会在对应的线程被回调。

Rxjava 2 源码分析

从一个简单的例子开始

Observable
.just("lfkdsk", "just-we")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Logger.d(testTAG(), " onSubscribe : " + d.isDisposed());
}

@Override
public void onNext(String value) {
mTextView.append(" onNext : value : " + value);
mTextView.append("\n");
Logger.d(testTAG(), " onNext : value : " + value);
}

@Override
public void onError(Throwable e) {
mTextView.append(" onError : " + e.getMessage());
mTextView.append("\n");
Logger.d(testTAG(), " onError : " + e.getMessage());
}

@Override
public void onComplete() {
mTextView.append(" onComplete");
mTextView.append("\n");
Logger.d(testTAG(), " onComplete");
}
});

这和我们之前在 Rxjava1 中看到的最简单的用法是一样的:设定上下游,设定上下游的线程调度器。下面我们就根据这个例子来简单分析它是如何实现的。

Just Operator

简单的来看这个 just 操作符,Just 操作符其实就是 From 系列操作符的一个封装,提供一个简单的 API 去使用,提供了 1 - 10 个数据的 API :

(图:just 操作符的一组重载,提供 1 - 10 个参数的 API)

里面就是做了一个 空检查 然后包装进了 From 操作符:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item1, T item2) {
ObjectHelper.requireNonNull(item1, "The first item is null");
ObjectHelper.requireNonNull(item2, "The second item is null");

return fromArray(item1, item2);
}

这里面的 RxJavaPlugins 是用来集成之前的一些 Hooks 操作的,这里没有用到,我们按下不表。可以看到这里传进去了不少 Observable 的包装类,我们可以仔细分析一下,比如 ObservableJust :

/**
* Represents a constant scalar value.
* @param <T> the value type
*/

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

private final T value;
public ObservableJust(final T value) {
this.value = value;
}

@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}

@Override
public T call() {
return value;
}
}

这就是 Just 对应的 Observable 包装类,里面还提到了一些 Scalar 系列的操作,我们可以先不分析。但从这里也能看出一些东西:比如 Observer 多了一个 onSubscribe 回调;同时我们会发现 Rxjava 2 新增的一类对象 Disposable,从名字上看是一种"一次性"的对象:

/**
* Represents a disposable resource.
*/

public interface Disposable {
/**
* Dispose the resource, the operation should be idempotent.
*/

void dispose();

/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/

boolean isDisposed();
}

从代码中我们也能看出来,它只有 dispose(释放)和 isDisposed(查询是否已释放)两个 API。上面的数据也被包装成了这样一个对象(ScalarDisposable),并且被传进了 onSubscribe 这个回调中。

Subscribe

结合注释我们就很明白这个回调是干什么的了:在整个流程中,上游的 Observable 和下游的 Observer 结合(订阅)开始时,它是最先被调用的:

/**
* Operator implementations (both source and intermediate) should implement this method that
* performs the necessary business logic.
* <p>There is no need to call any of the plugin hooks on the current Observable instance or
* the Subscriber.
* @param observer the incoming Observer, never null
*/

protected abstract void subscribeActual(Observer<? super T> observer);

注释告诉我们,所有的 Operator(无论是数据源还是中间操作符)都要实现这个方法,它承载了 Operator 的业务逻辑;注释也说了这里不应该调用 Plugin Hooks。我们在 subscribe 函数中能看到这个函数是怎么被使用的:


@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}

我们会发现 subscribeActual 的回调是在上下游管道连接的过程中进行的。在整个流程中,各个 Operator 像是一个个组件,分门别类地对 subscribe 的调用进行管控(这里我对这种实现方式有些怀疑,是不是有更好的方式去解决这个问题)。

初探链路

这个 Disposable 代替了之前 Subscription 的作用,能对我们的订阅进行取消之类的操作。这里我们也用一个类似的 Demo 来学习:

static Disposable disposable;

@Override
protected void testWork() {

Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("lfkdsk");

e.onNext("Hello");

SystemClock.sleep(100);

disposable.dispose();

e.onNext("World");

e.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {

@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
mTextView.append("bind on subscribe \n");
}

@Override
public void onNext(@NonNull String s) {
mTextView.append(s);
mTextView.append("\n");
}

@Override
public void onError(@NonNull Throwable e) {

}

@Override
public void onComplete() {
mTextView.append(" onComplete ");
}
});
}

这里我们又搞了一个简单的 Demo,紧接着来介绍我们刚才使用过的 Disposable 到底是有什么用的,最后显示的效果如下:

(图:dispose 之后只输出前两条数据的运行效果截图)

Create Operator

我们在这里把 Disposable 缓存了起来,然后设定了一个 sleep 让前面的数据先从管道流过去,再把整个 Disposable 取消掉,就可以看到这样的效果:终止了取消之后的信息流。这里我们没有使用 just 这个 API,因为 just 没办法手动控制 onNext 的流程,所以使用 create API;这里的 create API 和之前 Rxjava1 中的还有一些区别:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

接受的内容换成了一个新的 Observable 的包装类,其余的部分也是通过另一个 Create 的包装类来实现的,我们可以继续分析一下这个过程:

/**
* A functional interface that has a {@code subscribe()} method that receives
* an instance of an {@link ObservableEmitter} instance that allows pushing
* events in a cancellation-safe manner.
*
* @param <T> the value type pushed
*/

public interface ObservableOnSubscribe<T> {

/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/

void subscribe(ObservableEmitter<T> e) throws Exception;
}

Create API 传进去了一个 ObservableOnSubscribe 对象,它接受一个 ObservableEmitter 参数;注释中还提到,它会对每一个订阅的 Observer 调用一次。

Emitter

/**
* Base interface for emitting signals in a push-fashion in various generator-like source
* operators (create, generate).
*
* @param <T> the value type emitted
*/

public interface Emitter<T> {

/**
* Signal a normal value.
* @param value the value to signal, not null
*/

void onNext(T value);

/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/

void onError(Throwable error);

/**
* Signal a completion.
*/

void onComplete();
}

这个基础的发射器接口提供了我们非常熟悉的 API;ObservableEmitter 接口对它进行了一些扩充:

/**
* Abstraction over an RxJava {@link Observer} that allows associating
* a resource with it.
* <p>
* The onNext, onError and onComplete methods should be called
* in a sequential manner, just like the Observer's methods.
* Use {@link #serialize()} if you want to ensure this.
* The other methods are thread-safe.
*
* @param <T> the value type to emit
*/

public interface ObservableEmitter<T> extends Emitter<T> {

/**
* Sets a Disposable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param d the disposable, null is allowed
*/

void setDisposable(Disposable d);

/**
* Sets a Cancellable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param c the cancellable resource, null is allowed
*/

void setCancellable(Cancellable c);

/**
* Returns true if the downstream disposed the sequence.
* @return true if the downstream disposed the sequence
*/

boolean isDisposed();

/**
* Ensures that calls to onNext, onError and onComplete are properly serialized.
* @return the serialized ObservableEmitter
*/

ObservableEmitter<T> serialize();
}

剩下的 API 实现都是关于 Disposable 和 Cancellable 还提供了序列化的 serialize 的方法,这里为了学习这玩意的实现我们可以去看一下,这个接口的一个在 ObservableCreate 中的实现:

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {

private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;

CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}

@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}

@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}

@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}

@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}

@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}

本身整个类是 AtomicReference 的子类,本身支持原子性的对象操作,几个接口实现的函数的实现可说的也不多,我们可以看看 dispose 相关的操作,DisposableHelper 里面的包装操作,都是用来控制原子性的一层包装。

通过 CreateEmitter 中的部分 API 我们能看出来,onNext 和 onComplete 等操作都会先判断 Disposable.isDisposed,这就解释了为什么我们能阻断管道中的数据流通,也解释了为什么 Error 和 Complete 只会有一条被真正回调(回调之后就 dispose 了)。

Create 操作

public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

// ...
}

这里面就是 ObservableCreate 的实现中的 subscribeActual 方法,如果能以 递归 或者说是 包装 的角度来思考这个问题,看这些代码也是会容易很多:

(图:subscribe 链路层层包装、倒序订阅的示意图)

我们去描述我们上面的 Demo 程序是怎么由 Observable 的包装类 来实现的:

Observable.create(...)
.subscribeOn(...)
.observeOn(...)
.subscribe(...)
// ...

我们整个流程的包装就是类似这样的:在创建过程中是一层层的包装,最先是 Create,然后被 SubscribeOn 包装,最后被 ObserveOn 包装;而在 subscribe 的时候,各层的 subscribeActual 就会以 倒序 的方式(从最外层往数据源方向)被调用。

这里也是同样的套路:我们把之前(从订阅的调用顺序上说是后面的)包装好的 Observer 继续包装成 CreateEmitter 这个数据源发射器,保存为 parent。Observable.create 本身是数据源的开头,但在倒序订阅回来的过程中它其实是调用的终点;在这个终点里我们调用了 source.subscribe(parent),启动了整个数据发射过程。
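如果把 RxJava 的细节都抹掉,这种"层层包装、倒序订阅"的结构可以用一小段示意性的 Java 代码来理解(MiniSource、MiniWrap 等名字均为本示例假设):

// 示意:最朴素的"包装上游、订阅时再层层向上传递"的结构
interface MiniSource<T> { void subscribe(MiniObserver<T> observer); }
interface MiniObserver<T> { void onNext(T t); }

class MiniWrap<T> implements MiniSource<T> {
    private final MiniSource<T> upstream;     // 对应 AbstractObservableWithUpstream 里的 source
    MiniWrap(MiniSource<T> upstream) { this.upstream = upstream; }

    @Override
    public void subscribe(MiniObserver<T> downstream) {
        // 先包装下游的 Observer,再把订阅继续向上游传递,对应各层的 subscribeActual
        upstream.subscribe(t -> downstream.onNext(t));
    }
}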


Retrofit 实现分析

Retrofit 为我们提供了一种非常优雅的方式去书写 Restful 的请求的接口代码,和 OkHttp 、Rxjava 都能方便的无缝搭配,为我们在 Java 和 Android 提供了非常便捷的网络请求的编写方式。这篇文章中我们会从 Usage 出发,逐个步骤的分析 Retrofit 的实现方式。

实现分析

我们可以定义这样的一个接口,代表一种 restful 请求:

public interface GitHubService {
@GET("users/{user}/repos")
Call<List<Repo>> listRepos(@Path("user") String user);
}

在使用 Retrofit 的时候:

Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://api.github.com/")
.build();

GitHubService service = retrofit.create(GitHubService.class);

我们通过 Builder 模式拼好 baseUrl 等字串,通过 retrofit 对象可以创建我们的接口对应的实体类,我们通过对这个实体类的操作,就能对我们定义好的接口去请求对应的数据:

Call<List<Repo>> repos = service.listRepos("octocat");

创建请求类

@SuppressWarnings("unchecked") // Single-interface proxy creation guarded by parameter safety.
public <T> T create(final Class<T> service) {
Utils.validateServiceInterface(service);
if (validateEagerly) {
eagerlyValidateMethods(service);
}
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[]{service}, new InvocationHandler() {
// ... 省略
}

});
}

这里我们先是对 class 对象的类型做了检测保证了是 Interface 而且没有多继承,并且在开了 validateEagerly 的情况下会对 Service 里面的请求接口进行 Cache:

private void eagerlyValidateMethods(Class<?> service) {
Platform platform = Platform.get();
for (Method method : service.getDeclaredMethods()) {
if (!platform.isDefaultMethod(method)) {
loadServiceMethod(method);
}
}
}

ServiceMethod<?, ?> loadServiceMethod(Method method) {
ServiceMethod<?, ?> result = serviceMethodCache.get(method);
if (result != null) return result;

synchronized (serviceMethodCache) {
result = serviceMethodCache.get(method);
if (result == null) {
result = new ServiceMethod.Builder<>(this, method).build();
serviceMethodCache.put(method, result);
}
}
return result;
}

动态代理

return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[]{service},
new InvocationHandler() {
private final Platform platform = Platform.get();

@Override
public Object invoke(Object proxy, Method method, @Nullable Object[] args)
throws Throwable {

// If the method is a method from Object then defer to normal invocation.
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
ServiceMethod<Object, Object> serviceMethod =
(ServiceMethod<Object, Object>) loadServiceMethod(method);
OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);
}
});

这就是刚才在 create 函数中省略的部分,从写法上看,明显用了动态代理的方式。使用代理当然是为了能统一地对 Service 接口里的方法做一些操作:先判断是否是来自 Object 的方法而走正常的 invoke,再判断是否是 default 方法(这里并不支持),然后把方法解析绑定成一个 ServiceMethod,里面存储请求所需的各种元信息,最后把它们封装成一个 OkHttpCall 交给 CallAdapter 的 adapt 托管。这样我们就完成了整个请求服务类的创建。
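如果对动态代理不熟,下面是一段与 Retrofit 无关的最小示例,展示 java.lang.reflect.Proxy 如何把接口方法统一转发到一个 InvocationHandler(GreetService 是本示例假设的接口):

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// 示意:所有接口方法都会被转发到同一个 InvocationHandler,
// Retrofit 正是在这个入口解析注解、拼装请求
public class ProxyDemo {
    interface GreetService { String greet(String name); }

    public static void main(String[] args) {
        InvocationHandler handler = (proxy, method, methodArgs) ->
                // Retrofit 在这里会查 ServiceMethod 缓存;示例中只回显方法名和参数
                "invoked " + method.getName() + " with " + methodArgs[0];

        GreetService service = (GreetService) Proxy.newProxyInstance(
                GreetService.class.getClassLoader(),
                new Class<?>[]{GreetService.class},
                handler);

        System.out.println(service.greet("octocat"));
    }
}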

ServiceMethod

我们可以来看下这个包含元信息的 ServiceMethod 是怎么构建的:

ServiceMethod<?, ?> loadServiceMethod(Method method) {
ServiceMethod<?, ?> result = serviceMethodCache.get(method);
if (result != null) return result;

synchronized (serviceMethodCache) {
result = serviceMethodCache.get(method);
if (result == null) {
result = new ServiceMethod.Builder<>(this, method).build();
serviceMethodCache.put(method, result);
}
}
return result;
}

使用了 ServiceMethod 的默认 build 方法:

Builder(Retrofit retrofit, Method method) {
this.retrofit = retrofit;
this.method = method;
this.methodAnnotations = method.getAnnotations();
this.parameterTypes = method.getGenericParameterTypes();
this.parameterAnnotationsArray = method.getParameterAnnotations();
}

在这之中我们绑定了很多的东西,绑定了我们对这个方法加的注解,参数类型,还有参数的注解,之后在 build 方法之中,主要的事情就是生成 CallAdapter , 还有就是为我们的 Response 提供类型转化的 Converter (比如和 Rxjava 一同使用的就会需要这个)。

CallAdapter

在生成的 CallAdapter 的探索路径中我们会发现是在 Retrofit 的 build 创建的:

adapterFactories.add(platform.defaultCallAdapterFactory(callbackExecutor));

我们的默认的 CallAdapter.Factory 是从 Platform 中提供的:

    CallAdapter.Factory defaultCallAdapterFactory(@Nullable Executor callbackExecutor) {
if (callbackExecutor != null) {
return new ExecutorCallAdapterFactory(callbackExecutor);
}
return DefaultCallAdapterFactory.INSTANCE;
}

/**
* Creates call adapters for that uses the same thread for both I/O and application-level
* callbacks. For synchronous calls this is the application thread making the request; for
* asynchronous calls this is a thread provided by OkHttp's dispatcher.
*/

final class DefaultCallAdapterFactory extends CallAdapter.Factory {
static final CallAdapter.Factory INSTANCE = new DefaultCallAdapterFactory();

@Override
public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
if (getRawType(returnType) != Call.class) {
return null;
}

final Type responseType = Utils.getCallResponseType(returnType);
return new CallAdapter<Object, Call<?>>() {
@Override public Type responseType() {
return responseType;
}

@Override public Call<Object> adapt(Call<Object> call) {
return call;
}
};
}
}

从注释里我们也能看出来:这个默认实现让 I/O 和应用层回调使用相同的线程,同步调用时就是发起请求的应用线程,异步调用时则是 OkHttp 的 dispatcher 提供的线程。

public interface CallAdapter<R, T> {
/**
* Returns the value type that this adapter uses when converting the HTTP response body to a Java
* object. For example, the response type for {@code Call<Repo>} is {@code Repo}. This type
* is used to prepare the {@code call} passed to {@code #adapt}.
* <p>
* Note: This is typically not the same type as the {@code returnType} provided to this call
* adapter's factory.
*/

Type responseType();

/**
* Returns an instance of {@code T} which delegates to {@code call}.
* <p>
* For example, given an instance for a hypothetical utility, {@code Async}, this instance would
* return a new {@code Async<R>} which invoked {@code call} when run.
* <pre><code>
* &#64;Override
* public &lt;R&gt; Async&lt;R&gt; adapt(final Call&lt;R&gt; call) {
* return Async.create(new Callable&lt;Response&lt;R&gt;&gt;() {
* &#64;Override
* public Response&lt;R&gt; call() throws Exception {
* return call.execute();
* }
* });
* }
* </code></pre>
*/

T adapt(Call<R> call);
}

另外我们再看生成的 CallAdapter 本身做了什么:它检查返回类型是不是 Call,再根据泛型参数确定 responseType;adapt 做的事情就是在适配过程中给 Call 增加代理(默认的工厂实现没有额外包装,原样返回),另外就是提供类型信息,让 Response 回来的时候能进行正确的类型转化。

Converter

Response Converter 的设计思路和 CallAdapter 其实差不多:提供默认的抽象工厂接口,并在辅助包中提供大量的额外实现:

/**
* Convert objects to and from their representation in HTTP. Instances are created by {@linkplain
* Factory a factory} which is {@linkplain Retrofit.Builder#addConverterFactory(Factory) installed}
* into the {@link Retrofit} instance.
*/

public interface Converter<F, T> {
T convert(F value) throws IOException;

/**
* Creates {@link Converter} instances based on a type and target usage.
*/

abstract class Factory {
/**
* Returns a {@link Converter} for converting an HTTP response body to {@code type}, or null if
* {@code type} cannot be handled by this factory. This is used to create converters for
* response types such as {@code SimpleResponse} from a {@code Call<SimpleResponse>}
* declaration.
*/

public @Nullable
Converter<ResponseBody, ?> responseBodyConverter(Type type,
Annotation[] annotations, Retrofit retrofit) {
return null;
}

/**
* Returns a {@link Converter} for converting {@code type} to an HTTP request body, or null if
* {@code type} cannot be handled by this factory. This is used to create converters for types
* specified by {@link Body @Body}, {@link Part @Part}, and {@link PartMap @PartMap}
* values.
*/

public @Nullable
Converter<?, RequestBody> requestBodyConverter(Type type,
Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {
return null;
}

/**
* Returns a {@link Converter} for converting {@code type} to a {@link String}, or null if
* {@code type} cannot be handled by this factory. This is used to create converters for types
* specified by {@link Field @Field}, {@link FieldMap @FieldMap} values,
* {@link Header @Header}, {@link HeaderMap @HeaderMap}, {@link Path @Path},
* {@link Query @Query}, and {@link QueryMap @QueryMap} values.
*/

public @Nullable
Converter<?, String> stringConverter(Type type, Annotation[] annotations,
Retrofit retrofit) {
return null;
}

/**
* Extract the upper bound of the generic parameter at {@code index} from {@code type}. For
* example, index 1 of {@code Map<String, ? extends Runnable>} returns {@code Runnable}.
*/

protected static Type getParameterUpperBound(int index, ParameterizedType type) {
return Utils.getParameterUpperBound(index, type);
}

/**
* Extract the raw class type from {@code type}. For example, the type representing
* {@code List<? extends Runnable>} returns {@code List.class}.
*/

protected static Class<?> getRawType(Type type) {
return Utils.getRawType(type);
}
}
}

针对不同的需求可以实现不同的方法,比如 rxjava2 包里的 StringConverterFactory 就默认只实现了和 String 相关的方法:

final class StringConverterFactory extends Converter.Factory {
@Override
public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations,
Retrofit retrofit) {
return new Converter<ResponseBody, String>() {
@Override public String convert(ResponseBody value) throws IOException {
return value.string();
}
};
}

@Override public Converter<?, RequestBody> requestBodyConverter(Type type,
Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {
return new Converter<String, RequestBody>() {
@Override public RequestBody convert(String value) throws IOException {
return RequestBody.create(MediaType.parse("text/plain"), value);
}
};
}
}

这里面提供了把 ResponseBody 转为 String 的 Converter,以及把 String 构建成 RequestBody 的 Converter。再比如我们经常用的 Gson 的 Converter:

/**
* A {@linkplain Converter.Factory converter} which uses Gson for JSON.
* <p>
* Because Gson is so flexible in the types it supports, this converter assumes that it can handle
* all types. If you are mixing JSON serialization with something else (such as protocol buffers),
* you must {@linkplain Retrofit.Builder#addConverterFactory(Converter.Factory) add this instance}
* last to allow the other converters a chance to see their types.
*/

public final class GsonConverterFactory extends Converter.Factory {
/**
* Create an instance using a default {@link Gson} instance for conversion. Encoding to JSON and
* decoding from JSON (when no charset is specified by a header) will use UTF-8.
*/

public static GsonConverterFactory create() {
return create(new Gson());
}

/**
* Create an instance using {@code gson} for conversion. Encoding to JSON and
* decoding from JSON (when no charset is specified by a header) will use UTF-8.
*/

@SuppressWarnings("ConstantConditions") // Guarding public API nullability.
public static GsonConverterFactory create(Gson gson) {
if (gson == null) throw new NullPointerException("gson == null");
return new GsonConverterFactory(gson);
}

private final Gson gson;

private GsonConverterFactory(Gson gson) {
this.gson = gson;
}

@Override
public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations,
Retrofit retrofit) {
TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
return new GsonResponseBodyConverter<>(gson, adapter);
}

@Override
public Converter<?, RequestBody> requestBodyConverter(Type type,
Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {
TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
return new GsonRequestBodyConverter<>(gson, adapter);
}
}

里面则提供了对 Gson 的转化,因为 Type 信息在之前已经拿到了,所以 Gson 的转换也非常简单。
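作为参考,把 GsonConverterFactory 安装到 Retrofit 里通常是下面这样的写法(依赖 converter-gson 辅助包,GitHubService 即前文定义的接口):

Retrofit retrofit = new Retrofit.Builder()
        .baseUrl("https://api.github.com/")
        .addConverterFactory(GsonConverterFactory.create())
        .build();

GitHubService service = retrofit.create(GitHubService.class);
Call<List<Repo>> repos = service.listRepos("octocat");   // Response 会经由 Gson 反序列化为 List<Repo>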

ParameterHandlers

parseParameterAnnotation 根据注解和类型,分门别类地为每个参数对应一个 ParameterHandler。这段过程没有太多可说的地方,基本上就是对不同类型做一个大的判断和解析;ParameterHandler 中不同的 apply 实现会对不同类型的参数进行处理:

abstract void apply(RequestBuilder builder, @Nullable T value) throws IOException;

最终的目的都是为了为 RequestBuilder 这个对象拼接需要的元信息。

发送请求

Call<List<Repo>> repos = service.listRepos("octocat");

刚才我们提到,发送请求直接这样调用就可以了。前面已经完成了请求所需 元数据 的绑定,现在我们应该已经有了一个装满请求所需数据的 RequestBuilder,接下来要做的就是调用刚才绑定的那个 OkHttpCall:

execute()

@Override
public Response<T> execute() throws IOException {
okhttp3.Call call;

synchronized (this) {
if (executed) throw new IllegalStateException("Already executed.");
executed = true;

if (creationFailure != null) {
if (creationFailure instanceof IOException) {
throw (IOException) creationFailure;
} else {
throw (RuntimeException) creationFailure;
}
}

call = rawCall;
if (call == null) {
try {
call = rawCall = createRawCall();
} catch (IOException | RuntimeException e) {
creationFailure = e;
throw e;
}
}
}

if (canceled) {
call.cancel();
}

return parseResponse(call.execute());
}

private okhttp3.Call createRawCall() throws IOException {
Request request = serviceMethod.toRequest(args);
okhttp3.Call call = serviceMethod.callFactory.newCall(request);
if (call == null) {
throw new NullPointerException("Call.Factory returned null.");
}
return call;
}

Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
ResponseBody rawBody = rawResponse.body();

// Remove the body's source (the only stateful object) so we can pass the response along.
rawResponse = rawResponse.newBuilder()
.body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
.build();

int code = rawResponse.code();
if (code < 200 || code >= 300) {
try {
// Buffer the entire body to avoid future I/O.
ResponseBody bufferedBody = Utils.buffer(rawBody);
return Response.error(bufferedBody, rawResponse);
} finally {
rawBody.close();
}
}

if (code == 204 || code == 205) {
rawBody.close();
return Response.success(null, rawResponse);
}

ExceptionCatchingRequestBody catchingBody = new ExceptionCatchingRequestBody(rawBody);
try {
T body = serviceMethod.toResponse(catchingBody);
return Response.success(body, rawResponse);
} catch (RuntimeException e) {
// If the underlying source threw an exception, propagate that rather than indicating it was
// a runtime exception.
catchingBody.throwIfCaught();
throw e;
}
}

execute() 方法实现了同步请求的方法,这其中的网络请求和调用自然是全靠 OkHttp 来实现的,另外本身使用 OkHttp 的 execute 方法就是一个阻塞的方法。其中值得注意的是我们对 Request 和 Response 的处理,分别是用了 ServiceMethod 中的方法:

/** Builds an HTTP request from method arguments. */
Request toRequest(@Nullable Object... args) throws IOException {
RequestBuilder requestBuilder = new RequestBuilder(httpMethod, baseUrl, relativeUrl, headers,
contentType, hasBody, isFormEncoded, isMultipart);

@SuppressWarnings("unchecked") // It is an error to invoke a method with the wrong arg types.
ParameterHandler<Object>[] handlers = (ParameterHandler<Object>[]) parameterHandlers;

int argumentCount = args != null ? args.length : 0;
if (argumentCount != handlers.length) {
throw new IllegalArgumentException("Argument count (" + argumentCount
+ ") doesn't match expected count (" + handlers.length + ")");
}

for (int p = 0; p < argumentCount; p++) {
handlers[p].apply(requestBuilder, args[p]);
}

return requestBuilder.build();
}

/** Builds a method return value from an HTTP response body. */
R toResponse(ResponseBody body) throws IOException {
return responseConverter.convert(body);
}

一个是用了刚才 ParameterHandler 抓到的元信息去构造 Request,另外一个就是把 Response 用对应的 Converter 去解析成想要的类型。

enqueue

@Override
public void enqueue(final Callback<T> callback) {
checkNotNull(callback, "callback == null");

okhttp3.Call call;
Throwable failure;

synchronized (this) {
if (executed) throw new IllegalStateException("Already executed.");
executed = true;

call = rawCall;
failure = creationFailure;
if (call == null && failure == null) {
try {
call = rawCall = createRawCall();
} catch (Throwable t) {
failure = creationFailure = t;
}
}
}

if (failure != null) {
callback.onFailure(this, failure);
return;
}

if (canceled) {
call.cancel();
}

call.enqueue(new okhttp3.Callback() {
@Override
public void onResponse(okhttp3.Call call, okhttp3.Response rawResponse)
throws IOException {

Response<T> response;
try {
response = parseResponse(rawResponse);
} catch (Throwable e) {
callFailure(e);
return;
}
callSuccess(response);
}

@Override
public void onFailure(okhttp3.Call call, IOException e) {
try {
callback.onFailure(OkHttpCall.this, e);
} catch (Throwable t) {
t.printStackTrace();
}
}

private void callFailure(Throwable e) {
try {
callback.onFailure(OkHttpCall.this, e);
} catch (Throwable t) {
t.printStackTrace();
}
}

private void callSuccess(Response<T> response) {
try {
callback.onResponse(OkHttpCall.this, response);
} catch (Throwable t) {
t.printStackTrace();
}
}
});
}

相应的 enqueue 方法也是在一堆的 check 之后,直接使用了 OkHttp 的异步 enqueue 去请求。

再看 CallAdapter

CallAdapter 刚才我们已经看了它的默认的实现,将 Call<R> 类型,转换为 T ,当然这里默认的实现里面 这个 T 还是 Call<R> ,但是这是默认实现,retrofit 中提供了为数众多的 adapters ,比如针对 Rxjava 的 Adapter:

@Override
public Object adapt(Call<R> call) {
Observable<Response<R>> responseObservable = isAsync
? new CallEnqueueObservable<>(call)
: new CallExecuteObservable<>(call);

Observable<?> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}

if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}

if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return observable;
}

就为我们提供了一个把 Call 转换为 Observable 等 RxJava 类型的 Adapter。
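
注册这个 Adapter 的方式大致如下(需要额外引入 adapter-rxjava2 依赖,baseUrl 是假设的示例地址):

Retrofit retrofit = new Retrofit.Builder()
    .baseUrl("https://api.example.com/")
    .addConverterFactory(GsonConverterFactory.create())
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
    .build();

// 这样接口方法就可以直接声明为返回 Observable<User> 之类的 RxJava 类型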

总结

至此我们就完成了对 Retrofit 的本体的分析,当然 Retrofit 还提供了非常多的 Adapter 和 Converter ,不过都是针对不同的请求类型进行的特化,这里就不一并分析了。Retrofit 本身的本体代码非常的少,但是能为我们提供这么 Restful 的 API 支持,不得不说确实有其过人之处。

阅读本文你学到了什么:

  • Retrofit 源码的分析
  • 通过动态代理为 API 提供统一操作的思路
  • 使用注解提供元信息的请求框架的设计思路
  • 使用 Factory 对整个架构留出扩充空间和解耦的思路

0x05:SICP 的魔法 - 实例:数字电路模拟

在第二章我们学到的和 数据抽象 相关的知识指出,如果想构造数据抽象,我们需要两个部分:

  1. 创建构造函数包含数据
  2. 创建选择函数 分派 数据

但是在了解了第三章相关的 模块化、状态、环境 的知识之后,我们认识到了新的问题:实际编程中经常需要依赖程序的状态,程序也要按照环境求值的方式进行计算,那我们在重新设计和模拟系统的时候就要多考虑几点了:

  1. 在系统中创建带状态的数据抽象
  2. 创建 改变函数(mutator) 去对数据进行重新修改

基于变动的模拟

在构建复杂系统的时候,我们最先面对的部分就是关于 同一性 的知识,这部分知识我们已经在上一章 同一性发生了变化 的那个小节中简单讨论过一次,这里可以再重新讨论一下 共享和相等 的问题。

共享和相等

我们通过引入赋值的方式为系统引入了状态,但也造成了引用透明性的危机:我们没办法再通过结构相同来判断对象是否相同,两个结构相同的对象不一定是同一个对象:

(define x (list 'a 'b))
(define z1 (cons x x))

我们定义了这样的一个结构:x 是由 'a 和 'b 组成的表,z1 则是由两个 x 组成的序对,另外我们再定义一个 z2:

(define z2 (cons (list 'a 'b) (list 'a 'b)))

这两个结构的定义起来,看起来的结构是一样的:

struct

这里我们能看到,两个表中存放 'a 和 'b 的节点都指向了同一个符号,这是因为在 Scheme 中符号是共享的。但是虽然 符号引用 指向同一个节点,z1 和 z2 整体上仍然是两个不同的结构,直接使用:

(eq? z1 z2)

的结果肯定是不相等的,这意味着每次使用 cons 构造一个新的序对,都会生成一个新的对象,并返回可以操作该对象的 指针/引用 (虽然 Scheme 中没有显式的这种概念)。

这时候我们需要考虑另一个问题:在整个 数据抽象 都不可变的情况下,数据结构之间是否共享了同样的引用其实无关紧要;但是当我们引入 赋值 之后,共享的数据因为使用了同一个结构而变得复杂起来,修改其中一个会导致另一个受到影响。

比如我们上文中提到的 z1 和 z2 就是这样的结构:z1 的两个子结构都是同一个 x,修改一个另一个肯定会受到影响;但是 z2 序对中的两个元素是两个不同的表,并不会有这样的问题。

引入共享结构会带来一些好处:

  • 扩充数据抽象能够定义的数据范围
  • 作为内部状态引入的结构能够在程序运行过程中不断变化,可以承担模拟更为复杂、可变的系统的需求

但是上面这个例子(关于 z1 和 z2 的)也表明,我们在构建复杂结构的时候需要自己清楚哪些结构是共享的,在修改这些共享结构的时候也要格外谨慎。

以赋值改变结构

Tips 基于改变函数的新 API:我们的数据依赖 序对 进行构造,那么改变函数也需要和构造函数相对应的结构

  • set-car! 修改序对中的首项
  • set-cdr! 修改序对中的第二项

在之前我们曾经用过程来存储数据、模拟序对的表示,当时我们用 0 和 1 作为 Tag 模拟了序对的分派,这里我们可以对这个程序略作修改:

(define (cons x y)
(define (dispatch m)
(cond ((eq? m 'car) x)
((eq? m 'cdr) y)
(else (error "Undefined operation -- cons" m))))
dispatch)

(define (car z) (z 'car))
(define (cdr z) (z 'cdr))

这个程序的变化其实不大,只是利用了 Scheme 中符号引用唯一性这一点,本质上和用 0、1 没有什么区别。我们可以顺着这个改造思路对程序进行扩充,添加之前提到的 set-car! 和 set-cdr! 函数:

(define (cons x y)
(define (set-x! v) (set! x v))
(define (set-y! v) (set! y v))
(define (dispatch m)
(cond ((eq? m 'car) x)
((eq? m 'cdr) y)
((eq? m 'set-car!) set-x!)
((eq? m 'set-cdr!) set-y!)
(else (error "Undefined operation -- cons" m))))
dispatch)

(define (car z) (z 'car))
(define (cdr z) (z 'cdr))
(define (set-car! z new-value) ((z 'set-car!) new-value) z)
(define (set-cdr! z new-value) ((z 'set-cdr!) new-value) z)

这里我们通过引入内部状态,并增加两个内部方法,解决了修改序对的问题。

接下来部分是几个非常有趣的实例学习,包括对表格,队列和数字电路的模拟。这里面前面的表格、队列的知识都比较的简单,这里我们主要关注这个 数字电路模拟 的学习。

数字电路模拟

本节中的代码可以到 Github 仓库 SICP-Magical-Book 中获取

plus

这是一个比较有趣的实例:数字电路模拟器本身就是一种基于状态的系统模拟,而且从设计构建系统的角度来看也非常好玩,因为数字电路是由很多细小的部件构成的,天然带有一种自底向上进行程序设计的思路。另外,数字电路模拟还包含另一种思想,即基于事件驱动的模拟系统,我们用 事件信号 模拟实际的 数字信号 流过整个模拟电路的过程。

事件驱动设计

事件驱动程序设计(英语:Event-driven programming)是一种电脑程序设计模型。这种模型的程序运行流程是由用户的动作(如鼠标的按键,键盘的按键动作)或者是由其他程序的消息来决定的。相对于批处理程序设计(batch programming)而言,程序运行的流程是由程序员来决定。批量的程序设计在初级程序设计教学课程上是一种方式。然而,事件驱动程序设计这种设计模型是在交互程序(Interactive program)的情况下孕育而生的。

—— Wikipedia

读过这段从维基百科摘取的关于 事件驱动程序 的介绍,我们对事件驱动可能已经有了一定的理解。简单来说,事件驱动设计和传统的批处理式程序设计不同:过程式的程序只能按照开发者编写的顺序运行,而事件驱动的程序设计就像是预先设定了某些条件,当某些事件触发这些条件时,程序会自动对事件进行处理。

这里我们的数字电路的模拟也是类似,一个数字电路由更细粒度的一系列的组件构成,在整个数字电路的操作中,各个部件的活动构成了这个整体的活动。

  • 电路系统的构建的过程中会遇到一些事件,这些事件是由某些部件遇到一些情况而引发的,并且引发可能是有时间顺序的(很正常,数字信号的传播肯定有顺序)。
  • 还有一个是事件会引起状态的继续改变,状态的改变又会继续带来事件的产生和传播。

基础部件

实际的数字电路线路由一些电子元件和它们之间的连线组成,一个电子元件可能有几个输入端口和一个输出端口,功能就是从多个接入信号经过一系列处理然后输出一个信号:

component

就像上图中的这些部件,我们可以提供 与、或、非门 这些基础部件,另外还需要各种部件之间的连线,这些连线需要传递 0、1 这样的信号;除此之外,各个功能块会有不同的输出,并且从输入到产生输出信号都会有一定的信号延迟。

在不断通过基本构造块和连线搭建数字电路的过程中,这个数字电路程序实际上已经成了一种结构设计语言,并且随着模块的不断构造,这门语言中可用的基本模块也在不断扩充,我们用这种语言能够构造任意复杂的数字电路:

  • 模块化 —— 基本构造块是基本元素
  • 组合机制 —— 通过基本元素和连线进行组合
  • 抽象机制 —— 我们可以进一步将复杂的组合过程抽象成过程

连线

Tips 半加器:

半加器的功能是将两个一位二进制数相加。它具有两个输入和两个输出(分别是和、进位)。输出的进位信号代表了输入两个数相加溢出的高一位数值。因此,这两个一位二进制数的和等于 2C + S。根据两个一位二进制数相加的结果,可以通过真值表、卡诺图得到右图所描绘的简易半加器设计。它使用了一个异或门来产生和 S,并使用了一个与门来产生进位信号 C。如果再添加一个或门来接收低位的进位输出信号,则两个半加器就构成了一个全加器。

电路需要由连线来连接基础功能块,再由基础功能块组成更大的功能块。我们这里先定义一个生成连线的方法 (make-wire),通过它生成连线,然后把各个部件绑定在一起:

; 生成六根连线
(define a (make-wire))
(define b (make-wire))
(define c (make-wire))
(define d (make-wire))
(define e (make-wire))
(define s (make-wire))

; 使用与或非门构建半加器 参考之前的半加器图片
(or-gate a b d)
(and-gate a b c)
(inverter c e)
(and-gate d e s)

这里面的 A 和 B 是数据输入,S 和 C 是数据输出,其中 S 是和,C 是进位。我们能看到,抽象出连线以及与、或、非门之后,对电路的描述可以变得非常简洁,我们还可以把这几步组合起来,封装成一个生成 半加器 的过程:

(define (half-adder a b s c)
(let ((d (make-wire))
(e (make-wire)))
(or-gate a b d)
(and-gate a b c)
(inverter c e)
(and-gate d e s)
'ok))

两个半加器和一个或门能构成一个全加器:

full-adder

(define (full-adder a b c-in sum c-out)
(let ((s (make-wire))
(c1 (make-wire))
(c2 (make-wire)))
(half-adder b c-in s c1)
(half-adder a s sum c2)
(or-gate c1 c2 c-out)
'ok))

这就像是我们之前提到的,使用抽象机制将全加器、半加器这些复杂组件封装成构建过程。

我们在这里还可以给出连线上的发送信号的基本操作:

(get-signal <wire>) ; 返回当前线上的信号值
(set-signal! <wire> <new-value>) ; 重设线上的信号值
(add-action! <wire> <procedure of no arguments>) ; 在线上的信号改变的时候运行过程
(after-delay <time> <procedure>) ; 设定一个时间延迟和对应的过程,时间过后触发过程

这样我们就能试着写出 make-wire 的定义,它肯定包含针对上面前三个操作的 dispatch 分发器,还要有针对内部状态的 修改器:

; 创建一条连线
(define (make-wire)
(let ((signal-value 0)
(action-procedures '()))
; set new value and call procedures
(define (set-my-signal! new-value)
(if (not (= signal-value new-value))
(begin (set! signal-value new-value)
(call-each action-procedures))
'done))

(define (accept-action-procedure! proc)
(set! action-procedures
(cons proc action-procedures))
(proc))

(define (dispatch m)
(cond ((eq? m 'get-signal) signal-value)
((eq? m 'set-signal!) set-my-signal!)
((eq? m 'add-action!) accept-action-procedure!)
(else (error "Unknown operation -- WIRE" m))))

dispatch))

我们注意到刚才提到的东西几乎都有了,并且通过过程 accept-action-procedure! 能知道每条线包含的过程不止一个,而是一个列表;而且从 set-my-signal! 中可以看到,每当线上的信号发生修改,我们会重新调用所有监听这条线的过程。

其中还有一些其他的方法:

; 遍历调用
(define (call-each procedures)
(if (null? procedures)
'done
(begin
((car procedures))
(call-each (cdr procedures)))))

(define (get-signal wire)
(wire 'get-signal))

(define (set-signal! wire new-value)
((wire 'set-signal!) new-value))

(define (add-action! wire action-procedure)
((wire 'add-action!) action-procedure))

这些只是对 dispatch 的简单封装而已。

Tips 书中的一道课后题也提到了这一点,例如:

>     (define (accept-action-procedure! proc)
> (set! action-procedures
> (cons proc action-procedures))
> (proc))
>

>

这个方法为什么要在最后调用一次 proc?其实是为了让新添加的动作立刻执行一次:动作内部会调用 after-delay,after-delay 会调用 add-to-agenda!,将指定的动作添加到模拟器的待处理表中,当调用 (propagate) 时,这个动作就会被执行。

逻辑电路门


; 算数逻辑非
(define (logical-not s)
(cond ((= s 0) 1)
((= s 1) 0)
(else (error " Invalid signal " s))))

; 算数逻辑与
(define (logical-and a b)
(if (and (= a 1) (= b 1))
1
0))


; 逻辑或
(define (logical-or a b)
(if (or (= a 1) (= b 1))
1
0))

; 与门 给两个线路都绑上一个监控器
; 当某个值变化的时候 会重新计算 new-value 设置到输出端口
(define (and-gate a1 a2 output)
(define (and-action-procedure)
(let ((new-value
(logical-and (get-signal a1) (get-signal a2))))
(after-delay and-gate-delay
(lambda ()
(set-signal! output new-value)))))
(add-action! a1 and-action-procedure)
(add-action! a2 and-action-procedure)
'ok)

; 或门
(define (or-gate input-1 input-2 output)
(define (or-action-procedure)
(let ((new-value
(logical-or (get-signal input-1) (get-signal input-2))))
(after-delay or-gate-delay
(lambda ()
(set-signal! output new-value)))))
(add-action! input-1 or-action-procedure)
(add-action! input-2 or-action-procedure)
'ok)

; 非门(名字和前面半加器中用到的 inverter 保持一致)
(define (inverter input output)
(define (invert-input)
(let ((new-value (logical-not (get-signal input))))
(after-delay inverter-delay
(lambda () (set-signal! output new-value)))))
(add-action! input invert-input)
'ok)

如果我们理清了前面给连线添加信号的内容,那逻辑门部分的代码就很简单了,甚至有点冗余。与、或、非门的唯一区别在于如何生成 new-value:每个门都包含一个内部过程负责计算新的值,并把这个过程绑定到输入线上,这样输入值一旦改变就会重新计算,然后在延迟之后重设输出线路上的数值。

待处理表

待处理表要解决的问题是:我们之前在程序中使用 after-delay 操作,让某个过程在一段时间之后再执行,从而控制操作的时序性。这个功能本质上就是在维护一张 待处理表 ,用队列的结构控制过程的执行顺序,先给出各个 API 的内容:

; 返回新建的空待处理表
(make-agenda)
; 判断待处理表是否为空
(empty-agenda? <agenda>)
; 返回待处理表中第一个项
(first-agenda-item <agenda>)
; 删除待处理表里的第一项
(remove-first-agenda-item! <agenda>)
; 向待处理表中加入一项,其意义是要求在给定时间运行的过程
(add-to-agenda! <time> <action> <agenda>)
; 返回当前时间
(current-time <agenda>)

这些 API 的实现都很容易,我们可以逐个的来讲解一下:

; segment

(define (make-time-segment time queue)
(cons time queue))

(define (segment-time s) (car s))

(define (segment-queue s) (cdr s))

; agenda

(define (make-agenda) (list 0))

; 当前时间
(define (current-time agenda) (car agenda))

(define (set-current-time! agenda time)
(set-car! agenda time))

(define (segments agenda) (cdr agenda))

(define (set-segments! agenda segments)
(set-cdr! agenda segments))

(define (first-segment agenda) (car (segments agenda)))

(define (rest-segments agenda) (cdr (segments agenda)))

(define (empty-agenda? agenda)
(null? (segments agenda)))

关于 segment 的这几个过程构建的是 时间 和 队列 的绑定关系:待处理表中的每一项是时间和一个队列的绑定;agenda 本身是一个按时间组织的一维表格,表头存储当前时间,表头之后存储的是各个 segment,另外还提供了几个 API 做存取。

还有就是如何插入处理表,通过时间片加队列的形式进行查找管理:

; 向待处理表中添加一项
(define (add-to-agenda! time action agenda)
; 检查是否为空 或者当前时间小于 segment 的时间
(define (belongs-before? segments)
(or (null? segments)
(< time (segment-time (car segments)))))
; 创建一个新的时间片 生成新的队列 插入过程
(define (make-new-time-segment time action)
(let ((q (make-queue)))
(insert-queue! q action)
(make-time-segment time q)))
; 添加时间片
(define (add-to-segments! segments)
; 找到合适的时间插进去
(if (= (segment-time (car segments)) time)
(insert-queue! (segment-queue (car segments))
action)
(let ((rest (cdr segments)))
(if (belongs-before? rest)
(set-cdr!
segments
(cons (make-new-time-segment time action)
(cdr segments)))
(add-to-segments! rest)))))
; 遍历时间片 找到合适的时间插进去
(let ((segments (segments agenda)))
(if (belongs-before? segments)
(set-segments!
agenda
(cons (make-new-time-segment time action)
segments))
(add-to-segments! segments))))

上面的注释应该已经介绍得很详细了。此外还提供了一些方法对待处理表进行管理,无非就是如何取出和消耗待处理的事件之类:

(define (remove-first-agenda-item! agenda)
(let ((q (segment-queue (first-segment agenda))))
(delete-queue! q)
(if (empty-queue? q)
(set-segments! agenda (rest-segments agenda)))))

(define (first-agenda-item agenda)
(if (empty-agenda? agenda)
(error "Agenda is empty -- FIRST-AGENDA-ITEM")
(let ((first-seg (first-segment agenda)))
(set-current-time! agenda (segment-time first-seg))
(front-queue (segment-queue first-seg)))))

最后我们终于要说到 after-delay 的实现定义了:

(define (after-delay delay action)
(add-to-agenda! (+ delay (current-time the-agenda))
action
the-agenda))

可见,让某个事件延迟执行的本质就是向待处理表中添加一条事件:取当前时间,加上一定的延迟,再把事件放进对应时间的队列里。

我们还需要一个过程去驱动整个待处理表的执行,propagate 对整个待处理表进行操作,不断取出其中的项目进行处理:

(define (propagate)
(if (empty-agenda? the-agenda)
'done
(let ((first-item (first-agenda-item the-agenda)))
(first-item)
(remove-first-agenda-item! the-agenda)
(propagate))))

简单的模拟实例

Github 仓库中提供了相应的代码,只需要 load 其中的 simular.rkt 就可以开始编写模拟实例。这里我们添加一个监控器过程放到线路上,用来监听数字电路模拟的运行情况:

(define (probe name wire)
(add-action! wire
(lambda ()
(newline)
(display name)
(display " ")
(display (current-time the-agenda))
(display " New-value = ")
(display (get-signal wire)))))

这段代码很简单,就是给某根线注册名字和一个相应的回调,当数据发生变化的时候打印出目前的消息。

接下来的模拟是为了模拟一个半加器的运行情况:

  1. 初始化待处理表,定义延迟相关的常量:
(define the-agenda (make-agenda))
(define inverter-delay 2)
(define and-gate-delay 3)
(define or-gate-delay 5)
  2. 定义四条线路,安装监控器:
(define input-1 (make-wire)) 
(define input-2 (make-wire))
(define sum (make-wire))
(define carry (make-wire))
(probe 'sum sum)
(probe 'carry carry)

运行一次的效果是:

s-1

  3. 接着我们把线路连接到半加器上,测试半加器的输入:
(half-adder input-1 input-2 sum carry)

s-2

测试输入,即将 input-1 设置为 1 ,然后将 input-2 设置为 1 :

(set-signal! input-1 1)
(propagate)

s-3

这时候在时间 8,sum 被设置为 1。

(set-signal! input-2 1)
(propagate)

s-4

时间为 11 时,carry 被设置为 1,之后 sum 被设置为 0,这表明我们在这里的模拟是有效的。

小结

这篇文章涉及的复杂理论知识并不多,主要是介绍了书中出现的第一个比较复杂的程序实例,但这个稍显复杂的实例包含了从本书开头就在介绍的很多和抽象、模拟相关的知识,也算是借一个实例程序温习了之前的内容。

除了对之前的知识的温习之外,这个关于 数字电路模拟程序 的程序设计更多的是关注变动程序的程序设计,程序中和第二章不同的是整个程序都是基于状态变动建立起来的。基础模块和连线相互绑定,在数据出现变动的时候作出响应,使用待处理表对过程调用进行处理,使用时间排序建立时序性的程序设计,整个程序无处不体现着基于变动的程序模拟的思想。

上一篇专栏文章中说文章坑了很久,没想到这篇坑了更长时间,一看居然四个月过去了。这四个月基本都在杭州某厂实习,写文章的时间不多;而且这篇文章里的很多文字其实很早就写好了,迟迟没有写完也是因为一直在思考怎么介绍这个实例的程序设计比较好。很久以前读书的时候,感觉书里的步骤虽然由浅入深但失于冗长,在读者能完整跑通程序之前,对程序的理解仅限于能读懂书上的代码,所以这次最后还是决定从各个部分的代码分析入手,事先提供好能够跑通运行的程序,让读者一步步地通过运行代码和阅读文章去理解整个程序。

CountDownLatch 源码分析

CountDownLatch 使用

CountDownLatch 本身的使用其实非常简单,我们只需要读一读它的这段注释就可以了:

/**
* A synchronization aid that allows one or more threads to wait until
* a set of operations being performed in other threads completes.
*
* <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
* The {@link #await await} methods block until the current count reaches
* zero due to invocations of the {@link #countDown} method, after which
* all waiting threads are released and any subsequent invocations of
* {@link #await await} return immediately. This is a one-shot phenomenon
* -- the count cannot be reset. If you need a version that resets the
* count, consider using a {@link CyclicBarrier}.
*
* <p>A {@code CountDownLatch} is a versatile synchronization tool
* and can be used for a number of purposes. A
* {@code CountDownLatch} initialized with a count of one serves as a
* simple on/off latch, or gate: all threads invoking {@link #await await}
* wait at the gate until it is opened by a thread invoking {@link
* #countDown}. A {@code CountDownLatch} initialized to <em>N</em>
* can be used to make one thread wait until <em>N</em> threads have
* completed some action, or some action has been completed N times.
*
* <p>A useful property of a {@code CountDownLatch} is that it
* doesn't require that threads calling {@code countDown} wait for
* the count to reach zero before proceeding, it simply prevents any
* thread from proceeding past an {@link #await await} until all
* threads could pass.
*
* <p><b>Sample usage:</b> Here is a pair of classes in which a group
* of worker threads use two countdown latches:
* <ul>
* <li>The first is a start signal that prevents any worker from proceeding
* until the driver is ready for them to proceed;
* <li>The second is a completion signal that allows the driver to wait
* until all workers have completed.
* </ul>
*
* <pre> {@code
* class Driver { // ...
* void main() throws InterruptedException {
* CountDownLatch startSignal = new CountDownLatch(1);
* CountDownLatch doneSignal = new CountDownLatch(N);
*
* for (int i = 0; i < N; ++i) // create and start threads
* new Thread(new Worker(startSignal, doneSignal)).start();
*
* doSomethingElse(); // don't let run yet
* startSignal.countDown(); // let all threads proceed
* doSomethingElse();
* doneSignal.await(); // wait for all to finish
* }
* }
*
* class Worker implements Runnable {
* private final CountDownLatch startSignal;
* private final CountDownLatch doneSignal;
* Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
* this.startSignal = startSignal;
* this.doneSignal = doneSignal;
* }
* public void run() {
* try {
* startSignal.await();
* doWork();
* doneSignal.countDown();
* } catch (InterruptedException ex) {} // return;
* }
*
* void doWork() { ... }
* }}</pre>
*
* <p>Another typical usage would be to divide a problem into N parts,
* describe each part with a Runnable that executes that portion and
* counts down on the latch, and queue all the Runnables to an
* Executor. When all sub-parts are complete, the coordinating thread
* will be able to pass through await. (When threads must repeatedly
* count down in this way, instead use a {@link CyclicBarrier}.)
*
* <pre> {@code
* class Driver2 { // ...
* void main() throws InterruptedException {
* CountDownLatch doneSignal = new CountDownLatch(N);
* Executor e = ...
*
* for (int i = 0; i < N; ++i) // create and start threads
* e.execute(new WorkerRunnable(doneSignal, i));
*
* doneSignal.await(); // wait for all to finish
* }
* }
*
* class WorkerRunnable implements Runnable {
* private final CountDownLatch doneSignal;
* private final int i;
* WorkerRunnable(CountDownLatch doneSignal, int i) {
* this.doneSignal = doneSignal;
* this.i = i;
* }
* public void run() {
* try {
* doWork(i);
* doneSignal.countDown();
* } catch (InterruptedException ex) {} // return;
* }
*
* void doWork() { ... }
* }}</pre>
*
* <p>Memory consistency effects: Until the count reaches
* zero, actions in a thread prior to calling
* {@code countDown()}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* actions following a successful return from a corresponding
* {@code await()} in another thread.
*
* @since 1.5
* @author Doug Lea
*/

这里面提到了两种主要的用法:

  • 控制多线程的调用顺序
  • 还有就是把大的工作拆分成多个子任务交给多个线程去完成,然后等待所有线程完成(下面给出一个可运行的小例子)
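
针对第二种用法,这里给一个可以直接运行的小例子(这是自己写的示意代码,不是 JDK 注释里的示例):

import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int workers = 3;
        CountDownLatch doneSignal = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            final int id = i;
            new Thread(() -> {
                System.out.println("worker " + id + " finished");
                doneSignal.countDown(); // 每个子任务完成时计数减一
            }).start();
        }
        doneSignal.await(); // 主线程等待计数归零
        System.out.println("all workers done");
    }
}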

源码分析

CountDownLatch 本身的代码比较简单,几乎全部的功能都依赖于内部类 Sync 所继承的 AbstractQueuedSynchronizer,所以我们主要应该去分析 AQS 的实现原理:

/* <p>This class supports either or both a default <em>exclusive</em>
* mode and a <em>shared</em> mode. When acquired in exclusive mode,
* attempted acquires by other threads cannot succeed. Shared mode
* acquires by multiple threads may (but need not) succeed. This class
* does not &quot;understand&quot; these differences except in the
* mechanical sense that when a shared mode acquire succeeds, the next
* waiting thread (if one exists) must also determine whether it can
* acquire as well. Threads waiting in the different modes share the
* same FIFO queue. Usually, implementation subclasses support only
* one of these modes, but both can come into play for example in a
* {@link ReadWriteLock}. Subclasses that support only exclusive or
* only shared modes need not define the methods supporting the unused mode.
*/

这段文档告诉我们,AQS 支持两种模式: 共享模式 和 独占模式 ,子类通常只需要实现其中一种模式对应的方法就好了。

/**
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state. Subclasses
* must define the protected methods that change this state, and which
* define what that state means in terms of this object being acquired
* or released. Given these, the other methods in this class carry
* out all queuing and blocking mechanics. Subclasses can maintain
* other state fields, but only the atomically updated {@code int}
* value manipulated using methods {@link #getState}, {@link
* #setState} and {@link #compareAndSetState} is tracked with respect
* to synchronization.
*
* <p>Subclasses should be defined as non-public internal helper
* classes that are used to implement the synchronization properties
* of their enclosing class. Class
* {@code AbstractQueuedSynchronizer} does not implement any
* synchronization interface. Instead it defines methods such as
* {@link #acquireInterruptibly} that can be invoked as
* appropriate by concrete locks and related synchronizers to
* implement their public methods.
*/

AQS 本身依赖一个先进先出的等待队列,并用一个原子的 int 值来表示同步状态,这段注释还指出修改状态的方法是 protected 的,外部不能直接操纵它,并且推荐我们在子类中用内部类作为 helper 的方式去实现功能。父类和子类的分工分别是:

  1. 子类负责修改共享变量(a single atomic value to represent state),其操作必须是原子的(通过getState()、setState()和compareAndSetState()方法),根据返回值影响父类的行为(是否挂起当前线程,是否恢复被阻塞线程)。
  2. AbstractQueuedSynchronizer负责线程阻塞队列(FIFO)的维护,根据预留给子类的方法的返回值判断线程阻塞和唤醒(queuing and blocking mechanics)

子类方法实现

/* <h3>Usage</h3>
*
* <p>To use this class as the basis of a synchronizer, redefine the
* following methods, as applicable, by inspecting and/or modifying
* the synchronization state using {@link #getState}, {@link
* #setState} and/or {@link #compareAndSetState}:
*
* <ul>
* <li> {@link #tryAcquire}
* <li> {@link #tryRelease}
* <li> {@link #tryAcquireShared}
* <li> {@link #tryReleaseShared}
* <li> {@link #isHeldExclusively}
* </ul>
*
* Each of these methods by default throws {@link
* UnsupportedOperationException}. Implementations of these methods
* must be internally thread-safe, and should in general be short and
* not block. Defining these methods is the <em>only</em> supported
* means of using this class. All other methods are declared
* {@code final} because they cannot be independently varied.
*
* <p>You may also find the inherited methods from {@link
* AbstractOwnableSynchronizer} useful to keep track of the thread
* owning an exclusive synchronizer. You are encouraged to use them
* -- this enables monitoring and diagnostic tools to assist users in
* determining which threads hold locks.
*
* <p>Even though this class is based on an internal FIFO queue, it
* does not automatically enforce FIFO acquisition policies. The core
* of exclusive synchronization takes the form:
*
* <pre>
* Acquire:
* while (!tryAcquire(arg)) {
* <em>enqueue thread if it is not already queued</em>;
* <em>possibly block current thread</em>;
* }
*
* Release:
* if (tryRelease(arg))
* <em>unblock the first queued thread</em>;
* </pre>
*
* (Shared mode is similar but may involve cascading signals.)
*/

AQS 作为一个基于队列的同步器,子类需要实现的都是 try 开头的方法,其中共享模式和独占模式要覆盖的方法都带有各自的标识(Shared 后缀)。

参考注释中的一个具体的实现:

/* <h3>Usage Examples</h3>
*
* <p>Here is a non-reentrant mutual exclusion lock class that uses
* the value zero to represent the unlocked state, and one to
* represent the locked state. While a non-reentrant lock
* does not strictly require recording of the current owner
* thread, this class does so anyway to make usage easier to monitor.
* It also supports conditions and exposes
* one of the instrumentation methods:
*
* <pre> {@code
* class Mutex implements Lock, java.io.Serializable {
*
* // Our internal helper class
* private static class Sync extends AbstractQueuedSynchronizer {
* // Reports whether in locked state
* protected boolean isHeldExclusively() {
* return getState() == 1;
* }
*
* // Acquires the lock if state is zero
* public boolean tryAcquire(int acquires) {
* assert acquires == 1; // Otherwise unused
* if (compareAndSetState(0, 1)) {
* setExclusiveOwnerThread(Thread.currentThread());
* return true;
* }
* return false;
* }
*
* // Releases the lock by setting state to zero
* protected boolean tryRelease(int releases) {
* assert releases == 1; // Otherwise unused
* if (getState() == 0) throw new IllegalMonitorStateException();
* setExclusiveOwnerThread(null);
* setState(0);
* return true;
* }
*
* // Provides a Condition
* Condition newCondition() { return new ConditionObject(); }
*
* // Deserializes properly
* private void readObject(ObjectInputStream s)
* throws IOException, ClassNotFoundException {
* s.defaultReadObject();
* setState(0); // reset to unlocked state
* }
* }
*
* // The sync object does all the hard work. We just forward to it.
* private final Sync sync = new Sync();
*
* public void lock() { sync.acquire(1); }
* public boolean tryLock() { return sync.tryAcquire(1); }
* public void unlock() { sync.release(1); }
* public Condition newCondition() { return sync.newCondition(); }
* public boolean isLocked() { return sync.isHeldExclusively(); }
* public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
* public void lockInterruptibly() throws InterruptedException {
* sync.acquireInterruptibly(1);
* }
* public boolean tryLock(long timeout, TimeUnit unit)
* throws InterruptedException {
* return sync.tryAcquireNanos(1, unit.toNanos(timeout));
* }
* }}</pre>
*/

这个实现挺简单的:我们只需要靠一个内部类包装一下,用 AQS 提供的原子状态控制加锁与解锁,再记录一下拿到独占锁的当前线程,就能包装出来一个 互斥类 。
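
这个 Mutex 用起来和普通的 Lock 没什么两样,大致是这样(示意用法):

Mutex mutex = new Mutex();
mutex.lock();
try {
    // 临界区:同一时刻只有一个线程能进入
} finally {
    mutex.unlock();
}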

另外我们还有一个模仿 CountDownLatch 的 Demo 在注释之中:

/* <p>Here is a latch class that is like a
* {@link java.util.concurrent.CountDownLatch CountDownLatch}
* except that it only requires a single {@code signal} to
* fire. Because a latch is non-exclusive, it uses the {@code shared}
* acquire and release methods.
*
* <pre> {@code
* class BooleanLatch {
*
* private static class Sync extends AbstractQueuedSynchronizer {
* boolean isSignalled() { return getState() != 0; }
*
* protected int tryAcquireShared(int ignore) {
* return isSignalled() ? 1 : -1;
* }
*
* protected boolean tryReleaseShared(int ignore) {
* setState(1);
* return true;
* }
* }
*
* private final Sync sync = new Sync();
* public boolean isSignalled() { return sync.isSignalled(); }
* public void signal() { sync.releaseShared(1); }
* public void await() throws InterruptedException {
* sync.acquireSharedInterruptibly(1);
* }
* }}</pre>
*
* @since 1.5
* @author Doug Lea
*/

其中用到的一些常量可以参照 AQS 内部类 Node 中的定义:

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/

static final int PROPAGATE = -3;

内部队列

/**
* Wait queue node class.
*
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks. We instead use them for blocking synchronizers, but
* use the same basic tactic of holding some of the control
* information about a thread in the predecessor of its node. A
* "status" field in each node keeps track of whether a thread
* should block. A node is signalled when its predecessor
* releases. Each node of the queue otherwise serves as a
* specific-notification-style monitor holding a single waiting
* thread. The status field does NOT control whether threads are
* granted locks etc though. A thread may try to acquire if it is
* first in the queue. But being first does not guarantee success;
* it only gives the right to contend. So the currently released
* contender thread may need to rewait.
*
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*
* <p>Insertion into a CLH queue requires only a single atomic
* operation on "tail", so there is a simple atomic point of
* demarcation from unqueued to queued. Similarly, dequeuing
* involves only updating the "head". However, it takes a bit
* more work for nodes to determine who their successors are,
* in part to deal with possible cancellation due to timeouts
* and interrupts.
*
* <p>The "prev" links (not used in original CLH locks), are mainly
* needed to handle cancellation. If a node is cancelled, its
* successor is (normally) relinked to a non-cancelled
* predecessor. For explanation of similar mechanics in the case
* of spin locks, see the papers by Scott and Scherer at
* http://www.cs.rochester.edu/u/scott/synchronization/
*
* <p>We also use "next" links to implement blocking mechanics.
* The thread id for each node is kept in its own node, so a
* predecessor signals the next node to wake up by traversing
* next link to determine which thread it is. Determination of
* successor must avoid races with newly queued nodes to set
* the "next" fields of their predecessors. This is solved
* when necessary by checking backwards from the atomically
* updated "tail" when a node's successor appears to be null.
* (Or, said differently, the next-links are an optimization
* so that we don't usually need a backward scan.)
*
* <p>Cancellation introduces some conservatism to the basic
* algorithms. Since we must poll for cancellation of other
* nodes, we can miss noticing whether a cancelled node is
* ahead or behind us. This is dealt with by always unparking
* successors upon cancellation, allowing them to stabilize on
* a new predecessor, unless we can identify an uncancelled
* predecessor who will carry this responsibility.
*
* <p>CLH queues need a dummy header node to get started. But
* we don't create them on construction, because it would be wasted
* effort if there is never contention. Instead, the node
* is constructed and head and tail pointers are set upon first
* contention.
*
* <p>Threads waiting on Conditions use the same nodes, but
* use an additional link. Conditions only need to link nodes
* in simple (non-concurrent) linked queues because they are
* only accessed when exclusively held. Upon await, a node is
* inserted into a condition queue. Upon signal, the node is
* transferred to the main queue. A special value of status
* field is used to mark which queue a node is on.
*
* <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
* Scherer and Michael Scott, along with members of JSR-166
* expert group, for helpful ideas, discussions, and critiques
* on the design of this class.
*/

AbstractQueuedSynchronizer 的实现的概要基本上是这个样子:

public abstract class AbstractQueuedSynchronizer {
private transient volatile Node head;
private transient volatile Node tail;

static final class Node {
int waitStatus;
Node prev;
Node next;
Node nextWaiter;
Thread thread;
}
// 在队列尾部插入节点,中间一些操作用到 CAS 以保证原子性
private Node addWaiter(Node mode) { ... }
// 出队时把一个 Node 的相关指向置空,并不再让其它节点指向它,即可由 GC 释放该节点
}

模式支持

AbstractQueuedSynchronizer 支持多种工作模式及其组合,包括共享模式、排他模式、是否支持中断、是否超时等。各个模式对应的方法如下(部分):

  • 排他模式

    public final void acquire(int arg)
    final boolean acquireQueued(final Node node, int arg)
    // 恢复锁的状态(“为0”),唤醒后继节点
    public final boolean release(int arg)

  • 支持中断

    // 每次“干活”前,先检查下当前线程的中断状态,如果当前线程被中断了,就放弃当前操作
    public final void acquireInterruptibly(int arg)
  • 支持超时

    // 每次“干活(循环)”前,先检查下剩余时间,在循环的最后更新下时间
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  • 共享模式

    // 如果共享状态获取成功之后会判断后继节点是否是共享模式,如是就直接对其进行唤醒操作,也就是同时激发多个线程并发的运行。
    public final void acquireShared(int arg)
    public final boolean releaseShared(int arg)

线程的阻塞和唤醒,使用LockSupport的park和unpark方法。
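
park/unpark 的行为可以用下面这个小例子感受一下(示意代码,sleep 只是为了让 worker 先跑到 park,不是严格的同步手段):

import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            System.out.println("worker parking...");
            LockSupport.park();           // 阻塞当前线程
            System.out.println("worker unparked");
        });
        worker.start();
        Thread.sleep(100);                // 粗略等待 worker 先 park
        LockSupport.unpark(worker);       // 唤醒指定线程
        worker.join();
    }
}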

acquireQueued 实现
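
这一小节原本要展开的 acquireQueued,在 JDK 8 中的实现大致如下(节选,细节以实际 JDK 版本的源码为准):节点入队之后自旋,只有前驱是 head 时才尝试获取,否则视情况把自己挂起。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 只有前驱是 head 的节点才有资格尝试获取
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 获取失败则判断是否需要挂起,被唤醒后记录中断状态继续自旋
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}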

HashMap 源码解析

我们刚才看过了 Hashtable 的具体实现,其实 HashMap 的很多行为和 Hashtable 是等同的,主要区别在于 HashMap 不保证线程同步,另外 HashMap 支持 null 键和 null 值。我们还发现 HashMap 继承于 AbstractMap 类,AbstractMap 提供了骨干部分功能的支持,但具体的 put 和 get 方法没有实现;Abstract 系列的实现类基本上都是不保证线程安全的。

Implementation Note

实现说明很有用,可以读一下,主要是在讲链表在超过一定长度之后会转化成一棵红黑树,而当节点数降下去之后,树又会退化回链表的形式:

/*
* Implementation notes.
*
* This map usually acts as a binned (bucketed) hash table, but
* when bins get too large, they are transformed into bins of
* TreeNodes, each structured similarly to those in
* java.util.TreeMap. Most methods try to use normal bins, but
* relay to TreeNode methods when applicable (simply by checking
* instanceof a node). Bins of TreeNodes may be traversed and
* used like any others, but additionally support faster lookup
* when overpopulated. However, since the vast majority of bins in
* normal use are not overpopulated, checking for existence of
* tree bins may be delayed in the course of table methods.
*
* Tree bins (i.e., bins whose elements are all TreeNodes) are
* ordered primarily by hashCode, but in the case of ties, if two
* elements are of the same "class C implements Comparable<C>",
* type then their compareTo method is used for ordering. (We
* conservatively check generic types via reflection to validate
* this -- see method comparableClassFor). The added complexity
* of tree bins is worthwhile in providing worst-case O(log n)
* operations when keys either have distinct hashes or are
* orderable, Thus, performance degrades gracefully under
* accidental or malicious usages in which hashCode() methods
* return values that are poorly distributed, as well as those in
* which many keys share a hashCode, so long as they are also
* Comparable. (If neither of these apply, we may waste about a
* factor of two in time and space compared to taking no
* precautions. But the only known cases stem from poor user
* programming practices that are already so slow that this makes
* little difference.)
*
* Because TreeNodes are about twice the size of regular nodes, we
* use them only when bins contain enough nodes to warrant use
* (see TREEIFY_THRESHOLD). And when they become too small (due to
* removal or resizing) they are converted back to plain bins. In
* usages with well-distributed user hashCodes, tree bins are
* rarely used. Ideally, under random hashCodes, the frequency of
* nodes in bins follows a Poisson distribution
* (http://en.wikipedia.org/wiki/Poisson_distribution) with a
* parameter of about 0.5 on average for the default resizing
* threshold of 0.75, although with a large variance because of
* resizing granularity. Ignoring variance, the expected
* occurrences of list size k are (exp(-0.5) * pow(0.5, k) /
* factorial(k)). The first values are:
*
* 0: 0.60653066
* 1: 0.30326533
* 2: 0.07581633
* 3: 0.01263606
* 4: 0.00157952
* 5: 0.00015795
* 6: 0.00001316
* 7: 0.00000094
* 8: 0.00000006
* more: less than 1 in ten million
*
* The root of a tree bin is normally its first node. However,
* sometimes (currently only upon Iterator.remove), the root might
* be elsewhere, but can be recovered following parent links
* (method TreeNode.root()).
*
* All applicable internal methods accept a hash code as an
* argument (as normally supplied from a public method), allowing
* them to call each other without recomputing user hashCodes.
* Most internal methods also accept a "tab" argument, that is
* normally the current table, but may be a new or old one when
* resizing or converting.
*
* When bin lists are treeified, split, or untreeified, we keep
* them in the same relative access/traversal order (i.e., field
* Node.next) to better preserve locality, and to slightly
* simplify handling of splits and traversals that invoke
* iterator.remove. When using comparators on insertion, to keep a
* total ordering (or as close as is required here) across
* rebalancings, we compare classes and identityHashCodes as
* tie-breakers.
*
* The use and transitions among plain vs tree modes is
* complicated by the existence of subclass LinkedHashMap. See
* below for hook methods defined to be invoked upon insertion,
* removal and access that allow LinkedHashMap internals to
* otherwise remain independent of these mechanics. (This also
* requires that a map instance be passed to some utility methods
* that may create new nodes.)
*
* The concurrent-programming-like SSA-based coding style helps
* avoid aliasing errors amid all of the twisty pointer operations.
*/

容量和扩容

/**
* The default initial capacity - MUST be a power of two.
*/

static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16

/**
* The maximum capacity, used if a higher value is implicitly specified
* by either of the constructors with arguments.
* MUST be a power of two <= 1<<30.
*/

static final int MAXIMUM_CAPACITY = 1 << 30;

/**
* The load factor used when none specified in constructor.
*/

static final float DEFAULT_LOAD_FACTOR = 0.75f;

我们这里能看到,和 Hashtable 一样有 Capacity 和 LoadFactor 的设定,HashMap 的默认值分别是 16 和 0.75。

/**
* The bin count threshold for using a tree rather than list for a
* bin. Bins are converted to trees when adding an element to a
* bin with at least this many nodes. The value must be greater
* than 2 and should be at least 8 to mesh with assumptions in
* tree removal about conversion back to plain bins upon
* shrinkage.
*/

static final int TREEIFY_THRESHOLD = 8;
/**
* The bin count threshold for untreeifying a (split) bin during a
* resize operation. Should be less than TREEIFY_THRESHOLD, and at
* most 6 to mesh with shrinkage detection under removal.
*/

static final int UNTREEIFY_THRESHOLD = 6;
/**
* The smallest table capacity for which bins may be treeified.
* (Otherwise the table is resized if too many nodes in a bin.)
* Should be at least 4 * TREEIFY_THRESHOLD to avoid conflicts
* between resizing and treeification thresholds.
*/

static final int MIN_TREEIFY_CAPACITY = 64;

这就是我们说的链表树化的门限:当单个桶中的节点数达到 TREEIFY_THRESHOLD(8) 的时候会把链表变成一棵树;UNTREEIFY_THRESHOLD 则是树退化回链表的门限;MIN_TREEIFY_CAPACITY 是允许树化的最小 table 容量,容量不足时会优先扩容而不是树化。

数据存储

static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
V value;
Node<K,V> next;
}

每一个 Node 都是这样的一个数据节点,包含 hash、K-V 以及指向下一个节点的 next 引用,这和 Hashtable 的 Entry 没什么区别。

hash 方法


/**
* Computes key.hashCode() and spreads (XORs) higher bits of hash
* to lower. Because the table uses power-of-two masking, sets of
* hashes that vary only in bits above the current mask will
* always collide. (Among known examples are sets of Float keys
* holding consecutive whole numbers in small tables.) So we
* apply a transform that spreads the impact of higher bits
* downward. There is a tradeoff between speed, utility, and
* quality of bit-spreading. Because many common sets of hashes
* are already reasonably distributed (so don't benefit from
* spreading), and because we use trees to handle large sets of
* collisions in bins, we just XOR some shifted bits in the
* cheapest possible way to reduce systematic lossage, as well as
* to incorporate impact of the highest bits that would otherwise
* never be used in index calculations because of table bounds.
*/

static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

JDK 7 和 JDK 8 的 HashMap 的 hash 方法写法不同,但实现的原理其实是一样的,都是一个扰动函数,这里我们解释一下这个 扰动函数 的具体实现细节:

hash-map

我们先使用 key 的 hashCode 方法求出一个 int 类型的 hash,但这个 hash 本身是 32 bit 的数据,取值空间接近 40 亿,这么大的空间是没办法直接拿来当数组下标的,所以我们要通过掩模(mask)的方式把它压缩到 table 的长度范围内。

我们先把 hash 右移 16 位再和它自己异或一下,能让数据更分散一些。这也解释了 HashMap 的容量为什么总是取 2 的幂:这样 table.length - 1 正好是一个 低位掩模 ,最后再做一次 (table.length - 1) & hash 就能得到落在数组范围内的下标。通过 >>> 让低位也混入高位的信息,能让 hash 分布得更加平均。
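
可以用一小段代码把这个过程手动走一遍(HashDemo、key 和容量都是随便选的示意):

public class HashDemo {
    // 和 HashMap 中相同思路的扰动函数
    static int hash(Object key) {
        int h;
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
    }

    public static void main(String[] args) {
        int n = 16;                       // 假设当前 table 容量是 16
        String key = "hello";
        int h = hash(key);
        int index = (n - 1) & h;          // 容量是 2 的幂时等价于 h % n
        System.out.println("hash  = " + Integer.toBinaryString(h));
        System.out.println("index = " + index);
    }
}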

Field 数据

/**
* The table, initialized on first use, and resized as
* necessary. When allocated, length is always a power of two.
* (We also tolerate length zero in some operations to allow
* bootstrapping mechanics that are currently not needed.)
*/

transient Node<K,V>[] table;

/**
* Holds cached entrySet(). Note that AbstractMap fields are used
* for keySet() and values().
*/

transient Set<Map.Entry<K,V>> entrySet;

/**
* The number of key-value mappings contained in this map.
*/

transient int size;

/**
* The number of times this HashMap has been structurally modified
* Structural modifications are those that change the number of mappings in
* the HashMap or otherwise modify its internal structure (e.g.,
* rehash). This field is used to make iterators on Collection-views of
* the HashMap fail-fast. (See ConcurrentModificationException).
*/

transient int modCount;

/**
* The next size value at which to resize (capacity * load factor).
*
* @serial
*/

// (The javadoc description is true upon serialization.
// Additionally, if the table array has not been allocated, this
// field holds the initial array capacity, or zero signifying
// DEFAULT_INITIAL_CAPACITY.)
int threshold;

/**
* The load factor for the hash table.
*
* @serial
*/

final float loadFactor;

这些字段我们都很熟悉:实际存储数据的 table,用作缓存的 entrySet,用于迭代器 fail-fast 检测结构性修改的 modCount,还有扩容门限 threshold 和加载因子 loadFactor。

/**
* Returns a power of two size for the given target capacity.
*/

static final int tableSizeFor(int cap) {
int n = cap - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

我们在构造函数中通过这个算法找到大于等于给定参数的最小的 2 的幂。这个算法很多人分析得比较麻烦,可以简单地理解为:这几次移位加按位或,把最高位 1 以下的所有位都变成了 1,这样最后再加一就会进位,得到一个最高位为 1、其余位全为 0 的数,也就是一个 2 的幂。这样和之前介绍的 hash 方法结合,HashMap 的容量就能被控制在 2 的幂次了。
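
把 tableSizeFor 抄出来跑几个值,可以直观验证这个结论(示意代码):

public class TableSizeDemo {
    static final int MAXIMUM_CAPACITY = 1 << 30;

    // 与 HashMap 中同样逻辑的实现
    static int tableSizeFor(int cap) {
        int n = cap - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

    public static void main(String[] args) {
        System.out.println(tableSizeFor(10));   // 16
        System.out.println(tableSizeFor(16));   // 16
        System.out.println(tableSizeFor(17));   // 32
    }
}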

Put & Set

initial put

/**
* Implements Map.put and related methods
*
* @param hash hash for key
* @param key the key
* @param value the value to put
* @param onlyIfAbsent if true, don't change existing value
* @param evict if false, the table is in creation mode.
* @return previous value, or null if none
*/

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict)
{

Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
// ... 后续
}

我们发现 table 是延迟到第一次 put 的时候才初始化的:当 table == null 时调用 resize() 方法,由它同时承担初始化和之后的扩容工作。

/**
* Initializes or doubles table size. If null, allocates in
* accord with initial capacity target held in field threshold.
* Otherwise, because we are using power-of-two expansion, the
* elements from each bin must either stay at same index, or move
* with a power of two offset in the new table.
*
* @return the table
*/

final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
// 初始化或者是扩容后的 table 返回
}

对应桶为空时,直接作为链表首节点插入

// put function
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);

链表插入 <=> 树化

// put function   
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}

上面代码中的这个部分,就是在把发生哈希冲突(落到同一个桶)的数据结成链表:

for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}

当然这里面还出现了把链表树化的函数 treeifyBin,当桶中节点数达到 TREEIFY_THRESHOLD 时就会调用它。

HashTable 源码阅读

HashTable 是我们经常会使用的一种数据结构,是哈希表的一个具体实现,提供键值对的存储。其中比较重要的问题是如何处理哈希冲突,Hashtable 的具体实现是用 链地址法 处理冲突,并在元素过多时通过 rehash 进行扩容。另外,Hashtable 和我们更常用的 HashMap 的一个主要区别是 Hashtable 支持多线程操作,它的很多方法都是同步方法;此外 Hashtable 不允许 null 键和 null 值。

加载因子和扩容

我们首先来看看我们是如何进行数据的具体存储的,和很久之前写的那篇文章中分析 ArrayList 和 LinkedList 的一样,HashTable 本身的存储也依赖于泛型数组:

/**
* The hash table data.
*/

private transient Entry<?,?>[] table;

/**
* The total number of entries in the hash table.
*/

private transient int count;

这里我们使用了填充双通配符的 Entry<?,?> 数组来存储 Key-Value 键值对,这主要是因为 Java 本身没办法直接声明泛型数组,只能靠类型转换来搞定;另外 count 属性存储的是哈希表中键值对的总数,而不是 table 数组的长度。

另外我们还有一些的数据对 HashTable 也同样重要的数据:

/**
* The table is rehashed when its size exceeds this threshold. (The
* value of this field is (int)(capacity * loadFactor).)
*
* @serial
*/

private int threshold;

/**
* The load factor for the hashtable.
*
* @serial
*/

private float loadFactor;

/**
* The number of times this Hashtable has been structurally modified
* Structural modifications are those that change the number of entries in
* the Hashtable or otherwise modify its internal structure (e.g.,
* rehash). This field is used to make iterators on Collection-views of
* the Hashtable fail-fast. (See ConcurrentModificationException).
*/

private transient int modCount = 0;

threshold 是 Hashtable 扩容的门限,只有元素个数超过了这个门限才会触发扩容;loadFactor 是加载因子,通常是一个小于 1 的数字:

table = new Entry<?,?>[initialCapacity];
threshold = (int)Math.min(initialCapacity * loadFactor, MAX_ARRAY_SIZE + 1);

我们可以看到,threshold 这个门限由 capacity(容量)和 loadFactor(加载因子)的乘积共同决定,乘出来的结果就是扩容的阈值。因此源码注释中也提到了权衡的方式:初始容量不要设得过高,否则会浪费空间;加载因子也不要设得过小,否则会造成 table 过早扩容。
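
以默认构造为例算一下这个门限(Hashtable 的默认初始容量是 11、加载因子是 0.75,下面是一段简化的示意代码,省略了与 MAX_ARRAY_SIZE 取最小值的细节):

int initialCapacity = 11;
float loadFactor = 0.75f;
int threshold = (int) (initialCapacity * loadFactor);
System.out.println(threshold); // 8:当 count 达到 8 后,再调用 addEntry 就会先触发 rehash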

数据存储

我们在该类的实际的使用之中采用了一个内部类 Entry 存储数据:

private static class Entry<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
V value;
Entry<K,V> next;
}

每个 Entry 对象本身是一个单向链表的节点,也就是使用 链地址法 时结成的那条单向链表上的节点,我们可以用如下的一幅图来表示:

single-line-graph

处理数据冲突

/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>More formally, if this map contains a mapping from a key
* {@code k} to a value {@code v} such that {@code (key.equals(k))},
* then this method returns {@code v}; otherwise it returns
* {@code null}. (There can be at most one such mapping.)
*
* @param key the key whose associated value is to be returned
* @return the value to which the specified key is mapped, or
* {@code null} if this map contains no mapping for the key
* @throws NullPointerException if the specified key is null
* @see #put(Object, Object)
*/

@SuppressWarnings("unchecked")
public synchronized V get(Object key) {
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
return (V)e.value;
}
}
return null;
}

我们首先来看 get 方法是如何处理的:先获得 key 的 hashCode,再和 0x7FFFFFFF 相与,抹去 hash 值的符号位,这样即使 hashCode 是负数,取模之后得到的下标也不会是负的。之后的 for 循环就是在遍历对应桶上的 子链表 ,查找我们要找的那个节点。
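
下面这段示意片段演示了为什么要先和 0x7FFFFFFF 相与(hash 取了一个随便写的负数):

int hash = -123456;          // 假设某个 key 的 hashCode 是负数
int len = 11;                // 假设 table 长度是 11
System.out.println(hash % len);                 // -3:负下标会导致数组越界
System.out.println((hash & 0x7FFFFFFF) % len);  // 10:一定落在 [0, len) 区间内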

/**
* Maps the specified <code>key</code> to the specified
* <code>value</code> in this hashtable. Neither the key nor the
* value can be <code>null</code>. <p>
*
* The value can be retrieved by calling the <code>get</code> method
* with a key that is equal to the original key.
*
* @param key the hashtable key
* @param value the value
* @return the previous value of the specified key in this hashtable,
* or <code>null</code> if it did not have one
* @exception NullPointerException if the key or value is
* <code>null</code>
* @see Object#equals(Object)
* @see #get(Object)
*/

public synchronized V put(K key, V value) {
// Make sure the value is not null
if (value == null) {
throw new NullPointerException();
}

// Makes sure the key is not already in the hashtable.
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
@SuppressWarnings("unchecked")
Entry<K,V> entry = (Entry<K,V>)tab[index];
for(; entry != null ; entry = entry.next) {
if ((entry.hash == hash) && entry.key.equals(key)) {
V old = entry.value;
entry.value = value;
return old;
}
}

addEntry(hash, key, value, index);
return null;
}

put 中求 hash 的部分和 get 是相似的:先相与再取模得到下标,拿到对应链表的首节点(数组中的一个槽位),然后遍历链表;如果找到了 key 相同的节点,就替换它的 value 并返回之前的旧值,如果没有找到就插入新的节点:

private void addEntry(int hash, K key, V value, int index) {
modCount++;

Entry<?,?> tab[] = table;
if (count >= threshold) {
// Rehash the table if the threshold is exceeded
rehash();

tab = table;
hash = key.hashCode();
index = (hash & 0x7FFFFFFF) % tab.length;
}

// Creates the new entry.
@SuppressWarnings("unchecked")
Entry<K,V> e = (Entry<K,V>) tab[index];
tab[index] = new Entry<>(hash, key, value, e);
count++;
}

addEntry 方法负责向数组中添加一个节点:既然没找到 key 相同的节点,就直接把新的 Entry 作为对应桶的链表头插进去;如果此时元素个数已经达到 threshold,会先 rehash 再插入。

/**
* Removes the key (and its corresponding value) from this
* hashtable. This method does nothing if the key is not in the hashtable.
*
* @param key the key that needs to be removed
* @return the value to which the key had been mapped in this hashtable,
* or <code>null</code> if the key did not have a mapping
* @throws NullPointerException if the key is <code>null</code>
*/

public synchronized V remove(Object key) {
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
@SuppressWarnings("unchecked")
Entry<K,V> e = (Entry<K,V>)tab[index];
for(Entry<K,V> prev = null ; e != null ; prev = e, e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
modCount++;
if (prev != null) {
prev.next = e.next;
} else {
tab[index] = e.next;
}
count--;
V oldValue = e.value;
e.value = null;
return oldValue;
}
}
return null;
}

remove 看起来也不特别麻烦,就是在对应桶的链表上按 hash 和 key 匹配,找到后把该节点从链表中摘除。

rehash

我们在前面看到,当元素个数超过 threshold 时就要对数据进行 rehash,让各个桶上的元素重新平衡:

/**
* Increases the capacity of and internally reorganizes this
* hashtable, in order to accommodate and access its entries more
* efficiently. This method is called automatically when the
* number of keys in the hashtable exceeds this hashtable's capacity
* and load factor.
*/

@SuppressWarnings("unchecked")
protected void rehash() {
int oldCapacity = table.length;
Entry<?,?>[] oldMap = table;

// overflow-conscious code
int newCapacity = (oldCapacity << 1) + 1;
if (newCapacity - MAX_ARRAY_SIZE > 0) {
if (oldCapacity == MAX_ARRAY_SIZE)
// Keep running with MAX_ARRAY_SIZE buckets
return;
newCapacity = MAX_ARRAY_SIZE;
}
Entry<?,?>[] newMap = new Entry<?,?>[newCapacity];

modCount++;
threshold = (int)Math.min(newCapacity * loadFactor, MAX_ARRAY_SIZE + 1);
table = newMap;

for (int i = oldCapacity ; i-- > 0 ;) {
for (Entry<K,V> old = (Entry<K,V>)oldMap[i] ; old != null ; ) {
Entry<K,V> e = old;
old = old.next;

int index = (e.hash & 0x7FFFFFFF) % newCapacity;
e.next = (Entry<K,V>)newMap[index];
newMap[index] = e;
}
}
}

rehash 里面的操作也比较简单:先把数组容量扩大为原来的 2 倍加 1,然后把旧数组每个桶、每条链表上能索引到的节点,按新容量重新计算下标散列到新数组中,这样各个桶上的元素就又重新平衡了。