1、创建一个数据库并新建数据库表(根据需要导入的数据筛选列,可建多个表)
2、将数据写入数据库(手动选择数据库及数据源等)
3、编写SQL查询语句,将其写入VBA代码,并把VBA代码插入新建的Excel文件中
本工具以实用为主,界面不追求美观,后续会持续优化。
代码中的注释比较少,敬请见谅。
图1:创建数据库并写入数据
图2:使用VBA+SQL读取数据(数据库、csv文件、excel文件)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName  :UI.py
# @Time      :2022/3/8 14:45
# @Author    :ziqihualai
"""PyQt5 helper tool for loading tabular data into SQLite and generating VBA.

The window offers two workflows:

* "数据库读写": create a SQLite table, import Excel/CSV/SQLite data into it
  and export the result of a query to an Excel workbook.
* "VBA+SQL": assemble a SELECT statement from the chosen columns, preview
  it against a SQLite file and generate an Excel workbook containing a VBA
  macro that runs the statement.
"""
import datetime
import os
import re
import sqlite3
import sys
from contextlib import closing

import chardet
import numpy as np
import pandas as pd
import qdarkstyle as qdarkstyle
import win32com.client
import xlrd
from PyQt5 import QtWidgets
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QIcon, QStandardItemModel, QFont, QBrush
from PyQt5.QtWidgets import (
    QWidget, QVBoxLayout, QLabel, QGridLayout, QLineEdit, QHBoxLayout,
    QPushButton, QComboBox, QFormLayout, QGroupBox, QTabWidget, QListWidget,
    QStackedWidget, QMessageBox, QStatusBar, QAbstractItemView, QHeaderView,
    QListWidgetItem, QTableWidget, QTableWidgetItem, QFileDialog,
    QPlainTextEdit, QDialog, QRadioButton, QMainWindow, QApplication,
    QCheckBox, QTreeWidget, QTreeWidgetItem,
)

# Extracts the worksheet names referenced by the generated VBA "Case" blocks.
_SHEET_NAME_PATTERN = re.compile(r'ThisWorkbook.Sheets\("(.*?)"\)')

# XlFileFormat.xlOpenXMLWorkbookMacroEnabled; required to save a real .xlsm.
_XL_MACRO_ENABLED_FORMAT = 52

# Name of the VBA module and of the Sub it contains.
_VBA_MACRO_NAME = '查询数据库'

# Head of the generated macro; "{driver}" and "{num}" are filled in by
# MainWindow.on_create_macro.
_VBA_HEADER_TEMPLATE = '''Sub 查询数据库()
    '打开Excel>文件>选项>信任中心>信任中心设置>宏设置>启用所有宏
    '还要将“信任对VBA工程对象模型的访问”选中,按确定即可。
    '通过ODBC来访问操作SQLite数据库需要安装第三方组件库的SQLite ODBC Driver,下载地址 https://www.devart.com/odbc/sqlite/download.html
    '根据提示完成安装,在控制面板——管理工具——数据源(ODBC)中添加SQLite3 ODBC Driver,并配置数据源即可连接
    Dim wb As Workbook
    Dim wj, PathStr As String
    Dim row_count As Long
    Dim cnn, rst
    Dim SheetName As String
    Set cnn = CreateObject("ADODB.Connection")
    Set rst = CreateObject("ADODB.Recordset")
    Dim conStr$, sqlStr$
    Application.DisplayAlerts = False
    Application.ScreenUpdating = False
    Application.AskToUpdateLinks = False
    Application.Calculation = xlCalculationManual
    wj = Application.GetOpenFilename(, , , , True)
    If VBA.TypeName(wj) = "Boolean" Then
        Exit Sub
    End If
    For wjjs = LBound(wj) To UBound(wj)
        PathStr = Split(wj(wjjs), Dir(wj(wjjs)))(0)
        sufstr = Split(Dir(wj(wjjs)), ".")(1)
        If InStr(sufstr, "xls") Then
            Select Case Application.Version * 1    '设置连接字符串,根据版本创建连接
                Case Is <= 11
                    conStr = "Provider=Microsoft.Jet.OLEDB.4.0;Extended Properties=excel 8.0;Data source=" & wj(wjjs)
                Case Is >= 12
                    conStr = "Provider=Microsoft.ACE.OLEDB.12.0;Data Source=" & wj(wjjs) & ";Extended Properties=""Excel 12.0;HDR=YES"";"
            End Select
        ElseIf InStr(sufstr, "csv") Then
            conStr = "Provider=Microsoft.ace.Oledb.12.0;Data Source =" & PathStr & ";Extended Properties=Text"
            WbName = Split(Dir(wj(wjjs)), ".")(0) & "副本.csv"
            Set wb = GetObject(wj(wjjs))
            wb.Sheets(1).Copy
            ActiveWorkbook.SaveAs Filename:=PathStr & WbName, FileFormat:=xlCSV, CreateBackup:=False
            ActiveWorkbook.Close
        ElseIf InStr(sufstr, "db") Then
            conStr = "Driver={driver};Database=" & ThisWorkbook.Sheets("Config").Range("A2").Value
        End If
        cnn.Open conStr
        On Error Resume Next
        p = Left(Dir(wj(wjjs)), {num})    '工作簿名称前N个字符,与下面的Case Is 条件相对应
        Select Case p
'''

# One "Case" branch of the generated macro; filled in by on_create_select.
_VBA_CASE_TEMPLATE = '''
            Case Is = "{book_name}"
                With ThisWorkbook.Sheets("{sheet_name}")
                    .Rows("2:1048576").ClearContents
                    sqlStr = "{state}"
                    .Range("{cell}").CopyFromRecordset cnn.Execute(sqlStr)
                End With
'''

# Tail of the generated macro.  The connection is closed at the end of every
# loop iteration because ADODB refuses to Open a connection that is still
# open (1 = adStateOpen), which previously made every file after the first
# silently reuse the first file's connection.
_VBA_FOOTER = '''
        End Select
        If cnn.State = 1 Then cnn.Close
    Next
    Application.CutCopyMode = False
    Application.DisplayAlerts = True
    Application.ScreenUpdating = True
    Application.AskToUpdateLinks = True
    Application.Calculation = xlCalculationAutomatic
    'MsgBox "数据提取完毕"
End Sub'''


def _file_suffix(path):
    """Return the lower-cased extension of *path* including the dot."""
    return os.path.splitext(path)[1].lower()


def _quote_identifier(name):
    """Return *name* quoted as a SQLite identifier."""
    return '"' + str(name).replace('"', '""') + '"'


def _list_tables(db_path):
    """Return the names of all tables stored in the SQLite file *db_path*."""
    with closing(sqlite3.connect(db_path)) as con:
        rows = con.execute(
            "select name from sqlite_master where type='table'").fetchall()
    return [row[0] for row in rows]


