由 CyanHall.com
创建于 2020-11-22,
上次更新:2021-04-30。
👉 如果有用请点赞。
👉 如果有用请点赞。
1. 驱动
# psycopg2
postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]
# asyncpg
postgresql+asyncpg://user:password@host:port/dbname[?key=value&key=value...]
# MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
# PyMySQL
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
# MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
# More: http://docs.sqlalchemy.org/en/latest/dialects/index.html
3. New Model
model = SampleModel()
model.name = 'test_name'
db.session.add(model)
db.session.commit()
# INSERT INTO [table_name] (name, email, ...) VALUES ('test_name', 'test@example.com', ...)
5. 查询所有
SampleModel.query.filter(SampleModel.id == id).all()
# SELECT * FROM [table_name] WHERE id = [id];
7. 查询删除
SampleModel.query.filter(SampleModel.id == id).delete()
db.session.delete(obj)
# DELETE FROM [table_name] WHERE [table_name].id = [id];
9. order_by
SampleModel.query.order_by(SampleModel.total.desc()).all()
# SELECT * FROM [table_name] ORDER BY [table_name].total DESC;
# SELECT * FROM [table_name] ORDER BY [table_name].total DESC, [table_name].count ASC;
11. 行锁
SampleModel.query.filter(SampleModel.id == id).with_for_update().first()
# SELECT * FROM [table_name] WHERE [table_name].id = [id] LIMIT 1 FOR UPDATE;
13. or_
from sqlalchemy import or_
SampleModel.query.filter(
or_(
SampleModel.id == id,
SampleModel.name == name
)
).first()
# SELECT * FROM [table_name] WHERE [table_name].id = [id] OR [table_name].name = [name];
15. func.avg 和 func.count
from sqlalchemy import func
# Average price and order count, grouped by SampleModel.id.
# Flask-SQLAlchemy's Model.query is already a Query instance and cannot be
# called with column expressions; use db.session.query(...) for those.
db.session.query(
    func.avg(SampleModel.price).label('average_price'),
    func.count(SampleModel.order_id).label('order_count'),
).outerjoin(
    Order, Order.order_id == SampleModel.order_id
).group_by(
    SampleModel.id
).all()
17. func.concat
from sqlalchemy import func
SampleModel.query.filter(
func.concat(SampleModel.first_name, ' ', SampleModel.last_name) == 'First Last'
).first()
# SELECT * FROM sample_table WHERE concat(sample_table.first_name, ' ', sample_table.last_name) = 'First Last' LIMIT 1;
19. Date filter
q = q.filter(func.date(Order.shipping_date) >= datetime.datetime.strptime(start_date, "%Y-%m-%d"))
class User:
    created = Column(DateTime)
