OpenClaw自动化工作流:从零搭建智能任务调度系统
为什么需要自动化工作流?
OpenClaw作为AI助手,每天需要处理大量重复性任务:内容发布、数据同步、系统监控、SEO优化等。手动操作不仅效率低下,还容易出错。通过自动化工作流,你可以:
- **节省90%重复工作时间**
- **实现7×24小时无人值守运行**
- **确保任务执行的准确性和一致性**
- **快速响应业务变化和需求**
核心组件:OpenClaw自动化引擎
1. 任务调度器(Task Scheduler)
task_scheduler.py
import schedule import time from datetime import datetime
class OpenClawTaskScheduler:
    """Minimal in-process scheduler that runs callables at fixed minute intervals.

    Tasks live in ``self.tasks`` keyed by name. ``start()`` blocks the calling
    thread and polls the task table until ``self.running`` is cleared.
    """

    def __init__(self):
        self.tasks = {}
        self.running = False

    def add_task(self, name, func, interval_minutes=60, **kwargs):
        """Register a periodic task, replacing any task with the same name.

        Args:
            name: Key under which the task is stored.
            func: Callable invoked as ``func(**kwargs)`` when the task is due.
            interval_minutes: Minimum number of minutes between two runs.
            **kwargs: Keyword arguments forwarded to ``func`` on every run.
        """
        self.tasks[name] = {
            'func': func,
            'interval': interval_minutes,
            'last_run': None,
            'next_run': None,
            'kwargs': kwargs,
        }
        print(f"✅ 任务 '{name}' 已添加,每 {interval_minutes} 分钟执行一次")

    def start(self, check_interval_seconds=60):
        """Run the polling loop until ``stop()`` is called.

        Args:
            check_interval_seconds: Seconds to sleep between two polls of the
                task table (defaults to one minute).
        """
        self.running = True
        print("🚀 OpenClaw自动化调度器已启动")

        while self.running:
            current_time = datetime.now()

            # Iterate over a snapshot so a task may register further tasks
            # without invalidating the iteration.
            for name, task in list(self.tasks.items()):
                if not self._should_run_task(task, current_time):
                    continue

                print(f"⏰ 执行任务: {name}")
                try:
                    task['func'](**task['kwargs'])
                    task['last_run'] = current_time
                    task['next_run'] = self._calculate_next_run(task, current_time)
                    print(f"✅ 任务 '{name}' 执行完成")
                except Exception as e:
                    # Boundary handler: one failing task must not stop the
                    # scheduler. 'last_run' is left untouched, so the task is
                    # attempted again on the next poll.
                    print(f"❌ 任务 '{name}' 执行失败: {e}")

            # Skip the sleep when a task (or another thread) asked us to stop.
            if not self.running:
                break
            time.sleep(check_interval_seconds)

    def stop(self):
        """Ask the polling loop to exit after the current pass."""
        self.running = False

    def _should_run_task(self, task, current_time):
        """Return True when the task never ran or its interval has elapsed."""
        if task['last_run'] is None:
            return True

        minutes_passed = (current_time - task['last_run']).total_seconds() / 60
        return minutes_passed >= task['interval']

    def _calculate_next_run(self, task, current_time):
        """Return ``current_time`` shifted forward by the task's interval."""
        from datetime import timedelta
        return current_time + timedelta(minutes=task['interval'])