def _table_columns(db_path, table_name):
    """Return the column names of *table_name* in the SQLite file *db_path*."""
    with closing(sqlite3.connect(db_path)) as con:
        rows = con.execute(
            f'pragma table_info({_quote_identifier(table_name)})').fetchall()
    return [row[1] for row in rows]


def _parse_column_text(text):
    """Split the "('a', 'b')" text shown in a column line edit into names."""
    stripped = text.strip().strip("()[]")
    names = (part.strip().strip("'\"") for part in stripped.split(","))
    return [name for name in names if name]


def _vba_string(text):
    """Make *text* safe to embed between double quotes on one VBA line."""
    single_line = text.replace("\r", " ").replace("\n", " ")
    return single_line.replace('"', '""')


def _fill_table(tablewidget, data, headers=None):
    """Show the DataFrame *data* in *tablewidget* with centred text cells."""
    rows, cols = data.shape
    labels = data.columns if headers is None else headers
    tablewidget.setRowCount(rows)
    tablewidget.setColumnCount(cols)
    tablewidget.setHorizontalHeaderLabels([str(label) for label in labels])
    for i in range(rows):
        for j in range(cols):
            item = QTableWidgetItem(str(data.iat[i, j]))
            item.setTextAlignment(Qt.AlignHCenter | Qt.AlignVCenter)
            tablewidget.setItem(i, j, item)


