DrissionPage
一、官网文档
DrissionPage GitHub 仓库地址
DrissionPage 官网文档
二、下载安装
# 安装
pip install DrissionPage
# 升级最新版
pip install DrissionPage --upgrade
# 安装或升级到指定版本
pip install DrissionPage==4.0.0b17
- DrissionPage 是一个基于 python 的网页自动化工具。
- 支持系统:Windows、Linux、Mac
- python版本:3.6及以上
- 支持浏览器:Chromium内核浏览器(如Chrome和Edge),Electron应用
三、例子
建表语句
-- Exam question bank table: one row per question, with its options and answer.
-- auto-generated definition
CREATE TABLE a_secure_example_detail
(
    detail_id   BIGINT AUTO_INCREMENT COMMENT '详情ID'
        PRIMARY KEY,
    -- References a_secure_example_info.info_id (no foreign key is declared).
    info_id     BIGINT        NOT NULL COMMENT '信息ID',
    type        VARCHAR(200)  NOT NULL COMMENT '题目类型',
    tag         VARCHAR(200)  NOT NULL COMMENT '题目标签',
    answer      VARCHAR(200)  NOT NULL COMMENT '答案',
    question    VARCHAR(2000) NOT NULL COMMENT '题干',
    option_A    VARCHAR(2000) NOT NULL COMMENT '选项A',
    option_B    VARCHAR(2000) NOT NULL COMMENT '选项B',
    -- Column comment fixed: it previously said '选项B' (copy-paste error).
    option_C    VARCHAR(2000) NOT NULL COMMENT '选项C',
    option_D    VARCHAR(2000) NOT NULL COMMENT '选项D',
    -- 0 = active, 1 = invalidated
    status      SMALLINT      NOT NULL COMMENT '0:正常,1:失效',
    create_date TIMESTAMP(6)  NOT NULL COMMENT '任务创建时间',
    update_date TIMESTAMP(6)  NOT NULL COMMENT '任务更新时间'
)
    COMMENT '考试题库数据表' CHARSET = utf8mb4;
-- Security-exam answering task table: one row per submitted exam whose
-- detail page still has to be (or has been) scraped.
-- auto-generated definition
CREATE TABLE a_secure_example_info
(
    info_id     BIGINT AUTO_INCREMENT COMMENT '信息ID'
        PRIMARY KEY,
    base_url    VARCHAR(2000) NOT NULL COMMENT '邀请答题url',
    url         VARCHAR(2000) NOT NULL COMMENT '答题详情url',
    user_name   VARCHAR(200)  NOT NULL COMMENT '答题人',
    user_email  VARCHAR(500)  NOT NULL COMMENT '答题邮箱',
    -- 0 = not started, 1 = in progress, 2 = done
    status      SMALLINT      NOT NULL COMMENT '0:未开始,1:进行中,2:完成',
    create_date TIMESTAMP(6)  NOT NULL COMMENT '任务创建时间',
    update_date TIMESTAMP(6)  NOT NULL COMMENT '任务更新时间'
)
    COMMENT '安全考试答题信息表' CHARSET = utf8mb4;
代码
from datetime import datetime
from threading import Thread

from DrissionPage import ChromiumPage
from DrissionPage.commons.constants import NoneElement
from sqlalchemy import create_engine, Column, BigInteger, String, SmallInteger, TIMESTAMP
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
Base = declarative_base()

# Database engine; echo=True logs every emitted SQL statement.
# NOTE(review): credentials are hard-coded in the URL - consider loading them
# from configuration or the environment.
engine = create_engine('mysql://root:root@127.0.0.1:3306/spider_db', echo=True)

# Session factory plus a thread-local session registry.  The script runs
# saveDetailUrl in several threads, and a plain Session object must not be
# shared between threads; scoped_session hands each thread its own Session
# while keeping the same ``session.query/add/commit`` call style.
Session = sessionmaker(bind=engine)
session = scoped_session(Session)


class SecureExampleInfo(Base):
    """ORM mapping for ``a_secure_example_info`` (exam answering tasks)."""

    __tablename__ = 'a_secure_example_info'

    info_id = Column(BigInteger, primary_key=True, autoincrement=True)
    base_url = Column(String(2000), nullable=False)
    url = Column(String(2000), nullable=False)
    user_name = Column(String(200), nullable=False)
    user_email = Column(String(500), nullable=False)
    status = Column(SmallInteger, nullable=False)  # 0: not started, 1: in progress, 2: done
    create_date = Column(TIMESTAMP, nullable=False)
    update_date = Column(TIMESTAMP, nullable=False)


