Troubleshooting Database Connection Issues: A Complete Guide
Database connection issues are among the most common problems developers face when building AI applications. This comprehensive guide provides systematic approaches to diagnosing and resolving connection problems with DataBridge AI.
Common Connection Problems
Connection Timeout Issues
Timeouts are the most frequent connection problem:
# Example of connection timeout error
try:
    connection = await mcp_client.connect({
        'host': 'database.example.com',
        'port': 5432,
        'database': 'myapp',
        'username': 'user',
        'password': 'password'
    })
except ConnectionTimeoutError as e:
    print(f"Connection timeout: {e}")
    # Implement retry logic
Common Causes:
- Network latency or packet loss
- Database server overload
- Firewall blocking connections
- Incorrect connection parameters
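The retry logic hinted at in the snippet above is usually implemented with exponential backoff. The sketch below is client-agnostic: `connect` stands in for whatever connect coroutine your client exposes (such as the `mcp_client.connect` call shown earlier), and the jitter constant is an arbitrary illustrative choice.

```python
import asyncio
import random

async def connect_with_retry(connect, config, max_retries=3, base_delay=1.0):
    # `connect` is any awaitable factory (e.g. a client's connect coroutine);
    # the helper itself knows nothing about the underlying driver.
    for attempt in range(max_retries):
        try:
            return await connect(config)
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of retries - let the caller handle it
            # Back off 1s, 2s, 4s, ... with a little jitter so that many
            # clients reconnecting at once do not stampede the server
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
```

In production you would catch only transient error types (timeouts, connection resets) rather than bare `Exception`, so that permanent failures such as bad credentials fail fast.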
Authentication Failures
Authentication problems can be tricky to diagnose:
class AuthenticationDiagnostic:
    def diagnose_auth_failure(self, error):
        if "password authentication failed" in str(error):
            return self.check_credentials()
        elif "role does not exist" in str(error):
            return self.check_user_exists()
        elif "database does not exist" in str(error):
            return self.check_database_exists()
        elif "SSL required" in str(error):
            return self.check_ssl_configuration()
        # Unrecognized error - fall back to the server logs
        return {'issue': 'Unrecognized authentication error',
                'solutions': ['Check database server logs']}

    def check_credentials(self):
        return {
            'issue': 'Invalid credentials',
            'solutions': [
                'Verify username and password',
                'Check for special characters in password',
                'Ensure user has login permissions',
                'Check password expiration'
            ]
        }
Connection Pool Exhaustion
When a connection pool is exhausted, new requests must wait or fail. Monitor utilization so you can react before that happens:
class ConnectionPoolMonitor:
    def __init__(self, pool):
        self.pool = pool
        self.metrics = {}

    def monitor_pool_health(self):
        self.metrics = {
            'active_connections': self.pool.active_count,
            'idle_connections': self.pool.idle_count,
            'total_connections': self.pool.total_count,
            'max_connections': self.pool.max_size,
            'utilization': self.pool.active_count / self.pool.max_size
        }
        if self.metrics['utilization'] > 0.9:
            self.alert_high_utilization()
        return self.metrics

    def alert_high_utilization(self):
        print("WARNING: Connection pool utilization > 90%")
        print("Consider increasing pool size or optimizing query performance")
Diagnostic Tools and Techniques
Connection Testing Framework
Build a comprehensive connection testing framework:
import socket
from dataclasses import dataclass

@dataclass
class TestResult:
    test_name: str
    passed: bool
    error: str
    critical: bool

class ConnectionDiagnostic:
    def __init__(self, mcp_client):
        self.mcp_client = mcp_client
        self.test_results = []

    async def run_full_diagnostic(self, connection_config):
        tests = [
            self.test_network_connectivity,
            self.test_dns_resolution,
            self.test_port_accessibility,
            self.test_ssl_handshake,
            self.test_authentication,
            self.test_database_access,
            self.test_query_execution
        ]
        for test in tests:
            result = await test(connection_config)
            self.test_results.append(result)
            if not result.passed:
                print(f"FAILED: {result.test_name} - {result.error}")
                if result.critical:
                    break
        return self.generate_diagnostic_report()

    async def test_network_connectivity(self, config):
        try:
            # Test basic network connectivity
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(5)
            result = sock.connect_ex((config.host, config.port))
            sock.close()
            return TestResult(
                test_name="Network Connectivity",
                passed=result == 0,
                error=None if result == 0 else f"Cannot connect to {config.host}:{config.port}",
                critical=True
            )
        except Exception as e:
            return TestResult("Network Connectivity", False, str(e), True)

    def generate_diagnostic_report(self):
        return {
            'passed': all(r.passed for r in self.test_results),
            'results': self.test_results
        }
Performance Monitoring
Monitor connection performance metrics:
import time
from datetime import datetime

class ConnectionPerformanceMonitor:
    def __init__(self, mcp_client):
        self.mcp_client = mcp_client
        self.metrics = {}
        self.thresholds = {
            'connection_time': 1000,  # ms
            'query_time': 5000,       # ms
            'error_rate': 0.05        # 5%
        }

    async def measure_connection_performance(self, connection_config):
        start_time = time.time()
        try:
            # Measure connection establishment time
            connection = await self.mcp_client.connect(connection_config)
            connection_time = (time.time() - start_time) * 1000
            # Measure query performance
            query_start = time.time()
            await self.mcp_client.query(connection, "SELECT 1")
            query_time = (time.time() - query_start) * 1000
            # Record metrics
            self.record_metrics({
                'connection_time': connection_time,
                'query_time': query_time,
                'success': True
            })
            await self.mcp_client.disconnect(connection)
        except Exception as e:
            self.record_metrics({
                'connection_time': None,
                'query_time': None,
                'success': False,
                'error': str(e)
            })

    def record_metrics(self, metrics):
        timestamp = datetime.utcnow()
        self.metrics[timestamp] = metrics
        # Check thresholds and alert if necessary
        self.check_performance_thresholds(metrics)

    def check_performance_thresholds(self, metrics):
        for key in ('connection_time', 'query_time'):
            value = metrics.get(key)
            if value is not None and value > self.thresholds[key]:
                print(f"WARNING: {key} of {value:.0f}ms exceeds threshold")
Network-Level Troubleshooting
DNS Resolution Issues
Diagnose DNS problems:
import socket
import time
import dns.resolver  # from the dnspython package

class DNSDiagnostic:
    def measure_dns_time(self, hostname):
        # Resolution time in milliseconds
        start = time.time()
        socket.gethostbyname(hostname)
        return (time.time() - start) * 1000

    def diagnose_dns_issues(self, hostname):
        results = {
            'hostname': hostname,
            'system_dns': None,
            'custom_dns': None,
            'reverse_dns': None,
            'recommendations': []
        }
        # Test system DNS resolution
        try:
            ip_address = socket.gethostbyname(hostname)
            results['system_dns'] = {
                'success': True,
                'ip_address': ip_address,
                'resolution_time': self.measure_dns_time(hostname)
            }
        except socket.gaierror as e:
            results['system_dns'] = {
                'success': False,
                'error': str(e)
            }
            results['recommendations'].append('Check DNS server configuration')
        # Test with well-known public DNS servers
        for dns_server in ['8.8.8.8', '1.1.1.1']:
            try:
                resolver = dns.resolver.Resolver()
                resolver.nameservers = [dns_server]
                answer = resolver.resolve(hostname, 'A')
                results['custom_dns'] = {
                    'success': True,
                    'dns_server': dns_server,
                    'ip_addresses': [str(rdata) for rdata in answer]
                }
                break
            except Exception:
                continue
        return results
Firewall and Security Group Issues
Check firewall configurations:
import socket

class FirewallDiagnostic:
    def check_port_accessibility(self, host, port, timeout=5):
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            result = sock.connect_ex((host, port))
            sock.close()
            if result == 0:
                return {
                    'accessible': True,
                    'message': f'Port {port} is accessible on {host}'
                }
            else:
                return {
                    'accessible': False,
                    'message': f'Port {port} is not accessible on {host}',
                    'recommendations': [
                        'Check firewall rules',
                        'Verify security group settings',
                        'Ensure database server is running',
                        'Check if port is correct'
                    ]
                }
        except Exception as e:
            return {
                'accessible': False,
                'error': str(e),
                'recommendations': [
                    'Check network connectivity',
                    'Verify hostname resolution'
                ]
            }
Database-Specific Troubleshooting
PostgreSQL Connection Issues
Common PostgreSQL-specific problems:
import re

class PostgreSQLDiagnostic:
    def diagnose_postgresql_issues(self, error_message):
        error_patterns = {
            'FATAL: password authentication failed': {
                'cause': 'Invalid credentials',
                'solutions': [
                    'Verify username and password',
                    'Check pg_hba.conf authentication method',
                    'Ensure user exists in database'
                ]
            },
            'FATAL: database .* does not exist': {
                'cause': 'Database does not exist',
                'solutions': [
                    'Create the database',
                    'Check database name spelling',
                    'Verify connection to correct server'
                ]
            },
            'FATAL: no pg_hba.conf entry': {
                'cause': 'Host-based authentication failure',
                'solutions': [
                    'Add entry to pg_hba.conf',
                    'Reload PostgreSQL configuration',
                    'Check client IP address'
                ]
            },
            'connection refused': {
                'cause': 'PostgreSQL server not accepting connections',
                'solutions': [
                    'Start PostgreSQL service',
                    'Check listen_addresses in postgresql.conf',
                    'Verify port configuration'
                ]
            }
        }
        # Some patterns contain regex (e.g. "database .* does not exist"),
        # so match with re.search rather than a plain substring test
        for pattern, info in error_patterns.items():
            if re.search(pattern, error_message, re.IGNORECASE):
                return info
        return {'cause': 'Unknown error', 'solutions': ['Check PostgreSQL logs']}
MongoDB Connection Issues
MongoDB-specific troubleshooting:
class MongoDBDiagnostic:
    def diagnose_mongodb_issues(self, error_message):
        error_patterns = {
            'Authentication failed': {
                'cause': 'Invalid credentials or authentication database',
                'solutions': [
                    'Verify username and password',
                    'Check authentication database (usually admin)',
                    'Ensure user has proper roles'
                ]
            },
            'No suitable servers found': {
                'cause': 'Cannot connect to MongoDB servers',
                'solutions': [
                    'Check connection string format',
                    'Verify server addresses and ports',
                    'Check network connectivity',
                    'Verify replica set configuration'
                ]
            },
            'SSL handshake failed': {
                'cause': 'SSL/TLS configuration issues',
                'solutions': [
                    'Check SSL certificate validity',
                    'Verify SSL configuration on server',
                    'Update SSL/TLS settings in connection string'
                ]
            }
        }
        for pattern, info in error_patterns.items():
            if pattern in error_message:
                return info
        return {'cause': 'Unknown MongoDB error', 'solutions': ['Check MongoDB logs']}
Performance Optimization
Query Performance Analysis
Analyze and optimize query performance:
import time

class QueryPerformanceAnalyzer:
    def __init__(self, mcp_client):
        self.mcp_client = mcp_client
        self.slow_query_threshold = 1000  # ms

    async def analyze_query_performance(self, query, connection):
        # Execute query with timing
        start_time = time.time()
        try:
            # Get the execution plan first (note: EXPLAIN ANALYZE runs the query)
            explain_query = f"EXPLAIN ANALYZE {query}"
            execution_plan = await self.mcp_client.query(connection, explain_query)
            # Execute the actual query
            result = await self.mcp_client.query(connection, query)
            execution_time = (time.time() - start_time) * 1000
            analysis = {
                'query': query,
                'execution_time_ms': execution_time,
                'execution_plan': execution_plan,
                'is_slow': execution_time > self.slow_query_threshold,
                'recommendations': self.generate_recommendations(execution_plan, execution_time)
            }
            return analysis
        except Exception as e:
            return {
                'query': query,
                'error': str(e),
                'recommendations': ['Check query syntax', 'Verify table/column names']
            }

    def generate_recommendations(self, execution_plan, execution_time):
        recommendations = []
        if execution_time > self.slow_query_threshold:
            recommendations.append('Query is slow - consider optimization')
        # Analyze execution plan for common issues
        plan_text = str(execution_plan).lower()
        if 'seq scan' in plan_text:
            recommendations.append('Sequential scan detected - consider adding indexes')
        if 'nested loop' in plan_text:
            recommendations.append('Nested loop join - check that inner-side lookups are indexed')
        if 'sort' in plan_text and 'disk' in plan_text:
            recommendations.append('Sort spilling to disk - increase work_mem or add index')
        return recommendations
Connection Pool Optimization
Optimize connection pool settings:
from datetime import datetime, timedelta

class ConnectionPoolOptimizer:
    def __init__(self, mcp_client):
        self.mcp_client = mcp_client
        self.metrics_history = []

    def analyze_pool_usage(self, pool_metrics):
        # Collect metrics over time
        self.metrics_history.append({
            'timestamp': datetime.utcnow(),
            'active_connections': pool_metrics['active_connections'],
            'idle_connections': pool_metrics['idle_connections'],
            'wait_time': pool_metrics.get('wait_time', 0)
        })
        # Keep only the last hour of history
        cutoff_time = datetime.utcnow() - timedelta(hours=1)
        self.metrics_history = [
            m for m in self.metrics_history
            if m['timestamp'] > cutoff_time
        ]
        return self.generate_pool_recommendations()

    def generate_pool_recommendations(self):
        if not self.metrics_history:
            return []
        recommendations = []
        # Calculate average and peak utilization
        avg_active = sum(m['active_connections'] for m in self.metrics_history) / len(self.metrics_history)
        max_active = max(m['active_connections'] for m in self.metrics_history)
        avg_wait_time = sum(m.get('wait_time', 0) for m in self.metrics_history) / len(self.metrics_history)
        # Generate recommendations based on patterns
        if avg_wait_time > 100:  # ms
            recommendations.append({
                'type': 'increase_pool_size',
                'message': 'High wait times detected - consider increasing pool size',
                'current_avg_wait': avg_wait_time
            })
        if max_active and max_active < avg_active * 1.5:
            recommendations.append({
                'type': 'decrease_pool_size',
                'message': 'Pool may be oversized - consider reducing max connections',
                'utilization': avg_active / max_active
            })
        return recommendations
Automated Monitoring and Alerting
Health Check System
Implement automated health checks:
from datetime import datetime

class DatabaseHealthChecker:
    def __init__(self, mcp_client, alert_manager):
        self.mcp_client = mcp_client
        self.alert_manager = alert_manager
        self.health_checks = [
            self.check_connection_availability,
            self.check_query_performance,
            self.check_connection_pool_health,
            self.check_database_locks,
            self.check_disk_space
        ]

    async def run_health_checks(self, connection_config):
        health_report = {
            'timestamp': datetime.utcnow(),
            'overall_status': 'healthy',
            'checks': []
        }
        for check in self.health_checks:
            try:
                result = await check(connection_config)
                health_report['checks'].append(result)
                if result['status'] == 'critical':
                    health_report['overall_status'] = 'critical'
                    await self.alert_manager.send_critical_alert(result)
                elif result['status'] == 'warning' and health_report['overall_status'] == 'healthy':
                    health_report['overall_status'] = 'warning'
            except Exception as e:
                error_result = {
                    'check_name': check.__name__,
                    'status': 'error',
                    'message': str(e)
                }
                health_report['checks'].append(error_result)
        return health_report

    async def check_connection_availability(self, config):
        try:
            connection = await self.mcp_client.connect(config)
            await self.mcp_client.query(connection, "SELECT 1")
            await self.mcp_client.disconnect(connection)
            return {
                'check_name': 'connection_availability',
                'status': 'healthy',
                'message': 'Database connection successful'
            }
        except Exception as e:
            return {
                'check_name': 'connection_availability',
                'status': 'critical',
                'message': f'Cannot connect to database: {str(e)}'
            }
Alert Management
Implement intelligent alerting:
import uuid
from datetime import datetime

class AlertManager:
    def __init__(self):
        self.alert_channels = []
        self.alert_history = []
        self.suppression_rules = {}

    def generate_alert_id(self):
        return str(uuid.uuid4())

    async def send_critical_alert(self, check_result):
        alert = {
            'severity': 'critical',
            'check_name': check_result['check_name'],
            'message': check_result['message'],
            'timestamp': datetime.utcnow(),
            'id': self.generate_alert_id()
        }
        # Check if alert should be suppressed
        if self.should_suppress_alert(alert):
            return
        # Send alert through all configured channels
        for channel in self.alert_channels:
            await channel.send_alert(alert)
        # Record alert in history
        self.alert_history.append(alert)

    def should_suppress_alert(self, alert):
        # Suppression logic: rate limiting, duplicate detection,
        # maintenance windows
        recent_alerts = [
            a for a in self.alert_history
            if a['check_name'] == alert['check_name']
            and (datetime.utcnow() - a['timestamp']).total_seconds() < 300  # 5 minutes
        ]
        return len(recent_alerts) > 3  # Suppress if more than 3 in 5 minutes
Recovery Procedures
Automatic Recovery
Implement automatic recovery mechanisms:
import asyncio

class AutoRecoveryManager:
    def __init__(self, mcp_client):
        self.mcp_client = mcp_client
        self.recovery_strategies = {
            'connection_timeout': self.recover_from_timeout,
            'pool_exhaustion': self.recover_from_pool_exhaustion,
            'authentication_failure': self.recover_from_auth_failure
        }

    async def attempt_recovery(self, error_type, context):
        if error_type in self.recovery_strategies:
            recovery_func = self.recovery_strategies[error_type]
            return await recovery_func(context)
        return {'success': False, 'message': 'No recovery strategy available'}

    async def recover_from_timeout(self, context):
        # Retry with exponential backoff; other options include switching
        # to a backup connection or adjusting timeout settings
        max_retries = 3
        base_delay = 1
        for attempt in range(max_retries):
            try:
                delay = base_delay * (2 ** attempt)
                await asyncio.sleep(delay)
                # Attempt reconnection
                connection = await self.mcp_client.connect(context['connection_config'])
                return {
                    'success': True,
                    'message': f'Recovered after {attempt + 1} attempts',
                    'connection': connection
                }
            except Exception as e:
                if attempt == max_retries - 1:
                    return {
                        'success': False,
                        'message': f'Recovery failed after {max_retries} attempts: {str(e)}'
                    }
Best Practices Summary
Prevention Strategies
- Connection pooling: Use appropriate pool sizes and timeouts
- Health checks: Implement regular health monitoring
- Retry logic: Add exponential backoff for transient failures
- Circuit breakers: Prevent cascade failures
- Monitoring: Track key metrics and set up alerts
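Of these, the circuit breaker is the one least often implemented. A minimal sketch (the threshold and timeout defaults are illustrative, not tuned values):

```python
import time

class CircuitBreaker:
    # Opens after `failure_threshold` consecutive failures, then rejects
    # calls until `reset_timeout` seconds have passed (half-open after that).
    def __init__(self, failure_threshold=5, reset_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None

    def allow_request(self):
        if self.opened_at is None:
            return True
        # Half-open: allow a probe request once the cool-down has elapsed
        return time.time() - self.opened_at >= self.reset_timeout

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.time()
```

Wrap each database call: check `allow_request()` first, then record the outcome. While the breaker is open, fail fast or serve a cached response instead of piling more load on a struggling database.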
Diagnostic Approach
- Start with basics: Network connectivity, DNS resolution
- Check authentication: Credentials, permissions, SSL
- Analyze performance: Query execution times, connection metrics
- Review logs: Application logs, database logs, system logs
- Test incrementally: Isolate components to identify root cause
Recovery Planning
- Document procedures: Create runbooks for common issues
- Automate recovery: Implement self-healing mechanisms where possible
- Test regularly: Practice recovery procedures in staging environments
- Monitor effectiveness: Track recovery success rates and times
Conclusion
Effective troubleshooting of database connection issues requires a systematic approach, proper tooling, and comprehensive monitoring. By implementing the diagnostic techniques and monitoring strategies outlined in this guide, you can quickly identify and resolve connection problems in your AI applications.
Remember that prevention is better than cure—invest in proper monitoring, health checks, and automated recovery mechanisms to minimize the impact of connection issues on your applications. With DataBridge AI's robust MCP integration and these troubleshooting techniques, you'll be well-equipped to maintain reliable database connectivity for your AI applications.