从_python_到_go(八):web_开发:从_flas.md2025-10-15
./meta --show-details
Published
2025年10月15日
Reading
33 min
Words
32,115
Status
PUBLISHED
目录
概述
Python 的 Web 生态系统非常丰富,Flask 简洁灵活,FastAPI 现代高效。Go 的 Gin 框架结合了两者的优点:简洁的 API、高性能、类型安全。
核心差异
特性 | Flask | FastAPI | Gin |
---|---|---|---|
性能 | 中等 | 高(异步) | 极高 |
类型系统 | 动态 | 类型提示(Pydantic) | 静态类型 |
异步支持 | 有限 | 原生异步 | 通过 Goroutine |
API 文档 | 手动(Flask-RESTX) | 自动生成(OpenAPI) | 需要集成(Swag) |
生态系统 | 极其丰富 | 快速增长 | 丰富 |
学习曲线 | 平缓 | 中等 | 中等 |
框架对比
Flask - 简洁灵活
# Flask - 最小示例
from flask import Flask, jsonify, request
app = Flask(__name__)
@app.route('/')
def hello():
    """Return a plain-text greeting for the root URL."""
    return 'Hello World!'


@app.route('/api/users/<int:user_id>')
def get_user(user_id):
    """Return a placeholder user record for the integer ``user_id``."""
    return jsonify({
        'id': user_id,
        'name': 'John Doe'
    })


@app.route('/api/users', methods=['POST'])
def create_user():
    """Create a placeholder user from a JSON body that contains ``name``.

    Returns:
        ``(json, 201)`` on success, or ``(json, 400)`` when the body is not
        a JSON object or has no ``name`` key.
    """
    # silent=True yields None instead of aborting on a missing/invalid body,
    # so the error response below stays under our control.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'name' not in data:
        return jsonify({'error': "JSON body with a 'name' field is required"}), 400
    return jsonify({
        'id': 1,
        'name': data['name']
    }), 201


if __name__ == '__main__':
    # debug=True enables the reloader and debugger; development use only.
    app.run(debug=True, port=5000)
FastAPI - 现代异步
# FastAPI - 类型安全 + 自动文档
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
class User(BaseModel):
    """Request body accepted by ``create_user``; validated by Pydantic."""

    name: str
    email: str
    age: int


@app.get('/')
async def hello():
    """Return a JSON greeting for the root URL."""
    greeting = {'message': 'Hello World!'}
    return greeting


@app.get('/api/users/{user_id}')
async def get_user(user_id: int):
    """Return a placeholder user record for the integer ``user_id``."""
    return dict(id=user_id, name='John Doe')


@app.post('/api/users', status_code=201)
async def create_user(user: User):
    """Echo back the validated user with a fixed id of 1."""
    created = {'id': 1}
    created['name'] = user.name
    created['email'] = user.email
    return created


# Interactive API docs are generated at http://localhost:8000/docs
if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host='0.0.0.0', port=8000)
Gin - 高性能
// Gin - 高性能 + 类型安全
package main
import (
"net/http"
"github.com/gin-gonic/gin"
)
// User is the JSON payload accepted and returned by the /api/users routes.
type User struct {
	ID    int    `json:"id"`
	Name  string `json:"name" binding:"required"`
	Email string `json:"email" binding:"required,email"`
	// "required" is deliberately not used here: the validator treats an int's
	// zero value as missing, so it would reject a legitimate age of 0.
	// Switch the field to *int if presence itself must be enforced.
	Age int `json:"age" binding:"gte=0"`
}
// main registers three demo routes and serves them on port 8080.
func main() {
	router := gin.Default()

	router.GET("/", func(ctx *gin.Context) {
		body := gin.H{"message": "Hello World!"}
		ctx.JSON(http.StatusOK, body)
	})

	router.GET("/api/users/:user_id", func(ctx *gin.Context) {
		// Path parameters are strings; the id is echoed back unparsed.
		ctx.JSON(http.StatusOK, gin.H{
			"id":   ctx.Param("user_id"),
			"name": "John Doe",
		})
	})

	router.POST("/api/users", func(ctx *gin.Context) {
		var payload User
		err := ctx.ShouldBindJSON(&payload)
		if err != nil {
			ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}
		payload.ID = 1
		ctx.JSON(http.StatusCreated, payload)
	})

	router.Run(":8080")
}
路由与处理器
Python 路由
# Flask 路由
from flask import Flask, request
app = Flask(__name__)
# 基本路由
@app.route('/')
def index():
    """Return the index page text."""
    return 'Index Page'


# Path parameters. Each URL variable is passed to the view as a keyword
# argument, so routes with different variable names need separate views.
@app.route('/users/<int:user_id>')
def show_user(user_id):
    """Show a user looked up by integer id."""
    return f'User {user_id}'


@app.route('/users/<string:username>')
def show_user_by_name(username):
    """Show a user looked up by username (non-integer path segment)."""
    return f'User {username}'


# Several HTTP methods on one route
@app.route('/api/resource', methods=['GET', 'POST', 'PUT', 'DELETE'])
def resource():
    """Dispatch on the HTTP method; every allowed method returns a response."""
    if request.method == 'POST':
        return 'Create Resource'
    if request.method == 'PUT':
        return 'Update Resource'
    if request.method == 'DELETE':
        return 'Delete Resource'
    # GET, plus the HEAD requests Flask routes to GET views automatically.
    return 'Get Resource'


# Route groups (using a Blueprint)
from flask import Blueprint

api = Blueprint('api', __name__, url_prefix='/api/v1')


@api.route('/users')
def users():
    """Served at /api/v1/users through the blueprint prefix."""
    return 'API Users'


