CyanHall.com 创建于 2020-11-22, 上次更新:2021-04-30。
👉  github shields 如果有用请点赞。

1. 驱动

    # psycopg2
postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]

# asyncpg
postgresql+asyncpg://user:password@host:port/dbname[?key=value&key=value...]

# MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

# PyMySQL
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

# MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

# More: http://docs.sqlalchemy.org/en/latest/dialects/index.html
  

2. SampleModel

    import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()

class SampleModel(Base):
    """Declarative model mapped to the ``samples`` table."""

    __tablename__ = 'samples'

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Required, indexed name of at most 32 characters; falls back to 'xx'.
    name = Column(String(32), index=True, nullable=False,default='xx')
    # Optional e-mail of at most 32 characters, covered by a unique constraint.
    email = Column(String(32), unique=True)
    # The function itself (not its result) is passed as the default.
    create_time = Column(DateTime, default=datetime.datetime.now)
    # Free-form text that may be NULL.
    extra = Column(Text, nullable=True)
  

3. New Model

    model = SampleModel()
model.name = 'test_name'
db.session.add(model)
db.session.commit()

# INSERT INTO samples (name, email, ...) VALUES ('test_name', 'test@example.com', ...)
  

4. 查询单个

    SampleModel.query.filter(SampleModel.id == id).first()
# SELECT * FROM [table_name] WHERE id = [id] LIMIT 1;
  

5. 查询所有

    SampleModel.query.filter(SampleModel.id == id).all()
# SELECT * FROM [table_name] WHERE id = [id];
  

6. 统计对象

    SampleModel.query.filter(SampleModel.id == id).count()
# SELECT count(*) FROM [table_name] WHERE [table_name].id = [id];
  

7. 查询删除

    SampleModel.query.filter(SampleModel.id == id).delete()
db.session.delete(obj)

# DELETE FROM [table_name] WHERE [table_name].id = [id];
  

8. 查询更新

    SampleModel.query.filter(SampleModel.id == id).update({
  'is_finish': 1
}, synchronize_session=False)
# UPDATE [table_name] SET is_finish=1 WHERE [table_name].id = [id]

SampleModel.query.filter(SampleModel.id == id).update({
  'is_finish': 1
}, synchronize_session="fetch")
# SELECT id FROM [table_name] WHERE [table_name].id = [id];
# UPDATE [table_name] SET is_finish=1 WHERE [table_name].id = [id];

SampleModel.query.filter(SampleModel.id == id).update({
  'count': SampleModel.count + 1
}, synchronize_session="evaluate")
# UPDATE [table_name] SET count=([table_name].count + 1) WHERE [table_name].id = [id];
  

9. order_by

    SampleModel.query.order_by(SampleModel.total.desc()).all()
# SELECT * FROM [table_name] ORDER BY [table_name].total DESC;
# SELECT * FROM [table_name] ORDER BY [table_name].total DESC, [table_name].count ASC;
  

10. id in/not in list

    filter_ids = [1, 2, 3]
SampleModel.query.filter(SampleModel.id.in_(filter_ids)).all()
SampleModel.query.filter(SampleModel.id.notin_(filter_ids)).all()
# SELECT * FROM [table_name] WHERE [table_name].id IN (1, 2, 3);
# SELECT * FROM [table_name] WHERE [table_name].id NOT IN (1, 2, 3);
  

11. 行锁

    SampleModel.query.filter(SampleModel.id == id).with_for_update().first()
# SELECT * FROM [table_name] WHERE [table_name].id = [id] LIMIT 1 FOR UPDATE;
  

12. label

    db.session.query(SampleModel.name.label('alias_name')).first()
# SELECT [table_name].name AS alias_name FROM [table_name] LIMIT 1;
  

13. or_

    from sqlalchemy import or_
SampleModel.query.filter(
  or_(
    SampleModel.id == id,
    SampleModel.name == name
  )
).first()
# SELECT * FROM [table_name] WHERE [table_name].id = [id] OR [table_name].name = [name];
  

14. and_

    from sqlalchemy import and_
SampleModel.query.filter(
  and_(
    SampleModel.id == id,
    SampleModel.name == name
  )
).first()
# and_ in filter is default, can be omitted
# SELECT * FROM [table_name] WHERE [table_name].id = [id] AND [table_name].name = [name];
  

