写在前面
本文是 Go 学习笔记系列的第五篇,介绍 Go 的并发编程。并发是 Go 的核心优势,goroutine + channel 的模型简洁而强大。前置知识:接口与错误处理(第四篇)。
一、goroutine
1.1 启动 goroutine
1
2
3
4
5
6
7
8
9
10
|
// 普通函数调用(同步)
doSomething()
// 加 go 关键字启动 goroutine(异步)
go doSomething()
// 匿名函数启动
go func() {
fmt.Println("goroutine 中执行")
}()
|
1.2 goroutine 基本示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func main() {
// 启动多个 goroutine
go func() {
for i := 0; i < 3; i++ {
fmt.Println("goroutine A:", i)
}
}()
go func() {
for i := 0; i < 3; i++ {
fmt.Println("goroutine B:", i)
}
}()
// 问题:main 函数结束后所有 goroutine 都会被终止
// 输出可能什么都看不到,因为 main 先退出了
}
|
1.3 等待 goroutine 结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import "sync"
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1) // 计数器 +1
go func(id int) {
defer wg.Done() // 计数器 -1
fmt.Printf("worker %d 完成\n", id)
}(i)
}
wg.Wait() // 阻塞直到计数器归零
fmt.Println("所有 worker 完成")
}
|
注意:循环变量要作为参数传入 goroutine,否则闭包会捕获循环变量的引用(Go 1.22 之前会出问题)。
1.4 goroutine 特点
1
2
3
4
|
- 极轻量:初始栈只有 2KB(可动态增长),比线程轻得多
- 非阻塞:goroutine 的调度由 Go 运行时管理,不依赖 OS 线程
- 数量:轻松创建数万个 goroutine
- 调度:GMP 模型(Goroutine-Machine-Processor),由运行时自动调度
|
二、channel
channel 是 goroutine 之间通信的管道,Go 的并发哲学:不要通过共享内存来通信,而要通过通信来共享内存。
2.1 创建和使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// 创建 channel
ch := make(chan int) // 无缓冲 channel
ch := make(chan int, 10) // 带缓冲 channel,容量10
// 发送数据
ch <- 42
// 接收数据
value := <-ch
// 接收并判断是否关闭
value, ok := <-ch
if !ok {
fmt.Println("channel 已关闭")
}
// 关闭 channel
close(ch)
|
2.2 无缓冲 channel
1
2
3
4
5
6
7
8
9
|
// 无缓冲 channel:发送和接收必须同时就绪(同步)
ch := make(chan string)
go func() {
ch <- "hello" // 发送后阻塞,直到有人接收
}()
msg := <-ch // 阻塞,直到有数据
fmt.Println(msg) // hello
|
无缓冲 channel 也叫同步 channel,发送和接收是握手操作。
2.3 带缓冲 channel
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// 带缓冲 channel:缓冲区满之前发送不阻塞
ch := make(chan int, 3) // 容量3
ch <- 1 // 不阻塞
ch <- 2 // 不阻塞
ch <- 3 // 不阻塞
// ch <- 4 // 阻塞,缓冲区满了
fmt.Println(len(ch)) // 3(当前缓冲区中的元素数)
fmt.Println(cap(ch)) // 3(容量)
v1 := <-ch // 1
v2 := <-ch // 2
|
2.4 关闭 channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
ch := make(chan int, 5)
// 生产者发送完数据后关闭
ch <- 1
ch <- 2
ch <- 3
close(ch)
// 消费者用 range 遍历(自动在 close 后退出)
for v := range ch {
fmt.Println(v) // 1 2 3
}
// 注意:
// - 只有发送方应该关闭 channel
// - 关闭后不能再发送(panic)
// - 关闭后仍可以接收(返回零值和 false)
// - 不需要关闭的 channel 就不要关(GC 会回收)
|
2.5 单向 channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
// 只发送(生产者)
func producer(ch chan<- int) {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
}
// 只接收(消费者)
func consumer(ch <-chan int) {
for v := range ch {
fmt.Println(v)
}
}
func main() {
ch := make(chan int, 5)
go producer(ch)
consumer(ch)
}
|
三、select
select 同时监听多个 channel,类似 switch 但专门用于 channel 操作。
3.1 基本用法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "来自 ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "来自 ch2"
}()
// 哪个先就绪就执行哪个
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
}
}
|
3.2 超时控制
1
2
3
4
5
6
|
select {
case result := <-ch:
fmt.Println("收到:", result)
case <-time.After(3 * time.Second):
fmt.Println("超时了")
}
|
3.3 非阻塞操作
1
2
3
4
5
6
|
select {
case msg := <-ch:
fmt.Println("收到:", msg)
default:
fmt.Println("没有数据,做其他事")
}
|
四、sync 包
4.1 Mutex — 互斥锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import "sync"
type SafeCounter struct {
mu sync.Mutex
m map[string]int
}
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.m[key]++
}
func (c *SafeCounter) Get(key string) int {
c.mu.Lock()
defer c.mu.Unlock()
return c.m[key]
}
// 使用
counter := SafeCounter{m: make(map[string]int)}
counter.Inc("hello")
fmt.Println(counter.Get("hello")) // 1
|
4.2 RWMutex — 读写锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
type Cache struct {
mu sync.RWMutex
data map[string]string
}
// 写操作用写锁(互斥)
func (c *Cache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
// 读操作用读锁(共享,多个读者可以同时读)
func (c *Cache) Get(key string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.data[key]
}
|
读多写少的场景用 RWMutex 比Mutex 性能更好。
4.3 sync.Once — 只执行一次
1
2
3
4
5
6
7
8
9
|
var once sync.Once
var instance *Config
func GetConfig() *Config {
once.Do(func() {
instance = loadConfig() // 无论多少 goroutine 调用,只执行一次
})
return instance
}
|
4.4 sync.Map — 并发安全的 Map
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
var m sync.Map
// 写入
m.Store("name", "张三")
m.Store("age", 25)
// 读取
val, ok := m.Load("name")
if ok {
fmt.Println(val) // 张三
}
// 遍历
m.Range(func(key, value any) bool {
fmt.Println(key, value)
return true // 返回 false 停止遍历
})
// 删除
m.Delete("age")
|
sync.Map 适合读多写少的场景。读写都频繁的场景用 map + Mutex 更好。
五、context
context 用于在 goroutine 之间传递取消信号、超时、截止时间和请求级别的值。
5.1 取消(WithCancel)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import "context"
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("worker %d 收到取消信号,退出\n", id)
return
default:
fmt.Printf("worker %d 工作中...\n", id)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go worker(ctx, 1)
go worker(ctx, 2)
time.Sleep(2 * time.Second)
cancel() // 通知所有 worker 退出
time.Sleep(1 * time.Second) // 等待 worker 退出
}
|
5.2 超时(WithTimeout)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
func doWork(ctx context.Context) error {
// 模拟耗时操作
select {
case <-time.After(5 * time.Second):
return nil
case <-ctx.Done():
return ctx.Err() // context deadline exceeded
}
}
func main() {
// 3秒超时
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := doWork(ctx); err != nil {
fmt.Println("操作失败:", err) // context deadline exceeded
}
}
|
5.3 传值(WithValue)
1
2
3
4
5
6
7
8
9
10
11
|
func main() {
// 传请求级别的值(如 trace_id)
ctx := context.WithValue(context.Background(), "traceID", "abc-123")
handleRequest(ctx)
}
func handleRequest(ctx context.Context) {
traceID := ctx.Value("traceID")
fmt.Println("traceID:", traceID) // abc-123
}
|
WithValue 只用于请求级别的跨函数传值(trace_id、user_id 等),不要用来传可选参数。
5.4 context 使用规则
1
2
3
4
5
|
- 不要把 context 放在结构体里,作为函数第一个参数传递
- 传递 context 给子函数,子函数可以随时被取消
- context.WithValue 的 key 建议用自定义类型,避免冲突
- 函数接收 context 时,不要传 nil,用 context.TODO() 代替
- cancel() 要 defer 调用,避免资源泄漏
|
六、常见并发模式
6.1 Worker Pool(工作池)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
func workerPool(jobs <-chan int, results chan<- int, id int) {
for job := range jobs {
fmt.Printf("worker %d 处理任务 %d\n", id, job)
results <- job * 2
}
}
func main() {
jobs := make(chan int, 10)
results := make(chan int, 10)
// 启动3个 worker
for w := 1; w <= 3; w++ {
go workerPool(jobs, results, w)
}
// 发送5个任务
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 收集结果
for r := 1; r <= 5; r++ {
fmt.Println("结果:", <-results)
}
}
|
6.2 Pipeline(管道模式)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
// 阶段1:生成数据
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 阶段2:平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func main() {
// 组装管道:生成 → 平方 → 输出
ch := square(square(generate(2, 3, 4)))
for v := range ch {
fmt.Println(v) // 16, 81, 256
}
}
|
6.3 Fan-out / Fan-in(扇出/扇入)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// Fan-out:多个 goroutine 从同一个 channel 读取(自动分发)
ch1 := square(generate(1, 2, 3, 4, 5))
ch2 := square(generate(1, 2, 3, 4, 5)) // 两个 worker 同时处理
// Fan-in:合并多个 channel 到一个
func merge(cs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, c := range cs {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for v := range ch {
out <- v
}
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
|
七、小结
本文学习了 Go 的并发编程:
- goroutine(轻量级协程,go 关键字启动)
- channel(goroutine 间通信,无缓冲 vs 带缓冲)
- select(多路复用、超时控制)
- sync 包(Mutex、RWMutex、WaitGroup、Once、sync.Map)
- context(取消、超时、传值)
- 常见并发模式(Worker Pool、Pipeline、Fan-out/Fan-in)
下一篇将学习标准库与工程实践,包括 HTTP 服务、数据库操作和单元测试。