跳到主要内容

连接数据库

前言

nil

连接关系型数据

使用ORM,比如SQLAlchemy

同步

from fastapi import FastAPI
from pydantic import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

app = FastAPI()

class Item(BaseModel):
id: int
name: str
description: str

engine = create_engine("mysql+mysqlconnector://user:password@localhost/testdb")
Session = sessionmaker(bind=engine)
session = Session()

@app.get("/")
def read_root():
return {"Hello": "World"}

@app.post("/item/")
def create_item(item: Item):
session.add(item)
session.commit()
return {"item": item}

@app.get("/item/{item_id}")
def read_item(item_id: int, q: str = None):
item = session.query(Item).get(item_id)
return {"item": item}

@app.put("/item/{item_id}")
def update_item(item_id: int, item: Item):
item = session.query(Item).get(item_id)
item.name = item.name
item.description = item.description
session.commit()
return {"item": item}

@app.delete("/item/{item_id}")
def delete_item(item_id: int):
item = session.query(Item).get(item_id)
session.delete(item)
session.commit()
return {"message": "Item deleted"}

异步

nil

依赖注入形式

nil

连接Redis

  • Python 3.11
  • Redis 7.2

安装依赖

# 不一定要保持一致的版本,不同版本的API可能不兼容
pip install redis==5.0.2 fastapi==0.110.0 uvicorn==0.27.1

示例

为了方便在其它模块中使用redis,示例中将redis客户端放在单独的模块中,并在启动时建立redis连接,停止后关闭redis连接。

目录结构:

├── demo.py
└── pkg
├── __init__.py
└── redis
└── __init__.py

其中pkg/redis/__init__.py内容如下(__init__.py脚本会在import时运行一次)

from redis import ConnectionPool, Redis

pool = ConnectionPool(
host="192.168.1.112",
port=6379,
db=1,
password="123456",
max_connections=10, # 最大连接池大小为10
encoding = "utf-8", # 用于指定返回utf-8编码的字符串
decode_responses=True, # 用于指定返回utf-8编码的字符串
)

redis_client = Redis(connection_pool=pool)

__all__ = ["redis_client"]

主代码文件demo.py内容如下

from fastapi import FastAPI
from contextlib import asynccontextmanager
import uvicorn
from pkg.redis import redis_client
from datetime import datetime


def start_event():
uptime = datetime.now()
redis_client.set("uptime", uptime.strftime("%Y-%m-%d %H:%M:%S"))

def shutdown_event():
redis_client.close()

@asynccontextmanager
async def applifespan(app: FastAPI):
start_event()
yield
shutdown_event()

app = FastAPI(
lifespan=applifespan,
)

@app.get("/")
async def get_root():
uptime = redis_client.get("uptime")
return {"msg": f"uptime is {uptime}"}

if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

常见操作

  • 检测链接是否正常
redis_client.ping()
  • set and get
redis_client.set("k1", "v1")
redis_client.get("k1")
redis_client.delete("k1")

redis_client.exists("k1") # 是否存在

redis_client.setex("greeting", 10, "hello world") # ttl为10秒
redis_client.ttl("greeting") # 获取指定键的当前ttl
redis_client.expire("greeting", 100) # 刷新指定键的ttl为100秒

# 设置字典
dict_data = {
"employee_name": "Adam Adams",
"employee_age": 30,
"position": "Software Engineer",
}

redis_client.mset(dict_data)

# 自增
redis_client.set("request_total", 0)
redis_client.get("request_total") # 自增1
redis_client.get("request_total", 10) # 自增10
  • 管道操作。常用于批量操作,减少多轮多写导致的性能损耗。
with redis_client.pipeline() as pipe:
for _ in range(10000):
pipe.incr("request_total")
pipe.exe
  • 列表。lpush -> rpop就是队列操作,先入先出。lpush -> lpop为栈操作,先入后出。
# 向名为language的列表lpush
redis_client.lpush('language', 'python')
redis_client.lpush('language', 'go')
redis_client.lpush('language', 'rust')

redis_client.rpop('language') # output: python
redis_client.rpop('language') # output: go
redis_client.rpop('language') # output: rust

# 列表长度
redis_client.llen('language')
  • 集合
# 向名为warframe的集合添加元素
redis_client.sadd('warframe', 'atlas')
redis_client.sadd('warframe', 'banshee')

# 删除元素
redis_client.srem('warframe', 'atlas')

参考