./read "从 Python 到 Go(六):并发编..."

从 Python 到 Go(六):并发编程:从线程到 Goroutine

从_python_到_go(六):并发编程:从线程到_gor.md2025-10-15
./meta --show-details
Published
2025年10月15日
Reading
27 min
Words
26,865
Status
PUBLISHED

目录

概述

Go 的并发模型是其最大的亮点之一。与 Python 的线程/进程模型不同,Go 使用轻量级的 Goroutine 和 Channel 实现并发,遵循 "不要通过共享内存来通信,而要通过通信来共享内存" 的哲学。

核心差异

特性PythonGo
并发单元Thread / ProcessGoroutine
创建成本高(MB 级内存)低(KB 级内存)
并发数量几百到几千数十万到百万
GIL 限制有(线程受限于 GIL)
通信方式Queue、Lock、ConditionChannel(首选)
调度OS 线程调度Go 运行时调度

线程 vs Goroutine

Python 的线程

# Python - 使用线程
import threading
import time

def worker(id: int, duration: int):
    """工作线程"""
    print(f"Worker {id} 开始工作")
    time.sleep(duration)
    print(f"Worker {id} 完成工作")

# 创建并启动线程
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i, 1))
    threads.append(t)
    t.start()

# 等待所有线程完成
for t in threads:
    t.join()

print("所有工作完成")

# GIL 的问题:CPU 密集型任务
import multiprocessing

def cpu_bound_task(n: int) -> int:
    """CPU 密集型任务"""
    result = 0
    for i in range(n):
        result += i * i
    return result

# 线程版本(受 GIL 限制,不能真正并行)
def with_threads():
    threads = []
    for _ in range(4):
        t = threading.Thread(target=cpu_bound_task, args=(10000000,))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

# 进程版本(真正并行,但开销大)
def with_processes():
    processes = []
    for _ in range(4):
        p = multiprocessing.Process(target=cpu_bound_task, args=(10000000,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

Go 的 Goroutine

// Go - 使用 Goroutine
package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, duration time.Duration) {
    fmt.Printf("Worker %d 开始工作\n", id)
    time.Sleep(duration)
    fmt.Printf("Worker %d 完成工作\n", id)
}

func main() {
    // 使用 WaitGroup 等待所有 goroutine 完成
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, time.Second)
        }(i)
    }

    wg.Wait()
    fmt.Println("所有工作完成")
}

// CPU 密集型任务(无 GIL,真正并行)
func cpuBoundTask(n int) int {
    result := 0
    for i := 0; i < n; i++ {
        result += i * i
    }
    return result
}

func withGoroutines() {
    var wg sync.WaitGroup

    // 创建成千上万个 goroutine 也没问题
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            cpuBoundTask(10000000)
        }()
    }

    wg.Wait()
}

Goroutine 的优势

方面Python ThreadGo Goroutine
初始栈大小~8MB (Linux)~2KB (动态增长)
创建时间~微秒级~纳秒级
上下文切换内核态切换(慢)用户态切换(快)
最大数量几千(受内存限制)数百万(受内存限制)
CPU 密集型GIL 限制,需要多进程无限制,真正并行

通信机制

Python 的 Queue

# Python - 使用 Queue 通信
from queue import Queue
from threading import Thread
import time

def producer(q: Queue, items: int):
    """生产者"""
    for i in range(items):
        print(f"生产: {i}")
        q.put(i)
        time.sleep(0.1)
    q.put(None)  # 发送结束信号

def consumer(q: Queue, id: int):
    """消费者"""
    while True:
        item = q.get()
        if item is None:
            q.put(None)  # 传递结束信号
            break
        print(f"消费者 {id} 处理: {item}")
        time.sleep(0.2)

# 使用
q = Queue()

prod_thread = Thread(target=producer, args=(q, 10))
cons_threads = [Thread(target=consumer, args=(q, i)) for i in range(3)]

prod_thread.start()
for t in cons_threads:
    t.start()

prod_thread.join()
for t in cons_threads:
    t.join()

Go 的 Channel

// Go - 使用 Channel 通信
package main

import (
    "fmt"
    "time"
)

func producer(ch chan<- int, items int) {
    for i := 0; i < items; i++ {
        fmt.Printf("生产: %d\n", i)
        ch <- i  // 发送到 channel
        time.Sleep(100 * time.Millisecond)
    }
    close(ch)  // 关闭 channel 表示结束
}

