python|python mysql sqlalchemy orm 使用

sqlalchemy orm关系框架

pip install sqlacodegen#将表生成对应的类 pip install mysqlclient pip install sqlalchemy

1、参考
Python-SQLAlchemy:第1节:SQLAlchemy入门
Python-SQLAlchemy:第2节:查询条件设置
Python-SQLAlchemy:第3节:关系操作
Python-SQLAlchemy:第4节:级联
2、映射关系类
from sqlalchemy import Column, String, Integer, Float, or_, TEXT from sqlalchemy import create_engine from sqlalchemy import Table, Column, Integer, ForeignKey, String from sqlalchemy.orm import relationship, backref from sqlalchemy.ext.declarative import declarative_base # 创建对象的基类: Base = declarative_base()class Class(Base): __tablename__ = 'class' class_id = Column(Integer, primary_key=True,autoincrement=True) name = Column(String(50)) level = Column(Integer) address = Column(Text)

可使用 sqlacodegen 工具生成
sqlacodegen mysql+pymysql://root:123456@192.168.50.140:3306/zentao--outfile tt.py

sqlalchemy 使用
【python|python mysql sqlalchemy orm 使用】生成close工具类
from sqlalchemy.orm import sessionmaker, Session from sqlalchemy import create_engine from test.mysql.mysql_tables import MyTable,Class,Studentclass DbOrm:def __init__(self): engine = create_engine('mysql+pymysql://root:123456@192.168.50.140:3306/zentao', echo_pool=True, pool_pre_ping=True) # 创建 DBSession 类型: self.dbSession = sessionmaker(bind=engine)def __enter__(self) -> Session: """:return: """ self.session = self.dbSession() return self.sessiondef __exit__(self, type, value, trace): self.session.commit() self.session.close()@staticmethod def getOrmSession(func): def wrapper(*args, **kw): with DbOrm() as db: result = func(db, *args, **kw) return resultreturn wrapper

@DbOrm.getOrmSession def testFind(session): res = session.query(MyTable).all() for u in res: print(u.id, u.name, u.name)# testFind()@DbOrm.getOrmSession def testDel(session): res = session.query(MyTable).filter(MyTable.id == 1).delete()# 删除否合条件的数据,返回删除数量 print(res)# testDel()@DbOrm.getOrmSession def testAdd(session): table = MyTable(name="bb", count=1) print(table) res = session.add(table) print(res)# testAdd(); @DbOrm.getOrmSession def testUpdate(session): data = https://www.it610.com/article/{} data["name"] = "ccc"; query_filter = session.query(MyTable).filter(MyTable.id == 2) res = query_filter.update(data) print(res)# testUpdate(); @DbOrm.getOrmSession def testJoin(session): res = session.query(Class, MyTable).outerjoin(MyTable, Class.class_id == MyTable.id).filter(Class.class_id ==2).one() print(res.Class.class_id,res.MyTable.id)# testJoin(); @DbOrm.getOrmSession def testAdd2(session): # 添加 # table = MyTable(name="bb", count=1) # print(table) # res = session.add(table) # print(res) # class_ = Class(name="11",level=1,address="aaaa") # res=session.add(class_) # print(res) # student3 = Student(name="11",age=5) # session.class_id=1 # session.class_=class_ # session.add(student3)# 关联查询 # res=session.query(Class).filter(Class.class_id==1).one(); # print(res.name) # for item in res.students: #print(item.name) # res=session.query(Student).filter(Student.student_id==1).all(); # for item in res: #print(item.class_.name)# 修改信息 res = session.query(Class).filter(Class.class_id == 1); res1=res.update({"name":"1111aaa"}) print(res1)# testAdd2()

    推荐阅读