app.register_blueprint(api)
# FastAPI 路由
from fastapi import FastAPI, APIRouter
app = FastAPI()
# 路径参数
@app.get('/users/{user_id}')
async def get_user(user_id: int):
    """Return the path parameter, already converted to ``int`` by FastAPI."""
    return dict(user_id=user_id)


# Query parameters
@app.get('/items/')
async def read_items(skip: int = 0, limit: int = 10):
    """Return the ``skip``/``limit`` query parameters (defaults 0 and 10)."""
    paging = {'skip': skip}
    paging['limit'] = limit
    return paging


# Route group
router = APIRouter(prefix='/api/v1', tags=['users'])


@router.get('/users')
async def list_users():
    """Served at /api/v1/users; returns an empty list."""
    return list()


app.include_router(router)
Go 路由(Gin)
package main
import (
"github.com/gin-gonic/gin"
"net/http"
)
// main registers the routing examples (basic, path parameter, wildcard,
// query parameters, route groups) and serves them on port 8080.
func main() {
	router := gin.Default()

	// Basic route
	router.GET("/", func(ctx *gin.Context) {
		ctx.String(http.StatusOK, "Index Page")
	})

	// Path parameter
	router.GET("/users/:id", func(ctx *gin.Context) {
		ctx.String(http.StatusOK, "User %s", ctx.Param("id"))
	})

	// Wildcard
	router.GET("/files/*filepath", func(ctx *gin.Context) {
		ctx.String(http.StatusOK, "File: %s", ctx.Param("filepath"))
	})

	// Query parameters with defaults
	router.GET("/search", func(ctx *gin.Context) {
		result := gin.H{
			"query": ctx.DefaultQuery("q", "default"),
			"page":  ctx.DefaultQuery("page", "1"),
		}
		ctx.JSON(http.StatusOK, result)
	})

	// Route group
	apiV1 := router.Group("/api/v1")
	apiV1.GET("/users", listUsers)
	apiV1.GET("/users/:id", getUser)
	apiV1.POST("/users", createUser)
	apiV1.PUT("/users/:id", updateUser)
	apiV1.DELETE("/users/:id", deleteUser)

	// Nested route groups
	apiV2 := router.Group("/api/v2")

	v2Users := apiV2.Group("/users")
	v2Users.GET("", listUsers)
	v2Users.POST("", createUser)

	v2Posts := apiV2.Group("/posts")
	v2Posts.GET("", listPosts)
	v2Posts.POST("", createPost)

	router.Run(":8080")
}
// listUsers responds with an empty JSON array.
func listUsers(ctx *gin.Context) {
	none := make([]string, 0)
	ctx.JSON(http.StatusOK, none)
}

// getUser echoes the ":id" path parameter back as JSON.
func getUser(ctx *gin.Context) {
	ctx.JSON(http.StatusOK, gin.H{"id": ctx.Param("id")})
}

// createUser responds 201 with an empty JSON object.
func createUser(ctx *gin.Context) {
	body := gin.H{}
	ctx.JSON(http.StatusCreated, body)
}

// updateUser responds 200 with an empty JSON object.
func updateUser(ctx *gin.Context) {
	body := gin.H{}
	ctx.JSON(http.StatusOK, body)
}

// deleteUser sets the response status to 204 without a body.
func deleteUser(ctx *gin.Context) {
	ctx.Status(http.StatusNoContent)
}

// listPosts responds with an empty JSON array.
func listPosts(ctx *gin.Context) {
	none := make([]string, 0)
	ctx.JSON(http.StatusOK, none)
}

// createPost responds 201 with an empty JSON object.
func createPost(ctx *gin.Context) {
	body := gin.H{}
	ctx.JSON(http.StatusCreated, body)
}
请求与响应
Python 请求处理
# Flask
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/api/data', methods=['POST'])
def handle_data():
    """Show how to read each part of an incoming request.

    A request carries either a JSON body or a form/multipart body, so the
    JSON payload and the uploaded file are both treated as optional.

    Returns:
        A JSON body, status 200, and a custom response header.
    """
    import os  # local import: only needed to sanitise the upload name

    # JSON body; silent=True gives None instead of aborting for non-JSON
    # requests such as multipart uploads.
    data = request.get_json(silent=True)
    # Form field
    name = request.form.get('name')
    # Query parameter
    page = request.args.get('page', 1, type=int)
    # Request header
    auth = request.headers.get('Authorization')
    # File upload. The filename is client-controlled: keep only its final
    # path component so values like "../../etc/passwd" cannot leave /uploads.
    file = request.files.get('file')
    if file and file.filename:
        safe_name = os.path.basename(file.filename.replace('\\', '/'))
        if safe_name not in ('', '.', '..'):
            file.save(os.path.join('/uploads', safe_name))
    # Cookie
    session_id = request.cookies.get('session_id')
    return jsonify({
        'status': 'success',
        'data': data
    }), 200, {'X-Custom-Header': 'value'}
# FastAPI(更优雅)
from fastapi import FastAPI, File, UploadFile, Header, Cookie
from pydantic import BaseModel
app = FastAPI()
class DataModel(BaseModel):
    """JSON request body: a name and an age."""

    name: str
    age: int


@app.post('/api/data')
async def handle_data(
    data: DataModel,
    authorization: str = Header(None),
    session_id: str = Cookie(None),
    file: UploadFile = File(None)
):
    """Read the body model, a header, a cookie and an optional upload.

    NOTE(review): a JSON body and a multipart file cannot arrive in the same
    request; confirm how clients are expected to call this route.
    """
    if file:
        # Upload handling would go here
        contents = await file.read()
    response = {'status': 'success'}
    response['data'] = data.dict()
    return response