func consumer(ch <-chan int, id int) {
    for item := range ch {  // 从 channel 接收,关闭时退出
        fmt.Printf("消费者 %d 处理: %d\n", id, item)
        time.Sleep(200 * time.Millisecond)
    }
}

func main() {
    ch := make(chan int, 5)  // 带缓冲的 channel

    go producer(ch, 10)

    // 启动多个消费者
    done := make(chan bool)
    for i := 0; i < 3; i++ {
        go func(id int) {
            consumer(ch, id)
            done <- true
        }(i)
    }

    // 等待所有消费者完成
    for i := 0; i < 3; i++ {
        <-done
    }
}

Channel 类型

// Go - Channel 详解
package main

import "fmt"

func channelBasics() {
    // 1. 无缓冲 channel(同步)
    ch1 := make(chan int)
    go func() {
        ch1 <- 42  // 阻塞直到有接收者
    }()
    value := <-ch1  // 接收值
    fmt.Println(value)

    // 2. 有缓冲 channel(异步)
    ch2 := make(chan int, 3)
    ch2 <- 1
    ch2 <- 2
    ch2 <- 3
    // ch2 <- 4  // 会阻塞,缓冲已满

    fmt.Println(<-ch2)  // 1
    fmt.Println(<-ch2)  // 2

    // 3. 单向 channel
    // 只发送
    var sendOnly chan<- int = make(chan int)
    // 只接收
    var recvOnly <-chan int = make(chan int)

    // 4. 关闭 channel
    ch3 := make(chan int, 2)
    ch3 <- 1
    ch3 <- 2
    close(ch3)

    // 从已关闭的 channel 读取
    v1, ok1 := <-ch3  // 1, true
    v2, ok2 := <-ch3  // 2, true
    v3, ok3 := <-ch3  // 0, false (channel 已关闭)

    fmt.Println(v1, ok1)
    fmt.Println(v2, ok2)
    fmt.Println(v3, ok3)
}

同步原语

Python 的锁

# Python - 使用锁同步
import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.value += 1

    def get_value(self):
        with self.lock:
            return self.value

# 使用
counter = Counter()
threads = []

def worker(counter: Counter, n: int):
    for _ in range(n):
        counter.increment()

for _ in range(10):
    t = threading.Thread(target=worker, args=(counter, 1000))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"最终值: {counter.get_value()}")  # 10000

Go 的同步原语

// Go - 使用 Mutex 同步
package main

import (
    "fmt"
    "sync"
)

// 方案 1:使用 Mutex
type Counter struct {
    value int
    mu    sync.Mutex
}

func (c *Counter) Increment() {
    c.mu.Lock()
    c.value++
    c.mu.Unlock()
}

func (c *Counter) GetValue() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

// 方案 2:使用 Channel(更符合 Go 风格)
type ChannelCounter struct {
    ops chan func(*int)
}

func NewChannelCounter() *ChannelCounter {
    c := &ChannelCounter{
        ops: make(chan func(*int)),
    }

    go func() {
        var value int
        for op := range c.ops {
            op(&value)
        }
    }()

    return c
}

func (c *ChannelCounter) Increment() {
    c.ops <- func(v *int) { *v++ }
}

func (c *ChannelCounter) GetValue() int {
    result := make(chan int)
    c.ops <- func(v *int) { result <- *v }
    return <-result
}

func main() {
    // 使用 Mutex
    counter1 := &Counter{}
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                counter1.Increment()
            }
        }()
    }

    wg.Wait()
    fmt.Printf("Mutex 计数器: %d\n", counter1.GetValue())

    // 使用 Channel
    counter2 := NewChannelCounter()
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                counter2.Increment()
            }
        }()
    }

    wg.Wait()
    fmt.Printf("Channel 计数器: %d\n", counter2.GetValue())
}

其他同步原语

// Go - 其他同步工具
package main

import (
    "fmt"
    "sync"
    "time"
)

// 1. RWMutex - 读写锁
type Cache struct {
    data map[string]string
    mu   sync.RWMutex
}

func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()  // 读锁,多个 goroutine 可以同时读
    defer c.mu.RUnlock()
    val, ok := c.data[key]
    return val, ok
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock()  // 写锁,独占访问
    defer c.mu.Unlock()
    c.data[key] = value
}

// 2. Once - 确保只执行一次
var once sync.Once
var instance *Singleton

type Singleton struct {
    data string
}

func GetInstance() *Singleton {
    once.Do(func() {
        fmt.Println("创建单例")
        instance = &Singleton{data: "singleton"}
    })
    return instance
}

