[Bug]: SQL size keyword error #1103

Closed
Vasu2803k opened this issue Feb 24, 2025 · 9 comments
Assignees
Labels
bug Something isn't working

Comments

@Vasu2803k
Vasu2803k commented Feb 24, 2025

This bug appears to be related to the disk cache implementation in the AutoGen framework. Let me break down the error:

- The error occurs during chat execution, when trying to cache an LLM response.
- The specific error is: sqlite3.OperationalError: no such column: "size" - should this be a string literal in single-quotes?
- The error chain shows it is happening in the diskcache library's SQLite operations.

This suggests a schema mismatch in the disk cache database: the cache is trying to insert a record with a "size" column, but the database table doesn't have that column defined.

Traceback (most recent call last):
File "/media/vasu/Hard Disk/Projects/CareerSage/SageBuilder/src/scripts/SageBuilder.py", line 624, in _run_group_chat
chat_result = initiator_agent.initiate_chat(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vasu/miniconda3/envs/sagebuilder/lib/python3.11/site-packages/autogen/agentchat/conversable_agent.py", line 1106, in initiate_chat
self.send(msg2send, recipient, silent=silent)
File "/home/vasu/miniconda3/envs/sagebuilder/lib/python3.11/site-packages/autogen/agentchat/conversable_agent.py", line 741, in send
recipient.receive(message, self, request_reply, silent)
File "/home/vasu/miniconda3/envs/sagebuilder/lib/python3.11/site-packages/autogen/agentchat/conversable_agent.py", line 906, in receive
reply = self.generate_reply(messages=self.chat_messages[sender], sender=sender)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vasu/miniconda3/envs/sagebuilder/lib/python3.11/site-packages/autogen/agentchat/conversable_agent.py", line 2060, in generate_reply
final, reply = reply_func(self, messages=messages, sender=sender, config=reply_func_tuple["config"])
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vasu/miniconda3/envs/sagebuilder/lib/python3.11/site-packages/autogen/agentchat/groupchat.py", line 1174, in run_chat
reply = speaker.generate_reply(sender=self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vasu/miniconda3/envs/sagebuilder/lib/python3.11/site-packages/autogen/agentchat/conversable_agent.py", line 2060, in generate_reply
final, reply = reply_func(self, messages=messages, sender=sender, config=reply_func_tuple["config"])
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vasu/miniconda3/envs/sagebuilder/lib/python3.11/site-packages/autogen/agentchat/conversable_agent.py", line 1428, in generate_oai_reply
extracted_response = self._generate_oai_reply_from_client(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vasu/miniconda3/envs/sagebuilder/lib/python3.11/site-packages/autogen/agentchat/conversable_agent.py", line 1447, in _generate_oai_reply_from_client
response = llm_client.create(
^^^^^^^^^^^^^^^^^^
File "/home/vasu/miniconda3/envs/sagebuilder/lib/python3.11/site-packages/autogen/oai/client.py", line 897, in create
cache.set(key, response)
File "/home/vasu/miniconda3/envs/sagebuilder/lib/python3.11/site-packages/autogen/cache/disk_cache.py", line 73, in set
self.cache.set(key, value)
File "/home/vasu/miniconda3/envs/sagebuilder/lib/python3.11/site-packages/diskcache/core.py", line 808, in set
self._row_insert(db_key, raw, now, columns)
File "/home/vasu/miniconda3/envs/sagebuilder/lib/python3.11/site-packages/diskcache/core.py", line 857, in _row_insert
sql(
sqlite3.OperationalError: no such column: "size" - should this be a string literal in single-quotes?
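The failure mode in the traceback is generic SQLite schema drift: an existing cache database whose table predates a column the installed library version now writes. A minimal sketch of that mismatch (the table and column layout here is illustrative, not diskcache's actual schema; the single-quotes hint in the traceback is appended by SQLite itself when a double-quoted token does not resolve to a column):

```python
import sqlite3

con = sqlite3.connect(":memory:")
# Simulate a stale cache table created by an older library version,
# i.e. one missing the "size" column a newer version tries to write.
con.execute("CREATE TABLE Cache (key BLOB, value BLOB)")

try:
    con.execute("INSERT INTO Cache (key, value, size) VALUES (?, ?, ?)",
                (b"k", b"v", 3))
except sqlite3.OperationalError as e:
    caught = e
    print(e)  # the INSERT fails because the stale table has no "size" column
```

If this is what is happening, upgrading or downgrading diskcache without clearing the old cache directory would reproduce the error on every cache write.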

Steps to reproduce

No response

Model Used

No response

Expected Behavior

No response

Screenshots and logs

No response

Additional Information

No response

@Vasu2803k Vasu2803k added the bug Something isn't working label Feb 24, 2025
@rjambrecic rjambrecic self-assigned this Feb 24, 2025
@rjambrecic rjambrecic added this to ag2 Feb 24, 2025
@rjambrecic rjambrecic moved this to Todo in ag2 Feb 24, 2025
@rjambrecic
Collaborator

Hi @Vasu2803k,
can you share a code example so I can reproduce the bug?

@Vasu2803k
Author

try:
    # If cache is already set up, return it
    if self.cache_instance is not None:
        self.logger.info("Using existing cache instance")
        return self.cache_instance

    # Initialize cache manager with task_type
    self.cache_manager = CacheManager(
        cache_type=self.cache_type,
        user_id=self.user_id,
        session_id=self.session_id,
        config_dir=self.config_dir,
        task_type=self.task_type
    )

    # Initialize the cache manager
    await self.cache_manager.initialize()

    try:
        if self.cache_type == "redis":
            cache_url = self.cache_manager.get_cache_url()
            self.cache_instance = Cache.redis(redis_url=cache_url, cache_seed=42)
        else:  # disk cache
            # Get cache path for disk cache
            cache_path = str(Path(".cache") / str(self.user_id))
            Path(cache_path).mkdir(parents=True, exist_ok=True)

            # Initialize disk cache
            self.cache_instance = Cache.disk(
                cache_path_root=cache_path,
                cache_seed=42
            )

            self.logger.debug(f"Set up disk cache at: {cache_path}")

        return self.cache_instance

    except Exception as e:
        self.logger.error(f"Failed to initialize cache: {str(e)}")
        self.logger.info("Attempting to create fresh disk cache")

        try:
            # Try to create a fresh cache with minimal configuration
            cache_path = str(Path(".cache") / str(self.user_id))
            Path(cache_path).mkdir(parents=True, exist_ok=True)

            self.cache_instance = Cache.disk(
                cache_path_root=cache_path,
                cache_seed=42
            )
            return self.cache_instance
        except Exception as inner_e:
            self.logger.error(f"Failed to create fresh disk cache: {str(inner_e)}")
            self.logger.info("Proceeding without cache")
            return None

except Exception as e:
    self.logger.error(f"Failed to setup cache: {str(e)}")
    self.logger.info("Proceeding without cache")


start_time = time.time()
chat_result = initiator_agent.initiate_chat(
    manager,
    message=prompt,
    cache=cache
)
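If a cache.db created by a different diskcache version is the cause, one hedged workaround (assuming nothing in `.cache` needs to be preserved) is to delete the cache directory before creating the Cache.disk instance, so diskcache recreates the database with its current schema. A sketch, where "default" stands in for the actual user_id:

```python
import shutil
from pathlib import Path

cache_path = Path(".cache") / "default"  # "default" is an illustrative user_id
# Remove any cache.db left behind by an older diskcache version; a fresh
# directory lets diskcache recreate its table with the current schema.
if cache_path.exists():
    shutil.rmtree(cache_path)
cache_path.mkdir(parents=True, exist_ok=True)
```

This discards all cached LLM responses under that path, so it is only appropriate when the cache is safe to lose.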

@Vasu2803k
Author

'''
from typing import Optional, Dict, Any, Union
import redis.asyncio as aioredis
import os
from pathlib import Path
import yaml
import logging
from urllib.parse import quote_plus
import asyncio
import dotenv
from datetime import datetime
from src.tools.setup_logging import setup_logging

dotenv.load_dotenv()

class CacheManager:
    """Manages cache client initialization with support for Redis and disk cache fallback."""

    def __init__(self, cache_type: str, user_id: str, config_dir: str, session_id: str = None, task_type: str = "general"):
        self.cache_type = cache_type.lower()
        if self.cache_type not in ["redis", "disk"]:
            self.cache_type = "disk"
        self.user_id = user_id or "default"
        self.session_id = session_id or datetime.now().strftime('%Y%m%d_%H%M%S')

        # Initialize logging with passed task_type
        self.logger = setup_logging(
            log_level="INFO",
            log_dir="data/output/logs",
            user_id=self.user_id,
            session_id=self.session_id,
            task_type=task_type
        )
        self.config = self._load_cache_config(config_dir)
        self.client = None

    async def initialize(self) -> None:
        """Initialize the cache client asynchronously."""
        await self._initialize_cache_client()

    def _load_cache_config(self, config_dir: str) -> Dict:
        """Load cache configuration from YAML file."""
        try:
            config_path = Path(config_dir) / 'cache_config.yaml'
            with open(config_path, 'r') as f:
                config = yaml.safe_load(f)

            # Get Redis config and interpolate environment variables
            redis_config = config.get('cache', {}).get('redis', {})
            processed_config = {}

            for key, value in redis_config.items():
                if isinstance(value, str) and value.startswith('${') and value.endswith('}'):
                    env_var = value[2:-1]  # Remove ${ and }
                    processed_config[key] = os.getenv(env_var)
                else:
                    processed_config[key] = value

            # Convert numeric values
            numeric_fields = ['port', 'db', 'max_connections', 'socket_timeout',
                              'socket_connect_timeout', 'health_check_interval']
            for field in numeric_fields:
                if field in processed_config and processed_config[field]:
                    try:
                        processed_config[field] = int(processed_config[field])
                    except (TypeError, ValueError):
                        self.logger.warning(f"Invalid value for {field}, using default")
                        processed_config[field] = None

            # Convert boolean values
            boolean_fields = ['retry_on_timeout']
            for field in boolean_fields:
                if field in processed_config and processed_config[field]:
                    if isinstance(processed_config[field], str):
                        processed_config[field] = processed_config[field].lower() == 'true'

            return {'cache': {'redis': processed_config}}

        except Exception as e:
            self.logger.error(f"Failed to load cache config: {str(e)}")
            return {}

    async def _initialize_cache_client(self) -> None:
        """Initialize appropriate cache client based on configuration."""
        try:
            if self.cache_type == "redis":
                await self._initialize_redis_client()
            else:  # disk cache
                self.logger.info("Using disk cache")
                self.client = None
        except Exception as e:
            self.logger.error(f"Failed to initialize cache client: {str(e)}")
            self.client = None
            self.cache_type = "disk"

    async def _initialize_redis_client(self) -> None:
        """Initialize Redis client with improved connection handling."""
        try:
            redis_config = self.config.get('cache', {}).get('redis', {})

            # Validate required Redis configuration
            required_fields = ['host', 'port']
            for field in required_fields:
                if not redis_config.get(field):
                    raise ValueError(f"Missing required Redis configuration: {field}")

            # Add default timeouts if not specified
            default_timeouts = {
                'socket_timeout': 5,
                'socket_connect_timeout': 5,
                'socket_keepalive': True,
                'health_check_interval': 30,
                'retry_on_timeout': True,
                'max_connections': 100
            }

            # Merge defaults with config
            for key, value in default_timeouts.items():
                if key not in redis_config or redis_config[key] is None:
                    redis_config[key] = value

            # Initialize async Redis client with proper error handling for config values
            self.client = aioredis.Redis(
                host=redis_config['host'],
                port=redis_config['port'],
                db=redis_config.get('db', 0),
                password=redis_config.get('password', None),  # Make password optional
                max_connections=redis_config.get('max_connections', 100),
                socket_timeout=redis_config.get('socket_timeout', 5),
                socket_connect_timeout=redis_config.get('socket_connect_timeout', 5),
                socket_keepalive=redis_config.get('socket_keepalive', True),
                health_check_interval=redis_config.get('health_check_interval', 30),
                retry_on_timeout=redis_config.get('retry_on_timeout', True),
                encoding='utf-8',
                decode_responses=True
            )

            # Test connection immediately
            await self.client.ping()
            self.logger.info("Redis client initialized and connection tested successfully")

        except (TypeError, ValueError) as e:
            self.logger.error(f"Invalid Redis configuration: {str(e)}")
            raise
        except aioredis.ConnectionError as e:
            self.logger.error(f"Failed to connect to Redis: {str(e)}")
            raise
        except aioredis.RedisError as e:
            self.logger.error(f"Redis error during initialization: {str(e)}")
            raise
        except Exception as e:
            self.logger.error(f"Unexpected error initializing Redis client: {str(e)}")
            raise

    async def health_check(self) -> bool:
        """Perform health check on the cache connection."""
        try:
            if self.cache_type == "redis":
                return await self._redis_health_check()
            else:  # disk cache
                return await self._disk_health_check()
        except Exception as e:
            self.logger.error(f"Health check failed: {str(e)}")
            return False

    async def _redis_health_check(self) -> bool:
        """Check Redis connection health with retries and better error handling."""
        max_retries = 3
        retry_delay = 1  # seconds

        for attempt in range(max_retries):
            try:
                if not self.client:
                    self.logger.error("Redis client not initialized")
                    return False

                await self.client.ping()
                return True

            except (aioredis.ConnectionError, aioredis.TimeoutError) as e:
                self.logger.warning(f"Redis ping failed (attempt {attempt + 1}/{max_retries}): {str(e)}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay * (attempt + 1))
                continue

            except Exception as e:
                self.logger.error(f"Redis health check error (attempt {attempt + 1}/{max_retries}): {str(e)}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay * (attempt + 1))
                continue

        self.logger.error("Redis health check failed after all retries")
        return False

    async def _disk_health_check(self) -> bool:
        """Check disk cache health."""
        try:
            # Create test directory path without session_id
            cache_dir = Path(".cache") / str(self.user_id)
            cache_dir.mkdir(parents=True, exist_ok=True)

            # Check write permissions
            if not os.access(cache_dir, os.W_OK):
                self.logger.error(f"No write permission for cache directory: {cache_dir}")
                return False

            # Try to create a test file
            test_file = cache_dir / "test.txt"
            try:
                test_file.write_text("test")
                test_file.unlink()  # Clean up test file
            except Exception as e:
                self.logger.error(f"Failed to write test file: {str(e)}")
                return False

            return True

        except Exception as e:
            self.logger.error(f"Disk cache health check failed: {str(e)}")
            return False

    def get_cache_url(self) -> str:
        """Get cache URL or path for cache setup."""
        try:
            if self.cache_type == "redis":
                redis_config = self.config.get('cache', {}).get('redis', {})
                if not redis_config.get('host') or not redis_config.get('port'):
                    self.logger.error("Missing Redis host or port configuration")
                    raise ValueError("Invalid Redis configuration")

                # Normalize and validate host
                host = redis_config['host'].strip()

                # Handle IPv4, IPv6, and hostnames
                if ':' in host and not host.startswith('['):  # IPv6
                    host = f'[{host}]'
                elif not all(c.isascii() for c in host):  # Non-ASCII hostname
                    try:
                        host = host.encode('idna').decode('ascii')
                    except UnicodeError as e:
                        self.logger.error(f"Invalid hostname encoding: {str(e)}")
                        raise ValueError("Invalid hostname encoding")

                # Validate host length
                if len(host) > 253:  # Max DNS name length
                    self.logger.error("Redis host name too long")
                    raise ValueError("Redis host name exceeds maximum length")

                # Clean and validate components
                port = int(redis_config['port'])
                if not (1 <= port <= 65535):
                    raise ValueError(f"Invalid port number: {port}")

                password = redis_config.get('password', '')
                if password:
                    # URL encode special characters in password
                    password = quote_plus(password)
                    # Validate encoded password length
                    if len(password) > 512:  # reasonable max length
                        raise ValueError("Password too long after encoding")

                db = redis_config.get('db', 0)
                if not isinstance(db, int) or db < 0:
                    db = 0

                # Construct URL components
                scheme = "redis"
                auth = f":{password}@" if password else ""
                path = f"/{db}" if db else ""

                # Build the final URL
                url = f"{scheme}://{auth}{host}:{port}{path}"

                # Validate final URL length
                if len(url) > 2048:  # standard URL length limit
                    raise ValueError("Generated Redis URL exceeds maximum length")

                # Mask the credentials, not the host, in the debug log
                self.logger.debug(f"Generated Redis URL format: {scheme}://[masked]@{host}:{port}{path}")
                return url

            else:  # disk cache
                # Only use user_id for cache path, removing session_id dependency
                cache_dir = Path(".cache") / str(self.user_id)
                cache_dir.mkdir(parents=True, exist_ok=True)
                return str(cache_dir)

        except Exception as e:
            self.logger.error(f"Failed to get cache URL: {str(e)}")
            # Fall back to disk cache path without session_id
            cache_dir = Path(".cache") / str(self.user_id)
            cache_dir.mkdir(parents=True, exist_ok=True)
            return str(cache_dir)

    def get_cache_path(self) -> Path:
        """Get the cache directory path."""
        try:
            if self.cache_type == "disk":
                # Only use user_id for cache path, removing session_id dependency
                cache_dir = Path(".cache") / str(self.user_id)
                cache_dir.mkdir(parents=True, exist_ok=True)
                return cache_dir
            else:
                raise ValueError(f"Cache path not applicable for {self.cache_type} cache type")
        except Exception as e:
            self.logger.error(f"Failed to get cache path: {str(e)}")
            # Return default cache path without session_id
            cache_dir = Path(".cache") / str(self.user_id)
            cache_dir.mkdir(parents=True, exist_ok=True)
            return cache_dir
'''
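The URL construction in get_cache_url can be exercised in isolation. A condensed sketch of the same steps (build_redis_url is a hypothetical helper for illustration, not part of the class above):

```python
from urllib.parse import quote_plus

def build_redis_url(host: str, port: int, password: str = "", db: int = 0) -> str:
    # Bracket bare IPv6 addresses, URL-encode the password, and append the
    # db path, mirroring the logic in get_cache_url.
    if ':' in host and not host.startswith('['):
        host = f'[{host}]'
    auth = f":{quote_plus(password)}@" if password else ""
    path = f"/{db}" if db else ""
    return f"redis://{auth}{host}:{port}{path}"

print(build_redis_url("localhost", 6379, "p@ss:word", 1))
# redis://:p%40ss%3Aword@localhost:6379/1
```

Encoding the password with quote_plus matters because characters like `@` and `:` would otherwise be parsed as URL delimiters.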

@rjambrecic
Collaborator

'''from typing import Optional, Dict, Any, Union import redis.asyncio as aioredis import os from pathlib import Path import yaml import logging from urllib.parse import quote_plus import asyncio import dotenv from datetime import datetime from src.tools.setup_logging import setup_logging

dotenv.load_dotenv()

class CacheManager: """Manages cache client initialization with support for Redis and disk cache fallback."""

def __init__(self, cache_type: str, user_id: str, config_dir: str, session_id: str = None, task_type: str = "general"):
    self.cache_type = cache_type.lower()
    if self.cache_type not in ["redis", "disk"]:
        self.cache_type = "disk"
    self.user_id = user_id or "default"
    self.session_id = session_id or datetime.now().strftime('%Y%m%d_%H%M%S')
    
    # Initialize logging with passed task_type
    self.logger = setup_logging(
        log_level="INFO",
        log_dir="data/output/logs",
        user_id=self.user_id,
        session_id=self.session_id,
        task_type=task_type
    )
    self.config = self._load_cache_config(config_dir)
    self.client = None
    
async def initialize(self) -> None:
    """Initialize the cache client asynchronously."""
    await self._initialize_cache_client()

def _load_cache_config(self, config_dir: str) -> Dict:
    """Load cache configuration from YAML file."""
    try:
        config_path = Path(config_dir) / 'cache_config.yaml'
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        
        # Get Redis config and interpolate environment variables
        redis_config = config.get('cache', {}).get('redis', {})
        processed_config = {}
        
        for key, value in redis_config.items():
            if isinstance(value, str) and value.startswith('${') and value.endswith('}'):
                env_var = value[2:-1]  # Remove ${ and }
                processed_config[key] = os.getenv(env_var)
            else:
                processed_config[key] = value
                
        # Convert numeric values
        numeric_fields = ['port', 'db', 'max_connections', 'socket_timeout', 
                         'socket_connect_timeout', 'health_check_interval']
        for field in numeric_fields:
            if field in processed_config and processed_config[field]:
                try:
                    processed_config[field] = int(processed_config[field])
                except (TypeError, ValueError):
                    self.logger.warning(f"Invalid value for {field}, using default")
                    processed_config[field] = None
                    
        # Convert boolean values
        boolean_fields = ['retry_on_timeout']
        for field in boolean_fields:
            if field in processed_config and processed_config[field]:
                if isinstance(processed_config[field], str):
                    processed_config[field] = processed_config[field].lower() == 'true'
                    
        return {'cache': {'redis': processed_config}}
        
    except Exception as e:
        self.logger.error(f"Failed to load cache config: {str(e)}")
        return {}

async def _initialize_cache_client(self) -> None:
    """Initialize appropriate cache client based on configuration."""
    try:
        if self.cache_type == "redis":
            await self._initialize_redis_client()
        else:  # disk cache
            self.logger.info("Using disk cache")
            self.client = None
    except Exception as e:
        self.logger.error(f"Failed to initialize cache client: {str(e)}")
        self.client = None
        self.cache_type = "disk"

async def _initialize_redis_client(self) -> None:
    """Initialize Redis client with improved connection handling."""
    try:
        redis_config = self.config.get('cache', {}).get('redis', {})
        
        # Validate required Redis configuration
        required_fields = ['host', 'port']
        for field in required_fields:
            if not redis_config.get(field):
                raise ValueError(f"Missing required Redis configuration: {field}")
        
        # Add default timeouts if not specified
        default_timeouts = {
            'socket_timeout': 5,
            'socket_connect_timeout': 5,
            'socket_keepalive': True,
            'health_check_interval': 30,
            'retry_on_timeout': True,
            'max_connections': 100
        }
        
        # Merge defaults with config
        for key, value in default_timeouts.items():
            if key not in redis_config or redis_config[key] is None:
                redis_config[key] = value
        
        # Initialize async Redis client with proper error handling for config values
        self.client = aioredis.Redis(
            host=redis_config['host'],
            port=redis_config['port'],
            db=redis_config.get('db', 0),
            password=redis_config.get('password', None),  # Make password optional
            max_connections=redis_config.get('max_connections', 100),
            socket_timeout=redis_config.get('socket_timeout', 5),
            socket_connect_timeout=redis_config.get('socket_connect_timeout', 5),
            socket_keepalive=redis_config.get('socket_keepalive', True),
            health_check_interval=redis_config.get('health_check_interval', 30),
            retry_on_timeout=redis_config.get('retry_on_timeout', True),
            encoding='utf-8',
            decode_responses=True
        )
        
        # Test connection immediately
        await self.client.ping()
        self.logger.info("Redis client initialized and connection tested successfully")
        
    except (TypeError, ValueError) as e:
        self.logger.error(f"Invalid Redis configuration: {str(e)}")
        raise
    except aioredis.ConnectionError as e:
        self.logger.error(f"Failed to connect to Redis: {str(e)}")
        raise
    except aioredis.RedisError as e:
        self.logger.error(f"Redis error during initialization: {str(e)}")
        raise
    except Exception as e:
        self.logger.error(f"Unexpected error initializing Redis client: {str(e)}")
        raise

async def health_check(self) -> bool:
    """Perform health check on the cache connection."""
    try:
        if self.cache_type == "redis":
            return await self._redis_health_check()
        else:  # disk cache
            return await self._disk_health_check()
    except Exception as e:
        self.logger.error(f"Health check failed: {str(e)}")
        return False

async def _redis_health_check(self) -> bool:
    """Check Redis connection health with retries and better error handling."""
    max_retries = 3
    retry_delay = 1  # seconds
    
    for attempt in range(max_retries):
        try:
            if not self.client:
                self.logger.error("Redis client not initialized")
                return False
            
            await self.client.ping()
            return True
            
        except (aioredis.ConnectionError, aioredis.TimeoutError) as e:
            self.logger.warning(f"Redis ping failed (attempt {attempt + 1}/{max_retries}): {str(e)}")
            if attempt < max_retries - 1:
                await asyncio.sleep(retry_delay * (attempt + 1))
            continue
                
        except Exception as e:
            self.logger.error(f"Redis health check error (attempt {attempt + 1}/{max_retries}): {str(e)}")
            if attempt < max_retries - 1:
                await asyncio.sleep(retry_delay * (attempt + 1))
            continue
    
    self.logger.error("Redis health check failed after all retries")
    return False

async def _disk_health_check(self) -> bool:
    """Check disk cache health."""
    try:
        # Create test directory path without session_id
        cache_dir = Path(".cache") / str(self.user_id)
        cache_dir.mkdir(parents=True, exist_ok=True)
        
        # Check write permissions
        if not os.access(cache_dir, os.W_OK):
            self.logger.error(f"No write permission for cache directory: {cache_dir}")
            return False
        
        # Try to create a test file
        test_file = cache_dir / "test.txt"
        try:
            test_file.write_text("test")
            test_file.unlink()  # Clean up test file
        except Exception as e:
            self.logger.error(f"Failed to write test file: {str(e)}")
            return False
        
        return True
    
    except Exception as e:
        self.logger.error(f"Disk cache health check failed: {str(e)}")
        return False

def get_cache_url(self) -> str:
    """Get cache URL or path for cache setup."""
    try:
        if self.cache_type == "redis":
            redis_config = self.config.get('cache', {}).get('redis', {})
            if not redis_config.get('host') or not redis_config.get('port'):
                self.logger.error("Missing Redis host or port configuration")
                raise ValueError("Invalid Redis configuration")
            
            # Normalize and validate host
            host = redis_config['host'].strip()
            
            # Handle IPv4, IPv6, and hostnames
            if ':' in host and not host.startswith('['):  # IPv6
                host = f'[{host}]'
            elif not any(c.isascii() for c in host):  # Non-ASCII hostname
                try:
                    host = host.encode('idna').decode('ascii')
                except UnicodeError as e:
                    self.logger.error(f"Invalid hostname encoding: {str(e)}")
                    raise ValueError("Invalid hostname encoding")
            
            # Validate host length
            if len(host) > 253:  # Max DNS name length
                self.logger.error("Redis host name too long")
                raise ValueError("Redis host name exceeds maximum length")
            
            # Clean and validate components
            port = int(redis_config['port'])
            if not (1 <= port <= 65535):
                raise ValueError(f"Invalid port number: {port}")
            
            password = redis_config.get('password', '')
            if password:
                # URL encode special characters in password
                password = quote_plus(password)
                # Validate encoded password length
                if len(password) > 512:  # reasonable max length
                    raise ValueError("Password too long after encoding")
            
            db = redis_config.get('db', 0)
            if not isinstance(db, int) or db < 0:
                db = 0
            
            # Construct URL components
            scheme = "redis"
            auth = f":{password}@" if password else ""
            path = f"/{db}" if db else ""
            
            # Build the final URL
            url = f"{scheme}://{auth}{host}:{port}{path}"
            
            # Validate final URL length
            if len(url) > 2048:  # standard URL length limit
                raise ValueError("Generated Redis URL exceeds maximum length")
            
            self.logger.debug(f"Generated Redis URL format: {scheme}://{auth}[masked]:{port}{path}")
            return url
            
        else:  # disk cache
            # Only use user_id for cache path, removing session_id dependency
            cache_dir = Path(".cache") / str(self.user_id)
            cache_dir.mkdir(parents=True, exist_ok=True)
            return str(cache_dir)
        
    except Exception as e:
        self.logger.error(f"Failed to get cache URL: {str(e)}")
        # Fall back to disk cache path without session_id
        cache_dir = Path(".cache") / str(self.user_id)
        cache_dir.mkdir(parents=True, exist_ok=True)
        return str(cache_dir)

def get_cache_path(self) -> Path:
    """Get the cache directory path."""
    try:
        if self.cache_type == "disk":
            # Only use user_id for cache path, removing session_id dependency
            cache_dir = Path(".cache") / str(self.user_id)
            cache_dir.mkdir(parents=True, exist_ok=True)
            return cache_dir
        else:
            raise ValueError(f"Cache path not applicable for {self.cache_type} cache type")
    except Exception as e:
        self.logger.error(f"Failed to get cache path: {str(e)}")
        # Return default cache path without session_id
        cache_dir = Path(".cache") / str(self.user_id)
        cache_dir.mkdir(parents=True, exist_ok=True)
        return cache_dir

'''
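The `no such column: "size"` error in the traceback comes down to SQL quoting rules: in standard SQL, double quotes delimit identifiers (column names) and single quotes delimit string literals. A minimal sketch with Python's built-in `sqlite3` module (the table and column names here are illustrative, not diskcache's actual schema):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE cache (key TEXT, size INTEGER)")
con.execute("INSERT INTO cache VALUES ('a', 10)")

# "size" in double quotes is an identifier: it resolves to the column.
ident_row = con.execute('SELECT "size" FROM cache').fetchone()

# 'size' in single quotes is a string literal, not a column reference.
literal_row = con.execute("SELECT 'size' FROM cache").fetchone()
```

Historically SQLite would fall back to treating an unresolvable double-quoted identifier as a string literal; builds that disable that fallback raise `no such column` instead, which would explain why the same code works on some SQLite versions and not others.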

@Vasu2803k To better understand and debug the problem, could you please provide a minimal, self-contained code example that reproduces the issue? A smaller snippet (around 10-20 lines) would make it much easier to diagnose and fix.

@Vasu2803k
Author

Vasu2803k commented Feb 24, 2025

Here is the smaller snippet that you asked for.

try:
    if self.cache_instance is not None:
        self.logger.info("Using existing cache instance")
        return self.cache_instance

    self.cache_manager = CacheManager(
        cache_type=self.cache_type,
        user_id=self.user_id,
        session_id=self.session_id,
        config_dir=self.config_dir,
        task_type=self.task_type
    )

    await self.cache_manager.initialize()

    try:
        if self.cache_type == "redis":
            cache_url = self.cache_manager.get_cache_url()
            self.cache_instance = Cache.redis(redis_url=cache_url, cache_seed=42)
        else:
            cache_path = str(Path(".cache") / str(self.user_id))
            Path(cache_path).mkdir(parents=True, exist_ok=True)
            self.cache_instance = Cache.disk(cache_path_root=cache_path, cache_seed=42)

        return self.cache_instance

    except Exception as e:
        self.logger.error(f"Failed to initialize cache: {str(e)}")
        self.logger.info("Attempting to create fresh disk cache")

        try:
            cache_path = str(Path(".cache") / str(self.user_id))
            Path(cache_path).mkdir(parents=True, exist_ok=True)
            self.cache_instance = Cache.disk(cache_path_root=cache_path, cache_seed=42)
            return self.cache_instance
        except Exception as inner_e:
            self.logger.error(f"Failed to create fresh disk cache: {str(inner_e)}")
            self.logger.info("Proceeding without cache")
            return None

except Exception as e:
    self.logger.error(f"Failed to setup cache: {str(e)}")
    self.logger.info("Proceeding without cache")

start_time = time.time()
chat_result = initiator_agent.initiate_chat(
    manager,
    message=prompt,
    cache=cache
)

@rjambrecic rjambrecic moved this from Todo to In Progress in ag2 Feb 25, 2025
@rjambrecic
Collaborator

@Vasu2803k
have you taken a look at how to use caching in AG2? Example:

import os
from autogen import AssistantAgent, Cache, UserProxyAgent

config_list = [
    {
        "api_type": "openai",
        "model": "gpt-4o",
        "api_key": os.environ["OPENAI_API_KEY"]
    }
]

user_proxy = UserProxyAgent(
    name="user_proxy",
    human_input_mode="NEVER",
)
assistant = AssistantAgent(
    name="assistant",
    llm_config={"config_list": config_list},
)


with Cache.disk(cache_path_root="/tmp/.cache-test") as cache:
    user_proxy.initiate_chat(
        recipient=assistant,
        message="What is the capital of France?",
        cache=cache,
    )

and for Redis it should be pretty much the same:

with Cache.redis(redis_url=...) as cache:
  ...
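One detail worth noting from the URL-building code in the original post: if the Redis password contains reserved characters, it must be percent-encoded before being embedded in the URL, or parsing will break. A minimal sketch (the host, port, and password here are hypothetical):

```python
from urllib.parse import quote_plus

# '@' and ':' in a raw password would be misread as URL delimiters,
# so they must be percent-encoded first.
password = quote_plus("p@ss:word")
redis_url = f"redis://:{password}@localhost:6379/0"
```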

@Vasu2803k
Author

Do you think I raised a bug without going through this first? It was working well before your latest update on Feb 15, 2025. When I ran the whole system, it suddenly raised an error from the disk cache. I was not using Redis and had not initialized it. Could you check the code that I gave you and try to reproduce the issue?

@davorrunje
Collaborator

@Vasu2803k Thank you again for reporting this weird behaviour. It seems to be caused by an updated sqlite3 library on your system; we found a number of similar issues in other repos.

Please check your version of sqlite3 by running:

sqlite3 --version

If the version is 3.49.1, please try downgrading it (it seems like 3.42.0 does not have this problem).

In any case, please let us know what happened and we'll take it from there.
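Note that Python links against its own copy of the SQLite library, which may differ from the `sqlite3` CLI found on your PATH. To check the version your Python process (and therefore diskcache) actually uses, a quick sketch:

```python
import sqlite3

# Version of the SQLite C library linked into this Python process;
# this is the one diskcache runs against, not necessarily the CLI's.
linked_version = sqlite3.sqlite_version
print(linked_version)
```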

@Vasu2803k
Author

Vasu2803k commented Feb 26, 2025

@davorrunje Thank you for the solution. It worked fine after downgrading the sqlite version to 3.42.0. You are a legend.

@github-project-automation github-project-automation bot moved this from In Progress to Done in ag2 Feb 26, 2025