Go 请求处理(Gin)
package main
import (
"github.com/gin-gonic/gin"
"net/http"
)
// DataModel is the JSON body accepted by POST /api/data.
type DataModel struct {
	Name string `json:"name" binding:"required"`
	// "required" is deliberately not used here: the validator treats an int's
	// zero value as missing, so it would reject a legitimate age of 0.
	Age int `json:"age" binding:"gte=0"`
}
// main registers POST /api/data, which shows how to read each part of a
// request (JSON body, form field, query, header, upload, cookie), and serves
// on port 8080.
func main() {
	r := gin.Default()

	r.POST("/api/data", func(c *gin.Context) {
		// JSON body (validated through the binding tags)
		var data DataModel
		if err := c.ShouldBindJSON(&data); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}

		// Form field
		name := c.PostForm("name")

		// Query parameter
		page := c.DefaultQuery("page", "1")

		// Request header
		auth := c.GetHeader("Authorization")

		// File upload (optional). A failed save is reported, not ignored.
		// NOTE(review): file.Filename is client-controlled; sanitise it
		// (e.g. filepath.Base) before using it in a path in real code.
		if file, err := c.FormFile("file"); err == nil {
			if err := c.SaveUploadedFile(file, "/uploads/"+file.Filename); err != nil {
				c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
				return
			}
		}

		// Cookie
		sessionID, _ := c.Cookie("session_id")

		// Response. "name" is included so the variable is used; Go refuses to
		// compile a declared-but-unused local.
		c.Header("X-Custom-Header", "value")
		c.JSON(http.StatusOK, gin.H{
			"status":  "success",
			"data":    data,
			"name":    name,
			"page":    page,
			"auth":    auth,
			"session": sessionID,
		})
	})

	r.Run(":8080")
}
响应类型
// Go - 各种响应类型
package main

import (
	"io"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"
)

// main registers one route per Gin response type and serves on port 8080.
func main() {
	r := gin.Default()

	// JSON response
	r.GET("/json", func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{
			"message": "Hello",
			"status":  200,
		})
	})

	// JSON from a struct
	type User struct {
		Name  string `json:"name"`
		Email string `json:"email"`
	}
	r.GET("/user", func(c *gin.Context) {
		user := User{Name: "John", Email: "john@example.com"}
		c.JSON(http.StatusOK, user)
	})

	// XML response
	r.GET("/xml", func(c *gin.Context) {
		c.XML(http.StatusOK, gin.H{
			"message": "Hello",
		})
	})

	// HTML response.
	// NOTE(review): no templates are loaded in this snippet; presumably
	// r.LoadHTMLGlob("templates/*") must run first. Confirm before use.
	r.GET("/html", func(c *gin.Context) {
		c.HTML(http.StatusOK, "index.html", gin.H{
			"title": "Home Page",
		})
	})

	// Plain-text response
	r.GET("/text", func(c *gin.Context) {
		c.String(http.StatusOK, "Plain text response")
	})

	// File download
	r.GET("/download", func(c *gin.Context) {
		c.File("/path/to/file.pdf")
	})

	// Redirect
	r.GET("/redirect", func(c *gin.Context) {
		c.Redirect(http.StatusMovedPermanently, "https://example.com")
	})

	// Streaming response: one chunk per second
	r.GET("/stream", func(c *gin.Context) {
		c.Stream(func(w io.Writer) bool {
			if _, err := w.Write([]byte("chunk\n")); err != nil {
				return false // stop streaming once the write fails
			}
			time.Sleep(time.Second)
			return true // keep streaming
		})
	})

	r.Run(":8080")
}
中间件
Python 中间件
# Flask 中间件
from flask import Flask, request, g
import time
app = Flask(__name__)
# 日志中间件
@app.before_request
def before_request():
    """Record when the request started on the per-request ``g`` object."""
    g.start_time = time.time()


@app.after_request
def after_request(response):
    """Add an ``X-Response-Time`` header when a start time was recorded."""
    started = getattr(g, 'start_time', None)
    if started is not None:
        response.headers['X-Response-Time'] = str(time.time() - started)
    return response


# Authentication hook
@app.before_request
def auth_middleware():
    """Reject the request with 401 unless it targets ``login`` or is authenticated.

    ``check_auth`` is not defined in this snippet; it must be supplied by the
    application.
    """
    if request.endpoint == 'login':
        return None
    if check_auth():
        return None
    return 'Unauthorized', 401
# FastAPI 中间件
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
import time
app = FastAPI()
class TimingMiddleware(BaseHTTPMiddleware):
    """Measure each request and report the duration in ``X-Process-Time``."""

    async def dispatch(self, request: Request, call_next):
        """Run the downstream handler and attach the elapsed seconds."""
        began = time.time()
        response = await call_next(request)
        elapsed = time.time() - began
        response.headers['X-Process-Time'] = str(elapsed)
        return response


