sqlalchemy对MySQL的读写_基于sqlalchemy对mysql实现增删改查操作

3a2c5c47719654b2eb3ed26ac95e70d8.png

需求场景:

老大让我利用爬虫爬取的数据写到或更新到mysql数据库中,百度了两种方法

1 是使用pymysql连接mysql,通过操作原生的sql语句进行增删改查数据;

2 是使用sqlalchemy连接mysql,通过ORM模型建表并操作数据库,不需要写原生的sql语句,相对简单些;

以下就是本次使用sqlalchemy的经验之谈。

实现流程:连接数据库》通过模型类创建表》建立会话》执行创建表语句》通过会话进行增删改查

from sqlalchemy import exists, Column, Integer, String, ForeignKey, exists

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy import create_engine

from sqlalchemy.orm import sessionmaker

# 创建的数据库引擎

engine = create_engine("mysql+pymysql://user:pwd@ip/数据库名?charset=utf8")

#创建session类型

DBSession = sessionmaker(bind=engine)

# 实例化官宣模型 - Base 就是 ORM 模型

Base = declarative_base()

# 创建服务单表

class ServiceOrder(Base):

__tablename__ = 'serviceOrderTable'

id = Column(Integer, primary_key=True, autoincrement=True)

serviceOrderId = Column(String(32), nullable=False, index=True, comment='服务单ID')

serviceDesc = Column(String(268), comment='服务说明')

oneLevelName = Column(String(32), comment='C类别')

twoLevelName = Column(String(32), comment='T子类')

threeLevelName = Column(String(32), comment='I项目')

fourLevelName = Column(String(32), comment='S子项')

transferTimes = Column(String(32), comment='转派次数')

overDueStatus = Column(String(32), comment='过期状态')

serviceTimeLimit = Column(String(32), comment='服务时限')

serTimeLimitTypeName = Column(String(16), comment='时限类型')

# 一对多:

# serviceWorkOrder = relationship("ServiceWorkOrder", backref="serviceorder")

# 多对一:多个服务工单可以属于服务单

class ServiceWorkOrder(Base):

__tablename__ = 'serviceWorkOrderTable'

id = Column(Integer, primary_key=True, autoincrement=True)

serviceWorkOrderId = Column(String(32), nullable=False, index=True, comment='服务工单ID')

workOrderName = Column(String(268), comment='工单名称')

fromId = Column(String(32), comment='服务单ID')

createUserSectionName = Column(String(32), comment='创建人室')

createUserName = Column(String(32), comment='创建人')

handlerName = Column(String(32), comment='处理人')

statusName = Column(String(32), comment='工单状态')

createTime = Column(String(32), comment='创建时间')

# “多”的一方的book表是通过外键关联到user表的:

# serviceOrder_id = Column(Integer, ForeignKey('serviceOrderTable.id'))

# 创建数据库 如果数据库已存在 则不会创建 会根据库名直接连接已有的库

def init_db():

Base.metadata.create_all(engine)

def drop_db():

Base.metadata.drop_all(engine)

def insert_update():

# all_needed_data_lists 是需要插入数据库的数据 格式[{key: value, ... }, { }, { }...]

for item in all_needed_data_lists:

ServiceOrderRow = ServiceOrder(serviceOrderId=item['serviceOrderId'],

serviceDesc=item['serviceDesc'],

oneLevelName=item['oneLevelName'],

twoLevelName=item['twoLevelName'],

threeLevelName=item['threeLevelName'],

fourLevelName=item['fourLevelName'],

transferTimes=item['transferTimes'],

overDueStatus=item['overDueStatus'],

serviceTimeLimit=item['serviceTimeLimit'],

serTimeLimitTypeName=item['serTimeLimitTypeName'],

)

try:

# 利用exists判断目标对象是否存在,返回True或Faults

it_exists = session.query(

exists().where(ServiceOrder.serviceOrderId == item['serviceOrderId'] )

).scalar()

except Exception as e:

self.log.error(e)

break

try:

# 如果不存在,进行新增;存在的话就更新现存的数据

if not it_exists:

session.add(ServiceOrderRow)

else:

session.query(ServiceOrder).filter(ServiceOrder.serviceOrderId == item['serviceOrderId'])\

.update(item)

except Exception as e:

self.log.error(e)

break

try:

session.commit()

self.log.info('数据更新成功!')

except:

session.rollback()

self.log.info('数据更新失败!')

if __name__ == "__main__":

# 创建数据库 如果数据库已存在 则不会创建 会根据库名直接连接已有的库

init_db()

# 创建session对象,进行增删改查:

session = DBSession()

# 利用session 增 改数据 记得提交

insert_update()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


版权声明:本文为weixin_32484897原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。