// 3. Cond - 条件变量
type Queue struct {
    items []int
    cond  *sync.Cond
}

func NewQueue() *Queue {
    return &Queue{
        items: make([]int, 0),
        cond:  sync.NewCond(&sync.Mutex{}),
    }
}

func (q *Queue) Enqueue(item int) {
    q.cond.L.Lock()
    q.items = append(q.items, item)
    q.cond.Signal()  // 唤醒一个等待的 goroutine
    q.cond.L.Unlock()
}

func (q *Queue) Dequeue() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    for len(q.items) == 0 {
        q.cond.Wait()  // 等待直到有元素
    }

    item := q.items[0]
    q.items = q.items[1:]
    return item
}

func main() {
    // 测试 RWMutex
    cache := &Cache{data: make(map[string]string)}
    cache.Set("key", "value")
    val, _ := cache.Get("key")
    fmt.Println("Cache:", val)

    // 测试 Once
    for i := 0; i < 3; i++ {
        instance := GetInstance()
        fmt.Println(instance.data)
    }

    // 测试 Cond
    q := NewQueue()
    go func() {
        time.Sleep(time.Second)
        q.Enqueue(42)
    }()
    fmt.Println("Dequeued:", q.Dequeue())
}

并发模式

模式 1:Worker Pool(工作池)

# Python - Worker Pool
from queue import Queue
from threading import Thread
import time

def worker(id: int, tasks: Queue, results: Queue):
    """工作线程"""
    while True:
        task = tasks.get()
        if task is None:
            break
        print(f"Worker {id} 处理任务 {task}")
        time.sleep(0.1)
        results.put(task * 2)
        tasks.task_done()

# 创建队列和工作线程
tasks = Queue()
results = Queue()
num_workers = 3

workers = []
for i in range(num_workers):
    t = Thread(target=worker, args=(i, tasks, results))
    t.start()
    workers.append(t)

# 添加任务
for i in range(10):
    tasks.put(i)

# 等待所有任务完成
tasks.join()

# 停止工作线程
for _ in range(num_workers):
    tasks.put(None)
for w in workers:
    w.join()

# 收集结果
while not results.empty():
    print(results.get())
// Go - Worker Pool
package main

import (
    "fmt"
    "sync"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d 处理任务 %d\n", id, job)
        results <- job * 2
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // 启动工作池
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, jobs, results)
        }(i)
    }

    // 发送任务
    for i := 0; i < numJobs; i++ {
        jobs <- i
    }
    close(jobs)

    // 等待所有 worker 完成
    wg.Wait()
    close(results)

    // 收集结果
    for result := range results {
        fmt.Println("结果:", result)
    }
}

模式 2:Fan-out/Fan-in

// Go - Fan-out/Fan-in 模式
package main

import (
    "fmt"
    "sync"
)

// Fan-out: 将输入分发到多个 goroutine
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// 处理函数
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// Fan-in: 合并多个 channel
func merge(cs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    // 为每个输入 channel 启动一个 goroutine
    for _, c := range cs {
        wg.Add(1)
        go func(ch <-chan int) {
            defer wg.Done()
            for n := range ch {
                out <- n
            }
        }(c)
    }

    // 等待所有 goroutine 完成后关闭输出
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    // 生成输入
    in := generate(1, 2, 3, 4, 5)

    // Fan-out: 启动多个 worker
    c1 := square(in)
    c2 := square(in)
    c3 := square(in)

    // Fan-in: 合并结果
    for n := range merge(c1, c2, c3) {
        fmt.Println(n)
    }
}

模式 3:Pipeline(管道)

// Go - Pipeline 模式
package main

import "fmt"

// 阶段 1:生成数字
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// 阶段 2:平方
func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// 阶段 3:加倍
func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * 2
        }
        close(out)
    }()
    return out
}

func main() {
    // 构建 pipeline
    c := gen(1, 2, 3, 4, 5)
    out := double(sq(c))

    // 消费输出
    for n := range out {
        fmt.Println(n)
    }
}

Select 语句

Go 的 select 语句是处理多个 channel 的强大工具,Python 没有直接对应的概念。

// Go - Select 语句
package main

import (
    "fmt"
    "time"
)