app.add_middleware(TimingMiddleware)
Go 中间件(Gin)
// Gin 中间件
package main
import (
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
)
// 日志中间件
// LoggerMiddleware returns a handler that prints method, path, client IP,
// response status and latency once the rest of the chain has run.
func LoggerMiddleware() gin.HandlerFunc {
	return func(ctx *gin.Context) {
		startedAt := time.Now()

		// Run the remaining handlers before measuring.
		ctx.Next()

		elapsed := time.Since(startedAt)
		code := ctx.Writer.Status()
		req := ctx.Request
		fmt.Printf("[%s] %s %s %d %v\n", req.Method, req.URL.Path, ctx.ClientIP(), code, elapsed)
	}
}
// 认证中间件
// AuthMiddleware returns a handler that requires a valid Authorization
// header. Failures answer 401 and abort the chain; on success the value 123
// is stored under "user_id" in the context.
func AuthMiddleware() gin.HandlerFunc {
	return func(ctx *gin.Context) {
		// reject writes a 401 JSON error and stops further handlers.
		reject := func(message string) {
			ctx.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": message})
		}

		header := ctx.GetHeader("Authorization")
		switch {
		case header == "":
			reject("未授权")
		case !validateToken(header):
			reject("Token 无效")
		default:
			// Make the user available to downstream handlers.
			ctx.Set("user_id", 123)
			ctx.Next()
		}
	}
}
// CORS 中间件
// CORSMiddleware returns a handler that sets permissive CORS headers on every
// response and answers OPTIONS preflight requests with 204.
func CORSMiddleware() gin.HandlerFunc {
	return func(ctx *gin.Context) {
		ctx.Header("Access-Control-Allow-Origin", "*")
		ctx.Header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
		ctx.Header("Access-Control-Allow-Headers", "Content-Type, Authorization")

		if ctx.Request.Method != "OPTIONS" {
			ctx.Next()
			return
		}
		// Preflight: reply immediately without running the route handler.
		ctx.AbortWithStatus(http.StatusNoContent)
	}
}
// 限流中间件
// RateLimitMiddleware returns a handler that allows each client IP at most
// maxRequests requests per one-minute window and answers 429 beyond that.
//
// Handlers run concurrently, so the counter map is guarded by a mutex. Each
// entry remembers when its window began and is reset lazily on the next
// request, which avoids spawning a cleanup goroutine per request.
//
// This is a simple in-memory limiter: entries for IPs that never return are
// not evicted, so use a dedicated limiter in production.
func RateLimitMiddleware(maxRequests int) gin.HandlerFunc {
	const window = time.Minute

	type counter struct {
		count       int
		windowStart time.Time
	}

	var (
		mu       sync.Mutex
		counters = make(map[string]*counter)
	)

	return func(c *gin.Context) {
		ip := c.ClientIP()
		now := time.Now()

		mu.Lock()
		entry, ok := counters[ip]
		if !ok || now.Sub(entry.windowStart) >= window {
			// First request from this IP, or its previous window expired.
			entry = &counter{windowStart: now}
			counters[ip] = entry
		}
		entry.count++
		limited := entry.count > maxRequests
		mu.Unlock()

		if limited {
			c.JSON(http.StatusTooManyRequests, gin.H{
				"error": "请求过于频繁",
			})
			c.Abort()
			return
		}
		c.Next()
	}
}
// main builds an engine without the default middleware, installs the global
// middleware, registers public, authenticated and rate-limited routes, and
// serves on port 8080.
func main() {
	engine := gin.New()

	// Global middleware: logging, panic recovery, CORS (in that order)
	engine.Use(LoggerMiddleware(), gin.Recovery(), CORSMiddleware())

	// Public route
	engine.POST("/login", login)

	// Routes that require authentication
	protectedAPI := engine.Group("/api", AuthMiddleware())
	protectedAPI.GET("/users", getUsers)
	protectedAPI.POST("/posts", createPost)

	// Middleware attached to a single route
	engine.GET("/limited", RateLimitMiddleware(10), handleLimited)

	engine.Run(":8080")
}
// validateToken reports whether token equals the fixed demo value.
func validateToken(token string) bool {
	const expected = "valid-token"
	return token == expected
}

// login responds with the fixed demo token.
func login(ctx *gin.Context) {
	body := gin.H{"token": "valid-token"}
	ctx.JSON(http.StatusOK, body)
}

// getUsers responds with two placeholder user names.
func getUsers(ctx *gin.Context) {
	names := []string{"user1", "user2"}
	ctx.JSON(http.StatusOK, names)
}

// createPost responds 201 with an empty JSON object.
func createPost(ctx *gin.Context) {
	body := gin.H{}
	ctx.JSON(http.StatusCreated, body)
}

// handleLimited responds with a fixed text body.
func handleLimited(ctx *gin.Context) {
	ctx.String(http.StatusOK, "Limited endpoint")
}
模板渲染
Python 模板
# Flask + Jinja2
from flask import Flask, render_template
app = Flask(__name__)
@app.route('/profile/<username>')
def profile(username):
    """Render ``profile.html`` for a placeholder user built from ``username``."""
    user = dict(
        name=username,
        email=f'{username}@example.com',
        posts=['Post 1', 'Post 2'],
    )
    return render_template('profile.html', user=user)
# templates/profile.html
"""
<!DOCTYPE html>
<html>
<head>
<title>{{ user.name }}'s Profile</title>
</head>
<body>
<h1>{{ user.name }}</h1>
<p>Email: {{ user.email }}</p>
<h2>Posts</h2>
<ul>
{% for post in user.posts %}
<li>{{ post }}</li>
{% endfor %}
</ul>
</body>
</html>
"""
Go 模板
// Gin + html/template
package main
import (
"github.com/gin-gonic/gin"
"net/http"
)
// User is the view model passed to the profile template.
type User struct {
	Name  string
	Email string
	Posts []string
}

// main loads the templates under templates/, registers the profile page and
// serves on port 8080.
func main() {
	router := gin.Default()

	// Load templates
	router.LoadHTMLGlob("templates/*")

	router.GET("/profile/:username", func(ctx *gin.Context) {
		name := ctx.Param("username")
		data := gin.H{
			"user": User{
				Name:  name,
				Email: name + "@example.com",
				Posts: []string{"Post 1", "Post 2"},
			},
		}
		ctx.HTML(http.StatusOK, "profile.html", data)
	})

	router.Run(":8080")
}
// templates/profile.html
/*
<!DOCTYPE html>
<html>
<head>
<title>{{ .user.Name }}'s Profile</title>
</head>
<body>
<h1>{{ .user.Name }}</h1>
<p>Email: {{ .user.Email }}</p>
<h2>Posts</h2>
<ul>
{{ range .user.Posts }}
<li>{{ . }}</li>
{{ end }}
</ul>
</body>
</html>
*/
数据库集成
Python + SQLAlchemy
# Flask + SQLAlchemy
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
db = SQLAlchemy(app)
# 模型定义
class User(db.Model):
    """User table: integer primary key, required name, required unique email."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80), nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)

    def to_dict(self):
        """Return the columns as a JSON-serialisable dict."""
        return {field: getattr(self, field) for field in ('id', 'name', 'email')}


# Create the tables
with app.app_context():
    db.create_all()
# CRUD 操作
@app.route('/users', methods=['GET'])
def get_users():
    """Return every user as a JSON array."""
    payload = []
    for user in User.query.all():
        payload.append(user.to_dict())
    return jsonify(payload)


@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Return one user as JSON, or 404 when the id does not exist."""
    found = User.query.get_or_404(user_id)
    return jsonify(found.to_dict())
