./read "从 Python 到 Go(八):Web..."

从 Python 到 Go(八):Web 开发:从 Flask/FastAPI 到 Gin

从_python_到_go(八):web_开发:从_flas.md2025-10-15
./meta --show-details
Published
2025年10月15日
Reading
33 min
Words
32,115
Status
PUBLISHED

目录

概述

Python 的 Web 生态系统非常丰富,Flask 简洁灵活,FastAPI 现代高效。Go 的 Gin 框架结合了两者的优点:简洁的 API、高性能、类型安全。

核心差异

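| 特性 | Flask | FastAPI | Gin |
| --- | --- | --- | --- |
| 性能 | 中等 | 高(异步) | 极高 |
| 类型系统 | 动态 | 类型提示(Pydantic) | 静态类型 |
| 异步支持 | 有限 | 原生异步 | 通过 Goroutine |
| API 文档 | 手动(Flask-RESTX) | 自动生成(OpenAPI) | 需要集成(Swag) |
| 生态系统 | 极其丰富 | 快速增长 | 丰富 |
| 学习曲线 | 平缓 | 中等 | 中等 |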

框架对比

Flask - 简洁灵活

# Flask - 最小示例
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route('/')
def hello():
    """Serve the root URL with a plain-text greeting."""
    greeting = 'Hello World!'
    return greeting

@app.route('/api/users/<int:user_id>')
def get_user(user_id):
    """Return a placeholder user whose ``id`` is the integer captured from the URL."""
    payload = dict(id=user_id, name='John Doe')
    return jsonify(payload)

@app.route('/api/users', methods=['POST'])
def create_user():
    """Create a user from a JSON body and echo it back with status 201.

    Responds with 400 when the body is not a JSON object or has no ``name``
    key, instead of failing with an unhandled KeyError/TypeError.
    """
    # silent=True makes get_json return None for a missing or malformed body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'name' not in data:
        return jsonify({'error': "field 'name' is required"}), 400

    return jsonify({
        'id': 1,
        'name': data['name']
    }), 201

if __name__ == '__main__':
    app.run(debug=True, port=5000)

FastAPI - 现代异步

# FastAPI - 类型安全 + 自动文档
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class User(BaseModel):
    """Request body schema for creating a user; no field declares a default."""
    name: str
    email: str
    age: int

@app.get('/')
async def hello():
    """Return a JSON greeting for the root path."""
    return dict(message='Hello World!')

@app.get('/api/users/{user_id}')
async def get_user(user_id: int):
    """Return a placeholder user whose ``id`` echoes the integer path parameter."""
    record = {'id': user_id}
    record['name'] = 'John Doe'
    return record

@app.post('/api/users', status_code=201)
async def create_user(user: User):
    """Echo the name and email of the validated body back with a fixed ``id`` of 1."""
    created = {'id': 1}
    created.update(name=user.name, email=user.email)
    return created

# 自动生成文档:http://localhost:8000/docs
if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host='0.0.0.0', port=8000)

Gin - 高性能

// Gin - 高性能 + 类型安全
package main

import (
    "net/http"
    "github.com/gin-gonic/gin"
)

// User is the request/response payload of the user endpoints.
//
// Age is a pointer so the "required" rule can tell a missing field (nil)
// apart from an explicit 0. On a plain int, "required" rejects 0 because it is
// the zero value, which contradicts the accompanying "gte=0" rule.
type User struct {
    ID    int    `json:"id"`
    Name  string `json:"name" binding:"required"`
    Email string `json:"email" binding:"required,email"`
    Age   *int   `json:"age" binding:"required,gte=0"`
}

// main registers the three demo routes and serves them on :8080.
func main() {
    r := gin.Default()

    r.GET("/", func(c *gin.Context) {
        c.JSON(http.StatusOK, gin.H{
            "message": "Hello World!",
        })
    })

    r.GET("/api/users/:user_id", func(c *gin.Context) {
        // Bind the path segment to an int so "id" is returned as a JSON
        // number (as in the Flask/FastAPI versions) and non-numeric IDs are
        // rejected instead of being echoed back as a string.
        var params struct {
            UserID int `uri:"user_id"`
        }
        if err := c.ShouldBindUri(&params); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
            return
        }

        c.JSON(http.StatusOK, gin.H{
            "id":   params.UserID,
            "name": "John Doe",
        })
    })

    r.POST("/api/users", func(c *gin.Context) {
        var user User
        if err := c.ShouldBindJSON(&user); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
            return
        }

        user.ID = 1
        c.JSON(http.StatusCreated, user)
    })

    // Run only returns when the server failed to start or stopped.
    if err := r.Run(":8080"); err != nil {
        panic(err)
    }
}

路由与处理器

Python 路由

# Flask 路由
from flask import Flask, request

app = Flask(__name__)

# 基本路由
@app.route('/')
def index():
    """Serve the site root."""
    body = 'Index Page'
    return body

# 路径参数
@app.route('/users/<int:user_id>')
@app.route('/users/<string:username>')
def show_user(user_id=None, username=None):
    """Show a user addressed either by numeric ID or by username.

    Flask passes the captured URL variable as a keyword argument, so the view
    must accept both ``user_id`` and ``username``; with only ``user_id`` in the
    signature the second route raised a TypeError.
    """
    identifier = user_id if user_id is not None else username
    return f'User {identifier}'

# 多个 HTTP 方法
@app.route('/api/resource', methods=['GET', 'POST', 'PUT', 'DELETE'])
def resource():
    """Dispatch on the HTTP method; only GET and POST are spelled out in this excerpt."""
    replies = {
        'GET': 'Get Resource',
        'POST': 'Create Resource',
    }
    # ... the remaining methods are elided in the example
    if request.method in replies:
        return replies[request.method]

# 路由组(使用 Blueprint)
from flask import Blueprint

api = Blueprint('api', __name__, url_prefix='/api/v1')

@api.route('/users')
def users():
    """Handle the blueprint's ``/users`` route (``/api/v1/users`` once registered)."""
    body = 'API Users'
    return body

app.register_blueprint(api)

# FastAPI 路由
from fastapi import FastAPI, APIRouter

app = FastAPI()

# 路径参数
@app.get('/users/{user_id}')
async def get_user(user_id: int):
    """Echo the integer path parameter back as JSON."""
    return dict(user_id=user_id)

# 查询参数
@app.get('/items/')
async def read_items(skip: int = 0, limit: int = 10):
    """Echo the ``skip``/``limit`` query parameters (defaults 0 and 10)."""
    paging = dict(skip=skip, limit=limit)
    return paging

# 路由组
router = APIRouter(prefix='/api/v1', tags=['users'])

@router.get('/users')
async def list_users():
    """Return an empty user list from the ``/api/v1`` router."""
    return list()

app.include_router(router)

Go 路由(Gin)

package main

import (
    "github.com/gin-gonic/gin"
    "net/http"
)

// main demonstrates Gin routing: plain routes, path and wildcard parameters,
// query parameters, and flat and nested route groups. It serves on :8080.
func main() {
    r := gin.Default()

    // Basic route
    r.GET("/", func(c *gin.Context) {
        c.String(http.StatusOK, "Index Page")
    })

    // Path parameter
    r.GET("/users/:id", func(c *gin.Context) {
        id := c.Param("id")
        c.String(http.StatusOK, "User %s", id)
    })

    // Wildcard: *filepath captures the rest of the path
    r.GET("/files/*filepath", func(c *gin.Context) {
        path := c.Param("filepath")
        c.String(http.StatusOK, "File: %s", path)
    })

    // Query parameters with defaults; both values stay strings
    r.GET("/search", func(c *gin.Context) {
        query := c.DefaultQuery("q", "default")
        page := c.DefaultQuery("page", "1")
        c.JSON(http.StatusOK, gin.H{
            "query": query,
            "page":  page,
        })
    })

    // Route group; the braces are only visual grouping
    v1 := r.Group("/api/v1")
    {
        v1.GET("/users", listUsers)
        v1.GET("/users/:id", getUser)
        v1.POST("/users", createUser)
        v1.PUT("/users/:id", updateUser)
        v1.DELETE("/users/:id", deleteUser)
    }

    // Nested route groups
    v2 := r.Group("/api/v2")
    {
        users := v2.Group("/users")
        {
            users.GET("", listUsers)
            users.POST("", createUser)
        }

        posts := v2.Group("/posts")
        {
            posts.GET("", listPosts)
            posts.POST("", createPost)
        }
    }

    // Run only returns when the server failed to start or stopped.
    if err := r.Run(":8080"); err != nil {
        panic(err)
    }
}

// listUsers responds 200 with an empty JSON array.
func listUsers(c *gin.Context) {
    users := []string{}
    c.JSON(http.StatusOK, users)
}

// getUser responds 200 with the ":id" path parameter echoed back.
func getUser(c *gin.Context) {
    c.JSON(http.StatusOK, gin.H{"id": c.Param("id")})
}

// createUser responds 201 with an empty JSON object.
func createUser(c *gin.Context) {
    body := gin.H{}
    c.JSON(http.StatusCreated, body)
}

// updateUser responds 200 with an empty JSON object.
func updateUser(c *gin.Context) {
    body := gin.H{}
    c.JSON(http.StatusOK, body)
}

// deleteUser responds 204 with no body.
func deleteUser(c *gin.Context) {
    const code = http.StatusNoContent
    c.Status(code)
}

// listPosts responds 200 with an empty JSON array.
func listPosts(c *gin.Context) {
    posts := []string{}
    c.JSON(http.StatusOK, posts)
}

// createPost responds 201 with an empty JSON object.
func createPost(c *gin.Context) {
    body := gin.H{}
    c.JSON(http.StatusCreated, body)
}

请求与响应

Python 请求处理

# Flask
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/api/data', methods=['POST'])
def handle_data():
    """Show how to read each part of an incoming request.

    Reads the JSON body, a form field, a query parameter, a header, an
    optional uploaded file and a cookie, then answers 200 with a custom
    response header. The upload is optional and its client-supplied name is
    reduced to a bare file name before it is joined to the upload directory.
    """
    import os

    # JSON body; silent=True returns None instead of aborting when the
    # request is not JSON (for example a multipart upload).
    data = request.get_json(silent=True)

    # Form field
    name = request.form.get('name')

    # Query parameter, converted to int with a default of 1
    page = request.args.get('page', 1, type=int)

    # Request header
    auth = request.headers.get('Authorization')

    # File upload: never trust the client's file name, it may contain
    # directory components such as "../".
    upload = request.files.get('file')
    if upload is not None and upload.filename:
        safe_name = os.path.basename(upload.filename.replace('\\', '/'))
        if safe_name in ('', '.', '..'):
            return jsonify({'status': 'error', 'error': 'invalid filename'}), 400
        upload.save(os.path.join('/uploads', safe_name))

    # Cookie
    session_id = request.cookies.get('session_id')

    return jsonify({
        'status': 'success',
        'data': data
    }), 200, {'X-Custom-Header': 'value'}

# FastAPI(更优雅)
from fastapi import FastAPI, File, UploadFile, Header, Cookie
from pydantic import BaseModel

app = FastAPI()

class DataModel(BaseModel):
    """Body schema for POST /api/data; both fields are required."""
    name: str
    age: int

@app.post('/api/data')
async def handle_data(
    data: DataModel,
    authorization: str = Header(None),
    session_id: str = Cookie(None),
    file: UploadFile = File(None)
):
    """Receive a validated body plus an optional header, cookie and upload.

    NOTE(review): this declares a JSON body model and a ``File`` parameter on
    the same endpoint; confirm against the FastAPI docs that the two can be
    combined in a single request.
    """
    if file:
        payload = await file.read()
        # process the uploaded bytes here

    result = dict(status='success')
    result['data'] = data.dict()
    return result

Go 请求处理(Gin)

package main

import (
    "github.com/gin-gonic/gin"
    "net/http"
)

// DataModel is the JSON body accepted by POST /api/data.
//
// Age is a pointer so "required" can tell a missing field (nil) apart from an
// explicit 0; on a plain int, "required" rejects 0, contradicting "gte=0".
type DataModel struct {
    Name string `json:"name" binding:"required"`
    Age  *int   `json:"age" binding:"required,gte=0"`
}

// main registers POST /api/data, which shows how to read each part of a
// request (JSON body, form field, query parameter, header, uploaded file,
// cookie) and how to set a response header. It serves on :8080.
func main() {
    r := gin.Default()

    r.POST("/api/data", func(c *gin.Context) {
        // JSON body, validated through the binding tags
        var data DataModel
        if err := c.ShouldBindJSON(&data); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
            return
        }

        // Form field
        name := c.PostForm("name")

        // Query parameter
        page := c.DefaultQuery("page", "1")

        // Request header
        auth := c.GetHeader("Authorization")

        // File upload (optional: a missing file is not an error here).
        // NOTE(review): file.Filename is supplied by the client; sanitize it
        // (e.g. filepath.Base) before using it in a path in real code.
        file, err := c.FormFile("file")
        if err == nil {
            if err := c.SaveUploadedFile(file, "/uploads/"+file.Filename); err != nil {
                c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
                return
            }
        }

        // Cookie; the error only signals that the cookie is absent, in which
        // case the empty string is the intended value.
        sessionID, _ := c.Cookie("session_id")

        // Build the response. "name" is included so the form value is
        // actually used; an unused local is a compile error in Go.
        c.Header("X-Custom-Header", "value")
        c.JSON(http.StatusOK, gin.H{
            "status":  "success",
            "data":    data,
            "name":    name,
            "page":    page,
            "auth":    auth,
            "session": sessionID,
        })
    })

    // Run only returns when the server failed to start or stopped.
    if err := r.Run(":8080"); err != nil {
        panic(err)
    }
}

响应类型

// Go - the response types Gin can produce
package main

import (
    "io"
    "net/http"
    "time"

    "github.com/gin-gonic/gin"
)

// main registers one route per response type (JSON, struct JSON, XML, HTML,
// plain text, file, redirect, stream) and serves them on :8080.
func main() {
    r := gin.Default()

    // JSON response
    r.GET("/json", func(c *gin.Context) {
        c.JSON(http.StatusOK, gin.H{
            "message": "Hello",
            "status":  200,
        })
    })

    // JSON from a struct
    type User struct {
        Name  string `json:"name"`
        Email string `json:"email"`
    }

    r.GET("/user", func(c *gin.Context) {
        user := User{Name: "John", Email: "john@example.com"}
        c.JSON(http.StatusOK, user)
    })

    // XML response
    r.GET("/xml", func(c *gin.Context) {
        c.XML(http.StatusOK, gin.H{
            "message": "Hello",
        })
    })

    // HTML response.
    // NOTE(review): no template is loaded in this snippet; presumably
    // r.LoadHTMLGlob/LoadHTMLFiles must be called before this route is hit.
    r.GET("/html", func(c *gin.Context) {
        c.HTML(http.StatusOK, "index.html", gin.H{
            "title": "Home Page",
        })
    })

    // Plain-text response
    r.GET("/text", func(c *gin.Context) {
        c.String(http.StatusOK, "Plain text response")
    })

    // File download
    r.GET("/download", func(c *gin.Context) {
        c.File("/path/to/file.pdf")
    })

    // Redirect
    r.GET("/redirect", func(c *gin.Context) {
        c.Redirect(http.StatusMovedPermanently, "https://example.com")
    })

    // Streaming response: one chunk per second for as long as the callback
    // returns true.
    r.GET("/stream", func(c *gin.Context) {
        c.Stream(func(w io.Writer) bool {
            if _, err := w.Write([]byte("chunk\n")); err != nil {
                return false // stop streaming once the write fails
            }
            time.Sleep(time.Second)
            return true // keep streaming
        })
    })

    // Run only returns when the server failed to start or stopped.
    if err := r.Run(":8080"); err != nil {
        panic(err)
    }
}

中间件

Python 中间件

# Flask 中间件
from flask import Flask, request, g
import time

app = Flask(__name__)

# 日志中间件
@app.before_request
def before_request():
    """Record the request's start time on ``g`` for the timing header."""
    started_at = time.time()
    g.start_time = started_at

@app.after_request
def after_request(response):
    """Attach the elapsed time as ``X-Response-Time`` when a start time was recorded."""
    try:
        started_at = g.start_time
    except AttributeError:
        # No start time was stored for this request; leave the response as is.
        return response
    response.headers['X-Response-Time'] = str(time.time() - started_at)
    return response

# 认证中间件
@app.before_request
def auth_middleware():
    """Reject unauthenticated requests to every endpoint except ``login``.

    ``check_auth`` is not defined in this snippet and must be provided elsewhere.
    """
    if request.endpoint == 'login':
        return None
    if check_auth():
        return None
    return 'Unauthorized', 401

# FastAPI 中间件
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
import time

app = FastAPI()

class TimingMiddleware(BaseHTTPMiddleware):
    """Add an ``X-Process-Time`` header holding the time spent producing the response."""

    async def dispatch(self, request: Request, call_next):
        began = time.time()
        response = await call_next(request)
        elapsed = time.time() - began
        response.headers['X-Process-Time'] = str(elapsed)
        return response

app.add_middleware(TimingMiddleware)

Go 中间件(Gin)

// Gin 中间件
package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"

    "github.com/gin-gonic/gin"
)

// LoggerMiddleware returns a handler that, after the rest of the chain has
// run, prints the request method, path, client IP, response status and the
// time the request took.
func LoggerMiddleware() gin.HandlerFunc {
    return func(c *gin.Context) {
        began := time.Now()

        // Run the remaining handlers first so status and latency are final.
        c.Next()

        elapsed := time.Since(began)
        req := c.Request
        fmt.Printf("[%s] %s %s %d %v\n",
            req.Method, req.URL.Path, c.ClientIP(), c.Writer.Status(), elapsed)
    }
}

// AuthMiddleware returns a handler that requires a valid Authorization header.
// A missing or invalid token aborts the chain with 401; otherwise a user ID
// is stored on the context and the chain continues.
func AuthMiddleware() gin.HandlerFunc {
    // reject answers 401 with the given message and stops further handlers.
    reject := func(c *gin.Context, msg string) {
        c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": msg})
    }

    return func(c *gin.Context) {
        token := c.GetHeader("Authorization")

        switch {
        case token == "":
            reject(c, "未授权")
            return
        case !validateToken(token):
            reject(c, "Token 无效")
            return
        }

        // Expose the (hard-coded) user ID to downstream handlers.
        c.Set("user_id", 123)

        c.Next()
    }
}

// CORSMiddleware returns a handler that sets permissive CORS headers on every
// response and answers OPTIONS requests with 204 without running the
// remaining handlers.
func CORSMiddleware() gin.HandlerFunc {
    corsHeaders := [][2]string{
        {"Access-Control-Allow-Origin", "*"},
        {"Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"},
        {"Access-Control-Allow-Headers", "Content-Type, Authorization"},
    }

    return func(c *gin.Context) {
        h := c.Writer.Header()
        for _, kv := range corsHeaders {
            h.Set(kv[0], kv[1])
        }

        if c.Request.Method != "OPTIONS" {
            c.Next()
            return
        }
        c.AbortWithStatus(http.StatusNoContent)
    }
}

// RateLimitMiddleware returns a handler that allows each client IP at most
// maxRequests requests per one-minute fixed window and answers 429 beyond it.
//
// The counters live in process memory behind a mutex: handlers run on
// concurrent goroutines, and unsynchronized map access from several of them
// is a data race. A window is reset lazily when the same IP returns after it
// has expired, so no per-request cleanup goroutine is needed. Entries of
// clients that never return are not evicted; production code should use a
// store with expiry instead.
func RateLimitMiddleware(maxRequests int) gin.HandlerFunc {
    // window counts the requests one IP made since start.
    type window struct {
        start time.Time
        count int
    }

    var mu sync.Mutex
    windows := make(map[string]window)

    return func(c *gin.Context) {
        ip := c.ClientIP()
        now := time.Now()

        mu.Lock()
        w, ok := windows[ip]
        if !ok || now.Sub(w.start) >= time.Minute {
            w = window{start: now}
        }
        w.count++
        windows[ip] = w
        limited := w.count > maxRequests
        mu.Unlock()

        if limited {
            c.JSON(http.StatusTooManyRequests, gin.H{
                "error": "请求过于频繁",
            })
            c.Abort()
            return
        }

        c.Next()
    }
}

// main wires the middleware: global (logger, recovery, CORS), per group
// (auth on /api) and per route (rate limit on /limited), then serves on :8080.
func main() {
    r := gin.New() // no default middleware

    // Global middleware
    r.Use(LoggerMiddleware())
    r.Use(gin.Recovery()) // panic recovery
    r.Use(CORSMiddleware())

    // Public route
    r.POST("/login", login)

    // Routes that require authentication
    authorized := r.Group("/api")
    authorized.Use(AuthMiddleware())
    {
        authorized.GET("/users", getUsers)
        authorized.POST("/posts", createPost)
    }

    // Middleware attached to a single route
    r.GET("/limited", RateLimitMiddleware(10), handleLimited)

    // Run only returns when the server failed to start or stopped.
    if err := r.Run(":8080"); err != nil {
        panic(err)
    }
}

// validateToken reports whether token equals the single hard-coded demo token.
func validateToken(token string) bool {
    const accepted = "valid-token"
    if token != accepted {
        return false
    }
    return true
}

// login responds 200 with the fixed demo token.
func login(c *gin.Context) {
    body := gin.H{"token": "valid-token"}
    c.JSON(http.StatusOK, body)
}

// getUsers responds 200 with two placeholder user names.
func getUsers(c *gin.Context) {
    names := []string{"user1", "user2"}
    c.JSON(http.StatusOK, names)
}

// createPost responds 201 with an empty JSON object.
func createPost(c *gin.Context) {
    body := gin.H{}
    c.JSON(http.StatusCreated, body)
}

// handleLimited responds 200 with a fixed text body.
func handleLimited(c *gin.Context) {
    const msg = "Limited endpoint"
    c.String(http.StatusOK, msg)
}

模板渲染

Python 模板

# Flask + Jinja2
from flask import Flask, render_template

app = Flask(__name__)

@app.route('/profile/<username>')
def profile(username):
    """Render ``profile.html`` for a user record built from the URL's username."""
    user = dict(
        name=username,
        email=f'{username}@example.com',
        posts=['Post 1', 'Post 2'],
    )
    return render_template('profile.html', user=user)

# templates/profile.html
"""
<!DOCTYPE html>
<html>
<head>
    <title>{{ user.name }}'s Profile</title>
</head>
<body>
    <h1>{{ user.name }}</h1>
    <p>Email: {{ user.email }}</p>

    <h2>Posts</h2>
    <ul>
    {% for post in user.posts %}
        <li>{{ post }}</li>
    {% endfor %}
    </ul>
</body>
</html>
"""

Go 模板

// Gin + html/template
package main

import (
    "github.com/gin-gonic/gin"
    "net/http"
)

// User is the view model handed to the profile template. The fields are
// exported because the template reads them as .user.Name, .user.Email and
// .user.Posts.
type User struct {
    Name  string
    Email string
    Posts []string
}

// main loads the HTML templates, registers the profile page and serves on
// :8080.
func main() {
    r := gin.Default()

    // Load every file under templates/ as an HTML template
    r.LoadHTMLGlob("templates/*")

    r.GET("/profile/:username", func(c *gin.Context) {
        username := c.Param("username")

        user := User{
            Name:  username,
            Email: username + "@example.com",
            Posts: []string{"Post 1", "Post 2"},
        }

        c.HTML(http.StatusOK, "profile.html", gin.H{
            "user": user,
        })
    })

    // Run only returns when the server failed to start or stopped.
    if err := r.Run(":8080"); err != nil {
        panic(err)
    }
}

// templates/profile.html
/*
<!DOCTYPE html>
<html>
<head>
    <title>{{ .user.Name }}'s Profile</title>
</head>
<body>
    <h1>{{ .user.Name }}</h1>
    <p>Email: {{ .user.Email }}</p>

    <h2>Posts</h2>
    <ul>
    {{ range .user.Posts }}
        <li>{{ . }}</li>
    {{ end }}
    </ul>
</body>
</html>
*/

数据库集成

Python + SQLAlchemy

# Flask + SQLAlchemy
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
db = SQLAlchemy(app)

# Model definition
class User(db.Model):
    """A user row: integer primary key, required name, required unique email."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80), nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)

    def to_dict(self):
        """Return the row as a plain dict suitable for ``jsonify``."""
        return {field: getattr(self, field) for field in ('id', 'name', 'email')}

# 创建表
with app.app_context():
    db.create_all()

# CRUD 操作
@app.route('/users', methods=['GET'])
def get_users():
    """Return every user as a JSON array."""
    rows = User.query.all()
    payload = list(map(User.to_dict, rows))
    return jsonify(payload)

@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Return one user as JSON, or a 404 response when the ID is unknown."""
    return jsonify(User.query.get_or_404(user_id).to_dict())

@app.route('/users', methods=['POST'])
def create_user():
    """Insert a user from the JSON body and return it with status 201.

    Responds with 400 when the body is not a JSON object or lacks ``name`` or
    ``email``, instead of failing with an unhandled KeyError/TypeError.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'request body must be a JSON object'}), 400

    missing = [field for field in ('name', 'email') if not data.get(field)]
    if missing:
        return jsonify({'error': f"missing fields: {', '.join(missing)}"}), 400

    user = User(name=data['name'], email=data['email'])
    db.session.add(user)
    db.session.commit()
    return jsonify(user.to_dict()), 201

@app.route('/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    """Update name and/or email of an existing user; unknown IDs yield 404."""
    user = User.query.get_or_404(user_id)
    data = request.get_json()
    # Fields absent from the body keep their current value.
    for field in ('name', 'email'):
        setattr(user, field, data.get(field, getattr(user, field)))
    db.session.commit()
    return jsonify(user.to_dict())

@app.route('/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Delete a user and answer 204 with an empty body; unknown IDs yield 404."""
    doomed = User.query.get_or_404(user_id)
    db.session.delete(doomed)
    db.session.commit()
    empty_body, status = '', 204
    return empty_body, status

Go + GORM

// Gin + GORM
package main

import (
    "github.com/gin-gonic/gin"
    "gorm.io/driver/sqlite"
    "gorm.io/gorm"
    "net/http"
)

// User is both the GORM model and the JSON payload of the /users endpoints:
// the gorm tags describe the columns, the json tags the wire names, and the
// binding tags the validation applied when a request body is bound.
type User struct {
    ID    uint   `gorm:"primaryKey" json:"id"`
    Name  string `json:"name" binding:"required"`
    Email string `gorm:"unique" json:"email" binding:"required,email"`
}

var db *gorm.DB

// main opens the SQLite database, migrates the User table, registers the CRUD
// routes and serves on :8080. Any startup failure panics with its cause.
func main() {
    // Connect to the database
    var err error
    db, err = gorm.Open(sqlite.Open("app.db"), &gorm.Config{})
    if err != nil {
        panic("failed to connect database: " + err.Error())
    }

    // Auto-migrate; serving without the table would fail on every request.
    if err := db.AutoMigrate(&User{}); err != nil {
        panic("failed to migrate database: " + err.Error())
    }

    r := gin.Default()

    // CRUD routes
    r.GET("/users", getUsers)
    r.GET("/users/:id", getUser)
    r.POST("/users", createUser)
    r.PUT("/users/:id", updateUser)
    r.DELETE("/users/:id", deleteUser)

    // Run only returns when the server failed to start or stopped.
    if err := r.Run(":8080"); err != nil {
        panic(err)
    }
}

// getUsers responds with every user, or 500 when the query fails (the error
// was previously dropped, so a failure looked like an empty result).
func getUsers(c *gin.Context) {
    var users []User
    if err := db.Find(&users).Error; err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
        return
    }
    c.JSON(http.StatusOK, users)
}

// getUser responds with the user whose ID is in the path, or 404.
//
// The ID is bound through a "?" placeholder: a raw string passed as GORM's
// inline condition is treated as SQL, so the unvalidated path parameter must
// never be handed to First directly.
func getUser(c *gin.Context) {
    var user User
    if err := db.First(&user, "id = ?", c.Param("id")).Error; err != nil {
        c.JSON(http.StatusNotFound, gin.H{"error": "User not found"})
        return
    }
    c.JSON(http.StatusOK, user)
}

// createUser binds and validates the JSON body, inserts the user and responds
// 201 with the stored record. Binding errors yield 400, insert errors 500.
func createUser(c *gin.Context) {
    // fail writes the error message as JSON with the given status.
    fail := func(status int, err error) {
        c.JSON(status, gin.H{"error": err.Error()})
    }

    var user User
    err := c.ShouldBindJSON(&user)
    if err != nil {
        fail(http.StatusBadRequest, err)
        return
    }

    err = db.Create(&user).Error
    if err != nil {
        fail(http.StatusInternalServerError, err)
        return
    }

    c.JSON(http.StatusCreated, user)
}

// updateUser replaces name and email of the user whose ID is in the path.
// It responds 404 for an unknown ID, 400 for an invalid body and 500 when the
// update itself fails.
func updateUser(c *gin.Context) {
    var user User
    // Placeholder binding: the raw path parameter must not reach GORM as an
    // inline SQL condition.
    if err := db.First(&user, "id = ?", c.Param("id")).Error; err != nil {
        c.JSON(http.StatusNotFound, gin.H{"error": "User not found"})
        return
    }

    var updates User
    if err := c.ShouldBindJSON(&updates); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    // Select limits the UPDATE to the mutable columns so an "id" in the body
    // cannot touch the primary key.
    if err := db.Model(&user).Select("Name", "Email").Updates(updates).Error; err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
        return
    }
    c.JSON(http.StatusOK, user)
}

// deleteUser removes the user whose ID is in the path and responds 204.
// It responds 500 when the statement fails and 404 when no row matched;
// deleting a missing row is not an error, so RowsAffected is checked.
func deleteUser(c *gin.Context) {
    // Placeholder binding keeps the raw path parameter out of the SQL text.
    result := db.Delete(&User{}, "id = ?", c.Param("id"))
    if result.Error != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error()})
        return
    }
    if result.RowsAffected == 0 {
        c.JSON(http.StatusNotFound, gin.H{"error": "User not found"})
        return
    }
    c.Status(http.StatusNoContent)
}

认证与授权

Python JWT 认证

# Flask + JWT
from flask import Flask, request, jsonify
from flask_jwt_extended import (
    JWTManager, create_access_token,
    jwt_required, get_jwt_identity
)

app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'super-secret-key'
jwt = JWTManager(app)

@app.route('/login', methods=['POST'])
def login():
    """Exchange a username/password JSON body for a JWT access token.

    Responds with 400 when the body is not a JSON object (calling ``.get`` on
    ``None`` or a list raised AttributeError before) and with 401 for wrong
    credentials.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    username = data.get('username')
    password = data.get('password')

    # Credential check (a real application would query the user store)
    if username == 'admin' and password == 'password':
        access_token = create_access_token(identity=username)
        return jsonify(access_token=access_token)

    return jsonify({'error': 'Invalid credentials'}), 401

@app.route('/protected', methods=['GET'])
@jwt_required()
def protected():
    """Report the identity stored in the caller's JWT."""
    return jsonify(logged_in_as=get_jwt_identity())

Go JWT 认证

// Gin + JWT
package main

import (
    "github.com/gin-gonic/gin"
    "github.com/golang-jwt/jwt/v5"
    "net/http"
    "time"
)

var jwtSecret = []byte("super-secret-key")

// Claims is the JWT payload: the application's username plus the embedded
// registered claims (login fills in ExpiresAt and IssuedAt).
type Claims struct {
    Username string `json:"username"`
    jwt.RegisteredClaims
}

// LoginRequest is the JSON body of POST /login; both fields are marked
// required by their binding tags.
type LoginRequest struct {
    Username string `json:"username" binding:"required"`
    Password string `json:"password" binding:"required"`
}

// main registers the public login route and the JWT-protected route, then
// serves on :8080.
func main() {
    r := gin.Default()

    r.POST("/login", login)
    r.GET("/protected", authMiddleware(), protected)

    // Run only returns when the server failed to start or stopped.
    if err := r.Run(":8080"); err != nil {
        panic(err)
    }
}

// login validates the credentials in the JSON body and, on success, responds
// with an HS256-signed token that expires after 24 hours. It responds 400
// for an invalid body, 401 for wrong credentials and 500 when signing fails.
func login(c *gin.Context) {
    var req LoginRequest
    if err := c.ShouldBindJSON(&req); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    // Credential check against the single hard-coded demo account
    authenticated := req.Username == "admin" && req.Password == "password"
    if !authenticated {
        c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid credentials"})
        return
    }

    // Build the claims and sign them
    registered := jwt.RegisteredClaims{
        ExpiresAt: jwt.NewNumericDate(time.Now().Add(24 * time.Hour)),
        IssuedAt:  jwt.NewNumericDate(time.Now()),
    }
    claims := Claims{Username: req.Username, RegisteredClaims: registered}

    signed, err := jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString(jwtSecret)
    if err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
        return
    }

    c.JSON(http.StatusOK, gin.H{"access_token": signed})
}

// authMiddleware returns a handler that authenticates the request from the
// JWT in its Authorization header (with or without a "Bearer " prefix).
// On success the token's username is stored on the context under "username";
// otherwise the chain is aborted with 401.
func authMiddleware() gin.HandlerFunc {
    const bearerPrefix = "Bearer "

    return func(c *gin.Context) {
        tokenString := c.GetHeader("Authorization")
        if tokenString == "" {
            c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Missing token"})
            return
        }

        // Strip the "Bearer " prefix when present
        if len(tokenString) > len(bearerPrefix) && tokenString[:len(bearerPrefix)] == bearerPrefix {
            tokenString = tokenString[len(bearerPrefix):]
        }

        // Verify the token. WithValidMethods pins the accepted algorithm to
        // the one login signs with, so a token declaring any other "alg" is
        // rejected before the key is used.
        token, err := jwt.ParseWithClaims(
            tokenString,
            &Claims{},
            func(token *jwt.Token) (interface{}, error) {
                return jwtSecret, nil
            },
            jwt.WithValidMethods([]string{jwt.SigningMethodHS256.Alg()}),
        )
        if err != nil || !token.Valid {
            c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Invalid token"})
            return
        }

        claims, ok := token.Claims.(*Claims)
        if !ok {
            c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Invalid token claims"})
            return
        }

        c.Set("username", claims.Username)
        c.Next()
    }
}

// protected reports the username that authMiddleware stored on the context.
func protected(c *gin.Context) {
    body := gin.H{"logged_in_as": c.GetString("username")}
    c.JSON(http.StatusOK, body)
}

完整 REST API 示例

Python完整示例(FastAPI)

# 完整的 FastAPI REST API
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, EmailStr
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from typing import List

# 数据库设置
SQLALCHEMY_DATABASE_URL = "sqlite:///./api.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()

# ORM model
class UserDB(Base):
    """SQLAlchemy mapping of the ``users`` table; ``email`` carries a unique constraint."""
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    email = Column(String, unique=True, index=True)

Base.metadata.create_all(bind=engine)

# Pydantic schemas
class UserBase(BaseModel):
    """Fields shared by the request and response schemas."""
    name: str
    email: EmailStr

class UserCreate(UserBase):
    """Request body for creating or replacing a user; adds nothing to UserBase."""
    pass

class User(UserBase):
    """Response schema: the base fields plus the ``id``."""
    id: int

    class Config:
        # NOTE(review): presumably lets the schema be populated from UserDB
        # instances returned by the handlers - confirm against Pydantic docs.
        from_attributes = True

# Dependency injection
def get_db():
    """Yield a new database session and close it once the consumer is done.

    Used as a FastAPI dependency via ``Depends(get_db)``.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        # Runs on normal completion and on error, so the session is always closed.
        db.close()

# API
app = FastAPI(title="User API", version="1.0.0")

@app.get("/users", response_model=List[User])
def list_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of users selected by ``skip`` and ``limit``."""
    page_query = db.query(UserDB).offset(skip).limit(limit)
    return page_query.all()

@app.get("/users/{user_id}", response_model=User)
def get_user(user_id: int, db: Session = Depends(get_db)):
    """Return the user with the given ID or raise a 404."""
    found = db.query(UserDB).filter(UserDB.id == user_id).first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")

@app.post("/users", response_model=User, status_code=201)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    """Insert a new user and return the stored row.

    Raises:
        HTTPException: 409 when the insert violates a database constraint
            (the unique e-mail), which previously surfaced as a 500.
    """
    from sqlalchemy.exc import IntegrityError

    # model_dump() is the Pydantic v2 spelling, matching the v2-style
    # ``from_attributes`` config used by the response schema.
    db_user = UserDB(**user.model_dump())
    db.add(db_user)
    try:
        db.commit()
    except IntegrityError:
        # Roll back so the session stays usable after the failed flush.
        db.rollback()
        raise HTTPException(status_code=409, detail="Email already registered")
    db.refresh(db_user)
    return db_user

@app.put("/users/{user_id}", response_model=User)
def update_user(user_id: int, user: UserCreate, db: Session = Depends(get_db)):
    """Replace name and e-mail of an existing user and return the stored row.

    Raises:
        HTTPException: 404 when the user does not exist; 409 when the update
            violates a database constraint (the unique e-mail), which
            previously surfaced as a 500.
    """
    from sqlalchemy.exc import IntegrityError

    db_user = db.query(UserDB).filter(UserDB.id == user_id).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")

    # model_dump() is the Pydantic v2 spelling, matching the v2-style
    # ``from_attributes`` config used by the response schema.
    for key, value in user.model_dump().items():
        setattr(db_user, key, value)

    try:
        db.commit()
    except IntegrityError:
        # Roll back so the session stays usable after the failed flush.
        db.rollback()
        raise HTTPException(status_code=409, detail="Email already registered")
    db.refresh(db_user)
    return db_user

@app.delete("/users/{user_id}", status_code=204)
def delete_user(user_id: int, db: Session = Depends(get_db)):
    """Delete the user with the given ID; raise a 404 when it does not exist."""
    target = db.query(UserDB).filter(UserDB.id == user_id).first()
    if target:
        db.delete(target)
        db.commit()
        return None
    raise HTTPException(status_code=404, detail="User not found")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Go 完整示例(Gin)

// 完整的 Gin REST API
package main

import (
    "github.com/gin-gonic/gin"
    "gorm.io/driver/sqlite"
    "gorm.io/gorm"
    "net/http"
)

// User is both the GORM model and the JSON payload of the /users endpoints:
// the gorm tags describe the columns, the json tags the wire names, and the
// binding tags the validation applied when a request body is bound.
type User struct {
    ID    uint   `gorm:"primaryKey" json:"id"`
    Name  string `json:"name" binding:"required"`
    Email string `gorm:"unique" json:"email" binding:"required,email"`
}

// 数据库连接
var db *gorm.DB

// initDB opens the SQLite database into the package-level db and migrates the
// User table. It panics with the underlying cause when either step fails; a
// failed migration was previously ignored.
func initDB() {
    var err error
    db, err = gorm.Open(sqlite.Open("api.db"), &gorm.Config{})
    if err != nil {
        panic("failed to connect database: " + err.Error())
    }
    if err := db.AutoMigrate(&User{}); err != nil {
        panic("failed to migrate database: " + err.Error())
    }
}

// Handlers

// listUsers responds with one page of users selected by the "skip" and
// "limit" query parameters (defaults 0 and 100). It responds 400 when a
// parameter is not a valid integer in range, and 500 when the query fails.
//
// Offset and Limit take ints, so the query strings are bound to typed fields
// rather than passed through as strings (which does not compile).
func listUsers(c *gin.Context) {
    var page struct {
        Skip  int `form:"skip,default=0" binding:"gte=0"`
        Limit int `form:"limit,default=100" binding:"gte=1"`
    }
    if err := c.ShouldBindQuery(&page); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    var users []User
    if err := db.Offset(page.Skip).Limit(page.Limit).Find(&users).Error; err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
        return
    }
    c.JSON(http.StatusOK, users)
}

// getUser responds with the user whose ID is in the path, or 404.
//
// The ID is bound through a "?" placeholder: a raw string passed as GORM's
// inline condition is treated as SQL, so the unvalidated path parameter must
// never be handed to First directly.
func getUser(c *gin.Context) {
    var user User
    if err := db.First(&user, "id = ?", c.Param("id")).Error; err != nil {
        c.JSON(http.StatusNotFound, gin.H{"error": "User not found"})
        return
    }
    c.JSON(http.StatusOK, user)
}

// createUser binds and validates the JSON body, inserts the user and responds
// 201 with the stored record. Binding and insert errors both yield 400.
func createUser(c *gin.Context) {
    // badRequest writes the error message as JSON with status 400.
    badRequest := func(err error) {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
    }

    var user User
    err := c.ShouldBindJSON(&user)
    if err != nil {
        badRequest(err)
        return
    }

    err = db.Create(&user).Error
    if err != nil {
        badRequest(err)
        return
    }

    c.JSON(http.StatusCreated, user)
}

// updateUser replaces name and email of the user whose ID is in the path.
// It responds 404 for an unknown ID, 400 for an invalid body and 500 when the
// update itself fails.
func updateUser(c *gin.Context) {
    var user User
    // Placeholder binding: the raw path parameter must not reach GORM as an
    // inline SQL condition.
    if err := db.First(&user, "id = ?", c.Param("id")).Error; err != nil {
        c.JSON(http.StatusNotFound, gin.H{"error": "User not found"})
        return
    }

    var updates User
    if err := c.ShouldBindJSON(&updates); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    // Select limits the UPDATE to the mutable columns so an "id" in the body
    // cannot touch the primary key.
    if err := db.Model(&user).Select("Name", "Email").Updates(updates).Error; err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
        return
    }
    c.JSON(http.StatusOK, user)
}

// deleteUser removes the user whose ID is in the path and responds 204.
// It responds 500 when the statement fails and 404 when no row matched.
// Deleting a missing row is not an error, so "not found" is detected through
// RowsAffected rather than by reporting every database error as 404.
func deleteUser(c *gin.Context) {
    // Placeholder binding keeps the raw path parameter out of the SQL text.
    result := db.Delete(&User{}, "id = ?", c.Param("id"))
    if result.Error != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error()})
        return
    }
    if result.RowsAffected == 0 {
        c.JSON(http.StatusNotFound, gin.H{"error": "User not found"})
        return
    }
    c.Status(http.StatusNoContent)
}

// main initializes the database, mounts the CRUD handlers under /users and
// serves on :8080.
func main() {
    initDB()

    r := gin.Default()

    // Routes
    api := r.Group("/users")
    {
        api.GET("", listUsers)
        api.GET("/:id", getUser)
        api.POST("", createUser)
        api.PUT("/:id", updateUser)
        api.DELETE("/:id", deleteUser)
    }

    // Run only returns when the server failed to start or stopped.
    if err := r.Run(":8080"); err != nil {
        panic(err)
    }
}

最佳实践

1. 项目结构

// ✅ 好的 Go 项目结构
myapi/
├── cmd/
│   └── api/
│       └── main.go           // 程序入口
├── internal/
│   ├── handler/              // HTTP 处理器
│   │   ├── user.go
│   │   └── post.go
│   ├── service/              // 业务逻辑
│   │   ├── user.go
│   │   └── post.go
│   ├── repository/           // 数据访问
│   │   ├── user.go
│   │   └── post.go
│   └── model/                // 数据模型
│       ├── user.go
│       └── post.go
├── pkg/                      // 可重用的包
│   ├── auth/
│   └── middleware/
├── go.mod
└── go.sum

2. 错误处理

// ✅ Uniform error handling

// APIError is the JSON body sent for every error response: the HTTP status
// code and a human-readable message.
type APIError struct {
    Code    int    `json:"code"`
    Message string `json:"message"`
}

// handleError writes err as an APIError JSON body with the given HTTP status;
// the status is also repeated in the body's "code" field.
func handleError(c *gin.Context, err error, status int) {
    body := APIError{Code: status}
    body.Message = err.Error()
    c.JSON(status, body)
}

// getUser responds with the user whose ID is in the path. A missing record
// yields 404 and any other database error 500, both through handleError.
//
// The ID is bound through a "?" placeholder: a raw string passed as GORM's
// inline condition is treated as SQL, so the unvalidated path parameter must
// never be handed to First directly.
func getUser(c *gin.Context) {
    var user User
    if err := db.First(&user, "id = ?", c.Param("id")).Error; err != nil {
        status := http.StatusInternalServerError
        if err == gorm.ErrRecordNotFound {
            status = http.StatusNotFound
        }
        handleError(c, err, status)
        return
    }
    c.JSON(http.StatusOK, user)
}

3. 配置管理

// ✅ Use environment variables and configuration files

// Config holds the settings LoadConfig reads from the environment. Every
// value is kept as the string found in the environment, ports included.
type Config struct {
    Port       string
    DBHost     string
    DBPort     string
    DBUser     string
    DBPassword string
    JWTSecret  string
}

// LoadConfig builds a Config from the environment, using a fallback for each
// variable that is unset or empty. DB_PASSWORD and JWT_SECRET fall back to
// the empty string.
func LoadConfig() *Config {
    cfg := new(Config)
    cfg.Port = getEnv("PORT", "8080")
    cfg.DBHost = getEnv("DB_HOST", "localhost")
    cfg.DBPort = getEnv("DB_PORT", "5432")
    cfg.DBUser = getEnv("DB_USER", "postgres")
    cfg.DBPassword = getEnv("DB_PASSWORD", "")
    cfg.JWTSecret = getEnv("JWT_SECRET", "")
    return cfg
}

// getEnv returns the value of the environment variable key, or fallback when
// the variable is unset or set to the empty string.
func getEnv(key, fallback string) string {
    value, found := os.LookupEnv(key)
    if !found || value == "" {
        return fallback
    }
    return value
}

4. 测试

// ✅ Write tests

// TestGetUser inserts one user into a fresh test database and checks that
// GET /users/1 answers 200 with that user. Setup and decoding failures stop
// the test at once instead of being ignored and showing up later as
// misleading assertion failures.
func TestGetUser(t *testing.T) {
    // Set up the test database.
    // NOTE(review): setupTestDB is not shown; confirm the value it returns
    // really has a Close method (a gorm.io *gorm.DB does not).
    db := setupTestDB()
    defer db.Close()

    // Create the test user
    user := User{Name: "Test", Email: "test@example.com"}
    if err := db.Create(&user).Error; err != nil {
        t.Fatalf("creating test user: %v", err)
    }

    // Build the router and issue the request. httptest.NewRequest has no
    // error return, so there is no error to discard. The path assumes the
    // first row of a fresh database gets ID 1.
    r := setupRouter(db)
    w := httptest.NewRecorder()
    req := httptest.NewRequest(http.MethodGet, "/users/1", nil)
    r.ServeHTTP(w, req)

    // Assertions
    assert.Equal(t, http.StatusOK, w.Code)

    var response User
    if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil {
        t.Fatalf("decoding response body %q: %v", w.Body.String(), err)
    }
    assert.Equal(t, "Test", response.Name)
}

总结

Python 到 Go 的思维转变

| Python 思维 | Go 思维 |
| --- | --- |
| 动态类型,运行时错误 | 静态类型,编译时检查 |
| 丰富的装饰器 | 中间件函数 |
| ORM 自动查询 | 显式查询(GORM) |
| 异步 async/await | Goroutine + Channel |
| 请求对象全局可用 | Context 显式传递 |

关键要点

  1. Gin 性能卓越:比 Flask 快数倍,接近 C 语言性能
  2. 类型安全:编译时捕获错误,减少运行时问题
  3. 并发模型:利用 Goroutine 轻松实现高并发
  4. 标准库强大:内置 HTTP 服务器,无需 WSGI/ASGI
  5. 部署简单:单个二进制文件,无需 Python 环境

实用建议

  • 🔧 使用结构体绑定和验证请求数据
  • 🔧 实现统一的错误处理机制
  • 🔧 使用中间件处理横切关注点
  • 🔧 分层架构:handler -> service -> repository
  • 🔧 编写单元测试和集成测试
  • 🔧 使用配置文件和环境变量管理配置

上一篇包管理:从 pip 到 go mod

系列目录从 Python 到 Go 完全指南

comments.logDiscussion Thread
./comments --show-all

讨论区

./loading comments...