由 CyanHall.com
创建于 2020-11-22,
上次更新:2021-01-08。
👉
如果有用请点赞。
👉
1. 驱动
# psycopg2
postgresql+psycopg2://user:[email protected]:port/dbname[?key=value&key=value...]
# asyncpg
postgresql+asyncpg://user:[email protected]:port/dbname[?key=value&key=value...]
# MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
# PyMySQL
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
# MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
# More: http://docs.sqlalchemy.org/en/latest/dialects/index.html
2. SampleModel
import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base()
class SampleModel(Base):
__tablename__ = 'samples'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False,default='xx')
email = Column(String(32), unique=True)
create_time = Column(DateTime, default=datetime.datetime.now)
extra = Column(Text, nullable=True)
3. 查询单个
SampleModel.query.filter(SampleModel.id == id).first()
4. 查询所有
SampleModel.query.filter(SampleModel.id == id).all()
5. 统计对象
SampleModel.query.filter(SampleModel.id == id).count()
6. 查询删除
SampleModel.query.filter(SampleModel.id == id).delete()
7. 查询更新
SampleModel.query.filter(SampleModel.id == id).update({
'is_finish': 1
}, synchronize_session=False)
8. order_by
SampleModel.query.order_by(SampleModel.total.desc()).all()
9. id in list
SampleModel.query.filter(SampleModel.id.in_(filter_ids)).all()
10. 行锁
SampleModel.query.filter(SampleModel.id == id).with_for_update().first()
11. 别名
SampleModel.query(SampleModel.name.label('alias_name')).first()
12. or_
from sqlalchemy import or_
SampleModel.query.filter(
or_(
SampleModel.id == id,
SampleModel.name == name
)
).first()
13. and_
from sqlalchemy import and_
SampleModel.query.filter(
and_(
SampleModel.id == id,
SampleModel.name == name
)
).first()
14. func.avg 和 func.count
from sqlalchemy import func
SampleModel.query(
func.avg(SampleModel.price).label('average_price'),
func.count(SampleModel.order_id).label('order_count')
).outerjoin(
Order, Order.order_id == SampleModel.order_id
).group_by(
SampleModel.id
).all()
15. func.group_concat
from sqlalchemy import func
SampleModel.query(
func.avg(SampleModel.price).label('average_price'),
func.group_concat(SampleRole.name.distinct())
).outerjoin(
SampleRole, SampleRole.id == SampleModel.role_id
).group_by(
SampleModel.id
).all()
16. keyword filter
filter_keywords = "some name"
q = db.session.query(*SampleModel1.__table__.columns, SampleModel2.name2) .outerjoin(SampleModel2, SampleModel2.some_id == SampleModel1.some_id) if filter_keywords.strip():
q = q.filter(
or_(
SampleModel1.name1.like(f"%{filter_keywords}%"),
SampleModel2.name2.like(f"%{filter_keywords}%")
)
)
17. Date filter
q = q.filter(func.date(ShopDelivery.shipping_date) >= datetime.datetime.strptime(start_date, "%Y-%m-%d"))
更多