class SecureExampleDetail(Base):
    """ORM mapping for ``a_secure_example_detail`` (exam question bank)."""

    __tablename__ = 'a_secure_example_detail'

    detail_id = Column(BigInteger, primary_key=True, autoincrement=True)
    # NOT NULL to agree with the table definition (was left nullable here).
    info_id = Column(BigInteger, nullable=False)
    type = Column(String(200), nullable=False)
    tag = Column(String(200), nullable=False)
    answer = Column(String(200), nullable=False)
    question = Column(String(2000), nullable=False)
    option_A = Column(String(2000), nullable=False)
    option_B = Column(String(2000), nullable=False)
    option_C = Column(String(2000), nullable=False)
    option_D = Column(String(2000), nullable=False)
    status = Column(SmallInteger, nullable=False)  # 0: active, 1: invalidated
    create_date = Column(TIMESTAMP, nullable=False)
    update_date = Column(TIMESTAMP, nullable=False)


# Create any missing tables.  This has to run AFTER the model classes are
# declared: before that point Base.metadata contains no tables, so the call
# was a no-op.  Tables that already exist are left untouched.
Base.metadata.create_all(engine)
def saveInfoUrl(base_url, url, user_name, user_email):
    """Insert a new task row with status 0 (not started) and commit it.

    :param base_url: invitation URL of the exam
    :param url: URL of the exam detail page
    :param user_name: name of the person answering
    :param user_email: e-mail of the person answering
    """
    record = SecureExampleInfo(
        base_url=base_url,
        url=url,
        user_name=user_name,
        user_email=user_email,
        status=0,
        create_date=datetime.now(),
        update_date=datetime.now(),
    )
    session.add(record)
    session.commit()
def saveQuestion(details_to_insert):
    """Persist a batch of question rows.

    :param details_to_insert: iterable of ``SecureExampleDetail`` objects
    """
    # Stage every object in the session ...
    for detail in details_to_insert:
        session.add(detail)
    # ... then commit once; inserts, updates and deletes need a commit,
    # plain queries do not.
    session.commit()
def queryToDoTaskInfo():
    """Return every task row whose status is 0 (not started) as a list."""
    pending = session.query(SecureExampleInfo).filter(SecureExampleInfo.status == 0)
    return pending.all()
def updateQuestionTaskInfo(info_id, status):
    """Set the status of one task row and refresh its ``update_date``.

    :param info_id: primary key of the task row to update
    :param status: new status value (0: not started, 1: in progress, 2: done)
    """
    changes = {
        "status": status,
        "update_date": datetime.now(),
    }
    target = session.query(SecureExampleInfo).filter_by(info_id=info_id)
    target.update(changes)
    session.commit()
def getAnswer(detail_id):
    """Return the question-bank row with the given ``detail_id``, or None.

    The whole ``SecureExampleDetail`` row is returned, not only its answer.
    """
    matches = session.query(SecureExampleDetail).filter(
        SecureExampleDetail.detail_id == detail_id
    )
    return matches.first()
def getQuestion(type, question):
    """Look up active question-bank rows by question type and exact stem text.

    :param type: question type label to match
    :param question: question stem to match exactly
    :return: list of ``SecureExampleDetail`` rows with status 0 (active)
    """
    query = session.query(SecureExampleDetail)
    # Successive filter() calls are combined with AND.
    query = query.filter(SecureExampleDetail.type == type)
    query = query.filter(SecureExampleDetail.status == 0)
    query = query.filter(SecureExampleDetail.question == question)
    return query.all()
