import logging
from datetime import datetime, timezone
from typing import Any, List, Optional

from sqlalchemy.orm import Session

from lback.core.signals import dispatcher
from lback.models.user import User
logger = logging.getLogger(__name__)
class UserRepository:
    """
    Repository for User model data access.

    Provides methods to interact with the database for User entities.
    Requires a SQLAlchemy Session to be provided upon initialization.

    Write operations (create / update / delete) only stage changes in the
    session; they never commit, so the caller owns the transaction. Each
    write operation sends a ``user_pre_*`` and a ``user_post_*`` signal
    through the module-level ``dispatcher``.
    """

    @staticmethod
    def _mask_sensitive(data: dict) -> dict:
        """Return a shallow copy of *data* with any 'password' value masked."""
        masked = dict(data)
        if 'password' in masked:
            masked['password'] = '***'
        return masked

    @staticmethod
    def _token_prefix(token: str) -> str:
        """Return the first 10 characters of *token* for use in log messages."""
        return token[:10]

    def __init__(self, session: Session):
        """
        Initializes the UserRepository with a SQLAlchemy session.

        Args:
            session: The SQLAlchemy Session object to use for database operations.
                This session should be managed externally (e.g., by a middleware).
        """
        if not isinstance(session, Session):
            # Logged rather than raised: construction still succeeds and the
            # object is stored as-is.
            # NOTE(review): presumably kept lenient so session proxies or test
            # doubles can be injected -- confirm before tightening to a raise.
            logger.error("UserRepository initialized without a valid SQLAlchemy Session instance.")
        self.session = session
        logger.debug("UserRepository initialized with a database session.")

    def get_by_id(self, user_id: int) -> Optional[User]:
        """
        Gets a user by their primary key ID.

        Args:
            user_id: Primary key value to look up.

        Returns:
            The matching User, or None when nothing matches or the query
            raises (the error is logged with its traceback).
        """
        logger.debug(f"Fetching User by ID: {user_id}")
        try:
            # NOTE(review): Query.get() is a legacy API since SQLAlchemy 1.4;
            # Session.get(User, user_id) is the replacement once the minimum
            # supported SQLAlchemy version is confirmed to allow it.
            return self.session.query(User).get(user_id)
        except Exception:
            logger.exception(f"Error fetching User by ID: {user_id}")
            return None

    def get_by_username(self, username: str) -> Optional[User]:
        """
        Gets a user by their username.

        Args:
            username: Value compared for equality against ``User.username``.

        Returns:
            The first matching User, or None when nothing matches or the
            query raises (the error is logged with its traceback).
        """
        logger.debug(f"Fetching User by username: {username}")
        try:
            return self.session.query(User).filter(User.username == username).first()
        except Exception:
            logger.exception(f"Error fetching User by username: {username}")
            return None

    def get_by_email(self, email: str) -> Optional[User]:
        """
        Gets a user by their email address.

        Args:
            email: Value compared for equality against ``User.email``.

        Returns:
            The first matching User, or None when nothing matches or the
            query raises (the error is logged with its traceback).
        """
        logger.debug(f"Fetching User by email: {email}")
        try:
            return self.session.query(User).filter(User.email == email).first()
        except Exception:
            logger.exception(f"Error fetching User by email: {email}")
            return None

    def list_all(self) -> List[User]:
        """
        Lists all users in the database.

        Returns:
            Every User row, or an empty list when the query raises (the
            error is logged with its traceback).
        """
        logger.debug("Fetching all Users.")
        try:
            return self.session.query(User).all()
        except Exception:
            logger.exception("Error fetching all Users.")
            return []

    def create(self, **data: Any) -> User:
        """
        Creates a new user instance and adds it to the session.

        Note: This method adds the object to the session but does NOT commit.
        The caller is responsible for committing the session.
        Emits 'user_pre_create' and 'user_post_create' signals.

        Args:
            **data: Keyword arguments forwarded unchanged to ``User(...)``.

        Returns:
            The newly constructed User, already added to the session.

        Raises:
            Exception: Any error raised while constructing the user, adding
                it to the session or sending the post-create signal is logged
                and re-raised unchanged.
        """
        logger.debug(f"Repository: Creating new User with data: {list(data.keys())}")
        # Signal receivers get a masked copy so the raw password never leaves
        # this method; the User constructor still receives the real data.
        signal_data = self._mask_sensitive(data)
        dispatcher.send("user_pre_create", sender=self, data=signal_data, session=self.session)
        logger.debug("Signal 'user_pre_create' sent.")
        try:
            user = User(**data)
            self.session.add(user)
            logger.info(f"Repository: User instance created and added to session (username: {data.get('username', 'N/A')}, ID: {getattr(user, 'id', 'N/A')}).")
            dispatcher.send("user_post_create", sender=self, user=user, session=self.session)
            logger.debug(f"Signal 'user_post_create' sent for User ID '{getattr(user, 'id', 'N/A')}'.")
            return user
        except Exception:
            logger.exception("Repository: Error creating User instance.")
            raise

    def update(self, user: User, **data: Any) -> User:
        """
        Updates an existing user instance with new data.

        Note: This method modifies the object in the session but does NOT commit.
        The caller is responsible for committing the session.
        Emits 'user_pre_update' and 'user_post_update' signals.

        Args:
            user: The User instance to modify in place.
            **data: Attribute name / value pairs. A key that is not an
                existing attribute of ``user`` is skipped with a warning.

        Returns:
            The same ``user`` object after the attributes were applied.

        Raises:
            Exception: Any error raised while applying the attributes or
                sending the post-update signal is logged and re-raised
                unchanged.
        """
        user_id = getattr(user, 'id', 'N/A')
        logger.debug(f"Repository: Updating User ID {user_id} with data: {list(data.keys())}")
        # Receivers only ever see the masked copy of the update payload.
        signal_data = self._mask_sensitive(data)
        dispatcher.send("user_pre_update", sender=self, user=user, update_data=signal_data, session=self.session)
        logger.debug(f"Signal 'user_pre_update' sent for User ID '{user_id}'.")
        try:
            for key, value in data.items():
                if hasattr(user, key):
                    setattr(user, key, value)
                else:
                    logger.warning(f"Repository: Attempted to set non-existent attribute '{key}' on User ID {user_id}.")
            logger.info(f"Repository: User ID {user_id} updated in session.")
            dispatcher.send("user_post_update", sender=self, user=user, session=self.session)
            logger.debug(f"Signal 'user_post_update' sent for User ID '{user_id}'.")
            return user
        except Exception:
            logger.exception(f"Repository: Error updating User ID {user_id}.")
            raise

    def delete(self, user: User) -> None:
        """
        Deletes a user instance from the session.

        Note: This method marks the object for deletion but does NOT commit.
        The caller is responsible for committing the session.
        Emits 'user_pre_delete' and 'user_post_delete' signals.

        Args:
            user: The User instance to mark for deletion.

        Raises:
            Exception: Any error raised while marking the user for deletion
                or sending the post-delete signal is logged and re-raised
                unchanged.
        """
        # Captured up front: the post-delete signal carries the ID, not the
        # user object.
        user_id = getattr(user, 'id', 'N/A')
        logger.debug(f"Repository: Deleting User ID {user_id}")
        dispatcher.send("user_pre_delete", sender=self, user=user, session=self.session)
        logger.debug(f"Signal 'user_pre_delete' sent for User ID '{user_id}'.")
        try:
            self.session.delete(user)
            logger.info(f"Repository: User ID {user_id} marked for deletion in session.")
            dispatcher.send("user_post_delete", sender=self, user_id=user_id, session=self.session)
            logger.debug(f"Signal 'user_post_delete' sent for User ID '{user_id}'.")
        except Exception:
            logger.exception(f"Repository: Error marking User ID {user_id} for deletion.")
            raise

    def search(self, **criteria: Any) -> List[User]:
        """
        Searches for users based on criteria.

        Every criterion whose name is an attribute of ``User`` is applied as
        an equality filter; all filters are combined. Names that are not
        attributes of ``User`` are skipped with a warning.

        Unlike the ``get_by_*`` helpers, errors raised by the query are NOT
        caught here and propagate to the caller.

        Args:
            **criteria: Attribute name / expected value pairs.

        Returns:
            All users matching every applied criterion.
        """
        # Only the criterion names are logged: the values may hold emails or
        # tokens and do not belong in log files.
        logger.debug(f"Repository: Searching Users with criteria: {list(criteria.keys())}")
        query = self.session.query(User)
        for key, value in criteria.items():
            if hasattr(User, key):
                query = query.filter(getattr(User, key) == value)
            else:
                logger.warning(f"Repository: Search criteria '{key}' not found on User model. Skipping.")
        return query.all()

    def get_user_by_auth_token_and_expiry(self, token: str) -> Optional[User]:
        """
        Gets a user by their authentication token (e.g., password reset token)
        and ensures the token has not expired.

        Args:
            token: The authentication token.

        Returns:
            The User object if found and token is valid/not expired, otherwise
            None. None is also returned, without querying, when ``token`` is
            None or empty, and when the query raises (the error is logged).
        """
        if not token:
            # A missing token can never identify a user, and an empty string
            # must not be allowed to match rows that store "" as their token.
            logger.debug("No auth token supplied; skipping user lookup.")
            return None
        prefix = self._token_prefix(token)
        logger.debug(f"Fetching user by auth token (prefix: {prefix}...) and expiry.")
        try:
            # Naive UTC "now": the same value datetime.utcnow() produced,
            # obtained without the call deprecated since Python 3.12.
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            user = self.session.query(User).filter(
                User.auth_token == token,
                User.token_expiry > now_utc
            ).first()
            if user:
                logger.debug(f"User '{user.username}' found with valid auth token.")
            else:
                logger.debug(f"No active user found for auth token (prefix: {prefix}...).")
            return user
        except Exception as e:
            logger.exception(f"Error fetching user by auth token (prefix: {prefix}...) and expiry: {e}")
            return None

    def get_by_email_verification_token(self, token: str) -> Optional[User]:
        """
        Gets a user by their email verification token.

        Note: The expiry check for email verification token is handled within the User model's
        verify_email method to allow for more flexible expiry logic.
        This method only retrieves the user based on the token itself.

        Args:
            token: The email verification token.

        Returns:
            The User object if found, otherwise None. None is also returned,
            without querying, when ``token`` is None or empty, and when the
            query raises (the error is logged).
        """
        if not token:
            # Never let a missing or empty token reach the database.
            logger.debug("No email verification token supplied; skipping user lookup.")
            return None
        prefix = self._token_prefix(token)
        logger.debug(f"Fetching user by email verification token (prefix: {prefix}...).")
        try:
            user = self.session.query(User).filter(
                User.email_verification_token == token
            ).first()
            if user:
                logger.debug(f"User '{user.username}' found with email verification token.")
            else:
                logger.debug(f"No user found for email verification token (prefix: {prefix}...).")
            return user
        except Exception as e:
            logger.exception(f"Error fetching user by email verification token (prefix: {prefix}...): {e}")
            return None

    def get_user_by_reset_token(self, token: str) -> Optional[User]:
        """
        Gets a user by a password reset token.

        Deprecated: this is a thin wrapper that logs a warning and delegates
        to get_user_by_auth_token_and_expiry.

        Args:
            token: The password reset token.

        Returns:
            Whatever get_user_by_auth_token_and_expiry returns for ``token``.
        """
        logger.warning("UserRepository.get_user_by_reset_token is deprecated. Use get_user_by_auth_token_and_expiry instead.")
        return self.get_user_by_auth_token_and_expiry(token)

    def get_user_by_auth_token(self, token: str) -> Optional[User]:
        """
        Gets a user by an authentication token (e.g., API token).

        Note: This is different from the reset/verification token as it might not have an expiry.
        If your 'auth_token' is ONLY for password reset, use `get_user_by_auth_token_and_expiry`.
        If it's for API tokens (persistent), then this method would be relevant.

        Args:
            token: The authentication token; no expiry check is applied.

        Returns:
            The User object if found, otherwise None. None is also returned,
            without querying, when ``token`` is None or empty, and when the
            query raises (the error is logged).
        """
        if not token:
            # Never let a missing or empty token reach the database.
            logger.debug("No general auth token supplied; skipping user lookup.")
            return None
        prefix = self._token_prefix(token)
        logger.debug(f"Fetching user by general auth token (prefix: {prefix}...).")
        try:
            user = self.session.query(User).filter(User.auth_token == token).first()
            if user:
                logger.debug(f"User '{user.username}' found with general auth token.")
            else:
                logger.debug(f"No user found for general auth token (prefix: {prefix}...).")
            return user
        except Exception as e:
            logger.exception(f"Error fetching user by general auth token (prefix: {prefix}...): {e}")
            return None