Go 学习笔记(五):并发编程

写在前面

本文是 Go 学习笔记系列的第五篇,介绍 Go 的并发编程。并发是 Go 的核心优势,goroutine + channel 的模型简洁而强大。前置知识:接口与错误处理(第四篇)。


一、goroutine

1.1 启动 goroutine

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 普通函数调用(同步)
doSomething()

// 加 go 关键字启动 goroutine(异步)
go doSomething()

// 匿名函数启动
go func() {
    fmt.Println("goroutine 中执行")
}()

1.2 goroutine 基本示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func main() {
    // 启动多个 goroutine
    go func() {
        for i := 0; i < 3; i++ {
            fmt.Println("goroutine A:", i)
        }
    }()

    go func() {
        for i := 0; i < 3; i++ {
            fmt.Println("goroutine B:", i)
        }
    }()

    // 问题:main 函数结束后所有 goroutine 都会被终止
    // 输出可能什么都看不到,因为 main 先退出了
}

1.3 等待 goroutine 结束

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import "sync"

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)           // 计数器 +1
        go func(id int) {
            defer wg.Done()  // 计数器 -1
            fmt.Printf("worker %d 完成\n", id)
        }(i)
    }

    wg.Wait()  // 阻塞直到计数器归零
    fmt.Println("所有 worker 完成")
}

注意:循环变量要作为参数传入 goroutine,否则闭包会捕获循环变量的引用(Go 1.22 之前会出问题)。

1.4 goroutine 特点

1
2
3
4
- 极轻量:初始栈只有 2KB(可动态增长),比线程轻得多
- 非阻塞:goroutine 的调度由 Go 运行时管理,不依赖 OS 线程
- 数量:轻松创建数万个 goroutine
- 调度:GMP 模型(Goroutine-Machine-Processor),由运行时自动调度

二、channel

channel 是 goroutine 之间通信的管道,Go 的并发哲学:不要通过共享内存来通信,而要通过通信来共享内存

2.1 创建和使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 创建 channel
ch := make(chan int)       // 无缓冲 channel
ch := make(chan int, 10)   // 带缓冲 channel,容量10

// 发送数据
ch <- 42

// 接收数据
value := <-ch

// 接收并判断是否关闭
value, ok := <-ch
if !ok {
    fmt.Println("channel 已关闭")
}

// 关闭 channel
close(ch)

2.2 无缓冲 channel

1
2
3
4
5
6
7
8
9
// 无缓冲 channel:发送和接收必须同时就绪(同步)
ch := make(chan string)

go func() {
    ch <- "hello"   // 发送后阻塞,直到有人接收
}()

msg := <-ch         // 阻塞,直到有数据
fmt.Println(msg)    // hello

无缓冲 channel 也叫同步 channel,发送和接收是握手操作。

2.3 带缓冲 channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 带缓冲 channel:缓冲区满之前发送不阻塞
ch := make(chan int, 3)   // 容量3

ch <- 1   // 不阻塞
ch <- 2   // 不阻塞
ch <- 3   // 不阻塞
// ch <- 4  // 阻塞,缓冲区满了

fmt.Println(len(ch))  // 3(当前缓冲区中的元素数)
fmt.Println(cap(ch))  // 3(容量)

v1 := <-ch   // 1
v2 := <-ch   // 2

2.4 关闭 channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
ch := make(chan int, 5)

// 生产者发送完数据后关闭
ch <- 1
ch <- 2
ch <- 3
close(ch)

// 消费者用 range 遍历(自动在 close 后退出)
for v := range ch {
    fmt.Println(v)   // 1 2 3
}

// 注意:
// - 只有发送方应该关闭 channel
// - 关闭后不能再发送(panic)
// - 关闭后仍可以接收(返回零值和 false)
// - 不需要关闭的 channel 就不要关(GC 会回收)

2.5 单向 channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// 只发送(生产者)
func producer(ch chan<- int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
}

// 只接收(消费者)
func consumer(ch <-chan int) {
    for v := range ch {
        fmt.Println(v)
    }
}

func main() {
    ch := make(chan int, 5)
    go producer(ch)
    consumer(ch)
}

三、select

select 同时监听多个 channel,类似 switch 但专门用于 channel 操作。

3.1 基本用法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
ch1 := make(chan string)
ch2 := make(chan string)

go func() {
    time.Sleep(1 * time.Second)
    ch1 <- "来自 ch1"
}()

go func() {
    time.Sleep(2 * time.Second)
    ch2 <- "来自 ch2"
}()

// 哪个先就绪就执行哪个
for i := 0; i < 2; i++ {
    select {
    case msg1 := <-ch1:
        fmt.Println(msg1)
    case msg2 := <-ch2:
        fmt.Println(msg2)
    }
}

3.2 超时控制

1
2
3
4
5
6
select {
case result := <-ch:
    fmt.Println("收到:", result)
case <-time.After(3 * time.Second):
    fmt.Println("超时了")
}

3.3 非阻塞操作

1
2
3
4
5
6
select {
case msg := <-ch:
    fmt.Println("收到:", msg)
default:
    fmt.Println("没有数据,做其他事")
}

四、sync 包

4.1 Mutex — 互斥锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import "sync"

