Flask 小项目——可视化数据

准备

工具:

  • Navicat for mysql(熟悉mysql的操作也可以不用)
  • pyCharm(可以用其他的编辑器)

数据:
history

details

hotsearch

项目

获取数据的过程:

  1. 在main.js中用ajax请求数据,url是app.py中的路由
  2. 由app.py中指定的路由下的def函数处理utils.py传来的数据
  3. utils.py负责连接数据库和获取数据

main.js

// 获取时间
function getTime() {
  $.ajax({
    url: "/time",
    // timeout: 10000,  // 超时时间
    success: function (data) {
      $(".showTime").html(data)
    },
    error: function (err) {
      console.log(err)
    }
  })
}

app.py

# 获取当前时间
@app.route('/time')
def get_time():
    return utils.get_time()

utils.py

import time

# 获取当前时间
def get_time():
    time_str = time.strftime("%Y{}%m{}%d{} %X")
    return time_str.format("年","月","日")

 

main.js

// 获取时间
function getTime() {
  $.ajax({
    url: "/time",
    // timeout: 10000,  // 超时时间
    success: function (data) {
      $(".showTime").html(data)
    },
    error: function (err) {
      console.log(err)
    }
  })
}

// 获取疫情数据
function getCovCountData() {
  $.ajax({
    url: "/covCountData",
    success: function (data) {
      $(".no-num ul li").eq(0).text(data.confirm);
      $(".no-num ul li").eq(1).text(data.suspect);
      $(".no-num ul li").eq(2).text(data.heal);
      $(".no-num ul li").eq(3).text(data.dead);
    },
    error: function (err) {
      console.log(err)
    }
  })
}

// 获取地图数据
function getCovMapData() {
  $.ajax({
    url: "/covMapData",
    success: function (data) {
      map_option.series[0].data = data.data;
      myMap.setOption(map_option)
    },
    error: function (err) {
      console.log(err)
    }
  })
}

// 获取全国疫情累计数据
function getCovTotalData() {
  $.ajax({
    url: "/covTotalData",
    success: function (data) {
      left_top_option.xAxis.data = data.day;
      left_top_option.series[0].data = data.confirm;
      left_top_option.series[1].data = data.suspect;
      left_top_option.series[2].data = data.heal;
      left_top_option.series[3].data = data.dead;
      cov_lefttop_chart.setOption(left_top_option)
    },
    error: function (err) {
      console.log(err)
    }
  })
}

// 获取全国疫情新增数据
function getCovNewData() {
  $.ajax({
    url: "/covNewData",
    success: function (data) {
      left_bottom_option.xAxis.data = data.day;
      left_bottom_option.series[0].data = data.confirm_add;
      left_bottom_option.series[1].data = data.suspect_add;
      cov_leftbottom_chart.setOption(left_bottom_option)
    },
    error: function (err) {
      console.log(err)
    }
  })
}

// 获取全国疫情严重地区数据
function getCovSeriousData() {
  $.ajax({
    url: "/covSeriousData",
    success: function (data) {
      right_top_option.xAxis[0].data = data.city;
      right_top_option.series[0].data = data.city_confirm;
      cov_righttop_chart.setOption(right_top_option)
    },
    error: function (err) {
      console.log(err)
    }
  })
}

// 获取热搜云词数据
function getCovCloudData() {
  $.ajax({
    url: "/covCloudData",
    success: function (data) {
      cloud_option.series[0].data = data.keyWords;
      cov_cloud_chart.setOption(cloud_option)
    },
    error: function (err) {
      console.log(err)
    }
  })
}

getTime();
getCovCountData();
getCovMapData();
getCovTotalData();
getCovNewData();
getCovSeriousData();
getCovCloudData();

setInterval(getTime,1000);
setInterval(getCovCountData,100000);
setInterval(getCovMapData,100000);
setInterval(getCovTotalData,100000);
setInterval(getCovNewData,100000);
setInterval(getCovSeriousData,100000);
setInterval(getCovCloudData,100000);

app.py

from flask import Flask
from flask import render_template
from flask import jsonify
from jieba.analyse import extract_tags
import utils
import string

app = Flask(__name__)

# 开始页面
@app.route('/')
def go_main():
    return render_template("main.html")

# 获取当前时间
@app.route('/time')
def get_time():
    return utils.get_time()

# 获取各情况累计人数数据
@app.route('/covCountData')
def get_cov_count_data():
    data = utils.get_cov_count_data()
    return jsonify({"confirm": str(data[0]), "suspect": data[1], "heal": str(data[2]), "dead": str(data[3])})

