Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix session leak #5656

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 36 additions & 20 deletions openhands/server/session/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
from openhands.storage.files import FileStore
from openhands.utils.async_utils import call_async_from_sync

WAIT_TIME_BEFORE_CLOSE = 300
WAIT_TIME_BEFORE_CLOSE_INTERVAL = 5


class AgentSession:
"""Represents a session with an Agent
Expand All @@ -30,6 +33,7 @@ class AgentSession:
controller: AgentController | None = None
runtime: Runtime | None = None
security_analyzer: SecurityAnalyzer | None = None
_initializing: bool = False
_closed: bool = False
loop: asyncio.AbstractEventLoop | None = None

Expand Down Expand Up @@ -111,6 +115,10 @@ async def _start(
github_token: str | None = None,
selected_repository: str | None = None,
):
if self._closed:
logger.warning('Session closed before starting')
return
self._initializing = True
self._create_security_analyzer(config.security.security_analyzer)
await self._create_runtime(
runtime_name=runtime_name,
Expand All @@ -120,7 +128,7 @@ async def _start(
selected_repository=selected_repository,
)

self._create_controller(
self.controller = self._create_controller(
agent,
config.security.confirmation_mode,
max_iterations,
Expand All @@ -131,9 +139,9 @@ async def _start(
self.event_stream.add_event(
ChangeAgentStateAction(AgentState.INIT), EventSource.ENVIRONMENT
)
if self.controller:
self.controller.agent_task = self.controller.start_step_loop()
await self.controller.agent_task # type: ignore
self.controller.agent_task = self.controller.start_step_loop()
self._initializing = False
await self.controller.agent_task # type: ignore

def close(self):
"""Closes the Agent session"""
Expand All @@ -143,6 +151,18 @@ def close(self):
call_async_from_sync(self._close)

async def _close(self):
seconds_waited = 0
while self._initializing:
logger.debug(
f'Waiting for initialization to finish before closing session {self.sid}'
)
await asyncio.sleep(WAIT_TIME_BEFORE_CLOSE_INTERVAL)
seconds_waited += WAIT_TIME_BEFORE_CLOSE_INTERVAL
if seconds_waited > WAIT_TIME_BEFORE_CLOSE:
logger.error(
f'Waited too long for initialization to finish before closing session {self.sid}'
)
break
Comment on lines +154 to +165
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main fix

if self.controller is not None:
end_state = self.controller.get_state()
end_state.save_to_session(self.sid, self.file_store)
Expand Down Expand Up @@ -209,18 +229,15 @@ async def _create_runtime(
)
return

if self.runtime is not None:
self.runtime.clone_repo(github_token, selected_repository)
if agent.prompt_manager:
agent.prompt_manager.load_microagent_files(
self.runtime.get_custom_microagents(selected_repository)
)

logger.debug(
f'Runtime initialized with plugins: {[plugin.name for plugin in self.runtime.plugins]}'
self.runtime.clone_repo(github_token, selected_repository)
if agent.prompt_manager:
agent.prompt_manager.load_microagent_files(
self.runtime.get_custom_microagents(selected_repository)
)
else:
logger.warning('Runtime initialization failed')

logger.debug(
f'Runtime initialized with plugins: {[plugin.name for plugin in self.runtime.plugins]}'
)

def _create_controller(
self,
Expand All @@ -230,7 +247,7 @@ def _create_controller(
max_budget_per_task: float | None = None,
agent_to_llm_config: dict[str, LLMConfig] | None = None,
agent_configs: dict[str, AgentConfig] | None = None,
):
) -> AgentController:
"""Creates an AgentController instance

Parameters:
Expand Down Expand Up @@ -267,7 +284,7 @@ def _create_controller(
)
logger.debug(msg)

self.controller = AgentController(
controller = AgentController(
sid=self.sid,
event_stream=self.event_stream,
agent=agent,
Expand All @@ -281,10 +298,9 @@ def _create_controller(
)
try:
agent_state = State.restore_from_session(self.sid, self.file_store)
self.controller.set_initial_state(
agent_state, max_iterations, confirmation_mode
)
controller.set_initial_state(agent_state, max_iterations, confirmation_mode)
logger.debug(f'Restored agent state from session, sid: {self.sid}')
except Exception as e:
logger.debug(f'State could not be restored: {e}')
logger.debug('Agent controller initialized.')
return controller
1 change: 1 addition & 0 deletions openhands/server/session/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ async def _cleanup_session(self, session: Session):
)

await self._close_session(session)
return True

async def _close_session(self, session: Session):
logger.info(f'_close_session:{session.sid}')
Expand Down
Loading