• 日常搜索
  • 端口查询
  • IP查询
  • 在线工具
  • 搜本站

Excel数据导入数据库MySQL工具

excel数据导入mysql工具(mysql批量入库),可以批量导入文件夹下所有.xlsx表格数据。

使用方法

1、配置.ini文件

修改目录下“配置.ini”文件的数据库连接参数、包括:数据库名称、表名称、excel列名称和对应的数据库字段名称。

Excel数据导入数据库MySQL工具  第1张

2、点击“测试连接”

3、点击“导入数据”

选择.xlsx 文件所在的文件夹。软件会自动遍历文件夹下所有.xlsx文件,并导入数据。

Excel数据导入数据库MySQL工具  第2张

源码内容:

import tkinter as tkfrom tkinter import filedialog, messagebox, ttkimport threadingimport pandas as pdimport mysql.connectorimport osimport loggingfrom logging.handlers import RotatingFileHandlerimport configparser# 设置日志记录logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = RotatingFileHandler('app.log', maxBytes=1000000, backupCount=5)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)class ExcelToMySQLApp:
    def __init__(self, master):
        self.master = master
        master.title("Excel数据导入MySQL工具-老吴搭建教程www.lw50.com")

        # 加载配置文件        config = configparser.ConfigParser()
        config.read('配置.ini', encoding='utf-8')

        # 数据库连接信息        self.db_config = {
            'host': config.get('Database', 'host'),
            'port': config.getint('Database', 'port'),
            'user': config.get('Database', 'user'),
            'password': config.get('Database', 'password'),
            'database': config.get('Database', 'name'),  # 使用'database'作为关键字参数        }

        # 表名称        self.table_name = config.get('Table', 'name')

        # Excel 列名        self.excel_columns = config.get('ExcelColumns', 'columns').split(',')

        # 数据库字段名        self.database_columns = config.get('DatabaseColumns', 'columns').split(',')

        # 初始化界面组件        self.create_ui()

    def create_ui(self):
        # 主界面布局        self.db_frame = ttk.LabelFrame(self.master, text="数据库连接设置")
        self.db_frame.grid(row=0, column=0, sticky=(tk.W + tk.E), padx=5, pady=5)

        # 测试连接按钮和连接状态标签放在同一行        self.test_connection_button = ttk.Button(self.db_frame, text="测试连接", command=self.test_connection)
        self.test_connection_button.grid(row=0, column=0, sticky=tk.W, pady=5)

        self.db_connection_status_label = ttk.Label(self.db_frame, text="数据库未连接")
        self.db_connection_status_label.grid(row=0, column=1, sticky=tk.W)

        self.db_name_label = ttk.Label(self.db_frame, text="数据库名称:")
        self.db_name_label.grid(row=1, column=0, sticky=tk.W)

        self.db_name_value_label = ttk.Label(self.db_frame, text=self.db_config['database'])
        self.db_name_value_label.grid(row=1, column=1, sticky=tk.W)

        self.table_name_label = ttk.Label(self.db_frame, text="表名称:")
        self.table_name_label.grid(row=2, column=0, sticky=tk.W)

        self.table_name_value_label = ttk.Label(self.db_frame, text=self.table_name)
        self.table_name_value_label.grid(row=2, column=1, sticky=tk.W)

        # 数据来源选择区        self.source_frame = ttk.LabelFrame(self.master, text="数据来源")
        self.source_frame.grid(row=1, column=0, sticky=(tk.W + tk.E), padx=5, pady=5)

        self.select_folder_button = ttk.Button(self.source_frame, text="选择文件夹", command=self.select_folder)
        self.select_folder_button.grid(row=0, column=0, sticky=tk.W, pady=5)

        self.selected_folder_label = ttk.Label(self.source_frame, text="未选择文件夹")
        self.selected_folder_label.grid(row=1, column=0, sticky=tk.W)

        # 映射关系展示区        self.mapping_frame = ttk.LabelFrame(self.master, text="列名映射")
        self.mapping_frame.grid(row=2, column=0, sticky=(tk.W + tk.E), padx=5, pady=5)
        self.mapping_tree = ttk.Treeview(self.mapping_frame, columns=("Excel Column", "DB Column"), show="headings")
        self.mapping_tree.heading("Excel Column", text="Excel Column")
        self.mapping_tree.heading("DB Column", text="DB Column")
        for excel_col, db_col in zip(self.excel_columns, self.database_columns):
            self.mapping_tree.insert("", "end", values=(excel_col, db_col))
        self.mapping_tree.pack(fill=tk.BOTH, expand=True)

        # 导入数据按钮        self.import_button = ttk.Button(self.master, text="导入数据", command=self.start_import_process,
                                        state=tk.DISABLED)
        self.import_button.grid(row=3, column=0, sticky=tk.W, pady=5)

        # 可导入状态标签        self.ready_to_import_label = ttk.Label(self.master, text="未准备好导入")
        self.ready_to_import_label.grid(row=4, column=0, sticky=tk.W)

        # 文件进度条        self.file_progressbar = ttk.Progressbar(self.master, orient="horizontal", length=200, mode="determinate")
        self.file_progressbar.grid(row=5, column=0, sticky=tk.W)

        # 文件进度状态标签        self.file_progress_status_label = ttk.Label(self.master, text="文件进度: ")
        self.file_progress_status_label.grid(row=6, column=0, sticky=tk.W)

        # 总进度条        self.total_progressbar = ttk.Progressbar(self.master, orient="horizontal", length=200, mode="determinate")
        self.total_progressbar.grid(row=7, column=0, sticky=tk.W)

        # 总进度状态标签        self.overall_progress_status_label = ttk.Label(self.master, text="总进度: ")
        self.overall_progress_status_label.grid(row=8, column=0, sticky=tk.W)

    def test_connection(self):
        # 测试数据库连接        try:
            connection = mysql.connector.connect(**self.db_config)
            connection.close()
            self.db_connection_status_label.config(text="数据库已连接")
            self.test_connection_button.config(state=tk.DISABLED)
            logger.info("数据库连接成功")
            messagebox.showinfo("成功", "数据库连接成功!")
        except Exception as e:
            self.db_connection_status_label.config(text="数据库连接失败")
            logger.error(f"数据库连接失败: {e}")
            messagebox.showerror("错误", f"数据库连接失败: {e}")

    def select_folder(self):
        # 选择文件夹        directory = filedialog.askdirectory()
        if directory:
            self.selected_folder_label.config(text=directory)
            self.check_excel_columns(directory)

    def check_excel_columns(self, directory):
        # 检查文件夹中的 Excel 列名        first_excel_file = next((f for f in os.listdir(directory) if f.endswith('.xlsx')), None)
        if first_excel_file:
            filepath = os.path.join(directory, first_excel_file)
            try:
                df = pd.read_excel(filepath, usecols=self.excel_columns, nrows=1)
                logger.debug(f"检查文件 {first_excel_file} 的列名:{df.columns}")
                if set(self.excel_columns).issubset(set(df.columns)):
                    logger.info("检查完成,Excel 列名和配置文件中的列名匹配!")
                    messagebox.showinfo("提示", "Excel 列名与配置文件中的列名匹配!")
                    self.enable_import_button()
                else:
                    logger.warning(
                        f"Excel 列名与配置文件中的列名不匹配!缺少列:{set(self.excel_columns) - set(df.columns)}")
                    messagebox.showwarning("警告",
                                           f"Excel 列名与配置文件中的列名不匹配!缺少列:{set(self.excel_columns) - set(df.columns)}")
            except Exception as e:
                logger.error(f"无法读取文件 {first_excel_file}: {e}")
                messagebox.showerror("错误", f"无法读取文件 {first_excel_file}: {e}")
                return    def enable_import_button(self):
        # 启用导入数据按钮        self.import_button.config(state=tk.NORMAL)
        self.ready_to_import_label.config(text="准备好导入")

    def start_import_process(self):
        # 开始导入数据过程        self.import_button.config(state=tk.DISABLED)
        self.ready_to_import_label.config(text="正在导入...")
        selected_folder = self.selected_folder_label.cget("text")
        if not selected_folder or selected_folder == "未选择文件夹":
            messagebox.showwarning("警告", "请先选择文件夹!")
            return        self.files_to_process = [f for f in os.listdir(selected_folder) if f.endswith('.xlsx')]
        self.total_rows = sum(len(pd.read_excel(os.path.join(selected_folder, f), usecols=self.excel_columns)) for f in                              self.files_to_process)
        self.inserted_rows = 0        logger.info("连接数据库...")
        self.db = mysql.connector.connect(**self.db_config)
        logger.info("数据库已连接")
        self.insert_sql = f"INSERT INTO {self.table_name} ({', '.join(self.database_columns)}) VALUES ({', '.join(['%s'] * len(self.database_columns))});"        # 检查数据库表结构        cursor = self.db.cursor()
        try:
            self.check_table_structure(cursor)
        finally:
            cursor.close()

        threading.Thread(target=self.import_data_thread, args=(selected_folder,)).start()

    def import_data_thread(self, selected_folder):
        # 导入数据线程        cursor = self.db.cursor()
        try:
            for file_index, filename in enumerate(self.files_to_process):
                filepath = os.path.join(selected_folder, filename)
                df = pd.read_excel(filepath, usecols=self.excel_columns)
                # 数据清洗                mapped_df = self.clean_data(df)
                mapped_values = mapped_df.values.tolist()
                self.current_file_rows = len(mapped_values)
                self.current_inserted_rows = 0                batch_size = 1000  # 批量插入的大小                total_batches = len(mapped_values) // batch_size + (1 if len(mapped_values) % batch_size > 0 else 0)

                for batch_index in range(total_batches):
                    start = batch_index * batch_size
                    end = min((batch_index + 1) * batch_size, len(mapped_values))
                    current_batch = mapped_values[start:end]

                    # 开始事务                    cursor.execute("START TRANSACTION")

                    # 尝试插入数据                    try:
                        cursor.executemany(self.insert_sql, current_batch)
                        affected_rows = cursor.rowcount
                        # 提交事务                        cursor.execute("COMMIT")
                        logger.info(
                            f"Batch {batch_index + 1}/{total_batches} of file {filename} successfully inserted. Affected rows: {affected_rows}")
                        self.current_inserted_rows += affected_rows
                    except Exception as e:
                        # 记录错误信息,并回滚事务                        logger.error(
                            f"From {filename}: Error inserting batch {batch_index + 1}/{total_batches}: {str(e)}")
                        cursor.execute("ROLLBACK")
                        self.retry_insert(cursor, self.insert_sql, current_batch, filename)

                    # 更新文件进度状态                    self.file_progress_status_label.config(
                        text=f"正在处理 {filename},已处理行数: {self.current_inserted_rows}/{self.current_file_rows}")
                    self.file_progressbar['value'] = (self.current_inserted_rows * 100) / self.current_file_rows

                    # 更新总进度状态                    self.inserted_rows += affected_rows
                    self.overall_progress_status_label.config(
                        text=f"总进度: {self.inserted_rows}/{self.total_rows}")
                    self.total_progressbar['value'] = (self.inserted_rows * 100) / self.total_rows

                    # 模拟主循环更新UI                    self.master.update_idletasks()

            if self.inserted_rows == self.total_rows:
                messagebox.showinfo("提示", "导入完成!")
        finally:
            cursor.close()  # 确保在任何情况下都关闭游标    def check_table_structure(self, cursor):
        cursor.execute(f"DESCRIBE {self.table_name}")
        table_structure = cursor.fetchall()
        logger.info(f"数据库表 {self.table_name} 字段结构:{table_structure}")

        # 检查数据库表字段名与 Excel 列名是否匹配        db_columns = [col_info[0] for col_info in table_structure]
        missing_db_columns = [col for col in self.database_columns if col not in db_columns]
        if missing_db_columns:
            logger.error(f"数据库表 {self.table_name} 缺少字段:{missing_db_columns}")
            raise ValueError(f"数据库表 {self.table_name} 缺少字段:{missing_db_columns}")

    def clean_data(self, df):
        # 数据清洗        # 确保所有数据在插入数据库之前已经进行了适当的清洗        df = df.where(pd.notna(df), None)
        return df

    def retry_insert(self, cursor, insert_sql, data, filename):
        max_retries = 3        retries = 0        while retries < max_retries:
            try:
                cursor.execute("START TRANSACTION")
                cursor.executemany(insert_sql, data)
                cursor.execute("COMMIT")
                logger.info(f"Retry successful after {retries + 1} attempts for file {filename}.")
                self.current_inserted_rows += len(data)
                break            except Exception as e:
                retries += 1                logger.error(f"Retry {retries}/{max_retries} failed for {filename}: {str(e)}")
                cursor.execute("ROLLBACK")
                if retries == max_retries:
                    # 在最大重试次数后记录错误行                    logger.error(f"Max retries reached for {filename}. Attempting to insert rows individually.")
                    for row_index, row in enumerate(data):
                        try:
                            cursor.execute("START TRANSACTION")
                            cursor.execute(insert_sql, row)
                            cursor.execute("COMMIT")
                            self.current_inserted_rows += 1                        except Exception as single_row_error:
                            logger.error(
                                f"Failed to insert row {row} at index {row_index} from {filename}: {single_row_error}")
                            cursor.execute("ROLLBACK")
                    breakif __name__ == "__main__":
    root = tk.Tk()
    app = ExcelToMySQLApp(root)
    root.mainloop()

附件地址:

附件为编译后exe文件:Excel数据导入数据库MySQL工具.exe

此处为隐藏内容,请评论后刷新页面查看,谢谢!

文章目录
  • 使用方法:
    • 1、配置.ini文件
    • 2、点击“测试连接”
    • 3、点击“导入数据”
  • 源码内容:
  • 附件地址:
  • 发表评论