准备
工具:
- Navicat for MySQL(熟悉 MySQL 命令行操作的话也可以不用)
- PyCharm(也可以用其他编辑器)
数据(MySQL 数据库 cov2019 中的三张表):
history
details
hotsearch
项目

获取数据的过程:
- 在 main.js 中用 ajax 请求数据,url 对应 app.py 中定义的路由
- app.py 中该路由对应的视图函数调用 utils.py 取得数据,整理后返回给前端(/time 返回纯文本,其余返回 JSON)
- utils.py 负责连接数据库并执行查询
main.js
/**
 * Request the current time text from the "/time" route and render it
 * into the element(s) matching ".showTime". Errors are only logged.
 */
function getTime() {
    var request = {
        url: "/time",
        // timeout: 10000, // request timeout (left disabled)
        success: function (timeText) {
            $(".showTime").html(timeText);
        },
        error: function (err) {
            console.log(err);
        }
    };
    $.ajax(request);
}
app.py
# Current-time endpoint.
@app.route('/time')
def get_time():
    """Return the formatted time string produced by ``utils.get_time()``."""
    now_text = utils.get_time()
    return now_text
utils.py
import time
# Current time as display text.
def get_time():
    """Return the local time formatted as ``YYYY年MM月DD日 <%X time>``.

    The unit characters are not passed to ``strftime`` directly; the format
    carries ``{}`` placeholders that are filled in afterwards.
    """
    year_mark, month_mark, day_mark = "年", "月", "日"
    template = time.strftime("%Y{}%m{}%d{} %X")
    return template.format(year_mark, month_mark, day_mark)
main.js
/**
 * Fetch the current time text from "/time" and show it in ".showTime".
 * A failed request is logged to the console and otherwise ignored.
 */
function getTime() {
    function showTime(timeText) {
        $(".showTime").html(timeText);
    }

    function logFailure(err) {
        console.log(err);
    }

    $.ajax({
        url: "/time",
        // timeout: 10000, // request timeout (left disabled)
        success: showTime,
        error: logFailure
    });
}
/**
 * Fetch the headline counters from "/covCountData" and write them, in the
 * order confirm / suspect / heal / dead, into the first four
 * ".no-num ul li" items.
 */
function getCovCountData() {
    $.ajax({
        url: "/covCountData",
        success: function (data) {
            // The position in this list is the index of the target <li>.
            var fields = ["confirm", "suspect", "heal", "dead"];
            for (var i = 0; i < fields.length; i++) {
                $(".no-num ul li").eq(i).text(data[fields[i]]);
            }
        },
        error: function (err) {
            console.log(err);
        }
    });
}
/**
 * Fetch the map data from "/covMapData", store it in the first series of
 * map_option and re-apply the option to myMap.
 */
function getCovMapData() {
    function render(data) {
        map_option.series[0].data = data.data;
        myMap.setOption(map_option);
    }

    function logFailure(err) {
        console.log(err);
    }

    $.ajax({
        url: "/covMapData",
        success: render,
        error: logFailure
    });
}
/**
 * Fetch the cumulative nationwide series from "/covTotalData" and redraw
 * the top-left chart: data.day feeds the x axis, and confirm / suspect /
 * heal / dead feed series 0..3 of left_top_option.
 */
function getCovTotalData() {
    $.ajax({
        url: "/covTotalData",
        success: function (data) {
            var seriesKeys = ["confirm", "suspect", "heal", "dead"];
            left_top_option.xAxis.data = data.day;
            for (var i = 0; i < seriesKeys.length; i++) {
                left_top_option.series[i].data = data[seriesKeys[i]];
            }
            cov_lefttop_chart.setOption(left_top_option);
        },
        error: function (err) {
            console.log(err);
        }
    });
}
/**
 * Fetch the daily-increase series from "/covNewData" and redraw the
 * bottom-left chart: data.day feeds the x axis, confirm_add and
 * suspect_add feed series 0 and 1 of left_bottom_option.
 */