15. func.avg 和 func.count

    from sqlalchemy import func
# Model.query is already a Query instance and cannot be called; select
# explicit columns through db.session.query(...) instead.
db.session.query(
  func.avg(SampleModel.price).label('average_price'),
  func.count(SampleModel.order_id).label('order_count')
).outerjoin(
  Order, Order.order_id == SampleModel.order_id
).group_by(
  SampleModel.id
).all()
  

16. func.group_concat

    from sqlalchemy import func
# Model.query is already a Query instance and cannot be called; select
# explicit columns through db.session.query(...) instead.
db.session.query(
  func.avg(SampleModel.price).label('average_price'),
  # Label the aggregate so the result column is named as in the SQL below.
  func.group_concat(SampleRole.name.distinct()).label('concat_names')
).outerjoin(
  SampleRole, SampleRole.id == SampleModel.role_id
).group_by(
  SampleModel.id
).all()
# SELECT avg(sample_table.price) AS average_price, group_concat(DISTINCT sample_role_table.name) AS concat_names FROM sample_table LEFT OUTER JOIN sample_role_table ON sample_role_table.id = sample_table.role_id GROUP BY sample_table.id
  

17. func.concat

    from sqlalchemy import func
SampleModel.query.filter(
  func.concat(SampleModel.first_name, ' ', SampleModel.last_name) == 'First Last'
).first()
# SELECT * FROM sample_table WHERE concat(sample_table.first_name, ' ', sample_table.last_name) = 'First Last' LIMIT 1;
  

18. keyword filter

    filter_keywords = "some name"
q = db.session.query(
  *SampleModel1.__table__.columns, SampleModel2.name2
).outerjoin(
  SampleModel2, SampleModel2.some_id == SampleModel1.some_id
)
# Only add the LIKE conditions when the keyword is not blank.
if filter_keywords.strip():
    q = q.filter(
        or_(
            SampleModel1.name1.like(f"%{filter_keywords}%"),
            SampleModel2.name2.like(f"%{filter_keywords}%")
        )
    )
# SELECT sample_table1.name1, sample_table2.name2 FROM sample_table1 LEFT OUTER JOIN sample_table2 ON sample_table2.some_id = sample_table1.some_id WHERE sample_table1.name1 LIKE '%some name%' OR sample_table2.name2 LIKE '%some name%'
  

19. Date filter

    q = q.filter(func.date(Order.shipping_date) >= datetime.datetime.strptime(start_date, "%Y-%m-%d"))

class User:
    created = Column(DateTime)

session.query(
  # MySQL: DATE_FORMAT(date, format)
  func.date_format(User.created, "%Y-%m-%d %H:%i:%s"),
  # SQLite: strftime(format, time-value) - the format string comes first
  func.strftime("%Y-%m-%d %H:%M:%S", User.created),
)
  

20. string length filter

    q = q.filter(func.length(User.desc) > 0)
# SELECT * FROM user WHERE length(user.desc) > 0
  

21. dynamic filter

    filter_exps = []
for word in words.split(','):
    filter_exps.append(
        User.name.like("%{}%".format(word))
    )
q = q.filter(or_(*filter_exps))

# SQL 通配符: %	替代 0 个或多个字符
  

22. None filter

    q = q.filter(User.desc.is_(None))
q = q.filter(User.desc.isnot(None))
# SELECT * FROM user WHERE user.desc IS NULL
# SELECT * FROM user WHERE user.desc IS NOT NULL
  

23. alias

    from sqlalchemy.orm import aliased
user1 = aliased(User)
user2 = aliased(User)
q = db.session.query(
    *Order.__table__.columns, user1.name.label('name1'), user2.name.label('name2')
).outerjoin(
    user1, user1.user_id == Order.user1_id
).outerjoin(
    user2, user2.user_id == Order.user2_id
).first()
# SELECT order.order_id, user_1.name AS name1, user_2.name AS name2 FROM order LEFT OUTER JOIN user AS user_1 ON user_1.user_id = order.user1_id LEFT OUTER JOIN user AS user_2 ON user_2.user_id = order.user2_id LIMIT 1
  

24. bigger, smaller

    from sqlalchemy import func
