Source code for lback.utils.admin_user_manager

import logging
import re
from functools import wraps
from typing import Optional , List,Dict , Any , Callable
from sqlalchemy.orm import Session
from http import HTTPStatus

from lback.core.signals import dispatcher
from lback.auth.password_hashing import PasswordHasher
from lback.repositories.admin_user_repository import AdminUserRepository
from lback.repositories.role_repository import RoleRepository
from lback.models.adminuser import AdminUser
from lback.core.response import Response
from lback.core.types import Request


logger = logging.getLogger(__name__)

[docs] class AdminUserManager: """ Service layer for Admin User related business logic. Manages workflows like registration and authentication. Receives the request-scoped database session per method call and instantiates Repositories with that session. Integrates SignalDispatcher to emit events related to admin user management. """
[docs] def __init__(self): """ Initializes the AdminUserManager. Repositories are instantiated per method call with the request-scoped database session provided to that method. """ pass
[docs] def register_admin(self, session: Session, username: str, email: str, password: str, role_name: Optional[str] = None) -> Optional[AdminUser]: """ Registers a new admin user. Handles validation and password hashing before creating via repository. Receives the request-scoped database session. Emits 'admin_registration_started', 'admin_pre_register', 'admin_registered', or 'admin_registration_failed' signals. Args: session: The SQLAlchemy Session for database operations. username: The username for the new admin. email: The email for the new admin. password: The plain text password for the new admin. role_name: Optional name of the role to assign to the admin user. Returns: The newly created AdminUser object (added to session, but not committed) if successful, otherwise None. Raises: ValueError: If input data is invalid (missing fields, invalid email, username/email exists, role not found). RuntimeError: If password hashing fails. Exception: For other unexpected errors during database interaction. """ logger.info(f"Attempting to register admin user: {username}") dispatcher.send("admin_registration_started", sender=self, username=username, email=email, role_name=role_name) logger.debug(f"Signal 'admin_registration_started' sent for admin '{username}'.") admin_user_repo = AdminUserRepository(session=session) role_repo = RoleRepository(session=session) try: if not username or not email or not password: logger.warning("Attempted to register admin with missing fields.") dispatcher.send("admin_registration_failed", sender=self, username=username, email=email, role_name=role_name, error_type="validation_error", error_message="Missing required fields") logger.debug("Signal 'admin_registration_failed' (validation_error) sent.") raise ValueError("Username, Email, and Password are required.") if not self._validate_email(email): logger.warning(f"Invalid email format: {email}") dispatcher.send("admin_registration_failed", sender=self, username=username, email=email, role_name=role_name, error_type="validation_error", error_message="Invalid email format") logger.debug("Signal 'admin_registration_failed' (validation_error) sent.") raise ValueError("Invalid email format.") if admin_user_repo.get_by_username(username): logger.warning(f"Admin user registration failed. Username already exists: {username}") dispatcher.send("admin_registration_failed", sender=self, username=username, email=email, role_name=role_name, error_type="validation_error", error_message="Username already exists") logger.debug("Signal 'admin_registration_failed' (validation_error) sent.") raise ValueError("Admin Username already exists.") if admin_user_repo.get_by_email(email): logger.warning(f"Admin user registration failed. Email already exists: {email}") dispatcher.send("admin_registration_failed", sender=self, username=username, email=email, role_name=role_name, error_type="validation_error", error_message="Email already exists") logger.debug("Signal 'admin_registration_failed' (validation_error) sent.") raise ValueError("Admin Email already exists.") dispatcher.send("admin_pre_register", sender=self, username=username, email=email, role_name=role_name) logger.debug(f"Signal 'admin_pre_register' sent for admin '{username}'.") try: hashed_password = PasswordHasher.hash_password(password) logger.debug("Password hashed successfully for registration.") except Exception as e: logger.error(f"Failed to hash password during registration: {e}", exc_info=True) dispatcher.send("admin_registration_failed", sender=self, username=username, email=email, role_name=role_name, error_type="password_hashing_error", exception=e) logger.debug("Signal 'admin_registration_failed' (password_hashing_error) sent.") raise RuntimeError("Failed to process password.") from e admin_user_data: Dict[str, Any] = { "username": username, "email": email, "password": hashed_password, "is_superuser": False, "is_active": True } role = None if role_name: role = role_repo.get_by_name(role_name) if not role: logger.error(f"Role not found during admin registration: {role_name}") dispatcher.send("admin_registration_failed", sender=self, username=username, email=email, role_name=role_name, error_type="role_not_found") logger.debug("Signal 'admin_registration_failed' (role_not_found) sent.") raise ValueError(f"Role '{role_name}' not found.") admin_user_data["role"] = role admin_user = admin_user_repo.create(**admin_user_data) logger.info(f"Admin user '{username}' prepared for registration.") dispatcher.send("admin_registered", sender=self, admin_user=admin_user, session=session) logger.debug(f"Signal 'admin_registered' sent for admin '{username}'.") return admin_user except ValueError as e: raise e except RuntimeError as e: raise e except Exception as e: logger.exception(f"Error during admin user creation via repository for '{username}': {e}") dispatcher.send("admin_registration_failed", sender=self, username=username, email=email, role_name=role_name, error_type="unexpected_exception", exception=e) logger.debug("Signal 'admin_registration_failed' (unexpected_exception) sent.") raise
[docs] def authenticate_admin(self, session: Session, username: str, password: str) -> Optional[AdminUser]: """ Authenticates an admin user by username and password. Verifies the plain text password against the stored hash using PasswordHasher. Receives the request-scoped database session. Emits 'admin_authentication_started', 'admin_authenticated', or 'admin_authentication_failed' signals. Args: session: The SQLAlchemy Session for database operations. username: The username to authenticate. password: The plain text password. Returns: The AdminUser object if authentication is successful and user is active, otherwise None. """ logger.info(f"Attempting to authenticate admin user: {username}") dispatcher.send("admin_authentication_started", sender=self, username=username) logger.debug(f"Signal 'admin_authentication_started' sent for admin '{username}'.") try: admin_user_repo = AdminUserRepository(session=session) admin_user = admin_user_repo.get_by_username(username) if admin_user and admin_user.password: if PasswordHasher.verify_password(password, admin_user.password): if getattr(admin_user, 'is_active', True): logger.info(f"Admin user '{username}' authenticated successfully.") dispatcher.send("admin_authenticated", sender=self, admin_user=admin_user, session=session) logger.debug(f"Signal 'admin_authenticated' sent for admin '{username}'.") return admin_user else: logger.warning(f"Authentication failed for admin user '{username}': User is inactive.") dispatcher.send("admin_authentication_failed", sender=self, username=username, reason="user_inactive", admin_user=admin_user) logger.debug(f"Signal 'admin_authentication_failed' (user_inactive) sent for admin '{username}'.") return None else: logger.warning(f"Authentication failed for admin user '{username}': Incorrect password.") dispatcher.send("admin_authentication_failed", sender=self, username=username, reason="incorrect_password", admin_user=admin_user) logger.debug(f"Signal 'admin_authentication_failed' (incorrect_password) sent for admin '{username}'.") return None else: logger.warning(f"Authentication failed: Admin user '{username}' not found or password not set.") dispatcher.send("admin_authentication_failed", sender=self, username=username, reason="user_not_found") logger.debug(f"Signal 'admin_authentication_failed' (user_not_found) sent for admin '{username}'.") return None except Exception as e: logger.exception(f"Error during authentication for admin user '{username}': {e}") dispatcher.send("admin_authentication_failed", sender=self, username=username, reason="unexpected_exception", exception=e) logger.debug(f"Signal 'admin_authentication_failed' (unexpected_exception) sent for admin '{username}'.") return None
[docs] def get_admin_by_id(self, session: Session, admin_id: int) -> Optional[AdminUser]: """ Get an admin user by ID using the repository. Receives the request-scoped database session. # Consider adding signals if needed, e.g., 'admin_fetched_by_id', 'admin_not_found_by_id' """ logger.debug(f"Fetching admin user by ID: {admin_id}") admin_user_repo = AdminUserRepository(session=session) admin_user = admin_user_repo.get_by_id(admin_id) return admin_user
[docs] def get_admin_by_username(self, session: Session, username: str) -> Optional[AdminUser]: """ Get an admin user by username using the repository. Receives the request-scoped database session. # Consider adding signals if needed, e.g., 'admin_fetched_by_username', 'admin_not_found_by_username' """ logger.debug(f"Fetching admin user by username: {username}") admin_user_repo = AdminUserRepository(session=session) admin_user = admin_user_repo.get_by_username(username) return admin_user
[docs] def get_all_admins(self, session: Session) -> List[AdminUser]: """ Get all admin users using the repository. Receives the request-scoped database session. # Consider adding signals if needed, e.g., 'all_admins_fetched' """ logger.debug("Fetching all admin users.") admin_user_repo = AdminUserRepository(session=session) admins = admin_user_repo.list_all() return admins
[docs] @staticmethod def has_permission(admin_user: Optional[AdminUser], permission_name: str) -> bool: """ Check if the admin user has a specific permission. This is a static method as it operates on the user object, not the manager instance. Permission check signals are typically handled by the PermissionRequired decorator. """ if not admin_user: logger.debug(f"Permission check failed: User is None for permission '{permission_name}'.") return False if getattr(admin_user, 'is_superuser', False): logger.debug(f"Permission check passed: User '{getattr(admin_user, 'username', 'N/A')}' is superuser.") return True if admin_user.role and hasattr(admin_user.role, 'permissions') and isinstance(admin_user.role.permissions, (list, set)): user_role_permissions = set(perm.name for perm in admin_user.role.permissions if hasattr(perm, 'name')) has_perm = permission_name in user_role_permissions logger.debug(f"Permission check for user '{getattr(admin_user, 'username', 'N/A')}' ('{permission_name}'): {has_perm}. User Role Permissions: {list(user_role_permissions)}") return has_perm else: logger.debug(f"Permission check failed: User '{getattr(admin_user, 'username', 'N/A')}' has no role or role has no valid permissions list for permission '{permission_name}'.") return False
[docs] def permission_required(permission: str) -> Callable: """ Decorator to check if the authenticated user has a specific permission. Assumes the authenticated user object is available on request.user. Uses the static has_permission method. Permission check signals are handled by the PermissionRequired decorator logic itself. """ def decorator(func: Callable) -> Callable: @wraps(func) def wrapper(request: Request, *args, **kwargs) -> Response: user = getattr(request, "user", None) if not user or not AdminUserManager.has_permission(user, permission): logger.warning(f"Permission denied for user {getattr(user, 'username', 'N/A')} trying to access resource requiring '{permission}'") return Response( body=b"Permission Denied", status_code=HTTPStatus.FORBIDDEN.value, headers={'Content-Type': 'text/plain; charset=utf-8'} ) return func(request, *args, **kwargs) return wrapper return decorator
@staticmethod def _validate_email(email: str) -> bool: """Validate email format.""" regex = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$' return re.fullmatch(regex, email) is not None