function getCovNewData() {
    function render(data) {
        var option = left_bottom_option;
        option.xAxis.data = data.day;
        option.series[0].data = data.confirm_add;
        option.series[1].data = data.suspect_add;
        cov_leftbottom_chart.setOption(option);
    }

    $.ajax({
        url: "/covNewData",
        success: render,
        error: function (err) {
            console.log(err);
        }
    });
}
/**
 * Fetch the most-affected-areas data from "/covSeriousData" and redraw
 * the top-right chart: data.city feeds the first x axis and
 * data.city_confirm feeds the first series of right_top_option.
 */
function getCovSeriousData() {
    function render(data) {
        right_top_option.xAxis[0].data = data.city;
        right_top_option.series[0].data = data.city_confirm;
        cov_righttop_chart.setOption(right_top_option);
    }

    function logFailure(err) {
        console.log(err);
    }

    $.ajax({
        url: "/covSeriousData",
        success: render,
        error: logFailure
    });
}
/**
 * Fetch the hot-search keywords from "/covCloudData", store data.keyWords
 * in the first series of cloud_option and redraw the word-cloud chart.
 */
function getCovCloudData() {
    var request = {
        url: "/covCloudData",
        success: function (data) {
            cloud_option.series[0].data = data.keyWords;
            cov_cloud_chart.setOption(cloud_option);
        },
        error: function (err) {
            console.log(err);
        }
    };
    $.ajax(request);
}
// Load every widget once, then keep polling: the clock every second and
// each data loader every 100 seconds. Wrapped in a function so the helper
// variables do not become globals.
(function () {
    var CLOCK_INTERVAL_MS = 1000;
    var DATA_INTERVAL_MS = 100000;
    var dataLoaders = [
        getCovCountData,
        getCovMapData,
        getCovTotalData,
        getCovNewData,
        getCovSeriousData,
        getCovCloudData
    ];

    // Initial load, clock first, then the data loaders in listed order.
    getTime();
    dataLoaders.forEach(function (load) {
        load();
    });

    // Periodic refresh, registered in the same order.
    setInterval(getTime, CLOCK_INTERVAL_MS);
    dataLoaders.forEach(function (load) {
        setInterval(load, DATA_INTERVAL_MS);
    });
})();
app.py
from flask import Flask
from flask import render_template
from flask import jsonify
from jieba.analyse import extract_tags
import utils
import string
# Flask application object; the @app.route decorators in this file register on it.
app = Flask(__name__)
# Start page.
@app.route('/')
def go_main():
    """Render the ``main.html`` template for the site root."""
    page = render_template("main.html")
    return page
# Current-time endpoint.
@app.route('/time')
def get_time():
    """Return the formatted time string produced by ``utils.get_time()``."""
    now_text = utils.get_time()
    return now_text
# Cumulative head-count endpoint.
@app.route('/covCountData')
def get_cov_count_data():
    """Return the four headline counters as JSON.

    The row from ``utils.get_cov_count_data()`` is read positionally as
    (confirm, suspect, heal, dead). ``confirm``, ``heal`` and ``dead`` are
    converted with ``str()``; ``suspect`` is passed through unchanged.
    """
    row = utils.get_cov_count_data()
    payload = {
        "confirm": str(row[0]),
        "suspect": row[1],
        "heal": str(row[2]),
        "dead": str(row[3]),
    }
    return jsonify(payload)
# Map data endpoint.
@app.route('/covMapData')
def get_cov_map_data():
    """Return ``{"data": [{"name": ..., "value": int}, ...]}`` as JSON.

    Each row from ``utils.get_cov_map_data()`` contributes one entry: its
    first column becomes ``name`` and its second column, converted with
    ``int()``, becomes ``value``.
    """
    points = [
        {"name": row[0], "value": int(row[1])}
        for row in utils.get_cov_map_data()
    ]
    return jsonify({"data": points})