q = db.session.query(
    func.sum(func.greatest(SampleModel.value1 - SampleModel.value2, 0)),
    func.sum(func.least(SampleModel.value1 - SampleModel.value2, 0))
).all()
# SELECT sum(greatest(sample_table.value1 - sample_table.value2, 0)), sum(least(sample_table.value1 - sample_table.value2, 0)) FROM sample_table
  

25. concat label/alias filter

    filter_compose_ids = [1, 2, 3]
compose_id = func.concat(SampleModel.from_id, '_', SampleModel.to_id).label('compose_id')
q = db.session.query(
    compose_id
).filter(
    compose_id.in_(filter_compose_ids)
).group_by(
    compose_id
).first()
# SELECT concat(sample_table.from_id, '_', sample_table.to_id) AS compose_id FROM user WHERE concat(sample_table.from_id, '_', sample_table.to_id) IN (1, 2, 3) GROUP BY concat(sample_table.from_id, '_', sample_table.to_id) LIMIT 1
  

26. order by text case

    User.query.order_by(text("case when ifnull(nickname, '') = '' then 0 else 1 end desc")).first()
# SELECT user.user_id AS user_user_id FROM user ORDER BY case when ifnull(nickname, '') = '' then 0 else 1 end desc LIMIT 1
  

27. case in query

    q = db.session.query()
# some process
q = q.add_columns(
  func.max(text('case when stock_num > 0 THEN 1 ELSE 0 end')).label('has_stock')
).order_by(
  text("has_stock desc"),
).all()
  

28. having

    User.query.having(User.user_id > 0).first()
# SELECT user.user_id AS user_user_id FROM user HAVING user.user_id > 0 LIMIT 1
  

29. raw sql

    from sqlalchemy import text
q = db.session.query(
  SampleModel.price.label('pay_price')
).filter(
  text('price > 0')  # no pay_price
).order_by(
  text("pay_price desc")
).all()
# SELECT sample_table.price AS pay_price FROM user WHERE price > 0 ORDER BY pay_price desc
  

30. random query

    from sqlalchemy import func
q = db.session.query(
  SampleModel.name
).order_by(
  func.rand()
).limit(10).all()
# SELECT sample_table.name FROM sample_table ORDER BY rand() LIMIT 10
  

31. order by id list

    from sqlalchemy import case

user_ids = [11, 22, 33, 44]
# Map every id to its position in the list: {11: 0, 22: 1, 33: 2, 44: 3}
whens = {user_id: position for position, user_id in enumerate(user_ids)}
# Pass the mapping positionally; the `whens=` keyword form is deprecated
# since SQLAlchemy 1.4.
q = q.order_by(case(whens, value=User.user_id))
# SELECT * FROM user ORDER BY CASE user.user_id WHEN 11 THEN 0 WHEN 22 THEN 1 WHEN 33 THEN 2 WHEN 44 THEN 3 END
  

32. batch add

    model1 = SampleModel()
model2 = SampleModel()
db.session.add_all([model1, model2])
  

33. pagination

    page = 1
limit = 20
q = q.limit(page).offset((page - 1) * limit)
  

34. flush

    model = SampleModel()
for k, v in info.items():
    setattr(model, k, v)
db.session.add(model)
db.session.flush()
  

35. one / first / scalar

    db.session.query(User).first()  # return one user or None
db.session.query(User).one()  # return one user or sqlalchemy.orm.exc.NoResultFound/sqlalchemy.orm.exc.MultipleResultsFound
db.session.query(User).scalar()  # return one user or None or sqlalchemy.orm.exc.MultipleResultsFound
  

36. data migration by batches

    while True:
    users = User.query.filter(User.level_id == 0).limit(100).all()
    if len(users) == 0:
        break
    for user in users:
        user.level_id = 1
    db.session.commit()
  

37. filter in Array column

    # some_ids = Column(postgresql.ARRAY(Integer))
# User1.some_ids: {1,2,3,4}
# User2.some_ids: {5,6,7,8}

# Array contains every listed element
res = User.query.filter(User.some_ids.contains([5])).all()  # [User2]
# Array length filter; both sample arrays hold only 4 items
res = User.query.filter(func.cardinality(User.some_ids) >= 5).all()  # []
# Any element equals the value
res = User.query.filter(User.some_ids.any(5)).all()  # [User2]
  

38. add column to session.query

    q = q.add_columns(
  User.extra_field
)
  

