假设有个字符串:

1
var str = "The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines."

需要统计里面每个字母出现的次数。最直观简单的做法就是利用一个 map,从开始到末尾读这个字符串,并把字母作为 key,出现的次数作为 value。Map 中包含 key 的时候,value + 1,Map 中没有 key 的时候默认 1。最后读完这个字符串就 OK。

1
2
3
4
5
6
7
8
9
10
11
12
13
var m = make(map[string]int)
temp := strings.Split(str, "")

for _, c := range temp {
if !unicode.IsLetter([]rune(c)[0]) {
continue
}
if count, ok := m[c]; ok {
m[c] = count + 1
} else {
m[c] = 1
}
}
1
[M:3 R:1 y:7 o:13 v:1 e:26 h:7 l:10 i:14 r:15 T:1 p:13 d:1 u:6 c:8 b:5 s:14 g:4 a:17 f:5 m:7 t:20 B:1 I:1 n:10]

在现实世界中,这个 str 可能非常巨大,所以有时候我们需要将源文本拆分成多个小的字符串,然后多个线程同时处理,每个线程计算得到当前的中间结果,最后合并到一起。

上述的过程在函数式编程中可以被抽象为 Map 和 Reduce 两个函数。其中 Map 函数是把一个数组的每个元素按照相同的逻辑处理之后返回的结果,Reduce 函数是把所有元素整合起来得到结果。通常这个两个函数的参数都是函数,Map 的返回值一般也是数组,Reduce 的返回值可能是各种类型。

为了在单机上实现出并发处理的效果,可以用 Go 自带的 goroutine 来实现。下面把拆分的工作省略,直接进入主题

接下来用 4 个 goroutine 同时处理这些 string,每个做 goroutine 利用 单机串行版 的逻辑,生产出一个小规模的中间内容。随后把每个中间内容都整合起来得到最终值。接下来需要考虑

  • Go 天生支持 CSP 编程模型,所以利用 channel 做通信没有问题
  • 是否有 data race
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package main

import (
"strings"
"sync"
"unicode"
)

type ResultMap struct {
sync.Mutex
result map[string]int
}

func main() {
str1 := "The MapReduce library in the user program first"
str2 := "splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB)"
str3 := "per piece (controllable by the user via an optional parameter)."
str4 := "It then starts up many copies of the program on a cluster of machines."

strs := []string {str1, str2, str3, str4}

// 主线程需要阻塞直到所有的 reduce 都结束
var waitGroup sync.WaitGroup
waitGroup.Add(len(strs))

c := make(chan map[string]int)

res := new(ResultMap)
res.result = make(map[string]int)

for _, str := range strs {
go doMap(str, c)
go doReduce(c, res, &waitGroup)
}

waitGroup.Wait()

sortPrintMap(res.result)

}

// 生产出对应的 kv 传递给 channel
func doMap(str string, c chan map[string]int) {
temp := strings.Split(str, "")
m := make(map[string]int)

for _, c := range temp {
if !unicode.IsLetter([]rune(c)[0]) {
continue
}
if count, ok := m[c]; ok {
m[c] = count + 1
} else {
m[c] = 1
}
}
c <- m
}

// 合并
func doReduce(c chan map[string]int, res *ResultMap, group *sync.WaitGroup) {
res.Lock()
defer res.Unlock()
for k, v := range <- c {
if count, ok := res.result[k]; ok {
res.result[k] = count + v
} else {
res.result[k] = v
}
}
group.Done()
}

检查一下结果 (Map 的 key 本身是无序的,这里是排好序之后的)

1
[M:3 R:1 y:7 o:13 v:1 e:26 h:7 l:10 i:14 r:15 T:1 p:13 d:1 u:6 c:8 b:5 s:14 g:4 a:17 f:5 m:7 t:20 B:1 I:1 n:10]

结果无误之后,这个问题可以再深入

  • 上述的 reduce 和 map 是单机上的,之间的数据共享用了 channel,如果是物理隔离的场景下,如何用别的东西做数据共享?
  • 任何一个子任务都有可能因为各种原因挂掉,如何在某个子任务挂掉的情况下,系统的准确性不受影响,甚至能自愈?
  • 上述的 goroutine 在执行结束之后就会被调度器回收,但实际上因为 map 总是会比 reduce 先结束,那么后期的过程实际上可以有更多的 goroutine 可以参与到 reduce 任务中 r 如何实现这种调度让资源可以被更加充分的利用?