class MainWindow(QMainWindow):
    """Main window holding the data-source pickers and the two work tabs.

    NOTE: each ``*_groupbox`` builder stores the created QGroupBox on the
    instance under the builder's own name, so after ``setup_ui`` those names
    refer to widgets and the builders cannot be called a second time.
    """

    def __init__(self, parent=None):
        super(MainWindow, self).__init__(parent)
        self.setup_ui()

    def setup_ui(self):
        """Build every group box and arrange them in the central widget."""
        self.setObjectName("MainWindow")
        self.resize(QApplication.desktop().width(),
                    QApplication.desktop().height() - 100)
        self.setWindowTitle('数据分析')
        self.setWindowIcon(QIcon('th.png'))

        self.main_widget = QWidget()
        self.tab_widget = QTabWidget()

        # Tab 1: create tables / import data / export query results.
        self.db_widget = QWidget()
        self.hlayout_1 = QHBoxLayout()
        self.hlayout_1.addWidget(self.create_db_groupbox())
        self.hlayout_1.addWidget(self.write_into_db_groupbox())
        self.db_widget.setLayout(self.hlayout_1)

        # Tab 2: build a SQL statement and wrap it in a VBA macro.
        self.db_vba_widget = QWidget()
        self.glayout_1 = QGridLayout()
        self.glayout_1.addWidget(self.select_from_groupbox(), 0, 0, 1, 1)
        self.glayout_1.addWidget(self.condition_groupbox(), 1, 0, 1, 1)
        self.glayout_1.addWidget(self.vba_groupbox(), 0, 1, 2, 1)
        self.db_vba_widget.setLayout(self.glayout_1)

        self.tab_widget.addTab(self.db_widget, "数据库读写")
        self.tab_widget.addTab(self.db_vba_widget, "VBA+SQL")

        self.glayout = QGridLayout()
        self.glayout.addWidget(self.select_table_groupbox(), 0, 0, 1, 3)
        self.glayout.addWidget(self.select_cols_groupbox(), 1, 0, 1, 1)
        self.glayout.addWidget(self.tab_widget, 1, 1, 1, 2)
        self.main_widget.setLayout(self.glayout)
        self.setCentralWidget(self.main_widget)

    def init_status(self):
        """Attach an empty status bar to the window."""
        self.sBar = QStatusBar()
        self.setStatusBar(self.sBar)

    @staticmethod
    def _make_preview_table():
        """Create a read-only, stretched table used to preview two rows."""
        table = QTableWidget()
        table.setMaximumHeight(150)
        table.setEditTriggers(QAbstractItemView.NoEditTriggers)
        table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch)
        return table

    # Select the data sources and their tables
    def select_table_groupbox(self):
        """Build the group box for choosing two data sources and tables."""
        self.select_table_groupbox = QGroupBox("1.选择数据库和数据表")
        self.select_table_groupbox.setMaximumHeight(500)

        # First data source.
        self.data_path_btn1 = QPushButton("选择数据库")
        self.data_path_btn1.setMinimumWidth(150)
        self.data_path_edit1 = QLineEdit()
        self.data_path_edit1.setPlaceholderText(
            "请输入数据库/数据表的地址(*.xls *.xlsx *.csv *.db)")
        self.table_combo1 = QComboBox()
        self.table_combo1.setMinimumWidth(150)
        self.table_edit1 = QLineEdit()
        # The widgets named in the lambdas are created later; they are only
        # looked up when the signal fires.
        self.data_path_btn1.clicked.connect(
            lambda: on_select_data(self.data_path_edit1, self.table_combo1))
        self.data_path_btn1.clicked.connect(
            lambda: on_get_data(self.data_path_edit1, self.table_combo1,
                                self.table_edit1, self.tablewidget_1,
                                self.listwidget_1, self.listwidget))
        self.table_combo1.activated.connect(
            lambda: on_get_data(self.data_path_edit1, self.table_combo1,
                                self.table_edit1, self.tablewidget_1,
                                self.listwidget_1, self.listwidget))

        # Second data source.
        self.data_path_btn2 = QPushButton("选择数据库")
        self.data_path_btn2.setMinimumWidth(150)
        self.data_path_edit2 = QLineEdit()
        self.data_path_edit2.setPlaceholderText("请输入需要导入数据库的地址(后缀.db)")
        self.table_combo2 = QComboBox()
        self.table_combo2.setMinimumWidth(150)
        self.table_edit2 = QLineEdit()
        self.data_path_btn2.clicked.connect(
            lambda: on_select_data(self.data_path_edit2, self.table_combo2))
        self.data_path_btn2.clicked.connect(
            lambda: on_get_data(self.data_path_edit2, self.table_combo2,
                                self.table_edit2, self.tablewidget_2,
                                self.listwidget_2, self.listwidget))
        self.table_combo2.activated.connect(
            lambda: on_get_data(self.data_path_edit2, self.table_combo2,
                                self.table_edit2, self.tablewidget_2,
                                self.listwidget_2, self.listwidget))

        # Previews of the first two rows of each source.
        self.tablewidget_1 = self._make_preview_table()
        self.tablewidget_2 = self._make_preview_table()

        self.flayout = QFormLayout()
        self.flayout.addRow(self.data_path_btn1, self.data_path_edit1)
        self.flayout.addRow(self.table_combo1, self.table_edit1)
        self.flayout.addRow(self.tablewidget_1)
        self.flayout.addRow(self.data_path_btn2, self.data_path_edit2)
        self.flayout.addRow(self.table_combo2, self.table_edit2)
        self.flayout.addRow(self.tablewidget_2)
        self.select_table_groupbox.setLayout(self.flayout)
        return self.select_table_groupbox

    # Select the data columns
    def select_cols_groupbox(self):
        """Build the group box with the source column lists and the picks."""
        self.select_cols_groupbox = QGroupBox(
            """2.选择数据列:双击选择要导入数据表的列名:(不选择则默认全部导入)""")
        self.select_cols_groupbox.setMaximumWidth(500)

        self.listwidget_1 = QListWidget()  # headers of the first source
        self.listwidget_1.setFixedWidth(150)
        self.listwidget_2 = QListWidget()  # headers of the second source
        self.listwidget_2.setFixedWidth(150)
        self.listwidget = QListWidget()  # columns picked by the user
        self.listwidget.setFixedWidth(150)

        self.listwidget_1.doubleClicked.connect(
            lambda: on_change_lv(0, self.listwidget, self.listwidget_1))
        self.listwidget_2.doubleClicked.connect(
            lambda: on_change_lv(0, self.listwidget, self.listwidget_2))
        self.listwidget.doubleClicked.connect(
            lambda: on_change_lv(1, self.listwidget))

        self.hlayout_1 = QHBoxLayout()
        self.hlayout_1.addWidget(self.listwidget_1)
        self.hlayout_1.addWidget(self.listwidget_2)
        self.hlayout_1.addWidget(self.listwidget)
        self.select_cols_groupbox.setLayout(self.hlayout_1)
        return self.select_cols_groupbox

    # TabWidget1
    # Create a database and a table
    def create_db_groupbox(self):
        """Build the group box used to create a SQLite file and a table."""
        self.create_db_groupbox = QGroupBox("3.创建数据库及数据表")

        self.db_path_btn = QPushButton("数据库地址:")
        self.db_path_btn.clicked.connect(
            lambda: on_open_folder(self.db_path_edit))
        self.db_path_edit = QLineEdit()
        self.db_path_edit.setPlaceholderText("默认为当前文件夹")
        self.db_path_edit.setMinimumWidth(300)

        self.db_name = QLabel("数据库名称:")
        self.db_name_edit = QLineEdit()
        self.db_name_edit.setPlaceholderText("请输入数据库名称(不带拓展名)")

        self.table_name_lab = QLabel("数据表名称:")
        self.table_name_edit = QLineEdit()
        self.table_name_edit.setPlaceholderText("请输入数据表的名字,如果已存在,将弹出消息框")

        self.col_name_btn = QPushButton("导入选择的列")
        self.col_name_edit = QPlainTextEdit()
        self.col_name_edit.setPlaceholderText("列名不能以数字开头、不能有特殊字符(./等)")
        self.col_name_btn.clicked.connect(
            lambda: on_import_colsname(self.listwidget, self.listwidget_1,
                                       self.col_name_edit))

        self.create_db_btn = QPushButton("创建数据表")
        self.create_db_btn.clicked.connect(
            lambda: on_create_table(self.db_path_edit, self.db_name_edit,
                                    self.table_name_edit, self.col_name_edit))

        self.flayout = QFormLayout()
        self.flayout.addRow(self.db_path_btn, self.db_path_edit)
        self.flayout.addRow(self.db_name, self.db_name_edit)
        self.flayout.addRow(self.table_name_lab, self.table_name_edit)
        self.flayout.addRow(self.col_name_btn, self.col_name_edit)
        self.flayout.addRow(self.create_db_btn)
        self.create_db_groupbox.setLayout(self.flayout)
        return self.create_db_groupbox

    # Write into the database and export query results
    def write_into_db_groupbox(self):
        """Build the group box for importing data and exporting a query."""
        self.write_into_db_groupbox = QGroupBox("4.写入数据库并导出查询结果")

        self.import_setting_check2 = QCheckBox("导入设置文件")
        self.import_setting_check2.setChecked(False)
        self.import_setting_check2.stateChanged.connect(
            lambda: on_import_setting(self.import_setting_check2,
                                      self.table_name_edit2,
                                      self.db_path_edit2,
                                      self.table_name_combo2,
                                      self.date_col_combo2,
                                      self.select_edit2))

        self.db_path_btn2 = QPushButton("选择数据库")
        self.db_path_btn2.clicked.connect(
            lambda: on_select_db(self.db_path_edit2, self.table_name_combo2))
        self.db_path_edit2 = QLineEdit()
        self.db_path_edit2.setPlaceholderText("请输入数据库的地址(后缀.db)")

        self.table_name_combo2 = QComboBox()
        self.table_name_combo2.addItem("选择数据表")
        self.table_name_combo2.activated.connect(
            lambda: on_select_table(self.db_path_edit2,
                                    self.table_name_combo2,
                                    self.table_name_edit2,
                                    self.date_col_combo2))
        self.table_name_edit2 = QLineEdit()

        self.date_col_lab2 = QLabel("选择日期列")
        self.date_col_combo2 = QComboBox()

        self.write_data_btn2 = QPushButton("选择数据源")
        self.write_data_btn2.clicked.connect(
            lambda: on_select_data(self.write_data_edit2,
                                   self.sheet_name_combo2))
        self.write_data_edit2 = QLineEdit()

        self.sheet_name_lab2 = QLabel("选择工作表")
        self.sheet_name_combo2 = QComboBox()

        self.write_db_btn2 = QPushButton("导入数据库")
        self.write_db_btn2.clicked.connect(
            lambda: on_write_into_db(self.db_path_edit2,
                                     self.table_name_combo2,
                                     self.table_name_edit2,
                                     self.date_col_combo2,
                                     self.write_data_edit2,
                                     self.sheet_name_combo2))

        self.select_btn2 = QPushButton("查询并导出")
        self.select_btn2.clicked.connect(
            lambda: on_select(self.db_path_edit2, self.select_edit2))
        self.select_edit2 = QPlainTextEdit()

        self.flayout = QFormLayout()
        self.flayout.addWidget(self.import_setting_check2)
        self.flayout.addRow(self.db_path_btn2, self.db_path_edit2)
        self.flayout.addRow(self.table_name_combo2, self.table_name_edit2)
        self.flayout.addRow(self.date_col_lab2, self.date_col_combo2)
        self.flayout.addRow(self.write_data_btn2, self.write_data_edit2)
        self.flayout.addRow(self.sheet_name_lab2, self.sheet_name_combo2)
        self.flayout.addWidget(self.write_db_btn2)
        self.flayout.addRow(self.select_btn2, self.select_edit2)
        self.write_into_db_groupbox.setLayout(self.flayout)
        return self.write_into_db_groupbox

    # TabWidget2
    # SELECT / FROM clause block
    def select_from_groupbox(self):
        """Build the group box that produces the SELECT and FROM clauses."""
        self.select_from_groupbox = QGroupBox("3.SELECT语句:选择列名 FROM语句:选择数据表")

        self.select_btn3 = QPushButton("生成SELECT语句")
        self.select_btn3.clicked.connect(self.on_select_clicked)
        self.select_edit3 = QLineEdit()
        self.select_edit3.setPlaceholderText("SELECT 日期,列名1,列名2,SUM(列名3)")

        # The checked radio button decides the join direction; each line
        # edit holds the join column of the table named on its radio button.
        self.sheet_radio3 = QRadioButton()
        self.sheet_radio3.setChecked(True)
        self.col_edit3 = QLineEdit()
        self.sheet_radio4 = QRadioButton()
        self.col_edit4 = QLineEdit()

        self.from_btn3 = QPushButton("生成FROM语句")
        self.from_btn3.clicked.connect(self.on_from_clicked)
        self.from_edit3 = QLineEdit()
        self.from_edit3.setPlaceholderText(
            "FROM 表1 as a LEFT JOIN 表2 as b ON a.列名1=b.列名2")

        self.flayout_3 = QFormLayout()
        self.flayout_3.addRow(self.select_btn3, self.select_edit3)
        self.flayout_3.addRow(self.sheet_radio3, self.col_edit3)
        self.flayout_3.addRow(self.sheet_radio4, self.col_edit4)
        self.flayout_3.addRow(self.from_btn3, self.from_edit3)
        self.select_from_groupbox.setLayout(self.flayout_3)
        return self.select_from_groupbox

    def _list_texts(self, listwidget):
        """Return the texts of all items of *listwidget* in order."""
        return [listwidget.item(i).text() for i in range(listwidget.count())]

    def on_select_clicked(self):
        """Generate the SELECT clause and refresh the dependent widgets."""
        picked = self._list_texts(self.listwidget)
        col_names = (self._list_texts(self.listwidget_1)
                     + self._list_texts(self.listwidget_2))

        text = "SELECT " + ",".join(picked) if picked else "SELECT *"
        self.select_edit3.setText(text)
        self.sheet_radio3.setText(self.table_combo1.currentText())
        self.sheet_radio4.setText(self.table_combo2.currentText())

        # Refresh the column choices of every condition row.  Previously the
        # names were appended to the most recently created combo box only,
        # piling up duplicates on every click.
        for row in range(self.tablewidget_3.rowCount()):
            combo = self.tablewidget_3.cellWidget(row, 1)
            if combo is None:
                continue
            previous = combo.currentText()
            combo.clear()
            combo.addItems(col_names)
            position = combo.findText(previous)
            if position >= 0:
                combo.setCurrentIndex(position)

    def on_from_clicked(self):
        """Generate the FROM clause from the two tables and join columns."""
        main_table = self.sheet_radio3.text()
        connect_table = self.sheet_radio4.text()
        main_col = self.col_edit3.text()
        connect_col = self.col_edit4.text()

        text = ""
        if main_table and connect_table:
            left = self.sheet_radio3.isChecked()
            right = self.sheet_radio4.isChecked()
            if main_col and connect_col and (left or right):
                join = "LEFT JOIN" if left else "RIGHT JOIN"
                text = ("FROM " + main_table + " as a " + join + " "
                        + connect_table + " as b ON a." + main_col
                        + "=b." + connect_col)
            else:
                QMessageBox.information(self, '信息', "请补齐匹配的两个列名",
                                        QMessageBox.Close)
        else:
            # Only one (or no) table is selected: no join is needed.
            text = "FROM " + main_table + connect_table
        self.from_edit3.setText(text)

    def _install_condition_row(self, row, col_names, with_connector):
        """Put the editor widgets of one WHERE condition into *row*."""
        self.and_or_cmb = QComboBox()
        self.and_or_cmb.addItem('AND')
        self.and_or_cmb.addItem('OR')
        self.and_or_cmb.setCurrentIndex(0)
        self.col_name_cmb = QComboBox()
        self.col_name_cmb.addItems(col_names)
        self.operator_edit = QLineEdit()
        self.value_edit = QLineEdit()

        if with_connector:
            self.tablewidget_3.setCellWidget(row, 0, self.and_or_cmb)
        self.tablewidget_3.setCellWidget(row, 1, self.col_name_cmb)
        self.tablewidget_3.setCellWidget(row, 2, self.operator_edit)
        self.tablewidget_3.setCellWidget(row, 3, self.value_edit)

    # Condition block
    def condition_groupbox(self):
        """Build the group box for the WHERE and GROUP BY clauses."""
        self.condition_groupbox = QGroupBox(
            "4.查询条件:日期格式:T-1 ==> DATE('NOW', '-1 DAY')")

        self.sql_grammar_btn = QPushButton("输入SQLite查询语句")
        self.sql_grammar_btn.clicked.connect(self.on_show_grammar)

        self.rows = 1
        self.cols = 4
        self.tablewidget_3 = QTableWidget(self.rows, self.cols)
        self.tablewidget_3.setHorizontalHeaderLabels(
            ['连接运算符', '列名', '运算符', '操作数'])
        self.tablewidget_3.horizontalHeader().setSectionResizeMode(
            QHeaderView.Stretch)
        for row in range(self.rows):
            # The first condition follows WHERE directly, so it gets no
            # AND/OR connector widget.
            self._install_condition_row(row, [], with_connector=False)

        self.tablewidget_hlayout = QHBoxLayout()
        self.tablewidget_hlayout.addWidget(
            QPushButton("增加", clicked=self.do_addRow))
        self.tablewidget_hlayout.addWidget(
            QPushButton("删除", clicked=self.do_deleteRow))
        self.tablewidget_vlayout = QVBoxLayout()
        self.tablewidget_vlayout.addWidget(self.tablewidget_3)
        self.tablewidget_vlayout.addLayout(self.tablewidget_hlayout)

        self.where_btn = QPushButton("生成WHERE语句")
        self.where_btn.clicked.connect(self.on_where_clicked)
        self.where_edit = QPlainTextEdit()
        self.where_edit.setPlaceholderText(
            "WHERE 列名1 in ('元素1','元素2') AND 列名2 not in ('元素3','') AND 日期>date()-61")
        self.where_edit.setFixedHeight(100)

        self.groupby_lab = QLabel("GROUP BY语句")
        self.groupby_edit = QLineEdit()
        self.groupby_edit.setPlaceholderText("GROUP BY 列名1,列名2,列名3")

        self.flayout_1 = QFormLayout()
        self.flayout_1.addRow(self.sql_grammar_btn)
        self.flayout_1.addRow(self.tablewidget_vlayout)
        self.flayout_1.addRow(self.where_btn, self.where_edit)
        self.flayout_1.addRow(self.groupby_lab, self.groupby_edit)
        self.condition_groupbox.setLayout(self.flayout_1)
        return self.condition_groupbox

    def on_show_grammar(self):
        """Show the operator cheat sheet, creating the dialog on first use."""
        if getattr(self, "sql_grammar_dlg", None) is None:
            self.sql_grammar_dlg = SqlGrammar()
        self.sql_grammar_dlg.exec_()

    def do_addRow(self):
        """Append a condition row listing the columns of both sources."""
        col_names = (_parse_column_text(self.table_edit1.text())
                     + _parse_column_text(self.table_edit2.text()))
        if not col_names:
            QMessageBox.information(self, '信息', f"请先选择数据库及数据表",
                                    QMessageBox.Ok)
            return
        new_row = self.tablewidget_3.rowCount()
        self.tablewidget_3.setRowCount(new_row + 1)
        self._install_condition_row(new_row, col_names, with_connector=True)
        self.tablewidget_3.update()

    def do_deleteRow(self):
        """Remove the currently selected condition row, if any."""
        curRow = self.tablewidget_3.currentRow()
        if curRow >= 0:
            self.tablewidget_3.removeRow(curRow)

    def on_where_clicked(self):
        """Generate the WHERE clause from the condition rows."""
        table = self.tablewidget_3
        clauses = []
        for idx in range(table.rowCount()):
            column = table.cellWidget(idx, 1).currentText()
            operator = table.cellWidget(idx, 2).text()
            value = table.cellWidget(idx, 3).text()
            if value.isdigit():
                # Columns are created as text, so numeric comparisons need
                # an explicit cast.
                column = "CAST(" + column + " AS DECIMAL)"
            if idx == 0:
                prefix = "WHERE"
            else:
                connector = table.cellWidget(idx, 0)
                prefix = connector.currentText() if connector else "AND"
            clauses.append(f"{prefix} {column} {operator} {value}")
        # With no rows the clause is simply empty.
        self.where_edit.setPlainText(" ".join(clauses))

    # VBA block
    def vba_groupbox(self):
        """Build the group box that previews SQL and generates the macro."""
        self.vba_groupbox = QGroupBox("5.生成宏文件并插入VBA语句")

        self.sql_btn = QPushButton("合并查询语句并查询")
        self.sql_btn.clicked.connect(self.on_merge_state)
        self.sql_edit = QPlainTextEdit()
        self.sql_edit.setPlaceholderText("合并后的查询语句")
        self.sql_edit.setMaximumHeight(100)

        self.tablewidget_4 = QTableWidget()
        self.tablewidget_4.setEditTriggers(QAbstractItemView.NoEditTriggers)
        self.tablewidget_4.horizontalHeader().setSectionResizeMode(
            QHeaderView.Stretch)

        self.book_name_lab = QLabel("工作簿名称前N个字符")
        self.book_name_edit = QLineEdit()
        self.book_name_edit.setPlaceholderText("N")
        self.sheet_name_lab = QLabel("新建工作表名称")
        self.sheet_name_edit = QLineEdit()
        self.col_lab = QLabel("选择单元格")
        self.col_edit = QLineEdit()
        self.col_edit.setPlaceholderText("选择将查询到的数据放到‘数据源’工作表的单元格位置:A2")

        self.macro_btn = QPushButton("创建VBA查询代码")
        self.macro_btn.clicked.connect(self.on_create_select)
        self.macro_edit = QPlainTextEdit()
        self.macro_edit.setPlaceholderText("将代码追加到文本框中,全部添加完毕后,再进行下一步")

        self.sheet_name_lab2 = QLabel("新建工作表名称")
        self.sheet_name_lab3 = QLabel()
        self.excel_btn = QPushButton("创建宏工作簿")
        self.excel_btn.clicked.connect(self.on_create_macro)

        self.flayout_2 = QFormLayout()
        self.flayout_2.addRow(self.sql_btn, self.sql_edit)
        self.flayout_2.addRow(self.tablewidget_4)
        self.flayout_2.addRow(self.book_name_lab, self.book_name_edit)
        self.flayout_2.addRow(self.sheet_name_lab, self.sheet_name_edit)
        self.flayout_2.addRow(self.col_lab, self.col_edit)
        self.flayout_2.addRow(self.macro_btn, self.macro_edit)
        self.flayout_2.addRow(self.sheet_name_lab2, self.sheet_name_lab3)
        self.flayout_2.addRow(self.excel_btn)
        self.vba_groupbox.setLayout(self.flayout_2)
        return self.vba_groupbox

    def on_merge_state(self):
        """Merge the clauses into one statement and preview it on SQLite."""
        filepath = self.data_path_edit1.text()
        suffix = _file_suffix(filepath)
        if "csv" in suffix:
            # The macro queries a "<name>副本.csv" copy that it creates.
            base_name = os.path.basename(filepath).split(".")[0]
            FROM = "FROM [" + base_name + "副本.csv]"
        elif "xls" in suffix:
            FROM = "FROM [" + self.table_combo1.currentText() + "$]"
        else:
            FROM = self.from_edit3.text()

        text = (self.select_edit3.text() + " " + FROM + " "
                + self.where_edit.toPlainText() + " "
                + self.groupby_edit.text())
        self.sql_edit.setPlainText(text)

        if "db" not in suffix:
            return  # only SQLite sources can be previewed from Python
        try:
            with closing(sqlite3.connect(filepath)) as con:
                cur = con.execute(text + " limit 3")
                # Use the real result columns as headers.
                headers = [desc[0] for desc in cur.description or ()]
                data = pd.DataFrame(cur.fetchall(), columns=headers)
        except (sqlite3.Error, sqlite3.Warning, ValueError):
            QMessageBox.information(self, '信息', f"查询失败,请检查查询语句",
                                    QMessageBox.Ok | QMessageBox.No)
            return
        _fill_table(self.tablewidget_4, data, headers)

    def _book_name_length(self):
        """Return N from the "first N characters" field, or None if invalid.

        A message box is shown when the field is not a positive integer.
        """
        try:
            num = int(self.book_name_edit.text())
        except ValueError:
            num = 0
        if num <= 0:
            QMessageBox.information(self, '信息', "请在“工作簿名称前N个字符”中输入正整数",
                                    QMessageBox.Ok)
            return None
        return num

    def _macro_sheet_names(self):
        """Return the distinct sheet names used by the generated VBA code."""
        found = _SHEET_NAME_PATTERN.findall(self.macro_edit.toPlainText())
        # Keep first-seen order; Excel rejects empty and duplicate names.
        return [name for name in dict.fromkeys(found) if name.strip()]

    def on_create_select(self):
        """Append a VBA "Case" branch that runs the merged SQL statement."""
        num = self._book_name_length()
        if num is None:
            return
        state = self.sql_edit.toPlainText()
        book_path = self.data_path_edit1.text()
        book_name = book_path.split("/")[-1][:num]
        sheet_name = self.sheet_name_edit.text()
        cell = self.col_edit.text()

        if sheet_name and cell and state:
            code_main = _VBA_CASE_TEMPLATE.format(
                book_name=_vba_string(book_name),
                sheet_name=_vba_string(sheet_name),
                # Quotes and line breaks would end the VBA string literal.
                state=_vba_string(state),
                cell=_vba_string(cell))
            self.macro_edit.appendPlainText(code_main)
        else:
            QMessageBox.information(self, '信息', "请补齐查询语句、工作表名称和单元格位置",
                                    QMessageBox.Ok)
        self.sheet_name_lab3.setText(str(self._macro_sheet_names()))

    def on_create_macro(self):
        """Create an Excel workbook containing the generated macro.

        The workbook gets a "Config" sheet holding the database path and one
        sheet per name used in the macro.  It is saved as "宏工作簿.xlsm" in
        the current working directory and the macro is then started.
        """
        num = self._book_name_length()
        if num is None:
            return
        db_path = self.data_path_edit1.text()
        sheet_names = self._macro_sheet_names()
        driver = "SQLite3 ODBC Driver"
        code = (_VBA_HEADER_TEMPLATE.format(driver=driver, num=num)
                + self.macro_edit.toPlainText() + _VBA_FOOTER)

        try:
            xl = win32com.client.DispatchEx('Excel.Application')
            xl.Visible = True
            xlbook = xl.Workbooks.Add()
            xl.Worksheets.Add().Name = 'Config'
            for sheet_name in sheet_names:
                if sheet_name != 'Config':
                    xl.Worksheets.Add().Name = sheet_name
            config_sheet = xl.Worksheets('Config')
            config_sheet.Cells(1, 1).Value = '数据库所在位置'
            config_sheet.Cells(2, 1).Value = db_path
        except Exception as exc:  # COM errors are reported at the UI boundary
            QMessageBox.information(self, '信息', f"创建工作簿失败:{exc}",
                                    QMessageBox.Ok)
            return

        try:
            # Needs "Trust access to the VBA project object model".
            xlmodule = xlbook.VBProject.VBComponents.Add(1)
            xlmodule.Name = _VBA_MACRO_NAME
            xlmodule.CodeModule.AddFromString(code)
        except Exception:
            QMessageBox.information(
                self, '信息',
                f"请按如下设置:EXCEL>文件>选项>信任中心>信任中心设置>宏设置>(启用所有宏,信任对VBA工程对象模型的访问)",
                QMessageBox.Ok | QMessageBox.No)
            return

        # Save before running the macro so a macro failure cannot lose the
        # workbook; the macro-enabled format is required to keep the code.
        target = os.path.join(os.getcwd(), "宏工作簿.xlsm")
        try:
            xlbook.SaveAs(target, FileFormat=_XL_MACRO_ENABLED_FORMAT)
        except Exception as exc:
            QMessageBox.information(self, '信息', f"保存宏工作簿失败:{exc}",
                                    QMessageBox.Ok)
            return

        try:
            # The Sub lives in a module of the same name.
            xl.Run(f"{_VBA_MACRO_NAME}.{_VBA_MACRO_NAME}")
        except Exception as exc:
            QMessageBox.information(
                self, '信息', f"宏工作簿已保存到{target},但宏运行失败:{exc}",
                QMessageBox.Ok)


