Skip to content

Add agent control features: prompt enhancement, pause/resume, context… #200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions src/agent/custom_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import logging
import pdb
import traceback
from typing import Optional, Type, List, Dict, Any, Callable
import asyncio
from typing import Optional, Type, List, Dict, Any, Callable, Union
from PIL import Image, ImageDraw, ImageFont
import os
import base64
Expand All @@ -16,15 +17,16 @@
AgentHistoryList,
AgentOutput,
AgentHistory,
AgentStepInfo,
)
from browser_use.browser.browser import Browser
from browser_use.browser.context import BrowserContext
from browser_use.browser.views import BrowserStateHistory
from browser_use.browser.views import BrowserStateHistory, BrowserState
from browser_use.controller.service import Controller
from browser_use.telemetry.views import (
AgentEndTelemetryEvent,
AgentRunTelemetryEvent,
AgentStepTelemetryEvent,
AgentEndTelemetryEvent,
AgentRunTelemetryEvent,
AgentStepTelemetryEvent,
)
from browser_use.utils import time_execution_async
from langchain_core.language_models.chat_models import BaseChatModel
Expand Down Expand Up @@ -72,7 +74,7 @@ def __init__(
max_error_length: int = 400,
max_actions_per_step: int = 10,
tool_call_in_content: bool = True,
agent_state: AgentState = None,
agent_state: Optional[AgentState] = None,
initial_actions: Optional[List[Dict[str, Dict[str, Any]]]] = None,
# Cloud Callbacks
register_new_step_callback: Callable[['BrowserState', 'AgentOutput', int], None] | None = None,
Expand Down Expand Up @@ -228,15 +230,30 @@ async def step(self, step_info: Optional[CustomAgentStepInfo] = None) -> None:
result: list[ActionResult] = []

try:
# Check for pause
if self.agent_state:
while await self.agent_state.is_paused():
if self.agent_state.is_stop_requested():
logger.info("🛑 Stop requested while paused")
return
await asyncio.sleep(0.1)

# Get any updated context
context = await self.agent_state.get_context()
if context and "user_input" in context and step_info is not None:
step_info.add_infos = context["user_input"]
logger.info(f"📝 Updated context: {context['user_input']}")

state = await self.browser_context.get_state(use_vision=self.use_vision)
self.message_manager.add_state_message(state, self._last_actions, self._last_result, step_info)
input_messages = self.message_manager.get_messages()
try:
model_output = await self.get_next_action(input_messages)
if self.register_new_step_callback:
self.register_new_step_callback(state, model_output, self.n_steps)
self.update_step_info(model_output, step_info)
logger.info(f"🧠 All Memory: \n{step_info.memory}")
if step_info is not None:
self.update_step_info(model_output, step_info)
logger.info(f"🧠 All Memory: \n{step_info.memory}")
self._save_conversation(input_messages, model_output)
if self.model_name != "deepseek-reasoner":
# remove prev message
Expand Down
56 changes: 54 additions & 2 deletions src/utils/agent_state.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import asyncio
import logging

logger = logging.getLogger(__name__)

class AgentState:
_instance = None

def __init__(self):
if not hasattr(self, '_stop_requested'):
self._stop_requested = asyncio.Event()
self.last_valid_state = None # store the last valid browser state
self.last_valid_state = None
self._current_task = None
self._task_queue = asyncio.Queue()
self._context = {}
self._paused = asyncio.Event()
self._pause_condition = asyncio.Condition()

def __new__(cls):
if cls._instance is None:
Expand All @@ -23,8 +31,52 @@ def clear_stop(self):
def is_stop_requested(self):
return self._stop_requested.is_set()

async def pause_execution(self):
"""Pause the current task execution"""
async with self._pause_condition:
self._paused.set()
logger.info("Execution paused")

async def resume_execution(self):
"""Resume the paused task execution"""
async with self._pause_condition:
self._paused.clear()
self._pause_condition.notify_all()
logger.info("Execution resumed")

async def wait_if_paused(self):
"""Wait if execution is paused"""
if self._paused.is_set():
async with self._pause_condition:
await self._pause_condition.wait()

async def is_paused(self):
"""Check if execution is paused"""
return self._paused.is_set()

async def update_context(self, context: dict):
"""Update execution context"""
self._context.update(context)
logger.info(f"Context updated: {context}")

async def get_context(self) -> dict:
"""Get current execution context"""
return self._context

def set_last_valid_state(self, state):
self.last_valid_state = state

def get_last_valid_state(self):
return self.last_valid_state
return self.last_valid_state

async def add_task(self, task):
"""Add a new task to the queue"""
await self._task_queue.put(task)

async def get_next_task(self):
"""Get the next task from the queue"""
return await self._task_queue.get()

async def has_pending_tasks(self):
"""Check if there are pending tasks"""
return not self._task_queue.empty()
59 changes: 59 additions & 0 deletions src/utils/interactive_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import asyncio
import logging
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)

async def execute_with_interaction(agent_state, agent_func, *args, **kwargs):
"""
Execute an agent function with support for pausing, resuming, and context updates.

Args:
agent_state: The AgentState instance managing execution state
agent_func: The main agent function to execute
*args, **kwargs: Arguments to pass to the agent function
"""
try:
# Start execution
logger.info("Starting execution with interaction support")

# Execute the agent function with pause checks
async def wrapped_agent_func(*args, **kwargs):
while True:
# Check for stop request first
if agent_state.is_stop_requested():
logger.info("Stop requested, terminating execution")
return None

# Check and handle pause
if await agent_state.is_paused():
logger.info("Execution paused, waiting for resume...")
while await agent_state.is_paused():
if agent_state.is_stop_requested():
logger.info("Stop requested while paused")
return None
await asyncio.sleep(0.1)
logger.info("Execution resumed")

try:
# Get any pending tasks
if await agent_state.has_pending_tasks():
task = await agent_state.get_next_task()
logger.info(f"Processing task: {task}")
await agent_state.update_context({"current_task": task})

# Execute one step of the agent function
result = await agent_func(*args, **kwargs)
return result
except Exception as e:
logger.error(f"Error during execution step: {str(e)}")
await agent_state.update_context({"last_error": str(e)})
raise

# Run the wrapped function
return await wrapped_agent_func(*args, **kwargs)

except Exception as e:
logger.error(f"Error during execution: {str(e)}")
await agent_state.update_context({"last_error": str(e)})
raise
Loading