# Nationwide cumulative series endpoint.
@app.route('/covTotalData')
def get_cov_total_data():
    """Return the cumulative history as parallel JSON lists.

    Every row from ``utils.get_cov_total_data()`` must unpack into five
    values (date, confirm, suspect, heal, dead). Dates are rendered as
    ``MM-DD``; the other values are passed through unchanged.
    """
    series = {"day": [], "confirm": [], "suspect": [], "heal": [], "dead": []}
    for ds, confirm, suspect, heal, dead in utils.get_cov_total_data():
        series["day"].append(ds.strftime("%m-%d"))
        series["confirm"].append(confirm)
        series["suspect"].append(suspect)
        series["heal"].append(heal)
        series["dead"].append(dead)
    return jsonify(series)
# Nationwide daily-increase series endpoint.
@app.route('/covNewData')
def get_cov_new_data():
    """Return the daily increases as parallel JSON lists.

    Every row from ``utils.get_cov_new_data()`` must unpack into three
    values (date, confirm_add, suspect_add). Dates are rendered as
    ``MM-DD``; the other values are passed through unchanged.
    """
    series = {"day": [], "confirm_add": [], "suspect_add": []}
    for ds, confirm_add, suspect_add in utils.get_cov_new_data():
        series["day"].append(ds.strftime("%m-%d"))
        series["confirm_add"].append(confirm_add)
        series["suspect_add"].append(suspect_add)
    return jsonify(series)
# Most-affected areas endpoint.
@app.route('/covSeriousData')
def get_cov_serious_data():
    """Return area names and their confirmed counts as parallel JSON lists.

    Every row from ``utils.get_cov_serious_data()`` must unpack into two
    values (name, confirmed count); the count is converted with ``int()``.
    """
    ranking = {"city": [], "city_confirm": []}
    for name, confirmed in utils.get_cov_serious_data():
        ranking["city"].append(name)
        ranking["city_confirm"].append(int(confirmed))
    return jsonify(ranking)
# Hot-search word-cloud endpoint.
@app.route('/covCloudData')
def get_cov_cloud_data():
    """Return ``{"keyWords": [{"name": tag, "value": digits}, ...]}`` as JSON.

    For each row the first column is split into its text part and its
    trailing run of ASCII digits. jieba's ``extract_tags`` picks keywords
    from the text, and every keyword that is not purely numeric is emitted
    with the digit string as its value. The value stays a string; it is
    empty when the entry has no trailing digits.
    """
    words = []
    for row in utils.get_cov_cloud_data():
        content = row[0]
        title = content.rstrip(string.digits)  # drop the trailing hot-search number
        heat = content[len(title):]  # the number that was dropped
        words.extend(
            {"name": tag, "value": heat}
            for tag in extract_tags(title)  # keyword extraction via jieba
            if not tag.isdigit()
        )
    return jsonify({"keyWords": words})
# Start Flask's built-in server when this file is executed directly.
if __name__ == "__main__":
    app.run()
utils.py
import time
import pymysql
# Current time as display text.
def get_time():
    """Return the local time formatted as ``YYYY年MM月DD日 <%X time>``.

    The unit characters are not passed to ``strftime`` directly; the format
    carries ``{}`` placeholders that are filled in afterwards.
    """
    year_mark, month_mark, day_mark = "年", "月", "日"
    template = time.strftime("%Y{}%m{}%d{} %X")
    return template.format(year_mark, month_mark, day_mark)
# Database access helpers.
# Open a database connection.
def get_conn(host="127.0.0.1", user="root", password="123456",
             db="cov2019", charset="utf8"):
    """Open a MySQL connection with PyMySQL and create a cursor on it.

    The connection settings used to be hard-coded in the body. They are
    now keyword parameters whose defaults are the original values, so
    ``get_conn()`` behaves exactly as before while callers can point the
    helpers at another server or schema.

    NOTE(review): the default credentials are kept only for backward
    compatibility; a real deployment should pass them in from
    configuration instead of relying on these defaults.

    Args:
        host: Database server address.
        user: Login user name.
        password: Login password.
        db: Name of the schema to use.
        charset: Connection character set.

    Returns:
        A ``(conn, cursor)`` tuple. The caller is responsible for closing
        both, e.g. via ``close_conn``.
    """
    # Establish the connection.
    conn = pymysql.connect(host=host,
                           user=user,
                           password=password,
                           db=db,
                           charset=charset)
    # Create a cursor on it.
    cursor = conn.cursor()
    return conn, cursor
