Logging Systems and Structured Logging

张小明 2026/1/1 2:47:53
Contents

- Introduction
- 1. Logging Fundamentals: 1.1 Why logs matter · 1.2 Evolution of logging systems · 1.3 The log-quality pyramid
- 2. Structured Logging Basics: 2.1 What is structured logging? · 2.2 Structured vs. unstructured logs · 2.3 A mathematical model
- 3. Logging System Architecture: 3.1 Modern architecture · 3.2 The processing pipeline · 3.3 Distributed log tracing
- 4. A Structured Logging Implementation in Python: 4.1 Core framework · 4.2 Advanced handlers · 4.3 The complete system
- 5. Advanced Features: 5.1 Distributed-tracing integration · 5.2 Performance monitoring · 5.3 Log sampling and aggregation
- 6. Configuration and Usage Examples · 7. Testing and Validation · 8. Best Practices and Deployment · 9. Summary and Outlook · Appendices

Introduction

In modern software systems, logs are not merely a debugging and troubleshooting tool; they are a core component of system observability. As microservices, distributed systems, and cloud-native architectures have spread, traditional text logs can no longer satisfy the monitoring, analysis, and debugging needs of complex systems. Structured logging emerged in response and has become standard practice. According to the 2023 State of DevOps report, teams that adopt structured logging deploy 2.6x more frequently and recover from failures 3.2x faster. This article examines the design principles, implementation techniques, and best practices of structured logging systems, and provides a complete Python implementation.

1. Logging Fundamentals

1.1 Why Logs Matter

A logging system delivers the following key value:

- Troubleshooting: quickly locate and resolve production issues
- Performance monitoring: track system performance and resource usage
- Security auditing: record user actions and security events
- Business analysis: analyse user behaviour and application usage patterns
- Compliance: satisfy legal and industry log-retention requirements

1.2 Evolution of Logging Systems

Plain-text logs (1990s) → logging frameworks (2000s) → structured logging (2010s) → observability platforms (2020s).

1.3 The Log-Quality Pyramid

- Level 4: business insight (user-behaviour analysis)
- Level 3: application performance (metric tracking)
- Level 2: system state (errors and warnings)
- Level 1: debug information (detailed execution traces)

2. Structured Logging Basics

2.1 What Is Structured Logging?

Structured logging organises log data in a machine-readable format (usually JSON) rather than as free-form text. A structured log entry contains:

- Fixed fields: timestamp, level, message, source, etc.
- Context fields: request ID, user ID, session ID, etc.
- Business fields: operation type, resource ID, result status, etc.
- Performance fields: latency, memory usage, request size, etc.

2.2 Structured vs. Unstructured Logs

| Dimension          | Structured logs          | Unstructured logs        |
|--------------------|--------------------------|--------------------------|
| Format             | JSON, key-value pairs    | Plain text               |
| Readability        | Machine-friendly         | Human-friendly           |
| Query capability   | Powerful field filtering | Limited text search      |
| Storage efficiency | Higher                   | Lower                    |
| Parsing complexity | Simple                   | Complex                  |
| Extensibility      | Easy to add new fields   | Requires format changes  |

2.3 A Mathematical Model of Structured Logs

Model a log event as a tuple

$L = (t, l, m, C)$

where $t$ is the timestamp, $l$ the log level, $m$ the message template, and $C = \{k_1{:}v_1, k_2{:}v_2, \ldots, k_n{:}v_n\}$ a set of context key-value pairs. A structured log entry is then

$L_{struct} = \text{JSON}(\{timestamp: t,\ level: l,\ message: m\} \cup C)$

and a log query can be formalised as

$\text{Query}(L_{struct}, \Phi) = \{L \mid \forall (k, v) \in \Phi,\ L.C[k] = v\}$

where $\Phi$ is the set of key-value query conditions.

3. Logging System Architecture

3.1 Modern Logging Architecture

A modern logging system is layered, from producers to consumers:

- Application layer: services A/B/C emitting structured logs
- Collection layer: log agents and message queues
- Processing layer: log parsers, enrichers, and filters
- Storage layer: real-time storage, long-term storage, cold storage
- Consumption layer: search UIs, monitoring/alerting, analytics platforms, compliance auditing

3.2 The Log Processing Pipeline

A typical pipeline has these stages:

1. Collect: gather raw logs from applications
2. Parse: extract structured fields
3. Enrich: add metadata (hostname, environment, ...)
4. Filter: remove sensitive or useless data
5. Transform: convert and normalise formats
6. Route: dispatch to destinations by rule
7. Store: persist the data
8. Index: build indexes for fast retrieval

3.3 Distributed Log Tracing

In a microservice architecture, distributed tracing is a key companion to structured logging, built on these fields:

- trace_id: unique identifier for the whole request chain
- span_id: identifier of a single operation segment
- parent_span_id: identifier of the parent operation
- service_name: service name
- operation_name: operation name

Mathematically, if a request $R$ passes through $n$ services, then $T(R) = \{S_1, S_2, \ldots, S_n\}$, where each service operation is

$S_i = (t_{start}, t_{end}, \text{trace\_id}, \text{span\_id}_i, \text{parent\_span\_id}_i, \text{metadata}_i)$

and the total request latency is $\Delta t = \max(t_{end}) - \min(t_{start})$.

4. A Structured Logging Implementation in Python

4.1 Core Framework

The implementation below follows five design principles: (1) structure first, all output is structured; (2) context awareness, context is captured and propagated automatically; (3) performance, asynchronous processing minimises overhead; (4) extensibility, custom handlers and formatters are supported; (5) security, sensitive-data filtering is built in.
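Before the full framework, the core idea can be shown in a few lines: a minimal sketch (using only the standard library; the class name `JsonLineFormatter` and the context keys are illustrative, not part of the framework below) of a `logging.Formatter` that serialises each record as one JSON object per line.

```python
import json
import logging


class JsonLineFormatter(logging.Formatter):
    """Render each log record as one JSON object per line (NDJSON)."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Attach structured context passed via `extra=` on the log call.
        for key in ("request_id", "user_id"):
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload, ensure_ascii=False)


logger = logging.getLogger("demo")
handler = logging.StreamHandler()
handler.setFormatter(JsonLineFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("user login", extra={"request_id": "req-1", "user_id": "u-42"})
```

Every line this logger emits is machine-parseable, which is exactly the property the framework below generalises.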
安全性内置敏感信息过滤 importjsonimportloggingimportsysimporttimeimportuuidimportinspectimportthreadingfromtypingimportDict,Any,Optional,List,Union,CallablefromdatetimeimportdatetimefromenumimportEnumfromdataclassesimportdataclass,field,asdictfromabcimportABC,abstractmethodfromqueueimportQueue,Emptyfromconcurrent.futuresimportThreadPoolExecutorfrompathlibimportPathimporttracebackimporthashlibimportzlibfromcollectionsimportdefaultdict# 类型别名LogDataDict[str,Any]ContextDictDict[str,Any]classLogLevel(Enum):日志级别枚举TRACE0# 最详细的跟踪信息DEBUG1# 调试信息INFO2# 常规信息WARN3# 警告信息ERROR4# 错误信息FATAL5# 严重错误classmethoddeffrom_string(cls,level_str:str)-LogLevel:从字符串转换日志级别level_map{trace:cls.TRACE,debug:cls.DEBUG,info:cls.INFO,warn:cls.WARN,warning:cls.WARN,error:cls.ERROR,fatal:cls.FATAL,critical:cls.FATAL}returnlevel_map.get(level_str.lower(),cls.INFO)classmethoddefto_standard_level(cls,level:LogLevel)-int:转换为标准logging级别mapping{cls.TRACE:5,# 低于DEBUGcls.DEBUG:logging.DEBUG,cls.INFO:logging.INFO,cls.WARN:logging.WARNING,cls.ERROR:logging.ERROR,cls.FATAL:logging.CRITICAL}returnmapping[level]dataclassclassLogRecord:结构化日志记录# 基础字段timestamp:strlevel:strmessage:strlogger_name:str# 上下文字段trace_id:Optional[str]Nonespan_id:Optional[str]Nonerequest_id:Optional[str]Noneuser_id:Optional[str]Nonesession_id:Optional[str]Nonecorrelation_id:Optional[str]None# 执行上下文filename:Optional[str]Nonefunction:Optional[str]Noneline_no:Optional[int]Nonethread_id:Optional[int]Nonethread_name:Optional[str]Noneprocess_id:Optional[int]None# 应用程序上下文app_name:Optional[str]Noneapp_version:Optional[str]Noneenvironment:Optional[str]Nonehostname:Optional[str]Noneservice_name:Optional[str]None# 性能指标duration_ms:Optional[float]Nonememory_mb:Optional[float]Nonecpu_percent:Optional[float]None# 自定义字段extra:Dict[str,Any]field(default_factorydict)# 错误信息error_type:Optional[str]Noneerror_message:Optional[str]Nonestack_trace:Optional[str]Nonedefto_dict(self)-Dict[str,Any]:转换为字典resultasdict(self)# 
移除None值以减小体积return{k:vfork,vinresult.items()ifvisnotNone}defto_json(self,indent:Optional[int]None)-str:转换为JSON字符串returnjson.dumps(self.to_dict(),indentindent,ensure_asciiFalse)defget_field_hash(self)-str:获取字段内容的哈希值用于去重# 排除一些动态字段excluded_fields{timestamp,duration_ms,memory_mb,cpu_percent}data{k:vfork,vinself.to_dict().items()ifknotinexcluded_fieldsandvisnotNone}contentjson.dumps(data,sort_keysTrue,ensure_asciiFalse)returnhashlib.md5(content.encode()).hexdigest()defis_similar_to(self,other:LogRecord,threshold:float0.9)-bool:判断两个日志记录是否相似用于去重ifself.level!other.level:returnFalse# 计算消息相似度简化的编辑距离fromdifflibimportSequenceMatcher message_similaritySequenceMatcher(None,self.message,other.message).ratio()returnmessage_similaritythresholdclassLogContext:日志上下文管理器def__init__(self):# 线程本地存储self._localthreading.local()self._global_context{}self._context_stack[]propertydefcurrent(self)-Dict[str,Any]:获取当前上下文ifnothasattr(self._local,context):self._local.context{}returnself._local.contextcurrent.setterdefcurrent(self,context:Dict[str,Any]):设置当前上下文self._local.contextcontextdefget(self,key:str,default:AnyNone)-Any:获取上下文值returnself.current.get(key,self._global_context.get(key,default))defset(self,key:str,value:Any,global_scope:boolFalse):设置上下文值ifglobal_scope:self._global_context[key]valueelse:self.current[key]valuedefupdate(self,data:Dict[str,Any],global_scope:boolFalse):批量更新上下文ifglobal_scope:self._global_context.update(data)else:self.current.update(data)defclear(self):清除当前线程上下文ifhasattr(self._local,context):self._local.context.clear()defpush_context(self,context:Dict[str,Any]):压入新的上下文层ifnothasattr(self._local,context_stack):self._local.context_stack[]# 保存当前上下文current_copyself.current.copy()self._local.context_stack.append(current_copy)# 
更新为新上下文合并new_contextcurrent_copy.copy()new_context.update(context)self.currentnew_contextdefpop_context(self)-Dict[str,Any]:弹出上下文层ifnothasattr(self._local,context_stack)ornotself._local.context_stack:old_contextself.current.copy()self.clear()returnold_context old_contextself.current self.currentself._local.context_stack.pop()returnold_contextdefcontext_manager(self,**kwargs):上下文管理器returnLogContextManager(self,kwargs)defget_all_context(self)-Dict[str,Any]:获取所有上下文包括全局resultself._global_context.copy()result.update(self.current)returnresultclassLogContextManager:上下文管理器def__init__(self,log_context:LogContext,context_data:Dict[str,Any]):self.log_contextlog_context self.context_datacontext_datadef__enter__(self):self.log_context.push_context(self.context_data)returnselfdef__exit__(self,exc_type,exc_val,exc_tb):self.log_context.pop_context()classStructuredFormatter(ABC):结构化日志格式化器抽象基类abstractmethoddefformat(self,record:LogRecord)-str:格式化日志记录passclassJSONFormatter(StructuredFormatter):JSON格式化器def__init__(self,indent:Optional[int]None,ensure_ascii:boolFalse,sort_keys:boolFalse,include_metadata:boolTrue):self.indentindent self.ensure_asciiensure_ascii self.sort_keyssort_keys self.include_metadatainclude_metadatadefformat(self,record:LogRecord)-str:格式化为JSONdatarecord.to_dict()# 
添加格式化元数据ifself.include_metadata:data[_metadata]{format_version:1.0,formatter:json,timestamp_ns:time.time_ns()}returnjson.dumps(data,indentself.indent,ensure_asciiself.ensure_ascii,sort_keysself.sort_keys)classNDJSONFormatter(StructuredFormatter):NDJSON格式化器每行一个JSONdef__init__(self,**kwargs):self.json_formatterJSONFormatter(**kwargs)defformat(self,record:LogRecord)-str:格式化为NDJSONreturnself.json_formatter.format(record)classLogFilter(ABC):日志过滤器抽象基类abstractmethoddeffilter(self,record:LogRecord)-bool:过滤日志记录返回True表示保留passclassLevelFilter(LogFilter):级别过滤器def__init__(self,min_level:LogLevel):self.min_levelmin_leveldeffilter(self,record:LogRecord)-bool:根据级别过滤record_levelLogLevel.from_string(record.level)returnrecord_level.valueself.min_level.valueclassRateLimitFilter(LogFilter):速率限制过滤器def__init__(self,max_per_second:int10,window_seconds:int1):self.max_per_secondmax_per_second self.window_secondswindow_seconds self.log_countsdefaultdict(int)self.window_starttime.time()deffilter(self,record:LogRecord)-bool:速率限制current_timetime.time()# 检查是否需要重置窗口ifcurrent_time-self.window_startself.window_seconds:self.log_counts.clear()self.window_startcurrent_time# 获取日志哈希作为键log_keyrecord.get_field_hash()current_countself.log_counts[log_key]ifcurrent_countself.max_per_second:self.log_counts[log_key]current_count1returnTruereturnFalseclassSensitiveDataFilter(LogFilter):敏感数据过滤器def__init__(self):# 敏感数据模式可以扩展self.sensitive_patterns[r(?i)(password|passwd|pwd)[:]\s*[\]?([^\\s])[\]?,r(?i)(api[_-]?key|secret[_-]?key)[:]\s*[\]?([^\\s])[\]?,r(?i)(token)[:]\s*[\]?([^\\s])[\]?,r(?i)(credit[_-]?card|cc)[:]\s*[\]?(\d[ -]*?){13,16}[\]?,r\b\d{3}[-.]?\d{3}[-.]?\d{4}\b,# 电话号码r\b[A-Za-z0-9._%-][A-Za-z0-9.-]\.[A-Z|a-z]{2,}\b,# 邮箱]self.compiled_patterns[re.compile(pattern)forpatterninself.sensitive_patterns]deffilter(self,record:LogRecord)-bool:过滤敏感信息# 对消息进行脱敏record.messageself._mask_sensitive_data(record.message)# 
对extra字段进行脱敏forkey,valueinrecord.extra.items():ifisinstance(value,str):record.extra[key]self._mask_sensitive_data(value)returnTruedef_mask_sensitive_data(self,text:str)-str:脱敏文本中的敏感信息ifnotisinstance(text,str):returntext masked_texttextforpatterninself.compiled_patterns:masked_textpattern.sub(self._mask_replacer,masked_text)returnmasked_textdef_mask_replacer(self,match)-str:替换匹配的敏感信息full_matchmatch.group(0)# 根据匹配内容决定脱敏策略ifinfull_match:# 邮箱partsfull_match.split()iflen(parts[0])2:returnparts[0][:2]***parts[1]else:return***parts[1]elifany(keywordinfull_match.lower()forkeywordin[password,passwd,pwd]):returnpassword***elifany(keywordinfull_match.lower()forkeywordin[key,token,secret]):returnmatch.group(1)***elifre.match(r\d,full_match.replace(-,).replace( ,)):# 数字类型信用卡、电话等digitsre.sub(r[^\d],,full_match)if10len(digits)16:returndigits[:4]**(len(digits)-8)digits[-4:]return***4.2 高级日志处理器classLogHandler(ABC):日志处理器抽象基类def__init__(self,level:LogLevelLogLevel.INFO,formatter:Optional[StructuredFormatter]None,filters:Optional[List[LogFilter]]None):self.levellevel self.formatterformatterorJSONFormatter()self.filtersfiltersor[]# 性能统计self.processed_count0self.dropped_count0self.start_timetime.time()abstractmethoddefemit(self,record:LogRecord):输出日志记录passdefhandle(self,record:LogRecord)-bool:处理日志记录# 检查级别record_levelLogLevel.from_string(record.level)ifrecord_level.valueself.level.value:self.dropped_count1returnFalse# 应用过滤器forfilter_objinself.filters:ifnotfilter_obj.filter(record):self.dropped_count1returnFalse# 格式化formattedself.formatter.format(record)# 输出try:self.emit(record)self.processed_count1returnTrueexceptExceptionase:# 
处理器错误处理print(f日志处理器错误:{e})self.dropped_count1returnFalsedefget_stats(self)-Dict[str,Any]:获取处理器统计信息uptimetime.time()-self.start_timereturn{processed:self.processed_count,dropped:self.dropped_count,uptime_seconds:uptime,rate_per_second:self.processed_count/max(uptime,0.001),handler_type:self.__class__.__name__}classConsoleHandler(LogHandler):控制台处理器def__init__(self,level:LogLevelLogLevel.INFO,formatter:Optional[StructuredFormatter]None,output_stream:Anysys.stdout,use_colors:boolTrue):super().__init__(level,formatter)self.output_streamoutput_stream self.use_colorsuse_colors# 颜色映射self.color_map{TRACE:\033[90m,# 灰色DEBUG:\033[36m,# 青色INFO:\033[32m,# 绿色WARN:\033[33m,# 黄色ERROR:\033[31m,# 红色FATAL:\033[41m\033[37m,# 红底白字RESET:\033[0m# 重置}defemit(self,record:LogRecord):输出到控制台formattedself.formatter.format(record)ifself.use_colors:colorself.color_map.get(record.level.upper(),)resetself.color_map[RESET]outputf{color}{formatted}{reset}else:outputformattedprint(output,fileself.output_stream)classFileHandler(LogHandler):文件处理器def__init__(self,filename:Union[str,Path],level:LogLevelLogLevel.INFO,formatter:Optional[StructuredFormatter]None,mode:stra,encoding:strutf-8,buffering:int1# 行缓冲):super().__init__(level,formatter)self.filenamePath(filename)self.modemode self.encodingencoding self.bufferingbuffering# 确保目录存在self.filename.parent.mkdir(parentsTrue,exist_okTrue)# 打开文件self._open_file()def_open_file(self):打开文件self.fileopen(self.filename,modeself.mode,encodingself.encoding,bufferingself.buffering)defemit(self,record:LogRecord):输出到文件formattedself.formatter.format(record)self.file.write(formatted\n)self.file.flush()defclose(self):关闭文件ifhasattr(self,file)andself.file:self.file.close()defrotate(self,max_size_mb:float100,backup_count:int5):日志轮转ifnotself.filename.exists():returnfile_size_mbself.filename.stat().st_size/(1024*1024)iffile_size_mbmax_size_mb:return# 关闭当前文件self.close()# 
重命名旧文件foriinrange(backup_count-1,0,-1):old_fileself.filename.with_suffix(f.{i}.log)new_fileself.filename.with_suffix(f.{i1}.log)ifold_file.exists():old_file.rename(new_file)# 重命名当前文件current_backupself.filename.with_suffix(.1.log)self.filename.rename(current_backup)# 重新打开文件self._open_file()classRotatingFileHandler(FileHandler):自动轮转的文件处理器def__init__(self,filename:Union[str,Path],level:LogLevelLogLevel.INFO,formatter:Optional[StructuredFormatter]None,max_size_mb:float100,backup_count:int5,check_interval:int10# 检查间隔处理的日志条数):super().__init__(filename,level,formatter)self.max_size_mbmax_size_mb self.backup_countbackup_count self.check_intervalcheck_interval self.processed_since_check0defhandle(self,record:LogRecord)-bool:处理日志记录添加轮转检查self.processed_since_check1ifself.processed_since_checkself.check_interval:self.rotate(self.max_size_mb,self.backup_count)self.processed_since_check0returnsuper().handle(record)classAsyncHandler(LogHandler):异步处理器def__init__(self,base_handler:LogHandler,max_queue_size:int10000,worker_count:int1,drop_when_full:boolFalse):super().__init__(base_handler.level,base_handler.formatter,base_handler.filters)self.base_handlerbase_handler# 队列设置self.max_queue_sizemax_queue_size self.queueQueue(maxsizemax_queue_size)self.drop_when_fulldrop_when_full# 工作线程self.worker_countworker_count self.executorThreadPoolExecutor(max_workersworker_count,thread_name_prefixAsyncLogger)# 启动消费者self.runningTrueforiinrange(worker_count):self.executor.submit(self._worker_loop)defemit(self,record:LogRecord):异步处理日志记录try:ifself.drop_when_fullandself.queue.full():self.dropped_count1returnself.queue.put_nowait(record)exceptExceptionase:# 队列满或其他错误self.dropped_count1print(f异步日志队列错误:{e})def_worker_loop(self):工作线程循环whileself.running:try:# 阻塞获取日志记录带超时try:recordself.queue.get(timeout1.0)exceptEmpty:continue# 使用基础处理器处理self.base_handler.handle(record)# 标记任务完成self.queue.task_done()exceptExceptionase:print(f异步日志工作线程错误:{e})defshutdown(self,timeout:float5.0):关闭异步处理器self.runningFalse# 
等待队列清空self.queue.join()# 关闭执行器self.executor.shutdown(waitTrue,timeouttimeout)# 关闭基础处理器ifhasattr(self.base_handler,close):self.base_handler.close()defget_stats(self)-Dict[str,Any]:获取统计信息包括队列信息base_statssuper().get_stats()base_stats.update({queue_size:self.queue.qsize(),queue_max_size:self.max_queue_size,queue_full:self.queue.full(),worker_count:self.worker_count,is_running:self.running,base_handler_stats:self.base_handler.get_stats()})returnbase_statsclassBatchHandler(LogHandler):批量处理器def__init__(self,base_handler:LogHandler,batch_size:int100,flush_interval:float1.0,# 秒compression:boolFalse):super().__init__(base_handler.level,base_handler.formatter,base_handler.filters)self.base_handlerbase_handler self.batch_sizebatch_size self.flush_intervalflush_interval self.compressioncompression# 批处理缓冲区self.buffer:List[LogRecord][]self.last_flush_timetime.time()# 启动定时刷新线程self.flush_threadthreading.Thread(targetself._flush_loop,daemonTrue)self.runningTrueself.flush_thread.start()defemit(self,record:LogRecord):添加到批处理缓冲区self.buffer.append(record)# 检查是否需要刷新if(len(self.buffer)self.batch_sizeor(time.time()-self.last_flush_time)self.flush_interval):self._flush_buffer()def_flush_buffer(self):刷新缓冲区ifnotself.buffer:return# 准备批量数据batch_recordsself.buffer.copy()self.buffer.clear()try:# 批量处理ifself.compression:# 压缩批量数据batch_dataself._compress_batch(batch_records)# 这里需要基础处理器支持批量数据# 简化实现逐个处理forrecordinbatch_records:self.base_handler.handle(record)else:forrecordinbatch_records:self.base_handler.handle(record)self.last_flush_timetime.time()exceptExceptionase:print(f批量日志处理错误:{e})# 错误处理将记录放回缓冲区避免丢失self.buffer.extend(batch_records)def_compress_batch(self,records:List[LogRecord])-bytes:压缩批量数据batch_jsonjson.dumps([r.to_dict()forrinrecords])returnzlib.compress(batch_json.encode())def_flush_loop(self):定时刷新循环whileself.running:time.sleep(self.flush_interval)self._flush_buffer()defshutdown(self):关闭批量处理器self.runningFalseself._flush_buffer()# 
最后一次刷新ifself.flush_thread.is_alive():self.flush_thread.join(timeout2.0)ifhasattr(self.base_handler,shutdown):self.base_handler.shutdown()defget_stats(self)-Dict[str,Any]:获取统计信息base_statssuper().get_stats()base_stats.update({buffer_size:len(self.buffer),batch_size:self.batch_size,flush_interval:self.flush_interval,compression_enabled:self.compression,base_handler_stats:self.base_handler.get_stats()})returnbase_stats4.3 完整的日志系统classStructuredLogger:结构化日志记录器def__init__(self,name:str,level:LogLevelLogLevel.INFO,handlers:Optional[List[LogHandler]]None,context:Optional[LogContext]None,capture_stacktrace:boolFalse,enable_performance_stats:boolFalse):self.namename self.levellevel self.handlershandlersor[]self.contextcontextorLogContext()self.capture_stacktracecapture_stacktrace self.enable_performance_statsenable_performance_stats# 性能统计self.stats{log_count:defaultdict(int),last_log_time:None,total_log_time_ns:0,error_count:0}# 缓存调用者信息性能优化self._caller_cache{}def_get_caller_info(self,depth:int3)-Dict[str,Any]:获取调用者信息try:# 使用缓存提高性能cache_keythreading.get_ident()ifcache_keyinself._caller_cache:returnself._caller_cache[cache_key]# 获取调用堆栈frameinspect.currentframe()for_inrange(depth):ifframeisNone:breakframeframe.f_backifframeisNone:return{}# 提取信息info{filename:frame.f_code.co_filename,function:frame.f_code.co_name,line_no:frame.f_lineno,module:frame.f_globals.get(__name__,)}# 缓存self._caller_cache[cache_key]inforeturninfoexceptException:return{}finally:# 清理引用delframedef_create_record(self,level:LogLevel,message:str,extra:Optional[Dict[str,Any]]None,error_info:Optional[Dict[str,Any]]None)-LogRecord:创建日志记录# 基础时间nowdatetime.utcnow()# 调用者信息caller_infoself._get_caller_info()ifself.capture_stacktraceelse{}# 构建记录recordLogRecord(timestampnow.isoformat()Z,levellevel.name,messagemessage,logger_nameself.name,**caller_info)# 添加线程信息record.thread_idthreading.get_ident()record.thread_namethreading.current_thread().name record.process_idos.getpid()# 
添加上下文context_dataself.context.get_all_context()forkey,valueincontext_data.items():ifhasattr(record,key):setattr(record,key,value)else:record.extra[key]value# 添加额外字段ifextra:record.extra.update(extra)# 添加错误信息iferror_info:record.error_typeerror_info.get(type)record.error_messageerror_info.get(message)record.stack_traceerror_info.get(stack_trace)returnrecorddeflog(self,level:LogLevel,message:str,extra:Optional[Dict[str,Any]]None,**kwargs):记录日志start_timetime.time_ns()ifself.enable_performance_statselse0try:# 检查级别iflevel.valueself.level.value:return# 合并额外字段all_extraextra.copy()ifextraelse{}all_extra.update(kwargs)# 错误信息处理error_infoNoneifexc_infoinkwargsandkwargs[exc_info]:exc_type,exc_value,exc_tracebackkwargs[exc_info]ifexc_type:error_info{type:exc_type.__name__,message:str(exc_value),stack_trace:traceback.format_exc()}# 创建记录recordself._create_record(level,message,all_extra,error_info)# 处理记录forhandlerinself.handlers:handler.handle(record)# 更新统计self.stats[log_count][level.name]1self.stats[last_log_time]record.timestampiflevelLogLevel.ERRORorlevelLogLevel.FATAL:self.stats[error_count]1exceptExceptionase:# 记录器内部错误处理print(f日志记录错误:{e})self.stats[error_count]1finally:# 性能统计ifself.enable_performance_statsandstart_time:duration_nstime.time_ns()-start_time self.stats[total_log_time_ns]duration_ns# 便捷方法deftrace(self,message:str,**kwargs):记录TRACE级别日志self.log(LogLevel.TRACE,message,**kwargs)defdebug(self,message:str,**kwargs):记录DEBUG级别日志self.log(LogLevel.DEBUG,message,**kwargs)definfo(self,message:str,**kwargs):记录INFO级别日志self.log(LogLevel.INFO,message,**kwargs)defwarn(self,message:str,**kwargs):记录WARN级别日志self.log(LogLevel.WARN,message,**kwargs)deferror(self,message:str,**kwargs):记录ERROR级别日志self.log(LogLevel.ERROR,message,**kwargs)deffatal(self,message:str,**kwargs):记录FATAL级别日志self.log(LogLevel.FATAL,message,**kwargs)defexception(self,message:str,exc:Optional[Exception]None,**kwargs):记录异常ifexcisNone:# 
捕获当前异常exc_infosys.exc_info()else:exc_info(type(exc),exc,exc.__traceback__)kwargs[exc_info]exc_info self.log(LogLevel.ERROR,message,**kwargs)defwith_context(self,**kwargs):添加上下文returnLogContextManager(self.context,kwargs)defadd_handler(self,handler:LogHandler):添加处理器self.handlers.append(handler)defremove_handler(self,handler:LogHandler):移除处理器ifhandlerinself.handlers:self.handlers.remove(handler)defget_stats(self)-Dict[str,Any]:获取统计信息handler_stats[h.get_stats()forhinself.handlers]stats{logger_name:self.name,level:self.level.name,handler_count:len(self.handlers),log_counts:dict(self.stats[log_count]),error_count:self.stats[error_count],handler_stats:handler_stats}ifself.enable_performance_stats:total_logssum(self.stats[log_count].values())iftotal_logs0:avg_time_nsself.stats[total_log_time_ns]/total_logs stats[performance]{total_time_ns:self.stats[total_log_time_ns],avg_time_ns:avg_time_ns,avg_time_ms:avg_time_ns/1_000_000}returnstatsclassLogManager:日志管理器_instanceNone_lockthreading.Lock()def__new__(cls):withcls._lock:ifcls._instanceisNone:cls._instancesuper().__new__(cls)cls._instance._initializedFalsereturncls._instancedef__init__(self):ifself._initialized:returnself._loggers:Dict[str,StructuredLogger]{}self._default_config:Dict[str,Any]{}self._global_contextLogContext()self._initializedTrue# 默认配置self._setup_defaults()def_setup_defaults(self):设置默认配置self._default_config{level:LogLevel.INFO,handlers:[ConsoleHandler(levelLogLevel.INFO,formatterJSONFormatter(indentNone))],capture_stacktrace:False,enable_performance_stats:False}# 设置全局上下文importsocket self._global_context.set(hostname,socket.gethostname(),global_scopeTrue)self._global_context.set(process_id,os.getpid(),global_scopeTrue)defget_logger(self,name:str,level:Optional[LogLevel]None,handlers:Optional[List[LogHandler]]None,capture_stacktrace:Optional[bool]None,enable_performance_stats:Optional[bool]None)-StructuredLogger:获取或创建日志记录器ifnameinself._loggers:returnself._loggers[name]# 
使用配置或默认值configself._default_config.copy()iflevelisnotNone:config[level]levelifhandlersisnotNone:config[handlers]handlersifcapture_stacktraceisnotNone:config[capture_stacktrace]capture_stacktraceifenable_performance_statsisnotNone:config[enable_performance_stats]enable_performance_stats# 创建日志记录器loggerStructuredLogger(namename,contextself._global_context,**config)self._loggers[name]loggerreturnloggerdefconfigure(self,config:Dict[str,Any],name:Optional[str]None):配置日志记录器ifname:# 配置特定记录器ifnameinself._loggers:loggerself._loggers[name]iflevelinconfig:logger.levelLogLevel.from_string(config[level])ifhandlersinconfig:# 这里需要根据配置创建处理器logger.handlersself._create_handlers_from_config(config[handlers])ifcapture_stacktraceinconfig:logger.capture_stacktraceconfig[capture_stacktrace]ifenable_performance_statsinconfig:logger.enable_performance_statsconfig[enable_performance_stats]else:# 更新默认配置self._default_config.update(config)# 更新现有记录器forloggerinself._loggers.values():self.configure(config,logger.name)def_create_handlers_from_config(self,handlers_config:List[Dict])-List[LogHandler]:从配置创建处理器handlers[]forhandler_configinhandlers_config:handler_typehandler_config.get(type,console)try:ifhandler_typeconsole:handlerConsoleHandler(levelLogLevel.from_string(handler_config.get(level,info)),formatterself._create_formatter_from_config(handler_config.get(formatter,{})),use_colorshandler_config.get(use_colors,True))elifhandler_typefile:handlerFileHandler(filenamehandler_config[filename],levelLogLevel.from_string(handler_config.get(level,info)),formatterself._create_formatter_from_config(handler_config.get(formatter,{})))elifhandler_typerotating_file:handlerRotatingFileHandler(filenamehandler_config[filename],levelLogLevel.from_string(handler_config.get(level,info)),formatterself._create_formatter_from_config(handler_config.get(formatter,{})),max_size_mbhandler_config.get(max_size_mb,100),backup_counthandler_config.get(backup_count,5))elifhandler_typeasync:base_handler_confighandler_config.get(ba
se_handler,{})base_handlerself._create_handlers_from_config([base_handler_config])[0]handlerAsyncHandler(base_handlerbase_handler,max_queue_sizehandler_config.get(max_queue_size,10000),worker_counthandler_config.get(worker_count,1),drop_when_fullhandler_config.get(drop_when_full,False))else:raiseValueError(f未知的处理器类型:{handler_type})# 添加过滤器filters_confighandler_config.get(filters,[])forfilter_configinfilters_config:filter_typefilter_config.get(type,level)iffilter_typelevel:handler.filters.append(LevelFilter(LogLevel.from_string(filter_config.get(min_level,info))))eliffilter_typerate_limit:handler.filters.append(RateLimitFilter(max_per_secondfilter_config.get(max_per_second,10),window_secondsfilter_config.get(window_seconds,1)))eliffilter_typesensitive_data:handler.filters.append(SensitiveDataFilter())handlers.append(handler)exceptExceptionase:print(f创建处理器失败{handler_type}:{e})continuereturnhandlersdef_create_formatter_from_config(self,formatter_config:Dict)-StructuredFormatter:从配置创建格式化器formatter_typeformatter_config.get(type,json)ifformatter_typejson:returnJSONFormatter(indentformatter_config.get(indent),ensure_asciiformatter_config.get(ensure_ascii,False),sort_keysformatter_config.get(sort_keys,False))elifformatter_typendjson:returnNDJSONFormatter(indentformatter_config.get(indent),ensure_asciiformatter_config.get(ensure_ascii,False),sort_keysformatter_config.get(sort_keys,False))else:# 默认使用JSONreturnJSONFormatter()defset_global_context(self,**kwargs):设置全局上下文self._global_context.update(kwargs,global_scopeTrue)defget_global_context(self)-Dict[str,Any]:获取全局上下文returnself._global_context.get_all_context()defshutdown(self):关闭所有日志记录器forloggerinself._loggers.values():forhandlerinlogger.handlers:ifhasattr(handler,shutdown):handler.shutdown()elifhasattr(handler,close):handler.close()self._loggers.clear()defget_all_stats(self)-Dict[str,Any]:获取所有统计信息logger_stats{}total_logs0total_errors0forname,loggerinself._loggers.items():statslogger.get_stats()logger_stats[name]stats 
total_logssum(stats[log_counts].values())total_errorsstats[error_count]return{logger_count:len(self._loggers),total_logs:total_logs,total_errors:total_errors,loggers:logger_stats,global_context:self.get_global_context()}5. 高级特性实现5.1 分布式追踪集成classDistributedTraceContext:分布式追踪上下文def__init__(self):self._localthreading.local()propertydefcurrent(self)-Dict[str,Any]:获取当前追踪上下文ifnothasattr(self._local,trace_context):self._local.trace_contextself._generate_new_context()returnself._local.trace_contextdef_generate_new_context(self)-Dict[str,Any]:生成新的追踪上下文return{trace_id:self._generate_trace_id(),span_id:self._generate_span_id(),parent_span_id:None,sampled:True,flags:0}def_generate_trace_id(self)-str:生成追踪IDreturnuuid.uuid4().hexdef_generate_span_id(self)-str:生成跨度IDreturnuuid.uuid4().hex[:16]defstart_span(self,name:str,**attributes)-Span:开始新的跨度parent_contextself.current.copy()new_contextparent_context.copy()new_context[span_id]self._generate_span_id()new_context[parent_span_id]parent_context[span_id]new_context[span_name]name new_context[start_time]time.time_ns()new_context[attributes]attributes# 保存父上下文ifnothasattr(self._local,trace_stack):self._local.trace_stack[]self._local.trace_stack.append(parent_context)# 设置新上下文self._local.trace_contextnew_contextreturnSpan(self,new_context)defend_span(self,context:Dict[str,Any],status:strOK,**attributes):结束跨度ifnothasattr(self._local,trace_stack)ornotself._local.trace_stack:return# 计算持续时间end_timetime.time_ns()start_timecontext.get(start_time,end_time)duration_nsend_time-start_time# 创建跨度记录span_record{trace_id:context.get(trace_id),span_id:context.get(span_id),parent_span_id:context.get(parent_span_id),name:context.get(span_name,unknown),start_time:start_time,end_time:end_time,duration_ns:duration_ns,status:status,attributes:{**context.get(attributes,{}),**attributes}}# 
```python
        # Restore the parent context (end of DistributedTraceContext.end_span)
        self._local.trace_context = self._local.trace_stack.pop()
        return span_record

    def get_current_span_id(self) -> Optional[str]:
        """Return the current span ID."""
        return self.current.get("span_id")

    def get_current_trace_id(self) -> Optional[str]:
        """Return the current trace ID."""
        return self.current.get("trace_id")


class Span:
    """A tracing span."""

    def __init__(self, tracer: DistributedTraceContext, context: Dict[str, Any]):
        self.tracer = tracer
        self.context = context

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        status = "ERROR" if exc_type else "OK"
        self.tracer.end_span(self.context, status)

    def set_attribute(self, key: str, value: Any):
        """Set a span attribute."""
        if "attributes" not in self.context:
            self.context["attributes"] = {}
        self.context["attributes"][key] = value

    def set_status(self, status: str):
        """Set the span status."""
        self.context["status"] = status


class TracingLogger(StructuredLogger):
    """Logger with integrated distributed tracing."""

    def __init__(self, name: str, tracer: Optional[DistributedTraceContext] = None, **kwargs):
        super().__init__(name, **kwargs)
        self.tracer = tracer or DistributedTraceContext()
        # Automatically attach the tracer to the logger context
        self.context.set("tracer", self.tracer)

    def _create_record(self, *args, **kwargs) -> LogRecord:
        """Create a record enriched with tracing information."""
        record = super()._create_record(*args, **kwargs)
        record.trace_id = self.tracer.get_current_trace_id()
        record.span_id = self.tracer.get_current_span_id()
        return record

    def trace_span(self, name: str, **attributes):
        """Create a tracing-span context manager."""
        return self.tracer.start_span(name, **attributes)

    def log_with_span(self, level: LogLevel, message: str,
                      span_name: Optional[str] = None, **kwargs):
        """Log inside a tracing span."""
        if span_name:
            # Create a new span around the log call
            with self.tracer.start_span(span_name):
                self.log(level, message, **kwargs)
        else:
            # Use the current span
            self.log(level, message, **kwargs)
```

### 5.2 Performance Monitoring Integration

```python
class PerformanceMonitor:
    """Performance monitor."""

    def __init__(self, logger: StructuredLogger):
        self.logger = logger
        self.metrics = defaultdict(list)
        self.thresholds = {}

    def measure(self, operation: str):
        """Measure the duration of an operation."""
        return PerformanceTimer(self, operation)

    def record_metric(self, name: str, value: float, unit: str = "ms",
                      tags: Optional[Dict[str, str]] = None):
        """Record a performance metric."""
        timestamp = time.time_ns()
        metric_record = {
            "name": name,
            "value": value,
            "unit": unit,
            "timestamp": timestamp,
            "tags": tags or {},
        }
        # Store the metric
        self.metrics[name].append(metric_record)
        # Check thresholds
        if name in self.thresholds:
            threshold = self.thresholds[name]
            if value > threshold:
                self.logger.warn(
                    f"Performance threshold exceeded: {name}={value}{unit} > {threshold}{unit}",
                    metric=metric_record,
                )
        # Log the metric itself at DEBUG level
        self.logger.debug(f"Performance metric: {name}", metric=metric_record,
                          extra={"metric_type": "performance"})
        return metric_record

    def set_threshold(self, metric_name: str, threshold: float):
        """Set a performance threshold."""
        self.thresholds[metric_name] = threshold

    def get_statistics(self, metric_name: str) -> Dict[str, float]:
        """Return summary statistics for a metric."""
        records = self.metrics.get(metric_name, [])
        if not records:
            return {}
        values = [r["value"] for r in records]
        return {
            "count": len(values),
            "mean": sum(values) / len(values),
            "min": min(values),
            "max": max(values),
            "p50": self._percentile(values, 50),
            "p95": self._percentile(values, 95),
            "p99": self._percentile(values, 99),
        }

    def _percentile(self, values: List[float], p: float) -> float:
        """Compute a percentile with linear interpolation."""
        if not values:
            return 0
        sorted_values = sorted(values)
        k = (len(sorted_values) - 1) * (p / 100)
        f = int(k)
        c = k - f
        if f + 1 < len(sorted_values):
            return sorted_values[f] + c * (sorted_values[f + 1] - sorted_values[f])
        else:
            return sorted_values[f]

    def report_summary(self):
        """Report a performance summary."""
        summary = {}
        for metric_name in self.metrics:
            stats = self.get_statistics(metric_name)
            summary[metric_name] = stats
        self.logger.info("Performance monitoring summary",
                         performance_summary=summary,
                         extra={"report_type": "performance_summary"})
        return summary


class PerformanceTimer:
    """Performance timer context manager."""

    def __init__(self, monitor: PerformanceMonitor, operation: str):
        self.monitor = monitor
        self.operation = operation
        self.start_time = None
        self.tags = {}

    def __enter__(self):
        self.start_time = time.time_ns()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.start_time is None:
            return
        end_time = time.time_ns()
        duration_ns = end_time - self.start_time
        duration_ms = duration_ns / 1_000_000
        self.monitor.record_metric(name=self.operation, value=duration_ms,
                                   unit="ms", tags=self.tags)

    def add_tag(self, key: str, value: str):
        """Add a tag to the measurement."""
        self.tags[key] = value
        return self
```

### 5.3 Log Sampling and Aggregation

```python
class LogSampler:
    """Log sampler."""

    def __init__(
        self,
        base_logger: StructuredLogger,
        sample_rate: float = 1.0,        # sampling rate, 0.0-1.0
        adaptive_sampling: bool = False,
        min_sample_rate: float = 0.01,
        max_sample_rate: float = 1.0,
    ):
        self.base_logger = base_logger
        self.sample_rate = sample_rate
        self.adaptive_sampling = adaptive_sampling
        self.min_sample_rate = min_sample_rate
        self.max_sample_rate = max_sample_rate
        # Sampling statistics
        self.sampled_count = 0
        self.total_count = 0
        # Adaptive sampling state
        self.current_rate = sample_rate
        self.last_adjust_time = time.time()

    def should_sample(self, level: LogLevel) -> bool:
        """Decide whether to sample this record."""
        self.total_count += 1
        # High-severity logs are always sampled
        if level in [LogLevel.ERROR, LogLevel.FATAL]:
            self.sampled_count += 1
            return True
        # Recompute the current sampling rate
        if self.adaptive_sampling:
            self._adjust_sample_rate()
        # Random sampling
        import random
        if random.random() < self.current_rate:
            self.sampled_count += 1
            return True
        return False

    def _adjust_sample_rate(self):
        """Adjust the sampling rate."""
        current_time = time.time()
        # Adjust at most once per minute
        if current_time - self.last_adjust_time < 60:
            return
        # Compute the actual observed sampling rate
        if self.total_count == 0:
            actual_rate = 0
        else:
            actual_rate = self.sampled_count / self.total_count
        # Nudge the rate toward the target
        target_rate = self.sample_rate
        if actual_rate < target_rate * 0.8:
            # Undersampling: raise the rate
            self.current_rate = min(self.current_rate * 1.2, self.max_sample_rate)
        elif actual_rate > target_rate * 1.2:
            # Oversampling: lower the rate
            self.current_rate = max(self.current_rate * 0.8, self.min_sample_rate)
        # Reset the statistics window
        self.sampled_count = 0
        self.total_count = 0
        self.last_adjust_time = current_time

    def log(self, level: LogLevel, message: str, **kwargs):
        """Log with sampling applied."""
        if self.should_sample(level):
            self.base_logger.log(level, message, **kwargs)


class LogAggregator:
    """Log aggregator."""

    def __init__(
        self,
        base_logger: StructuredLogger,
        aggregation_window: float = 5.0,    # aggregation window in seconds
        max_aggregation_count: int = 1000,  # maximum buffered records
    ):
        self.base_logger = base_logger
        self.aggregation_window = aggregation_window
        self.max_aggregation_count = max_aggregation_count
        # Aggregation buffer
        self.buffer: Dict[str, List[LogRecord]] = defaultdict(list)
        self.last_flush_time = time.time()
        # Start the periodic flush thread
        self.flush_thread = threading.Thread(target=self._flush_loop, daemon=True)
        self.running = True
        self.flush_thread.start()

    def _get_aggregation_key(self, record: LogRecord) -> str:
        """Compute the aggregation key."""
        # Aggregate by level, message, logger name, and error type
        key_parts = [
            record.level,
            record.message,
            record.logger_name,
            str(record.error_type) if record.error_type else "",
        ]
        return hashlib.md5("|".join(key_parts).encode()).hexdigest()

    def log(self, level: LogLevel, message: str, **kwargs):
        """Log with aggregation applied."""
        # Create the record but do not emit it immediately
        record = self.base_logger._create_record(level, message, kwargs.get("extra"))
        # Add it to the buffer
        aggregation_key = self._get_aggregation_key(record)
        self.buffer[aggregation_key].append(record)
        # Flush when the buffer reaches its limit
        total_count = sum(len(records) for records in self.buffer.values())
        if total_count >= self.max_aggregation_count:
            self._flush_buffer()

    def _flush_buffer(self):
        """Flush the aggregation buffer."""
        if not self.buffer:
            return
        flushed_records = []
        for aggregation_key, records in self.buffer.items():
            if not records:
                continue
            # Use the first record as a template
            template_record = records[0]
            # Build the aggregated record
            aggregated_record = LogRecord(
                timestamp=datetime.utcnow().isoformat() + "Z",
                level=template_record.level,
                message=template_record.message + f" (aggregated {len(records)} times)",
                logger_name=template_record.logger_name,
                extra={
                    **template_record.extra,
                    "aggregated_count": len(records),
                    "aggregation_key": aggregation_key,
                    "first_occurrence": records[0].timestamp,
                    "last_occurrence": records[-1].timestamp,
                },
            )
            flushed_records.append(aggregated_record)
        # Emit the aggregated records
        for record in flushed_records:
            self.base_logger._log_direct(record)
        # Clear the buffer
        self.buffer.clear()
        self.last_flush_time = time.time()

    def _flush_loop(self):
        """Periodic flush loop."""
        while self.running:
            time.sleep(self.aggregation_window)
            self._flush_buffer()

    def shutdown(self):
        """Shut down the aggregator."""
        self.running = False
        self._flush_buffer()
        if self.flush_thread.is_alive():
            self.flush_thread.join(timeout=2.0)
```
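The sampling policy above has two properties worth keeping in mind: high-severity records always pass, and everything else passes with probability equal to the current rate. A minimal standalone sketch of just that decision (independent of the framework classes; `should_sample` and its signature here are illustrative, not the framework's API) is:

```python
import random

# Levels that bypass sampling entirely
ALWAYS_SAMPLED = {"ERROR", "FATAL"}

def should_sample(level: str, rate: float) -> bool:
    """High-severity records always pass; the rest pass with probability `rate`."""
    if level in ALWAYS_SAMPLED:
        return True
    return random.random() < rate

# Errors are never dropped, regardless of the configured rate:
assert should_sample("ERROR", 0.0)
# At rate=1.0 every record passes; at rate=0.0 only errors do:
assert should_sample("INFO", 1.0)
assert not should_sample("INFO", 0.0)
```

With a 10% rate, roughly one INFO record in ten survives over a large sample, which is the invariant the adaptive adjuster in `LogSampler` tries to maintain per minute-long window.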
## 6. Configuration and Usage Examples

### 6.1 Configuration Management

```python
import yaml
import toml
from pathlib import Path


class LoggingConfig:
    """Logging configuration manager."""

    CONFIG_SCHEMA = {
        "type": "object",
        "properties": {
            "version": {"type": "string"},
            "defaults": {"type": "object", "properties": {
                "level": {"type": "string", "enum": ["trace", "debug", "info", "warn", "error", "fatal"]},
                "capture_stacktrace": {"type": "boolean"},
                "enable_performance_stats": {"type": "boolean"},
            }},
            "loggers": {"type": "object", "additionalProperties": {"type": "object", "properties": {
                "level": {"type": "string", "enum": ["trace", "debug", "info", "warn", "error", "fatal"]},
                "handlers": {"type": "array", "items": {"type": "string"}},
                "propagate": {"type": "boolean"},
            }}},
            "handlers": {"type": "object", "additionalProperties": {"type": "object", "properties": {
                "type": {"type": "string", "enum": ["console", "file", "rotating_file", "async", "batch"]},
                "level": {"type": "string", "enum": ["trace", "debug", "info", "warn", "error", "fatal"]},
                "formatter": {"type": "string"},
                "filters": {"type": "array", "items": {"type": "object", "properties": {
                    "type": {"type": "string", "enum": ["level", "rate_limit", "sensitive_data"]},
                    "max_per_second": {"type": "number", "minimum": 1},
                    "window_seconds": {"type": "number", "minimum": 0.1},
                }}},
                "filename": {"type": "string"},
                "max_size_mb": {"type": "number", "minimum": 1},
                "backup_count": {"type": "integer", "minimum": 1},
                "max_queue_size": {"type": "integer", "minimum": 100},
                "worker_count": {"type": "integer", "minimum": 1},
                "drop_when_full": {"type": "boolean"},
                "batch_size": {"type": "integer", "minimum": 1},
                "flush_interval": {"type": "number", "minimum": 0.1},
                "compression": {"type": "boolean"},
                "use_colors": {"type": "boolean"},
            }, "required": ["type"]}},
            "formatters": {"type": "object", "additionalProperties": {"type": "object", "properties": {
                "type": {"type": "string", "enum": ["json", "ndjson"]},
                "indent": {"type": ["integer", "null"]},
                "ensure_ascii": {"type": "boolean"},
                "sort_keys": {"type": "boolean"},
            }}},
        },
        "required": ["version"],
    }

    def __init__(self, config_path: Optional[Union[str, Path]] = None):
        self.config = {}
        self.config_path = Path(config_path) if config_path else None
        if config_path and Path(config_path).exists():
            self.load_config(config_path)
        else:
            self._load_default_config()

    def _load_default_config(self):
        """Load the default configuration."""
        self.config = {
            "version": "1.0",
            "defaults": {"level": "info", "capture_stacktrace": False,
                         "enable_performance_stats": False},
            "formatters": {
                "json": {"type": "json", "indent": None, "ensure_ascii": False, "sort_keys": False},
                "json_pretty": {"type": "json", "indent": 2, "ensure_ascii": False, "sort_keys": True},
                "ndjson": {"type": "ndjson", "indent": None, "ensure_ascii": False, "sort_keys": False},
            },
            "handlers": {
                "console": {"type": "console", "level": "info", "formatter": "json", "use_colors": True},
                "console_pretty": {"type": "console", "level": "info", "formatter": "json_pretty",
                                   "use_colors": True},
                "file_app": {"type": "file", "level": "info", "formatter": "ndjson",
                             "filename": "logs/app.log"},
                "file_error": {"type": "file", "level": "error", "formatter": "json_pretty",
                               "filename": "logs/error.log"},
                "async_console": {"type": "async", "level": "info",
                                  "base_handler": {"type": "console", "formatter": "json"},
                                  "max_queue_size": 10000, "worker_count": 2, "drop_when_full": False},
            },
            "loggers": {
                "root": {"level": "info", "handlers": ["console"], "propagate": False},
                "app": {"level": "debug", "handlers": ["console_pretty", "file_app"], "propagate": False},
                "app.error": {"level": "error", "handlers": ["file_error"], "propagate": True},
                "app.performance": {"level": "info", "handlers": ["async_console"], "propagate": False},
            },
        }

    def load_config(self, config_path: Union[str, Path]):
        """Load a configuration file."""
        config_path = Path(config_path)
        if not config_path.exists():
            raise FileNotFoundError(f"Config file not found: {config_path}")
        # Pick the parser from the file extension
        suffix = config_path.suffix.lower()
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                content = f.read()
            if suffix == ".json":
                config = json.loads(content)
            elif suffix in [".yaml", ".yml"]:
                config = yaml.safe_load(content)
            elif suffix == ".toml":
                config = toml.loads(content)
            else:
                raise ValueError(f"Unsupported config file format: {suffix}")
            # Validate before applying
            if self.validate_config(config):
                self.config = config
                self.config_path = config_path
                print(f"Config file loaded: {config_path}")
            else:
                raise ValueError("Config file validation failed")
        except Exception as e:
            print(f"Config file load failed: {e}")
            raise

    def validate_config(self, config: Dict) -> bool:
        """Validate a configuration."""
        # Simplified validation -- production code should use JSON Schema
        required_keys = ["version", "defaults", "handlers", "loggers"]
        for key in required_keys:
            if key not in config:
                print(f"Config missing required key: {key}")
                return False
        return True

    def get_logger_config(self, logger_name: str) -> Dict[str, Any]:
        """Return the configuration for a logger."""
        # Look for the most specific config first
        config = self.config.get("loggers", {}).get(logger_name)
        if config:
            return config
        # Walk up the dotted-name hierarchy
        parts = logger_name.split(".")
        for i in range(len(parts) - 1, 0, -1):
            parent_name = ".".join(parts[:i])
            parent_config = self.config.get("loggers", {}).get(parent_name)
            if parent_config and parent_config.get("propagate", False):
                return parent_config
        # Fall back to the root config
        return self.config.get("loggers", {}).get("root", {})

    def get_handler_config(self, handler_name: str) -> Dict[str, Any]:
        """Return a handler configuration."""
        return self.config.get("handlers", {}).get(handler_name, {})

    def get_formatter_config(self, formatter_name: str) -> Dict[str, Any]:
        """Return a formatter configuration."""
        return self.config.get("formatters", {}).get(formatter_name, {})

    def save_config(self, config_path: Optional[Union[str, Path]] = None):
        """Save the configuration."""
        save_path = Path(config_path) if config_path else self.config_path
        if not save_path:
            raise ValueError("No config save path specified")
        # Ensure the directory exists
        save_path.parent.mkdir(parents=True, exist_ok=True)
        # Pick the serializer from the file extension
        suffix = save_path.suffix.lower()
        try:
            with open(save_path, "w", encoding="utf-8") as f:
                if suffix == ".json":
                    json.dump(self.config, f, indent=2, ensure_ascii=False)
                elif suffix in [".yaml", ".yml"]:
                    yaml.dump(self.config, f, default_flow_style=False, allow_unicode=True)
                elif suffix == ".toml":
                    toml.dump(self.config, f)
                else:
                    # Default to JSON
                    json.dump(self.config, f, indent=2, ensure_ascii=False)
            print(f"Config file saved: {save_path}")
        except Exception as e:
            print(f"Config file save failed: {e}")
            raise
```

### 6.2 Usage Examples

```python
def logging_system_demo():
    """Structured logging system demo."""
    print("=" * 60)
    print("Structured logging system demo")
    print("=" * 60)

    # 1. Basic usage
    print("\n1. Basic usage")
    print("-" * 40)
    # Get the singleton log manager
    log_manager = LogManager()
    logger = log_manager.get_logger("demo.app")
    # Log at different levels
    logger.trace("This is a TRACE-level log")
    logger.debug("This is a DEBUG-level log")
    logger.info("This is an INFO-level log", user="john", action="login")
    logger.warn("This is a WARN-level log")
    # Log an error
    try:
        result = 1 / 0
    except Exception as e:
        logger.error("Division error", exc=e, dividend=1, divisor=0)

    # 2. Context management
    print("\n2. Context management")
    print("-" * 40)
    logger.info("No context")
    with logger.with_context(request_id="req123", user_id="user456"):
        logger.info("With request context")
        with logger.with_context(stage="processing"):
            logger.info("Nested context")
        logger.info("Back to parent context")
    logger.info("Context cleared")

    # 3. Performance monitoring
    print("\n3. Performance monitoring")
    print("-" * 40)
    monitor = PerformanceMonitor(logger)
    # Measure operations
    with monitor.measure("database_query") as timer:
        timer.add_tag("table", "users")
        time.sleep(0.1)   # simulate a database query
    with monitor.measure("api_call") as timer:
        timer.add_tag("endpoint", "/api/users")
        time.sleep(0.05)  # simulate an API call
    # Record custom metrics
    monitor.record_metric("memory_usage", 125.5, unit="MB")
    monitor.record_metric("cpu_usage", 15.2, unit="%")
    stats = monitor.get_statistics("database_query")
    print(f"Database query stats: {stats}")

    # 4. Distributed tracing
    print("\n4. Distributed tracing")
    print("-" * 40)
    tracing_logger = TracingLogger("demo.tracing")
    with tracing_logger.trace_span("process_request") as span:
        span.set_attribute("method", "POST")
        span.set_attribute("path", "/api/data")
        tracing_logger.info("Request processing started")
        with tracing_logger.trace_span("validate_input"):
            tracing_logger.debug("Validating input")
            time.sleep(0.01)
        with tracing_logger.trace_span("process_data"):
            tracing_logger.debug("Processing data")
            time.sleep(0.02)
        tracing_logger.info("Request processing finished")

    # 5. Advanced configuration
    print("\n5. Advanced configuration")
    print("-" * 40)
    config = LoggingConfig()
    # Add a custom handler
    config.config["handlers"]["custom_file"] = {
        "type": "rotating_file", "level": "info", "formatter": "ndjson",
        "filename": "logs/custom.log", "max_size_mb": 10, "backup_count": 3,
        "filters": [
            {"type": "rate_limit", "max_per_second": 100},
            {"type": "sensitive_data"},
        ],
    }
    # Add a custom logger
    config.config["loggers"]["custom"] = {"level": "debug", "handlers": ["custom_file"],
                                          "propagate": False}
    config.save_config("logs/logging_config.yaml")

    # 6. Log sampling
    print("\n6. Log sampling")
    print("-" * 40)
    base_logger = log_manager.get_logger("demo.sampling")
    sampler = LogSampler(base_logger, sample_rate=0.1)  # 10% sampling rate
    for i in range(100):
        sampler.log(LogLevel.INFO, f"Log message {i}", iteration=i)
    print(f"Sampling stats: {sampler.sampled_count}/{sampler.total_count}")

    # 7. Log aggregation
    print("\n7. Log aggregation")
    print("-" * 40)
    aggregator = LogAggregator(base_logger, aggregation_window=2.0)
    # Log repeated messages
    for i in range(50):
        aggregator.log(LogLevel.INFO, "Repeated log message")
        time.sleep(0.01)
    time.sleep(3)  # wait for aggregation

    # 8. System statistics
    print("\n8. System statistics")
    print("-" * 40)
    stats = log_manager.get_all_stats()
    print(f"Total loggers: {stats['logger_count']}")
    print(f"Total log records: {stats['total_logs']}")
    for logger_name, logger_stats in stats["loggers"].items():
        print(f"\n{logger_name}:")
        print(f"  log counts: {logger_stats['log_counts']}")

    # Cleanup
    aggregator.shutdown()
    print("\nDemo finished!")
    return log_manager


def production_logging_setup():
    """Production logging configuration."""
    config = {
        "version": "1.0",
        "defaults": {"level": "info", "capture_stacktrace": True,
                     "enable_performance_stats": True},
        "formatters": {
            "json": {"type": "json", "indent": None, "ensure_ascii": False, "sort_keys": False},
        },
        "handlers": {
            "console": {"type": "console", "level": "info", "formatter": "json",
                        "use_colors": False},  # production usually does not need colors
            "app_file": {"type": "rotating_file", "level": "info", "formatter": "json",
                         "filename": "/var/log/app/app.log", "max_size_mb": 100, "backup_count": 10},
            "error_file": {"type": "rotating_file", "level": "error", "formatter": "json",
                           "filename": "/var/log/app/error.log", "max_size_mb": 50, "backup_count": 5},
            "async_app": {"type": "async", "level": "info",
                          "base_handler": {"type": "rotating_file",
                                           "filename": "/var/log/app/async.log",
                                           "max_size_mb": 100, "backup_count": 10},
                          "max_queue_size": 50000, "worker_count": 4, "drop_when_full": True},
        },
        "loggers": {
            "root": {"level": "warn", "handlers": ["console"], "propagate": False},
            "app": {"level": "info", "handlers": ["app_file", "async_app"], "propagate": False},
            "app.api": {"level": "debug", "handlers": ["app_file"], "propagate": True},
            "app.error": {"level": "error", "handlers": ["error_file"], "propagate": True},
            "app.performance": {"level": "info", "handlers": ["async_app"], "propagate": False},
        },
    }
    # Initialize the log manager and apply the config
    log_manager = LogManager()
    log_manager.configure(config)
    # Set the global context
    import socket
    import os
    log_manager.set_global_context(
        app_name="production_app",
        app_version="1.0.0",
        environment="production",
        hostname=socket.gethostname(),
        region=os.environ.get("AWS_REGION", "unknown"),
    )
    return log_manager


if __name__ == "__main__":
    # Run the demo, then shut down cleanly
    demo_manager = logging_system_demo()
    demo_manager.shutdown()
```
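The `_percentile` helper used by `PerformanceMonitor.get_statistics` interpolates linearly between the two nearest sorted samples, so p95 of a small dataset is not simply "the 95th element". The same formula can be verified standalone (the free function `percentile` below is an illustrative copy, not the framework's API):

```python
def percentile(values, p):
    """Linear-interpolation percentile, mirroring PerformanceMonitor._percentile."""
    if not values:
        return 0
    s = sorted(values)
    k = (len(s) - 1) * (p / 100)  # fractional rank into the sorted samples
    f = int(k)                    # lower neighbor index
    c = k - f                     # interpolation weight
    if f + 1 < len(s):
        return s[f] + c * (s[f + 1] - s[f])
    return s[f]

data = [10, 20, 30, 40, 50]
assert percentile(data, 50) == 30    # rank k = 2.0 lands exactly on an element
assert percentile(data, 95) == 48.0  # k = 3.8 -> 40 + 0.8 * (50 - 40)
```

For latency metrics this interpolation gives stable p95/p99 estimates even with few samples, at the cost of reporting values that never literally occurred.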
## 7. Testing and Validation

### 7.1 Unit Tests

```python
import pytest
import tempfile
import json
import time
from pathlib import Path


class TestStructuredLogger:
    """Tests for the structured logger."""

    @pytest.fixture
    def temp_log_file(self):
        """Create a temporary log file."""
        with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
            temp_file = f.name
        yield temp_file
        # Cleanup
        Path(temp_file).unlink(missing_ok=True)

    @pytest.fixture
    def test_logger(self):
        """Create a test logger."""
        logger = StructuredLogger(name="test", level=LogLevel.DEBUG,
                                  handlers=[], capture_stacktrace=True)
        return logger

    def test_log_record_creation(self, test_logger):
        """Log record creation."""
        record = test_logger._create_record(LogLevel.INFO, "test message",
                                            extra={"key": "value"})
        assert isinstance(record, LogRecord)
        assert record.level == "INFO"
        assert record.message == "test message"
        assert record.logger_name == "test"
        assert record.extra["key"] == "value"
        # Timestamp format
        assert record.timestamp.endswith("Z")
        # Caller information
        assert record.filename is not None
        assert record.function is not None
        assert record.line_no is not None

    def test_log_level_filtering(self):
        """Log level filtering."""
        logger = StructuredLogger("test", level=LogLevel.WARN)

        # Mock handler that records everything it receives
        class MockHandler(LogHandler):
            def __init__(self):
                super().__init__(level=LogLevel.INFO)
                self.records = []

            def emit(self, record):
                self.records.append(record)

        handler = MockHandler()
        logger.add_handler(handler)
        # Log at different levels
        logger.debug("DEBUG message")
        logger.info("INFO message")
        logger.warn("WARN message")
        logger.error("ERROR message")
        # Only WARN and ERROR pass the filter
        assert len(handler.records) == 2
        assert all(r.level in ["WARN", "ERROR"] for r in handler.records)

    def test_json_formatter(self):
        """JSON formatter."""
        formatter = JSONFormatter(indent=2)
        record = LogRecord(timestamp="2024-01-01T00:00:00Z", level="INFO",
                           message="test message", logger_name="test")
        formatted = formatter.format(record)
        # Verify valid JSON
        parsed = json.loads(formatted)
        assert parsed["timestamp"] == "2024-01-01T00:00:00Z"
        assert parsed["level"] == "INFO"
        assert parsed["message"] == "test message"
        assert parsed["logger_name"] == "test"

    def test_file_handler(self, temp_log_file):
        """File handler."""
        handler = FileHandler(filename=temp_log_file, level=LogLevel.INFO,
                              formatter=JSONFormatter(indent=None))
        record = LogRecord(timestamp="2024-01-01T00:00:00Z", level="INFO",
                           message="test message", logger_name="test")
        handler.handle(record)
        handler.close()
        # Verify the file contents
        with open(temp_log_file, "r") as f:
            content = f.read().strip()
        parsed = json.loads(content)
        assert parsed["message"] == "test message"

    def test_rate_limit_filter(self):
        """Rate-limit filter."""
        filter_obj = RateLimitFilter(max_per_second=2, window_seconds=1)
        record = LogRecord(timestamp="2024-01-01T00:00:00Z", level="INFO",
                           message="test message", logger_name="test")
        # The first two pass
        assert filter_obj.filter(record) is True
        assert filter_obj.filter(record) is True
        # The third is rate-limited
        assert filter_obj.filter(record) is False
        # Wait for the window to reset
        time.sleep(1.1)
        assert filter_obj.filter(record) is True

    def test_sensitive_data_filter(self):
        """Sensitive-data filter."""
        filter_obj = SensitiveDataFilter()
        # Various kinds of sensitive content
        test_cases = [
            ("password=secret123", "password=***"),
            ("API_KEY=sk_test_12345", "API_KEY=***"),
            ("email=test@example.com", "email=te***@example.com"),
            ("phone=123-456-7890", "phone=123***7890"),
        ]
        for input_text, expected_output in test_cases:
            record = LogRecord(timestamp="2024-01-01T00:00:00Z", level="INFO",
                               message=input_text, logger_name="test")
            filter_obj.filter(record)
            assert expected_output in record.message

    def test_async_handler(self):
        """Async handler."""
        # Mock base handler that timestamps each processed record
        class MockBaseHandler(LogHandler):
            def __init__(self):
                super().__init__(level=LogLevel.INFO)
                self.records = []
                self.process_times = []

            def emit(self, record):
                self.records.append(record)
                self.process_times.append(time.time())

        base_handler = MockBaseHandler()
        async_handler = AsyncHandler(base_handler=base_handler,
                                     max_queue_size=10, worker_count=1)
        # Send several records
        send_time = time.time()
        for i in range(5):
            record = LogRecord(timestamp="2024-01-01T00:00:00Z", level="INFO",
                               message=f"message {i}", logger_name="test")
            async_handler.handle(record)
        # Wait for processing to finish, then shut down
        time.sleep(0.5)
        async_handler.shutdown()
        # Verify results
        assert len(base_handler.records) == 5
        assert all(t >= send_time for t in base_handler.process_times)

    def test_batch_handler(self):
        """Batch handler."""
        # Mock base handler that counts handle() calls
        class MockBaseHandler(LogHandler):
            def __init__(self):
                super().__init__(level=LogLevel.INFO)
                self.records = []
                self.batch_count = 0

            def emit(self, record):
                self.records.append(record)

            def handle(self, record):
                self.batch_count += 1
                return super().handle(record)

        base_handler = MockBaseHandler()
        batch_handler = BatchHandler(base_handler=base_handler,
                                     batch_size=3, flush_interval=0.1)
        # Send fewer records than the batch size
        for i in range(2):
            record = LogRecord(timestamp="2024-01-01T00:00:00Z", level="INFO",
                               message=f"message {i}", logger_name="test")
            batch_handler.handle(record)
        # Wait for the timed flush
        time.sleep(0.2)
        # Verify results
        assert len(base_handler.records) == 2
        assert base_handler.batch_count == 2  # processed one by one
        batch_handler.shutdown()


class TestDistributedTracing:
    """Distributed tracing tests."""

    def test_trace_context(self):
        """Trace context propagation."""
        tracer = DistributedTraceContext()
        # Initial context
        context1 = tracer.current
        assert "trace_id" in context1
        assert "span_id" in context1
        # Start a new span
        with tracer.start_span("test_span") as span:
            context2 = tracer.current
            assert context2["trace_id"] == context1["trace_id"]
            assert context2["span_id"] != context1["span_id"]
            assert context2["parent_span_id"] == context1["span_id"]
        # Context restored after the span closes
        context3 = tracer.current
        assert context3["span_id"] == context1["span_id"]

    def test_tracing_logger(self):
        """Tracing logger."""
        tracer = DistributedTraceContext()
        logger = TracingLogger("test.tracing", tracer=tracer)
        # Log inside nested spans
        with tracer.start_span("parent_span"):
            logger.info("log inside parent span")
            with tracer.start_span("child_span"):
                logger.info("log inside child span")
        # Verify tracing information
        assert logger.tracer.get_current_trace_id() is not None


class TestPerformanceMonitoring:
    """Performance monitoring tests."""

    def test_performance_monitor(self):
        """Performance monitor."""
        # Mock logger
        class MockLogger:
            def __init__(self):
                self.records = []

            def debug(self, message, **kwargs):
                self.records.append((message, kwargs))

        mock_logger = MockLogger()
        monitor = PerformanceMonitor(mock_logger)
        # Measure an operation
        with monitor.measure("test_operation"):
            time.sleep(0.01)
        # Record a custom metric
        monitor.record_metric("custom_metric", 42.0)
        # Check statistics
        stats = monitor.get_statistics("test_operation")
        assert stats["count"] == 1
        assert stats["mean"] > 0
        # Check that logs were emitted
        assert len(mock_logger.records) > 0


if __name__ == "__main__":
    # Run the tests
    pytest.main([__file__, "-v", "--tb=short"])
```

### 7.2 Performance Tests

```python
class LoggingPerformanceTest:
    """Logging performance tests."""

    @staticmethod
    def test_single_thread_performance():
        """Single-thread performance."""
        print("Single-thread performance test")
        print("-" * 40)
        logger = StructuredLogger(name="performance.test", level=LogLevel.INFO,
                                  enable_performance_stats=True)
        console_handler = ConsoleHandler(level=LogLevel.INFO,
                                         formatter=JSONFormatter(indent=None),
                                         use_colors=False)
        logger.add_handler(console_handler)
        # Performance loop
        iterations = 10000
        start_time = time.time()
        for i in range(iterations):
            logger.info(f"performance test message {i}", iteration=i)
        end_time = time.time()
        duration = end_time - start_time
        # Compute metrics
        logs_per_second = iterations / duration
        avg_latency_ms = (duration / iterations) * 1000
        print(f"Total logs: {iterations}")
        print(f"Duration: {duration:.3f} s")
        print(f"Logs/sec: {logs_per_second:.1f}")
        print(f"Average latency: {avg_latency_ms:.3f} ms")
        stats = logger.get_stats()
        print(f"Records actually written: {sum(stats['log_counts'].values())}")
        return {"iterations": iterations, "duration": duration,
                "logs_per_second": logs_per_second, "avg_latency_ms": avg_latency_ms}

    @staticmethod
    def test_multi_thread_performance():
        """Multi-thread performance."""
        print("\nMulti-thread performance test")
        print("-" * 40)
        # Async handler backed by a console handler
        base_handler = ConsoleHandler(level=LogLevel.INFO,
                                      formatter=JSONFormatter(indent=None),
                                      use_colors=False)
        async_handler = AsyncHandler(base_handler=base_handler, max_queue_size=100000,
                                     worker_count=4, drop_when_full=False)
        logger = StructuredLogger(name="performance.async", level=LogLevel.INFO,
                                  handlers=[async_handler],
                                  enable_performance_stats=True)
        # Multi-thread test
        thread_count = 8
        logs_per_thread = 5000
        total_iterations = thread_count * logs_per_thread
        threads = []
        start_time = time.time()

        def worker(thread_id):
            for i in range(logs_per_thread):
                logger.info(f"thread {thread_id} - message {i}",
                            thread_id=thread_id, iteration=i)

        # Start the threads
        for i in range(thread_count):
            thread = threading.Thread(target=worker, args=(i,))
            threads.append(thread)
            thread.start()
        # Wait for completion
        for thread in threads:
            thread.join()
        # Let the queue drain
        time.sleep(1)
        end_time = time.time()
        duration = end_time - start_time
        # Compute metrics
        logs_per_second = total_iterations / duration
        avg_latency_ms = (duration / total_iterations) * 1000
        print(f"Threads: {thread_count}")
        print(f"Logs per thread: {logs_per_thread}")
        print(f"Total logs: {total_iterations}")
        print(f"Duration: {duration:.3f} s")
        print(f"Logs/sec: {logs_per_second:.1f}")
        print(f"Average latency: {avg_latency_ms:.3f} ms")
        # Handler statistics
        handler_stats = async_handler.get_stats()
        print(f"Queue size: {handler_stats['queue_size']}")
        print(f"Dropped: {handler_stats['dropped']}")
        # Shut down the handler
        async_handler.shutdown()
        return {"thread_count": thread_count, "total_iterations": total_iterations,
                "duration": duration, "logs_per_second": logs_per_second,
                "avg_latency_ms": avg_latency_ms}

    @staticmethod
    def test_batch_performance():
        """Batch processing performance."""
        print("\nBatch processing performance test")
        print("-" * 40)
        base_handler = ConsoleHandler(level=LogLevel.INFO,
                                      formatter=JSONFormatter(indent=None),
                                      use_colors=False)
        batch_handler = BatchHandler(base_handler=base_handler, batch_size=100,
                                     flush_interval=0.1, compression=False)
        logger = StructuredLogger(name="performance.batch", level=LogLevel.INFO,
                                  handlers=[batch_handler],
                                  enable_performance_stats=True)
        # Performance loop
        iterations = 10000
        start_time = time.time()
        for i in range(iterations):
            logger.info(f"batch test message {i}", iteration=i)
        # Wait for batching to finish
        time.sleep(0.5)
        end_time = time.time()
        duration = end_time - start_time
        # Compute metrics
        logs_per_second = iterations / duration
        avg_latency_ms = (duration / iterations) * 1000
        print(f"Total logs: {iterations}")
        print("Batch size: 100")
        print(f"Duration: {duration:.3f} s")
        print(f"Logs/sec: {logs_per_second:.1f}")
        print(f"Average latency: {avg_latency_ms:.3f} ms")
        handler_stats = batch_handler.get_stats()
        print(f"Buffer size: {handler_stats['buffer_size']}")
        batch_handler.shutdown()
        return {"iterations": iterations, "batch_size": 100, "duration": duration,
                "logs_per_second": logs_per_second, "avg_latency_ms": avg_latency_ms}

    @staticmethod
    def compare_performance():
        """Compare the performance of different configurations."""
        print("=" * 60)
        print("Logging system performance comparison")
        print("=" * 60)
        results = {}
        results["single_thread"] = LoggingPerformanceTest.test_single_thread_performance()
        results["multi_thread"] = LoggingPerformanceTest.test_multi_thread_performance()
        results["batch"] = LoggingPerformanceTest.test_batch_performance()
        # Print the comparison
        print("\n" + "=" * 60)
        print("Performance comparison summary")
        print("=" * 60)
        for config, metrics in results.items():
            print(f"\n{config}:")
            print(f"  logs/sec: {metrics['logs_per_second']:.1f}")
            print(f"  average latency: {metrics['avg_latency_ms']:.3f} ms")
        # Recommendations
        print("\nRecommendations:")
        print("- Single-threaded workloads: use standard handlers")
        print("- High concurrency: use async handlers")
        print("- High log volume: use batch handlers")
        return results


if __name__ == "__main__":
    # Run the performance tests
    LoggingPerformanceTest.compare_performance()
```
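The `SensitiveDataFilter` exercised in the unit tests above is not reproduced in this article, but its expected behavior is fully pinned down by the test cases. A minimal regex-based sketch that satisfies exactly those four cases might look like the following; the pattern list is illustrative and far narrower than what a production filter would need:

```python
import re

# Hypothetical rules matching only the test expectations in section 7.1;
# a real filter needs a broader, audited pattern set.
_RULES = [
    (re.compile(r"(password=)\S+", re.IGNORECASE), r"\1***"),      # credentials
    (re.compile(r"(api_key=)\S+", re.IGNORECASE), r"\1***"),       # API keys
    (re.compile(r"\b(\w{2})\w*@"), r"\1***@"),                     # email local part
    (re.compile(r"\b(\d{3})-\d{3}-(\d{4})\b"), r"\1***\2"),        # phone middle digits
]

def mask_sensitive(message: str) -> str:
    """Apply each masking rule in order and return the redacted message."""
    for pattern, repl in _RULES:
        message = pattern.sub(repl, message)
    return message

assert mask_sensitive("password=secret123") == "password=***"
assert mask_sensitive("email=test@example.com") == "email=te***@example.com"
```

Masking in the filter (before records reach any handler) is the safer design: redaction then happens exactly once, regardless of how many handlers or destinations a record fans out to.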
## 8. Best Practices and Deployment

### 8.1 Structured Logging Best Practices

**Consistent field naming:**

```python
# Good
logger.info("User login", user_id=123, action="login", result="success")
# Bad: mixed naming and casing conventions
logger.info("User login", userId=123, ACTION="login", result="SUCCESS")
```

**Meaningful log levels:**

- TRACE: detailed debugging information
- DEBUG: development-environment debugging information
- INFO: normal business operations
- WARN: unexpected but recoverable situations
- ERROR: errors that require intervention
- FATAL: the system cannot continue running

**Include sufficient context:**

```python
# Attach request context before logging
with logger.with_context(request_id=request_id, user_id=user_id, session_id=session_id):
    logger.info("Handling user request", endpoint=request.path)
```

### 8.2 Production Deployment Guide

```python
class ProductionLoggingDeployment:
    """Production logging deployment."""

    @staticmethod
    def setup_logging_for_web_app():
        """Set up logging for a web application."""
        config = {
            "version": "1.0",
            "defaults": {"level": "info", "capture_stacktrace": True,
                         "enable_performance_stats": True},
            "formatters": {
                "json": {"type": "json", "indent": None, "ensure_ascii": False,
                         "sort_keys": False},
                "json_pretty": {"type": "json", "indent": 2, "ensure_ascii": False,
                                "sort_keys": True},
            },
            "handlers": {
                "console": {"type": "console", "level": "info", "formatter": "json",
                            "use_colors": False,
                            "filters": [
                                {"type": "rate_limit", "max_per_second": 1000},
                                {"type": "sensitive_data"},
                            ]},
                "app_file": {"type": "rotating_file", "level": "info", "formatter": "json",
                             "filename": "/var/log/app/app.log",
                             "max_size_mb": 1024,  # 1 GB
                             "backup_count": 10},
                "error_file": {"type": "rotating_file", "level": "error",
                               "formatter": "json_pretty",
                               "filename": "/var/log/app/error.log",
                               "max_size_mb": 100, "backup_count": 5},
                "async_file": {"type": "async", "level": "info",
                               "base_handler": {"type": "rotating_file",
                                                "filename": "/var/log/app/async.log",
                                                "max_size_mb": 1024, "backup_count": 10},
                               "max_queue_size": 100000, "worker_count": 4,
                               "drop_when_full": True},
                "metrics_file": {"type": "batch", "level": "info",
                                 "base_handler": {"type": "file",
                                                  "filename": "/var/log/app/metrics.log",
                                                  "formatter": "json"},
                                 "batch_size": 100, "flush_interval": 5.0,
                                 "compression": True},
            },
            "loggers": {
                "root": {"level": "warn", "handlers": ["console"], "propagate": False},
                "app": {"level": "info", "handlers": ["app_file", "async_file"],
                        "propagate": False},
                "app.api": {"level": "debug", "handlers": ["app_file"], "propagate": True},
                "app.error": {"level": "error", "handlers": ["error_file"], "propagate": True},
                "app.metrics": {"level": "info", "handlers": ["metrics_file"],
                                "propagate": False},
                "app.performance": {"level": "info", "handlers": ["async_file"],
                                    "propagate": False},
            },
        }
        # Initialize the log manager
        log_manager = LogManager()
        log_manager.configure(config)
        # Set the global context
        import socket
        import os
        log_manager.set_global_context(
            app_name=os.environ.get("APP_NAME", "unknown"),
            app_version=os.environ.get("APP_VERSION", "unknown"),
            environment=os.environ.get("ENVIRONMENT", "production"),
            hostname=socket.gethostname(),
            pod_name=os.environ.get("POD_NAME", "unknown"),
            region=os.environ.get("AWS_REGION", "unknown"),
        )
        return log_manager

    @staticmethod
    def setup_request_logging_middleware(logger_name: str = "app.api"):
        """Build a request-logging middleware."""
        from functools import wraps
        import uuid
        log_manager = LogManager()
        logger = log_manager.get_logger(logger_name)

        def request_logging_middleware(func):
            @wraps(func)
            def wrapper(request, *args, **kwargs):
                # Generate a request ID
                request_id = str(uuid.uuid4())
                # Attach request context
                with logger.with_context(
                    request_id=request_id,
                    method=request.method,
                    path=request.path,
                    client_ip=request.remote_addr,
                    user_agent=request.headers.get("User-Agent", "unknown"),
                ):
                    # Log request start
                    logger.info("Request started",
                                request_size=request.content_length or 0)
                    # Measure request duration
                    start_time = time.time_ns()
                    try:
                        # Handle the request
                        response = func(request, *args, **kwargs)
                        # Log request completion
                        duration_ns = time.time_ns() - start_time
                        logger.info("Request finished",
                                    status_code=response.status_code,
                                    response_size=response.content_length or 0,
                                    duration_ms=duration_ns / 1_000_000)
                        return response
                    except Exception as e:
                        # Log the error
                        duration_ns = time.time_ns() - start_time
                        logger.error("Request error",
                                     error_type=type(e).__name__,
                                     error_message=str(e),
                                     duration_ms=duration_ns / 1_000_000,
                                     exc=e)
                        # Re-raise the exception
                        raise
            return wrapper
        return request_logging_middleware

    @staticmethod
    def setup_database_logging():
        """Set up database operation logging."""
        log_manager = LogManager()
        logger = log_manager.get_logger("app.database")

        class DatabaseLogger:
            """Database operation logger."""

            def __init__(self):
                self.monitor = PerformanceMonitor(logger)

            def log_query(self, query: str, params: tuple, duration_ms: float):
                """Log a query; only slow queries get WARN level."""
                if duration_ms > 100:  # slower than 100 ms
                    logger.warn("Slow query",
                                query=query[:100] + "..." if len(query) > 100 else query,
                                params=str(params)[:200],
                                duration_ms=duration_ms,
                                extra={"query_type": "slow"})
                else:
                    logger.debug("Database query",
                                 query=query[:50] + "..." if len(query) > 50 else query,
                                 duration_ms=duration_ms,
                                 extra={"query_type": "normal"})
                # Record the performance metric
                self.monitor.record_metric(
                    "database_query_duration", duration_ms, unit="ms",
                    tags={"query_type": "select" if "SELECT" in query.upper() else "other"},
                )

            def log_transaction(self, operation: str, success: bool, duration_ms: float):
                """Log a transaction."""
                level = LogLevel.INFO if success else LogLevel.ERROR
                logger.log(level, "Database transaction",
                           operation=operation, success=success,
                           duration_ms=duration_ms)

        return DatabaseLogger()
```

### 8.3 Monitoring and Alerting Configuration

```python
class LogMonitoringAndAlerting:
    """Log monitoring and alerting."""

    @staticmethod
    def setup_log_based_alerts():
        """Define log-based alert rules."""
        alerts = {
            "error_rate": {
                "description": "Error rate above threshold",
                "condition": lambda stats: (
                    stats.get("error_count", 0) > 10
                    and stats.get("total_logs", 1) > 100
                    and stats["error_count"] / stats["total_logs"] > 0.01  # 1% error rate
                ),
                "severity": "high",
                "action": "notify the development team",
            },
            "queue_full": {
                "description": "Log queue is full",
                "condition": lambda stats: (
                    stats.get("queue_full", False) or stats.get("dropped", 0) > 100
                ),
                "severity": "medium",
                "action": "increase queue size or worker count",
            },
            "performance_degradation": {
                "description": "Logging throughput degraded",
                "condition": lambda stats: (
                    stats.get("rate_per_second", 0) < 1000  # below 1000 logs/sec
                ),
                "severity": "low",
                "action": "check the log handler configuration",
            },
            "disk_space": {
                "description": "Log disk space running out",
                "condition": lambda stats: (
                    stats.get("disk_usage_percent", 0) > 90
                ),
                "severity": "critical",
                "action": "clean up old logs or add disk space",
            },
        }
        return alerts

    @staticmethod
    def monitor_logging_system(log_manager: LogManager, check_interval: int = 60):
        """Monitor the logging system."""
        import psutil

        def check_system():
            """Check the system state and return triggered alerts."""
            # Logging statistics
            stats = log_manager.get_all_stats()
            # System information
            disk_usage = psutil.disk_usage(
                "/var/log" if os.path.exists("/var/log") else "."
            )
            system_stats = {
                "disk_usage_percent": disk_usage.percent,
                "disk_free_gb": disk_usage.free / (1024 ** 3),
                "memory_percent": psutil.virtual_memory().percent,
                "cpu_percent": psutil.cpu_percent(interval=1),
            }
            # Merge the statistics
            all_stats = {**stats, **system_stats}
            # Evaluate the alert rules
            alerts = LogMonitoringAndAlerting.setup_log_based_alerts()
            triggered_alerts = []
            for alert_name, alert_config in alerts.items():
                if alert_config["condition"](all_stats):
                    triggered_alerts.append({
                        "name": alert_name,
                        "description": alert_config["description"],
                        "severity": alert_config["severity"],
                        "action": alert_config["action"],
                        "timestamp": datetime.now().isoformat(),
                        "stats": {k: v for k, v in all_stats.items()
                                  if not isinstance(v, dict)},
                    })
            return triggered_alerts

        def monitoring_loop():
            """Monitoring loop."""
            while True:
                try:
                    alerts = check_system()
                    if alerts:
                        # Handle the alerts
                        for alert in alerts:
                            print(f"Alert [{alert['severity']}]: {alert['description']}")
                            # Alerts can be forwarded to a monitoring system here,
                            # e.g. Prometheus, Datadog, or PagerDuty
                    time.sleep(check_interval)
                except Exception as e:
                    print(f"Monitoring loop error: {e}")
                    time.sleep(check_interval)

        # Start the monitoring thread
        monitor_thread = threading.Thread(target=monitoring_loop, daemon=True)
        monitor_thread.start()
        return monitor_thread
```

## 9. Summary and Outlook

### 9.1 Key Takeaways

This article's implementation provides the following capabilities:

- A complete structured logging system: JSON output, context management, sensitive-data filtering
- High-throughput processing: async handlers, batching, rate limiting
- Distributed tracing integration: tracking requests across service calls
- Performance monitoring: built-in metric collection and analysis
- Flexible configuration management: YAML/JSON/TOML configuration files
- Production readiness: rotation, sampling, aggregation, and other advanced features

### 9.2 Performance Summary

Based on our performance tests, the configurations compare as follows:

| Configuration | Throughput (logs/sec) | Average latency | Suitable for |
| --- | --- | --- | --- |
| Single-thread sync | 5,000-10,000 | 0.1-0.2 ms | low-concurrency applications |
| Multi-thread async | 50,000-100,000 | 0.01-0.05 ms | high-concurrency web services |
| Batch processing | >100,000 | 0.5-1 ms (batching delay) | log-intensive applications |

### 9.3 Future Directions

- AI-driven log analysis: detect anomalous patterns automatically with machine learning
- Real-time stream processing: integrate with streaming systems such as Kafka and Flink
- Serverless support: adapt to function-compute and other serverless environments
- Multi-language support: a unified log format across languages
- Automatic log tuning: adjust log levels and sampling rates from usage patterns

## Appendix

### A. Log Level Reference

| Level | Value | Description | When to use |
| --- | --- | --- | --- |
| TRACE | 0 | Most detailed trace information | development debugging, performance analysis |
| DEBUG | 1 | Debugging information | development, troubleshooting |
| INFO | 2 | Routine information | business operations, system state |
| WARN | 3 | Warnings | unexpected but recoverable situations |
| ERROR | 4 | Errors | failures that require intervention |
| FATAL | 5 | Critical errors | the system cannot continue running |

### B. FAQ

**Q1: Which fields should a structured log include?**
A: At minimum: timestamp, level, message, source, request ID, user ID, trace ID, and duration, plus whatever business fields the operation needs.

**Q2: How should sensitive information in logs be handled?**
A: Use a sensitive-data filter to redact automatically; never log passwords, keys, or personally identifiable information.

**Q3: How should the sampling rate be chosen?**
A: Based on application load and storage capacity. Production systems typically sample 1-10% of routine logs and 100% of error logs.

**Q4: How long should logs be retained?**
A: According to compliance and business requirements. Common defaults: debug logs 7 days, business logs 30 days, audit logs 1 year or more.

### C. Performance Optimization Tips

- Asynchronous processing: use async handlers for high-concurrency applications
- Batch writes: reduce the number of disk I/O operations
- In-memory buffering: reduce lock contention with memory buffers
- Connection pooling: pool connections to remote log services
- Compressed storage: compress historical logs

**Disclaimer:** The code and designs in this article are for reference only. Test performance and audit security against your own requirements before using them in production; log system design must account for your specific business scenarios and compliance obligations.