From 43b06c1a8702ead31bedf5765fefb3b0b40e2b0b Mon Sep 17 00:00:00 2001 From: Robert Brennan Date: Tue, 17 Dec 2024 14:01:39 -0500 Subject: [PATCH] fix session leak --- openhands/server/session/agent_session.py | 56 +++++++++++++++-------- openhands/server/session/manager.py | 1 + 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/openhands/server/session/agent_session.py b/openhands/server/session/agent_session.py index 8ad454c7072e..d405ed7ab44a 100644 --- a/openhands/server/session/agent_session.py +++ b/openhands/server/session/agent_session.py @@ -16,6 +16,9 @@ from openhands.storage.files import FileStore from openhands.utils.async_utils import call_async_from_sync +WAIT_TIME_BEFORE_CLOSE = 300 +WAIT_TIME_BEFORE_CLOSE_INTERVAL = 5 + class AgentSession: """Represents a session with an Agent @@ -30,6 +33,7 @@ class AgentSession: controller: AgentController | None = None runtime: Runtime | None = None security_analyzer: SecurityAnalyzer | None = None + _initializing: bool = False _closed: bool = False loop: asyncio.AbstractEventLoop | None = None @@ -111,6 +115,10 @@ async def _start( github_token: str | None = None, selected_repository: str | None = None, ): + if self._closed: + logger.warning('Session closed before starting') + return + self._initializing = True self._create_security_analyzer(config.security.security_analyzer) await self._create_runtime( runtime_name=runtime_name, @@ -120,7 +128,7 @@ async def _start( selected_repository=selected_repository, ) - self._create_controller( + self.controller = self._create_controller( agent, config.security.confirmation_mode, max_iterations, @@ -131,9 +139,9 @@ async def _start( self.event_stream.add_event( ChangeAgentStateAction(AgentState.INIT), EventSource.ENVIRONMENT ) - if self.controller: - self.controller.agent_task = self.controller.start_step_loop() - await self.controller.agent_task # type: ignore + self.controller.agent_task = self.controller.start_step_loop() + self._initializing = False + await self.controller.agent_task # type: ignore def close(self): """Closes the Agent session""" @@ -143,6 +151,18 @@ def close(self): call_async_from_sync(self._close) async def _close(self): + seconds_waited = 0 + while self._initializing: + logger.debug( + f'Waiting for initialization to finish before closing session {self.sid}' + ) + await asyncio.sleep(WAIT_TIME_BEFORE_CLOSE_INTERVAL) + seconds_waited += WAIT_TIME_BEFORE_CLOSE_INTERVAL + if seconds_waited > WAIT_TIME_BEFORE_CLOSE: + logger.error( + f'Waited too long for initialization to finish before closing session {self.sid}' + ) + break if self.controller is not None: end_state = self.controller.get_state() end_state.save_to_session(self.sid, self.file_store) @@ -209,18 +229,15 @@ async def _create_runtime( ) return - if self.runtime is not None: - self.runtime.clone_repo(github_token, selected_repository) - if agent.prompt_manager: - agent.prompt_manager.load_microagent_files( - self.runtime.get_custom_microagents(selected_repository) - ) - - logger.debug( - f'Runtime initialized with plugins: {[plugin.name for plugin in self.runtime.plugins]}' + self.runtime.clone_repo(github_token, selected_repository) + if agent.prompt_manager: + agent.prompt_manager.load_microagent_files( + self.runtime.get_custom_microagents(selected_repository) ) - else: - logger.warning('Runtime initialization failed') + + logger.debug( + f'Runtime initialized with plugins: {[plugin.name for plugin in self.runtime.plugins]}' + ) def _create_controller( self, @@ -230,7 +247,7 @@ def _create_controller( max_budget_per_task: float | None = None, agent_to_llm_config: dict[str, LLMConfig] | None = None, agent_configs: dict[str, AgentConfig] | None = None, - ): + ) -> AgentController: """Creates an AgentController instance Parameters: @@ -267,7 +284,7 @@ def _create_controller( ) logger.debug(msg) - self.controller = AgentController( + controller = AgentController( sid=self.sid, event_stream=self.event_stream, agent=agent, @@ -281,10 +298,9 @@ def _create_controller( ) try: agent_state = State.restore_from_session(self.sid, self.file_store) - self.controller.set_initial_state( - agent_state, max_iterations, confirmation_mode - ) + controller.set_initial_state(agent_state, max_iterations, confirmation_mode) logger.debug(f'Restored agent state from session, sid: {self.sid}') except Exception as e: logger.debug(f'State could not be restored: {e}') logger.debug('Agent controller initialized.') + return controller diff --git a/openhands/server/session/manager.py b/openhands/server/session/manager.py index fbcf4eaa836a..d0ca96778a5b 100644 --- a/openhands/server/session/manager.py +++ b/openhands/server/session/manager.py @@ -308,6 +308,7 @@ async def _cleanup_session(self, session: Session): ) await self._close_session(session) + return True async def _close_session(self, session: Session): logger.info(f'_close_session:{session.sid}')