本文最后更新于 2024-10-15,文章内容可能已经过时。

在写自动化的过程中,经常会遇到需要处理各种类型文件的需求和数据库相关操作。本文将介绍一个Python工具类用于读写文件相关的封装,它可以帮助你轻松地读取和写入JSON、Excel、YAML、TXT、MySQL以及CSV文件。此工具类已经集成了异常处理机制,使得代码更加健壮。

PS:下面的代码直接CV即可运行

1. 准备工作

首先,确保安装了必要的库:

pip install openpyxl PyYAML pymysql

2. 工具类定义

接下来,我们将定义一个名为 FileHandler 的类,其中包含静态方法用于处理不同类型的文件及额外的数据库操作。

file_utils.py:

import json
import csv
import pymysql
import pymysql.cursors
from openpyxl import load_workbook, Workbook
import yaml


class FileHandler:
    @staticmethod
    def read_json(file_path: str):
        """读取JSON文件"""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                return json.load(file)
        except FileNotFoundError:
            print(f"File {file_path} not found.")
            return None
        except Exception as e:
            print(f"Error reading JSON file: {e}")
            return None

    @staticmethod
    def write_json(data: dict, file_path: str):
        """写入JSON文件"""
        try:
            with open(file_path, 'w', encoding='utf-8') as file:
                json.dump(data, file, ensure_ascii=False, indent=4)
        except Exception as e:
            print(f"Error writing to JSON file: {e}")

    @staticmethod
    def read_excel(file_path: str, sheet_name: str = None):
        """读取Excel文件"""
        try:
            wb = load_workbook(filename=file_path, data_only=True)
            ws = wb[sheet_name] if sheet_name else wb.active
            return [[cell.value for cell in row] for row in ws]
        except FileNotFoundError:
            print(f"File {file_path} not found.")
            return None
        except KeyError:
            print(f"Sheet {sheet_name} not found in workbook.")
            return None
        except Exception as e:
            print(f"Error reading Excel file: {e}")
            return None

    @staticmethod
    def write_excel(data: list, file_path: str, sheet_name: str = "Sheet1"):
        """写入Excel文件"""
        try:
            wb = Workbook()
            ws = wb.active
            ws.title = sheet_name
            for row in data:
                ws.append(row)
            wb.save(file_path)
        except Exception as e:
            print(f"Error writing to Excel file: {e}")

    @staticmethod
    def read_yaml(file_path: str):
        """读取YAML文件"""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                return yaml.safe_load(file)
        except FileNotFoundError:
            print(f"File {file_path} not found.")
            return None
        except yaml.YAMLError as e:
            print(f"Error parsing YAML file: {e}")
            return None
        except Exception as e:
            print(f"Error reading YAML file: {e}")
            return None

    @staticmethod
    def write_yaml(data: dict, file_path: str):
        """写入YAML文件"""
        try:
            with open(file_path, 'w', encoding='utf-8') as file:
                yaml.dump(data, file, allow_unicode=True)
        except Exception as e:
            print(f"Error writing to YAML file: {e}")

    @staticmethod
    def read_txt(file_path: str):
        """读取TXT文件"""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                return file.read()
        except FileNotFoundError:
            print(f"File {file_path} not found.")
            return None
        except Exception as e:
            print(f"Error reading TXT file: {e}")
            return None

    @staticmethod
    def write_txt(content: str, file_path: str):
        """写入TXT文件"""
        try:
            with open(file_path, 'w', encoding='utf-8') as file:
                file.write(content)
        except Exception as e:
            print(f"Error writing to TXT file: {e}")

    @staticmethod
    def read_csv(file_path: str):
        """读取CSV文件"""
        try:
            with open(file_path, newline='', encoding='utf-8') as csvfile:
                reader = csv.reader(csvfile)
                return list(reader)
        except FileNotFoundError:
            print(f"File {file_path} not found.")
            return None
        except Exception as e:
            print(f"Error reading CSV file: {e}")
            return None

    @staticmethod
    def write_csv(data: list, file_path: str):
        """写入CSV文件"""
        try:
            with open(file_path, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerows(data)
        except Exception as e:
            print(f"Error writing to CSV file: {e}")

    @staticmethod
    def connect_mysql(host: str, user: str, password: str, database: str):
        """连接MySQL数据库"""
        try:
            connection = pymysql.connect(
                host=host,
                user=user,
                password=password,
                database=database,
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor
            )
            return connection
        except pymysql.err.OperationalError as e:
            print(f"Database connection error: {e}")
            return None
        except Exception as e:
            print(f"Error connecting to MySQL: {e}")
            return None

    @staticmethod
    def execute_query(connection, query: str):
        """执行查询语句"""
        try:
            with connection.cursor() as cursor:
                cursor.execute(query)
                result = cursor.fetchall()
                connection.commit()
                return result
        except pymysql.err.MySQLError as e:
            print(f"Query execution error: {e}")
            return None
        except Exception as e:
            print(f"Error executing query: {e}")
            return None
        finally:
            connection.close()

    @staticmethod
    def execute_update(connection, query: str):
        """执行更新语句"""
        try:
            with connection.cursor() as cursor:
                cursor.execute(query)
                connection.commit()
        except pymysql.err.MySQLError as e:
            print(f"Update execution error: {e}")
        except Exception as e:
            print(f"Error executing update: {e}")
        finally:
            connection.close()

3. 复制后的代码自测示例

你可以像下面这样使用这个类中的方法:

MySQL部分的代码使用了mock,所以你可能需要额外安装所需的库。


import unittest
import os
from unittest.mock import patch

import pymysql
from utils.file_utils import FileHandler


class TestFileHandler(unittest.TestCase):

    def setUp(self):
        """设置测试数据和文件名"""
        self.json_data = {"key": "value"}
        self.yaml_data = {"key": "value"}
        self.txt_data = "Hello, World!"
        self.csv_data = [["Name", "Age"], ["Alice", "30"], ["Bob", "25"]]
        self.excel_data = [["Name", "Age"], ["Alice", 30], ["Bob", 25]]

        # 文件名
        self.json_file = "test.json"
        self.yaml_file = "test.yaml"
        self.txt_file = "test.txt"
        self.csv_file = "test.csv"
        self.excel_file = "test.xlsx"

    def tearDown(self):
        """删除测试生成的文件"""
        files = [self.json_file, self.yaml_file, self.txt_file, self.csv_file, self.excel_file]
        for file in files:
            if os.path.exists(file):
                os.remove(file)

    # Test read_json and write_json
    def test_json(self):
        # 写入 JSON 文件
        FileHandler.write_json(self.json_data, self.json_file)
        self.assertTrue(os.path.exists(self.json_file))

        # 读取 JSON 文件
        result = FileHandler.read_json(self.json_file)
        self.assertEqual(result, self.json_data)

    # Test read_yaml and write_yaml
    def test_yaml(self):
        # 写入 YAML 文件
        FileHandler.write_yaml(self.yaml_data, self.yaml_file)
        self.assertTrue(os.path.exists(self.yaml_file))

        # 读取 YAML 文件
        result = FileHandler.read_yaml(self.yaml_file)
        self.assertEqual(result, self.yaml_data)

    # Test read_txt and write_txt
    def test_txt(self):
        # 写入 TXT 文件
        FileHandler.write_txt(self.txt_data, self.txt_file)
        self.assertTrue(os.path.exists(self.txt_file))

        # 读取 TXT 文件
        result = FileHandler.read_txt(self.txt_file)
        self.assertEqual(result, self.txt_data)

    # Test read_csv and write_csv
    def test_csv(self):
        # 写入 CSV 文件
        FileHandler.write_csv(self.csv_data, self.csv_file)
        self.assertTrue(os.path.exists(self.csv_file))

        # 读取 CSV 文件
        result = FileHandler.read_csv(self.csv_file)
        self.assertEqual(result, self.csv_data)

    # Test read_excel and write_excel
    def test_excel(self):
        # 写入 Excel 文件
        FileHandler.write_excel(self.excel_data, self.excel_file)
        self.assertTrue(os.path.exists(self.excel_file))

        # 读取 Excel 文件
        result = FileHandler.read_excel(self.excel_file)
        self.assertEqual(result, self.excel_data)

    # Mocked connection and cursor for MySQL operations
    @patch("pymysql.connect")
    def test_connect_mysql(self, mock_connect):
        """测试 MySQL 连接"""
        mock_connect.return_value = "mock_connection"

        # 调用被测方法
        result = FileHandler.connect_mysql("host", "user", "pass", "db")

        # 验证连接是否返回预期的 mock_connection
        self.assertEqual(result, "mock_connection")

        # 验证 pymysql.connect 是否使用了正确的参数调用
        mock_connect.assert_called_with(
            host="host",
            user="user",
            password="pass",
            database="db",
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )

    @patch("pymysql.connect")
    def test_execute_query(self, mock_connect):
        """测试 MySQL 查询操作"""
        mock_connection = mock_connect.return_value
        mock_cursor = mock_connection.cursor.return_value.__enter__.return_value
        mock_cursor.fetchall.return_value = [{"key": "value"}]  # 模拟查询结果

        # 调用被测方法
        result = FileHandler.execute_query(mock_connection, "SELECT * FROM table")

        # 验证查询返回的结果
        self.assertEqual(result, [{"key": "value"}])

        # 验证游标是否执行了正确的 SQL 语句
        mock_cursor.execute.assert_called_with("SELECT * FROM table")

        # 验证连接是否在最后关闭
        mock_connection.close.assert_called_once()

    @patch("pymysql.connect")
    def test_execute_update(self, mock_connect):
        """测试 MySQL 更新操作"""
        mock_connection = mock_connect.return_value
        mock_cursor = mock_connection.cursor.return_value.__enter__.return_value

        # 调用被测方法
        FileHandler.execute_update(mock_connection, "UPDATE table SET key='value'")

        # 验证游标是否执行了正确的更新 SQL 语句
        mock_cursor.execute.assert_called_with("UPDATE table SET key='value'")

        # 验证连接是否在最后关闭
        mock_connection.close.assert_called_once()

if __name__ == '__main__':
    unittest.main()

4.使用示例

这里是基于 FileHandler 类中每个方法的使用示例,展示如何利用这些方法进行文件操作和 MySQL 数据库的操作。

1. 使用 read_jsonwrite_json 操作 JSON 文件

# 写入 JSON 文件
data = {"name": "Alice", "age": 30}
FileHandler.write_json(data, "data.json")

# 读取 JSON 文件
result = FileHandler.read_json("data.json")
print(result)  # 输出: {"name": "Alice", "age": 30}

2. 使用 read_excelwrite_excel 操作 Excel 文件

# 写入 Excel 文件
excel_data = [["Name", "Age"], ["Alice", 30], ["Bob", 25]]
FileHandler.write_excel(excel_data, "data.xlsx")

# 读取 Excel 文件
result = FileHandler.read_excel("data.xlsx")
print(result)  # 输出: [["Name", "Age"], ["Alice", 30], ["Bob", 25]]

3. 使用 read_yamlwrite_yaml 操作 YAML 文件

# 写入 YAML 文件
yaml_data = {"name": "Alice", "age": 30}
FileHandler.write_yaml(yaml_data, "data.yaml")

# 读取 YAML 文件
result = FileHandler.read_yaml("data.yaml")
print(result)  # 输出: {"name": "Alice", "age": 30}

4. 使用 read_txtwrite_txt 操作文本文件

# 写入 TXT 文件
text_data = "Hello, this is a text file."
FileHandler.write_txt(text_data, "data.txt")

# 读取 TXT 文件
result = FileHandler.read_txt("data.txt")
print(result)  # 输出: Hello, this is a text file.

5. 使用 read_csvwrite_csv 操作 CSV 文件

# 写入 CSV 文件
csv_data = [["Name", "Age"], ["Alice", "30"], ["Bob", "25"]]
FileHandler.write_csv(csv_data, "data.csv")

# 读取 CSV 文件
result = FileHandler.read_csv("data.csv")
print(result)  # 输出: [["Name", "Age"], ["Alice", "30"], ["Bob", "25"]]

6. 使用 connect_mysql 连接 MySQL 数据库

# 连接 MySQL 数据库
connection = FileHandler.connect_mysql("localhost", "user", "password", "testdb")

# 检查连接是否成功
if connection:
    print("Successfully connected to MySQL")
else:
    print("Connection failed")

7. 使用 execute_query 执行 MySQL 查询操作

# 假设已经建立了数据库连接
query = "SELECT * FROM users"
result = FileHandler.execute_query(connection, query)

# 输出查询结果
print(result)  # 输出查询结果列表

8. 使用 execute_update 执行 MySQL 更新操作

# 执行 SQL 更新操作
update_query = "UPDATE users SET age = 31 WHERE name = 'Alice'"
FileHandler.execute_update(connection, update_query)

# 确认更新是否成功
print("Update query executed successfully")

总结

通过 FileHandler 类,你可以方便地进行以下操作:

  • JSON、YAML、TXT、CSV、Excel 文件的读写。

  • MySQL 数据库连接、查询以及更新操作。

这些操作可以通过简单的方法调用来完成,无需重复编写底层文件和数据库处理逻辑,提升了代码的可读性和维护性。