1. 驱动

    # psycopg2
postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]

# asyncpg
postgresql+asyncpg://user:password@host:port/dbname[?key=value&key=value...]

# MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

# PyMySQL
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

# MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

# More: http://docs.sqlalchemy.org/en/latest/dialects/index.html
  

3. New Model

    model = SampleModel()
model.name = 'test_name'
db.session.add(model)
db.session.commit()

# INSERT INTO samples (name, email, ...) VALUES ('test_name', 'test@example.com', ...)
  

5. 查询所有

    SampleModel.query.filter(SampleModel.id == id).all()
# SELECT * FROM [table_name] WHERE id = [id];
  

7. 查询删除

    SampleModel.query.filter(SampleModel.id == id).delete()
db.session.delete(obj)

# DELETE FROM [table_name] WHERE [table_name].id = [id];
  

9. order_by

    SampleModel.query.order_by(SampleModel.total.desc()).all()
# SELECT * FROM [table_name] ORDER BY [table_name].total DESC;
# SELECT * FROM [table_name] ORDER BY [table_name].total DESC, [table_name].count ASC;
  

11. 行锁

    SampleModel.query.filter(SampleModel.id == id).with_for_update().first()
# SELECT * FROM [table_name] WHERE [table_name].id = [id] LIMIT 1 FOR UPDATE;
  

13. or_

    from sqlalchemy import or_
SampleModel.query.filter(
  or_(
    SampleModel.id == id,
    SampleModel.name == name
  )
).first()
# SELECT * FROM [table_name] WHERE [table_name].id = [id] OR [table_name].name = [name];
  

15. func.avg 和 func.count

    from sqlalchemy import func
# Model.query is already a Query instance and cannot be called; select
# explicit columns through db.session.query(...) instead.
db.session.query(
  func.avg(SampleModel.price).label('average_price'),
  func.count(SampleModel.order_id).label('order_count')
).outerjoin(
  Order, Order.order_id == SampleModel.order_id
).group_by(
  SampleModel.id
).all()
  

17. func.concat

    from sqlalchemy import func
SampleModel.query.filter(
  func.concat(SampleModel.first_name, ' ', SampleModel.last_name) == 'First Last'
).first()
# SELECT * FROM sample_table WHERE concat(sample_table.first_name, ' ', sample_table.last_name) = 'First Last' LIMIT 1;
  

19. Date filter

    q = q.filter(func.date(Order.shipping_date) >= datetime.datetime.strptime(start_date, "%Y-%m-%d"))

class User:
    created = Column(DateTime)

session.query(
  # MySQL: DATE_FORMAT(date, format)
  func.date_format(User.created, "%Y-%m-%d %H:%i:%s"),
  # SQLite: strftime(format, time-value) - the format string comes first
  func.strftime("%Y-%m-%d %H:%M:%S", User.created),
)
  

21. dynamic filter

    filter_exps = []
for word in words.split(','):
    filter_exps.append(
        User.name.like("%{}%".format(word))
    )
q = q.filter(or_(*filter_exps))

# SQL 通配符: %	替代 0 个或多个字符
  

23. alias

    from sqlalchemy.orm import aliased
user1 = aliased(User)
user2 = aliased(User)
q = db.session.query(
    *Order.__table__.columns, user1.name.label('name1'), user2.name.label('name2')
).outerjoin(
    user1, user1.user_id == Order.user1_id
).outerjoin(
    user2, user2.user_id == Order.user2_id
).first()
# SELECT order.order_id, user_1.name AS name1, user_2.name AS name2 FROM order LEFT OUTER JOIN user AS user_1 ON user_1.user_id = order.user1_id LEFT OUTER JOIN user AS user_2 ON user_2.user_id = order.user2_id LIMIT 1
  

25. concat label/alias filter

    filter_compose_ids = [1, 2, 3]
compose_id = func.concat(SampleModel.from_id, '_', SampleModel.to_id).label('compose_id')
q = db.session.query(
    compose_id
).filter(
    compose_id.in_(filter_compose_ids)
).group_by(
    compose_id
).first()
# SELECT concat(sample_table.from_id, '_', sample_table.to_id) AS compose_id FROM user WHERE concat(sample_table.from_id, '_', sample_table.to_id) IN (1, 2, 3) GROUP BY concat(sample_table.from_id, '_', sample_table.to_id) LIMIT 1
  

