Source code for lback.utils.user_manager

import logging
import re 
from sqlalchemy.orm import Session
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta

from lback.auth.password_hashing import PasswordHasher
from lback.core.signals import dispatcher
from lback.repositories.user_repository import UserRepository
from lback.models.user import User, Group
from lback.utils.email_sender import EmailSender
from lback.core.config import Config

from .validation import ValidationError, PasswordValidator

logger = logging.getLogger(__name__)

[docs] class UserManager: """ Service layer for User related business logic. Manages workflows like registration, authentication, and user management. Receives the request-scoped database session per method call and instantiates Repositories with that session. Integrates SignalDispatcher to emit events related to user management. """
[docs] def __init__(self, email_sender: EmailSender, password_validator: PasswordValidator): """ Initializes the UserManager. Repositories are instantiated per method call with the request-scoped database session provided to that method. """ self.email_sender = email_sender self.password_validator = password_validator logger.info("UserManager initialized.")
[docs] def register_user(self, session: Session, username: str, email: str, plain_password: str) -> Optional[User]: """ Registers a new user with validation, password complexity checks, and initiates email verification. Assigns the user to the 'basic_user' group. Emits 'user_registration_started', 'user_pre_register', 'user_registered', 'user_email_verification_initiated', or 'user_registration_failed' signals. """ logger.info(f"Attempting to register user: {username}") dispatcher.send("user_registration_started", sender=self, username=username, email=email) user_repo = UserRepository(session=session) try: self._validate_registration_data(username, email, plain_password) self.password_validator.validate(plain_password) if user_repo.get_by_username(username): logger.warning(f"Registration failed. Username already exists: {username}") dispatcher.send("user_registration_failed", sender=self, username=username, email=email, error_type="validation_error", error_message="Username already exists") raise ValidationError("Username already exists.") if user_repo.get_by_email(email): logger.warning(f"Registration failed. Email already exists: {email}") dispatcher.send("user_registration_failed", sender=self, username=username, email=email, error_type="validation_error", error_message="Email already exists.") raise ValidationError("Email already exists.") dispatcher.send("user_pre_register", sender=self, username=username, email=email) try: user = User( username=username, email=email, is_active=True, is_email_verified=False, ) user.set_password(plain_password) basic_user_group = session.query(Group).filter_by(name='basic_user').first() if not basic_user_group: logger.critical("Critical Error: 'basic_user' group not found in database. Please run database seeding (e.g., 'python manage.py seed_db').") raise ValueError("Default group 'basic_user' not found. Database not seeded correctly.") user.groups.append(basic_user_group) logger.info(f"User '{username}' assigned to 'basic_user' group.") session.add(user) session.flush() user.generate_email_verification_token() config=Config() allowed_hosts = getattr(config, 'ALLOWED_HOSTS', []) if allowed_hosts: verification_host = allowed_hosts[0] else: logger.warning("ALLOWED_HOSTS not configured or empty. Using default localhost for verification link.") verification_host = "127.0.0.1:10000" try: verification_link = f"http://{verification_host}/api/auth/verify-email/?token={user.email_verification_token}" self.email_sender.send_email( to_email=user.email, subject="Please Verify Your Email Address", body=f"Hello {user.username},\n\nPlease click the following link to verify your email address: {verification_link}" ) logger.info(f"Verification email sent to {user.email}.") dispatcher.send("user_email_verification_initiated", sender=self, user=user) except Exception as email_err: logger.error(f"Failed to send verification email to {user.email}: {email_err}", exc_info=True) dispatcher.send("user_registration_failed", sender=self, username=username, email=email, error_type="email_send_failed", exception=email_err) raise RuntimeError("Failed to send verification email.") from email_err logger.info(f"User '{username}' registered and email verification initiated.") dispatcher.send("user_registered", sender=self, user=user, session=session) return user except Exception as e: logger.exception(f"Error during user creation/email initiation for '{username}': {e}") dispatcher.send("user_registration_failed", sender=self, username=username, email=email, error_type="repository_error", exception=e) raise except ValidationError as e: raise e except ValueError as e: logger.error(f"Registration failed due to missing default group: {e}") raise except RuntimeError as e: raise e except Exception as e: logger.exception(f"An unexpected error occurred during user registration for '{username}': {e}") dispatcher.send("user_registration_failed", sender=self, username=username, email=email, error_type="unexpected_exception", exception=e) raise
@staticmethod def _validate_registration_data(username: str, email: str, password: str): """Validate user registration data (basic format checks for presence and length).""" if not username or not email or not password: raise ValidationError("Username, Email, and Password are required.") if len(username) < 3: raise ValidationError("Username must be at least 3 characters long.")
[docs] def authenticate_user(self, session: Session, username: str, password: str, require_email_verified: bool = True) -> Optional[User]: """ Authenticates user login credentials, with an option to require email verification. Emits 'user_authentication_started', 'user_authenticated', or 'user_authentication_failed' signals. """ logger.info(f"Attempting to authenticate user: {username}") dispatcher.send("user_authentication_started", sender=self, username=username) try: user_repo = UserRepository(session=session) user = user_repo.get_by_username(username) if not user: logger.warning(f"Authentication failed: User '{username}' not found.") dispatcher.send("user_authentication_failed", sender=self, username=username, reason="user_not_found") return None if not user.is_active: logger.warning(f"Authentication failed for user '{username}': User is inactive.") dispatcher.send("user_authentication_failed", sender=self, username=username, reason="user_inactive", user=user) return None if require_email_verified and not user.is_email_verified: logger.warning(f"Authentication failed for user '{username}': Email not verified.") dispatcher.send("user_authentication_failed", sender=self, username=username, reason="email_not_verified", user=user) raise ValidationError("Please verify your email address to log in.") if not user.password or not PasswordHasher.verify_password(password, user.password): logger.warning(f"Authentication failed for user '{username}': Incorrect password.") dispatcher.send("user_authentication_failed", sender=self, username=username, reason="incorrect_password", user=user) return None logger.info(f"User '{username}' authenticated successfully.") dispatcher.send("user_authenticated", sender=self, user=user, session=session) return user except ValidationError as e: raise e except Exception as e: logger.exception(f"Error during authentication for user '{username}': {e}") dispatcher.send("user_authentication_failed", sender=self, username=username, reason="unexpected_exception", exception=e) return None
[docs] def verify_user_email(self, session: Session, token: str) -> Optional[User]: """ Verifies a user's email using the provided token. Emits 'user_email_verification_successful' or 'user_email_verification_failed' signals. """ logger.info(f"Attempting to verify email with token: {token[:10]}...") user_repo = UserRepository(session=session) user = None try: user = user_repo.get_by_email_verification_token(token) if not user: logger.warning(f"Email verification failed: Invalid or non-existent token '{token[:10]}...'.") dispatcher.send("user_email_verification_failed", sender=self, token_prefix=token[:10] + "...", error_type="invalid_token") raise ValidationError("Invalid or expired verification link.") if user.is_email_verified: logger.info(f"Email already verified for user '{user.username}'.") dispatcher.send("user_email_verification_successful", sender=self, user=user, reason="already_verified") return user if user.verify_email(token): session.add(user) logger.info(f"Email verified successfully for user '{user.username}'.") dispatcher.send("user_email_verification_successful", sender=self, user=user) return user else: logger.warning(f"Email verification failed for user '{user.username}': Token expired or mismatched.") dispatcher.send("user_email_verification_failed", sender=self, user=user, error_type="token_mismatch_or_expired") raise ValidationError("Invalid or expired verification link.") except ValidationError as e: raise e except Exception as e: logger.exception(f"Error during email verification with token '{token[:10]}...': {e}") dispatcher.send("user_email_verification_failed", sender=self, token_prefix=token[:10] + "...", error_type="unexpected_exception", exception=e, user=user) raise
[docs] def update_user(self, session: Session, user_id: int, data: Dict[str, Any]) -> Optional[User]: """ Update an existing user's data using the repository. Handles password hashing, and can update email verification status. Note: Group membership changes should be handled explicitly, not via 'data' dict. """ logger.info(f"Attempting to update user with ID: {user_id}") dispatcher.send("user_update_started", sender=self, user_id=user_id, update_data=data) user_repo = UserRepository(session=session) user = None try: user = user_repo.get_by_id(user_id) if not user: logger.warning(f"Update failed: User with ID {user_id} not found.") dispatcher.send("user_update_failed", sender=self, user_id=user_id, update_data=data, error_type="user_not_found") raise ValidationError("User not found.") if 'password' in data and data['password']: self.password_validator.validate(data['password']) try: hashed_password = PasswordHasher.hash_password(data['password']) data['password'] = hashed_password except Exception as e: logger.error(f"Failed to hash new password during update: {e}", exc_info=True) dispatcher.send("user_update_failed", sender=self, user_id=user_id, update_data=data, error_type="password_hashing_error", exception=e, user=user) raise RuntimeError("Failed to process new password.") from e elif 'password' in data and not data['password']: del data['password'] if 'is_email_verified' in data: if data['is_email_verified'] is True and not user.is_email_verified: logger.info(f"User '{user.username}' email status changed to verified via update.") dispatcher.send("user_email_verified", sender=self, user=user, reason="manual_update") elif data['is_email_verified'] is False and user.is_email_verified: logger.warning(f"User '{user.username}' email status changed to unverified via update.") if 'role_name' in data: logger.warning(f"Attempted to update 'role_name' for user '{user.username}' via update_user. This is deprecated. Use group management methods instead.") del data['role_name'] dispatcher.send("user_pre_update", sender=self, user=user, update_data=data, session=session) updated_user = user_repo.update(user, **data) logger.info(f"User '{user.username}' prepared for update.") dispatcher.send("user_updated", sender=self, user=updated_user, session=session) return updated_user except ValidationError as e: raise e except RuntimeError as e: raise e except Exception as e: logger.exception(f"Error during user update via repository for ID '{user_id}': {e}") error_type = "repository_error" if isinstance(e, Exception) else "unexpected_exception" dispatcher.send("user_update_failed", sender=self, user_id=user_id, update_data=data, error_type=error_type, exception=e, user=user) raise
[docs] def delete_user(self, session: Session, user_id: int): """ Delete a user using the repository. Receives the request-scoped database session. Emits 'user_deletion_started', 'user_pre_delete', 'user_deleted', or 'user_deletion_failed' signals. Args: session: The SQLAlchemy Session for database operations. user_id: The ID of the user to delete. Raises: ValidationError: If user is not found. Exception: For other unexpected errors. """ logger.info(f"Attempting to delete user with ID: {user_id}") dispatcher.send("user_deletion_started", sender=self, user_id=user_id) logger.debug(f"Signal 'user_deletion_started' sent for user ID '{user_id}'.") user_repo = UserRepository(session=session) user = None try: user = user_repo.get_by_id(user_id) if not user: logger.warning(f"Deletion failed: User with ID {user_id} not found.") dispatcher.send("user_deletion_failed", sender=self, user_id=user_id, error_type="user_not_found") logger.debug(f"Signal 'user_deletion_failed' (user_not_found) sent for user ID '{user_id}'.") raise ValidationError("User not found.") dispatcher.send("user_pre_delete", sender=self, user=user, session=session) logger.debug(f"Signal 'user_pre_delete' sent for user '{user.username}'.") user_repo.delete(user) logger.info(f"User '{user.username}' prepared for deletion.") dispatcher.send("user_deleted", sender=self, user_id=user_id, username=user.username, session=session) logger.debug(f"Signal 'user_deleted' sent for user '{user.username}'.") except ValidationError as e: raise e except Exception as e: logger.exception(f"Error during user deletion via repository for ID '{user_id}': {e}") error_type = "repository_error" if isinstance(e, Exception) else "unexpected_exception" user_details = {"user_id": user_id} if user: user_details["username"] = user.username dispatcher.send("user_deletion_failed", sender=self, **user_details, error_type=error_type, exception=e) logger.debug(f"Signal 'user_deletion_failed' ({error_type}) sent for user ID '{user_id}'.") raise
[docs] def get_user_by_id(self, session: Session, user_id: int) -> Optional[User]: """ Get a user by their ID using the repository. Receives the request-scoped database session. """ logger.debug(f"Fetching user by ID: {user_id}") user_repo = UserRepository(session=session) user = user_repo.get_by_id(user_id) return user
[docs] def get_user_by_email(self, session: Session, email: str) -> Optional[User]: """ Get a user by their email address using the repository. Receives the request-scoped database session. """ logger.debug(f"Fetching user by email: {email}") user_repo = UserRepository(session=session) user = user_repo.get_by_email(email) return user
[docs] def get_user_by_username(self, session: Session, username: str) -> Optional[User]: """ Get a user by their username using the repository. Receives the request-scoped database session. """ logger.debug(f"Fetching user by username: {username}") user_repo = UserRepository(session=session) user = user_repo.get_by_username(username) return user
[docs] def get_all_users(self, session: Session) -> List[User]: """ Get all users using the repository. Receives the request-scoped database session. """ logger.debug("Fetching all users.") user_repo = UserRepository(session=session) users = user_repo.list_all() return users
[docs] def search_users(self, session: Session, **criteria: Any) -> List[User]: """ Search for users based on provided criteria using the repository or session. Receives the request-scoped database session. Args: session: The SQLAlchemy Session for database operations. **criteria: Keyword arguments for search criteria (e.g., username="test"). Returns: A list of User objects matching the criteria. """ logger.debug(f"Searching users with criteria: {criteria}") user_repo = UserRepository(session=session) try: query = session.query(User) for key, value in criteria.items(): if hasattr(User, key): query = query.filter(getattr(User, key) == value) else: logger.warning(f"Search criteria '{key}' not found on User model. Skipping.") return query.all() except Exception as e: logger.exception(f"Error during user search: {e}") return []
[docs] def reset_password_request(self, session: Session, email: str, reset_url_path: Optional[str] = None) -> bool: """ Initiates a password reset request for a user. Generates a reset token, stores it on the user, and prepares for email sending. Args: session: The SQLAlchemy Session. email: The email of the user requesting reset. reset_url_path: The base URL path to use for the reset link (e.g., "/reset-password-confirm/"). If None, it defaults to the API path. """ logger.info(f"Password reset requested for '{email}'.") dispatcher.send("password_reset_request_started", sender=self, email=email) user_repo = UserRepository(session=session) config = Config() allowed_hosts = getattr(config, 'ALLOWED_HOSTS', []) verification_host = "127.0.0.1:10000" if allowed_hosts: verification_host = allowed_hosts[0] else: logger.warning("ALLOWED_HOSTS not configured or empty. Using default localhost for verification link.") final_reset_path = reset_url_path if reset_url_path else "/api/auth/reset-password/" try: user = user_repo.get_by_email(email) if user: user.auth_token = PasswordHasher.generate_random_token(length=64) user.token_expiry = datetime.utcnow() + timedelta(hours=1) session.add(user) session.flush() reset_link = f"http://{verification_host}{final_reset_path}?token={user.auth_token}" self.email_sender.send_email( to_email=user.email, subject="Password Reset Request", body=f"Hello {user.username},\n\nPlease click the following link to reset your password: {reset_link}" ) logger.info(f"Password reset email sent to {user.email}.") dispatcher.send("password_reset_request_processed", sender=self, email=email, user=user, session=session) return True else: logger.warning(f"Password reset request failed: User with email '{email}' not found.") dispatcher.send("password_reset_request_failed", sender=self, email=email, error_type="user_not_found") return False except Exception as e: logger.exception(f"Error during password reset request for '{email}': {e}") dispatcher.send("password_reset_request_failed", sender=self, email=email, error_type="unexpected_exception", exception=e) raise
[docs] def reset_password(self, session: Session, token: str, new_password: str) -> bool: """ Resets a user's password using a password reset token. Verifies the token, updates the password, and clears the token. """ logger.info(f"Password reset with token: {token[:10]}...") dispatcher.send("password_reset_started", sender=self, token_prefix=token[:10] + "...") user_repo = UserRepository(session=session) user = None try: self.password_validator.validate(new_password) user = user_repo.get_user_by_auth_token_and_expiry(token) if not user: logger.warning(f"Password reset failed: Invalid or expired token.") dispatcher.send("password_reset_failed", sender=self, token_prefix=token[:10] + "...", error_type="invalid_token") raise ValidationError("Invalid or expired reset token.") user.set_password(new_password) user.auth_token = None user.token_expiry = None session.add(user) logger.info(f"Password reset successful for user ID {user.id}.") dispatcher.send("password_reset_successful", sender=self, user=user, session=session) return True except ValidationError as e: raise e except RuntimeError as e: raise e except Exception as e: logger.exception(f"Error during password reset with token for token prefix '{token[:10]}...': {e}") error_type = "repository_error" if isinstance(e, Exception) else "unexpected_exception" dispatcher.send("password_reset_failed", sender=self, token_prefix=token[:10] + "...", error_type=error_type, exception=e, user=user) raise
@staticmethod def _validate_email(email: str) -> bool: """Validate email format.""" regex = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$' return re.fullmatch(regex, email) is not None