class SqlGrammar(QDialog):
    """Dialog showing a short reference of SQLite operators."""

    def __init__(self):
        super(SqlGrammar, self).__init__()
        self.label = QLabel()
        text = """
        SQLite 中所有的逻辑运算符列表。
        运算符        描述        示例
        AND/OR       连接运算符
        BETWEEN      逻辑运算符   BETWEEN 最小值 AND 最大值
        EXISTS       逻辑运算符用于在满足一定条件的指定表中搜索行的存在。
        IN/NOT IN    逻辑运算符   IN (值1,值2,...)
        LIKE         通配符运算符 LIKE '%K' 以K结尾 / 'K%' 以K开头 / '%K%' 包含K   %表任意个字符,_表一个字符
        GLOB         通配符运算符 GLOB 与 LIKE 不同之处在于,它是大小写敏感的。
        NOT          NOT 运算符是所用的逻辑运算符的对立面。比如 NOT EXISTS、NOT BETWEEN、NOT IN,等等。它是否定运算符。
        IS NULL      IS NULL 运算符用于把某个值与 NULL 值进行比较。
        ||           连接两个不同的字符串,得到一个新的字符串。
        UNIQUE       UNIQUE 运算符搜索指定表中的每一行,确保唯一性(无重复)。
        """
        self.label.setText(text)
        self.vlayout = QVBoxLayout()
        self.vlayout.addWidget(self.label)
        self.setLayout(self.vlayout)


