Introduction: Breaking Down Data Silos by Building an Automated Bridge Between Databases and Office Documents #
A common dilemma in enterprise IT: business data sits safely in professional database systems, while day-to-day office work and report output depend on office suites such as WPS. Finance staff manually export data from the database and import it into spreadsheets to build reports every day, marketing staff repeatedly copy and paste figures into slides for performance analysis, and administrative staff spend hours consolidating data from different systems into statistical reports. This inefficient manual workflow not only wastes labor but also introduces data errors through human handling.
This article is a practical guide to closing the last mile between your databases and office automation. It walks through how to use Python and WPS to build an efficient, accurate, automated data-processing pipeline: from basic database connection configuration, through enterprise-grade ETL processes, to automatic generation of intelligent reports, turning WPS Office into the output engine at the end of your data value chain. If you are not yet familiar with the basics of driving WPS from Python, read 《WPS二次开发完全手册:用Python扩展你的办公软件能力》 first to build the necessary foundation.
Chapter 1: Technical Architecture and Environment Setup for a Robust Data Channel #
1.1 Selecting a Database Connection Technology #
When choosing a database connection scheme, make the technical decision based on the concrete scenario:
class DatabaseConnectorFactory:
    """Factory for database connectors."""

    @staticmethod
    def create_connector(db_type, config):
        """Create a database connector for the given type."""
        connectors = {
            'mysql': MySQLConnector,
            'sqlserver': SQLServerConnector,
            'oracle': OracleConnector,
            'postgresql': PostgreSQLConnector,
            'sqlite': SQLiteConnector
        }
        connector_class = connectors.get(db_type.lower())
        if not connector_class:
            raise ValueError(f"Unsupported database type: {db_type}")
        return connector_class(config)

    @staticmethod
    def get_recommended_connector(use_case):
        """Recommend a connection scheme for a usage scenario."""
        recommendations = {
            'high_performance': 'mysql',
            'enterprise': 'oracle',
            'windows_env': 'sqlserver',
            'lightweight': 'sqlite',
            'analytics': 'postgresql'
        }
        return recommendations.get(use_case, 'mysql')
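As a quick sanity check, the factory can be exercised as follows. This is an illustrative sketch: MySQLConnector is defined in Chapter 2, and the connection values are placeholders.

# Illustrative usage of the factory; the connection values are placeholders.
db_type = DatabaseConnectorFactory.get_recommended_connector('analytics')
print(db_type)  # -> 'postgresql'

mysql_conn = DatabaseConnectorFactory.create_connector('mysql', {
    'host': 'localhost',    # placeholder host
    'user': 'report_user',  # placeholder account
    'password': '***',
    'database': 'sales'
})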
1.2 Environment Dependencies and Driver Configuration #
import platform
import subprocess
import sys

class EnvironmentConfigurator:
    """Environment configurator."""

    def __init__(self):
        self.system_info = platform.system()

    def install_dependencies(self):
        """Install the required Python packages."""
        dependencies = [
            'pyodbc',           # ODBC connectivity
            'pymysql',          # MySQL driver
            'oracledb',         # Oracle driver (successor to the deprecated cx_Oracle)
            'psycopg2-binary',  # PostgreSQL driver (binary wheel, easier to install)
            'sqlalchemy',       # ORM / engine abstraction
            'pandas',           # data processing
            'openpyxl'          # Excel file manipulation
        ]
        for package in dependencies:
            try:
                subprocess.check_call([
                    sys.executable, "-m", "pip", "install", package
                ])
                print(f"✓ Installed {package}")
            except subprocess.CalledProcessError:
                print(f"✗ Failed to install {package}")

    def configure_odbc(self, db_type):
        """Print ODBC data source configuration hints."""
        odbc_templates = {
            'mysql': {
                'driver': 'MySQL ODBC 8.0 Driver',
                'config': 'SERVER=localhost;DATABASE=test;USER=root;PASSWORD=123456;'
            },
            'sqlserver': {
                'driver': 'ODBC Driver 17 for SQL Server',
                'config': 'SERVER=localhost;DATABASE=test;UID=sa;PWD=123456;'
            }
        }
        template = odbc_templates.get(db_type)
        if template:
            print("Please configure the ODBC data source manually:")
            print(f"Driver: {template['driver']}")
            print(f"Connection string: {template['config']}")
1.3 Connection Configuration Management #
import configparser
from cryptography.fernet import Fernet
import base64

class SecureConfigManager:
    """Configuration manager with encrypted passwords."""

    def __init__(self, config_file='database_config.ini', key_file='encryption.key'):
        self.config_file = config_file
        self.key_file = key_file
        self.cipher_suite = self._setup_encryption()

    def _setup_encryption(self):
        """Load the encryption key, generating one on first use."""
        try:
            with open(self.key_file, 'rb') as f:
                key = f.read()
        except FileNotFoundError:
            key = Fernet.generate_key()
            with open(self.key_file, 'wb') as f:
                f.write(key)
        return Fernet(key)

    def save_database_config(self, config_name, db_config):
        """Save a database configuration, encrypting the password."""
        config = configparser.ConfigParser()
        config.read(self.config_file)  # keep existing sections instead of overwriting them
        db_config = dict(db_config)
        if 'password' in db_config:
            encrypted_pwd = self.cipher_suite.encrypt(
                db_config['password'].encode()
            )
            db_config['password'] = base64.b64encode(encrypted_pwd).decode()
        # configparser only accepts string values, so coerce ints such as the port
        config[config_name] = {k: str(v) for k, v in db_config.items()}
        with open(self.config_file, 'w') as f:
            config.write(f)

    def load_database_config(self, config_name):
        """Load a database configuration, decrypting the password."""
        config = configparser.ConfigParser()
        config.read(self.config_file)
        if config_name not in config:
            raise ValueError(f"Configuration {config_name} does not exist")
        db_config = dict(config[config_name])
        if 'password' in db_config:
            encrypted_pwd = base64.b64decode(db_config['password'])
            db_config['password'] = self.cipher_suite.decrypt(encrypted_pwd).decode()
        return db_config
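A minimal round trip shows the intent; the section name and credentials here are illustrative.

# Illustrative round trip; 'prod_mysql' and the credentials are placeholders.
manager = SecureConfigManager()
manager.save_database_config('prod_mysql', {
    'host': 'localhost',
    'user': 'report_user',
    'password': 's3cret',
    'database': 'sales',
    'port': 3306
})
restored = manager.load_database_config('prod_mysql')
assert restored['password'] == 's3cret'  # stored encrypted, read back in clear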
Chapter 2: Hands-On Database Connectivity with Multi-Database Support and Optimization #
2.1 Deep MySQL Integration #
import pymysql
import pandas as pd

class MySQLConnector:
    """MySQL database connector."""

    def __init__(self, config):
        self.config = config
        self.connection = None
        self.connect()

    def connect(self):
        """Open the database connection."""
        try:
            self.connection = pymysql.connect(
                host=self.config.get('host', 'localhost'),
                user=self.config.get('user', 'root'),
                password=self.config.get('password', ''),
                database=self.config.get('database', 'test'),
                port=int(self.config.get('port', 3306)),  # configparser returns strings
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor,
                connect_timeout=10
            )
            print("✓ Connected to MySQL")
        except Exception as e:
            print(f"✗ MySQL connection failed: {e}")
            raise

    def execute_query(self, query, params=None):
        """Run a query and return all rows."""
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, params or ())
                return cursor.fetchall()
        except Exception as e:
            print(f"Query failed: {e}")
            self.connection.rollback()
            raise

    def get_data_as_dataframe(self, query, params=None):
        """Return query results as a DataFrame."""
        result = self.execute_query(query, params)
        return pd.DataFrame(result)

    def batch_insert_data(self, table_name, data_df):
        """Insert a DataFrame in one batch."""
        try:
            with self.connection.cursor() as cursor:
                # Build the INSERT statement
                placeholders = ', '.join(['%s'] * len(data_df.columns))
                columns = ', '.join(data_df.columns)
                sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
                # Convert NaN to None so MySQL receives NULL, then insert in bulk
                rows = data_df.astype(object).where(data_df.notna(), None).values.tolist()
                cursor.executemany(sql, rows)
                self.connection.commit()
                print(f"✓ Inserted {cursor.rowcount} rows")
                return cursor.rowcount
        except Exception as e:
            self.connection.rollback()
            print(f"✗ Batch insert failed: {e}")
            raise

    def get_table_schema(self, table_name):
        """Fetch table structure metadata."""
        query = """
            SELECT
                COLUMN_NAME,
                DATA_TYPE,
                IS_NULLABLE,
                COLUMN_DEFAULT,
                COLUMN_COMMENT
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_NAME = %s AND TABLE_SCHEMA = %s
            ORDER BY ORDINAL_POSITION
        """
        params = (table_name, self.config['database'])
        return self.execute_query(query, params)
# Advanced query examples
class AdvancedMySQLQueries:
    """Advanced MySQL query examples."""

    def __init__(self, connector):
        self.connector = connector

    def get_sales_trend_analysis(self, start_date, end_date):
        """Sales trend analysis."""
        query = """
            SELECT
                DATE(sale_date) as sale_day,
                COUNT(*) as order_count,
                SUM(amount) as total_amount,
                AVG(amount) as avg_amount,
                MAX(amount) as max_amount,
                MIN(amount) as min_amount
            FROM sales
            WHERE sale_date BETWEEN %s AND %s
            GROUP BY DATE(sale_date)
            ORDER BY sale_day
        """
        return self.connector.get_data_as_dataframe(query, (start_date, end_date))

    def get_customer_behavior_analysis(self, customer_id=None):
        """Customer behavior analysis."""
        base_query = """
            SELECT
                c.customer_name,
                c.customer_level,
                COUNT(o.order_id) as total_orders,
                SUM(o.amount) as total_spent,
                AVG(o.amount) as avg_order_value,
                MAX(o.order_date) as last_order_date,
                DATEDIFF(CURDATE(), MAX(o.order_date)) as days_since_last_order
            FROM customers c
            LEFT JOIN orders o ON c.customer_id = o.customer_id
        """
        if customer_id:
            base_query += " WHERE c.customer_id = %s"
            params = (customer_id,)
        else:
            params = None
        # GROUP BY is needed in both branches: the SELECT list mixes
        # aggregates with plain columns
        base_query += (
            " GROUP BY c.customer_id, c.customer_name, c.customer_level"
            " HAVING total_orders > 0 ORDER BY total_spent DESC"
        )
        return self.connector.get_data_as_dataframe(base_query, params)
2.2 SQL Server Integration #
import pyodbc

class SQLServerConnector:
    """SQL Server database connector."""

    def __init__(self, config):
        self.config = config
        self.connection = None
        self.connect()

    def build_connection_string(self):
        """Build a connection string using the newest installed ODBC driver."""
        preferred_drivers = [
            'ODBC Driver 18 for SQL Server',
            'ODBC Driver 17 for SQL Server',
            'ODBC Driver 13 for SQL Server'
        ]
        installed = pyodbc.drivers()
        for driver in preferred_drivers:
            if driver in installed:
                return (
                    f"DRIVER={{{driver}}};"
                    f"SERVER={self.config['server']};"
                    f"DATABASE={self.config['database']};"
                    f"UID={self.config['username']};"
                    f"PWD={self.config['password']};"
                    "Encrypt=yes;"
                    "TrustServerCertificate=yes;"
                )
        raise Exception("No usable ODBC driver found")

    def connect(self):
        """Open the connection."""
        try:
            conn_str = self.build_connection_string()
            self.connection = pyodbc.connect(conn_str)
            print("✓ Connected to SQL Server")
        except Exception as e:
            print(f"✗ SQL Server connection failed: {e}")
            raise

    def execute_stored_procedure(self, proc_name, params=None):
        """Execute a stored procedure and collect every result set."""
        try:
            cursor = self.connection.cursor()
            # Build parameter placeholders
            param_placeholders = ', '.join(['?'] * len(params)) if params else ''
            sql = f"{{CALL {proc_name}({param_placeholders})}}"
            cursor.execute(sql, params or ())
            # Drain all result sets
            results = []
            while True:
                try:
                    result = cursor.fetchall()
                    if result:
                        results.append(result)
                    if not cursor.nextset():
                        break
                except pyodbc.ProgrammingError:
                    # No more result sets
                    break
            cursor.close()
            return results
        except Exception as e:
            print(f"Stored procedure failed: {e}")
            raise
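Calling a reporting procedure then looks like this. The procedure name, its parameter, and the connection values are hypothetical.

# Hypothetical usage: 'usp_monthly_sales' and all connection values are placeholders.
connector = SQLServerConnector({
    'server': 'localhost',
    'database': 'sales',
    'username': 'report_user',
    'password': '***'
})
result_sets = connector.execute_stored_procedure('usp_monthly_sales', ('2024-01',))
for i, rows in enumerate(result_sets, 1):
    print(f"Result set {i}: {len(rows)} rows")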
2.3 A Unified Multi-Database Interface #
from abc import ABC, abstractmethod

class DatabaseAdapter(ABC):
    """Abstract base class for database adapters."""

    @abstractmethod
    def connect(self):
        pass

    @abstractmethod
    def execute_query(self, query, params=None):
        pass

    @abstractmethod
    def get_dataframe(self, query, params=None):
        pass

    @abstractmethod
    def disconnect(self):
        pass

class UnifiedDatabaseClient:
    """Unified database client."""

    def __init__(self, db_type, config):
        self.db_type = db_type
        self.config = config
        self.adapter = self._create_adapter()

    def _create_adapter(self):
        """Create the adapter for the configured backend."""
        adapters = {
            'mysql': MySQLAdapter,
            'sqlserver': SQLServerAdapter,
            'oracle': OracleAdapter
        }
        adapter_class = adapters.get(self.db_type)
        if not adapter_class:
            raise ValueError(f"Unsupported database type: {self.db_type}")
        return adapter_class(self.config)

    def execute_query(self, query, params=None):
        """Delegate to the adapter, so callers such as DataExtractor
        can use the client directly."""
        return self.adapter.execute_query(query, params)

    def get_dataframe(self, query, params=None):
        return self.adapter.get_dataframe(query, params)

    def test_connection(self):
        """Test the connection with a trivial query."""
        # Oracle has no bare SELECT without FROM, so it needs DUAL
        test_queries = {'oracle': "SELECT 1 FROM dual"}
        query = test_queries.get(self.db_type, "SELECT 1 as test")
        try:
            result = self.adapter.execute_query(query)
            return len(result) > 0
        except Exception as e:
            print(f"Connection test failed: {e}")
            return False

    def get_database_info(self):
        """Fetch server version information."""
        queries = {
            # current_time is a reserved word in MySQL, so alias it differently
            'mysql': "SELECT VERSION() as version, NOW() as server_time",
            'sqlserver': "SELECT @@VERSION as version, GETDATE() as server_time",
            'oracle': "SELECT * FROM v$version"
        }
        query = queries.get(self.db_type)
        if query:
            return self.adapter.execute_query(query)
        return None

class MySQLAdapter(DatabaseAdapter):
    """MySQL adapter."""

    def __init__(self, config):
        self.config = config
        self.connector = MySQLConnector(config)

    def connect(self):
        self.connector.connect()

    def execute_query(self, query, params=None):
        return self.connector.execute_query(query, params)

    def get_dataframe(self, query, params=None):
        return self.connector.get_data_as_dataframe(query, params)

    def disconnect(self):
        if self.connector.connection:
            self.connector.connection.close()
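To avoid leaking connections, the client can be wrapped in a context manager. This is a minimal sketch, assuming every adapter implements disconnect() as shown above.

# Minimal sketch: context-manager support for UnifiedDatabaseClient,
# assuming each adapter implements disconnect() as shown above.
class ManagedDatabaseClient(UnifiedDatabaseClient):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.adapter.disconnect()
        return False  # do not swallow exceptions

# Usage:
# with ManagedDatabaseClient('mysql', config) as client:
#     df = client.get_dataframe("SELECT * FROM sales LIMIT 10")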
Chapter 3: Implementing the Data ETL Pipeline from Database to WPS #
3.1 The Extract Layer #
class DataExtractor:
    """Data extractor."""

    def __init__(self, db_client):
        self.db_client = db_client

    def incremental_extract(self, table_name, timestamp_column, last_extract_time):
        """Incremental extraction based on a timestamp watermark."""
        query = f"""
            SELECT * FROM {table_name}
            WHERE {timestamp_column} > %s
            ORDER BY {timestamp_column}
        """
        return self.db_client.get_dataframe(query, (last_extract_time,))

    def full_extract(self, table_name, condition=None):
        """Full extraction."""
        query = f"SELECT * FROM {table_name}"
        if condition:
            query += f" WHERE {condition}"
        return self.db_client.get_dataframe(query)

    def complex_extract(self, join_query, params=None):
        """Extraction via an arbitrary (join) query."""
        return self.db_client.get_dataframe(join_query, params)

# Usage example
def create_sales_report_extractor(db_client):
    """Extract the data for the sales report."""
    extractor = DataExtractor(db_client)
    # Pull the joined sales data
    sales_query = """
        SELECT
            s.sale_id,
            s.sale_date,
            p.product_name,
            c.category_name,
            s.quantity,
            s.unit_price,
            s.amount,
            cust.customer_name,
            emp.employee_name
        FROM sales s
        JOIN products p ON s.product_id = p.product_id
        JOIN categories c ON p.category_id = c.category_id
        JOIN customers cust ON s.customer_id = cust.customer_id
        JOIN employees emp ON s.employee_id = emp.employee_id
        WHERE s.sale_date BETWEEN %s AND %s
    """
    return extractor.complex_extract(sales_query, ('2024-01-01', '2024-12-31'))
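The incremental path needs somewhere to remember the last watermark between runs. Here is a minimal sketch using a JSON file; the file name and default timestamp are assumptions, adjust them to your setup.

import json
import os

WATERMARK_FILE = 'extract_watermarks.json'  # assumed location, adjust as needed

def load_watermark(table_name, default='1970-01-01 00:00:00'):
    """Read the last extraction timestamp for a table, if recorded."""
    if os.path.exists(WATERMARK_FILE):
        with open(WATERMARK_FILE, 'r', encoding='utf-8') as f:
            return json.load(f).get(table_name, default)
    return default

def save_watermark(table_name, timestamp):
    """Persist the new watermark after a successful incremental load."""
    marks = {}
    if os.path.exists(WATERMARK_FILE):
        with open(WATERMARK_FILE, 'r', encoding='utf-8') as f:
            marks = json.load(f)
    marks[table_name] = timestamp
    with open(WATERMARK_FILE, 'w', encoding='utf-8') as f:
        json.dump(marks, f, ensure_ascii=False, indent=2)

# extractor.incremental_extract('sales', 'updated_at', load_watermark('sales'))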
3.2 The Transform Layer #
import numpy as np
import pandas as pd

class DataTransformer:
    """Data transformer."""

    def __init__(self):
        self.validation_rules = {}

    def add_validation_rule(self, column_name, rule_func, error_message):
        """Register a validation rule for a column."""
        if column_name not in self.validation_rules:
            self.validation_rules[column_name] = []
        self.validation_rules[column_name].append({
            'rule': rule_func,
            'message': error_message
        })

    def validate_data(self, df):
        """Validate the data against all registered rules."""
        errors = []
        for column, rules in self.validation_rules.items():
            if column in df.columns:
                for rule_info in rules:
                    mask = df[column].apply(rule_info['rule'])
                    invalid_rows = df[~mask]
                    if len(invalid_rows) > 0:
                        errors.append({
                            'column': column,
                            'message': rule_info['message'],
                            'invalid_count': len(invalid_rows),
                            'sample_rows': invalid_rows.head(3).to_dict('records')
                        })
        return errors

    def clean_data(self, df):
        """Data cleaning: fill missing values."""
        df_cleaned = df.copy()
        # Fill numeric columns with 0
        numeric_columns = df.select_dtypes(include=['number']).columns
        df_cleaned[numeric_columns] = df_cleaned[numeric_columns].fillna(0)
        # Fill text columns with 'unknown'
        text_columns = df.select_dtypes(include=['object']).columns
        df_cleaned[text_columns] = df_cleaned[text_columns].fillna('unknown')
        return df_cleaned

    def calculate_derived_columns(self, df):
        """Compute derived columns."""
        df_calculated = df.copy()
        # Derived metrics
        if all(col in df.columns for col in ['quantity', 'unit_price']):
            df_calculated['total_amount'] = df['quantity'] * df['unit_price']
        if 'sale_date' in df.columns:
            sale_dates = pd.to_datetime(df['sale_date'])
            df_calculated['sale_year'] = sale_dates.dt.year
            df_calculated['sale_month'] = sale_dates.dt.month
            df_calculated['sale_quarter'] = sale_dates.dt.quarter
        return df_calculated

    def apply_business_rules(self, df):
        """Apply business rules."""
        df_transformed = df.copy()
        # Customer tiering
        if 'total_spent' in df.columns:
            conditions = [
                df['total_spent'] >= 10000,
                df['total_spent'] >= 5000,
                df['total_spent'] >= 1000
            ]
            choices = ['VIP', 'Gold', 'Silver']
            df_transformed['customer_level'] = np.select(conditions, choices, default='Standard')
        # Product segmentation
        if 'category_name' in df.columns:
            df_transformed['product_segment'] = df['category_name'].map({
                'Electronics': 'High-tech',
                'Office supplies': 'Everyday',
                'Furniture': 'Bulky'
            }).fillna('Other')
        return df_transformed
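Wiring rules into the transformer is just a matter of passing predicates. A short illustrative run, where sales_df stands for a frame produced by the extract layer and the column names follow the sales query above:

# Illustrative: register two rules and run the pipeline on an extracted frame.
transformer = DataTransformer()
transformer.add_validation_rule(
    'quantity', lambda v: v > 0, 'quantity must be positive'
)
transformer.add_validation_rule(
    'unit_price', lambda v: v >= 0, 'unit price must not be negative'
)

errors = transformer.validate_data(sales_df)  # sales_df from the extract layer
for err in errors:
    print(f"{err['column']}: {err['message']} ({err['invalid_count']} rows)")

report_df = transformer.apply_business_rules(
    transformer.calculate_derived_columns(transformer.clean_data(sales_df))
)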
3.3 The Load Layer #
import win32com.client as win32

def RGB(r, g, b):
    """Convert RGB components to the BGR-ordered integer COM expects."""
    return r + (g * 256) + (b * 256 * 256)

class WPSDataLoader:
    """WPS data loader."""

    def __init__(self):
        self.wps_app = None
        self.initialize_wps()

    def initialize_wps(self):
        """Start WPS Spreadsheets via COM."""
        try:
            self.wps_app = win32.Dispatch("Ket.Application")
            self.wps_app.Visible = True
            print("✓ WPS Spreadsheets initialized")
        except Exception as e:
            print(f"✗ WPS Spreadsheets initialization failed: {e}")
            raise

    def create_workbook_from_dataframe(self, df, file_path, sheet_name="Data"):
        """Create a workbook from a DataFrame."""
        try:
            # Create a new workbook
            workbook = self.wps_app.Workbooks.Add()
            worksheet = workbook.ActiveSheet
            worksheet.Name = sheet_name
            # Write the header row
            headers = df.columns.tolist()
            for col_idx, header in enumerate(headers, 1):
                worksheet.Cells(1, col_idx).Value = header
            # Write the data; enumerate rather than iterrows, because the
            # DataFrame index is not guaranteed to be 0..n-1
            for row_offset, row in enumerate(df.itertuples(index=False)):
                for col_idx, value in enumerate(row, 1):
                    # numpy scalars do not marshal through COM; unwrap them
                    if hasattr(value, 'item'):
                        value = value.item()
                    worksheet.Cells(row_offset + 2, col_idx).Value = value
            # Auto-fit column widths
            worksheet.Cells.EntireColumn.AutoFit()
            # Style the header row
            header_range = worksheet.Range(worksheet.Cells(1, 1),
                                           worksheet.Cells(1, len(headers)))
            header_range.Font.Bold = True
            header_range.Interior.Color = RGB(200, 200, 200)
            # Save the file
            workbook.SaveAs(file_path)
            print(f"✓ File saved: {file_path}")
            return workbook
        except Exception as e:
            print(f"✗ Workbook creation failed: {e}")
            raise

    def append_to_existing_workbook(self, df, file_path, sheet_name):
        """Append data to an existing workbook."""
        try:
            workbook = self.wps_app.Workbooks.Open(file_path)
            # Find or create the worksheet
            try:
                worksheet = workbook.Worksheets(sheet_name)
            except Exception:
                worksheet = workbook.Worksheets.Add()
                worksheet.Name = sheet_name
            # Locate the last used row (-4162 is the xlUp constant)
            last_row = worksheet.Cells(worksheet.Rows.Count, 1).End(-4162).Row
            start_row = last_row + 1 if last_row > 1 else 1
            # Write the header row if the sheet is empty
            if start_row == 1:
                headers = df.columns.tolist()
                for col_idx, header in enumerate(headers, 1):
                    worksheet.Cells(1, col_idx).Value = header
                start_row = 2
            # Write the data
            for row_offset, row in enumerate(df.itertuples(index=False)):
                for col_idx, value in enumerate(row, 1):
                    if hasattr(value, 'item'):
                        value = value.item()
                    worksheet.Cells(start_row + row_offset, col_idx).Value = value
            workbook.Save()
            workbook.Close()
            print(f"✓ Data appended to: {file_path}")
        except Exception as e:
            print(f"✗ Append failed: {e}")
            raise
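Cell-by-cell COM calls become slow beyond a few thousand rows; a single round trip per sheet is much faster. Below is a sketch of a bulk variant. It uses the same COM objects as the loader above and assumes WPS Spreadsheets supports 2-D assignment through Range.Value, as Excel-compatible COM servers do.

# Sketch: write the whole DataFrame in one COM call via a 2-D Range assignment.
def bulk_write_dataframe(worksheet, df, start_row=1):
    """Write header plus data as one rectangular block."""
    # Build rows of plain Python scalars (COM cannot marshal numpy types)
    data = [df.columns.tolist()] + [
        [v.item() if hasattr(v, 'item') else v for v in row]
        for row in df.itertuples(index=False)
    ]
    n_rows, n_cols = len(data), len(df.columns)
    target = worksheet.Range(
        worksheet.Cells(start_row, 1),
        worksheet.Cells(start_row + n_rows - 1, n_cols)
    )
    target.Value = data  # one cross-process call instead of n_rows * n_cols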
Chapter 4: A Dynamic Report Generation System for Intelligent Business Reports #
4.1 The Report Template Engine #
import shutil
import win32com.client as win32

class ReportTemplateEngine:
    """Report template engine."""

    def __init__(self, template_path):
        self.template_path = template_path
        # Attach to WPS Spreadsheets; _extract_placeholders needs a live application
        self.wps_app = win32.Dispatch("Ket.Application")
        self.placeholders = self._extract_placeholders()

    def _extract_placeholders(self):
        """Scan the template for {{placeholder}} cells."""
        workbook = self.wps_app.Workbooks.Open(self.template_path)
        placeholders = {}
        for worksheet in workbook.Worksheets:
            used_range = worksheet.UsedRange
            if used_range:
                for cell in used_range:
                    cell_value = str(cell.Value) if cell.Value else ""
                    if cell_value.startswith('{{') and cell_value.endswith('}}'):
                        placeholder = cell_value[2:-2].strip()
                        placeholders[placeholder] = {
                            'worksheet': worksheet.Name,
                            'address': cell.Address,
                            'original_value': cell_value
                        }
        workbook.Close()
        return placeholders

    def generate_report(self, data_dict, output_path):
        """Generate a report from the template."""
        try:
            # Copy the template
            shutil.copy2(self.template_path, output_path)
            # Open the copy
            workbook = self.wps_app.Workbooks.Open(output_path)
            # Replace the placeholders
            for placeholder, info in self.placeholders.items():
                if placeholder in data_dict:
                    worksheet = workbook.Worksheets(info['worksheet'])
                    cell = worksheet.Range(info['address'])
                    cell.Value = data_dict[placeholder]
            # Save
            workbook.Save()
            workbook.Close()
            print(f"✓ Report generated: {output_path}")
        except Exception as e:
            print(f"✗ Report generation failed: {e}")
            raise
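Given a template whose cells contain markers such as {{report_date}} and {{total_sales}}, generating a concrete report is a single call. The marker names and paths here are illustrative.

# Illustrative usage; the placeholder names and paths are examples only.
engine = ReportTemplateEngine('./templates/sales_template.xlsx')
engine.generate_report(
    {
        'report_date': '2024-06-30',
        'total_sales': 125000.50,
        'top_product': 'Widget A'
    },
    './reports/sales_2024-06.xlsx'
)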
4.2 An Automated Report Scheduling System #
import schedule
import time
from datetime import datetime

class ReportScheduler:
    """Report scheduler."""

    def __init__(self, db_config, report_configs):
        self.db_config = db_config
        self.report_configs = report_configs
        self.db_client = UnifiedDatabaseClient(
            db_config['type'], db_config
        )

    def setup_daily_reports(self):
        """Schedule daily reports."""
        schedule.every().day.at("08:00").do(
            self.generate_daily_sales_report
        )
        schedule.every().day.at("17:30").do(
            self.generate_daily_summary
        )

    def setup_weekly_reports(self):
        """Schedule weekly reports."""
        schedule.every().monday.at("09:00").do(
            self.generate_weekly_report
        )

    def setup_monthly_reports(self):
        """Schedule monthly reports. The schedule library has no monthly
        interval, so run a daily guard that fires on the first of the month."""
        schedule.every().day.at("10:00").do(self._run_monthly_if_due)

    def _run_monthly_if_due(self):
        if datetime.now().day == 1:
            self.generate_monthly_report()

    def generate_daily_sales_report(self):
        """Generate the daily sales report.
        generate_daily_summary, generate_weekly_report, and
        generate_monthly_report follow the same pattern."""
        print(f"{datetime.now()}: generating daily sales report")
        try:
            # Extract
            extractor = DataExtractor(self.db_client)
            sales_data = extractor.complex_extract("""
                SELECT * FROM sales
                WHERE sale_date = CURDATE() - INTERVAL 1 DAY
            """)
            # Transform
            transformer = DataTransformer()
            cleaned_data = transformer.clean_data(sales_data)
            transformed_data = transformer.apply_business_rules(cleaned_data)
            # Load into WPS
            loader = WPSDataLoader()
            report_path = f"./reports/daily_sales_{datetime.now().strftime('%Y%m%d')}.xlsx"
            loader.create_workbook_from_dataframe(
                transformed_data, report_path, "Sales data"
            )
            print("✓ Daily sales report generated")
        except Exception as e:
            print(f"✗ Daily sales report failed: {e}")

    def run_scheduler(self):
        """Run the scheduler loop."""
        print("Report scheduler started...")
        self.setup_daily_reports()
        self.setup_weekly_reports()
        self.setup_monthly_reports()
        while True:
            schedule.run_pending()
            time.sleep(60)  # check once a minute
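Starting the scheduler then comes down to a guarded entry point; the connection values below are placeholders.

# Illustrative entry point; connection values are placeholders.
if __name__ == "__main__":
    db_config = {
        'type': 'mysql',
        'host': 'localhost',
        'user': 'report_user',
        'password': '***',
        'database': 'sales'
    }
    scheduler = ReportScheduler(db_config, report_configs=[])
    scheduler.run_scheduler()  # blocks; run under a service manager in production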
Chapter 5: Advanced Applications and Performance Optimization #
5.1 Optimizing for Large Data Volumes #
class BigDataProcessor:
    """Large dataset processor."""

    def __init__(self, db_client, batch_size=10000):
        self.db_client = db_client
        self.batch_size = batch_size

    def process_large_dataset(self, query, output_path):
        """Process a large dataset in batches."""
        try:
            # First query: get the total row count
            count_query = f"SELECT COUNT(*) as total FROM ({query}) as subquery"
            total_count = self.db_client.execute_query(count_query)[0]['total']
            print(f"{total_count} records to process")
            # Process batch by batch (LIMIT/OFFSET as used by MySQL and PostgreSQL)
            for offset in range(0, total_count, self.batch_size):
                batch_query = f"{query} LIMIT {self.batch_size} OFFSET {offset}"
                batch_data = self.db_client.get_dataframe(batch_query)
                # Handle the current batch
                self._process_batch(batch_data, output_path, offset)
                progress = min(offset + self.batch_size, total_count)
                print(f"Progress: {progress}/{total_count} ({progress/total_count*100:.1f}%)")
            print("✓ Large dataset processed")
        except Exception as e:
            print(f"✗ Large dataset processing failed: {e}")
            raise

    def _process_batch(self, batch_data, output_path, offset):
        """Process a single batch."""
        # Concrete processing logic goes here:
        # cleaning, transformation, analysis, and so on
        # Persist the batch temporarily
        temp_path = f"{output_path}_batch_{offset}.csv"
        batch_data.to_csv(temp_path, index=False, encoding='utf-8-sig')
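The per-batch CSVs still need consolidating. A minimal sketch that stitches them back together with pandas, following the file naming used by _process_batch above:

import glob
import pandas as pd

def merge_batches(output_path):
    """Concatenate the batch CSVs produced by _process_batch into one file."""
    batch_files = sorted(
        glob.glob(f"{output_path}_batch_*.csv"),
        key=lambda p: int(p.rsplit('_', 1)[-1].split('.')[0])  # sort by offset
    )
    merged = pd.concat(
        (pd.read_csv(f) for f in batch_files), ignore_index=True
    )
    merged.to_csv(f"{output_path}_merged.csv", index=False, encoding='utf-8-sig')
    return merged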
5.2 Error Handling and Logging #
import logging
from logging.handlers import RotatingFileHandler

class DatabaseETLManager:
    """Database ETL manager."""

    def __init__(self, config):
        self.config = config
        self.setup_logging()
        self.db_client = UnifiedDatabaseClient(
            config['database']['type'],
            config['database']
        )

    def setup_logging(self):
        """Configure the logging system."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                RotatingFileHandler(
                    'etl_process.log',
                    maxBytes=10485760,  # 10 MB
                    backupCount=5
                ),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def run_etl_process(self, etl_config):
        """Run the ETL process."""
        self.logger.info("ETL process started")
        try:
            # Extract
            self.logger.info("Extracting data")
            extractor = DataExtractor(self.db_client)
            raw_data = extractor.complex_extract(
                etl_config['extract_query'],
                etl_config.get('extract_params')
            )
            self.logger.info(f"Extracted {len(raw_data)} records")
            # Transform
            self.logger.info("Transforming data")
            transformer = DataTransformer()
            # Register validation rules
            for rule in etl_config.get('validation_rules', []):
                transformer.add_validation_rule(
                    rule['column'],
                    rule['rule'],
                    rule['message']
                )
            # Validate
            validation_errors = transformer.validate_data(raw_data)
            if validation_errors:
                self.logger.warning(f"{len(validation_errors)} validation issues found")
                for error in validation_errors:
                    self.logger.warning(
                        f"Column {error['column']}: {error['message']} "
                        f"({error['invalid_count']} rows affected)"
                    )
            # Clean and transform
            cleaned_data = transformer.clean_data(raw_data)
            transformed_data = transformer.calculate_derived_columns(cleaned_data)
            final_data = transformer.apply_business_rules(transformed_data)
            # Load
            self.logger.info("Loading data")
            loader = WPSDataLoader()
            loader.create_workbook_from_dataframe(
                final_data,
                etl_config['output_path'],
                etl_config.get('sheet_name', 'Data')
            )
            self.logger.info("ETL process completed")
            return True
        except Exception as e:
            self.logger.error(f"ETL process failed: {e}", exc_info=True)
            return False
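A complete run is then driven by a single configuration dictionary. The queries, paths, rules, and credentials below are illustrative.

# Illustrative configuration; queries, paths, rules, and credentials are examples.
etl_config = {
    'extract_query': "SELECT * FROM sales WHERE sale_date BETWEEN %s AND %s",
    'extract_params': ('2024-01-01', '2024-03-31'),
    'validation_rules': [
        {'column': 'amount', 'rule': lambda v: v >= 0,
         'message': 'amount must not be negative'}
    ],
    'output_path': './reports/q1_sales.xlsx',
    'sheet_name': 'Q1 sales'
}

manager = DatabaseETLManager({'database': {
    'type': 'mysql', 'host': 'localhost', 'user': 'report_user',
    'password': '***', 'database': 'sales'
}})
success = manager.run_etl_process(etl_config)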
Conclusion: Building an Enterprise-Grade Data Automation System #
This guide has covered the full skill set for building a database automation system with Python and WPS: from single-database operations, through multi-source data integration, to enterprise-grade ETL processes and reporting systems. These techniques can fundamentally change how you work with data.
The real value lies in combining these building blocks around your actual business needs. Start with the data-processing task that currently costs the most time and produces the most errors, and design an automated solution for it. When work that used to take hours of manual effort finishes with a single click, you will appreciate what data automation can do.
Remember: technology is only a tool, and success is measured by the business value it creates. Start planning your first database automation project now and let data genuinely empower the business. If you run into WPS-specific issues during implementation, refer to 《WPS二次开发完全手册:用Python扩展你的办公软件能力》 for solutions.