Source code for lback.middlewares.sql_injection_detection_middleware

import logging
from typing import Optional
from http import HTTPStatus

from lback.core.base_middleware import BaseMiddleware
from lback.core.types import Request
from lback.core.response import Response
from lback.security.sql_injection import SQLInjectionProtection


logger = logging.getLogger(__name__)


[docs] class SQLInjectionDetectionMiddleware(BaseMiddleware): """ Middleware to detect potential SQL injection attempts in request body and query parameters. """
[docs] def __init__(self): """ Initializes the SQLInjectionDetectionMiddleware. Note: This middleware uses class methods from SQLInjectionProtection, so it doesn't require an instance of that class. """ logger.info("SQLInjectionDetectionMiddleware initialized.")
[docs] def process_request(self, request: Request) -> Optional[Response]: """ Checks request body and query parameters for suspicious SQL injection patterns. Returns a 400 Bad Request response if patterns are detected. """ request_method = getattr(request, 'method', 'N/A') request_path = getattr(request, 'path', 'N/A') logger.debug(f"SQLInjectionDetectionMiddleware: Checking request from {request_method} {request_path} for injection patterns.") if request.parsed_body: logger.debug("SQLInjectionDetectionMiddleware: Validating request body.") if not SQLInjectionProtection.validate_inputs(request.parsed_body): logger.warning(f"SQLInjectionDetectionMiddleware: Suspicious pattern detected in request body for {request_method} {request_path}.") return Response( body=b"Bad Request: Potential SQL Injection attempt detected.", status_code=HTTPStatus.BAD_REQUEST.value, headers={'Content-Type': 'text/plain'} ) logger.debug("SQLInjectionDetectionMiddleware: Request body validation passed.") if request.query_params: logger.debug("SQLInjectionDetectionMiddleware: Validating query parameters.") if not SQLInjectionProtection.validate_inputs(request.query_params): logger.warning(f"SQLInjectionDetectionMiddleware: Suspicious pattern detected in query parameters for {request_method} {request_path}.") return Response( body=b"Bad Request: Potential SQL Injection attempt detected in query parameters.", status_code=HTTPStatus.BAD_REQUEST.value, headers={'Content-Type': 'text/plain'} ) logger.debug("SQLInjectionDetectionMiddleware: Query parameters validation passed.") logger.debug("SQLInjectionDetectionMiddleware: Request passed injection detection checks. Continuing chain.") return None
[docs] def process_response(self, request: Request, response: Response) -> Response: logger.debug("SQLInjectionDetectionMiddleware: Processing response (no changes made).") return response