@app.route('/users', methods=['POST'])
def create_user():
    """Create a user from a JSON body with ``name`` and ``email``.

    Returns:
        ``(json, 201)`` with the new user; ``(json, 400)`` when the body is
        not a JSON object or lacks a field; ``(json, 409)`` when the insert
        violates a database constraint such as the unique email.
    """
    from sqlalchemy.exc import IntegrityError

    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'name' not in data or 'email' not in data:
        return jsonify({'error': "'name' and 'email' are required"}), 400

    user = User(name=data['name'], email=data['email'])
    db.session.add(user)
    try:
        db.session.commit()
    except IntegrityError:
        # Roll back so the session stays usable after the failed insert.
        db.session.rollback()
        return jsonify({'error': 'User could not be created (constraint violation)'}), 409
    return jsonify(user.to_dict()), 201
@app.route('/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    """Update ``name`` and/or ``email``; absent fields keep their value."""
    user = User.query.get_or_404(user_id)
    data = request.get_json()
    for field in ('name', 'email'):
        setattr(user, field, data.get(field, getattr(user, field)))
    db.session.commit()
    return jsonify(user.to_dict())


@app.route('/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Delete the user and return an empty 204, or 404 when it is missing."""
    doomed = User.query.get_or_404(user_id)
    db.session.delete(doomed)
    db.session.commit()
    return '', 204
Go + GORM
// Gin + GORM
package main
import (
"github.com/gin-gonic/gin"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"net/http"
)
// 模型定义
// User is both the GORM model and the JSON request/response shape.
type User struct {
// Primary key column.
ID uint `gorm:"primaryKey" json:"id"`
// Required by the binding tag whenever a JSON body is bound to User.
Name string `json:"name" binding:"required"`
// Unique in the database; must be present and a valid email when bound.
Email string `gorm:"unique" json:"email" binding:"required,email"`
}
// db is the package-level database handle; main assigns it before routes run.
var db *gorm.DB
// main opens the SQLite database, migrates the schema, registers the CRUD
// routes and serves on port 8080. Any startup failure panics with its cause.
func main() {
	// Connect to the database
	var err error
	db, err = gorm.Open(sqlite.Open("app.db"), &gorm.Config{})
	if err != nil {
		panic("failed to connect database: " + err.Error())
	}

	// Auto-migrate; a failure here would leave the handlers without a table.
	if err := db.AutoMigrate(&User{}); err != nil {
		panic("failed to migrate database: " + err.Error())
	}

	r := gin.Default()

	// CRUD routes
	r.GET("/users", getUsers)
	r.GET("/users/:id", getUser)
	r.POST("/users", createUser)
	r.PUT("/users/:id", updateUser)
	r.DELETE("/users/:id", deleteUser)

	if err := r.Run(":8080"); err != nil {
		panic(err)
	}
}
// getUsers responds with every user, or 500 when the query fails.
func getUsers(c *gin.Context) {
	var users []User
	if err := db.Find(&users).Error; err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, users)
}

// getUser responds with the user whose id matches the ":id" path parameter:
// 404 when no such row exists, 500 for any other database error.
func getUser(c *gin.Context) {
	var user User
	// Bind the id as a query parameter. A raw string passed as an inline
	// condition can be interpreted as SQL, which allows injection.
	if err := db.First(&user, "id = ?", c.Param("id")).Error; err != nil {
		if err == gorm.ErrRecordNotFound {
			c.JSON(http.StatusNotFound, gin.H{"error": "User not found"})
		} else {
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		}
		return
	}
	c.JSON(http.StatusOK, user)
}
// createUser binds and validates the JSON body, inserts the user and responds
// 201 with the stored record. Invalid input yields 400, insert errors 500.
func createUser(ctx *gin.Context) {
	var newUser User

	bindErr := ctx.ShouldBindJSON(&newUser)
	if bindErr != nil {
		ctx.JSON(http.StatusBadRequest, gin.H{"error": bindErr.Error()})
		return
	}

	result := db.Create(&newUser)
	if result.Error != nil {
		ctx.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error()})
		return
	}

	ctx.JSON(http.StatusCreated, newUser)
}
// updateUser loads the user named by ":id", applies the fields from the JSON
// body and responds with the updated record. It answers 404 when the user is
// missing, 400 for an invalid body and 500 when the update fails.
func updateUser(c *gin.Context) {
	var user User
	// Bind the id as a query parameter; a raw string passed as an inline
	// condition can be interpreted as SQL.
	if err := db.First(&user, "id = ?", c.Param("id")).Error; err != nil {
		c.JSON(http.StatusNotFound, gin.H{"error": "User not found"})
		return
	}

	var updates User
	if err := c.ShouldBindJSON(&updates); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	// Report a failed write instead of returning 200 with stale data.
	if err := db.Model(&user).Updates(updates).Error; err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, user)
}
// deleteUser removes the user named by ":id". It answers 204 on success, 404
// when no row matched and 500 when the delete fails.
func deleteUser(c *gin.Context) {
	// Bind the id as a query parameter; a raw string passed as an inline
	// condition can be interpreted as SQL.
	result := db.Delete(&User{}, "id = ?", c.Param("id"))
	if result.Error != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error()})
		return
	}
	// Deleting a missing row is not an error for GORM, so check the count.
	if result.RowsAffected == 0 {
		c.JSON(http.StatusNotFound, gin.H{"error": "User not found"})
		return
	}
	c.Status(http.StatusNoContent)
}
认证与授权
Python JWT 认证
# Flask + JWT
from flask import Flask, request, jsonify
from flask_jwt_extended import (
JWTManager, create_access_token,
jwt_required, get_jwt_identity
)
app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'super-secret-key'
jwt = JWTManager(app)
@app.route('/login', methods=['POST'])
def login():
    """Exchange a username/password JSON body for a JWT access token.

    Returns:
        JSON with ``access_token`` on success, or ``(json, 401)`` when the
        credentials are wrong or the body is missing or not a JSON object.
    """
    # silent=True avoids an abort on non-JSON bodies; anything that is not a
    # JSON object is treated as empty credentials.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')
    # Credential check (a real application would query the database)
    if username == 'admin' and password == 'password':
        access_token = create_access_token(identity=username)
        return jsonify(access_token=access_token)
    return jsonify({'error': 'Invalid credentials'}), 401