type SafeCounter struct {
    mu sync.Mutex
    m  map[string]int
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.m[key]++
}

func (c *SafeCounter) Get(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.m[key]
}

// 使用
counter := SafeCounter{m: make(map[string]int)}
counter.Inc("hello")
fmt.Println(counter.Get("hello"))   // 1

4.2 RWMutex — 读写锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

// 写操作用写锁(互斥)
func (c *Cache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
}

// 读操作用读锁(共享,多个读者可以同时读)
func (c *Cache) Get(key string) string {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.data[key]
}

读多写少的场景用 RWMutex 比Mutex 性能更好。

4.3 sync.Once — 只执行一次

1
2
3
4
5
6
7
8
9
var once sync.Once
var instance *Config

func GetConfig() *Config {
    once.Do(func() {
        instance = loadConfig()  // 无论多少 goroutine 调用,只执行一次
    })
    return instance
}

4.4 sync.Map — 并发安全的 Map

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
var m sync.Map

// 写入
m.Store("name", "张三")
m.Store("age", 25)

// 读取
val, ok := m.Load("name")
if ok {
    fmt.Println(val)   // 张三
}

// 遍历
m.Range(func(key, value any) bool {
    fmt.Println(key, value)
    return true   // 返回 false 停止遍历
})

// 删除
m.Delete("age")

sync.Map 适合读多写少的场景。读写都频繁的场景用 map + Mutex 更好。


五、context

context 用于在 goroutine 之间传递取消信号、超时、截止时间和请求级别的值。

5.1 取消(WithCancel)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import "context"

func worker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("worker %d 收到取消信号,退出\n", id)
            return
        default:
            fmt.Printf("worker %d 工作中...\n", id)
            time.Sleep(500 * time.Millisecond)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    go worker(ctx, 1)
    go worker(ctx, 2)

    time.Sleep(2 * time.Second)
    cancel()   // 通知所有 worker 退出

    time.Sleep(1 * time.Second)  // 等待 worker 退出
}

5.2 超时(WithTimeout)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func doWork(ctx context.Context) error {
    // 模拟耗时操作
    select {
    case <-time.After(5 * time.Second):
        return nil
    case <-ctx.Done():
        return ctx.Err()   // context deadline exceeded
    }
}

func main() {
    // 3秒超时
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    if err := doWork(ctx); err != nil {
        fmt.Println("操作失败:", err)   // context deadline exceeded
    }
}

5.3 传值(WithValue)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func main() {
    // 传请求级别的值(如 trace_id)
    ctx := context.WithValue(context.Background(), "traceID", "abc-123")

    handleRequest(ctx)
}

func handleRequest(ctx context.Context) {
    traceID := ctx.Value("traceID")
    fmt.Println("traceID:", traceID)   // abc-123
}

WithValue 只用于请求级别的跨函数传值(trace_id、user_id 等),不要用来传可选参数。

5.4 context 使用规则

1
2
3
4
5
- 不要把 context 放在结构体里,作为函数第一个参数传递
- 传递 context 给子函数,子函数可以随时被取消
- context.WithValue 的 key 建议用自定义类型,避免冲突
- 函数接收 context 时,不要传 nil,用 context.TODO() 代替
- cancel() 要 defer 调用,避免资源泄漏

六、常见并发模式

6.1 Worker Pool(工作池)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func workerPool(jobs <-chan int, results chan<- int, id int) {
    for job := range jobs {
        fmt.Printf("worker %d 处理任务 %d\n", id, job)
        results <- job * 2
    }
}

func main() {
    jobs := make(chan int, 10)
    results := make(chan int, 10)

    // 启动3个 worker
    for w := 1; w <= 3; w++ {
        go workerPool(jobs, results, w)
    }

    // 发送5个任务
    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs)

    // 收集结果
    for r := 1; r <= 5; r++ {
        fmt.Println("结果:", <-results)
    }
}

6.2 Pipeline(管道模式)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 阶段1:生成数据
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// 阶段2:平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    // 组装管道:生成 → 平方 → 输出
    ch := square(square(generate(2, 3, 4)))
    for v := range ch {
        fmt.Println(v)   // 16, 81, 256
    }
}

6.3 Fan-out / Fan-in(扇出/扇入)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Fan-out:多个 goroutine 从同一个 channel 读取(自动分发)
ch1 := square(generate(1, 2, 3, 4, 5))
ch2 := square(generate(1, 2, 3, 4, 5))   // 两个 worker 同时处理

// Fan-in:合并多个 channel 到一个
func merge(cs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    for _, c := range cs {
        wg.Add(1)
        go func(ch <-chan int) {
            defer wg.Done()
            for v := range ch {
                out <- v
            }
        }(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

七、小结

本文学习了 Go 的并发编程:

  • goroutine(轻量级协程,go 关键字启动)
  • channel(goroutine 间通信,无缓冲 vs 带缓冲)
  • select(多路复用、超时控制)
  • sync 包(Mutex、RWMutex、WaitGroup、Once、sync.Map)
  • context(取消、超时、传值)
  • 常见并发模式(Worker Pool、Pipeline、Fan-out/Fan-in)

下一篇将学习标准库与工程实践,包括 HTTP 服务、数据库操作和单元测试。