OpenClaw性能优化:让AI助手快10倍的实战技巧
为什么需要性能优化?
OpenClaw作为AI助手,性能直接影响用户体验和效率。通过优化,你可以:
- **提升响应速度**:从秒级响应到毫秒级响应
- **降低资源消耗**:减少CPU、内存、网络使用
- **提高并发能力**:同时处理更多任务
- **增强系统稳定性**:避免崩溃和超时
核心优化策略
1. 内存优化:减少不必要的内存占用
memory_optimizer.py
import gc import psutil import os from collections import defaultdict from datetime import datetime, timedelta
class MemoryOptimizer:
    """In-process LRU-style cache with memory-, size- and TTL-based eviction.

    Eviction triggers:
      * process RSS above ``max_memory_mb``  -> drop entries idle > 1 hour
      * item count at ``max_cache_items``    -> drop the least-recently-used entry
      * per-key TTL elapsed                  -> entry treated as a miss on access
    """

    def __init__(self, max_memory_mb=500, max_cache_items=1000):
        # Eviction thresholds.
        self.max_memory_mb = max_memory_mb
        self.max_cache_items = max_cache_items
        self.cache = {}          # key -> cached value
        self.access_times = {}   # key -> last-access datetime (LRU bookkeeping)
        self.expirations = {}    # key -> absolute expiry datetime (TTL support)
        self.hit_count = 0
        self.miss_count = 0

    def get(self, key, default=None):
        """Return the cached value for *key* (refreshing its LRU timestamp),
        or *default* on a miss or an expired entry."""
        if key in self.cache:
            # BUGFIX: honour the TTL recorded by set(); the original computed
            # an expiration time but never stored or checked it.
            expiry = self.expirations.get(key)
            if expiry is not None and datetime.now() >= expiry:
                self._evict(key)
                self.miss_count += 1
                return default
            self.access_times[key] = datetime.now()
            self.hit_count += 1
            return self.cache[key]
        self.miss_count += 1
        return default

    def set(self, key, value, ttl_seconds=3600):
        """Store *key* -> *value*, evicting under memory/size pressure first.

        ttl_seconds: entry lifetime in seconds; falsy (0/None) disables expiry.
        Returns True (kept for backward compatibility).
        """
        # Memory pressure: drop stale entries before adding more.
        if self._memory_usage_mb() > self.max_memory_mb:
            self._cleanup_old_items()
        # Capacity pressure: make room for the new entry.
        if len(self.cache) >= self.max_cache_items:
            self._remove_least_used()

        self.cache[key] = value
        self.access_times[key] = datetime.now()
        # BUGFIX: actually persist the expiry so get() can enforce it.
        if ttl_seconds:
            self.expirations[key] = datetime.now() + timedelta(seconds=ttl_seconds)
        else:
            self.expirations.pop(key, None)
        return True

    def _evict(self, key):
        """Drop *key* and all bookkeeping for it (no-op if absent)."""
        self.cache.pop(key, None)
        self.access_times.pop(key, None)
        self.expirations.pop(key, None)

    def _memory_usage_mb(self):
        """Resident set size of this process in MB.

        Degrades gracefully to 0.0 (memory-based eviction disabled) when
        psutil is not installed, instead of crashing at first set().
        """
        try:
            import psutil
        except ImportError:
            return 0.0
        return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024

    def _cleanup_old_items(self):
        """Evict every entry that has not been accessed within the last hour."""
        now = datetime.now()
        stale = [key for key, last_access in self.access_times.items()
                 if (now - last_access).total_seconds() > 3600]
        for key in stale:
            self._evict(key)
        if stale:
            print(f"🧹 清理了 {len(stale)} 个旧缓存项")

    def _remove_least_used(self):
        """Evict the single least-recently-accessed entry."""
        if not self.access_times:
            return
        oldest_key = min(self.access_times, key=self.access_times.get)
        self._evict(oldest_key)
        print(f"🗑️ 移除了最少使用的缓存项: {oldest_key}")

    def stats(self):
        """Return a snapshot dict of cache size, memory use and hit statistics."""
        total = self.hit_count + self.miss_count
        ages = [(datetime.now() - t).total_seconds()
                for t in self.access_times.values()]
        return {
            'total_items': len(self.cache),
            'memory_usage_mb': self._memory_usage_mb(),
            'hit_rate': self.hit_count / total if total > 0 else 0,
            'hit_count': self.hit_count,
            'miss_count': self.miss_count,
            # BUGFIX: the OLDEST item has the LARGEST elapsed age; the
            # original used min(), which reports the newest item's age.
            'oldest_item_age': max(ages) if ages else 0,
        }

    def optimize_memory(self):
        """Run a full optimization pass and return the resulting stats dict."""
        print("🧠 开始内存优化...")
        # 1. Drop stale cache entries.
        self._cleanup_old_items()
        # 2. Force a garbage-collection cycle.
        gc.collect()
        # 3. Hook for compressing large cached objects (currently a no-op).
        self._compress_large_objects()
        stats = self.stats()
        print(f"✅ 内存优化完成,当前使用: {stats['memory_usage_mb']:.1f}MB")
        return stats

    def _compress_large_objects(self):
        """Placeholder: compress large cached objects (strings/lists/dicts)."""
        pass