@app.route('/protected', methods=['GET'])
@jwt_required()
def protected():
    """Return the identity stored in the caller's JWT."""
    current_user = get_jwt_identity()
    return jsonify(logged_in_as=current_user)
Go JWT 认证
// Gin + JWT
package main
import (
"github.com/gin-gonic/gin"
"github.com/golang-jwt/jwt/v5"
"net/http"
"time"
)
// jwtSecret is the HMAC key used to sign and verify tokens.
// NOTE(review): hard-coded for the demo; load it from configuration in real code.
var jwtSecret = []byte("super-secret-key")

// Claims is the JWT payload: the username plus the registered claims.
type Claims struct {
Username string `json:"username"`
jwt.RegisteredClaims
}

// LoginRequest is the JSON body of POST /login; both fields are required.
type LoginRequest struct {
Username string `json:"username" binding:"required"`
Password string `json:"password" binding:"required"`
}

// main registers the public login route and the JWT-protected route, then
// serves on port 8080.
func main() {
r := gin.Default()
r.POST("/login", login)
r.GET("/protected", authMiddleware(), protected)
r.Run(":8080")
}
// login validates the posted credentials and responds with an HS256-signed
// token that expires after 24 hours. It answers 400 for an invalid body, 401
// for wrong credentials and 500 when signing fails.
func login(ctx *gin.Context) {
	var body LoginRequest
	if bindErr := ctx.ShouldBindJSON(&body); bindErr != nil {
		ctx.JSON(http.StatusBadRequest, gin.H{"error": bindErr.Error()})
		return
	}

	// Credential check
	credentialsOK := body.Username == "admin" && body.Password == "password"
	if !credentialsOK {
		ctx.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid credentials"})
		return
	}

	// Build and sign the token
	registered := jwt.RegisteredClaims{
		ExpiresAt: jwt.NewNumericDate(time.Now().Add(24 * time.Hour)),
		IssuedAt:  jwt.NewNumericDate(time.Now()),
	}
	payload := Claims{Username: body.Username, RegisteredClaims: registered}

	signed, signErr := jwt.NewWithClaims(jwt.SigningMethodHS256, payload).SignedString(jwtSecret)
	if signErr != nil {
		ctx.JSON(http.StatusInternalServerError, gin.H{"error": signErr.Error()})
		return
	}

	ctx.JSON(http.StatusOK, gin.H{"access_token": signed})
}
// authMiddleware returns a handler that requires a valid JWT in the
// Authorization header (with or without a "Bearer " prefix). On success the
// username claim is stored in the context under "username"; otherwise the
// request is aborted with 401.
func authMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		tokenString := c.GetHeader("Authorization")
		if tokenString == "" {
			c.JSON(http.StatusUnauthorized, gin.H{"error": "Missing token"})
			c.Abort()
			return
		}

		// Strip the "Bearer " prefix
		if len(tokenString) > 7 && tokenString[:7] == "Bearer " {
			tokenString = tokenString[7:]
		}

		// Verify the token. Only HS256, the algorithm login signs with, is
		// accepted, so a token declaring another algorithm is rejected.
		token, err := jwt.ParseWithClaims(
			tokenString,
			&Claims{},
			func(token *jwt.Token) (interface{}, error) {
				return jwtSecret, nil
			},
			jwt.WithValidMethods([]string{jwt.SigningMethodHS256.Alg()}),
		)
		if err != nil || !token.Valid {
			c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid token"})
			c.Abort()
			return
		}

		claims, ok := token.Claims.(*Claims)
		if !ok {
			c.JSON(http.StatusUnauthorized, gin.H{"error": "Invalid token claims"})
			c.Abort()
			return
		}

		c.Set("username", claims.Username)
		c.Next()
	}
}

// protected responds with the username stored by authMiddleware.
func protected(c *gin.Context) {
	username := c.GetString("username")
	c.JSON(http.StatusOK, gin.H{"logged_in_as": username})
}
完整 REST API 示例
Python完整示例(FastAPI)
# 完整的 FastAPI REST API
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, EmailStr
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from typing import List
# 数据库设置
SQLALCHEMY_DATABASE_URL = "sqlite:///./api.db"

engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()