func main() {
    // 1. 基本 select
    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        c1 <- "来自 c1"
    }()

    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "来自 c2"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println(msg1)
        case msg2 := <-c2:
            fmt.Println(msg2)
        }
    }

    // 2. 带 default 的 select(非阻塞)
    messages := make(chan string)

    select {
    case msg := <-messages:
        fmt.Println("收到消息:", msg)
    default:
        fmt.Println("没有消息")
    }

    // 3. 超时处理
    c := make(chan string)

    go func() {
        time.Sleep(2 * time.Second)
        c <- "延迟的消息"
    }()

    select {
    case msg := <-c:
        fmt.Println(msg)
    case <-time.After(1 * time.Second):
        fmt.Println("超时")
    }

    // 4. 取消操作
    done := make(chan bool)

    go func() {
        time.Sleep(3 * time.Second)
        done <- true
    }()

    select {
    case <-done:
        fmt.Println("完成")
    case <-time.After(2 * time.Second):
        fmt.Println("任务超时,取消")
    }
}

Python 的近似实现(使用 queue.Queue):

# Python - 模拟 select(不完全等价)
import queue
import threading
import time

def select_like():
    q1 = queue.Queue()
    q2 = queue.Queue()

    def send_to_q1():
        time.sleep(1)
        q1.put("来自 q1")

    def send_to_q2():
        time.sleep(2)
        q2.put("来自 q2")

    threading.Thread(target=send_to_q1).start()
    threading.Thread(target=send_to_q2).start()

    # Python 没有真正的 select,需要轮询
    for _ in range(2):
        while True:
            try:
                msg = q1.get_nowait()
                print(msg)
                break
            except queue.Empty:
                pass

            try:
                msg = q2.get_nowait()
                print(msg)
                break
            except queue.Empty:
                pass

            time.sleep(0.1)  # 避免忙等

select_like()

Context 上下文

Go 的 context 包用于跨 API 边界传递截止时间、取消信号和请求域的值。

// Go - Context 使用
package main

import (
    "context"
    "fmt"
    "time"
)

// 1. WithCancel - 手动取消
func withCancel() {
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("Goroutine 被取消")
                return
            default:
                fmt.Println("工作中...")
                time.Sleep(500 * time.Millisecond)
            }
        }
    }()

    time.Sleep(2 * time.Second)
    cancel()  // 取消操作
    time.Sleep(1 * time.Second)
}

// 2. WithTimeout - 超时取消
func withTimeout() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    select {
    case <-time.After(3 * time.Second):
        fmt.Println("任务完成")
    case <-ctx.Done():
        fmt.Println("超时:", ctx.Err())
    }
}

// 3. WithDeadline - 截止时间
func withDeadline() {
    deadline := time.Now().Add(2 * time.Second)
    ctx, cancel := context.WithDeadline(context.Background(), deadline)
    defer cancel()

    select {
    case <-time.After(3 * time.Second):
        fmt.Println("任务完成")
    case <-ctx.Done():
        fmt.Println("截止时间到:", ctx.Err())
    }
}

// 4. WithValue - 传递请求域的值
func withValue() {
    ctx := context.WithValue(context.Background(), "user_id", 123)

    processRequest(ctx)
}

func processRequest(ctx context.Context) {
    userID := ctx.Value("user_id")
    fmt.Printf("处理用户 %v 的请求\n", userID)
}

// 5. 实际应用:HTTP 请求取消
func makeRequest(ctx context.Context) error {
    // 模拟 HTTP 请求
    result := make(chan error)

    go func() {
        time.Sleep(3 * time.Second)
        result <- nil
    }()

    select {
    case <-ctx.Done():
        return ctx.Err()
    case err := <-result:
        return err
    }
}

func main() {
    fmt.Println("=== WithCancel ===")
    withCancel()

    fmt.Println("\n=== WithTimeout ===")
    withTimeout()

    fmt.Println("\n=== WithDeadline ===")
    withDeadline()

    fmt.Println("\n=== WithValue ===")
    withValue()

    // 测试请求取消
    fmt.Println("\n=== Request Cancellation ===")
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    if err := makeRequest(ctx); err != nil {
        fmt.Println("请求错误:", err)
    }
}

Python 没有内置的 context 包,但可以用其他方式实现类似功能:

# Python - 模拟 context
import threading
import time

class Context:
    def __init__(self):
        self.cancelled = threading.Event()
        self.values = {}

    def cancel(self):
        self.cancelled.set()

    def is_cancelled(self):
        return self.cancelled.is_set()

    def set_value(self, key, value):
        self.values[key] = value

    def get_value(self, key):
        return self.values.get(key)

