Source code for lback.middlewares.session_middleware

import logging
import json
from typing import Optional, Dict
from datetime import datetime
from http import HTTPStatus

from lback.core.types import Request
from lback.core.response import Response
from lback.core.base_middleware import BaseMiddleware
from lback.utils.session_manager import SessionManager
from lback.utils.app_session import AppSession
from sqlalchemy.orm import Session as DBSession

logger = logging.getLogger(__name__)

[docs] class SessionMiddleware(BaseMiddleware): """ Middleware to load session data at the start of the request and save session data (if modified) at the end of the request, using a database. Attaches a Session object (AppSession wrapper) to the request context using 'session'. Handles session cookie reading and writing. Requires SQLAlchemyMiddleware to run before it to provide the DB session. """
[docs] def __init__(self, session_manager: SessionManager): """ Initializes with a SessionManager instance (a dependency). Args: session_manager: The SessionManager instance for session storage operations. """ self.session_manager = session_manager self._session_cookie_name = 'session_id' logger.info("SessionMiddleware initialized.")
[docs] def process_request(self, request: Request) -> Optional[Response]: logger.debug("SessionMiddleware: Processing request.") db_session: Optional[DBSession] = request.get_context('db_session') if not db_session: logger.error("SessionMiddleware: Database session not found on request context. Ensure SQLAlchemyMiddleware runs before SessionMiddleware.") return Response(body=b"Internal Server Error: Database session not available for session handling.", status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) session_id = self._get_session_id_from_cookie(request, self._session_cookie_name) session_data_payload = None is_new_session = False expires_at = None full_session_data_from_db = None if session_id: logger.debug(f"SessionMiddleware: Found session ID cookie: {session_id}. Attempting to load session from DB.") full_session_data_from_db = self.session_manager.get_session_data(db_session, session_id) if full_session_data_from_db: logger.debug(f"SessionMiddleware: Session data found in DB for ID {session_id}.") raw_data_string = full_session_data_from_db.get('data', '{}') expires_at = full_session_data_from_db.get('expires_at') if not isinstance(raw_data_string, str): logger.warning(f"SessionMiddleware: Data for session ID {session_id} is not a string. Type: {type(raw_data_string)}. Attempting to convert.") try: raw_data_string = raw_data_string.decode('utf-8') except (AttributeError, UnicodeDecodeError): logger.error(f"SessionMiddleware: Could not convert session data to string for ID {session_id}. Treating as empty.", exc_info=True) raw_data_string = "{}" try: session_data_payload = json.loads(raw_data_string) logger.debug(f"SessionMiddleware: Successfully decoded session data JSON for ID {session_id}. Keys: {list(session_data_payload.keys())}") except json.JSONDecodeError: logger.error(f"SessionMiddleware: Failed to decode JSON data for session ID {session_id}. Treating session data as empty.", exc_info=True) session_data_payload = {} except Exception as e: logger.error(f"SessionMiddleware: Unexpected error processing session data for ID {session_id}: {e}", exc_info=True) session_data_payload = {} self.session_manager.renew_session(db_session, session_id) logger.debug(f"SessionMiddleware: Renewed session ID {session_id}.") else: logger.debug(f"SessionMiddleware: Session ID {session_id} from cookie not found or expired in manager. Treating as no valid session.") session_id = None expires_at = None if session_data_payload is None: session_data_payload = {} is_new_session = True expires_at = None logger.debug("SessionMiddleware: Prepared data for a potential new session.") request_session_wrapper = AppSession( session_id=session_id, initial_data=session_data_payload, session_manager=self.session_manager, db_session=db_session, is_new=is_new_session, expires_at=expires_at ) if hasattr(request, 'add_context') and callable(request.add_context): request.add_context('session', request_session_wrapper) logger.debug("SessionMiddleware: Attached Session wrapper to request context using add_context.") elif hasattr(request, '_context') and isinstance(request._context, dict): request._context['session'] = request_session_wrapper logger.debug("SessionMiddleware: Attached Session wrapper to request context using _context dictionary.") else: logger.error("SessionMiddleware: Request object does not support adding context data. Cannot attach session.") logger.debug("SessionMiddleware: Finished processing request.") return None
[docs] def process_response(self, request: Request, response: Response) -> Response: """ Saves the session data if it was modified during the request and adds the session cookie to the response if it's a new session or deleted. """ logger.debug(f"SessionMiddleware: Processing response (status: {response.status_code}).") db_session: Optional[DBSession] = request.get_context('db_session') if not db_session: logger.error("SessionMiddleware: Database session not found on request context during response processing. Cannot save/process session.") return response request_session_wrapper: Optional[AppSession] = request.get_context('session') if request_session_wrapper is None: logger.debug("SessionMiddleware: Could not retrieve Session wrapper from request context. Cannot save/process session.") return response if request_session_wrapper.deleted: logger.debug("SessionMiddleware: Session was deleted during the request. Attempting to delete cookie.") self._delete_session_cookie(response, self._session_cookie_name) logger.debug("SessionMiddleware: Added Set-Cookie header to delete session cookie.") elif request_session_wrapper.modified or request_session_wrapper.is_new: logger.debug("SessionMiddleware: Session was modified or is new. Attempting to save.") saved_session_id = request_session_wrapper.save() if saved_session_id: session_manager_from_wrapper = request_session_wrapper._session_manager if session_manager_from_wrapper and hasattr(session_manager_from_wrapper, 'session_timeout'): cookie_expiry_datetime = datetime.utcnow() + session_manager_from_wrapper.session_timeout self._set_session_cookie(response, self._session_cookie_name, saved_session_id, cookie_expiry_datetime) logger.debug(f"SessionMiddleware: Added Set-Cookie header for session ID {saved_session_id} with expiry {cookie_expiry_datetime}.") else: logger.error("SessionMiddleware: SessionManager or its timeout not available in wrapper. Cannot set Set-Cookie header.") else: logger.warning("SessionMiddleware: Session was not successfully saved. Cannot add/update Set-Cookie header.") logger.debug("SessionMiddleware: Finished processing response.") return response
def _get_session_id_from_cookie(self, request: Request, cookie_name: str) -> Optional[str]: """Extracts a specific cookie value (session ID) from the request headers.""" cookie_header = request.headers.get('COOKIE') if not cookie_header: logger.debug(f"SessionMiddleware: No '{cookie_name}' cookie header in request.") return None cookies: Dict[str, str] = {} try: cookie_list = [c.strip() for c in cookie_header.split(';')] for cookie_pair in cookie_list: if '=' in cookie_pair: key, value = cookie_pair.split('=', 1) cookies[key] = value except Exception as e: logger.error(f"SessionMiddleware: Error parsing 'COOKIE' header: {e}") return None session_id = cookies.get(cookie_name) if session_id: logger.debug(f"SessionMiddleware: Found '{cookie_name}' cookie with ID: {session_id}") return session_id else: logger.debug(f"SessionMiddleware: '{cookie_name}' cookie not found in header.") return None def _set_session_cookie(self, response: Response, cookie_name: str, cookie_value: str, expires: datetime): """Sets a Set-Cookie header on the response.""" expires_gmt = expires.strftime('%a, %d %b %Y %H:%M:%S GMT') set_cookie_header_value = ( f"{cookie_name}={cookie_value}; " f"Expires={expires_gmt}; " "Path=/; " "HttpOnly" ) if hasattr(response, 'headers') and isinstance(response.headers, dict): response.headers['Set-Cookie'] = set_cookie_header_value elif hasattr(response, 'headers') and isinstance(response.headers, list): found = False for i, (key, val) in enumerate(response.headers): if key.lower() == 'set-cookie': response.headers[i] = ('Set-Cookie', set_cookie_header_value) found = True break if not found: response.headers.append(('Set-Cookie', set_cookie_header_value)) else: logger.error("SessionMiddleware: Response object does not have a dict-like or list-like 'headers' attribute. Cannot add Set-Cookie header.") def _delete_session_cookie(self, response: Response, cookie_name: str): """Deletes a cookie by setting its expiration to the past.""" delete_cookie_header_value = f"{cookie_name}=; Expires=Thu, 01 Jan 1970 00:00:00 GMT; Path=/" if hasattr(response, 'headers') and isinstance(response.headers, dict): response.headers['Set-Cookie'] = delete_cookie_header_value elif hasattr(response, 'headers') and isinstance(response.headers, list): found = False for i, (key, val) in enumerate(response.headers): if key.lower() == 'set-cookie': response.headers[i] = ('Set-Cookie', delete_cookie_header_value) found = True break if not found: response.headers.append(('Set-Cookie', delete_cookie_header_value)) else: logger.error("SessionMiddleware: Response object does not have a dict-like or list-like 'headers' attribute. Cannot add Set-Cookie header for deletion.")