def on_select_data(data_path_edit, table_combo):
    """Let the user pick a data file and list its sheets/tables.

    The chosen path is written to *data_path_edit*; *table_combo* receives
    the sheet names (Excel), the file's base name (CSV) or the table names
    (SQLite).  Nothing changes when the dialog is cancelled.
    """
    filepath, _ = QFileDialog.getOpenFileName(
        None, '打开文件', os.getcwd(),
        'Excel文件,数据库(*.xls *.xlsx *.csv *.db);;')
    if not filepath:
        return
    data_path_edit.setText(filepath)

    suffix = _file_suffix(filepath)
    sheet_names = []
    try:
        if "xls" in suffix:
            # Only the workbook structure is needed, not the cell data.
            with pd.ExcelFile(filepath) as book:
                sheet_names = [str(name) for name in book.sheet_names]
        elif "csv" in suffix:
            sheet_names = [os.path.basename(filepath).split(".")[0]]
        elif "db" in suffix:
            sheet_names = _list_tables(filepath)
    except Exception as exc:  # unreadable file: report instead of crashing
        QMessageBox.information(None, '信息', f"读取文件失败:{exc}",
                                QMessageBox.Ok)
    table_combo.clear()
    table_combo.addItems(sheet_names)


def on_read_file(file_path, sheet_name=None, nrows=None):
    """Read an Excel sheet, a CSV file or a SQLite table into a DataFrame.

    Args:
        file_path: Path of the ``.xls``/``.xlsx``, ``.csv`` or ``.db`` file.
        sheet_name: Sheet (Excel) or table (SQLite) to read; ignored for
            CSV.  ``None`` selects the first sheet of an Excel file.
        nrows: Maximum number of rows to read, or ``None`` for all rows.

    Returns:
        The data as a ``pandas.DataFrame``.

    Raises:
        ValueError: If the file extension is not supported.
    """
    suffix = _file_suffix(file_path)
    if "xls" in suffix:
        target = 0 if sheet_name is None else sheet_name
        return pd.DataFrame(
            pd.read_excel(file_path, sheet_name=target, nrows=nrows))

    if "csv" in suffix:
        with open(file_path, 'rb') as f:
            first_line = f.readline()
        # The first line decides both the encoding guess and the separator.
        encoding = chardet.detect(first_line)['encoding']
        sep = "\t" if b'\t' in first_line else ","
        try:
            return pd.read_csv(file_path, encoding=encoding, sep=sep,
                               nrows=nrows)
        except (UnicodeError, LookupError):
            # A guess based on one (often ASCII-only) line can be wrong for
            # the rest of the file; GBK is the fallback.
            return pd.read_csv(file_path, encoding="GBK", sep=sep,
                               nrows=nrows)

    if "db" in suffix:
        col_names = _table_columns(file_path, sheet_name)
        query = f"select * from {_quote_identifier(sheet_name)}"
        params = ()
        if nrows is not None:
            query += " limit ?"
            params = (int(nrows),)
        with closing(sqlite3.connect(file_path)) as con:
            rows = con.execute(query, params).fetchall()
        return pd.DataFrame(rows, columns=col_names)

    raise ValueError(f"unsupported file type: {file_path!r}")


