Source code for lback.core.config

import os
from cryptography.fernet import Fernet
import logging
import json
import yaml

logger = logging.getLogger(__name__)

CONFIG_FILE = "config.json"

DEFAULTS = {
    "SECRET_KEY": Fernet.generate_key().decode(),
    "ENCRYPTION_KEY": Fernet.generate_key().decode(),
    "DB_ENGINE": "sqlite",
    "DB_HOST": "localhost",
    "DB_PORT": "5432",
    "DB_USER": "admin",
    "DB_PASSWORD_ENCRYPTED": "",
    "DB_NAME": "db",
    "DATABASE_ECHO": "False",
    "API_KEY_SERVICE_1_ENCRYPTED": "",
    "API_KEY_SERVICE_2_ENCRYPTED": "",
    "JWT_SECRET_KEY": "",
    "SMTP_SERVER": "",
    "SMTP_PORT": "587",
    "EMAIL_USERNAME": "",
    "EMAIL_PASSWORD": "",
    "DEBUG": "True",
    "ALLOWED_HOSTS": [],
    "API_VERSION": "v1",
    "LOGGING_LEVEL": "DEBUG",
    "INSTALLED_APPS": [],
    "MIDDLEWARES": [],
    "ROOT_URLCONF": None,
    "USE_TLS": "True",
    "SENDER_NAME": "",
    "SENDER_EMAIL": "",
    "PROJECT_SETTINGS_MODULE": "settings",
}