# Release database resources.
def close_conn(conn, cursor):
    """Close the cursor first, then the connection.

    Either argument may be falsy (for example ``None``), in which case it
    is skipped.
    """
    for resource in (cursor, conn):
        if resource:
            resource.close()
# Run a query.
def query(sql, *args):
    """Execute ``sql`` with ``args`` as parameters and return all rows.

    A fresh connection is opened for every call. Cleanup runs in a
    ``finally`` block so the cursor and connection are released even when
    ``execute`` or ``fetchall`` raises; previously an exception skipped
    the close and leaked the connection.

    Args:
        sql: SQL statement text.
        *args: Values passed to ``cursor.execute`` as the parameter tuple.

    Returns:
        Whatever ``cursor.fetchall()`` returns for the statement.
    """
    conn, cursor = get_conn()
    try:
        cursor.execute(sql, args)
        return cursor.fetchall()
    finally:
        close_conn(conn, cursor)
# Headline counters from the database.
def get_cov_count_data():
    """Return one row: (sum(confirm), latest suspect, sum(heal), sum(dead)).

    The sums are taken over the ``details`` rows that carry the most
    recent ``update_time``; ``suspect`` comes from the ``history`` row
    with the greatest ``ds``.
    """
    # The fragments are concatenated verbatim.
    # NOTE(review): "sum(dead)" and "from details" join with no space between them.
    sql = (
        "select sum(confirm),"
        "(select suspect from history order by ds desc limit 1),"
        "sum(heal),"
        "sum(dead)"
        "from details "
        "where update_time=(select update_time from details order by update_time desc limit 1)"
    )
    rows = query(sql)
    return rows[0]
# Per-province totals for the map.
def get_cov_map_data():
    """Return ``(province, sum(confirm))`` rows for the latest ``update_time``."""
    # The fragments are concatenated verbatim.
    # NOTE(review): "limit 1)" and "group by" join with no space between them.
    sql = (
        "select province, sum(confirm) from details "
        "where update_time=(select update_time from details "
        "order by update_time desc limit 1)"
        "group by province"
    )
    return query(sql)
# Nationwide cumulative history.
def get_cov_total_data():
    """Return every ``(ds, confirm, suspect, heal, dead)`` row of ``history``."""
    return query("select ds, confirm, suspect, heal, dead from history")
# Nationwide daily increases.
def get_cov_new_data():
    """Return every ``(ds, confirm_add, suspect_add)`` row of ``history``."""
    return query("select ds, confirm_add, suspect_add from history")
# Five most-affected areas.
def get_cov_serious_data():
    """Return the five ``(city, confirm)`` rows with the highest ``confirm``.

    The ranking is built from the latest ``update_time`` in ``details`` as
    the union of two sets:

    * individual cities, excluding the provinces named in the first
      ``not in`` list and the placeholder city "地区待确认";
    * the four provinces in the ``in`` list, each summed over its rows and
      reported under its province name.
    """
    # The fragments are concatenated verbatim; several of them join
    # without whitespace after a closing parenthesis.
    sql = (
        'select city, confirm '
        'from (select city, confirm from details '
        'where update_time=(select update_time from details '
        'order by update_time desc limit 1)'
        'and province not in("湖北","北京","上海","天津","重庆")'
        'and city not in ("地区待确认")'
        'union all '
        'select province as city, sum(confirm) as confirm '
        'from details '
        'where update_time=(select update_time from details '
        'order by update_time desc limit 1)'
        'and province in ("北京","上海","天津","重庆") group by province) as a '
        'order by confirm desc limit 5'
    )
    return query(sql)
# Hot-search entries for the word cloud.
def get_cov_cloud_data():
    """Return the ``content`` of the 20 ``hotsearch`` rows with the highest ``id``."""
    sql = (
        'select content from hotsearch '
        'order by id desc limit 20'
    )
    return query(sql)
# Manual check: print the hot-search rows when this module is run directly.
if __name__ == "__main__":
    rows = get_cov_cloud_data()
    print(rows)
版权声明:本文为weixin_45068889原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。