from sqlalchemy.orm import Session
from typing import List, Optional, Any
import logging
from lback.core.signals import dispatcher
from lback.models.adminuser import Permission
logger = logging.getLogger(__name__)
class PermissionRepository:
    """
    Repository for Permission model data access.

    Provides methods to interact with the database for Permission entities.
    Requires a SQLAlchemy Session to be provided upon initialization.

    Write operations (create, update, delete) emit "pre" and "post" signals
    through the shared ``dispatcher``; read operations emit no signals.
    None of the methods commit: the caller owns the transaction.
    """

    def __init__(self, session: Session):
        """
        Initializes the PermissionRepository with a SQLAlchemy session.

        Args:
            session: The SQLAlchemy Session object to use for database operations.
                This session should be managed externally (e.g., by a middleware).
        """
        self.session = session
        if isinstance(session, Session):
            logger.debug("PermissionRepository initialized with a database session.")
        else:
            # Deliberately best-effort: the problem is reported but construction
            # still succeeds, so existing callers are not broken by a hard failure.
            logger.error("PermissionRepository initialized without a valid SQLAlchemy Session instance.")

    def get_by_id(self, permission_id: int) -> Optional[Permission]:
        """
        Gets a permission by its primary key ID.

        Read-only operation; no signals are emitted.

        Args:
            permission_id: The integer ID of the permission.

        Returns:
            The Permission object if found, otherwise None.
        """
        logger.debug("Fetching Permission by ID: %s", permission_id)
        # Session.get() supersedes the legacy Query.get() in SQLAlchemy 1.4+.
        # Fall back to the legacy call for sessions that do not provide it.
        session_get = getattr(self.session, "get", None)
        if callable(session_get):
            return session_get(Permission, permission_id)
        return self.session.query(Permission).get(permission_id)

    def get_by_name(self, name: str) -> Optional[Permission]:
        """
        Gets a permission by its name.

        Read-only operation; no signals are emitted.

        Args:
            name: The name string of the permission.

        Returns:
            The first Permission whose name equals ``name``, otherwise None.
        """
        logger.debug("Fetching Permission by name: %s", name)
        return self.session.query(Permission).filter(Permission.name == name).first()

    def list_all(self) -> List[Permission]:
        """
        Lists all permissions in the database.

        Read-only operation; no signals are emitted.

        Returns:
            A list of all Permission objects.
        """
        logger.debug("Fetching all Permissions.")
        return self.session.query(Permission).all()

    def create(self, **data: Any) -> Permission:
        """
        Creates a new permission instance and adds it to the session.

        Note: This method adds the object to the session but does NOT commit.
        The caller is responsible for committing the session.
        Emits 'permission_pre_create' and 'permission_post_create' signals.

        Args:
            **data: Keyword arguments corresponding to Permission model attributes.
                Example: name='add_product', description='Can add new products'.

        Returns:
            The newly created Permission instance (staged in the session).

        Raises:
            Exception: Any error raised while building the instance, adding it
                to the session or sending the post-create signal is logged and
                re-raised unchanged.
        """
        # Only the keys are logged, never the values.
        logger.debug("Repository: Creating new Permission with data: %s", list(data))
        dispatcher.send("permission_pre_create", sender=self, data=data, session=self.session)
        logger.debug("Signal 'permission_pre_create' sent.")
        try:
            permission = Permission(**data)
            self.session.add(permission)
            # No flush/commit happens here, so the ID may not be populated yet;
            # getattr keeps the log line safe either way.
            logger.info(
                "Repository: Permission instance created and added to session (name: %s, ID: %s).",
                data.get("name", "N/A"),
                getattr(permission, "id", "N/A"),
            )
            dispatcher.send("permission_post_create", sender=self, permission=permission, session=self.session)
            logger.debug(
                "Signal 'permission_post_create' sent for Permission ID '%s'.",
                getattr(permission, "id", "N/A"),
            )
            return permission
        except Exception:
            logger.exception("Repository: Error creating Permission instance.")
            raise

    def update(self, permission: Permission, **data: Any) -> Permission:
        """
        Updates an existing permission instance with new data.

        Note: This method modifies the object in the session but does NOT commit.
        The caller is responsible for committing the session.
        Emits 'permission_pre_update' and 'permission_post_update' signals.

        Args:
            permission: The Permission instance to update.
            **data: Keyword arguments for the attributes to update.
                Example: description='Updated description'.
                Keys that are not existing attributes of ``permission`` are
                skipped with a warning.

        Returns:
            The updated Permission instance (changes staged in the session).

        Raises:
            Exception: Any error raised while setting attributes or sending the
                post-update signal is logged and re-raised unchanged.
        """
        permission_id = getattr(permission, "id", "N/A")
        logger.debug("Repository: Updating Permission ID %s with data: %s", permission_id, list(data))
        dispatcher.send(
            "permission_pre_update",
            sender=self,
            permission=permission,
            update_data=data,
            session=self.session,
        )
        logger.debug("Signal 'permission_pre_update' sent for Permission ID '%s'.", permission_id)
        try:
            for key, value in data.items():
                if hasattr(permission, key):
                    setattr(permission, key, value)
                else:
                    logger.warning(
                        "Repository: Attempted to set non-existent attribute '%s' on Permission ID %s.",
                        key,
                        permission_id,
                    )
            logger.info("Repository: Permission ID %s updated in session.", permission_id)
            dispatcher.send("permission_post_update", sender=self, permission=permission, session=self.session)
            logger.debug("Signal 'permission_post_update' sent for Permission ID '%s'.", permission_id)
            return permission
        except Exception:
            logger.exception("Repository: Error updating Permission ID %s.", permission_id)
            raise

    def delete(self, permission: Permission) -> None:
        """
        Deletes a permission instance from the session.

        Note: This method marks the object for deletion but does NOT commit.
        The caller is responsible for committing the session.
        Emits 'permission_pre_delete' and 'permission_post_delete' signals.
        The post-delete signal carries ``permission_id`` rather than the instance.

        Args:
            permission: The Permission instance to delete.

        Raises:
            Exception: Any error raised while marking the instance for deletion
                or sending the post-delete signal is logged and re-raised unchanged.
        """
        # Captured up front so it is available for the post-delete signal and logs.
        permission_id = getattr(permission, "id", "N/A")
        logger.debug("Repository: Deleting Permission ID %s", permission_id)
        dispatcher.send("permission_pre_delete", sender=self, permission=permission, session=self.session)
        logger.debug("Signal 'permission_pre_delete' sent for Permission ID '%s'.", permission_id)
        try:
            self.session.delete(permission)
            logger.info("Repository: Permission ID %s marked for deletion in session.", permission_id)
            dispatcher.send("permission_post_delete", sender=self, permission_id=permission_id, session=self.session)
            logger.debug("Signal 'permission_post_delete' sent for Permission ID '%s'.", permission_id)
        except Exception:
            logger.exception("Repository: Error marking Permission ID %s for deletion.", permission_id)
            raise

    def search(self, **criteria: Any) -> List[Permission]:
        """
        Searches for permissions based on criteria.

        Read-only operation; no signals are emitted.

        Args:
            **criteria: Attribute-name/value pairs. Each pair whose key is an
                attribute of the Permission model adds an equality filter, and
                all filters are combined. Keys that are not attributes of the
                model are skipped with a warning.

        Returns:
            A list of Permission objects matching every applied filter. With no
            applicable criteria the query is unfiltered.
        """
        logger.debug("Repository: Searching Permissions with criteria: %s", criteria)
        query = self.session.query(Permission)
        for key, value in criteria.items():
            if hasattr(Permission, key):
                query = query.filter(getattr(Permission, key) == value)
            else:
                logger.warning(
                    "Repository: Search criteria '%s' not found on Permission model. Skipping.",
                    key,
                )
        return query.all()