使用示例
def demo_memory_optimization():
    """Walk through the MemoryOptimizer: fill it, exercise it, then optimize."""
    cache = MemoryOptimizer(max_memory_mb=500, max_cache_items=5000)

    # Populate the cache with 1000 entries of roughly 1KB each.
    print("添加测试数据到缓存...")
    for idx in range(1000):
        cache.set(
            f"data_{idx}",
            {
                'id': idx,
                'name': f'Item {idx}',
                'content': 'x' * 1000,  # ~1KB payload
                'timestamp': datetime.now().isoformat(),
            },
        )

    # Hammer the cache with 5000 uniformly random lookups.
    print("模拟访问模式...")
    import random
    for _ in range(5000):
        cache.get(f"data_{random.randint(0, 999)}")

    # Report statistics before optimizing.
    before = cache.stats()
    print(f"\n📊 缓存统计:")
    print(f" 总项数: {before['total_items']}")
    print(f" 内存使用: {before['memory_usage_mb']:.1f}MB")
    print(f" 命中率: {before['hit_rate']:.2%}")
    print(f" 命中次数: {before['hit_count']}")
    print(f" 未命中次数: {before['miss_count']}")

    # Run the optimization pass.
    print("\n🔧 执行内存优化...")
    cache.optimize_memory()

    # Report statistics after optimizing.
    after = cache.stats()
    print(f"\n✅ 优化后统计:")
    print(f" 总项数: {after['total_items']}")
    print(f" 内存使用: {after['memory_usage_mb']:.1f}MB")
    print(f" 命中率: {after['hit_rate']:.2%}")


if __name__ == '__main__':
    demo_memory_optimization()