# 获取地图数据
@app.route('/covMapData')
def get_cov_map_data():
    res = []
    for item in utils.get_cov_map_data():
        res.append({"name":item[0], "value":int(item[1])})
    return jsonify({"data":res})

# 获取全国疫情累计数据
@app.route('/covTotalData')
def get_cov_total_data():
    data = utils.get_cov_total_data()
    day, confirm, suspect, heal, dead = [], [], [], [], []
    for a, b, c, d, e in data[0:]:
        day.append(a.strftime("%m-%d"))
        confirm.append(b)
        suspect.append(c)
        heal.append(d)
        dead.append(e)
    return jsonify({"day": day, "confirm": confirm, "suspect": suspect, "heal": heal, "dead": dead})

# 获取全国疫情新增数据
@app.route('/covNewData')
def get_cov_new_data():
    data = utils.get_cov_new_data()
    day, confirm_add, suspect_add = [], [], []
    for a, b, c in data[0:]:
        day.append(a.strftime("%m-%d"))
        confirm_add.append(b)
        suspect_add.append(c)
    return jsonify({"day": day, "confirm_add": confirm_add, "suspect_add": suspect_add})

# 获取全国疫情严重地区数据
@app.route('/covSeriousData')
def get_cov_serious_data():
    data = utils.get_cov_serious_data()
    city, city_confirm = [], []
    for a, b in data[0:]:
        city.append(a)
        city_confirm.append(int(b))
    return jsonify({"city": city, "city_confirm": city_confirm})

# 获取热搜词云数据
@app.route('/covCloudData')
def get_cov_cloud_data():
    data = utils.get_cov_cloud_data()
    c = []
    for i in data:
        k = i[0].rstrip(string.digits)   # 移除热搜数字
        v = i[0][len(k):]   # 获取热搜数字
        ks = extract_tags(k)   # 使用jieba提取关键字
        for j in ks:
            if not j.isdigit():
                c.append({"name": j, "value": v})
    return jsonify({"keyWords": c})

if __name__ == '__main__':
    app.run()

utils.py

import time
import pymysql

# 获取当前时间
def get_time():
    time_str = time.strftime("%Y{}%m{}%d{} %X")
    return time_str.format("年","月","日")

# 获取数据库疫情数据
# 连接数据库
def get_conn():
    # 建立连接
    conn = pymysql.connect(host="127.0.0.1",
                           user="root",
                           password="123456",
                           db="cov2019",
                           charset="utf8")
    # 创建游标
    cursor = conn.cursor()
    return conn, cursor

# 关闭数据库
def close_conn(conn, cursor):
    if cursor:
        cursor.close()
    if conn:
        conn.close()

# 查询
def query(sql, *args):
    conn, cursor = get_conn()
    cursor.execute(sql, args)
    res = cursor.fetchall()
    close_conn(conn, cursor)
    return res

# 获取数据库疫情数据
def get_cov_count_data():
    sql = "select sum(confirm)," \
           "(select suspect from history order by ds desc limit 1)," \
           "sum(heal)," \
           "sum(dead)" \
           "from details " \
           "where update_time=(select update_time from details order by update_time desc limit 1)"
    res = query(sql)
    return res[0]

# 获取地图数据
def get_cov_map_data():
    sql = "select province, sum(confirm) from details " \
          "where update_time=(select update_time from details " \
          "order by update_time desc limit 1)" \
          "group by province"
    res = query(sql)
    return res

# 获取全国疫情累计数据
def get_cov_total_data():
    sql = "select ds, confirm, suspect, heal, dead from history"
    res = query(sql)
    return res

# 获取全国疫情新增数据
def get_cov_new_data():
    sql = "select ds, confirm_add, suspect_add from history"
    res = query(sql)
    return res

# 获取全国疫情严重地区数据
def get_cov_serious_data():
    sql = 'select city, confirm ' \
          'from (select city, confirm from details ' \
          'where update_time=(select update_time from details ' \
          'order by update_time desc limit 1)' \
          'and province not in("湖北","北京","上海","天津","重庆")' \
          'and city not in ("地区待确认")' \
          'union all ' \
          'select province as city, sum(confirm) as confirm ' \
          'from details ' \
          'where update_time=(select update_time from details ' \
          'order by update_time desc limit 1)' \
          'and province in ("北京","上海","天津","重庆") group by province) as a ' \
          'order by confirm desc limit 5'
    res = query(sql)
    return res

# 获取热搜云词数据
def get_cov_cloud_data():
    sql = 'select content from hotsearch ' \
          'order by id desc limit 20'
    res = query(sql)
    return res

# 用于测试
if __name__ == "__main__":
    print(get_cov_cloud_data())

版权声明:本文为weixin_45068889原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。