def on_get_data(data_path_edit, table_combo, table_edit=None, tablewidget=None,
                listwidget_1=None, listwidget=None):
    """Preview the first two rows of the selected sheet/table.

    Reads the path from *data_path_edit* and the sheet from *table_combo*,
    then fills *table_edit* (column tuple as text), *listwidget_1* (column
    names) and *tablewidget* (preview rows) and clears *listwidget*.
    Widgets passed as ``None`` are skipped.
    """
    filepath = data_path_edit.text()
    sheet_name = table_combo.currentText()
    if not filepath or not sheet_name:
        return  # e.g. the file dialog was cancelled
    try:
        data = on_read_file(filepath, sheet_name, nrows=2)
    except Exception as exc:  # report read problems at the UI boundary
        QMessageBox.information(None, '信息', f"读取数据失败:{exc}",
                                QMessageBox.Ok)
        return

    col_names = [str(column) for column in data.columns]
    if table_edit is not None:
        table_edit.setText("(" + ", ".join(repr(c) for c in col_names) + ")")
    if listwidget_1 is not None:
        listwidget_1.clear()
        listwidget_1.addItems(col_names)
    if listwidget is not None:
        listwidget.clear()
    if tablewidget is not None:
        _fill_table(tablewidget, data, col_names)


def on_select_sheet(data_path_edit, table_combo, table_edit, tablewidget_1,
                    listwidget_1, listwidget):
    """Refresh the preview when both a file and a sheet are selected."""
    if data_path_edit.text() and table_combo.currentText():
        # on_get_data expects the widgets themselves, not their texts.
        on_get_data(data_path_edit, table_combo, table_edit, tablewidget_1,
                    listwidget_1, listwidget)


