Source code for lback.auth.adminauth

import logging
from typing import Optional, Any
from sqlalchemy.orm import Session

from lback.utils.admin_user_manager import AdminUserManager
from lback.utils.session_manager import SessionManager
from lback.utils.app_session import AppSession
from lback.core.signals import dispatcher
from lback.auth.session_auth import SessionAuth
from lback.core.types import Request


logger = logging.getLogger(__name__)

[docs] class AdminAuth: """ Handles authentication and authorization logic specifically for Admin Users. Acts as a bridge between AdminUserManager, SessionManager, and the Auth utilities. Integrates SignalDispatcher to emit events related to admin authentication and authorization. """
[docs] def __init__(self, admin_user_manager: AdminUserManager, session_manager: SessionManager): """ Initialize AdminAuth with necessary managers. Emits 'admin_auth_initialized' signal. Args: admin_user_manager (AdminUserManager): Instance of AdminUserManager. session_manager (SessionManager): Instance of SessionManager for managing sessions. """ if not isinstance(admin_user_manager, AdminUserManager): logger.error("AdminAuth initialized without a valid AdminUserManager instance.") raise TypeError("admin_user_manager must be an instance of AdminUserManager.") if not isinstance(session_manager, SessionManager): logger.error("AdminAuth initialized without a valid SessionManager instance.") raise TypeError("session_manager must be an instance of SessionManager.") self.admin_user_manager = admin_user_manager self.session_manager = session_manager self.session_auth_utility = SessionAuth(session_manager=self.session_manager) logger.info("AdminAuth initialized.") if dispatcher: dispatcher.send("admin_auth_initialized", sender=self) logger.debug("Signal 'admin_auth_initialized' sent.") else: logger.warning("Dispatcher is None. Cannot send 'admin_auth_initialized' signal.")
[docs] def register(self, db_session: Session, username: str, email: str, password: str) -> bool: """ Registers a new admin user. Emits 'admin_registration_attempt', 'admin_registration_successful', or 'admin_registration_failed' signals. Args: db_session: The database session. username: The username for the new admin. email: The email for the new admin. password: The plain text password for the new admin. Returns: True if registration is successful, False otherwise. """ logger.info(f"AdminAuth: Attempting to register admin user: '{username}'.") if dispatcher: dispatcher.send("admin_registration_attempt", sender=self, username=username, email=email) logger.debug(f"Signal 'admin_registration_attempt' sent for admin '{username}'.") else: logger.warning("Dispatcher is None. Cannot send 'admin_registration_attempt' signal.") try: admin_user = self.admin_user_manager.register_admin(db_session, username, email, password) if admin_user: logger.info(f"Admin user '{username}' registered successfully (staged).") if dispatcher: dispatcher.send("admin_registration_successful", sender=self, admin_user=admin_user, username=username, email=email, db_session=db_session) logger.debug(f"Signal 'admin_registration_successful' sent for admin '{username}'.") else: logger.warning("Dispatcher is None. Cannot send 'admin_registration_successful' signal.") return True else: logger.error(f"AdminUserManager failed to register admin user '{username}' without raising an exception.") if dispatcher: dispatcher.send("admin_registration_failed", sender=self, username=username, email=email, error_type="manager_returned_none") logger.debug(f"Signal 'admin_registration_failed' (manager_returned_none) sent for admin '{username}'.") else: logger.warning("Dispatcher is None. Cannot send 'admin_registration_failed' signal.") return False except ValueError as e: logger.error(f"AdminAuth: Registration failed for admin user '{username}' due to validation error: {e}") if db_session: db_session.rollback() return False except Exception as e: logger.exception(f"AdminAuth: An unexpected error occurred during registration for admin user '{username}'.") if db_session: db_session.rollback() return False
[docs] def login(self, request: Request, db_session: Session, username: str, password: str) -> Optional[Any]: """ Authenticates an admin user and logs them in using session authentication. Emits 'admin_login_attempt', 'admin_login_successful', or 'admin_login_failed' signals. Args: request: The incoming request object. db_session: The database session. username: The username for login. password: The plain text password for login. Returns: The authenticated AdminUser object if successful, otherwise None. """ logger.info(f"AdminAuth: Attempting login for admin user: '{username}'.") if dispatcher: dispatcher.send("admin_login_attempt", sender=self, username=username, request=request) logger.debug(f"Signal 'admin_login_attempt' sent for admin '{username}'.") else: logger.warning("Dispatcher is None. Cannot send 'admin_login_attempt' signal.") try: admin_user = self.admin_user_manager.authenticate_admin(db_session, username, password) if admin_user: self.session_auth_utility.login(request, admin_user.id, user_type="admin") logger.info(f"Admin user '{username}' logged in successfully.") if dispatcher: dispatcher.send("admin_login_successful", sender=self, admin_user=admin_user, username=username, request=request) logger.debug(f"Signal 'admin_login_successful' sent for admin '{username}'.") else: logger.warning("Dispatcher is None. Cannot send 'admin_login_successful' signal.") return admin_user else: logger.warning(f"AdminAuth: Failed login attempt for admin user '{username}'. Invalid credentials or inactive user.") if dispatcher: dispatcher.send("admin_login_failed", sender=self, username=username, request=request, reason="authentication_failed") logger.debug(f"Signal 'admin_login_failed' (authentication_failed) sent for admin '{username}'.") else: logger.warning("Dispatcher is None. Cannot send 'admin_login_failed' signal.") return None except Exception as e: logger.exception(f"AdminAuth: An unexpected error occurred during login for admin user '{username}'.") if db_session: db_session.rollback() if dispatcher: dispatcher.send("admin_login_failed", sender=self, username=username, request=request, reason="exception", exception=e) logger.debug(f"Signal 'admin_login_failed' (exception) sent for admin '{username}'.") else: logger.warning("Dispatcher is None. Cannot send 'admin_login_failed' signal.") return None
[docs] def logout(self, request: Request) -> bool: """ Logs out the current admin user by ending their session. Emits 'admin_logout_attempt' and 'admin_logout_successful' signals. Args: request: The incoming request object. Assumes request.session is a valid AppSession instance. Returns: True if logout process was attempted (session object provided), False otherwise. Note: The actual session deletion is handled by AppSession.delete(). """ logger.info("AdminAuth: Attempting admin user logout.") user_session: Optional[AppSession] = request.session session_id = getattr(user_session, 'session_id', 'N/A') user_id = None if user_session: user_id = user_session.get('user_id') if dispatcher: dispatcher.send("admin_logout_attempt", sender=self, user_session=user_session, session_id=session_id, user_id=user_id) logger.debug(f"Signal 'admin_logout_attempt' sent for session ID '{session_id}'.") else: logger.warning("Dispatcher is None. Cannot send 'admin_logout_attempt' signal.") if user_session: try: user_session.delete() logger.info(f"Admin user session {session_id} ended.") if dispatcher: dispatcher.send("admin_logout_successful", sender=self, session_id=session_id, user_id=user_id) logger.debug(f"Signal 'admin_logout_successful' sent for session ID '{session_id}'.") else: logger.warning("Dispatcher is None. Cannot send 'admin_logout_successful' signal.") return True except Exception as e: logger.exception(f"AdminAuth: An error occurred during session deletion for session ID {session_id}.") return False else: logger.warning("AdminAuth: Logout called with no AppSession object available on request.") return False
[docs] def is_admin_logged_in(self, request: Request) -> bool: """ Checks if an admin user is currently logged in via session. Emits 'admin_authentication_check' signal, with outcome. Args: request: The incoming request object. Assumes request.session is a valid AppSession instance. Returns: True if a user ID is found in the session and is marked as admin, False otherwise. """ logger.debug("AdminAuth: Checking if admin user is logged in.") is_admin = False reason = "default_false" try: is_authenticated_session = self.session_auth_utility.is_authenticated(request) if is_authenticated_session: user_session_instance: Optional[AppSession] = request.session if user_session_instance: user_type = user_session_instance.get("user_type") is_admin = user_type == 'admin' logger.debug(f"Session authenticated. User type in session: '{user_type}'. Is admin: {is_admin}") reason = "authenticated_session_is_admin" if is_admin else "authenticated_session_not_admin" else: logger.error("AdminAuth: SessionAuth reported authenticated, but request.session is None.") reason = "session_auth_true_but_session_none" else: logger.debug("AdminAuth: Session authentication check failed. Admin is not logged in via session.") reason = "session_authentication_failed" except Exception as e: logger.exception("AdminAuth: An unexpected error occurred while checking admin login status.") reason = "exception" if dispatcher: dispatcher.send("admin_authentication_check", sender=self, request=request, is_admin=is_admin, reason=reason) logger.debug(f"Signal 'admin_authentication_check' sent. Is admin: {is_admin}, Reason: {reason}.") else: logger.warning("Dispatcher is None. Cannot send 'admin_authentication_check' signal.") return is_admin