2. 内容自动化发布系统
content_automation.py
import requests import base64 import json from pathlib import Path
class ContentAutomation:
    """Publish local Markdown files as posts through the WordPress REST API."""

    # Seconds to wait for WordPress before giving up; without a timeout a
    # stalled connection would block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, wp_url, username, password):
        """Store the API base URL and build HTTP Basic-auth headers.

        Args:
            wp_url: Base URL of the REST API; ``/posts`` is appended to it.
            username: Account name used for Basic authentication.
            password: Password paired with ``username``.
        """
        self.wp_url = wp_url
        self.credentials = base64.b64encode(f'{username}:{password}'.encode()).decode()
        self.headers = {
            'Authorization': f'Basic {self.credentials}',
            'Content-Type': 'application/json',
        }

    def publish_article(self, markdown_path, category_id=61, tags=None):
        """Convert a Markdown file to HTML and publish it as a post.

        Args:
            markdown_path: Path of the UTF-8 Markdown file to publish.
            category_id: Category id attached to the post.
            tags: Optional list of tag ids; defaults to no tags.

        Returns:
            The id of the created post, or None when the server did not
            answer with HTTP 201.
        """
        # 1. Read the Markdown file.
        with open(markdown_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # 2. Derive the title and the HTML body.
        title = self._extract_title(content)
        html_content = self._markdown_to_html(content)

        # 3. Build the request payload.
        post_data = {
            'title': title,
            'content': html_content,
            'status': 'publish',
            'categories': [category_id],
            'tags': tags or [],
        }

        # 4. Send it to WordPress.
        response = requests.post(
            f'{self.wp_url}/posts',
            headers=self.headers,
            json=post_data,
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code == 201:
            post_id = response.json()['id']
            print(f"✅ 文章 '{title}' 发布成功,ID: {post_id}")
            return post_id

        print(f"❌ 文章发布失败: {response.status_code}")
        return None

    def _extract_title(self, content):
        """Return the text of the first ``# `` heading, or a placeholder title."""
        for line in content.strip().split('\n'):
            if line.startswith('# '):
                return line[2:].strip()
        return "未命名文章"

    def _markdown_to_html(self, markdown):
        """Convert a small Markdown subset to HTML.

        Handles ``#``/``##``/``###`` headings, fenced code blocks, inline
        code, links and plain paragraphs. Anything else is passed through
        as paragraph text.
        """
        import re

        # Headings.
        html = re.sub(r'^# (.+)$', r'<h1>\1</h1>', markdown, flags=re.MULTILINE)
        html = re.sub(r'^## (.+)$', r'<h2>\1</h2>', html, flags=re.MULTILINE)
        html = re.sub(r'^### (.+)$', r'<h3>\1</h3>', html, flags=re.MULTILINE)

        # Fenced code blocks (the optional language tag is dropped).
        html = re.sub(r'```(\w+)?\n(.*?)\n```', r'<pre><code>\2</code></pre>',
                      html, flags=re.DOTALL)

        # Inline code.
        html = re.sub(r'`([^`]+)`', r'<code>\1</code>', html)

        # Links.
        html = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'<a href="\2">\1</a>', html)

        # Paragraphs: consecutive plain lines are joined into one <p>;
        # headings, blank lines and code blocks end the current paragraph.
        block_prefixes = ('<h1', '<h2', '<h3', '<pre')
        result = []
        current_paragraph = []
        in_pre = False

        for line in html.split('\n'):
            if in_pre:
                # Lines inside a multi-line <pre> block are emitted verbatim.
                result.append(line)
                if '</pre>' in line:
                    in_pre = False
                continue

            if line.strip() and not line.startswith(block_prefixes):
                current_paragraph.append(line)
                continue

            if current_paragraph:
                result.append(f'<p>{" ".join(current_paragraph)}</p>')
                current_paragraph = []
            result.append(line)
            if line.startswith('<pre') and '</pre>' not in line:
                in_pre = True

        if current_paragraph:
            result.append(f'<p>{" ".join(current_paragraph)}</p>')

        return '\n'.join(result)
3. 数据同步自动化
data_sync_automation.py
import sqlite3 import csv import json from datetime import datetime
class DataSyncAutomation:
    """SQLite-backed store for task logs and content statistics."""

    def __init__(self, db_path='openclaw_data.db'):
        """Remember the database path and create the tables if needed."""
        self.db_path = db_path
        self._init_database()

    def _init_database(self):
        """Create the ``task_logs`` and ``content_stats`` tables when absent."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Task execution log.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS task_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_name TEXT NOT NULL,
                    status TEXT NOT NULL,
                    start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    end_time TIMESTAMP,
                    result TEXT,
                    error_message TEXT
                )
            ''')

            # Daily content statistics.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS content_stats (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    date DATE NOT NULL,
                    articles_published INTEGER DEFAULT 0,
                    words_written INTEGER DEFAULT 0,
                    seo_score INTEGER DEFAULT 0,
                    traffic_visits INTEGER DEFAULT 0
                )
            ''')

            conn.commit()
        finally:
            # Close even when a statement fails so the handle is not leaked.
            conn.close()

    def log_task(self, task_name, status, result=None, error=None):
        """Insert one row into ``task_logs``.

        Args:
            task_name: Name of the task being logged.
            status: Status label stored as-is (e.g. 'success', 'failed').
            result: Optional JSON-serialisable payload; stored as JSON text.
            error: Optional error message.
        """
        # Compare against None so valid falsy payloads such as {} or 0 are kept.
        result_json = json.dumps(result) if result is not None else None

        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute(
                '''
                INSERT INTO task_logs (task_name, status, result, error_message)
                VALUES (?, ?, ?, ?)
                ''',
                (task_name, status, result_json, error),
            )
            conn.commit()
        finally:
            conn.close()
        print(f"📝 任务日志已记录: {task_name} - {status}")

    def sync_wordpress_stats(self):
        """Log and return WordPress statistics, or None when logging fails.

        The figures are hard-coded sample values; a real integration would
        fetch them from the WordPress API here.
        """
        try:
            stats = {
                'articles_published': 15,
                'words_written': 45000,
                'seo_score': 85,
                'traffic_visits': 1200,
            }

            self.log_task('wordpress_stats_sync', 'success', stats)
            return stats

        except Exception as e:
            self.log_task('wordpress_stats_sync', 'failed', error=str(e))
            return None

    def export_to_csv(self, table_name, output_path):
        """Write every row of ``table_name`` to a CSV file with a header row.

        Args:
            table_name: Name of an existing table or view in the database.
            output_path: Destination CSV path; overwritten if present.

        Raises:
            sqlite3.OperationalError: If no such table or view exists.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Identifiers cannot be bound as parameters, so confirm the name
            # against the schema first instead of trusting the caller's string.
            cursor.execute(
                "SELECT name FROM sqlite_master "
                "WHERE type IN ('table', 'view') AND name = ? COLLATE NOCASE",
                (table_name,),
            )
            if cursor.fetchone() is None:
                raise sqlite3.OperationalError(f'no such table: {table_name}')

            quoted_name = '"' + table_name.replace('"', '""') + '"'
            cursor.execute(f'SELECT * FROM {quoted_name}')
            rows = cursor.fetchall()
            columns = [description[0] for description in cursor.description]
        finally:
            conn.close()

        with open(output_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(columns)
            writer.writerows(rows)

        print(f"📤 数据已导出到: {output_path}")
实战案例:搭建完整的内容自动化工作流
案例1:每日内容发布自动化
daily_content_automation.py
from task_scheduler import OpenClawTaskScheduler from content_automation import ContentAutomation from data_sync_automation import DataSyncAutomation import os
def setup_daily_content_workflow(wp_username=None, wp_password=None):
    """Wire up the daily publish, stats-sync and backup tasks, then run them.

    Credentials are never embedded in the source: they are taken from the
    arguments or, when omitted, from the ``OPENCLAW_WP_USERNAME`` and
    ``OPENCLAW_WP_PASSWORD`` environment variables.

    Args:
        wp_username: WordPress account name (optional).
        wp_password: WordPress password (optional).

    Raises:
        RuntimeError: If no credentials are supplied by either route.

    Note:
        This call blocks: it ends by starting the scheduler loop.
    """
    # Fail fast before any component is created.
    username = wp_username or os.environ.get('OPENCLAW_WP_USERNAME')
    password = wp_password or os.environ.get('OPENCLAW_WP_PASSWORD')
    if not username or not password:
        raise RuntimeError(
            '未配置WordPress凭据:请设置环境变量 '
            'OPENCLAW_WP_USERNAME 和 OPENCLAW_WP_PASSWORD'
        )

    # 1. Initialise the components.
    scheduler = OpenClawTaskScheduler()
    content_auto = ContentAutomation(
        wp_url='https://www.aixianbao.cn/wp-json/wp/v2',
        username=username,
        password=password,
    )
    data_sync = DataSyncAutomation()

    # 2. Define the daily tasks.
    def daily_content_publish():
        """Publish the article whose file name sorts last in the articles dir."""
        articles_dir = '/home/huanhuan/.openclaw-1号/workspace/articles/'

        article_files = sorted(
            f for f in os.listdir(articles_dir)
            if f.endswith('.md') and f.startswith('openclaw-')
        )
        if not article_files:
            return

        latest_article = os.path.join(articles_dir, article_files[-1])
        print(f"📄 准备发布文章: {latest_article}")

        post_id = content_auto.publish_article(latest_article)
        if post_id:
            data_sync.log_task('daily_content_publish', 'success', {'post_id': post_id})
        else:
            # Record the failure too, so the log shows every attempt.
            data_sync.log_task('daily_content_publish', 'failed',
                               error=f'publish failed: {latest_article}')

    def daily_stats_sync():
        """Sync the WordPress statistics and print them."""
        stats = data_sync.sync_wordpress_stats()
        if stats:
            print(f"📊 数据同步完成: {stats}")

    def daily_backup():
        """Copy the SQLite database to a dated file in the backup directory."""
        import shutil
        from datetime import datetime

        backup_dir = '/home/huanhuan/.openclaw-1号/backups/'
        os.makedirs(backup_dir, exist_ok=True)

        backup_file = f"{backup_dir}openclaw_backup_{datetime.now().strftime('%Y%m%d')}.db"
        # Back up the file the data-sync component actually writes to.
        shutil.copy2(data_sync.db_path, backup_file)

        print(f"💾 数据库备份完成: {backup_file}")
        data_sync.log_task('daily_backup', 'success', {'backup_file': backup_file})

    # 3. Register the tasks; 1440 minutes = 24 hours.
    scheduler.add_task(
        name='daily_content_publish',
        func=daily_content_publish,
        interval_minutes=1440,
    )
    scheduler.add_task(
        name='daily_stats_sync',
        func=daily_stats_sync,
        interval_minutes=1440,
    )
    scheduler.add_task(
        name='daily_backup',
        func=daily_backup,
        interval_minutes=1440,
    )

    # 4. Start the (blocking) scheduler loop.
    scheduler.start()
# Start the daily workflow only when this file is executed as a script.
if __name__ == '__main__': setup_daily_content_workflow()
案例2:SEO监控和优化自动化
“`python
seo_monitoring_automation.py
import requests from bs4 import BeautifulSoup import time
class SEOMonitoringAutomation:
    """Run a handful of on-page SEO checks against a single URL."""

    # Seconds allowed for each HTTP request.
    REQUEST_TIMEOUT = 10

    def __init__(self, website_url):
        self.website_url = website_url

    def check_seo_health(self):
        """Run every check and summarise the outcome.

        Returns:
            A dict with 'score' (number of passed checks), 'total',
            'percentage' (integer, rounded down) and 'checks' (the
            individual result dicts keyed by check name).
        """
        seo_checks = {
            'title_tag': self._check_title_tag(),
            'meta_description': self._check_meta_description(),
            'heading_structure': self._check_heading_structure(),
            'internal_links': self._check_internal_links(),
            'page_speed': self._check_page_speed(),
        }

        score = sum(1 for check in seo_checks.values() if check['passed'])
        total = len(seo_checks)

        return {
            'score': score,
            'total': total,
            # Integer arithmetic avoids float rounding surprises.
            'percentage': score * 100 // total,
            'checks': seo_checks,
        }

    def _fetch_soup(self):
        """Download the page and return it parsed with BeautifulSoup."""
        response = requests.get(self.website_url, timeout=self.REQUEST_TIMEOUT)
        return BeautifulSoup(response.text, 'html.parser')

    def _check_title_tag(self):
        """Pass when the <title> text is 30-60 characters long."""
        try:
            title = self._fetch_soup().find('title')

            if title and title.text:
                title_length = len(title.text)
                in_range = 30 <= title_length <= 60
                return {
                    'passed': in_range,
                    'message': f'标题长度: {title_length} 字符',
                    'recommendation': '标题长度合适' if in_range else '标题应在30-60字符之间',
                }
        except Exception as e:
            # Any fetch/parse problem is reported as a failed check.
            return {'passed': False, 'message': f'检查失败: {e}'}

        return {'passed': False, 'message': '未找到标题标签'}

    def _check_meta_description(self):
        """Pass when the meta description is 120-160 characters long."""
        try:
            meta_desc = self._fetch_soup().find('meta', attrs={'name': 'description'})

            if meta_desc and meta_desc.get('content'):
                desc_length = len(meta_desc['content'])
                in_range = 120 <= desc_length <= 160
                return {
                    'passed': in_range,
                    'message': f'描述长度: {desc_length} 字符',
                    'recommendation': '描述长度合适' if in_range else '描述应在120-160字符之间',
                }
        except Exception as e:
            return {'passed': False, 'message': f'检查失败: {e}'}

        return {'passed': False, 'message': '未找到Meta描述'}

    def _check_heading_structure(self):
        """Pass when the page has exactly one <h1>."""
        try:
            soup = self._fetch_soup()

            h1_count = len(soup.find_all('h1'))
            h2_count = len(soup.find_all('h2'))
            h3_count = len(soup.find_all('h3'))

            return {
                'passed': h1_count == 1,
                'message': f'H1: {h1_count}, H2: {h2_count}, H3: {h3_count}',
                'recommendation': '每页应有且仅有一个H1标签' if h1_count != 1 else '标题结构良好',
            }
        except Exception as e:
            return {'passed': False, 'message': f'检查失败: {e}'}

    def _check_internal_links(self):
        """Pass when the page contains at least three internal links."""
        try:
            soup = self._fetch_soup()

            internal_links = []
            external_links = []

            for link in soup.find_all('a', href=True):
                href = link['href']
                # '/path' is site-relative, but '//host/path' is a
                # protocol-relative URL and may point to another site.
                is_site_relative = href.startswith('/') and not href.startswith('//')
                if is_site_relative or self.website_url in href:
                    internal_links.append(href)
                else:
                    external_links.append(href)

            return {
                'passed': len(internal_links) >= 3,
                'message': f'内部链接: {len(internal_links)}, 外部链接: {len(external_links)}',
                'recommendation': '建议增加内部链接数量' if len(internal_links) < 3 else '内部链接充足',
            }
        except Exception as e:
            return {'passed': False, 'message': f'检查失败: {e}'}

    def _check_page_speed(self):
        """Pass when a single GET of the page completes in under 3 seconds."""
        try:
            start_time = time.time()
            requests.get(self.website_url, timeout=self.REQUEST_TIMEOUT)
            load_time = time.time() - start_time

            return {
                'passed': load_time < 3.0,
                'message': f'加载时间: {load_time:.2f}秒',
                'recommendation': '页面加载较慢,建议优化' if load_time >= 3.0 else '页面加载速度良好',
            }
        except Exception as e:
            return {'passed': False, 'message': f'检查失败: {e}'}
使用示例
def setup_seo_monitoring(): “””设置SEO监控自动化””” seo_monitor = SEOMonitoringAutomation(‘https://www.aixianbao.cn’)
# 每日SEO检查 def daily_seo_check(): print(“🔍 开始每日SEO检查…”) seo_health = seo_monitor.check_seo_health()
print(f





暂无评论内容