Source code for lback.commands.admin

import bcrypt
import logging
import sys 
from getpass import getpass
import re

from lback.models.database import DatabaseManager, Base 
from lback.repositories.user_repository import UserRepository
from lback.repositories.admin_user_repository import AdminUserRepository



logger = logging.getLogger(__name__)

[docs] class AdminCommands: """ Command-line utilities for administrative tasks, interacting with the database using the new system. """ def __init__(self): try: self.db_manager = DatabaseManager.get_instance() except Exception as e: logger.error(f"Failed to get DatabaseManager instance: {e}", exc_info=True) print("Error: Could not initialize database manager.", file=sys.stderr) sys.exit(1)
[docs] def create_superuser(self): """Creates a new superuser via command line.""" logger.info("Starting superuser creation process.") print("Create Superuser:") username = input("Username: ").strip() email = input("Email: ").strip() password = getpass("Password: ").strip() if not self._validate_email(email): logger.error("Invalid email format entered.") print("Error: Invalid email format.", file=sys.stderr) return session = self.db_manager.Session() admin_user_repo = AdminUserRepository(session) try: if admin_user_repo.get_by_username(username): logger.error(f"Username '{username}' already exists.") print(f"Error: Username '{username}' already exists.", file=sys.stderr) return if admin_user_repo.get_by_email(email): logger.error(f"Email '{email}' already exists.") print(f"Error: Email '{email}' already exists.", file=sys.stderr) return try: hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') except Exception as e: logger.error(f"Failed to hash password: {e}", exc_info=True) print("Error: Failed to hash password.", file=sys.stderr) return user_data = { "username": username, "email": email, "password": hashed_password, "is_superuser": True, } admin_user = admin_user_repo.create(**user_data) session.commit() logger.info(f"Superuser '{username}' created successfully.") print(f"Superuser '{username}' created successfully.") except Exception as e: session.rollback() logger.error(f"Error creating superuser: {e}", exc_info=True) print(f"Error creating superuser: {e}", file=sys.stderr) finally: self.db_manager.Session.remove() logger.debug("Session removed after create_superuser command.")
[docs] def init_db(self): """ Initializes the database schema by creating all tables. Note: This bypasses Alembic and is typically for initial setup or testing. For production, use Alembic migrations (manage.py migrate). """ logger.info("🛠 Initializing Database (using create_all)...") print("Initializing Database...") try: self.db_manager.create_all_tables() logger.info("Database initialized successfully.") print("Database initialized successfully.") except Exception as e: logger.error(f"Error initializing database: {e}", exc_info=True) print(f"Error initializing database: {e}", file=sys.stderr)
[docs] def reset_password(self): """Resets a user's password via command line.""" logger.info("Starting password reset process.") print("Reset User Password:") username = input("Username: ").strip() session = self.db_manager.Session() user_repo = UserRepository(session) try: user = user_repo.get_by_username(username) if not user: logger.error(f"User '{username}' not found for password reset.") print(f"Error: User '{username}' not found.", file=sys.stderr) return password = getpass("New Password: ").strip() confirm = getpass("Confirm Password: ").strip() if password != confirm: logger.error("Passwords entered do not match.") print("Error: Passwords do not match.", file=sys.stderr) return try: hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') except Exception as e: logger.error(f"Failed to hash new password: {e}", exc_info=True) print("Error: Failed to hash new password.", file=sys.stderr) return user.password = hashed_password session.commit() logger.info(f"Password updated successfully for user '{username}'.") print(f"Password updated successfully for user '{username}'.") except Exception as e: session.rollback() logger.error(f"Error resetting password for user '{username}': {e}", exc_info=True) print(f"Error resetting password: {e}", file=sys.stderr) finally: self.db_manager.Session.remove() logger.debug("Session removed after reset_password command.")
[docs] def deactivate_user(self): """Deactivates a user via command line.""" logger.info("Starting user deactivation process.") print("Deactivate User:") username = input("Username: ").strip() session = self.db_manager.Session() user_repo = UserRepository(session) try: user = user_repo.get_by_username(username) if not user: logger.error(f"User '{username}' not found for deactivation.") print(f"Error: User '{username}' not found.", file=sys.stderr) return user.is_active = False session.commit() logger.info(f"User '{username}' deactivated successfully.") print(f"User '{username}' deactivated successfully.") except Exception as e: session.rollback() logger.error(f"Error deactivating user '{username}': {e}", exc_info=True) print(f"Error deactivating user: {e}", file=sys.stderr) finally: self.db_manager.Session.remove() logger.debug("Session removed after deactivate_user command.")
[docs] def list_users(self): """Lists all registered users via command line.""" logger.info("Starting user listing process.") print("Listing Registered Users:") session = self.db_manager.Session() user_repo = UserRepository(session) try: users = user_repo.list_all() if not users: logger.info("No users found in the database.") print("No users found.") return logger.info(f"Found {len(users)} users.") for user in users: status = "Active" if getattr(user, "is_active", True) else "Inactive" role = "Superuser" if getattr(user, "is_superuser", False) else "Regular" print(f"- {user.username} ({user.email}) | Status: {status} | Role: {role}") except Exception as e: session.rollback() logger.error(f"Error listing users: {e}", exc_info=True) print(f"Error listing users: {e}", file=sys.stderr) finally: self.db_manager.Session.remove() logger.debug("Session removed after list_users command.")
[docs] def activate_user(self): """Activates a user via command line.""" logger.info("Starting user activation process.") print("Activate User:") username = input("Username: ").strip() session = self.db_manager.Session() user_repo = UserRepository(session) try: user = user_repo.get_by_username(username) if not user: logger.error(f"User '{username}' not found for activation.") print(f"Error: User '{username}' not found.", file=sys.stderr) return user.is_active = True session.commit() logger.info(f"User '{username}' activated successfully.") print(f"User '{username}' activated successfully.") except Exception as e: session.rollback() logger.error(f"Error activating user '{username}': {e}", exc_info=True) print(f"Error activating user: {e}", file=sys.stderr) finally: self.db_manager.Session.remove() logger.debug("Session removed after activate_user command.")
@staticmethod def _validate_email(email): """Validate email format.""" regex = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$' return re.match(regex, email) is not None