def worker(ctx: Context):
    while not ctx.is_cancelled():
        print("工作中...")
        time.sleep(0.5)
    print("Worker 被取消")

ctx = Context()
ctx.set_value("user_id", 123)

t = threading.Thread(target=worker, args=(ctx,))
t.start()

time.sleep(2)
ctx.cancel()
t.join()

实战案例

案例 1:并发下载器

// Go - 并发下载文件
package main

import (
    "fmt"
    "io"
    "net/http"
    "os"
    "sync"
)

type DownloadResult struct {
    URL      string
    Filename string
    Size     int64
    Err      error
}

func downloadFile(url, filename string) (int64, error) {
    resp, err := http.Get(url)
    if err != nil {
        return 0, err
    }
    defer resp.Body.Close()

    file, err := os.Create(filename)
    if err != nil {
        return 0, err
    }
    defer file.Close()

    size, err := io.Copy(file, resp.Body)
    return size, err
}

func downloadConcurrently(urls map[string]string) []DownloadResult {
    results := make([]DownloadResult, 0, len(urls))
    resultsChan := make(chan DownloadResult, len(urls))

    var wg sync.WaitGroup

    for url, filename := range urls {
        wg.Add(1)
        go func(u, f string) {
            defer wg.Done()

            size, err := downloadFile(u, f)
            resultsChan <- DownloadResult{
                URL:      u,
                Filename: f,
                Size:     size,
                Err:      err,
            }
        }(url, filename)
    }

    // 等待所有下载完成
    go func() {
        wg.Wait()
        close(resultsChan)
    }()

    // 收集结果
    for result := range resultsChan {
        results = append(results, result)
        if result.Err != nil {
            fmt.Printf("下载 %s 失败: %v\n", result.URL, result.Err)
        } else {
            fmt.Printf("下载 %s 完成: %d 字节\n", result.URL, result.Size)
        }
    }

    return results
}

func main() {
    urls := map[string]string{
        "https://example.com/file1.zip": "file1.zip",
        "https://example.com/file2.pdf": "file2.pdf",
        "https://example.com/file3.jpg": "file3.jpg",
    }

    downloadConcurrently(urls)
}

案例 2:并发爬虫

// Go - 并发网页爬虫
package main

import (
    "fmt"
    "sync"
)

type Fetcher interface {
    Fetch(url string) (urls []string, err error)
}

type SafeMap struct {
    mu    sync.Mutex
    cache map[string]bool
}

func (s *SafeMap) Visited(url string) bool {
    s.mu.Lock()
    defer s.mu.Unlock()

    if s.cache[url] {
        return true
    }
    s.cache[url] = true
    return false
}

func Crawl(url string, depth int, fetcher Fetcher, visited *SafeMap, wg *sync.WaitGroup) {
    defer wg.Done()

    if depth <= 0 || visited.Visited(url) {
        return
    }

    urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println("错误:", err)
        return
    }

    fmt.Printf("抓取: %s\n", url)

    for _, u := range urls {
        wg.Add(1)
        go Crawl(u, depth-1, fetcher, visited, wg)
    }
}

func main() {
    visited := &SafeMap{cache: make(map[string]bool)}
    var wg sync.WaitGroup

    wg.Add(1)
    go Crawl("https://golang.org/", 4, fetcher, visited, &wg)
    wg.Wait()
}

// 模拟 fetcher
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
    body string
    urls []string
}

func (f fakeFetcher) Fetch(url string) ([]string, error) {
    if res, ok := f[url]; ok {
        return res.urls, nil
    }
    return nil, fmt.Errorf("未找到: %s", url)
}

var fetcher = fakeFetcher{
    "https://golang.org/": &fakeResult{
        "The Go Programming Language",
        []string{
            "https://golang.org/pkg/",
            "https://golang.org/cmd/",
        },
    },
    "https://golang.org/pkg/": &fakeResult{
        "Packages",
        []string{
            "https://golang.org/",
            "https://golang.org/cmd/",
            "https://golang.org/pkg/fmt/",
        },
    },
}

案例 3:限流器(Rate Limiter)

// Go - Token Bucket 限流器
package main

import (
    "fmt"
    "sync"
    "time"
)

type RateLimiter struct {
    rate       int           // 每秒令牌数
    capacity   int           // 桶容量
    tokens     int           // 当前令牌数
    lastUpdate time.Time     // 上次更新时间
    mu         sync.Mutex
}