# ORM model
class UserDB(Base):
    """Row of the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    email = Column(String, unique=True, index=True)


# Create any tables that do not exist yet
Base.metadata.create_all(bind=engine)


# Pydantic schemas
class UserBase(BaseModel):
    """Fields shared by request and response schemas."""

    name: str
    email: EmailStr


class UserCreate(UserBase):
    """Request body for creating or replacing a user; adds no fields."""


class User(UserBase):
    """Response schema: the shared fields plus the database id."""

    id: int

    class Config:
        # Lets the schema be populated from ORM object attributes.
        from_attributes = True
# 依赖注入
def get_db():
    """Yield a database session and close it when the caller is done."""
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
# API
app = FastAPI(title="User API", version="1.0.0")
@app.get("/users", response_model=List[User])
def list_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of users selected by ``skip`` and ``limit``."""
    page_query = db.query(UserDB).offset(skip).limit(limit)
    return page_query.all()


@app.get("/users/{user_id}", response_model=User)
def get_user(user_id: int, db: Session = Depends(get_db)):
    """Return the user with ``user_id`` or raise a 404."""
    match = db.query(UserDB).filter(UserDB.id == user_id).first()
    if match:
        return match
    raise HTTPException(status_code=404, detail="User not found")
@app.post("/users", response_model=User, status_code=201)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    """Insert a new user and return the stored row.

    Raises:
        HTTPException: 409 when the insert violates a database constraint,
            such as the unique email.
    """
    from sqlalchemy.exc import IntegrityError

    # model_dump() is the Pydantic v2 replacement for the deprecated dict();
    # the schemas here already use the v2 "from_attributes" setting.
    db_user = UserDB(**user.model_dump())
    db.add(db_user)
    try:
        db.commit()
    except IntegrityError:
        # Roll back so the session is usable for the error response.
        db.rollback()
        raise HTTPException(status_code=409, detail="User could not be created (constraint violation)")
    db.refresh(db_user)
    return db_user


@app.put("/users/{user_id}", response_model=User)
def update_user(user_id: int, user: UserCreate, db: Session = Depends(get_db)):
    """Replace the name and email of an existing user.

    Raises:
        HTTPException: 404 when the user does not exist; 409 when the update
            violates a database constraint, such as the unique email.
    """
    from sqlalchemy.exc import IntegrityError

    db_user = db.query(UserDB).filter(UserDB.id == user_id).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")
    for key, value in user.model_dump().items():
        setattr(db_user, key, value)
    try:
        db.commit()
    except IntegrityError:
        db.rollback()
        raise HTTPException(status_code=409, detail="User could not be updated (constraint violation)")
    db.refresh(db_user)
    return db_user
@app.delete("/users/{user_id}", status_code=204)
def delete_user(user_id: int, db: Session = Depends(get_db)):
    """Delete the user with ``user_id``; raise a 404 when it does not exist."""
    target = db.query(UserDB).filter(UserDB.id == user_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="User not found")
    db.delete(target)
    db.commit()
    return None


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
Go 完整示例(Gin)
// 完整的 Gin REST API
package main
import (
"github.com/gin-gonic/gin"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"net/http"
)
// 模型
// User is both the GORM model and the JSON request/response shape.
type User struct {
	ID    uint   `gorm:"primaryKey" json:"id"`
	Name  string `json:"name" binding:"required"`
	Email string `gorm:"unique" json:"email" binding:"required,email"`
}

// db is the package-level database handle, assigned by initDB.
var db *gorm.DB

// initDB opens the SQLite database and migrates the User schema. It panics
// with the underlying cause when either step fails.
func initDB() {
	var err error
	db, err = gorm.Open(sqlite.Open("api.db"), &gorm.Config{})
	if err != nil {
		panic("failed to connect database: " + err.Error())
	}
	// A failed migration would leave the handlers without a users table.
	if err := db.AutoMigrate(&User{}); err != nil {
		panic("failed to migrate database: " + err.Error())
	}
}
// 处理器
// listUsers responds with one page of users selected by the "skip" (default
// 0) and "limit" (default 100) query parameters. Non-numeric or out-of-range
// values yield 400; a failed query yields 500.
func listUsers(c *gin.Context) {
	// Offset and Limit take ints, so the query strings are bound into typed
	// fields rather than passed through as strings.
	var paging struct {
		Skip  int `form:"skip,default=0" binding:"gte=0"`
		Limit int `form:"limit,default=100" binding:"gte=1"`
	}
	if err := c.ShouldBindQuery(&paging); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	var users []User
	if err := db.Offset(paging.Skip).Limit(paging.Limit).Find(&users).Error; err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, users)
}
// getUser responds with the user whose id matches the ":id" path parameter:
// 404 when no such row exists, 500 for any other database error.
func getUser(c *gin.Context) {
	var user User
	// Bind the id as a query parameter. A raw string passed as an inline
	// condition can be interpreted as SQL, which allows injection.
	if err := db.First(&user, "id = ?", c.Param("id")).Error; err != nil {
		if err == gorm.ErrRecordNotFound {
			c.JSON(http.StatusNotFound, gin.H{"error": "User not found"})
		} else {
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		}
		return
	}
	c.JSON(http.StatusOK, user)
}