def login(page, base_url, user_name, user_email, count):
    """Log in to the exam in a new tab and run the flow selected by ``count``.

    * ``count > 0``: submit the paper immediately, store the resulting detail
      URL, then repeat with ``count - 1`` (used to harvest the question bank).
    * ``count == 0``: end of the recursion; nothing more is done.
    * ``count < 0``: answer automatically, then submit and store the URL.

    :param page: browser page object used to open new tabs
    :param base_url: invitation URL of the exam
    :param user_name: name typed into the login form
    :param user_email: e-mail typed into the login form
    :param count: flow selector / remaining number of submissions (see above)
    """
    tab = page.new_tab(base_url)
    # Dismiss the introduction dialog ("I got it" button).
    tab.ele('#btn-introduction').click()
    # Fill in name and e-mail, then log in.
    tab.ele('xpath://input[@placeholder="请输入姓名"]').input(user_name)
    tab.ele('xpath://input[@placeholder="请输入邮箱"]').input(user_email)
    tab.ele('#login').click()
    # Click "answer again".
    tab.ele('xpath://div[@class="exam-end-btn"]/a[1]').click()
    # Dismiss the introduction dialog again and enter the exam.
    tab.ele('#btn-introduction').click()
    tab.ele('#toExam').click()

    if count > 0:
        # Submit right away to collect the question bank.
        saveNowInfoUrl(tab, base_url, user_name, user_email)
        # Python has no ``--`` operator: ``--count`` is ``-(-count)``, i.e. the
        # unchanged value, which made this recursion endless.
        login(page, base_url, user_name, user_email, count - 1)
    elif count == 0:
        # Recursion finished.
        return
    else:
        # Auto-answer, then submit from the tab that actually holds the exam
        # (the browser page object was passed here before).
        automateAnswer(tab)
        saveNowInfoUrl(tab, base_url, user_name, user_email)
def saveNowInfoUrl(tab, base_url, user_name, user_email):
    """Submit the paper, open the question details and store the resulting URL.

    :param tab: tab currently showing the exam
    :param base_url: invitation URL of the exam
    :param user_name: name of the person answering
    :param user_email: e-mail of the person answering
    """
    # Submit the paper.
    submit_button = tab.ele('xpath://div[@class="question-panels"]/div/div[2]/button[1]')
    submit_button.click()
    # Choose "view question details" in the dialog that follows.
    detail_button = tab.ele('xpath://div[@class="modal"]/div/div/div[@class="modal-footer"]/button[2]')
    detail_button.click()
    # The tab's URL is stored as the detail-page URL.
    saveInfoUrl(base_url, tab.url, user_name, user_email)
def saveDetailUrl(page, url, info_id):
    """Scrape every question from one exam detail page into the question bank.

    The task is marked in progress (1) first and done (2) after the rows have
    been saved.

    :param page: browser page object used to open the detail page in a new tab
    :param url: URL of the exam detail page
    :param info_id: id of the task row this page belongs to
    """
    print(f"========> 开始执行 info_id:{info_id}")
    updateQuestionTaskInfo(info_id, 1)
    # Open the detail page in its own tab.
    tab = page.new_tab(url)
    prefix = 'xpath:'
    question_xpath = '//div[@id="report-answer"]/div[@class="question" or @class="question "]'
    total = len(tab.eles(prefix + question_xpath))
    rows = []
    for position in range(1, total + 1):
        # XPath of the current question node (XPath positions are 1-based).
        node = f'{question_xpath}[{position}]'
        text_c = ''
        text_d = ''
        # Options C/D are only looked up for the first 40 questions -
        # presumably the later ones have just two options; TODO confirm.
        if position <= 40:
            ele_c = tab.ele(f'{prefix}{node}/ul/li[3]')
            ele_d = tab.ele(f'{prefix}{node}/ul/li[4]')
            if not isinstance(ele_c, NoneElement):
                text_c = ele_c.text
            if not isinstance(ele_d, NoneElement):
                text_d = ele_d.text
        question_type = tab.ele(f'{prefix}{node}/div[1]/span[1]').text
        question_tag = tab.ele(f'{prefix}{node}/div[1]/span[2]').text
        answer_text = tab.ele(f'{prefix}{node}/div[4]/pre').text
        # The stem is matched as either <pre> or <p> (XPath union).
        stem_text = tab.ele(f'{prefix}{node}/div[2]/pre | {node}/div[2]/p').text
        text_a = tab.ele(f'{prefix}{node}/ul/li[1]').text
        text_b = tab.ele(f'{prefix}{node}/ul/li[2]').text
        rows.append(
            SecureExampleDetail(
                info_id=info_id,
                type=question_type,
                tag=question_tag,
                answer=answer_text,
                question=stem_text,
                option_A=text_a,
                option_B=text_b,
                option_C=text_c,
                option_D=text_d,
                status=0,
                create_date=datetime.now(),
                update_date=datetime.now(),
            )
        )
    print(f"========> info_id:{info_id} 页面解析完成开始入库,入库条数:{len(rows)}")
    saveQuestion(rows)
    print(f"========> info_id:{info_id} 入库完成")
    updateQuestionTaskInfo(info_id, 2)
