altyzen-agi-browser / order_agent_worker.py
Yuvan666's picture
fix: Remove hardcoded Windows path breaking Docker env loading
b328621
"""
ALTYZEN Order Validation Worker
"""
import asyncio
import json
import logging
import os
import re
import time
from typing import Any, Dict, Optional

import httpx

from order_worker_info import IDENTITY, MISSION, NATURE, RESOURCES, CONFLICT_PROTOCOL
# Root logging config for the worker process: INFO and above, timestamped.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger named after this module.
logger = logging.getLogger(name=__name__)
from dotenv import load_dotenv
# Load variables from a .env file if python-dotenv finds one. There is no
# hardcoded path, so this works in both Windows dev and Linux Docker.
load_dotenv()

# Check the API key once at startup so a misconfigured deployment is obvious
# from the log. This check only logs; it does not stop the worker.
api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
    logger.critical("❌ CRITICAL: OPENROUTER_API_KEY NOT FOUND in environment! LLM calls will fail.")
else:
    # A prefix/suffix preview reveals 9 characters. Only use it when the key is
    # long enough for that to hide most of it; otherwise a short (or wrong)
    # value would be logged nearly verbatim, so mask it completely.
    _key_preview = f"{api_key[:5]}...{api_key[-4:]}" if len(api_key) >= 20 else "***"
    logger.info("✅ API Key found: %s", _key_preview)
# OpenRouter model slug used for each agent "organ" (role name -> model id).
MODELS = dict(
    brain="tngtech/deepseek-r1t-chimera:free",
    eyes="qwen/qwen-2.5-vl-7b-instruct:free",
    hands="kwaipilot/kat-coder-pro:free",
    guard="deepseek/deepseek-r1-0528:free",
)