# Render a DateTime column as a formatted string; keep only the function
# that matches your database.
session.query(
    func.date_format(User.created, "%Y-%m-%d %H:%i:%s"),  # MySQL: DATE_FORMAT(date, format)
    func.strftime("%Y-%m-%d %H:%M:%S", User.created),  # SQLite: strftime(format, time-value)
)
21. dynamic filter
filter_exps = []
for word in words.split(','):
filter_exps.append(
User.name.like("%{}%".format(word))
)
q = q.filter(or_(*filter_exps))
# SQL 通配符: % 替代 0 个或多个字符
23. alias
from sqlalchemy.orm import aliased
user1 = aliased(User)
user2 = aliased(User)
q = db.session.query(
*Order.__table__.columns, user1.name.label('name1'), user2.name.label('name2')
).outerjoin(
user1, user1.user_id == Order.user1_id
).outerjoin(
user2, user2.user_id == Order.user2_id
).first()
# SELECT order.order_id, user_1.name AS name1, user_2.name AS name2 FROM order LEFT OUTER JOIN user AS user_1 ON user_1.user_id = order.user1_id LEFT OUTER JOIN user AS user_2 ON user_2.user_id = order.user2_id LIMIT 1
25. concat label/alias filter
filter_compose_ids = [1, 2, 3]
compose_id = func.concat(SampleModel.from_id, '_', SampleModel.to_id).label('compose_id')
q = db.session.query(
compose_id
).filter(
compose_id.in_(filter_compose_ids)
).group_by(
compose_id
).first()
# SELECT concat(sample_table.from_id, '_', sample_table.to_id) AS compose_id FROM sample_table WHERE concat(sample_table.from_id, '_', sample_table.to_id) IN (1, 2, 3) GROUP BY concat(sample_table.from_id, '_', sample_table.to_id) LIMIT 1
27. case in query
q = db.session.query()
# some process
q = q.add_columns(
func.max(text('case when stock_num > 0 THEN 1 ELSE 0 end')).label('has_stock')
).order_by(
text("has_stock desc"),
).all()
29. raw sql
from sqlalchemy import text
q = db.session.query(
SampleModel.price.label('pay_price')
).filter(
text('price > 0') # no pay_price
).order_by(
text("pay_price desc")
).all()
# SELECT sample_table.price AS pay_price FROM sample_table WHERE price > 0 ORDER BY pay_price desc
31. order by id list
from sqlalchemy import case
user_ids = [11, 22, 33, 44]
# Map each id to its position in the list: {11: 0, 22: 1, 33: 2, 44: 3}
whens = {user_id: position for position, user_id in enumerate(user_ids)}
# The mapping is passed positionally; the ``whens=`` keyword form was
# deprecated in SQLAlchemy 1.4.
q = q.order_by(case(whens, value=User.user_id))
# SELECT * FROM user ORDER BY CASE user.user_id WHEN 11 THEN 0 WHEN 22 THEN 1 WHEN 33 THEN 2 WHEN 44 THEN 3 END
33. pagination
page = 1  # 1-based page number
limit = 20  # rows per page
# LIMIT takes the page size; OFFSET skips the rows of the preceding pages.
q = q.limit(limit).offset((page - 1) * limit)
35. one / first / scalar
db.session.query(User).first() # return one user or None
db.session.query(User).one() # return one user or sqlalchemy.orm.exc.NoResultFound/sqlalchemy.orm.exc.MultipleResultsFound
db.session.query(User).scalar() # return one user or None or sqlalchemy.orm.exc.MultipleResultsFound
37. filter in Array column
# some_ids = Column(postgresql.ARRAY(Integer))
# User1.some_ids: {1,2,3,4}
# User2.some_ids: {5,6,7,8}
# Rows whose array contains every element of the given list.
res = User.query.filter(User.some_ids.contains([5])).all()  # [User2]
# Rows whose array has at least 5 elements.
res = User.query.filter(func.cardinality(User.some_ids) >= 5).all()  # [] (both arrays hold 4 elements)
# Rows where any array element equals the given value.
res = User.query.filter(User.some_ids.any(5)).all()  # [User2]
2. SampleModel
import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base()
class SampleModel(Base):
    """Declarative model mapped to the ``samples`` table."""

    __tablename__ = 'samples'

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Indexed and NOT NULL; 'xx' is used when no name is supplied.
    name = Column(String(32), default='xx', nullable=False, index=True)
    # Unique constraint on the email value.
    email = Column(String(32), unique=True)
    # The function itself (not its result) is passed as the default.
    create_time = Column(DateTime, default=datetime.datetime.now)
    # Optional free-form text.
    extra = Column(Text, nullable=True)
