从_python_到_go(六):并发编程:从线程到_gor.md2025-10-15
./meta --show-details
Published
2025年10月15日
Reading
27 min
Words
26,865
Status
PUBLISHED
目录
概述
Go 的并发模型是其最大的亮点之一。与 Python 的线程/进程模型不同,Go 使用轻量级的 Goroutine 和 Channel 实现并发,遵循 "不要通过共享内存来通信,而要通过通信来共享内存" 的哲学。
核心差异
特性 | Python | Go |
---|---|---|
并发单元 | Thread / Process | Goroutine |
创建成本 | 高(MB 级内存) | 低(KB 级内存) |
并发数量 | 几百到几千 | 数十万到百万 |
GIL 限制 | 有(线程受限于 GIL) | 无 |
通信方式 | Queue、Lock、Condition | Channel(首选) |
调度 | OS 线程调度 | Go 运行时调度 |
线程 vs Goroutine
Python 的线程
# Python - 使用线程
import threading
import time
def worker(id: int, duration: int):
"""工作线程"""
print(f"Worker {id} 开始工作")
time.sleep(duration)
print(f"Worker {id} 完成工作")
# 创建并启动线程
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i, 1))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print("所有工作完成")
# GIL 的问题:CPU 密集型任务
import multiprocessing
def cpu_bound_task(n: int) -> int:
"""CPU 密集型任务"""
result = 0
for i in range(n):
result += i * i
return result
# 线程版本(受 GIL 限制,不能真正并行)
def with_threads():
threads = []
for _ in range(4):
t = threading.Thread(target=cpu_bound_task, args=(10000000,))
threads.append(t)
t.start()
for t in threads:
t.join()
# 进程版本(真正并行,但开销大)
def with_processes():
processes = []
for _ in range(4):
p = multiprocessing.Process(target=cpu_bound_task, args=(10000000,))
processes.append(p)
p.start()
for p in processes:
p.join()
Go 的 Goroutine
// Go - 使用 Goroutine
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, duration time.Duration) {
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(duration)
fmt.Printf("Worker %d 完成工作\n", id)
}
func main() {
// 使用 WaitGroup 等待所有 goroutine 完成
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id, time.Second)
}(i)
}
wg.Wait()
fmt.Println("所有工作完成")
}
// CPU 密集型任务(无 GIL,真正并行)
func cpuBoundTask(n int) int {
result := 0
for i := 0; i < n; i++ {
result += i * i
}
return result
}
func withGoroutines() {
var wg sync.WaitGroup
// 创建成千上万个 goroutine 也没问题
for i := 0; i < 4; i++ {
wg.Add(1)
go func() {
defer wg.Done()
cpuBoundTask(10000000)
}()
}
wg.Wait()
}
Goroutine 的优势
方面 | Python Thread | Go Goroutine |
---|---|---|
初始栈大小 | ~8MB (Linux) | ~2KB (动态增长) |
创建时间 | ~微秒级 | ~纳秒级 |
上下文切换 | 内核态切换(慢) | 用户态切换(快) |
最大数量 | 几千(受内存限制) | 数百万(受内存限制) |
CPU 密集型 | GIL 限制,需要多进程 | 无限制,真正并行 |
通信机制
Python 的 Queue
# Python - 使用 Queue 通信
from queue import Queue
from threading import Thread
import time
def producer(q: Queue, items: int):
"""生产者"""
for i in range(items):
print(f"生产: {i}")
q.put(i)
time.sleep(0.1)
q.put(None) # 发送结束信号
def consumer(q: Queue, id: int):
"""消费者"""
while True:
item = q.get()
if item is None:
q.put(None) # 传递结束信号
break
print(f"消费者 {id} 处理: {item}")
time.sleep(0.2)
# 使用
q = Queue()
prod_thread = Thread(target=producer, args=(q, 10))
cons_threads = [Thread(target=consumer, args=(q, i)) for i in range(3)]
prod_thread.start()
for t in cons_threads:
t.start()
prod_thread.join()
for t in cons_threads:
t.join()
Go 的 Channel
// Go - 使用 Channel 通信
package main
import (
"fmt"
"time"
)
func producer(ch chan<- int, items int) {
for i := 0; i < items; i++ {
fmt.Printf("生产: %d\n", i)
ch <- i // 发送到 channel
time.Sleep(100 * time.Millisecond)
}
close(ch) // 关闭 channel 表示结束
}
func consumer(ch <-chan int, id int) {
for item := range ch { // 从 channel 接收,关闭时退出
fmt.Printf("消费者 %d 处理: %d\n", id, item)
time.Sleep(200 * time.Millisecond)
}
}
func main() {
ch := make(chan int, 5) // 带缓冲的 channel
go producer(ch, 10)
// 启动多个消费者
done := make(chan bool)
for i := 0; i < 3; i++ {
go func(id int) {
consumer(ch, id)
done <- true
}(i)
}
// 等待所有消费者完成
for i := 0; i < 3; i++ {
<-done
}
}
Channel 类型
// Go - Channel 详解
package main
import "fmt"
func channelBasics() {
// 1. 无缓冲 channel(同步)
ch1 := make(chan int)
go func() {
ch1 <- 42 // 阻塞直到有接收者
}()
value := <-ch1 // 接收值
fmt.Println(value)
// 2. 有缓冲 channel(异步)
ch2 := make(chan int, 3)
ch2 <- 1
ch2 <- 2
ch2 <- 3
// ch2 <- 4 // 会阻塞,缓冲已满
fmt.Println(<-ch2) // 1
fmt.Println(<-ch2) // 2
// 3. 单向 channel
// 只发送
var sendOnly chan<- int = make(chan int)
// 只接收
var recvOnly <-chan int = make(chan int)
// 4. 关闭 channel
ch3 := make(chan int, 2)
ch3 <- 1
ch3 <- 2
close(ch3)
// 从已关闭的 channel 读取
v1, ok1 := <-ch3 // 1, true
v2, ok2 := <-ch3 // 2, true
v3, ok3 := <-ch3 // 0, false (channel 已关闭)
fmt.Println(v1, ok1)
fmt.Println(v2, ok2)
fmt.Println(v3, ok3)
}
同步原语
Python 的锁
# Python - 使用锁同步
import threading
class Counter:
def __init__(self):
self.value = 0
self.lock = threading.Lock()
def increment(self):
with self.lock:
self.value += 1
def get_value(self):
with self.lock:
return self.value
# 使用
counter = Counter()
threads = []
def worker(counter: Counter, n: int):
for _ in range(n):
counter.increment()
for _ in range(10):
t = threading.Thread(target=worker, args=(counter, 1000))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终值: {counter.get_value()}") # 10000
Go 的同步原语
// Go - 使用 Mutex 同步
package main
import (
"fmt"
"sync"
)
// 方案 1:使用 Mutex
type Counter struct {
value int
mu sync.Mutex
}
func (c *Counter) Increment() {
c.mu.Lock()
c.value++
c.mu.Unlock()
}
func (c *Counter) GetValue() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
// 方案 2:使用 Channel(更符合 Go 风格)
type ChannelCounter struct {
ops chan func(*int)
}
func NewChannelCounter() *ChannelCounter {
c := &ChannelCounter{
ops: make(chan func(*int)),
}
go func() {
var value int
for op := range c.ops {
op(&value)
}
}()
return c
}
func (c *ChannelCounter) Increment() {
c.ops <- func(v *int) { *v++ }
}
func (c *ChannelCounter) GetValue() int {
result := make(chan int)
c.ops <- func(v *int) { result <- *v }
return <-result
}
func main() {
// 使用 Mutex
counter1 := &Counter{}
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
counter1.Increment()
}
}()
}
wg.Wait()
fmt.Printf("Mutex 计数器: %d\n", counter1.GetValue())
// 使用 Channel
counter2 := NewChannelCounter()
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
counter2.Increment()
}
}()
}
wg.Wait()
fmt.Printf("Channel 计数器: %d\n", counter2.GetValue())
}
其他同步原语
// Go - 其他同步工具
package main
import (
"fmt"
"sync"
"time"
)
// 1. RWMutex - 读写锁
type Cache struct {
data map[string]string
mu sync.RWMutex
}
func (c *Cache) Get(key string) (string, bool) {
c.mu.RLock() // 读锁,多个 goroutine 可以同时读
defer c.mu.RUnlock()
val, ok := c.data[key]
return val, ok
}
func (c *Cache) Set(key, value string) {
c.mu.Lock() // 写锁,独占访问
defer c.mu.Unlock()
c.data[key] = value
}
// 2. Once - 确保只执行一次
var once sync.Once
var instance *Singleton
type Singleton struct {
data string
}
func GetInstance() *Singleton {
once.Do(func() {
fmt.Println("创建单例")
instance = &Singleton{data: "singleton"}
})
return instance
}
// 3. Cond - 条件变量
type Queue struct {
items []int
cond *sync.Cond
}
func NewQueue() *Queue {
return &Queue{
items: make([]int, 0),
cond: sync.NewCond(&sync.Mutex{}),
}
}
func (q *Queue) Enqueue(item int) {
q.cond.L.Lock()
q.items = append(q.items, item)
q.cond.Signal() // 唤醒一个等待的 goroutine
q.cond.L.Unlock()
}
func (q *Queue) Dequeue() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.items) == 0 {
q.cond.Wait() // 等待直到有元素
}
item := q.items[0]
q.items = q.items[1:]
return item
}
func main() {
// 测试 RWMutex
cache := &Cache{data: make(map[string]string)}
cache.Set("key", "value")
val, _ := cache.Get("key")
fmt.Println("Cache:", val)
// 测试 Once
for i := 0; i < 3; i++ {
instance := GetInstance()
fmt.Println(instance.data)
}
// 测试 Cond
q := NewQueue()
go func() {
time.Sleep(time.Second)
q.Enqueue(42)
}()
fmt.Println("Dequeued:", q.Dequeue())
}
并发模式
模式 1:Worker Pool(工作池)
# Python - Worker Pool
from queue import Queue
from threading import Thread
import time
def worker(id: int, tasks: Queue, results: Queue):
"""工作线程"""
while True:
task = tasks.get()
if task is None:
break
print(f"Worker {id} 处理任务 {task}")
time.sleep(0.1)
results.put(task * 2)
tasks.task_done()
# 创建队列和工作线程
tasks = Queue()
results = Queue()
num_workers = 3
workers = []
for i in range(num_workers):
t = Thread(target=worker, args=(i, tasks, results))
t.start()
workers.append(t)
# 添加任务
for i in range(10):
tasks.put(i)
# 等待所有任务完成
tasks.join()
# 停止工作线程
for _ in range(num_workers):
tasks.put(None)
for w in workers:
w.join()
# 收集结果
while not results.empty():
print(results.get())
// Go - Worker Pool
package main
import (
"fmt"
"sync"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d 处理任务 %d\n", id, job)
results <- job * 2
}
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// 启动工作池
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id, jobs, results)
}(i)
}
// 发送任务
for i := 0; i < numJobs; i++ {
jobs <- i
}
close(jobs)
// 等待所有 worker 完成
wg.Wait()
close(results)
// 收集结果
for result := range results {
fmt.Println("结果:", result)
}
}
模式 2:Fan-out/Fan-in
// Go - Fan-out/Fan-in 模式
package main
import (
"fmt"
"sync"
)
// Fan-out: 将输入分发到多个 goroutine
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 处理函数
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// Fan-in: 合并多个 channel
func merge(cs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
// 为每个输入 channel 启动一个 goroutine
for _, c := range cs {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for n := range ch {
out <- n
}
}(c)
}
// 等待所有 goroutine 完成后关闭输出
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// 生成输入
in := generate(1, 2, 3, 4, 5)
// Fan-out: 启动多个 worker
c1 := square(in)
c2 := square(in)
c3 := square(in)
// Fan-in: 合并结果
for n := range merge(c1, c2, c3) {
fmt.Println(n)
}
}
模式 3:Pipeline(管道)
// Go - Pipeline 模式
package main
import "fmt"
// 阶段 1:生成数字
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 阶段 2:平方
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 阶段 3:加倍
func double(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * 2
}
close(out)
}()
return out
}
func main() {
// 构建 pipeline
c := gen(1, 2, 3, 4, 5)
out := double(sq(c))
// 消费输出
for n := range out {
fmt.Println(n)
}
}
Select 语句
Go 的 select
语句是处理多个 channel 的强大工具,Python 没有直接对应的概念。
// Go - Select 语句
package main
import (
"fmt"
"time"
)
func main() {
// 1. 基本 select
c1 := make(chan string)
c2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
c1 <- "来自 c1"
}()
go func() {
time.Sleep(2 * time.Second)
c2 <- "来自 c2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println(msg1)
case msg2 := <-c2:
fmt.Println(msg2)
}
}
// 2. 带 default 的 select(非阻塞)
messages := make(chan string)
select {
case msg := <-messages:
fmt.Println("收到消息:", msg)
default:
fmt.Println("没有消息")
}
// 3. 超时处理
c := make(chan string)
go func() {
time.Sleep(2 * time.Second)
c <- "延迟的消息"
}()
select {
case msg := <-c:
fmt.Println(msg)
case <-time.After(1 * time.Second):
fmt.Println("超时")
}
// 4. 取消操作
done := make(chan bool)
go func() {
time.Sleep(3 * time.Second)
done <- true
}()
select {
case <-done:
fmt.Println("完成")
case <-time.After(2 * time.Second):
fmt.Println("任务超时,取消")
}
}
Python 的近似实现(使用 queue.Queue):
# Python - 模拟 select(不完全等价)
import queue
import threading
import time
def select_like():
q1 = queue.Queue()
q2 = queue.Queue()
def send_to_q1():
time.sleep(1)
q1.put("来自 q1")
def send_to_q2():
time.sleep(2)
q2.put("来自 q2")
threading.Thread(target=send_to_q1).start()
threading.Thread(target=send_to_q2).start()
# Python 没有真正的 select,需要轮询
for _ in range(2):
while True:
try:
msg = q1.get_nowait()
print(msg)
break
except queue.Empty:
pass
try:
msg = q2.get_nowait()
print(msg)
break
except queue.Empty:
pass
time.sleep(0.1) # 避免忙等
select_like()
Context 上下文
Go 的 context 包用于跨 API 边界传递截止时间、取消信号和请求域的值。
// Go - Context 使用
package main
import (
"context"
"fmt"
"time"
)
// 1. WithCancel - 手动取消
func withCancel() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("Goroutine 被取消")
return
default:
fmt.Println("工作中...")
time.Sleep(500 * time.Millisecond)
}
}
}()
time.Sleep(2 * time.Second)
cancel() // 取消操作
time.Sleep(1 * time.Second)
}
// 2. WithTimeout - 超时取消
func withTimeout() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case <-time.After(3 * time.Second):
fmt.Println("任务完成")
case <-ctx.Done():
fmt.Println("超时:", ctx.Err())
}
}
// 3. WithDeadline - 截止时间
func withDeadline() {
deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
select {
case <-time.After(3 * time.Second):
fmt.Println("任务完成")
case <-ctx.Done():
fmt.Println("截止时间到:", ctx.Err())
}
}
// 4. WithValue - 传递请求域的值
func withValue() {
ctx := context.WithValue(context.Background(), "user_id", 123)
processRequest(ctx)
}
func processRequest(ctx context.Context) {
userID := ctx.Value("user_id")
fmt.Printf("处理用户 %v 的请求\n", userID)
}
// 5. 实际应用:HTTP 请求取消
func makeRequest(ctx context.Context) error {
// 模拟 HTTP 请求
result := make(chan error)
go func() {
time.Sleep(3 * time.Second)
result <- nil
}()
select {
case <-ctx.Done():
return ctx.Err()
case err := <-result:
return err
}
}
func main() {
fmt.Println("=== WithCancel ===")
withCancel()
fmt.Println("\n=== WithTimeout ===")
withTimeout()
fmt.Println("\n=== WithDeadline ===")
withDeadline()
fmt.Println("\n=== WithValue ===")
withValue()
// 测试请求取消
fmt.Println("\n=== Request Cancellation ===")
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := makeRequest(ctx); err != nil {
fmt.Println("请求错误:", err)
}
}
Python 没有内置的 context 包,但可以用其他方式实现类似功能:
# Python - 模拟 context
import threading
import time
class Context:
def __init__(self):
self.cancelled = threading.Event()
self.values = {}
def cancel(self):
self.cancelled.set()
def is_cancelled(self):
return self.cancelled.is_set()
def set_value(self, key, value):
self.values[key] = value
def get_value(self, key):
return self.values.get(key)
def worker(ctx: Context):
while not ctx.is_cancelled():
print("工作中...")
time.sleep(0.5)
print("Worker 被取消")
ctx = Context()
ctx.set_value("user_id", 123)
t = threading.Thread(target=worker, args=(ctx,))
t.start()
time.sleep(2)
ctx.cancel()
t.join()
实战案例
案例 1:并发下载器
// Go - 并发下载文件
package main
import (
"fmt"
"io"
"net/http"
"os"
"sync"
)
type DownloadResult struct {
URL string
Filename string
Size int64
Err error
}
func downloadFile(url, filename string) (int64, error) {
resp, err := http.Get(url)
if err != nil {
return 0, err
}
defer resp.Body.Close()
file, err := os.Create(filename)
if err != nil {
return 0, err
}
defer file.Close()
size, err := io.Copy(file, resp.Body)
return size, err
}
func downloadConcurrently(urls map[string]string) []DownloadResult {
results := make([]DownloadResult, 0, len(urls))
resultsChan := make(chan DownloadResult, len(urls))
var wg sync.WaitGroup
for url, filename := range urls {
wg.Add(1)
go func(u, f string) {
defer wg.Done()
size, err := downloadFile(u, f)
resultsChan <- DownloadResult{
URL: u,
Filename: f,
Size: size,
Err: err,
}
}(url, filename)
}
// 等待所有下载完成
go func() {
wg.Wait()
close(resultsChan)
}()
// 收集结果
for result := range resultsChan {
results = append(results, result)
if result.Err != nil {
fmt.Printf("下载 %s 失败: %v\n", result.URL, result.Err)
} else {
fmt.Printf("下载 %s 完成: %d 字节\n", result.URL, result.Size)
}
}
return results
}
func main() {
urls := map[string]string{
"https://example.com/file1.zip": "file1.zip",
"https://example.com/file2.pdf": "file2.pdf",
"https://example.com/file3.jpg": "file3.jpg",
}
downloadConcurrently(urls)
}
案例 2:并发爬虫
// Go - 并发网页爬虫
package main
import (
"fmt"
"sync"
)
type Fetcher interface {
Fetch(url string) (urls []string, err error)
}
type SafeMap struct {
mu sync.Mutex
cache map[string]bool
}
func (s *SafeMap) Visited(url string) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.cache[url] {
return true
}
s.cache[url] = true
return false
}
func Crawl(url string, depth int, fetcher Fetcher, visited *SafeMap, wg *sync.WaitGroup) {
defer wg.Done()
if depth <= 0 || visited.Visited(url) {
return
}
urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println("错误:", err)
return
}
fmt.Printf("抓取: %s\n", url)
for _, u := range urls {
wg.Add(1)
go Crawl(u, depth-1, fetcher, visited, wg)
}
}
func main() {
visited := &SafeMap{cache: make(map[string]bool)}
var wg sync.WaitGroup
wg.Add(1)
go Crawl("https://golang.org/", 4, fetcher, visited, &wg)
wg.Wait()
}
// 模拟 fetcher
type fakeFetcher map[string]*fakeResult
type fakeResult struct {
body string
urls []string
}
func (f fakeFetcher) Fetch(url string) ([]string, error) {
if res, ok := f[url]; ok {
return res.urls, nil
}
return nil, fmt.Errorf("未找到: %s", url)
}
var fetcher = fakeFetcher{
"https://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"https://golang.org/pkg/",
"https://golang.org/cmd/",
},
},
"https://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"https://golang.org/",
"https://golang.org/cmd/",
"https://golang.org/pkg/fmt/",
},
},
}
案例 3:限流器(Rate Limiter)
// Go - Token Bucket 限流器
package main
import (
"fmt"
"sync"
"time"
)
type RateLimiter struct {
rate int // 每秒令牌数
capacity int // 桶容量
tokens int // 当前令牌数
lastUpdate time.Time // 上次更新时间
mu sync.Mutex
}
func NewRateLimiter(rate, capacity int) *RateLimiter {
return &RateLimiter{
rate: rate,
capacity: capacity,
tokens: capacity,
lastUpdate: time.Now(),
}
}
func (rl *RateLimiter) Allow() bool {
rl.mu.Lock()
defer rl.mu.Unlock()
now := time.Now()
elapsed := now.Sub(rl.lastUpdate)
// 补充令牌
tokensToAdd := int(elapsed.Seconds() * float64(rl.rate))
rl.tokens += tokensToAdd
if rl.tokens > rl.capacity {
rl.tokens = rl.capacity
}
rl.lastUpdate = now
// 尝试消费一个令牌
if rl.tokens > 0 {
rl.tokens--
return true
}
return false
}
func main() {
limiter := NewRateLimiter(5, 10) // 每秒 5 个请求,桶容量 10
// 模拟请求
for i := 0; i < 20; i++ {
if limiter.Allow() {
fmt.Printf("请求 %d: 允许\n", i)
} else {
fmt.Printf("请求 %d: 限流\n", i)
}
time.Sleep(100 * time.Millisecond)
}
}
最佳实践
1. 不要通过共享内存通信
// ❌ 不好:共享内存
var counter int
var mu sync.Mutex
func increment() {
mu.Lock()
counter++
mu.Unlock()
}
// ✅ 好:通过通信共享
func better() {
counter := make(chan int)
go func() {
var c int
for {
select {
case counter <- c:
case <-increment:
c++
}
}
}()
}
2. 及时关闭 Channel
// ✅ 生产者关闭 channel
func producer(ch chan<- int) {
defer close(ch) // 确保关闭
for i := 0; i < 10; i++ {
ch <- i
}
}
// 消费者使用 range
func consumer(ch <-chan int) {
for val := range ch { // channel 关闭时自动退出
fmt.Println(val)
}
}
3. 使用 Context 传递取消信号
// ✅ 好:使用 context
func doWork(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
// 执行工作
}
}
}
4. 避免 Goroutine 泄漏
// ❌ 不好:goroutine 泄漏
func leak() {
ch := make(chan int)
go func() {
val := <-ch // 如果没有发送者,永远阻塞
fmt.Println(val)
}()
} // goroutine 泄漏
// ✅ 好:使用 context 或超时
func noLeak(ctx context.Context) {
ch := make(chan int)
go func() {
select {
case val := <-ch:
fmt.Println(val)
case <-ctx.Done():
return
}
}()
}
5. 限制并发数量
// ✅ 使用带缓冲的 channel 限制并发
func limited() {
const maxConcurrent = 10
sem := make(chan struct{}, maxConcurrent)
for i := 0; i < 100; i++ {
sem <- struct{}{} // 获取令牌
go func(id int) {
defer func() { <-sem }() // 释放令牌
// 执行工作
fmt.Println("任务", id)
}(i)
}
}
总结
Python 到 Go 的思维转变
Python 思维 | Go 思维 |
---|---|
使用线程/进程 | 使用轻量级 Goroutine |
使用 Queue 通信 | 使用 Channel 通信(首选) |
使用锁保护共享状态 | 通过 Channel 通信共享状态 |
受 GIL 限制 | 真正的并行执行 |
手动管理线程池 | 按需创建 Goroutine |
关键要点
- Goroutine:轻量级,成本低,可以创建成千上万个
- Channel:首选通信方式,类型安全,避免竞态条件
- Select:多路复用 Channel,处理超时和取消
- Context:传递取消信号和截止时间
- 同步原语:Mutex、RWMutex、WaitGroup 等工具
- 并发模式:Worker Pool、Fan-out/Fan-in、Pipeline
实用建议
- 🔧 优先使用 Channel 而不是共享内存和锁
- 🔧 使用 Context 管理 Goroutine 生命周期
- 🔧 避免 Goroutine 泄漏,确保所有 Goroutine 能够退出
- 🔧 限制并发数量,避免资源耗尽
- 🔧 使用
go vet
和go race
检测竞态条件 - 🔧 关闭不再使用的 Channel,避免消费者阻塞
上一篇:错误处理:从异常到 error
系列目录:从 Python 到 Go 完全指南