Cyan Hall C H
Created by CyanHall.com. Last updated: 11/25/2020 👉  

1. Driver

    # psycopg2
postgresql+psycopg2://user:[email protected]:port/dbname[?key=value&key=value...]

# asyncpg
postgresql+asyncpg://user:[email protected]:port/dbname[?key=value&key=value...]

# MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

# PyMySQL
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

# MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

# More: http://docs.sqlalchemy.org/en/latest/dialects/index.html
  

2. SampleModel

    import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()

class SampleModel(Base):
    __tablename__ = 'samples'
 
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False,default='xx')
    email = Column(String(32), unique=True)
    create_time = Column(DateTime, default=datetime.datetime.now)
    extra = Column(Text, nullable=True)
  

3. Query one

    SampleModel.query.filter(SampleModel.id == id).first()
  

4. Query all

    SampleModel.query.filter(SampleModel.id == id).all()
  

5. Count Model

    SampleModel.query.filter(SampleModel.id == id).count()
  

6. Query one and delete

    SampleModel.query.filter(SampleModel.id == id).delete()
  

7. Query one and update

    SampleModel.query.filter(SampleModel.id == id).update({
  'is_finish': 1
}, synchronize_session=False)
  

8. order_by

    SampleModel.query.order_by(SampleModel.total.desc()).all()
  

9. id in list

    SampleModel.query.filter(SampleModel.id.in_(filter_ids)).all()
  

10. Row lock

    SampleModel.query.filter(SampleModel.id == id).with_for_update().first()
  

11. Alias

    SampleModel.query(SampleModel.name.label('alias_name')).first()
  

12. or_

    from sqlalchemy import or_
SampleModel.query.filter(
  or_(
    SampleModel.id == id,
    SampleModel.name == name
  )
).first()
  

13. and_

    from sqlalchemy import and_
SampleModel.query.filter(
  and_(
    SampleModel.id == id,
    SampleModel.name == name
  )
).first()
  

14. func.avg and func.count

    from sqlalchemy import func
SampleModel.query(
  func.avg(SampleModel.price).label('average_price'),
  func.count(SampleModel.order_id).label('order_count')
).outerjoin(
  Order, Order.order_id == SampleModel.order_id
).group_by(
  SampleModel.id
).all()
  

15. func.group_concat

    from sqlalchemy import func
SampleModel.query(
  func.avg(SampleModel.price).label('average_price'),
  func.group_concat(SampleRole.name.distinct())
).outerjoin(
  SampleRole, SampleRole.id == SampleModel.role_id
).group_by(
  SampleModel.id
).all()