# TODO: fill in the real XPath expressions - every "xpath://" below is a placeholder.
def _pick_option_indexes(e_type, page_options, candidates):
    """Return the 1-based positions of the options to click, or [] if none fit.

    ``page_options`` holds the four option texts shown on the page (A-D order);
    ``candidates`` are question-bank rows.  The first row that yields a
    selection wins, so duplicate bank rows cannot select an option twice.
    """
    for k in candidates:
        by_letter = {'A': k.option_A, 'B': k.option_B,
                     'C': k.option_C, 'D': k.option_D}
        if e_type == '[ 判断题 ]':
            # True/false: only A and B exist; anything not matching A picks B.
            txt = by_letter[k.answer] if k.answer in ('A', 'B') else ''
            return [1] if txt == page_options[0] else [2]
        # The bank row only applies if it offers the same set of options.
        bank_texts = list(by_letter.values())
        if not all(text in bank_texts for text in page_options):
            continue
        if e_type == '[ 单选题 ]':
            txt = by_letter.get(k.answer, '')
            picked = [pos for pos, text in enumerate(page_options, 1) if text == txt]
            if picked:
                # Keep a single option (the last match, as the if-chain did).
                return picked[-1:]
        elif e_type == '[ 多选题 ]':
            # ``answer`` is a string of letters such as "ABD"; each character
            # IS the letter (the old code read a non-existent ``.answer`` on it).
            wanted = [by_letter[letter] for letter in k.answer if letter in by_letter]
            picked = [pos for pos, text in enumerate(page_options, 1) if text in wanted]
            if picked:
                return picked
    return []


def automateAnswer(tab, total_questions=50):
    """Answer the question currently displayed on ``tab`` from the question bank.

    Only the question shown at call time is handled.  If the bank has no
    usable entry the question is left blank and the page moves on.

    :param tab: tab showing the exam question
    :param total_questions: number of the last question; "next" is not clicked
        once that number is displayed.  NOTE(review): assumes the page shows
        the bare question number - confirm once the XPath is filled in.
    """
    print("========> 开始自动答题")
    e_question = tab.ele("xpath://").text  # question stem on the page
    e_type = tab.ele("xpath://").text  # question type label on the page
    e_A = tab.ele("xpath://").text  # option texts on the page
    e_B = tab.ele("xpath://").text
    e_C = ''
    e_D = ''
    # Options C/D are only used for choice questions, so they are read for
    # every type EXCEPT true/false (the condition used to be inverted).
    if e_type != '[ 判断题 ]':
        e_C = tab.ele("xpath://").text
        e_D = tab.ele("xpath://").text

    questionList = getQuestion(e_type, e_question)
    if not questionList:
        # Not in the bank: skip the question.
        tab.ele("xpath://").click()
        return

    selectIndexs = _pick_option_indexes(e_type, [e_A, e_B, e_C, e_D], questionList)
    if not selectIndexs:
        # No bank row matches the displayed options: leave blank, next question.
        tab.ele("xpath://").click()
        return

    # Click the chosen option(s).
    for index in selectIndexs:
        tab.ele("xpath://[" + str(index) + "]").click()

    # Move to the next question unless this is the last one.  The element text
    # is a string, so compare it with a string (``!= 50`` was always true).
    current_no = tab.ele("xpath://").text
    if str(current_no).strip() != str(total_questions):
        tab.ele("xpath://").click()
if __name__ == '__main__':
    base_url = 'https://XXXX'
    user_name = 'XX'
    user_email = 'XXX@XXX.com'
    page = ChromiumPage()
    page.get(base_url)

    # [1] Collect detail URLs by submitting the paper 3 times.
    # login(page, base_url, user_name, user_email, 3)

    # [2] Store the questions of every submitted paper that is still pending.
    for task in queryToDoTaskInfo():
        # One thread per page so several pages are processed at the same time.
        worker = Thread(target=saveDetailUrl, args=(page, task.url, task.info_id))
        worker.start()

    # [3] Automatic answering.
    # login(page, base_url, user_name, user_email, -1)