Python日志系统架构设计与ELK集成

日志是系统可观测性的重要组成部分。本文将介绍Python日志系统架构设计和ELK集成方案。

日志系统核心组件

  • Logger:日志记录器，业务代码调用的入口，按级别筛选后把日志记录分发给处理器
  • Handler:日志处理器，决定日志输出到哪里（控制台、文件、ELK等）
  • Formatter:日志格式化器，把日志记录转换为文本或JSON等输出格式
  • Filter:日志过滤器，决定是否放行日志记录，也可以为记录附加上下文字段

日志系统核心实现

"""
Python日志系统架构设计与ELK集成
包含结构化日志、多处理器、ELK集成等
"""

import json
import logging
import logging.handlers
import queue
import sys
import threading
from datetime import datetime, timezone
from typing import Dict, Any, Optional

from pythonjsonlogger import jsonlogger


# ============ 结构化日志格式 ============
class StructuredLogFormatter(jsonlogger.JsonFormatter):
    """JSON formatter that adds a UTC timestamp, a normalized level and source info.

    On top of the fields emitted by ``JsonFormatter`` this sets:

    * ``timestamp`` - ISO-8601 creation time of the record in UTC with an
      explicit ``+00:00`` offset, unless ``log_record`` already carries a
      truthy ``timestamp``.
    * ``level`` - an existing truthy ``level`` value upper-cased, otherwise
      the record's ``levelname``.
    * ``source`` - file name, line number, function name and module of the
      logging call (always overwritten).
    """

    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)

        # Timestamp
        if not log_record.get('timestamp'):
            # Use the record's creation time, not "now": formatting may happen
            # much later (e.g. on a background worker thread). An aware UTC
            # datetime serializes with an explicit offset so log consumers do
            # not have to guess the zone (datetime.utcnow() is naive and
            # deprecated since Python 3.12).
            log_record['timestamp'] = datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat()

        # Log level
        if log_record.get('level'):
            log_record['level'] = log_record['level'].upper()
        else:
            log_record['level'] = record.levelname

        # Source location of the logging call
        log_record['source'] = {
            'file': record.filename,
            'line': record.lineno,
            'function': record.funcName,
            'module': record.module
        }


# ============ 自定义日志处理器 ============
class AsyncHandler(logging.Handler):
    """Asynchronous handler that forwards records to a wrapped handler.

    Records are put on a bounded queue and delivered to ``handler`` by a
    daemon worker thread, so the logging call does not wait on the wrapped
    handler's I/O. When the queue is full the record is delivered
    synchronously in the calling thread instead of being dropped.

    Args:
        handler: The handler that finally processes each record.
        max_queue_size: Maximum number of records buffered in the queue.
    """

    def __init__(self, handler: logging.Handler, max_queue_size: int = 1000):
        super().__init__()
        self.handler = handler
        self.queue = queue.Queue(maxsize=max_queue_size)
        self._closed = False
        self.thread = threading.Thread(target=self._process, daemon=True)
        self.thread.start()

    def emit(self, record):
        """Queue *record*, or deliver it directly if the queue is full."""
        try:
            self.queue.put_nowait(record)
        except queue.Full:
            # Fall back to synchronous delivery rather than dropping the record.
            self._deliver(record)

    def _deliver(self, record):
        """Pass *record* to the wrapped handler, reporting any failure."""
        try:
            # handle() (not emit()) applies the wrapped handler's filters and
            # holds its lock, so the worker thread and the queue-full fallback
            # in a caller's thread never write to the same target concurrently.
            self.handler.handle(record)
        except Exception:
            # Report instead of silently swallowing; the worker keeps running.
            self.handleError(record)

    def _process(self):
        """Worker loop: deliver queued records until the ``None`` sentinel."""
        while True:
            record = self.queue.get()
            if record is None:
                break
            self._deliver(record)

    def close(self):
        """Drain the queue, stop the worker and close the wrapped handler.

        Calling it more than once is a no-op after the first call
        (``logging.shutdown`` may close handlers again at exit).
        """
        if self._closed:
            return
        self._closed = True
        if self.thread.is_alive():
            # The sentinel is queued behind pending records, so they are
            # delivered before the worker exits.
            self.queue.put(None)
            self.thread.join(timeout=5)
        self.handler.close()
        super().close()


class ContextFilter(logging.Filter):
    """Filter that attaches static and per-thread context to log records.

    It never drops a record (``filter`` always returns True); it only sets
    attributes on the record so formatters can output them.

    Args:
        context: Key/value pairs set as attributes on every record passing
            through this filter. ``None`` means no static context.
    """

    def __init__(self, context: Optional[Dict[str, Any]] = None):
        super().__init__()
        self.context = context or {}
        # Thread-local storage: a request_id set in one thread is not visible
        # to records logged from other threads.
        self._local = threading.local()

    def filter(self, record):
        # Static context shared by all records
        for key, value in self.context.items():
            setattr(record, key, value)

        # Per-thread context, only if set_request_id() was called in this thread
        if hasattr(self._local, 'request_id'):
            record.request_id = self._local.request_id

        return True

    def set_request_id(self, request_id: str):
        """Attach *request_id* to records logged from the current thread."""
        self._local.request_id = request_id

    def clear_request_id(self):
        """Stop attaching a request id to records from the current thread.

        Call this when a request finishes on a reused (pooled) thread so the
        next request handled by that thread does not inherit a stale id.
        Calling it when no id is set is harmless.
        """
        try:
            del self._local.request_id
        except AttributeError:
            pass


# ============ 日志配置 ============
def setup_logging(
    level: str = "INFO",
    log_file: Optional[str] = None,
    json_format: bool = True
) -> logging.Logger:
    """Configure and return the application's ``"app"`` logger.

    Calling this again reconfigures the logger. Handlers from a previous
    call are removed *and closed*, so no file stays open. The previous
    ContextFilter is replaced instead of stacking up.

    Args:
        level: Level name such as ``"INFO"`` or ``"debug"`` (case-insensitive).
        log_file: If given, also write to this file, rotating at 10 MB and
            keeping 5 backups, encoded as UTF-8.
        json_format: Use StructuredLogFormatter (JSON) when True, otherwise a
            plain-text format.

    Returns:
        The configured ``"app"`` logger.

    Raises:
        AttributeError: If *level* is not a level name defined on ``logging``.
    """
    logger = logging.getLogger("app")
    logger.setLevel(getattr(logging, level.upper()))

    # Detach AND close handlers from an earlier call. Merely dropping them
    # (``logger.handlers = []``) leaves their files open.
    for old_handler in list(logger.handlers):
        logger.removeHandler(old_handler)
        old_handler.close()
    # Replace, rather than accumulate, the ContextFilter from an earlier call.
    for old_filter in list(logger.filters):
        if isinstance(old_filter, ContextFilter):
            logger.removeFilter(old_filter)

    if json_format:
        formatter = StructuredLogFormatter(
            '%(timestamp)s %(level)s %(name)s %(message)s'
        )
    else:
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

    # Console handler, plus an optional rotating file handler
    handlers = [logging.StreamHandler(sys.stdout)]
    if log_file:
        handlers.append(logging.handlers.RotatingFileHandler(
            log_file,
            maxBytes=10 * 1024 * 1024,  # 10MB
            backupCount=5,
            # Messages may contain non-ASCII (e.g. Chinese) text; don't
            # depend on the platform's locale encoding.
            encoding="utf-8",
        ))

    context_filter = ContextFilter({
        'service': 'my-service',
        'version': '1.0.0'
    })
    for handler in handlers:
        handler.setFormatter(formatter)
        # Logger-level filters only see records logged directly on "app";
        # a handler-level filter also tags records propagated from child
        # loggers such as "app.db".
        handler.addFilter(context_filter)
        logger.addHandler(handler)
    # Also kept on the logger so code that finds it via ``logger.filters``
    # (e.g. to call set_request_id) keeps working.
    logger.addFilter(context_filter)

    return logger


# ============ 日志记录器类 ============
class StructuredLogger:
    """Thin wrapper that logs a message plus keyword fields as structured data.

    Keyword arguments given to the level methods are attached to the record
    as one ``structured_data`` dict attribute (through ``extra``).
    """

    # Frames from the user's call site to Logger.log():
    # caller -> info()/error()/... -> _log() -> Logger.log()
    _CALLER_STACKLEVEL = 3

    def __init__(self, logger: logging.Logger):
        self._logger = logger

    def _log(self, level: int, message: str, /, **kwargs):
        """Log *message* at *level* with *kwargs* as ``structured_data``.

        ``level``/``message`` are positional-only so callers may use fields
        with those names (e.g. ``info("x", level="gold")``). ``stacklevel``
        makes filename/lineno/funcName on the record point at the caller of
        info()/error()/..., not at this wrapper.
        """
        extra = {'structured_data': kwargs}
        self._logger.log(
            level, message, extra=extra, stacklevel=self._CALLER_STACKLEVEL
        )

    def info(self, message: str, **kwargs):
        self._log(logging.INFO, message, **kwargs)

    def error(self, message: str, **kwargs):
        self._log(logging.ERROR, message, **kwargs)

    def debug(self, message: str, **kwargs):
        self._log(logging.DEBUG, message, **kwargs)

    def warning(self, message: str, **kwargs):
        self._log(logging.WARNING, message, **kwargs)

    def critical(self, message: str, **kwargs):
        self._log(logging.CRITICAL, message, **kwargs)

    def exception(self, message: str, **kwargs):
        """Log at ERROR level with the current exception info attached.

        Intended to be called from inside an ``except`` block.
        """
        self._logger.log(
            logging.ERROR,
            message,
            extra={'structured_data': kwargs},
            exc_info=True,
            stacklevel=2,  # caller -> exception() -> Logger.log()
        )


# ============ 业务日志记录 ============
class UserService:
    """Demo user service that logs each operation with structured fields."""

    def __init__(self, logger: StructuredLogger):
        self.logger = logger

    @staticmethod
    def _mask_email(email: str) -> str:
        """Return *email* with its local part redacted, e.g. ``a***@example.com``.

        Logs end up in a central store, so personal data such as e-mail
        addresses should not be written verbatim. A value without ``@``
        becomes ``***``.
        """
        local, sep, domain = str(email).partition("@")
        if not sep:
            return "***"
        return f"{local[:1]}***@{domain}"

    def create_user(self, username: str, email: str):
        """Create a user and log the action (with the e-mail masked).

        Returns a placeholder user dict.
        """
        self.logger.info(
            "创建用户",
            action="create_user",
            username=username,
            email=self._mask_email(email),
        )
        # ... business logic ...
        return {"id": 1, "username": username}

    def get_user(self, user_id: int):
        """Fetch a user and log the action.

        Returns a placeholder user dict.
        """
        self.logger.info(
            "获取用户",
            action="get_user",
            user_id=user_id
        )
        # ... business logic ...
        return {"id": user_id, "username": "alice"}


# ============ ELK集成示例 ============
class ELKHandler(logging.Handler):
    """Placeholder handler standing in for shipping records to an ELK stack.

    This demo only prints the formatted record to stdout with an ``[ELK]``
    prefix; ``host`` and ``port`` are stored but not used. A real
    implementation would send the formatted entry to Logstash or
    Elasticsearch.

    Args:
        host: Destination host name (currently unused).
        port: Destination port (currently unused).
    """

    def __init__(self, host: str, port: int):
        super().__init__()
        self.host = host
        self.port = port

    def emit(self, record):
        """Format *record* and "send" it (print it).

        Failures are routed to ``handleError``, as the logging framework
        expects, so a bad record or a failing sink does not raise into the
        code that made the logging call.
        """
        try:
            log_entry = self.format(record)
            # A real implementation would send log_entry to Logstash/Elasticsearch here.
            print(f"[ELK] {log_entry}")
        except RecursionError:
            raise
        except Exception:
            self.handleError(record)


def main():
    """Run the demo: configure logging, emit sample records, print a summary."""
    banner = "=" * 60

    print(banner)
    print("Python日志系统架构设计")
    print(banner)

    # JSON-formatted logs to stdout and to app.log
    app_logger = setup_logging(level="INFO", log_file="app.log", json_format=True)
    slog = StructuredLogger(app_logger)

    # Structured business logs
    print("\n【结构化日志示例】")
    service = UserService(slog)
    service.create_user("alice", "alice@example.com")
    service.get_user(1)

    # Logging a caught exception as structured fields
    print("\n【异常日志示例】")
    try:
        1 / 0
    except Exception as exc:
        slog.error(
            "发生错误",
            error_type=type(exc).__name__,
            error_message=str(exc),
        )

    summary = (
        "1. 结构化日志: JSON格式便于解析",
        "2. 多处理器: 同时输出到控制台和文件",
        "3. 异步处理: 避免日志阻塞业务",
        "4. 上下文信息: 添加上下文便于追踪",
        "5. ELK集成: 集中化日志管理",
    )
    print("\n" + banner)
    print("日志系统总结")
    print(banner)
    for item in summary:
        print(item)
    print(banner)


if __name__ == "__main__":
    main()

日志系统架构图

```mermaid
flowchart TB
    subgraph App["应用程序"]
        A1[业务代码]
        A2[结构化日志]
    end
    
    subgraph Logger["日志系统"]
        L1[Logger]
        L2[Handler]
        L3[Formatter]
        L4[Filter]
    end
    
    subgraph Output["输出目标"]
        O1[控制台]
        O2[文件]
        O3[ELK Stack]
    end
    
    A1 --> A2
    L1 --> L4 --> L2 --> L3
    App --> Logger --> Output
```

关键要点

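  1. 结构化日志:JSON格式便于解析和分析
  2. 多处理器:同时输出到多个目标
  3. 异步处理:避免日志阻塞业务逻辑（示例中的AsyncHandler需要手动包装目标处理器后才会生效；标准库的logging.handlers.QueueHandler与QueueListener提供了同样的能力）
  4. 上下文信息:添加上下文便于问题追踪
  5. ELK集成:集中化日志管理和分析（示例中的ELKHandler只是打印到控制台的占位实现；生产环境通常由Filebeat或Logstash采集JSON日志文件后写入Elasticsearch）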

良好的日志系统是系统可观测性的基础。

标签: none

添加新评论