func NewRateLimiter(rate, capacity int) *RateLimiter {
    return &RateLimiter{
        rate:       rate,
        capacity:   capacity,
        tokens:     capacity,
        lastUpdate: time.Now(),
    }
}

func (rl *RateLimiter) Allow() bool {
    rl.mu.Lock()
    defer rl.mu.Unlock()

    now := time.Now()
    elapsed := now.Sub(rl.lastUpdate)

    // 补充令牌
    tokensToAdd := int(elapsed.Seconds() * float64(rl.rate))
    rl.tokens += tokensToAdd
    if rl.tokens > rl.capacity {
        rl.tokens = rl.capacity
    }

    rl.lastUpdate = now

    // 尝试消费一个令牌
    if rl.tokens > 0 {
        rl.tokens--
        return true
    }

    return false
}

func main() {
    limiter := NewRateLimiter(5, 10)  // 每秒 5 个请求,桶容量 10

    // 模拟请求
    for i := 0; i < 20; i++ {
        if limiter.Allow() {
            fmt.Printf("请求 %d: 允许\n", i)
        } else {
            fmt.Printf("请求 %d: 限流\n", i)
        }
        time.Sleep(100 * time.Millisecond)
    }
}

最佳实践

1. 不要通过共享内存通信

// ❌ 不好:共享内存
var counter int
var mu sync.Mutex

func increment() {
    mu.Lock()
    counter++
    mu.Unlock()
}

// ✅ 好:通过通信共享
func better() {
    counter := make(chan int)

    go func() {
        var c int
        for {
            select {
            case counter <- c:
            case <-increment:
                c++
            }
        }
    }()
}

2. 及时关闭 Channel

// ✅ 生产者关闭 channel
func producer(ch chan<- int) {
    defer close(ch)  // 确保关闭
    for i := 0; i < 10; i++ {
        ch <- i
    }
}

// 消费者使用 range
func consumer(ch <-chan int) {
    for val := range ch {  // channel 关闭时自动退出
        fmt.Println(val)
    }
}

3. 使用 Context 传递取消信号

// ✅ 好:使用 context
func doWork(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // 执行工作
        }
    }
}

4. 避免 Goroutine 泄漏

// ❌ 不好:goroutine 泄漏
func leak() {
    ch := make(chan int)
    go func() {
        val := <-ch  // 如果没有发送者,永远阻塞
        fmt.Println(val)
    }()
}  // goroutine 泄漏

// ✅ 好:使用 context 或超时
func noLeak(ctx context.Context) {
    ch := make(chan int)
    go func() {
        select {
        case val := <-ch:
            fmt.Println(val)
        case <-ctx.Done():
            return
        }
    }()
}

5. 限制并发数量

// ✅ 使用带缓冲的 channel 限制并发
func limited() {
    const maxConcurrent = 10
    sem := make(chan struct{}, maxConcurrent)

    for i := 0; i < 100; i++ {
        sem <- struct{}{}  // 获取令牌
        go func(id int) {
            defer func() { <-sem }()  // 释放令牌
            // 执行工作
            fmt.Println("任务", id)
        }(i)
    }
}

总结

Python 到 Go 的思维转变

Python 思维Go 思维
使用线程/进程使用轻量级 Goroutine
使用 Queue 通信使用 Channel 通信(首选)
使用锁保护共享状态通过 Channel 通信共享状态
受 GIL 限制真正的并行执行
手动管理线程池按需创建 Goroutine

关键要点

  1. Goroutine:轻量级,成本低,可以创建成千上万个
  2. Channel:首选通信方式,类型安全,避免竞态条件
  3. Select:多路复用 Channel,处理超时和取消
  4. Context:传递取消信号和截止时间
  5. 同步原语:Mutex、RWMutex、WaitGroup 等工具
  6. 并发模式:Worker Pool、Fan-out/Fan-in、Pipeline

实用建议

  • 🔧 优先使用 Channel 而不是共享内存和锁
  • 🔧 使用 Context 管理 Goroutine 生命周期
  • 🔧 避免 Goroutine 泄漏,确保所有 Goroutine 能够退出
  • 🔧 限制并发数量,避免资源耗尽
  • 🔧 使用 go vetgo race 检测竞态条件
  • 🔧 关闭不再使用的 Channel,避免消费者阻塞

下一篇包管理:从 pip 到 go mod

上一篇错误处理:从异常到 error

系列目录从 Python 到 Go 完全指南

comments.logDiscussion Thread
./comments --show-all

讨论区

./loading comments...