sqlalchemy采用简单的Python语言,为高效和高性能的数据库访问设计,实现了完整的企业级持久模型。
安装
- 需要安装MySQLdb
- pip install sqlalchemy
安装完成后,执行
>>>import sqlalchemy
>>>sqlalchemy.__version__
连接数据库
在sqlalchemy中,session用于创建程序与数据库之间的会话。所有对象的载入和保存都需要通过session对象。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine('mysql://user:passwd@ip:port/db', echo=True)
Session = sessionmaker(bind=engine)
session = Session()
session.execute('show databases')
其中, echo
为 True
代表打开logging。
创建一个映射
一个映射对应着一个Python类,用来表示一个表的结构。下面创建一个person表,包括id和name两个字段。
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Person(Base):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
name = Column(String(32))
def __repr__(self):
return "<Person(name='%s')>" % self.name
添加数据
#创建一个person对象
person = Person(name='jack')
#添加person对象,但是仍然没有commit到数据库
session.add(person)
#commit操作
session.commit()
如何获取id的?
>>> person = Person(name='ilis')
>>> person.id #此时还没有commit到mysql,因此无id
>>> session.add(person)
>>> person.id #同上
>>> session.commit()
2015-08-18 23:08:23,530 INFO sqlalchemy.engine.base.Engine INSERT INTO person (name) VALUES (%s)
2015-08-18 23:08:23,531 INFO sqlalchemy.engine.base.Engine ('ilis',)
2015-08-18 23:08:23,532 INFO sqlalchemy.engine.base.Engine COMMIT
>>> person.id #commit后,可以获取该对象的id
2015-08-18 23:08:27,556 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2015-08-18 23:08:27,557 INFO sqlalchemy.engine.base.Engine SELECT person.id AS person_id, person.name AS person_name
FROM person
WHERE person.id = %s
2015-08-18 23:08:27,557 INFO sqlalchemy.engine.base.Engine (5L,)
5L
>>>
添加多个数据
session.add_all([
Person(name='jack'),
Person(name='mike')
])
session.commit()
回滚
>>> person = Person(name='test')
>>> session.add(person)
>>> session.query(person).filter(name=='test')
>>> session.query(Person).filter(Person.name=='test').all()
2015-08-18 23:13:23,265 INFO sqlalchemy.engine.base.Engine INSERT INTO person (name) VALUES (%s)
2015-08-18 23:13:23,265 INFO sqlalchemy.engine.base.Engine ('test',)
2015-08-18 23:13:23,267 INFO sqlalchemy.engine.base.Engine SELECT person.id AS person_id, person.name AS person_name
FROM person
WHERE person.name = %s
2015-08-18 23:13:23,267 INFO sqlalchemy.engine.base.Engine ('test',)
[<demo.Person object at 0x7f4e37730510>]
>>> session.rollback()
2015-08-18 23:13:37,496 INFO sqlalchemy.engine.base.Engine ROLLBACK
>>> session.query(Person).filter(Person.name=='test').all()
2015-08-18 23:13:38,690 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2015-08-18 23:13:38,691 INFO sqlalchemy.engine.base.Engine SELECT person.id AS person_id, person.name AS person_name
FROM person
WHERE person.name = %s
2015-08-18 23:13:38,692 INFO sqlalchemy.engine.base.Engine ('test',)
[]
>>>
数据查询
使用Session的query()方法。
#获取所有数据
session.query(Person).all()
#获取某一列数据,类似于django的get,如果返回数据为多个则报错
session.query(Person).filter(Person.name=='jack').one()
#获取返回数据的第一行
session.query(Person).first()
#过滤数据
session.query(Person.name).filter(Person.id>1).all()
#limit
session.query(Person).all()[1:3]
#order by
session.query(Person).ordre_by(-Person.id)
#equal/like/in
query = session.query(Person)
query.filter(Person.id==1).all()
query.filter(Person.id!=1).all()
query.filter(Person.name.like('%ac%')).all()
query.filter(Person.id.in_([1,2,3])).all()
query.filter(~Person.id.in_([1,2,3])).all()
query.filter(Person.name==None).all()
#and or
from sqlalchemy import and_
query.filter(and_(Person.id==1, Person.name=='jack')).all()
query.filter(Person.id==1, Person.name=='jack').all()
query.filter(Person.id==1).filter(Person.name=='jack').all()
from sqlalchemy import or_
query.filter(or_(Person.id==1, Person.id==2)).all()
使用text
from sqlalchemy import text
query.filter(text("id>1")).all()
query.filter(Person.id>1).all() #同上
query.filter(text("id>:id")).params(id=1).all() #使用:,params来传参
query.from_statement(
text("select * from person where name=:name"))./
params(name='jack').all()
计数
Query使用count()函数来实现查询计数。
query.filter(Person.id>1).count()
group by的用法
from sqlalchemy import func
session.query(func.count(Person.name), Person.name),group_by(Person.name).all()
实现count(*)来查询表内行数
session.query(func.count('*')).select_from(Person).scalar()
session.query(func.count(Person.id)).scalar()