[docs] def load_config(config_file=CONFIG_FILE): """Load configuration from JSON or YAML file.""" if os.path.exists(config_file): logger.info(f"Loading configuration from {config_file}") try: with open(config_file, 'r') as f: if config_file.endswith('.json'): return json.load(f) elif config_file.endswith('.yaml') or config_file.endswith('.yml'): return yaml.safe_load(f) else: logger.warning(f"Unsupported config file format for {config_file}. Only .json, .yaml, .yml are supported.") return {} except Exception as e: logger.error(f"Error loading config file {config_file}: {e}", exc_info=True) return {} else: logger.info(f"No config file found at {config_file}. Relying on environment variables and defaults.") return {}
[docs] class Config: """ Manages application configuration, loading with priority: Environment Variables -> Config File -> Defaults. Provides access to settings and generates derived values. """
[docs] def __init__(self, config_file=CONFIG_FILE): """ Intializes a Config instance by loading configuration data with priority Environment -> ConfigFile -> Defaults. Assumes environment variables (e.g., from .env) are already loaded. """ logger.info(f"Config instance initialized. Attempting to load configuration from {config_file}, environment, and defaults.") loaded_config_data = load_config(config_file) def _get_value(key, conversion_func=None, default=None): """Helper to get a config value with priority: Env -> ConfigFile -> specified default -> DEFAULTS default.""" env_value = os.getenv(key) if env_value is not None: if conversion_func: try: return conversion_func(env_value) except (ValueError, TypeError) as e: logger.warning(f"Failed to convert environment variable '{key}' value '{env_value}' using {getattr(conversion_func, '__name__', 'conversion_func')}: {e}. Falling back to config file or default.") pass else: return env_value config_value = loaded_config_data.get(key) if config_value is not None: if conversion_func: try: return conversion_func(config_value) except (ValueError, TypeError) as e: logger.warning(f"Failed to convert config file value for '{key}' value '{config_value}' using {getattr(conversion_func, '__name__', 'conversion_func')}: {e}. Falling back to default.") pass else: return config_value if default is not None: if conversion_func: try: return conversion_func(default) except (ValueError, TypeError) as e: logger.error(f"Failed to convert explicit default value for '{key}' value '{default}' using {getattr(conversion_func, '__name__', 'conversion_func')}: {e}. Using raw explicit default.") return default else: return default default_value_from_defaults = DEFAULTS.get(key) if default_value_from_defaults is not None: if conversion_func: try: return conversion_func(default_value_from_defaults) except (ValueError, TypeError) as e: logger.error(f"Failed to convert DEFAULTS value for '{key}' value '{default_value_from_defaults}' using {getattr(conversion_func, '__name__', 'conversion_func')}: {e}. Using raw DEFAULTS value.") return default_value_from_defaults else: return default_value_from_defaults return None def _str_to_bool(value): if value is None: return None if isinstance(value, bool): return value return str(value).lower() in ("true", "1", "t") def _to_int(value): if value is None: return None if isinstance(value, int): return value try: return int(str(value)) except (ValueError, TypeError): logger.warning(f"Could not convert value '{value}' to integer.") return None def _to_list(value): if value is None: return None if isinstance(value, list): return value if isinstance(value, str): return [item.strip() for item in value.split(',') if item.strip()] logger.warning(f"Could not convert value '{value}' to list.") return None encryption_key_str = _get_value("ENCRYPTION_KEY", default=DEFAULTS["ENCRYPTION_KEY"]) self.fernet = None if not encryption_key_str: logger.error("ENCRYPTION_KEY not found in environment, config file, or defaults. Cannot initialize Fernet cipher.") else: try: key_bytes = encryption_key_str.encode() if len(key_bytes) != 44: logger.error("Invalid ENCRYPTION_KEY format (incorrect length). Must be 32 url-safe base64 bytes (44 characters).") self.fernet = None else: self.fernet = Fernet(key_bytes) logger.info("Fernet cipher initialized.") except Exception as e: logger.error(f"Error initializing Fernet cipher with ENCRYPTION_KEY: {e}", exc_info=True) def decrypt_instance(data): """Decrypts data using the instance's Fernet cipher.""" if data is None or data == "": return None if isinstance(data, str): data_bytes = data.encode() elif isinstance(data, bytes): data_bytes = data else: return None if self.fernet is None: return None try: return self.fernet.decrypt(data_bytes).decode() except Exception as e: return None self.decrypt = decrypt_instance self.SECRET_KEY = _get_value("SECRET_KEY", default=DEFAULTS["SECRET_KEY"]) self.DEBUG = _get_value("DEBUG", conversion_func=_str_to_bool, default=DEFAULTS["DEBUG"]) if self.DEBUG is None: self.DEBUG = _str_to_bool(DEFAULTS["DEBUG"]) if "DEBUG" in DEFAULTS else False self.API_VERSION = _get_value("API_VERSION", default=DEFAULTS["API_VERSION"]) logging_level_val = _get_value("LOGGING_LEVEL", default=DEFAULTS["LOGGING_LEVEL"]) self.LOGGING_LEVEL = str(logging_level_val).upper() if logging_level_val is not None else DEFAULTS["LOGGING_LEVEL"].upper() db_engine_val = _get_value("DB_ENGINE", default=DEFAULTS["DB_ENGINE"]) self.DB_ENGINE = str(db_engine_val).lower() if db_engine_val is not None else DEFAULTS["DB_ENGINE"].lower() self.ENVIRONMENT = os.getenv("ENVIRONMENT", "development") self.INSTALLED_APPS = _get_value("INSTALLED_APPS", conversion_func=_to_list, default=DEFAULTS["INSTALLED_APPS"]) if not isinstance(self.INSTALLED_APPS, list): self.INSTALLED_APPS = [] self.DATABASE_ECHO = _get_value("DATABASE_ECHO", conversion_func=_str_to_bool, default=DEFAULTS["DATABASE_ECHO"]) if self.DATABASE_ECHO is None: self.DATABASE_ECHO = _str_to_bool(DEFAULTS["DATABASE_ECHO"]) if "DATABASE_ECHO" in DEFAULTS else False self.MIDDLEWARES = _get_value("MIDDLEWARES", conversion_func=_to_list, default=DEFAULTS.get("MIDDLEWARES", [])) if not isinstance(self.MIDDLEWARES, list): self.MIDDLEWARES = [] self.ALLOWED_HOSTS = _get_value("ALLOWED_HOSTS", conversion_func=_to_list, default=DEFAULTS.get("ALLOWED_HOSTS", [])) if not isinstance(self.ALLOWED_HOSTS, list): self.ALLOWED_HOSTS = [] db_host = _get_value("DB_HOST", default=DEFAULTS["DB_HOST"]) db_port = _get_value("DB_PORT", conversion_func=_to_int, default=DEFAULTS["DB_PORT"]) db_user = _get_value("DB_USER", default=DEFAULTS["DB_USER"]) db_password_encrypted_raw = _get_value("DB_PASSWORD_ENCRYPTED", default=DEFAULTS["DB_PASSWORD_ENCRYPTED"]) db_name = _get_value("DB_NAME", default=DEFAULTS["DB_NAME"]) self.DATABASE_URL = _get_value("DATABASE_URL", default=None) self.DATABASE_CONFIG = { "engine": self.DB_ENGINE, "host": db_host, "port": db_port, "user": db_user, "password": self.decrypt(db_password_encrypted_raw), "database": db_name } api_key_1_encrypted_raw = _get_value("API_KEY_SERVICE_1_ENCRYPTED", default=DEFAULTS["API_KEY_SERVICE_1_ENCRYPTED"]) api_key_2_encrypted_raw = _get_value("API_KEY_SERVICE_2_ENCRYPTED", default=DEFAULTS["API_KEY_SERVICE_2_ENCRYPTED"]) self.API_KEYS = { "service_1": self.decrypt(api_key_1_encrypted_raw), "service_2": self.decrypt(api_key_2_encrypted_raw), } smtp_server = _get_value("SMTP_SERVER", default=DEFAULTS["SMTP_SERVER"]) smtp_port = _get_value("SMTP_PORT", conversion_func=_to_int, default=DEFAULTS["SMTP_PORT"]) email_username = _get_value("EMAIL_USERNAME", default=DEFAULTS["EMAIL_USERNAME"]) email_password = _get_value("EMAIL_PASSWORD", default=DEFAULTS["EMAIL_PASSWORD"]) use_tls = _get_value("USE_TLS", default=DEFAULTS["USE_TLS"]) sender_name = _get_value("SENDER_NAME", default=DEFAULTS["SENDER_NAME"]) sender_email = _get_value("SENDER_EMAIL", default=DEFAULTS["SENDER_EMAIL"]) self.EMAIL_CONFIG = { "smtp_server": smtp_server, "smtp_port": smtp_port, "username": email_username, "password": email_password, "use_tls": use_tls, "sender_name": sender_name, "sender_email": sender_email } jwt_secret_key = _get_value("JWT_SECRET_KEY", default=DEFAULTS["JWT_SECRET_KEY"]) self.JWT_SECRET = { "secret_key": jwt_secret_key, } logger.info("Config instance attributes loaded.")
@property def DATABASE_URI(self): """Generates the database connection URI based on configured engine and credentials.""" engine = self.DB_ENGINE host = self.DATABASE_CONFIG.get("host") port = self.DATABASE_CONFIG.get("port") user = self.DATABASE_CONFIG.get("user") password = self.DATABASE_CONFIG.get("password") name = self.DATABASE_CONFIG.get("database") sqlite_url = self.DATABASE_URL if engine == "postgresql": if not all([user, password, host, port, name]): missing = [] if user is None: missing.append("DB_USER") if password is None: missing.append("DB_PASSWORD_ENCRYPTED (decrypted password is None)") if host is None: missing.append("DB_HOST") if port is None: missing.append("DB_PORT") if name is None: missing.append("DB_NAME") raise ValueError(f"Missing database credentials for PostgreSQL: {', '.join(missing)}") return f"postgresql://{user}:{password}@{host}:{port}/{name}" elif engine == "mysql": if not all([user, password, host, port, name]): missing = [] if user is None: missing.append("DB_USER") if password is None: missing.append("DB_PASSWORD_ENCRYPTED (decrypted password is None)") if host is None: missing.append("DB_HOST") if port is None: missing.append("DB_PORT") if name is None: missing.append("DB_NAME") raise ValueError(f"Missing database credentials for MySQL: {', '.join(missing)}") return f"mysql://{user}:{password}@{host}:{port}/{name}" elif engine in ("sqlite", "sqlite3"): if sqlite_url: return sqlite_url else: if name is None or name == "": logger.warning("DATABASE_URL and DB_NAME not set for SQLite. Using default filename 'db.db'.") db_file = "db.db" else: db_file = name return f"sqlite:///{db_file}" else: raise ValueError(f"Unsupported database engine: {engine}")
[docs] @staticmethod def validate(config_instance): """Validates critical configuration settings of a Config instance.""" if not config_instance.SECRET_KEY: raise ValueError("SECRET_KEY is required.") if config_instance.DB_ENGINE not in ("postgresql", "mysql", "sqlite", "sqlite3"): raise ValueError(f"Invalid DB_ENGINE '{config_instance.DB_ENGINE}'. Supported values are 'postgresql', 'mysql', 'sqlite', and 'sqlite3'.") if config_instance.DB_ENGINE in ("postgresql", "mysql"): required_keys_in_dict = ["host", "port", "user", "password", "database"] if not all(config_instance.DATABASE_CONFIG.get(key) is not None for key in required_keys_in_dict): missing = [key for key in required_keys_in_dict if config_instance.DATABASE_CONFIG.get(key) is None] if "password" in missing and config_instance.DATABASE_CONFIG.get("password") is None: missing[missing.index("password")] = "DB_PASSWORD_ENCRYPTED (decrypted password is None)" raise ValueError(f"For {config_instance.DB_ENGINE}, missing required database credentials: {', '.join(missing)}") elif config_instance.DB_ENGINE in ("sqlite", "sqlite3"): if not (config_instance.DATABASE_URL or config_instance.DATABASE_CONFIG.get("database")): logger.warning("DATABASE_URL or DB_NAME not set for SQLite. Using default filename 'db.db'.") if config_instance.LOGGING_LEVEL not in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"): raise ValueError(f"Invalid LOGGING_LEVEL '{config_instance.LOGGING_LEVEL}'.") if not config_instance.EMAIL_CONFIG.get("smtp_server"): raise ValueError("SMTP_SERVER is required for email configuration.") if not config_instance.EMAIL_CONFIG.get("username"): raise ValueError("EMAIL_USERNAME is required for email configuration.")