27. case in query

    q = db.session.query()
# some process
q = q.add_columns(
  func.max(text('case when stock_num > 0 THEN 1 ELSE 0 end')).label('has_stock')
).order_by(
  text("has_stock desc"),
).all()
  

29. raw sql

    from sqlalchemy import text
q = db.session.query(
  SampleModel.price.label('pay_price')
).filter(
  text('price > 0')  # no pay_price
).order_by(
  text("pay_price desc")
).all()
# SELECT sample_table.price AS pay_price FROM user WHERE price > 0 ORDER BY pay_price desc
  

31. order by id list

    from sqlalchemy import case

user_ids = [11, 22, 33, 44]
# Map every id to its position in the list: {11: 0, 22: 1, 33: 2, 44: 3}
whens = {user_id: position for position, user_id in enumerate(user_ids)}
# Pass the mapping positionally; the `whens=` keyword form is deprecated
# since SQLAlchemy 1.4.
q = q.order_by(case(whens, value=User.user_id))
# SELECT * FROM user ORDER BY CASE user.user_id WHEN 11 THEN 0 WHEN 22 THEN 1 WHEN 33 THEN 2 WHEN 44 THEN 3 END
  

33. pagination

    page = 1
limit = 20
q = q.limit(page).offset((page - 1) * limit)
  

35. one / first / scalar

    db.session.query(User).first()  # return one user or None
db.session.query(User).one()  # return one user or sqlalchemy.orm.exc.NoResultFound/sqlalchemy.orm.exc.MultipleResultsFound
db.session.query(User).scalar()  # return one user or None or sqlalchemy.orm.exc.MultipleResultsFound
  

37. filter in Array column

    # some_ids = Column(postgresql.ARRAY(Integer))
# User1.some_ids: {1,2,3,4}
# User2.some_ids: {5,6,7,8}

# Array contains every listed element
res = User.query.filter(User.some_ids.contains([5])).all()  # [User2]
# Array length filter; both sample arrays hold only 4 items
res = User.query.filter(func.cardinality(User.some_ids) >= 5).all()  # []
# Any element equals the value
res = User.query.filter(User.some_ids.any(5)).all()  # [User2]
  

2. SampleModel

    import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()

class SampleModel(Base):
    """Declarative model mapped to the ``samples`` table."""

    __tablename__ = 'samples'

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Required, indexed name of at most 32 characters; falls back to 'xx'.
    name = Column(String(32), index=True, nullable=False,default='xx')
    # Optional e-mail of at most 32 characters, covered by a unique constraint.
    email = Column(String(32), unique=True)
    # The function itself (not its result) is passed as the default.
    create_time = Column(DateTime, default=datetime.datetime.now)
    # Free-form text that may be NULL.
    extra = Column(Text, nullable=True)
  

4. 查询单个

    SampleModel.query.filter(SampleModel.id == id).first()
# SELECT * FROM [table_name] WHERE id = [id] LIMIT 1;
  

6. 统计对象

    SampleModel.query.filter(SampleModel.id == id).count()
# SELECT count(*) FROM [table_name] WHERE [table_name].id = [id];
  

8. 查询更新

    SampleModel.query.filter(SampleModel.id == id).update({
  'is_finish': 1
}, synchronize_session=False)
# UPDATE [table_name] SET is_finish=1 WHERE [table_name].id = [id]

SampleModel.query.filter(SampleModel.id == id).update({
  'is_finish': 1
}, synchronize_session="fetch")
# SELECT id FROM [table_name] WHERE [table_name].id = [id];
# UPDATE [table_name] SET is_finish=1 WHERE [table_name].id = [id];

SampleModel.query.filter(SampleModel.id == id).update({
  'count': SampleModel.count + 1
}, synchronize_session="evaluate")
# UPDATE [table_name] SET count=([table_name].count + 1) WHERE [table_name].id = [id];
  

10. id in/not in list

    filter_ids = [1, 2, 3]
SampleModel.query.filter(SampleModel.id.in_(filter_ids)).all()
SampleModel.query.filter(SampleModel.id.notin_(filter_ids)).all()
# SELECT * FROM [table_name] WHERE [table_name].id IN (1, 2, 3);
# SELECT * FROM [table_name] WHERE [table_name].id NOT IN (1, 2, 3);
  