def on_change_lv(index, listwidget, listwidget_1=None):
    """Move a column into (*index* 0) or out of (otherwise) the pick list.

    With *index* 0 the current item of *listwidget_1* is copied into
    *listwidget*; otherwise the current row of *listwidget* is removed.
    """
    if index == 0:
        current = listwidget_1.currentItem() if listwidget_1 is not None else None
        if current is not None:
            listwidget.addItem(QListWidgetItem(current))
    else:
        listwidget.takeItem(listwidget.currentRow())


def on_open_folder(data_path_edit):
    """Let the user pick a folder and write its path to *data_path_edit*."""
    folder = QFileDialog.getExistingDirectory(None)
    if folder:
        data_path_edit.setText(folder)


def on_import_colsname(listwidget, listwidget_1, col_name_edit):
    """Copy the picked columns (or all source columns) into the editor.

    The names are written to *col_name_edit* separated by single spaces.
    """
    source = listwidget if listwidget.count() else listwidget_1
    col_names = [source.item(i).text() for i in range(source.count())]
    col_name_edit.setPlainText(" ".join(col_names))


def on_create_table(db_path_edit, db_name_edit, table_name_edit, col_name_edit):
    """Create a table with ``VARCHAR(50)`` columns in a SQLite database.

    The database file is ``<folder>/<name>.db``; the folder defaults to the
    current working directory and is created when missing.  Column names
    are read from *col_name_edit*, separated by whitespace.
    """
    db_path = db_path_edit.text().strip() or os.getcwd()
    db_name = db_name_edit.text().strip()
    table_name = table_name_edit.text().strip()
    columns = col_name_edit.toPlainText().split()
    if not (db_name and table_name and columns):
        QMessageBox.information(None, '信息', "请填写数据库名称、数据表名称和列名",
                                QMessageBox.Close)
        return

    # Names are deliberately left unquoted so that SQLite rejects the ones
    # the UI documents as invalid (leading digits, special characters).
    column_sql = ", ".join(f"{column} VARCHAR(50)" for column in columns)
    sql_execute = f"create table {table_name} ({column_sql})"
    try:
        os.makedirs(db_path, exist_ok=True)
        db_filepath = os.path.join(db_path, db_name + ".db")
        with closing(sqlite3.connect(db_filepath)) as con:
            existing = con.execute(
                "select name from sqlite_master where type='table'").fetchall()
            if table_name in (row[0] for row in existing):
                QMessageBox.information(None, '信息', "数据表已存在",
                                        QMessageBox.Close)
                return
            con.execute(sql_execute)
            con.commit()
    except (sqlite3.Error, sqlite3.Warning, OSError):
        QMessageBox.information(None, '信息', "数据表已存在,或列名字段含有非法字符",
                                QMessageBox.Close)
        return
    QMessageBox.information(None, '信息', f"数据表{table_name}创建成功",
                            QMessageBox.Ok | QMessageBox.No)


def on_select_db(db_path_edit, table_name_combo):
    """Let the user pick a SQLite file and list its tables in the combo."""
    filepath, _ = QFileDialog.getOpenFileName(None, '打开文件', os.getcwd(),
                                              '数据库(*.db);;')
    if not filepath:
        return
    db_path_edit.setText(filepath)
    try:
        tab_names = _list_tables(filepath)
    except sqlite3.Error as exc:
        QMessageBox.information(None, '信息', f"读取数据库失败:{exc}",
                                QMessageBox.Ok)
        tab_names = []
    table_name_combo.clear()
    table_name_combo.addItems(tab_names)