4. 查询单个
SampleModel.query.filter(SampleModel.id == id).first()
# SELECT * FROM [table_name] WHERE id = [id] LIMIT 1;
6. 统计对象
SampleModel.query.filter(SampleModel.id == id).count()
# SELECT count(*) FROM [table_name] WHERE [table_name].id = [id];
8. 查询更新
SampleModel.query.filter(SampleModel.id == id).update({
'is_finish': 1
}, synchronize_session=False)
# UPDATE [table_name] SET is_finish=1 WHERE [table_name].id = [id]
SampleModel.query.filter(SampleModel.id == id).update({
'is_finish': 1
}, synchronize_session="fetch")
# SELECT id FROM [table_name] WHERE [table_name].id = [id];
# UPDATE [table_name] SET is_finish=1 WHERE [table_name].id = [id];
SampleModel.query.filter(SampleModel.id == id).update({
'count': SampleModel.count + 1
}, synchronize_session="evaluate")
# UPDATE [table_name] SET count=([table_name].count + 1) WHERE [table_name].id = [id];
10. id in/not in list
filter_ids = [1, 2, 3]
SampleModel.query.filter(SampleModel.id.in_(filter_ids)).all()
SampleModel.query.filter(SampleModel.id.notin_(filter_ids)).all()
# SELECT * FROM [table_name] WHERE [table_name].id IN (1, 2, 3);
# SELECT * FROM [table_name] WHERE [table_name].id NOT IN (1, 2, 3);
12. label
db.session.query(SampleModel.name.label('alias_name')).first()
# SELECT [table_name].name AS alias_name FROM [table_name] LIMIT 1;
14. and_
from sqlalchemy import and_
SampleModel.query.filter(
and_(
SampleModel.id == id,
SampleModel.name == name
)
).first()
# Multiple criteria passed to filter() are joined with AND by default, so and_ can be omitted
# SELECT * FROM [table_name] WHERE [table_name].id = [id] AND [table_name].name = [name];
16. func.group_concat
from sqlalchemy import func
# Flask-SQLAlchemy's Model.query is already a Query instance and cannot be
# called with column expressions; use db.session.query(...) for those.
db.session.query(
    func.avg(SampleModel.price).label('average_price'),
    # Labelled so the result column matches the alias shown in the SQL below.
    func.group_concat(SampleRole.name.distinct()).label('concat_names'),
).outerjoin(
    SampleRole, SampleRole.id == SampleModel.role_id
).group_by(
    SampleModel.id
).all()
# SELECT avg(sample_table.price) AS average_price, group_concat(DISTINCT sample_role_table.name) AS concat_names FROM sample_table LEFT OUTER JOIN sample_role_table ON sample_role_table.id = sample_table.role_id GROUP BY sample_table.id
18. keyword filter
filter_keywords = "some name"
q = db.session.query(
    *SampleModel1.__table__.columns, SampleModel2.name2
).outerjoin(
    SampleModel2, SampleModel2.some_id == SampleModel1.some_id
)
# Only add the LIKE conditions when the keyword is not blank.
if filter_keywords.strip():
    q = q.filter(
        or_(
            SampleModel1.name1.like(f"%{filter_keywords}%"),
            SampleModel2.name2.like(f"%{filter_keywords}%"),
        )
    )
# SELECT sample_table1.name1, sample_table2.name2 FROM sample_table1 LEFT OUTER JOIN sample_table2 ON sample_table2.some_id = sample_table1.some_id WHERE sample_table1.name1 LIKE '%some name%' OR sample_table2.name2 LIKE '%some name%'
20. string length filter
q = q.filter(func.length(User.desc) > 0)
# SELECT * FROM user WHERE length(user.desc) > 0
22. None filter
q = q.filter(User.desc.is_(None))
q = q.filter(User.desc.isnot(None))
# SELECT * FROM user WHERE user.desc IS NULL
# SELECT * FROM user WHERE user.desc IS NOT NULL
24. bigger, smaller
from sqlalchemy import func
q = db.session.query(
func.sum(func.greatest(SampleModel.value1 - SampleModel.value2, 0)),
func.sum(func.least(SampleModel.value1 - SampleModel.value2, 0))
).all()
# SELECT sum(greatest(sample_table.value1 - sample_table.value2, 0)), sum(least(sample_table.value1 - sample_table.value2, 0)) FROM sample_table
26. order by text case
User.query.order_by(text("case when ifnull(nickname, '') = '' then 0 else 1 end desc")).first()
# SELECT user.user_id AS user_user_id FROM user ORDER BY case when ifnull(nickname, '') = '' then 0 else 1 end desc LIMIT 1
28. having
User.query.having(User.user_id > 0).first()
# SELECT user.user_id AS user_user_id FROM user HAVING user.user_id > 0 LIMIT 1
30. random query
from sqlalchemy import func
q = db.session.query(
SampleModel.name
).order_by(
func.rand()
).limit(10).all()
# SELECT sample_table.name FROM sample_table ORDER BY rand() LIMIT 10
32. batch add
model1 = SampleModel()
model2 = SampleModel()
db.session.add_all([model1, model2])
34. flush
model = SampleModel()
for k, v in info.items():
setattr(model, k, v)
db.session.add(model)
db.session.flush()
36. data migration by batches
while True:
users = User.query.filter(User.level_id == 0).limit(100).all()
if len(users) == 0:
break
for user in users:
user.level_id = 1
db.session.commit()
38. add column to session.query
q = q.add_columns(
User.extra_field
)
更多