12. label

    db.session.query(SampleModel.name.label('alias_name')).first()
# SELECT [table_name].name AS alias_name FROM [table_name] LIMIT 1;
  

14. and_

    from sqlalchemy import and_
SampleModel.query.filter(
  and_(
    SampleModel.id == id,
    SampleModel.name == name
  )
).first()
# and_ in filter is default, can be omitted
# SELECT * FROM [table_name] WHERE [table_name].id = [id] AND [table_name].name = [name];
  

16. func.group_concat

    from sqlalchemy import func
# Model.query is already a Query instance and cannot be called; select
# explicit columns through db.session.query(...) instead.
db.session.query(
  func.avg(SampleModel.price).label('average_price'),
  # Label the aggregate so the result column is named as in the SQL below.
  func.group_concat(SampleRole.name.distinct()).label('concat_names')
).outerjoin(
  SampleRole, SampleRole.id == SampleModel.role_id
).group_by(
  SampleModel.id
).all()
# SELECT avg(sample_table.price) AS average_price, group_concat(DISTINCT sample_role_table.name) AS concat_names FROM sample_table LEFT OUTER JOIN sample_role_table ON sample_role_table.id = sample_table.role_id GROUP BY sample_table.id
  

18. keyword filter

    filter_keywords = "some name"
q = db.session.query(
  *SampleModel1.__table__.columns, SampleModel2.name2
).outerjoin(
  SampleModel2, SampleModel2.some_id == SampleModel1.some_id
)
# Only add the LIKE conditions when the keyword is not blank.
if filter_keywords.strip():
    q = q.filter(
        or_(
            SampleModel1.name1.like(f"%{filter_keywords}%"),
            SampleModel2.name2.like(f"%{filter_keywords}%")
        )
    )
# SELECT sample_table1.name1, sample_table2.name2 FROM sample_table1 LEFT OUTER JOIN sample_table2 ON sample_table2.some_id = sample_table1.some_id WHERE sample_table1.name1 LIKE '%some name%' OR sample_table2.name2 LIKE '%some name%'
  

20. string length filter

    q = q.filter(func.length(User.desc) > 0)
# SELECT * FROM user WHERE length(user.desc) > 0
  

22. None filter

    q = q.filter(User.desc.is_(None))
q = q.filter(User.desc.isnot(None))
# SELECT * FROM user WHERE user.desc IS NULL
# SELECT * FROM user WHERE user.desc IS NOT NULL
  

24. bigger, smaller

    from sqlalchemy import func
q = db.session.query(
    func.sum(func.greatest(SampleModel.value1 - SampleModel.value2, 0)),
    func.sum(func.least(SampleModel.value1 - SampleModel.value2, 0))
).all()
# SELECT sum(greatest(sample_table.value1 - sample_table.value2, 0)), sum(least(sample_table.value1 - sample_table.value2, 0)) FROM sample_table
  

26. order by text case

    User.query.order_by(text("case when ifnull(nickname, '') = '' then 0 else 1 end desc")).first()
# SELECT user.user_id AS user_user_id FROM user ORDER BY case when ifnull(nickname, '') = '' then 0 else 1 end desc LIMIT 1
  

28. having

    User.query.having(User.user_id > 0).first()
# SELECT user.user_id AS user_user_id FROM user HAVING user.user_id > 0 LIMIT 1
  

30. random query

    from sqlalchemy import func
q = db.session.query(
  SampleModel.name
).order_by(
  func.rand()
).limit(10).all()
# SELECT sample_table.name FROM sample_table ORDER BY rand() LIMIT 10
  

32. batch add

    model1 = SampleModel()
model2 = SampleModel()
db.session.add_all([model1, model2])
  

34. flush

    model = SampleModel()
for k, v in info.items():
    setattr(model, k, v)
db.session.add(model)
db.session.flush()
  

36. data migration by batches

    while True:
    users = User.query.filter(User.level_id == 0).limit(100).all()
    if len(users) == 0:
        break
    for user in users:
        user.level_id = 1
    db.session.commit()
  

38. add column to session.query

    q = q.add_columns(
  User.extra_field
)
  


Maintained by Cyanhall.com, Copyright @ CC BY-NC-SA 4.0     ExcelRoadMap