Created by CyanHall.com on 11/22/2020, last updated: 01/08/2021.
👉
Star me if it’s helpful.
👉
1. Driver
# psycopg2
postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]
# asyncpg
postgresql+asyncpg://user:password@host:port/dbname[?key=value&key=value...]
# MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
# PyMySQL
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
# MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
# More: http://docs.sqlalchemy.org/en/latest/dialects/index.html
2. SampleModel
import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
# Shared declarative base that every model class derives from.
Base = declarative_base()


class SampleModel(Base):
    """Declarative model mapped to the ``samples`` table."""

    __tablename__ = "samples"

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Required, indexed name; defaults to 'xx' when no value is supplied.
    name = Column(String(32), default="xx", nullable=False, index=True)
    # E-mail address, unique across rows.
    email = Column(String(32), unique=True)
    # The callable itself is passed, so the timestamp is taken per insert
    # rather than once at import time.
    create_time = Column(DateTime, default=datetime.datetime.now)
    # Free-form text; may be NULL.
    extra = Column(Text, nullable=True)
3. Query one
# First row whose id matches, or None when the filter matches nothing.
(
    SampleModel.query
    .filter(SampleModel.id == id)
    .first()
)
4. Query all
# Every row matching the filter, returned as a list.
(
    SampleModel.query
    .filter(SampleModel.id == id)
    .all()
)
5. Count Model
# Number of rows the filter matches.
(
    SampleModel.query
    .filter(SampleModel.id == id)
    .count()
)
6. Query one and delete
# Bulk DELETE of the matching rows; the call returns the matched row count.
(
    SampleModel.query
    .filter(SampleModel.id == id)
    .delete()
)
7. Query one and update
# Bulk UPDATE of the matching rows with the given column values.
# synchronize_session=False skips syncing objects already loaded in the
# session with the new values.
changes = {'is_finish': 1}
SampleModel.query.filter(SampleModel.id == id).update(
    changes,
    synchronize_session=False,
)
8. order_by
# All rows, sorted by `total` in descending order.
by_total_desc = SampleModel.total.desc()
SampleModel.query.order_by(by_total_desc).all()
9. id in list
# Rows whose id is one of `filter_ids` (rendered as SQL IN).
id_is_listed = SampleModel.id.in_(filter_ids)
SampleModel.query.filter(id_is_listed).all()
10. Row lock
# SELECT ... FOR UPDATE: fetch the first matching row while asking the
# database to lock it (on backends that support row locks).
(
    SampleModel.query
    .filter(SampleModel.id == id)
    .with_for_update()
    .first()
)
11. label
# Select only the `name` column, exposed under the label 'alias_name'.
# `SampleModel.query` is already a Query object and cannot be called with
# columns, so the column-only SELECT is built from the session instead.
# NOTE(review): assumes `db` is the same session holder used by the other
# snippets in this sheet.
db.session.query(SampleModel.name.label('alias_name')).first()
12. or_
from sqlalchemy import or_
# First row that matches either condition (id OR name).
either_condition = or_(SampleModel.id == id, SampleModel.name == name)
SampleModel.query.filter(either_condition).first()
13. and_
from sqlalchemy import and_
# First row that matches both conditions (id AND name).
both_conditions = and_(SampleModel.id == id, SampleModel.name == name)
SampleModel.query.filter(both_conditions).first()
14. func.avg and func.count
from sqlalchemy import func
# Average price and order count, grouped by SampleModel.id.
# `SampleModel.query` is a Query object, not a callable, so the aggregate
# SELECT is built with session.query(...). select_from() names the left
# side of the outer join explicitly.
# NOTE(review): assumes `db` is the same session holder used by the other
# snippets in this sheet.
db.session.query(
    func.avg(SampleModel.price).label('average_price'),
    func.count(SampleModel.order_id).label('order_count'),
).select_from(
    SampleModel
).outerjoin(
    Order, Order.order_id == SampleModel.order_id
).group_by(
    SampleModel.id
).all()
15. func.group_concat
from sqlalchemy import func
# Average price plus the distinct role names concatenated per group,
# grouped by SampleModel.id.
# `SampleModel.query` is a Query object, not a callable, so the aggregate
# SELECT is built with session.query(...). select_from() names the left
# side of the outer join explicitly.
# NOTE(review): func.group_concat renders GROUP_CONCAT(...), which the
# database must support (e.g. MySQL, SQLite) - confirm for your backend.
db.session.query(
    func.avg(SampleModel.price).label('average_price'),
    func.group_concat(SampleRole.name.distinct()),
).select_from(
    SampleModel
).outerjoin(
    SampleRole, SampleRole.id == SampleModel.role_id
).group_by(
    SampleModel.id
).all()
16. keyword filter
filter_keywords = "some name"

# Base query: every SampleModel1 column plus SampleModel2.name2, with
# SampleModel2 outer-joined on the shared some_id.
q = db.session.query(
    *SampleModel1.__table__.columns, SampleModel2.name2
).outerjoin(
    SampleModel2, SampleModel2.some_id == SampleModel1.some_id
)

# Add the LIKE filter only when the keyword is not blank.
if filter_keywords.strip():
    # Substring match against either name column.
    like_pattern = f"%{filter_keywords}%"
    q = q.filter(
        or_(
            SampleModel1.name1.like(like_pattern),
            SampleModel2.name2.like(like_pattern),
        )
    )
17. Date filter
# Keep rows whose shipping date falls on or after start_date ("YYYY-MM-DD").
# func.date(...) yields a DATE, so it is compared with a date, not a
# datetime: on backends that compare dates as strings (e.g. SQLite) a
# datetime bound value would exclude the boundary day itself.
start_day = datetime.datetime.strptime(start_date, "%Y-%m-%d").date()
q = q.filter(func.date(ShopDelivery.shipping_date) >= start_day)
18. dynamic filter
# One LIKE '%word%' clause per comma-separated entry in `words`,
# combined with OR into a single filter.
filter_exps = [
    User.name.like("%{}%".format(word))
    for word in words.split(',')
]
q = q.filter(or_(*filter_exps))
19. alias
from sqlalchemy.orm import aliased

# Two independent ORM aliases of User so the same table can be joined twice.
# aliased() keeps mapped attributes such as `.name` and `.user_id`; the Core
# alias() returns a table alias whose `.name` is the alias name, not the
# column.
user1 = aliased(User)
user2 = aliased(User)

# All Order columns plus the name of each joined user. The joins are
# anchored on Order, the same entity the columns are selected from.
q = db.session.query(
    *Order.__table__.columns,
    user1.name.label('name1'),
    user2.name.label('name2'),
).select_from(
    Order
).outerjoin(
    user1, user1.user_id == Order.user1_id
).outerjoin(
    user2, user2.user_id == Order.user2_id
).first()
20. flush
# Build a SampleModel from the `info` mapping, one attribute per key.
model = SampleModel()
for attr_name, attr_value in info.items():
    setattr(model, attr_name, attr_value)

session = db.session
session.add(model)
# flush() sends the pending changes to the database without committing.
session.flush()
More