2. 数据库优化:提升数据访问速度
```python
database_optimizer.py
import sqlite3 import json from datetime import datetime, timedelta import time
class DatabaseOptimizer:
    """SQLite access layer with a small connection pool and a persistent
    query-result cache stored in the database itself.

    NOTE(review): connections are created with check_same_thread=False but the
    pool itself is not locked — confirm single-threaded use or add a Lock.
    """

    def __init__(self, db_path='openclaw_data.db'):
        self.db_path = db_path
        self.connection_pool = []   # idle connections available for reuse
        self.max_pool_size = 5
        self.query_cache = {}       # reserved for in-process caching
        self._init_optimized_tables()

    def _init_optimized_tables(self):
        """Create the tables and their secondary indexes if missing.

        BUGFIX: SQLite does not accept inline ``INDEX idx (col)`` clauses in
        CREATE TABLE (that is MySQL syntax); the original raised
        OperationalError here. Indexes are now created with separate
        CREATE INDEX IF NOT EXISTS statements.
        """
        conn = self._get_connection()
        cursor = conn.cursor()

        # Task-log table plus its lookup indexes.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS task_logs_optimized (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                task_name TEXT NOT NULL,
                status TEXT NOT NULL,
                start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                end_time TIMESTAMP,
                result TEXT,
                error_message TEXT
            )
        ''')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_task_name '
                       'ON task_logs_optimized (task_name)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_status '
                       'ON task_logs_optimized (status)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_start_time '
                       'ON task_logs_optimized (start_time)')

        # Daily content statistics, one row per date.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS content_stats_optimized (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                date DATE NOT NULL,
                articles_published INTEGER DEFAULT 0,
                words_written INTEGER DEFAULT 0,
                seo_score INTEGER DEFAULT 0,
                traffic_visits INTEGER DEFAULT 0,
                UNIQUE(date)
            )
        ''')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_date '
                       'ON content_stats_optimized (date)')

        # Persistent query-result cache keyed by query hash.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS query_cache (
                query_hash TEXT PRIMARY KEY,
                query_text TEXT NOT NULL,
                result_json TEXT NOT NULL,
                created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                expires_at TIMESTAMP,
                hit_count INTEGER DEFAULT 0
            )
        ''')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_expires_at '
                       'ON query_cache (expires_at)')

        conn.commit()
        self._return_connection(conn)

    def _get_connection(self):
        """Pop an idle connection from the pool, or open a new one."""
        if self.connection_pool:
            return self.connection_pool.pop()
        return sqlite3.connect(self.db_path, check_same_thread=False)

    def _return_connection(self, conn):
        """Return *conn* to the pool, or close it if the pool is full."""
        if len(self.connection_pool) < self.max_pool_size:
            self.connection_pool.append(conn)
        else:
            conn.close()

    def execute_query(self, query, params=None, use_cache=True, cache_ttl=300):
        """Execute *query*; SELECT results may be served from / written to the cache.

        Returns {'data': ..., 'query_time': seconds, 'cached': bool}. Rows
        served from the cache are JSON round-tripped, so tuples come back as
        lists.
        """
        query_hash = self._hash_query(query, params)

        # Fast path: serve a previously cached SELECT result.
        if use_cache:
            cached_result = self._get_cached_query(query_hash)
            if cached_result is not None:
                return cached_result

        conn = self._get_connection()
        cursor = conn.cursor()
        start_time = time.time()

        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)

        if query.strip().upper().startswith('SELECT'):
            results = cursor.fetchall()
            columns = [description[0] for description in cursor.description]
            result_data = {
                'columns': columns,
                'rows': results,
                'row_count': len(results),
            }
        else:
            # Writes must be committed before the connection is pooled again.
            conn.commit()
            result_data = {
                'rowcount': cursor.rowcount,
                'lastrowid': cursor.lastrowid,
            }

        query_time = time.time() - start_time

        # Only SELECT results are cacheable.
        if use_cache and query.strip().upper().startswith('SELECT'):
            self._cache_query(query_hash, query, params, result_data, cache_ttl)

        self._return_connection(conn)

        return {'data': result_data, 'query_time': query_time, 'cached': False}

    def _hash_query(self, query, params):
        """MD5 digest of the query text plus its JSON-encoded params.

        MD5 is fine here: the hash is a cache key, not a security boundary.
        """
        import hashlib
        query_str = query + (json.dumps(params) if params else '')
        return hashlib.md5(query_str.encode()).hexdigest()

    def _get_cached_query(self, query_hash):
        """Return the cached result for *query_hash*, or None if absent/expired."""
        conn = self._get_connection()
        cursor = conn.cursor()

        # ISO-8601 timestamps compare correctly as strings in SQL.
        cursor.execute('''
            SELECT result_json, expires_at FROM query_cache
            WHERE query_hash = ? AND (expires_at IS NULL OR expires_at > ?)
        ''', (query_hash, datetime.now().isoformat()))

        row = cursor.fetchone()
        if row:
            result_json, expires_at = row
            # Track cache usefulness for get_performance_stats().
            cursor.execute('''
                UPDATE query_cache SET hit_count = hit_count + 1
                WHERE query_hash = ?
            ''', (query_hash,))
            conn.commit()
            self._return_connection(conn)
            return {
                'data': json.loads(result_json),
                'query_time': 0.001,  # nominal time for a cache hit
                'cached': True,
            }

        self._return_connection(conn)
        return None

    def _cache_query(self, query_hash, query, params, result_data, ttl_seconds):
        """Upsert a SELECT result into the query_cache table (best-effort)."""
        conn = self._get_connection()
        cursor = conn.cursor()

        expires_at = None
        if ttl_seconds:
            expires_at = (datetime.now()
                          + timedelta(seconds=ttl_seconds)).isoformat()

        try:
            cursor.execute('''
                INSERT OR REPLACE INTO query_cache
                (query_hash, query_text, result_json, expires_at, hit_count)
                VALUES (?, ?, ?, ?, 0)
            ''', (
                query_hash,
                query + (json.dumps(params) if params else ''),
                json.dumps(result_data),
                expires_at,
            ))
            conn.commit()
        except Exception as e:
            # Caching is best-effort; a failure must not break the query path.
            print(f"❌ 缓存查询失败: {e}")
        finally:
            self._return_connection(conn)

    def optimize_database(self):
        """Rebuild indexes, purge expired cache rows, ANALYZE and VACUUM.

        Returns the list of human-readable optimization descriptions applied.
        """
        print("🗄️ 开始数据库优化...")
        conn = self._get_connection()
        cursor = conn.cursor()
        optimizations = []

        # 1. Rebuild every index (identifier is quoted for safety).
        cursor.execute("SELECT name FROM sqlite_master WHERE type='index'")
        indexes = cursor.fetchall()
        for (index_name,) in indexes:
            try:
                cursor.execute(f'REINDEX "{index_name}"')
                optimizations.append(f"重建索引: {index_name}")
            except Exception as e:
                print(f"❌ 重建索引 {index_name} 失败: {e}")

        # 2. Purge expired query-cache rows.
        cursor.execute('''
            DELETE FROM query_cache
            WHERE expires_at IS NOT NULL AND expires_at < ?
        ''', (datetime.now().isoformat(),))
        deleted_count = cursor.rowcount
        if deleted_count > 0:
            optimizations.append(f"清理过期缓存: {deleted_count} 条")

        # 3. Refresh planner statistics.
        cursor.execute("ANALYZE")
        optimizations.append("更新表统计信息")

        # BUGFIX: VACUUM cannot run inside the implicit transaction opened by
        # the DELETE above ("cannot VACUUM from within a transaction"), so
        # commit first.
        conn.commit()

        # 4. Reclaim free pages / defragment the file.
        cursor.execute("VACUUM")
        optimizations.append("清理数据库碎片")

        conn.commit()
        self._return_connection(conn)

        print(f"✅ 数据库优化完成,执行了 {len(optimizations)} 项优化:")
        for opt in optimizations:
            print(f" • {opt}")
        return optimizations

    def get_performance_stats(self):
        """Collect table, query-cache and connection-pool statistics."""
        conn = self._get_connection()
        cursor = conn.cursor()
        stats = {}

        # Per-table column and index counts.
        cursor.execute('''
            SELECT
                name as table_name,
                (SELECT COUNT(*) FROM pragma_table_info(name)) as column_count,
                (SELECT COUNT(*) FROM main.sqlite_master
                 WHERE type='index' AND tbl_name=name) as index_count
            FROM sqlite_master
            WHERE type='table' AND name NOT LIKE 'sqlite_%'
        ''')
        tables = cursor.fetchall()
        stats['tables'] = [
            {'name': name, 'columns': cols, 'indexes': idx}
            for name, cols, idx in tables
        ]

        # Aggregate query-cache usage.
        cursor.execute('''
            SELECT
                COUNT(*) as total_queries,
                SUM(hit_count) as total_hits,
                AVG(hit_count) as avg_hits_per_query
            FROM query_cache
        ''')
        cache_stats = cursor.fetchone()
        if cache_stats:
            total_queries, total_hits, avg_hits = cache_stats
            stats['cache'] = {
                'total_queries': total_queries or 0,
                'total_hits': total_hits or 0,
                # NOTE(review): this is average hits per cached query, not a
                # true hit ratio; name kept for output compatibility.
                'hit_rate': total_hits / total_queries if total_queries else 0,
                'avg_hits_per_query': avg_hits or 0,
            }

        # Pool occupancy.
        stats['connection_pool'] = {
            'current_size': len(self.connection_pool),
            'max_size': self.max_pool_size,
            'utilization': (len(self.connection_pool) / self.max_pool_size
                            if self.max_pool_size else 0),
        }

        self._return_connection(conn)
        return stats
使用示例
def demo_database_optimization(): “””数据库优化演示””” optimizer = DatabaseOptimizer(‘test_performance.db’)
print(“1. 创建测试数据…”)
# 插入测试数据 for i in range(1000): optimizer.execute_query(”’ INSERT INTO task_logs_optimized (task_name, status, result) VALUES (?, ?, ?) ”’, (f’task_{i}’, ‘success’ if i % 10 != 0 else ‘failed’, json.dumps({‘iteration’: i})))
print(“2. 测试查询性能…”)
# 测试无缓存查询 start_time = time.time() for i in range(100): result = optimizer.execute_query( ‘SELECT * FROM task_logs_optimized WHERE task_name = ?’, (f’task_{i % 100}’,), use_cache=False ) no_cache_time = time.time() – start_time
# 测试有缓存查询 start_time = time.time() for i in range(100): result = optimizer.execute_query( ‘SELECT * FROM task_logs_optimized WHERE task_name = ?’, (f’task_{i % 100}’,), use_cache=True, cache_ttl=60 ) cache_time = time.time() – start_time
print(f”\n📊 查询性能对比:”) print(f” 无缓存: {no_cache_time:.3f}秒 (100次查询)”) print(f” 有缓存: {cache_time:.3f}秒 (100次查询)”) print(f” 性能提升: {(no_cache_time – cache_time) / no_cache_time *





暂无评论内容