# OpenRouter chat-completions endpoint that every LLM request is POSTed to.
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
# System prompt sent as the "system" message of every AgentOrgans.call_llm
# request. Built once at import time from the order_worker_info sections:
# IDENTITY first, then labelled MISSION / NATURE / RESOURCES /
# CONFLICT PROTOCOL sections.
SYSTEM_PROMPT = f"""
{IDENTITY}
MISSION:
{MISSION}
NATURE:
{NATURE}
RESOURCES:
{RESOURCES}
CONFLICT PROTOCOL:
{CONFLICT_PROTOCOL}
"""
class AgentOrgans:
    """LLM "organs" plus the headless Chromium page the order agent drives.

    ``call_llm`` sends a prompt to the OpenRouter model registered for an
    organ in ``MODELS``. ``initialize`` and ``close`` own the Playwright
    browser lifecycle; ``page``, ``context`` and ``browser`` stay ``None``
    until ``initialize`` succeeds.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.page = None
        self.browser = None
        self.context = None
        # Handle returned by async_playwright().start(). Kept so close() can
        # stop the Playwright driver instead of leaking it for every order.
        self._playwright = None

    async def call_llm(self, organ: str, prompt: str) -> str:
        """Send *prompt* (with SYSTEM_PROMPT as system message) to *organ*'s model.

        Args:
            organ: key into MODELS ("brain", "eyes", "hands", "guard").
                Unknown keys fall back to the "guard" model.
            prompt: the user message.

        Returns:
            The assistant message text, or "" if the model returned no
            content. Failures are not raised. They are returned as
            ``"ERROR: <status>"`` for non-200 responses, or as
            ``"ERROR: <exception>"`` for transport and parsing errors.
        """
        model = MODELS.get(organ, MODELS["guard"])
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://altyzen.com",
            "X-Title": f"Altyzen-Worker-{organ.upper()}",
        }
        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            "temperature": 0.1,
            "max_tokens": 2000,
        }
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                resp = await client.post(OPENROUTER_URL, json=payload, headers=headers)
                if resp.status_code != 200:
                    logger.warning("LLM call for %s (%s) returned HTTP %s", organ, model, resp.status_code)
                    return f"ERROR: {resp.status_code}"
                # `content` may be null (e.g. a reply with no text part).
                # Normalise it to "" so callers can always treat the result
                # as a string.
                content = resp.json()["choices"][0]["message"].get("content")
                return content or ""
        except Exception as e:
            logger.warning("LLM call for %s (%s) failed: %s", organ, model, e)
            return f"ERROR: {e}"

    async def initialize(self, proxy_config: Optional[Dict] = None) -> bool:
        """Start Playwright, launch headless Chromium and open a single page.

        Args:
            proxy_config: optional Playwright proxy dict (server/username/
                password), applied to the browser context.

        Returns:
            True on success. On failure the error is logged, everything
            started so far is torn down, and False is returned.
        """
        try:
            from playwright.async_api import async_playwright

            self._playwright = await async_playwright().start()
            context_args = {"viewport": {"width": 1280, "height": 720}}
            if proxy_config:
                context_args["proxy"] = proxy_config
            self.browser = await self._playwright.chromium.launch(headless=True)
            self.context = await self.browser.new_context(**context_args)
            self.page = await self.context.new_page()
            return True
        except Exception as e:
            logger.error(f"Init failed: {e}")
            # Don't leave a half-started driver or browser behind.
            try:
                await self.close()
            except Exception as cleanup_error:
                logger.warning("Cleanup after failed init also failed: %s", cleanup_error)
            return False

    async def close(self):
        """Close the browser (if open) and stop the Playwright driver.

        Handles are cleared before they are released, so calling this more
        than once is a no-op.
        """
        browser, self.browser = self.browser, None
        playwright, self._playwright = self._playwright, None
        try:
            if browser:
                await browser.close()
        finally:
            # Stop the driver even if closing the browser raised.
            if playwright:
                await playwright.stop()
async def get_rotating_proxy() -> Optional[Dict]:
    """Fetch a proxy from the Webshare API, formatted as a Playwright proxy config.

    Returns:
        ``{"server": "http://<address>:<port>", "username": ..., "password": ...}``
        built from the first entry of Webshare's ``direct``-mode proxy list.
        Returns ``None`` when ``WEBSHARE_API_KEY`` is unset or empty, when
        the request fails, or when the list is empty. Failures are logged,
        not raised, so the caller can continue without a proxy.
    """
    webshare_key = os.getenv("WEBSHARE_API_KEY", "")
    if not webshare_key:
        return None
    # NOTE(review): page_size=1 always yields the first proxy of the list, so
    # nothing rotates here unless Webshare itself reorders the list — confirm.
    list_url = "https://proxy.webshare.io/api/v2/proxy/list/?mode=direct&page=1&page_size=1"
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(
                list_url,
                headers={"Authorization": f"Token {webshare_key}"},
            )
        if resp.status_code != 200:
            logger.warning("Webshare proxy list request failed: HTTP %s", resp.status_code)
            return None
        results = resp.json().get("results")
        if not results:
            return None
        p = results[0]
        return {
            "server": f"http://{p['proxy_address']}:{p['port']}",
            "username": p["username"],
            "password": p["password"],
        }
    except (httpx.HTTPError, ValueError, KeyError, TypeError, AttributeError) as e:
        # Narrow, deliberate best effort. Network/HTTP errors and unexpected
        # payload shapes fall back to "no proxy". Cancellation and other
        # BaseExceptions still propagate.
        logger.warning("Could not fetch a Webshare proxy, continuing without one: %s", e)
        return None
async def get_a11y_tree(page, max_chars: int = 3000) -> str:
    """Return the page's accessibility tree as indented JSON text, truncated.

    Args:
        page: a Playwright page (anything exposing ``accessibility.snapshot()``).
        max_chars: maximum length of the returned text. Truncation can cut
            mid-token, so the result is not guaranteed to be valid JSON.

    Returns:
        The (truncated) JSON text. Returns ``""`` when there is no snapshot,
        or when it cannot be taken or serialised.
    """
    # NOTE(review): Playwright documents `page.accessibility` as deprecated;
    # if it is removed, this falls back to "" via the handler below.
    try:
        snapshot = await page.accessibility.snapshot()
        if not snapshot:
            return ""
        return json.dumps(snapshot, indent=2)[:max_chars]
    except Exception as exc:
        # Best effort: the agent can still act without the tree. Catching
        # Exception (not a bare `except:`) lets asyncio cancellation propagate.
        # getLogger(__name__) returns this module's logger.
        logging.getLogger(__name__).debug("Accessibility snapshot failed: %s", exc)
        return ""
# Browser commands the "brain" model may emit. They are matched
# case-insensitively anywhere in its reply, and the argument is taken from
# right after the marker, not from the first ':' of the whole reply.
_NAVIGATE_RE = re.compile(r"NAVIGATE:\s*(\S+)", re.IGNORECASE)
_CLICK_RE = re.compile(r"CLICK:\s*(.+)", re.IGNORECASE)
_TYPE_RE = re.compile(r"TYPE:\s*(.*)\bINTO\b", re.IGNORECASE)
_JSON_DECODER = json.JSONDecoder()


def _extract_decision(response: str) -> Optional[Dict[str, Any]]:
    """Return the first JSON object in *response* that has a "decision" key.

    Every '{' is tried as the start of a JSON value. The object may therefore
    be surrounded by prose or a code fence, and may use any whitespace
    (``{ "decision": ...}``, pretty-printed, ...). Returns None if no such
    object exists.
    """
    start = response.find("{")
    while start != -1:
        try:
            obj, _end = _JSON_DECODER.raw_decode(response[start:])
        except ValueError:
            pass
        else:
            if isinstance(obj, dict) and "decision" in obj:
                return obj
        start = response.find("{", start + 1)
    return None


async def _perform_action(page, response: str) -> None:
    """Execute the first browser command found in an LLM reply (best effort).

    Commands are checked in this priority order: ``NAVIGATE: <url>``,
    ``CLICK: <visible text>``, ``TYPE: <text> INTO <field>``. A reply with no
    command does nothing. Playwright failures (bad URL, timeout, missing
    element) are logged, so one bad step does not abort the validation.
    """
    try:
        navigate = _NAVIGATE_RE.search(response)
        if navigate:
            url = navigate.group(1).strip("`'\"<>")
            await page.goto(url, timeout=30000)
            return
        click = _CLICK_RE.search(response)
        if click:
            target = click.group(1).strip()
            await page.click(f"text={target[:30]}", timeout=5000)
            return
        typed = _TYPE_RE.search(response)
        if typed:
            # NOTE(review): the INTO target is ignored; this fills whatever the
            # generic `input:visible` selector resolves to.
            await page.fill("input:visible", typed.group(1).strip(), timeout=5000)
    except Exception as e:  # Playwright raises its own Error/TimeoutError types
        logger.warning("Browser action failed: %s", e)


async def _run_agent_loop(organs: "AgentOrgans", task: str, max_steps: int = 10) -> Optional[Dict[str, Any]]:
    """Repeat observe -> ask "brain" -> act until the reply contains a decision.

    Returns the decision object, or None if *max_steps* replies go by without
    one.
    """
    for _step in range(max_steps):
        a11y = await get_a11y_tree(organs.page)
        prompt = (
            f"URL: {organs.page.url}\nA11y: {a11y[:1500]}\n"
            f"Task: {task}\nWhat next? If done, return JSON."
        )
        # `or ""` guards against a null reply, so the parsing below always
        # works on a string.
        response = await organs.call_llm("brain", prompt) or ""
        decision = _extract_decision(response)
        if decision is not None:
            return decision
        await _perform_action(organs.page, response)
        await asyncio.sleep(1)
    return None


async def validate_order(order_data: Dict[str, Any]) -> Dict[str, Any]:
    """Validate one order's email, phone and geo data with the browser agent.

    Args:
        order_data: order fields. Reads ``order_id``, ``email``, ``phone``,
            ``zip``, ``city`` and ``state``. Missing fields default to "";
            a missing ``order_id`` becomes "UNKNOWN".

    Returns:
        A dict that always contains ``order_id``, ``decision`` and
        ``execution_time_ms``. ``decision`` is one of:

        * the model's verdict, together with ``email_valid``, ``phone_valid``,
          ``geo_match`` and ``reasoning``;
        * ``"MANUAL_REVIEW"`` if no verdict appears within the step budget;
        * ``"ERROR"`` with an ``error`` message.

        Errors are reported in the result rather than raised.
    """
    # Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
    start_time = time.perf_counter()
    order_id = order_data.get("order_id", "UNKNOWN")

    def _elapsed_ms() -> int:
        return int((time.perf_counter() - start_time) * 1000)

    email = order_data.get("email", "")
    phone = order_data.get("phone", "")
    zip_code = order_data.get("zip", "")
    city = order_data.get("city", "")
    state = order_data.get("state", "")

    api_key = os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        return {"order_id": order_id, "decision": "ERROR", "error": "No API key",
                "execution_time_ms": _elapsed_ms()}

    task = f"Validate: Email={email}, Phone={phone}, Geo={zip_code},{city},{state}. Return JSON with decision, email_valid, phone_valid, geo_match."
    organs = AgentOrgans(api_key)
    try:
        proxy = await get_rotating_proxy()
        if not await organs.initialize(proxy):
            return {"order_id": order_id, "decision": "ERROR", "error": "Browser init failed",
                    "execution_time_ms": _elapsed_ms()}
        result = await _run_agent_loop(organs, task)
    except Exception as e:
        logger.exception("Validation of order %s crashed", order_id)
        return {"order_id": order_id, "decision": "ERROR", "error": str(e),
                "execution_time_ms": _elapsed_ms()}
    finally:
        # Runs on every exit path, including cancellation, so no Chromium
        # instance outlives the order.
        try:
            await organs.close()
        except Exception as close_error:
            logger.warning("Failed to close browser for order %s: %s", order_id, close_error)

    if result is None:
        return {"order_id": order_id, "decision": "MANUAL_REVIEW",
                "execution_time_ms": _elapsed_ms()}
    return {
        "order_id": order_id,
        "decision": result.get("decision", "UNKNOWN"),
        "email_valid": result.get("email_valid", False),
        "phone_valid": result.get("phone_valid", False),
        "geo_match": result.get("geo_match", False),
        "reasoning": result.get("reasoning", ""),
        "execution_time_ms": _elapsed_ms(),
    }