Source code for lback.repositories.role_repository

from sqlalchemy.orm import Session
from typing import List, Optional, Any
import logging

from lback.core.signals import dispatcher 
from lback.models.adminuser import Role

logger = logging.getLogger(__name__)

class RoleRepository:
    """
    Repository for Role model data access.

    Provides methods to read, stage, modify and delete Role entities through
    a SQLAlchemy Session supplied at construction time. Write operations only
    stage changes in the session and never commit; committing is the caller's
    responsibility. The shared ``dispatcher`` is used to send a signal before
    and after every write operation.
    """

    def __init__(self, session: Session):
        """
        Initializes the RoleRepository with a SQLAlchemy session.

        Args:
            session: The SQLAlchemy Session object to use for database
                operations. This session should be managed externally
                (e.g., by a middleware).
        """
        if not isinstance(session, Session):
            # Logged rather than raised: construction still proceeds with
            # whatever object was supplied.
            logger.error("RoleRepository initialized without a valid SQLAlchemy Session instance.")
        self.session = session
        logger.debug("RoleRepository initialized with a database session.")

    def get_by_id(self, role_id: int) -> Optional[Role]:
        """
        Gets a role by its primary key ID.

        Read-only operation; no signals are sent.

        Args:
            role_id: The integer ID of the role.

        Returns:
            The Role object if found, otherwise None.
        """
        logger.debug(f"Fetching Role by ID: {role_id}")
        # Session.get() replaces the legacy Query.get(), which emits a
        # legacy-API warning on SQLAlchemy 1.4+/2.0.
        return self.session.get(Role, role_id)

    def get_by_name(self, name: str) -> Optional[Role]:
        """
        Gets a role by its name.

        Read-only operation; no signals are sent.

        Args:
            name: The name string of the role.

        Returns:
            The first Role whose name equals ``name``, otherwise None.
        """
        logger.debug(f"Fetching Role by name: {name}")
        return self.session.query(Role).filter(Role.name == name).first()

    def list_all(self) -> List[Role]:
        """
        Lists all roles in the database.

        Read-only operation; no signals are sent.

        Returns:
            A list of all Role objects.
        """
        logger.debug("Fetching all Roles.")
        return self.session.query(Role).all()

    def create(self, **data: Any) -> Role:
        """
        Creates a new role instance and adds it to the session.

        Note: This method adds the object to the session but does NOT commit.
        The caller is responsible for committing the session.

        Sends 'role_pre_create' before the instance is built and
        'role_post_create' after it has been added to the session.

        Args:
            **data: Keyword arguments corresponding to Role model attributes.
                Example: name='editor', description='Can edit content'.

        Returns:
            The newly created Role instance (staged in the session).

        Raises:
            Exception: Any error raised while building the instance, adding
                it to the session, or sending the post-create signal is
                logged and re-raised unchanged.
        """
        # Only the field names are logged, not the values.
        logger.debug(f"Repository: Creating new Role with data: {list(data)}")

        dispatcher.send("role_pre_create", sender=self, data=data, session=self.session)
        logger.debug("Signal 'role_pre_create' sent.")

        try:
            role = Role(**data)
            self.session.add(role)
            logger.info(f"Repository: Role instance created and added to session (name: {data.get('name', 'N/A')}).")

            dispatcher.send("role_post_create", sender=self, role=role, session=self.session)
            logger.debug(f"Signal 'role_post_create' sent for Role ID '{getattr(role, 'id', 'N/A')}'.")
            return role
        except Exception:
            logger.exception("Repository: Error creating Role instance.")
            raise

    def update(self, role: Role, **data: Any) -> Role:
        """
        Updates an existing role instance with new data.

        Note: This method modifies the object in the session but does NOT
        commit. The caller is responsible for committing the session.

        Sends 'role_pre_update' before any attribute is changed and
        'role_post_update' after all attributes have been applied.

        Args:
            role: The Role instance to update.
            **data: Keyword arguments for the attributes to update.
                Example: description='Updated description'. Keys that are
                not existing attributes of ``role`` are skipped with a
                warning instead of being set.

        Returns:
            The updated Role instance (changes staged in the session).

        Raises:
            Exception: Any error raised while setting attributes or sending
                the post-update signal is logged and re-raised unchanged.
        """
        role_id = getattr(role, 'id', 'N/A')
        logger.debug(f"Repository: Updating Role ID {role_id} with data: {list(data)}")

        dispatcher.send("role_pre_update", sender=self, role=role, update_data=data, session=self.session)
        logger.debug(f"Signal 'role_pre_update' sent for Role ID '{role_id}'.")

        try:
            for key, value in data.items():
                if hasattr(role, key):
                    setattr(role, key, value)
                else:
                    logger.warning(f"Repository: Attempted to set non-existent attribute '{key}' on Role ID {role_id}.")
            logger.info(f"Repository: Role ID {role_id} updated in session.")

            dispatcher.send("role_post_update", sender=self, role=role, session=self.session)
            logger.debug(f"Signal 'role_post_update' sent for Role ID '{role_id}'.")
            return role
        except Exception:
            logger.exception(f"Repository: Error updating Role ID {role_id}.")
            raise

    def delete(self, role: Role) -> None:
        """
        Deletes a role instance from the session.

        Note: This method marks the object for deletion but does NOT commit.
        The caller is responsible for committing the session.

        Sends 'role_pre_delete' (with the Role instance) before the deletion
        is staged and 'role_post_delete' (with the role's ID only)
        afterwards.

        Args:
            role: The Role instance to delete.

        Raises:
            Exception: Any error raised while marking the instance for
                deletion or sending the post-delete signal is logged and
                re-raised unchanged.
        """
        # Captured up front so the same ID is used in every log line and in
        # the post-delete signal.
        role_id = getattr(role, 'id', 'N/A')
        logger.debug(f"Repository: Deleting Role ID {role_id}")

        dispatcher.send("role_pre_delete", sender=self, role=role, session=self.session)
        logger.debug(f"Signal 'role_pre_delete' sent for Role ID '{role_id}'.")

        try:
            self.session.delete(role)
            logger.info(f"Repository: Role ID {role_id} marked for deletion in session.")

            dispatcher.send("role_post_delete", sender=self, role_id=role_id, session=self.session)
            logger.debug(f"Signal 'role_post_delete' sent for Role ID '{role_id}'.")
        except Exception:
            logger.exception(f"Repository: Error marking Role ID {role_id} for deletion.")
            raise

    def search(self, **criteria: Any) -> List[Role]:
        """
        Searches for roles based on criteria.

        Read-only operation; no signals are sent.

        Args:
            **criteria: Attribute-name/value pairs. Each key that exists as
                an attribute on the Role model adds an equality filter to
                the query; unknown keys are skipped with a warning.

        Returns:
            A list of Role objects matching every applied filter. With no
            applicable criteria the unfiltered query is executed.
        """
        logger.debug(f"Repository: Searching Roles with criteria: {criteria}")
        query = self.session.query(Role)
        for key, value in criteria.items():
            if hasattr(Role, key):
                query = query.filter(getattr(Role, key) == value)
            else:
                logger.warning(f"Repository: Search criteria '{key}' not found on Role model. Skipping.")
        return query.all()