// createUser binds and validates the JSON body, inserts the user and responds
// 201 with the stored record. Invalid input and insert errors both yield 400.
func createUser(c *gin.Context) {
	var user User
	if err := c.ShouldBindJSON(&user); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}
	if err := db.Create(&user).Error; err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusCreated, user)
}
// updateUser loads the user named by ":id", applies the fields from the JSON
// body and responds with the updated record. It answers 404 when the user is
// missing, 400 for an invalid body and 500 when the update fails.
func updateUser(c *gin.Context) {
	var user User
	// Bind the id as a query parameter; a raw string passed as an inline
	// condition can be interpreted as SQL.
	if err := db.First(&user, "id = ?", c.Param("id")).Error; err != nil {
		c.JSON(http.StatusNotFound, gin.H{"error": "User not found"})
		return
	}

	var updates User
	if err := c.ShouldBindJSON(&updates); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	// Report a failed write instead of returning 200 with stale data.
	if err := db.Model(&user).Updates(updates).Error; err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, user)
}
// deleteUser removes the user named by ":id". It answers 204 on success, 404
// when no row matched and 500 when the delete fails.
func deleteUser(c *gin.Context) {
	// Bind the id as a query parameter; a raw string passed as an inline
	// condition can be interpreted as SQL.
	result := db.Delete(&User{}, "id = ?", c.Param("id"))
	if result.Error != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": result.Error.Error()})
		return
	}
	// Deleting a missing row is not an error for GORM, so "not found" has to
	// be detected from the affected-row count.
	if result.RowsAffected == 0 {
		c.JSON(http.StatusNotFound, gin.H{"error": "User not found"})
		return
	}
	c.Status(http.StatusNoContent)
}
// main initialises the database, registers the /users CRUD routes and serves
// on port 8080.
func main() {
	initDB()

	engine := gin.Default()

	// Routes
	users := engine.Group("/users")
	users.GET("", listUsers)
	users.GET("/:id", getUser)
	users.POST("", createUser)
	users.PUT("/:id", updateUser)
	users.DELETE("/:id", deleteUser)

	engine.Run(":8080")
}
最佳实践
1. 项目结构
// ✅ 好的 Go 项目结构
myapi/
├── cmd/
│ └── api/
│ └── main.go // 程序入口
├── internal/
│ ├── handler/ // HTTP 处理器
│ │ ├── user.go
│ │ └── post.go
│ ├── service/ // 业务逻辑
│ │ ├── user.go
│ │ └── post.go
│ ├── repository/ // 数据访问
│ │ ├── user.go
│ │ └── post.go
│ └── model/ // 数据模型
│ ├── user.go
│ └── post.go
├── pkg/ // 可重用的包
│ ├── auth/
│ └── middleware/
├── go.mod
└── go.sum
2. 错误处理
// ✅ 统一的错误处理
// APIError is the JSON body used for every error response.
type APIError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

// handleError writes err as an APIError with the given HTTP status, which is
// also repeated in the body's "code" field.
func handleError(c *gin.Context, err error, status int) {
	payload := APIError{Code: status, Message: err.Error()}
	c.JSON(status, payload)
}
// getUser responds with the user whose id matches the ":id" path parameter.
// Failures go through handleError: 404 when the row does not exist, 500 for
// any other database error.
func getUser(c *gin.Context) {
	var user User
	// Bind the id as a query parameter. A raw string passed as an inline
	// condition can be interpreted as SQL, which allows injection.
	if err := db.First(&user, "id = ?", c.Param("id")).Error; err != nil {
		if err == gorm.ErrRecordNotFound {
			handleError(c, err, http.StatusNotFound)
		} else {
			handleError(c, err, http.StatusInternalServerError)
		}
		return
	}
	c.JSON(http.StatusOK, user)
}
3. 配置管理
// ✅ 使用环境变量和配置文件
type Config struct {
Port string
DBHost string
DBPort string
DBUser string
DBPassword string
JWTSecret string
}
func LoadConfig() *Config {
return &Config{
Port: getEnv("PORT", "8080"),
DBHost: getEnv("DB_HOST", "localhost"),
DBPort: getEnv("DB_PORT", "5432"),
DBUser: getEnv("DB_USER", "postgres"),
DBPassword: getEnv("DB_PASSWORD", ""),
JWTSecret: getEnv("JWT_SECRET", ""),
}
}
func getEnv(key, fallback string) string {
if value := os.Getenv(key); value != "" {
return value
}
return fallback
}
4. 测试
// ✅ 编写测试
// TestGetUser stores one user, requests it through the router and checks the
// status code and decoded body.
func TestGetUser(t *testing.T) {
	// Set up the test database.
	// Assumes setupTestDB returns a gorm.io/gorm *gorm.DB, which has no Close
	// method; the underlying *sql.DB is closed instead. Confirm against the
	// helper's definition.
	db := setupTestDB()
	sqlDB, err := db.DB()
	if err != nil {
		t.Fatalf("get underlying sql.DB: %v", err)
	}
	defer sqlDB.Close()

	// Create the test user; fail fast if the fixture cannot be stored.
	user := User{Name: "Test", Email: "test@example.com"}
	if err := db.Create(&user).Error; err != nil {
		t.Fatalf("create test user: %v", err)
	}

	// Issue the request against a test router.
	r := setupRouter(db)
	w := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/users/1", nil)
	r.ServeHTTP(w, req)

	// Assertions
	assert.Equal(t, http.StatusOK, w.Code)

	var response User
	assert.NoError(t, json.Unmarshal(w.Body.Bytes(), &response))
	assert.Equal(t, "Test", response.Name)
}
总结
Python 到 Go 的思维转变
Python 思维 | Go 思维 |
---|---|
动态类型,运行时错误 | 静态类型,编译时检查 |
丰富的装饰器 | 中间件函数 |
ORM 自动查询 | 显式查询(GORM) |
异步 async/await | Goroutine + Channel |
请求对象全局可用 | Context 显式传递 |
关键要点
- Gin 性能卓越:在常见基准测试中,吞吐量通常是 Flask 的数倍(具体差距取决于业务负载与部署方式)
- 类型安全:编译时捕获错误,减少运行时问题
- 并发模型:利用 Goroutine 轻松实现高并发
- 标准库强大:内置 HTTP 服务器,无需 WSGI/ASGI
- 部署简单:单个二进制文件,无需 Python 环境
实用建议
- 🔧 使用结构体绑定和验证请求数据
- 🔧 实现统一的错误处理机制
- 🔧 使用中间件处理横切关注点
- 🔧 分层架构:handler -> service -> repository
- 🔧 编写单元测试和集成测试
- 🔧 使用配置文件和环境变量管理配置
系列目录:从 Python 到 Go 完全指南