def on_select_table(db_path_edit, table_name_combo, table_name_edit,
                    date_col_combo):
    """Show the columns of the selected table and offer them as date column."""
    filepath = db_path_edit.text()
    tab_name = table_name_combo.currentText()
    if not filepath:
        return
    try:
        col_name = tuple(_table_columns(filepath, tab_name))
    except sqlite3.Error as exc:
        QMessageBox.information(None, '信息', f"读取数据表失败:{exc}",
                                QMessageBox.Ok)
        return
    table_name_edit.setText(str(col_name))
    date_col_combo.clear()
    date_col_combo.addItem("")  # empty entry means "no date column"
    date_col_combo.addItems(col_name)


def on_import_setting(import_setting_check, table_name_edit2, db_path_edit2,
                      table_name_combo2, date_col_combo2, select_edit2):
    """Load or clear the import settings depending on the check box.

    When checked, ``./setting.txt`` is read; its first four lines have the
    form ``key:value`` and provide, in order, the database path, the table
    name, the date column and the SELECT statement.  When unchecked the
    related widgets are reset.
    """
    if not import_setting_check.isChecked():
        db_path_edit2.clear()
        table_name_combo2.clear()
        table_name_combo2.addItem("选择数据表")
        table_name_edit2.clear()
        date_col_combo2.clear()
        select_edit2.clear()
        return

    try:
        with open("./setting.txt", "r", encoding="utf-8") as f:
            # Split on the first colon only: values such as "C:\\data\\x.db"
            # or SQL text may contain colons themselves.
            values = [f.readline().rstrip("\n").partition(":")[2]
                      for _ in range(4)]
        db_path, table_name, date_col, select = values
        if not db_path or not table_name:
            raise ValueError("setting.txt is incomplete")
        col_name = tuple(_table_columns(db_path, table_name))
    except (OSError, ValueError, sqlite3.Error) as exc:
        QMessageBox.information(None, '信息', f"读取设置文件失败:{exc}",
                                QMessageBox.Ok)
        return

    table_name_edit2.setText(str(col_name))
    db_path_edit2.setText(db_path)
    table_name_combo2.clear()
    table_name_combo2.addItem(table_name)
    date_col_combo2.addItem(date_col)
    select_edit2.setPlainText(select)


def on_write_into_db(db_path_edit2, table_name_combo2, table_name_edit2,
                     date_col_combo2, write_data_edit2, sheet_name_combo2):
    """Append the selected source data to the selected SQLite table.

    Only the columns listed in *table_name_edit2* are imported and every
    value is stored as text.  When a date column is chosen, existing rows
    whose date lies between the smallest and largest imported value are
    deleted first, so re-importing a period replaces it.
    """
    filepath = db_path_edit2.text()
    table_name = table_name_combo2.currentText()
    col_name = table_name_edit2.text()
    date_col = date_col_combo2.currentText()
    data_path = write_data_edit2.text()
    sheet_name = sheet_name_combo2.currentText()

    if not (filepath and table_name and col_name and data_path and sheet_name):
        QMessageBox.information(None, '信息', "请先选择数据库、数据表、数据源和工作表",
                                QMessageBox.Ok)
        return

    # Column names may consist of CJK characters, letters, digits and "_".
    pattern = re.compile(r'[\u4E00-\u9FA5A-Za-z0-9_]+')
    col_names = re.findall(pattern, col_name)

    try:
        df = on_read_file(data_path, sheet_name).astype(str)
    except Exception as exc:  # report read problems at the UI boundary
        QMessageBox.information(None, '信息', f"读取数据失败:{exc}",
                                QMessageBox.Ok)
        return
    missing = [name for name in col_names if name not in df.columns]
    if missing:
        QMessageBox.information(None, '信息', f"数据源缺少以下列:{missing}",
                                QMessageBox.Ok)
        return
    df = df[col_names]
    if date_col and date_col not in df.columns:
        QMessageBox.information(None, '信息', f"数据源缺少日期列:{date_col}",
                                QMessageBox.Ok)
        return

    try:
        with closing(sqlite3.connect(filepath)) as con:
            if date_col and not df.empty:
                # Not committed separately: the delete becomes permanent
                # only together with the successful insert below.
                con.execute(
                    f"delete from {_quote_identifier(table_name)} "
                    f"where {_quote_identifier(date_col)} between ? and ?",
                    (df[date_col].min(), df[date_col].max()))
            df.to_sql(table_name, con, if_exists='append', index=False)
            con.commit()
    except (sqlite3.Error, sqlite3.Warning, ValueError) as exc:
        QMessageBox.information(None, '信息', f"数据表{table_name}导入失败:{exc}",
                                QMessageBox.Ok)
        return
    QMessageBox.information(None, '信息', f"数据表{table_name}导入成功",
                            QMessageBox.Ok | QMessageBox.No)


def on_select(db_path_edit2, select_edit2):
    """Run the SELECT statement and export the result to an Excel file.

    The workbook is written to ``指标-<today>.xlsx`` in the current working
    directory, replacing an existing file of that name.
    """
    filepath = db_path_edit2.text()
    text = select_edit2.toPlainText()
    try:
        with closing(sqlite3.connect(filepath)) as con:
            cur = con.execute(text)
            # Take the headers from the result itself; parsing them out of
            # the SQL text fails for "SELECT *" and for expressions.
            col_name = [desc[0] for desc in cur.description or ()]
            data = pd.DataFrame(cur.fetchall(), columns=col_name)
    except (sqlite3.Error, sqlite3.Warning, ValueError):
        QMessageBox.information(None, '信息', f"查询失败,请检查查询语句",
                                QMessageBox.Ok | QMessageBox.No)
        return

    if data.empty:
        QMessageBox.information(None, '信息', f"查询结果为空,请检查查询语句",
                                QMessageBox.Ok)
        return

    workbook_name = f"指标-{datetime.date.today()}.xlsx"
    try:
        if os.path.exists(workbook_name):
            os.remove(workbook_name)
        # No ``encoding`` argument: pandas 2.0 removed it from to_excel.
        data.to_excel(workbook_name, index=False)
    except (OSError, ValueError, ImportError) as exc:
        QMessageBox.information(None, '信息', f"查询结果导出失败:{exc}",
                                QMessageBox.Ok)
        return
    QMessageBox.information(None, '信息', f"查询结果导出完成", QMessageBox.Ok)


def main():
    """Create the application, show the main window and run the event loop."""
    app = QtWidgets.QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()
版权声明:本文为u012902720原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。