Code source de api.views

import base64
import hashlib
import hmac
import json
import logging
import secrets
import string
from datetime import datetime, timedelta
from datetime import timezone as datetime_timezone
from io import BytesIO

from django.conf import settings
from django.contrib.auth import authenticate, get_user_model
from django.contrib.auth.password_validation import (
    validate_password as validate_django_password,
)
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from django.db import transaction
from django.db.models import Exists, OuterRef, Prefetch, Q
from django.utils import timezone
from django.utils.dateparse import parse_datetime
from drf_spectacular.utils import OpenApiParameter, OpenApiTypes, extend_schema
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes, throttle_classes
from rest_framework.pagination import PageNumberPagination
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from rest_framework_simplejwt.serializers import TokenRefreshSerializer
from rest_framework_simplejwt.tokens import RefreshToken

from .models import (
    Authentification,
    Borne,
    Consentement,
    Device,
    DeviceStatus,
    Emplacement,
    JournalSecurite,
    Maintenance,
    ProfilUtilisateur,
    Reservation,
    Station,
    Technicien,
    TechnicienStation,
    UserStatus,
)
from .models import (
    Session as ChargingSession,
)
from .permissions import IsStaffUser
from .reservation_rules import (
    RESERVATION_DURATION_MINUTES,
    RESERVATION_STEP_MINUTES,
    reservation_duration_error_message,
    reservation_duration_is_allowed,
    reservation_duration_minutes,
)
from .serializers import (
    AdminUserSerializer,
    AdminUserUpdateSerializer,
    BorneSerializer,
    ConsentPreferenceSerializer,
    DeviceAdminSerializer,
    EmplacementSerializer,
    MaintenanceAdminSerializer,
    PasswordChangeSerializer,
    ProfileSerializer,
    RegisterSerializer,
    ReservationCreateSerializer,
    ReservationSerializer,
    StationSerializer,
    TechnicianMaintenanceSerializer,
    TechnicianSerializer,
    TechnicianStationAssignmentSerializer,
)
from .services.alert_workflow import (
    AlertWorkflowError,
    acknowledge_alert,
    alert_queryset,
    alert_station_id,
    resolve_alert,
)
from .services.auth_methods import (
    RfidCredentialConflict,
)
from .services.auth_methods import (
    ensure_rfid_auth as _ensure_rfid_auth,
)
from .services.auth_methods import (
    get_saved_rfid_auth as _get_saved_rfid_auth,
)
from .services.auth_methods import (
    get_type_auth as _get_type_auth,
)
from .services.auth_methods import (
    hash_secret as _hash_secret,
)
from .services.charge_session_state import (
    ChargeSessionStateError,
    mark_access_validated,
    mark_stop_requested,
    mark_stopped,
)
from .services.email_notifications import send_reservation_qr_email
from .services.firmware import schedule_ota_update, update_firmware_compliance_status
from .services.sms import SmsDeliveryError, mask_phone_number, send_sms
from .services.sms_access_policy import (
    configured_sms_access_code_alphabet,
    configured_sms_access_code_length,
    configured_sms_access_code_max_attempts,
    sms_access_code_policy_payload,
)
from .services.supervision import (
    build_admin_supervision_payload,
    serialize_alert_supervision,
)
from .throttles import (
    AuthFactorRateThrottle,
    LoginRateThrottle,
    MobileSessionRateThrottle,
    RegistrationRateThrottle,
    SmsRateThrottle,
    TerminalRateThrottle,
    TokenRefreshRateThrottle,
)

# Module-level logger and the user model configured for the project.
logger = logging.getLogger(__name__)
User = get_user_model()
# Values read with getattr(settings, ...) can be overridden in Django settings;
# the literal passed as third argument is the fallback when the setting is absent.
RESERVATION_START_GRACE_MINUTES = getattr(settings, "RESERVATION_START_GRACE_MINUTES", 5)
RESERVATION_END_GRACE_MINUTES = getattr(settings, "RESERVATION_END_GRACE_MINUTES", 10)
DEFAULT_SESSION_MAX_MINUTES = getattr(settings, "CHARGING_SESSION_MAX_MINUTES", 240)
# NOTE(review): presumably consumed by an admin temporary-password generator;
# the consumer is not visible in this part of the file.
ADMIN_TEMPORARY_PASSWORD_LENGTH = 16
ADMIN_TEMPORARY_PASSWORD_SPECIALS = "!@#$%&*-_?"
# _list_response() paginates only when at least one of these query parameters
# is present on the request.
LIST_PAGINATION_QUERY_PARAMS = {"page", "page_size"}
# Cache entry removed by _clear_station_catalog_cache().
STATIONS_CATALOG_CACHE_KEY = "api:stations_catalog:v1"
# The configured_sms_access_code_*() helpers are called here, so their results
# are resolved once, when this module is imported.
KEYPAD_CODE_ALPHABET = configured_sms_access_code_alphabet()
RESERVATION_CODE_LENGTH = 8
TEMPORARY_CODE_LENGTH = configured_sms_access_code_length()
SMS_ACCESS_CODE_MAX_ATTEMPTS = configured_sms_access_code_max_attempts()
# Fixed detail messages (French, ASCII-only) for refused state transitions.
ALERT_WORKFLOW_ERROR_DETAIL = "Transition d'alerte refusee."
CHARGE_SESSION_STATE_ERROR_DETAIL = "Transition de session de charge refusee."
RFID_LINK_CODE_LENGTH = 4
RFID_LINK_CODE_TTL_SECONDS = 10 * 60  # 10 minutes
MOBILE_SESSION_TTL_SECONDS = getattr(settings, "MOBILE_SESSION_TTL_SECONDS", 10 * 60)  # default: 10 minutes
MOBILE_SESSION_TOKEN_BYTES = 18
OFFLINE_CACHE_HORIZON_HOURS = getattr(settings, "TERMINAL_OFFLINE_CACHE_HOURS", 24)
PRIVILEGED_USER_FIELDS = {"is_staff", "is_superuser"}
# Known consent types, keyed by the value stored in a consent's `type` field.
# _serialize_consent() reads "label" from here and falls back to the raw type
# for unknown keys. How "required" and "modifiable" are enforced is not visible
# in this part of the file.
CONSENT_DEFINITIONS = {
    "cgu": {
        "label": "Conditions générales d'utilisation",
        "required": True,
        "modifiable": False,
    },
    "politique_confidentialite": {
        "label": "Politique de confidentialité",
        "required": True,
        "modifiable": False,
    },
    "notifications": {
        "label": "Notifications optionnelles",
        "required": False,
        "modifiable": True,
    },
}

def _refresh_cookie_kwargs():
    """Build the cookie attributes applied to the refresh-token cookie.

    Returns:
        dict: ``httponly`` (always ``True``) plus ``secure``, ``samesite`` and
        ``path`` taken from the ``JWT_REFRESH_COOKIE_*`` Django settings.
    """
    cookie_options = {"httponly": True}
    cookie_options.update(
        secure=settings.JWT_REFRESH_COOKIE_SECURE,
        samesite=settings.JWT_REFRESH_COOKIE_SAMESITE,
        path=settings.JWT_REFRESH_COOKIE_PATH,
    )
    return cookie_options


def _set_refresh_cookie(response, refresh_token):
    """Attach the refresh token to ``response`` as an HttpOnly cookie.

    The cookie lifetime is the ``REFRESH_TOKEN_LIFETIME`` entry of the
    ``SIMPLE_JWT`` setting, converted to whole seconds. The remaining cookie
    attributes come from ``_refresh_cookie_kwargs()``.

    Args:
        response: Response object exposing ``set_cookie``.
        refresh_token: Token to store; it is converted with ``str()``.

    Returns:
        The same ``response`` object, to allow call chaining.
    """
    refresh_lifetime = settings.SIMPLE_JWT["REFRESH_TOKEN_LIFETIME"]
    lifetime_seconds = int(refresh_lifetime.total_seconds())
    response.set_cookie(
        settings.JWT_REFRESH_COOKIE_NAME,
        str(refresh_token),
        max_age=lifetime_seconds,
        **_refresh_cookie_kwargs(),
    )
    return response


def _clear_refresh_cookie(response):
    """Delete the refresh-token cookie from ``response``.

    The deletion uses the same path and SameSite values as the
    ``JWT_REFRESH_COOKIE_*`` settings.

    Args:
        response: Response object exposing ``delete_cookie``.

    Returns:
        The same ``response`` object, to allow call chaining.
    """
    deletion_scope = {
        "path": settings.JWT_REFRESH_COOKIE_PATH,
        "samesite": settings.JWT_REFRESH_COOKIE_SAMESITE,
    }
    response.delete_cookie(settings.JWT_REFRESH_COOKIE_NAME, **deletion_scope)
    return response


class OptionalPageNumberPagination(PageNumberPagination):
    """Page-number paginator for function-based list endpoints.

    The default and maximum page sizes come from the ``API_DEFAULT_PAGE_SIZE``
    and ``API_MAX_PAGE_SIZE`` Django settings; clients choose a size with the
    ``page_size`` query parameter.
    """

    # Query parameter a client uses to request a specific page size.
    page_size_query_param = "page_size"
    # Page size applied when the client does not send ``page_size``.
    page_size = settings.API_DEFAULT_PAGE_SIZE
    # Upper bound for a client-requested page size.
    max_page_size = settings.API_MAX_PAGE_SIZE
def _serialize_collection(items, serializer_class=None, serializer_func=None): """Serialize a queryset or list with either a DRF serializer or function.""" if serializer_class is not None: return serializer_class(items, many=True).data return [serializer_func(item) for item in items] def _list_response(request, queryset, serializer_class=None, serializer_func=None): """Return a list response, with opt-in page/page_size pagination.""" if not LIST_PAGINATION_QUERY_PARAMS.intersection(request.query_params): return Response( { "results": _serialize_collection( queryset, serializer_class=serializer_class, serializer_func=serializer_func, ) } ) paginator = OptionalPageNumberPagination() page = paginator.paginate_queryset(queryset, request) return paginator.get_paginated_response( _serialize_collection( page, serializer_class=serializer_class, serializer_func=serializer_func, ) ) def _clear_station_catalog_cache(): """Invalidate cached station catalog data after station or bay changes.""" cache.delete(STATIONS_CATALOG_CACHE_KEY) def _client_ip(request): """Return the best effort client IP for audit entries.""" forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR", "") if forwarded_for: return forwarded_for.split(",", 1)[0].strip() return request.META.get("REMOTE_ADDR") or "0.0.0.0" def _log_security_event(request, target_user, action, statut="success"): """Persist a compact security audit event.""" JournalSecurite.objects.create( utilisateur=target_user, action=action, ip=_client_ip(request), statut=statut, ) def _terminal_secret_response(request): """Return an error response when a protected terminal call lacks the shared secret.""" expected_secret = getattr(settings, "TERMINAL_API_SECRET", "").strip() if not expected_secret: if settings.DEBUG: return None logger.critical("TERMINAL_API_SECRET is missing; terminal API is unavailable.") return Response( {"detail": "Configuration terminal indisponible."}, status=status.HTTP_503_SERVICE_UNAVAILABLE, ) provided_secret = 
(request.headers.get("X-Terminal-Secret") or "").strip() authorization = (request.headers.get("Authorization") or "").strip() if not provided_secret and authorization.lower().startswith("bearer "): provided_secret = authorization.split(" ", 1)[1].strip() if hmac.compare_digest(provided_secret, expected_secret): return None return Response({"detail": "Terminal non autorise."}, status=status.HTTP_401_UNAUTHORIZED) def _schema( summary, request=None, responses=OpenApiTypes.OBJECT, parameters=None, tags=None, operation_id=None, methods=None, ): """Build a reusable drf-spectacular schema decorator for API views.""" return extend_schema( summary=summary, request=request, responses=responses, parameters=parameters, tags=tags or ["API"], operation_id=operation_id, methods=methods, ) def _serialize_user(user): """Return the account payload shared by authentication and profile views.""" profile = getattr(user, "profil", None) return { "id": str(user.id_user), "pseudo": user.pseudo, "email": user.email, "telephone": getattr(profile, "telephone", ""), "telephone_verifie": getattr(profile, "telephone_verifie", False), "statut": user.statut, "nom": getattr(profile, "nom", ""), "is_staff": user.is_staff, "is_superuser": user.is_superuser, "is_technician": _user_is_technician(user), } def _get_or_create_profile(user): """Return the profile for a user, creating the default record when absent.""" profile, _created = ProfilUtilisateur.objects.get_or_create( utilisateur=user, defaults={"nom": user.pseudo, "telephone": ""}, ) return profile def _serialize_consent(consent): """Return a compact consent history entry.""" return { "id": str(consent.id_consentement), "type": consent.type, "label": CONSENT_DEFINITIONS.get(consent.type, {}).get("label", consent.type), "statut": consent.statut, "date_consentement": consent.date_consentement.isoformat() if consent.date_consentement else None, } def _serialize_user_consents(user): """Return the latest consent state per known type plus full 
history.""" history = list(user.consentements.all().order_by("-date_consentement")) latest_by_type = {} for consent in history: latest_by_type.setdefault(consent.type, consent) current = [] for consent_type, definition in CONSENT_DEFINITIONS.items(): latest = latest_by_type.get(consent_type) current.append( { "type": consent_type, "label": definition["label"], "statut": latest.statut if latest else "non_renseigne", "date_consentement": ( latest.date_consentement.isoformat() if latest and latest.date_consentement else None ), "required": definition["required"], "modifiable": definition["modifiable"], } ) return { "current": current, "history": [_serialize_consent(consent) for consent in history], } def _as_bool(value): """Coerce form-like boolean values from API payloads.""" if isinstance(value, bool): return value if value is None: return False return str(value).strip().lower() in {"1", "true", "yes", "on"} def _payload_requests_privileged_role(payload): """Return whether an admin payload modifies staff or superuser privileges.""" return any(field in payload for field in PRIVILEGED_USER_FIELDS) def _forbid_privileged_role_change_response(request, payload): """Block non-superusers from changing administrator-level role flags.""" if request.user.is_superuser or not _payload_requests_privileged_role(payload): return None return Response( {"detail": "Seul un super-administrateur peut modifier les roles administrateur."}, status=status.HTTP_403_FORBIDDEN, ) def _shuffle_secret(characters): """Shuffle password characters with the system random source.""" shuffled = list(characters) for index in range(len(shuffled) - 1, 0, -1): swap_index = secrets.randbelow(index + 1) shuffled[index], shuffled[swap_index] = shuffled[swap_index], shuffled[index] return "".join(shuffled) def _generate_temporary_password(user): """Generate a password that satisfies Django and project complexity rules.""" alphabet = string.ascii_letters + string.digits + ADMIN_TEMPORARY_PASSWORD_SPECIALS for 
_attempt in range(100): required_characters = [ secrets.choice(string.ascii_lowercase), secrets.choice(string.ascii_uppercase), secrets.choice(string.digits), secrets.choice(ADMIN_TEMPORARY_PASSWORD_SPECIALS), ] remaining_length = ADMIN_TEMPORARY_PASSWORD_LENGTH - len(required_characters) password = _shuffle_secret( required_characters + [secrets.choice(alphabet) for _index in range(remaining_length)] ) try: validate_django_password(password, user=user) except ValidationError: continue return password raise RuntimeError("Impossible de generer un mot de passe temporaire conforme.") def _serialize_reservation(reservation): """Return a frontend-friendly reservation representation.""" return { "id": str(reservation.id_reservation), "id_reservation": str(reservation.id_reservation), "reference_code": reservation.reference_code, "utilisateur": str(reservation.utilisateur_id) if reservation.utilisateur_id else None, "utilisateur_pseudo": reservation.utilisateur.pseudo if reservation.utilisateur else "-", "station": reservation.emplacement.station.nom, "station_nom": reservation.emplacement.station.nom, "borne": reservation.emplacement.borne.nom, "borne_nom": reservation.emplacement.borne.nom, "borne_numero": reservation.emplacement.borne.numero, "mqtt_prefix": reservation.emplacement.borne.mqtt_prefix, "emplacement": reservation.emplacement.numero, "emplacement_numero": reservation.emplacement.numero, "date_debut": reservation.date_debut.isoformat(), "date_fin": reservation.date_fin.isoformat(), "duration_minutes": reservation_duration_minutes(reservation.date_debut, reservation.date_fin), "statut": reservation.statut, "auth_mode": reservation.auth_choice, "can_cancel": _reservation_can_be_cancelled(reservation), } def _device_slot_id(device, emplacement): """Return the logical MQTT slot identifier for a charging bay.""" if device and (device.slot_id or device.mac_address): return device.slot_id or device.mac_address return f"slot{emplacement.numero}" def 
_serialize_session(session): """Return a compact representation of a charging session.""" device = getattr(session.emplacement, "device", None) return { "id": str(session.id_session), "station": session.emplacement.station.nom, "borne": session.emplacement.borne.nom, "borne_numero": session.emplacement.borne.numero, "mqtt_prefix": session.emplacement.borne.mqtt_prefix, "emplacement": session.emplacement.numero, "slot_id": _device_slot_id(device, session.emplacement), "start_time": session.start_time.isoformat(), "end_time": session.end_time.isoformat() if session.end_time else None, "charge_state": session.charge_state, "charge_state_display": session.get_charge_state_display(), "status": "active" if session.end_time is None else "closed", } def _close_charging_session(session, now=None): """Close an active charging session and mark its reservation as finished.""" closed_at = now or timezone.now() updated = False if session.end_time is None: session.end_time = max(closed_at, session.start_time + timedelta(seconds=1)) mark_stopped(session, now=closed_at, save=False) session.save(update_fields=["end_time", "charge_state", "charge_state_updated_at"]) updated = True else: mark_stopped(session, now=closed_at) reservation = session.reservation if reservation.statut != "terminee": reservation.statut = "terminee" reservation.save(update_fields=["statut"]) device = getattr(session.emplacement, "device", None) if device and device.status == DeviceStatus.CHARGING: device.status = DeviceStatus.ONLINE device.last_seen = closed_at device.save(update_fields=["status", "last_seen"]) return updated def _reservation_can_be_cancelled(reservation): """Return whether a reservation is still cancellable.""" return reservation.statut not in {"annulee", "terminee"} and reservation.date_debut > timezone.now() def _cancel_reservation(reservation): """Cancel a reservation when business rules still allow it.""" if not _reservation_can_be_cancelled(reservation): return False reservation.statut 
= "annulee" reservation.save(update_fields=["statut"]) return True def _temporary_access_can_be_rotated(reservation): """Return whether a reservation can receive a new temporary credential.""" return ( reservation.auth_choice in {"qr", "code"} and reservation.statut not in {"annulee", "terminee"} and reservation.date_fin >= timezone.now() ) def _serialize_station(station): """Return public station fields used by catalog and admin screens.""" return { "id_station": str(station.id_station), "nom": station.nom, "localisation_text": station.localisation_text, "latitude": station.latitude, "longitude": station.longitude, "statut": station.statut, } def _serialize_borne(borne): """Return physical cabinet fields with its parent station label.""" return { "id_borne": str(borne.id_borne), "station": str(borne.station_id), "station_nom": borne.station.nom, "numero": borne.numero, "nom": borne.nom, "mqtt_prefix": borne.mqtt_prefix, "statut": borne.statut, "emplacements_count": borne.emplacements.count(), } def _serialize_emplacement(emplacement): """Return charging bay details with the parent station label.""" return { "id_emplacement": str(emplacement.id_emplacement), "station": str(emplacement.station_id), "station_nom": emplacement.station.nom, "borne": str(emplacement.borne_id), "borne_nom": emplacement.borne.nom, "borne_numero": emplacement.borne.numero, "numero": emplacement.numero, "statut": emplacement.statut, "lock_state": emplacement.lock_state, } def _serialize_auth_method(method): """Return a safe authentication method payload without secret material.""" display_value = method.display_value if method.type_auth and method.type_auth.libelle == "RFID": display_value = _mask_secret(method.display_value) return { "id_auth": str(method.id_auth), "type_auth": str(method.type_auth_id), "type_label": method.type_auth.libelle, "display_value": display_value, "expiration": method.expiration.isoformat() if method.expiration else None, "is_active": method.is_active, 
"created_at": method.created_at.isoformat() if method.created_at else None, } def _serialize_admin_auth_method(method): """Return an admin-safe authentication method without exposing the raw badge value.""" payload = _serialize_auth_method(method) payload["display_value"] = _mask_secret(method.display_value) return payload def _serialize_maintenance(maintenance): """Return maintenance data used by technician and admin dashboards.""" return { "id_maintenance": str(maintenance.id_maintenance), "station": str(maintenance.station_id), "station_nom": maintenance.station.nom, "technicien": str(maintenance.technicien_id), "type": maintenance.type, "description": maintenance.description, "status": maintenance.status, "date_planifiee": maintenance.date_planifiee.isoformat(), "date_realisee": maintenance.date_realisee.isoformat() if maintenance.date_realisee else None, } def _serialize_technician(technician): """Return technician profile data with optional linked account metadata.""" return { "id_technicien": str(technician.id_technicien), "utilisateur": str(technician.utilisateur_id) if technician.utilisateur_id else None, "utilisateur_pseudo": technician.utilisateur.pseudo if technician.utilisateur else None, "nom": technician.nom, "email": technician.email, "telephone": technician.telephone, } def _active_station_assignments_prefetch(): """Return the prefetch used to expose active technician station scopes.""" return Prefetch( "station_assignments", queryset=TechnicienStation.objects.filter(is_active=True) .select_related("station") .order_by("station__nom"), to_attr="prefetched_active_station_assignments", ) def _user_is_technician(user): """Return whether the user has an associated technician profile.""" if user is None or not getattr(user, "is_authenticated", False): return False if hasattr(user, "_technician_profile_cache"): return user._technician_profile_cache exists = Technicien.objects.filter(utilisateur_id=user.id_user).exists() user._technician_profile_cache = 
exists return exists def _technician_for_user(user): """Return the technician profile linked to an authenticated account.""" try: return user.technicien_profile except Technicien.DoesNotExist: return None def _station_ids_for_technician(technicien): """Return active station identifiers assigned to a technician.""" return list( TechnicienStation.objects.filter(technicien=technicien, is_active=True) .order_by("station__nom") .values_list("station_id", flat=True) ) def _technician_can_manage_alert(user, alert): """Return the technician profile allowed to operate on an alert.""" if user.is_staff: return _technician_for_user(user), None technicien = _technician_for_user(user) if technicien is None: return None, Response( {"detail": "Aucun profil technicien associe a ce compte."}, status=status.HTTP_404_NOT_FOUND, ) station_id = alert_station_id(alert) if station_id is None: return None, Response( {"detail": "Impossible de rattacher cette alerte a une station."}, status=status.HTTP_400_BAD_REQUEST, ) if station_id not in set(_station_ids_for_technician(technicien)): return None, Response( {"detail": "Cette alerte ne fait pas partie de vos stations."}, status=status.HTTP_404_NOT_FOUND, ) return technicien, None def _generate_reservation_code(): """Generate a short unique reservation reference code.""" for _ in range(20): code = "".join(secrets.choice(KEYPAD_CODE_ALPHABET) for _ in range(RESERVATION_CODE_LENGTH)) if not Reservation.objects.filter(reference_code=code).exists(): return code raise RuntimeError("Impossible de generer un code de reservation unique.") def _rfid_link_cache_key(code): """Return the cache key for a temporary RFID association code.""" return f"rfid-link:{code}" def _mobile_session_cache_key(token): """Return the cache key for a short kiosk/mobile pairing session.""" return f"mobile-session:{token}" def _generate_rfid_link_code(): """Generate a short one-use code that can be typed on the kiosk.""" for _ in range(20): code = 
"".join(secrets.choice(KEYPAD_CODE_ALPHABET) for _ in range(RFID_LINK_CODE_LENGTH)) if cache.get(_rfid_link_cache_key(code)) is None: return code raise RuntimeError("Impossible de generer un code RFID temporaire unique.") def _generate_mobile_session_token(): """Generate a URL-safe token for a kiosk/mobile pairing session.""" for _attempt in range(20): token = secrets.token_urlsafe(MOBILE_SESSION_TOKEN_BYTES) if cache.get(_mobile_session_cache_key(token)) is None: return token raise RuntimeError("Impossible de generer une session mobile unique.") def _mobile_session_url(token): """Return the PWA URL opened by a phone for one kiosk pairing session.""" base_url = settings.FRONTEND_BASE_URL.rstrip("/") return f"{base_url}/mobile-session.html?token={token}" def _store_mobile_session(payload): """Persist a mobile pairing payload until its absolute expiry.""" expires_at = datetime.fromisoformat(payload["expires_at"]) if timezone.is_naive(expires_at): expires_at = timezone.make_aware(expires_at, timezone.get_current_timezone()) ttl = max(1, int((expires_at - timezone.now()).total_seconds())) cache.set(_mobile_session_cache_key(payload["token"]), payload, timeout=ttl) def _get_mobile_session_payload(token): """Return a cached mobile pairing session or None when it expired.""" token = (token or "").strip() if not token: return None return cache.get(_mobile_session_cache_key(token)) def _serialize_mobile_session(payload): """Return public metadata about a kiosk/mobile pairing session.""" expires_at = datetime.fromisoformat(payload["expires_at"]) if timezone.is_naive(expires_at): expires_at = timezone.make_aware(expires_at, timezone.get_current_timezone()) return { "token": payload["token"], "status": payload.get("status", "pending"), "station_id": payload.get("station_id"), "station_nom": payload.get("station_nom"), "borne_id": payload.get("borne_id"), "borne_nom": payload.get("borne_nom"), "borne_numero": payload.get("borne_numero"), "created_at": payload.get("created_at"), 
"expires_at": payload.get("expires_at"), "expires_in": max(0, int((expires_at - timezone.now()).total_seconds())), } def _generate_access_nonce(): """Generate a public nonce used to derive a temporary access secret.""" return secrets.token_urlsafe(18) def _digest_to_keypad_code(digest, length=TEMPORARY_CODE_LENGTH): """Convert digest bytes to a code that can be typed on the matrix keypad.""" code = [] source = digest round_index = 0 max_accepted = (256 // len(KEYPAD_CODE_ALPHABET)) * len(KEYPAD_CODE_ALPHABET) while len(code) < length: for byte in source: if byte >= max_accepted: continue code.append(KEYPAD_CODE_ALPHABET[byte % len(KEYPAD_CODE_ALPHABET)]) if len(code) == length: break if len(code) < length: round_index += 1 source = hashlib.sha256(digest + round_index.to_bytes(2, "big")).digest() return "".join(code) def _derive_temporary_access_secret(auth_method, auth_mode, reservation_code): """Derive a stable temporary secret without storing the clear value.""" metadata = auth_method.metadata or {} nonce = metadata.get("access_nonce") if not nonce: return None material = "|".join( [ str(auth_method.id_auth), str(reservation_code or ""), str(auth_mode or "").lower(), str(nonce), ] ).encode("utf-8") digest = hmac.new(settings.SECRET_KEY.encode("utf-8"), material, hashlib.sha256).digest() if str(auth_mode).lower() == "code": return _digest_to_keypad_code(digest) token = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")[:24] return f"{str(auth_mode).upper()}-{token}" def _mask_secret(secret): """Return a display-safe hint for a secret value.""" if not secret: return "" return f"****{secret[-4:]}" def _sms_access_is_locked(auth_method): """Return whether the SMS code is locked after too many failed attempts.""" if not auth_method: return False metadata = auth_method.metadata or {} attempts = int(metadata.get("sms_failed_attempts") or 0) return bool(metadata.get("sms_locked_at")) or attempts >= SMS_ACCESS_CODE_MAX_ATTEMPTS def 
_record_sms_access_failure(auth_method): """Increment failed SMS-code attempts and lock the credential if needed.""" metadata = dict(auth_method.metadata or {}) failed_attempts = int(metadata.get("sms_failed_attempts") or 0) + 1 metadata["sms_failed_attempts"] = failed_attempts metadata["sms_last_failed_at"] = timezone.now().isoformat() metadata["sms_max_attempts"] = SMS_ACCESS_CODE_MAX_ATTEMPTS update_fields = ["metadata"] if failed_attempts >= SMS_ACCESS_CODE_MAX_ATTEMPTS: metadata["sms_locked_at"] = timezone.now().isoformat() metadata["sms_locked_reason"] = "too_many_attempts" auth_method.is_active = False update_fields.append("is_active") auth_method.metadata = metadata auth_method.save(update_fields=update_fields) return failed_attempts >= SMS_ACCESS_CODE_MAX_ATTEMPTS def _reset_sms_access_failures(auth_method): """Clear failed-attempt metadata after a valid SMS code entry.""" if not auth_method: return metadata = dict(auth_method.metadata or {}) changed = False for key in ( "sms_failed_attempts", "sms_last_failed_at", "sms_locked_at", "sms_locked_reason", ): if key in metadata: metadata.pop(key, None) changed = True if changed: auth_method.metadata = metadata auth_method.save(update_fields=["metadata"]) def _user_phone_number(user): """Return the phone number stored on a user's profile.""" profile = getattr(user, "profil", None) return (getattr(profile, "telephone", "") or "").strip() def _user_phone_is_verified(user): """Return whether the user's phone number is marked as verified.""" profile = getattr(user, "profil", None) return bool(getattr(profile, "telephone_verifie", False)) def _build_sms_access_message(reservation, access_value): """Build the transactional SMS sent for a temporary code reservation.""" sender = getattr(settings, "SMS_SENDER_NAME", "Station") return ( f"{sender}: votre code de recharge est {access_value}. " f"Code reservation: {reservation.reference_code}. 
" f"Valable jusqu'au {timezone.localtime(reservation.date_fin).strftime('%d/%m/%Y %H:%M')}." ) def _sms_resend_wait_seconds(auth_method): """Return remaining cooldown seconds before another SMS can be sent.""" cooldown = getattr(settings, "SMS_RESEND_COOLDOWN_SECONDS", 60) if cooldown <= 0 or auth_method is None: return 0 last_sent = parse_datetime((auth_method.metadata or {}).get("sms_last_sent_at", "")) if last_sent is None: return 0 if timezone.is_naive(last_sent): last_sent = timezone.make_aware(last_sent, timezone=datetime_timezone.utc) elapsed = (timezone.now() - last_sent).total_seconds() return max(0, int(cooldown - elapsed)) def _build_qr_payload(reservation_code, token): """Encode reservation access data into the QR payload format.""" return json.dumps( { "v": 1, "type": "reservation_access", "reservation": reservation_code, "token": token, }, separators=(",", ":"), ) def _parse_qr_payload(raw_value): """Parse supported QR payload formats into reservation code and token.""" value = (raw_value or "").strip() if not value: return None try: payload = json.loads(value) except json.JSONDecodeError: payload = None if isinstance(payload, dict) and payload.get("type") == "reservation_access": reservation_code = str(payload.get("reservation") or "").strip().upper() token = str(payload.get("token") or "").strip() if reservation_code and token: return {"reservation_code": reservation_code, "token": token} if value.startswith("MVCHARGE|"): parts = value.split("|", 3) if len(parts) == 4 and parts[1] == "v1": return {"reservation_code": parts[2].strip().upper(), "token": parts[3].strip()} return None def _build_qr_svg(payload): """Render a QR code SVG for a reservation access payload.""" try: import qrcode from qrcode.image.svg import SvgPathImage except ImportError: return None qr = qrcode.QRCode( error_correction=qrcode.constants.ERROR_CORRECT_H, box_size=10, border=4, ) qr.add_data(payload) qr.make(fit=True) image = qr.make_image(image_factory=SvgPathImage) buffer = 
BytesIO() image.save(buffer) svg = buffer.getvalue().decode("utf-8") if svg.startswith("<?xml"): svg = svg.split("?>", 1)[1].lstrip() pictogram = """ <svg x="41%" y="41%" width="18%" height="18%" viewBox="0 0 64 64" role="img" aria-label="Acces securise Metropole Verte"> <rect x="2" y="2" width="60" height="60" rx="12" fill="#ffffff"/> <path d="M32 9c-9 9-15 18-15 27 0 10 7 17 15 17s15-7 15-17C47 27 41 18 32 9Z" fill="#1f8f55"/> <path d="M22 36c8-1 15-6 21-17-1 16-8 26-21 30" fill="none" stroke="#ffffff" stroke-width="5" stroke-linecap="round"/> </svg> """ return svg.replace("</svg>", f"{pictogram}</svg>") def _temporary_auth_metadata(auth_mode, expiration, reservation_code, display_value, access_nonce=None): """Build metadata stored alongside a generated temporary credential.""" metadata = { "version": 1, "secret_storage": "derived_sha256", "access_nonce": access_nonce or _generate_access_nonce(), "token_hint": display_value, "expires_at": expiration.isoformat(), } if auth_mode == "code": metadata.update( { "delivery_channel": "sms", "code_policy": sms_access_code_policy_payload(), } ) if auth_mode == "qr": metadata.update( { "payload_type": "reservation_access", "payload_encoding": "json", "reservation_code": reservation_code, } ) return metadata def _build_access_payload(auth_mode, access_value, auth_method, expiration): """Return the one-time access payload shown after reservation creation.""" payload = { "mode": auth_mode, "display_value": auth_method.display_value if auth_method else "", "expires_at": expiration.isoformat(), } if auth_mode == "qr": payload["value"] = access_value payload["qr_svg"] = _build_qr_svg(access_value) payload["format"] = "reservation_access_v1" if auth_mode == "code": metadata = auth_method.metadata if auth_method else {} payload["delivery_channel"] = "sms" payload["recipient_hint"] = metadata.get("sms_recipient_hint", "") payload["sent_at"] = metadata.get("sms_last_sent_at", "") payload["code_policy"] = metadata.get("code_policy") 
or sms_access_code_policy_payload() return payload def _send_reservation_sms_code(request, reservation, auth_method, access_value): """Send the temporary reservation code by SMS and update audit metadata.""" phone_number = _user_phone_number(reservation.utilisateur) if not phone_number: return None, Response( {"telephone": "Un numéro de téléphone est obligatoire pour recevoir un code SMS."}, status=status.HTTP_400_BAD_REQUEST, ) if getattr(settings, "SMS_REQUIRE_VERIFIED_PHONE", False) and not _user_phone_is_verified(reservation.utilisateur): return None, Response( {"telephone": "Le numéro de téléphone doit être vérifié avant l'envoi d'un code SMS."}, status=status.HTTP_400_BAD_REQUEST, ) wait_seconds = _sms_resend_wait_seconds(auth_method) if wait_seconds > 0: return None, Response( { "detail": "Un code SMS a déjà été envoyé récemment.", "retry_after_seconds": wait_seconds, }, status=status.HTTP_429_TOO_MANY_REQUESTS, ) try: delivery = send_sms( phone_number, _build_sms_access_message(reservation, access_value), ) except SmsDeliveryError as error: logger.warning("SMS delivery failed for reservation %s: %s", reservation.id_reservation, error) return None, Response({"detail": "Envoi SMS temporairement indisponible."}, status=status.HTTP_502_BAD_GATEWAY) sent_at = timezone.now() recipient_hint = mask_phone_number(phone_number) auth_method.metadata = { **(auth_method.metadata or {}), "delivery_channel": "sms", "sms_last_sent_at": sent_at.isoformat(), "sms_recipient_hint": recipient_hint, "sms_backend": delivery.backend, "sms_message_id": delivery.message_id, } auth_method.save(update_fields=["metadata"]) _log_security_event( request, reservation.utilisateur, f"user:{reservation.utilisateur.pseudo}:sms_access_code_sent:{reservation.reference_code}", ) return ( { "sent_at": sent_at, "recipient_hint": recipient_hint, "backend": delivery.backend, }, None, ) def _schedule_reservation_qr_email(request, reservation, auth_method, access_payload): """Send the reservation QR 
email after the reservation transaction commits.""" if not getattr(settings, "RESERVATION_QR_EMAIL_ENABLED", True): return def _send_after_commit(): try: sent_count = send_reservation_qr_email(reservation, access_payload) except Exception as error: # pragma: no cover - depends on SMTP availability logger.warning("QR email delivery failed for reservation %s: %s", reservation.id_reservation, error) return if not sent_count: return sent_at = timezone.now() auth_method.metadata = { **(auth_method.metadata or {}), "delivery_channel": "email", "qr_email_last_sent_at": sent_at.isoformat(), "qr_email_recipient": reservation.utilisateur.email, } auth_method.save(update_fields=["metadata"]) _log_security_event( request, reservation.utilisateur, f"user:{reservation.utilisateur.pseudo}:qr_access_email_sent:{reservation.reference_code}", ) transaction.on_commit(_send_after_commit) def _create_temporary_auth(user, auth_mode, expiration, reservation_code): """Create a temporary QR or code credential for a reservation.""" type_auth = _get_type_auth(auth_mode) auth_method = Authentification.objects.create( utilisateur=user, type_auth=type_auth, hash_valeur="pending", metadata=_temporary_auth_metadata(auth_mode, expiration, reservation_code, ""), expiration=expiration, ) clear_value = _derive_temporary_access_secret(auth_method, auth_mode, reservation_code) display_value = _mask_secret(clear_value) auth_method.hash_valeur = _hash_secret(clear_value) auth_method.display_value = display_value auth_method.metadata = _temporary_auth_metadata( auth_mode, expiration, reservation_code, display_value, access_nonce=auth_method.metadata.get("access_nonce"), ) auth_method.save(update_fields=["hash_valeur", "display_value", "metadata"]) if auth_mode == "qr": return auth_method, _build_qr_payload(reservation_code, clear_value) return auth_method, clear_value def _refresh_temporary_auth(reservation): """Rotate the temporary credential attached to a reservation.""" auth_mode = 
reservation.auth_choice auth_method = reservation.authentification if auth_method is None: auth_method = Authentification( utilisateur=reservation.utilisateur, type_auth=_get_type_auth(auth_mode), ) access_nonce = _generate_access_nonce() auth_method.metadata = _temporary_auth_metadata( auth_mode, reservation.date_fin, reservation.reference_code, "", access_nonce=access_nonce, ) clear_value = _derive_temporary_access_secret(auth_method, auth_mode, reservation.reference_code) display_value = _mask_secret(clear_value) auth_method.type_auth = _get_type_auth(auth_mode) auth_method.hash_valeur = _hash_secret(clear_value) auth_method.display_value = display_value auth_method.metadata = _temporary_auth_metadata( auth_mode, reservation.date_fin, reservation.reference_code, display_value, access_nonce=access_nonce, ) auth_method.expiration = reservation.date_fin auth_method.is_active = True auth_method.save() if reservation.authentification_id != auth_method.id_auth: reservation.authentification = auth_method reservation.save(update_fields=["authentification"]) if auth_mode == "qr": return auth_method, _build_qr_payload(reservation.reference_code, clear_value) return auth_method, clear_value def _get_temporary_access_value(reservation): """Return the current temporary access value, deriving it when possible.""" auth_method = reservation.authentification if auth_method is None or not auth_method.is_active: return None, None clear_value = _derive_temporary_access_secret( auth_method, reservation.auth_choice, reservation.reference_code, ) if not clear_value: return None, None expected_hash = _hash_secret(clear_value) if auth_method.hash_valeur != expected_hash: auth_method.hash_valeur = expected_hash auth_method.display_value = _mask_secret(clear_value) auth_method.metadata = { **(auth_method.metadata or {}), "token_hint": auth_method.display_value, } auth_method.save(update_fields=["hash_valeur", "display_value", "metadata"]) if reservation.auth_choice == "qr": return 
auth_method, _build_qr_payload(reservation.reference_code, clear_value) return auth_method, clear_value def _get_reservation_session(reservation): """Return the charging session linked to a reservation when it exists.""" try: return reservation.session except ChargingSession.DoesNotExist: return None def _start_grace_delta(): """Return the allowed early-arrival margin for a reservation.""" return timedelta(minutes=RESERVATION_START_GRACE_MINUTES) def _end_grace_delta(): """Return the allowed late-arrival margin for a reservation.""" return timedelta(minutes=RESERVATION_END_GRACE_MINUTES) def _terminal_reservation_queryset(): """Return reservations with the related data needed by terminal flows.""" return Reservation.objects.select_related( "utilisateur", "emplacement", "emplacement__station", "emplacement__borne", "authentification", "authentification__type_auth", "session", ) def _active_rfid_reservations_for_auth(auth_method, now): """Return active or startable reservations that match a saved RFID badge.""" start_grace = _start_grace_delta() end_grace = _end_grace_delta() return list( _terminal_reservation_queryset() .filter( Q(date_fin__gte=now - end_grace) | Q(session__end_time__isnull=True, session__isnull=False), utilisateur=auth_method.utilisateur, authentification=auth_method, auth_choice="rfid", statut__in=["confirmee", "active"], date_debut__lte=now + start_grace, ) .order_by("date_debut", "id_reservation")[:2] ) def _startable_user_reservations(user, now): """Return reservations that can be started from a kiosk soon or now.""" start_grace = _start_grace_delta() end_grace = _end_grace_delta() return list( _terminal_reservation_queryset() .filter( Q(date_fin__gte=now - end_grace) | Q(session__end_time__isnull=True, session__isnull=False), utilisateur=user, statut__in=["confirmee", "active"], date_debut__lte=now + start_grace, ) .order_by("date_debut", "id_reservation") ) def _serialize_offline_cached_reservation(reservation): """Return the minimal 
secret-free payload cached by a kiosk for offline checks.""" auth_method = reservation.authentification device = getattr(reservation.emplacement, "device", None) return { "id_reservation": str(reservation.id_reservation), "reference_code": reservation.reference_code, "station_id": str(reservation.emplacement.station_id), "station_nom": reservation.emplacement.station.nom, "borne_id": str(reservation.emplacement.borne_id), "borne_nom": reservation.emplacement.borne.nom, "borne_numero": reservation.emplacement.borne.numero, "mqtt_prefix": reservation.emplacement.borne.mqtt_prefix, "emplacement_id": str(reservation.emplacement_id), "emplacement_numero": reservation.emplacement.numero, "slot_id": _device_slot_id(device, reservation.emplacement), "auth_mode": reservation.auth_choice, "auth_hash": auth_method.hash_valeur if auth_method else "", "auth_expires_at": auth_method.expiration.isoformat() if auth_method and auth_method.expiration else None, "date_debut": reservation.date_debut.isoformat(), "date_fin": reservation.date_fin.isoformat(), "statut": reservation.statut, } def _offline_event_timestamp(value): """Parse a kiosk event timestamp, falling back to server time.""" if not value: return timezone.now() parsed = parse_datetime(str(value)) if parsed is None: return timezone.now() if timezone.is_naive(parsed): parsed = timezone.make_aware(parsed, timezone=datetime_timezone.utc) return parsed def _reconcile_offline_session_started(request, event, station_id, borne_id=""): """Create or recover the backend session for one accepted offline start event.""" reservation_id = event.get("reservation_id") or (event.get("payload") or {}).get("reservation", {}).get("id_reservation") borne_id = ( borne_id or (event.get("payload") or {}).get("borne_id") or (event.get("payload") or {}).get("reservation", {}).get("borne_id") ) if not reservation_id: return None, "reservation_id manquant." 
reservation = _terminal_reservation_queryset().filter(id_reservation=reservation_id).first() if reservation is None: return None, "Reservation introuvable." if station_id and str(reservation.emplacement.station_id) != station_id: return None, "Reservation associee a une autre station." if borne_id and str(reservation.emplacement.borne_id) != str(borne_id): return None, "Reservation associee a une autre borne." start_time = _offline_event_timestamp(event.get("created_at")) with transaction.atomic(): session, created = ChargingSession.objects.get_or_create( reservation=reservation, defaults={ "emplacement": reservation.emplacement, "start_time": start_time, }, ) try: mark_access_validated(session, now=start_time) except ChargeSessionStateError: return None, CHARGE_SESSION_STATE_ERROR_DETAIL if reservation.statut != "active": reservation.statut = "active" reservation.save(update_fields=["statut"]) auth_method = reservation.authentification if created and auth_method and reservation.auth_choice in {"qr", "code"}: auth_method.expiration = session.start_time + timedelta(minutes=DEFAULT_SESSION_MAX_MINUTES) auth_method.metadata = { **(auth_method.metadata or {}), "session_started_at": session.start_time.isoformat(), "session_max_minutes": DEFAULT_SESSION_MAX_MINUTES, "offline_event_id": event.get("id"), "offline_session_id": event.get("session_id"), } auth_method.save(update_fields=["expiration", "metadata"]) JournalSecurite.objects.create( utilisateur=reservation.utilisateur, action=f"terminal:offline_session_synced:{reservation.reference_code}:{event.get('id')}", ip=_client_ip(request), statut="success", ) return { "event_id": event.get("id"), "reservation_id": str(reservation.id_reservation), "session_id": str(session.id_session), "session_created": created, }, None def _resolve_direct_rfid_reservation(auth_value, now): """Resolve a badge UID directly to one active reservation.""" type_auth = _get_type_auth("RFID") matching_auth_methods = list( 
Authentification.objects.select_related("utilisateur", "type_auth") .filter( type_auth=type_auth, hash_valeur=_hash_secret(auth_value), is_active=True, )[:2] ) if not matching_auth_methods: return None, None, Response({"detail": "Badge RFID inconnu."}, status=status.HTTP_401_UNAUTHORIZED) if len(matching_auth_methods) > 1: return None, None, Response( {"detail": "Ce badge RFID est associe a plusieurs comptes."}, status=status.HTTP_409_CONFLICT, ) auth_method = matching_auth_methods[0] if auth_method.expiration and auth_method.expiration < now: return None, None, Response({"detail": "Le badge RFID est expire."}, status=status.HTTP_400_BAD_REQUEST) reservations = _active_rfid_reservations_for_auth(auth_method, now) if not reservations: return None, None, Response( {"detail": "Aucune reservation active pour ce badge RFID."}, status=status.HTTP_404_NOT_FOUND, ) if len(reservations) > 1: return None, None, Response( { "detail": "Plusieurs reservations actives correspondent a ce badge RFID.", "reservations": [_serialize_reservation(reservation) for reservation in reservations], }, status=status.HTTP_409_CONFLICT, ) return auth_method, reservations[0], None def _reservation_overlaps(emplacement, date_debut, date_fin): """Return whether a charging bay is already booked in the paid window.""" return Reservation.objects.filter( emplacement=emplacement, date_debut__lt=date_fin, date_fin__gt=date_debut, statut__in=["confirmee", "active", "en_attente"], ).exists() def _session_start_conflict_response(reservation, now): """Return a conflict response if another booking still owns the bay.""" active_session = ( ChargingSession.objects.select_related("reservation") .filter(emplacement=reservation.emplacement, end_time__isnull=True) .exclude(reservation=reservation) .first() ) if active_session: return Response( {"detail": "L'emplacement est deja utilise par une autre session de charge."}, status=status.HTTP_409_CONFLICT, ) if now < reservation.date_debut: paid_window_owner = ( 
Reservation.objects.filter( emplacement=reservation.emplacement, date_debut__lte=now, date_fin__gt=now, statut__in=["confirmee", "active", "en_attente"], ) .exclude(id_reservation=reservation.id_reservation) .first() ) if paid_window_owner: return Response( {"detail": "L'emplacement est encore reserve par le creneau precedent."}, status=status.HTTP_409_CONFLICT, ) return None def _available_emplacements(station, date_debut, date_fin): """Return available charging bays for a station and paid reservation window.""" overlapping_reservations = Reservation.objects.filter( emplacement=OuterRef("pk"), date_debut__lt=date_fin, date_fin__gt=date_debut, statut__in=["confirmee", "active", "en_attente"], ) return list( Emplacement.objects.filter(station=station) .select_related("station", "borne") .annotate(has_overlap=Exists(overlapping_reservations)) .filter(has_overlap=False) .order_by("borne__numero", "numero") ) def _round_to_thirty_minutes(dt): """Return whether a datetime is aligned to a 30-minute boundary.""" if dt.minute % RESERVATION_STEP_MINUTES != 0 or dt.second != 0 or dt.microsecond != 0: return False return True def _duration_is_valid(date_debut, date_fin): """Return whether the requested reservation duration is supported.""" return reservation_duration_is_allowed(date_debut, date_fin)
@_schema(
    "Verifier la disponibilite d'un pseudo ou d'un email",
    parameters=[
        OpenApiParameter("pseudo", OpenApiTypes.STR, OpenApiParameter.QUERY),
        OpenApiParameter("email", OpenApiTypes.STR, OpenApiParameter.QUERY),
    ],
    tags=["Authentification"],
)
@api_view(["GET"])
@permission_classes([AllowAny])
@throttle_classes([RegistrationRateThrottle])
def register_availability(request):
    """Tell the caller whether a pseudo and/or an email is free to register.

    Each query parameter is optional; only the supplied ones appear in the
    response. Lookups are case-insensitive. An email with an invalid format
    is reported as unavailable with ``valid_format`` set to ``False``.
    """
    params = request.query_params
    wanted_pseudo = (params.get("pseudo") or "").strip()
    wanted_email = (params.get("email") or "").strip()
    result = {}

    if wanted_pseudo:
        pseudo_taken = User.objects.filter(pseudo__iexact=wanted_pseudo).exists()
        if pseudo_taken:
            pseudo_message = "Ce pseudo est déjà utilisé."
        else:
            pseudo_message = "Ce pseudo est disponible."
        result["pseudo"] = {
            "available": not pseudo_taken,
            "message": pseudo_message,
        }

    if not wanted_email:
        return Response(result)

    try:
        validate_email(wanted_email)
    except ValidationError:
        result["email"] = {
            "available": False,
            "valid_format": False,
            "message": "Le format de l’adresse email est invalide.",
        }
        return Response(result)

    email_taken = User.objects.filter(email__iexact=wanted_email).exists()
    if email_taken:
        email_message = "Cette adresse email est déjà utilisée."
    else:
        email_message = "Cette adresse email est disponible."
    result["email"] = {
        "available": not email_taken,
        "valid_format": True,
        "message": email_message,
    }
    return Response(result)
@_schema("Creer un compte utilisateur", request=RegisterSerializer, tags=["Authentification"])
@api_view(["POST"])
@permission_classes([AllowAny])
@throttle_classes([RegistrationRateThrottle])
def register(request):
    """Create a user account and return an authenticated JWT session.

    The endpoint is explicitly public (``AllowAny``), like the other
    anonymous authentication endpoints, so it stays reachable whatever the
    project-wide default permission class is.

    Returns 201 with the serialized user and an access token (the refresh
    token is attached by ``_set_refresh_cookie``), or 400 with the serializer
    errors.
    """
    serializer = RegisterSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    user = serializer.save()
    refresh = RefreshToken.for_user(user)
    response = Response(
        {
            "message": "Utilisateur cree",
            "user": _serialize_user(user),
            "access": str(refresh.access_token),
        },
        status=status.HTTP_201_CREATED,
    )
    return _set_refresh_cookie(response, refresh)
@_schema("Connecter un utilisateur", request=OpenApiTypes.OBJECT, tags=["Authentification"])
@api_view(["POST"])
@permission_classes([AllowAny])
@throttle_classes([LoginRateThrottle])
def login(request):
    """Authenticate a user and issue access and refresh tokens.

    The identifier is read from ``identifiant``, ``username`` or ``email``
    (first non-empty one). The endpoint is explicitly public (``AllowAny``):
    a login route must be reachable without credentials whatever the
    project-wide default permission class is.

    Returns 200 with an access token and the serialized user (the refresh
    token is attached by ``_set_refresh_cookie``), or 401 on bad credentials.
    """
    identifiant = (
        request.data.get("identifiant")
        or request.data.get("username")
        or request.data.get("email")
    )
    password = request.data.get("password")

    user = authenticate(request, identifiant=identifiant, password=password)
    if user is None:
        return Response({"error": "Invalid credentials"}, status=status.HTTP_401_UNAUTHORIZED)

    user.last_login = timezone.now()
    user.save(update_fields=["last_login"])
    refresh = RefreshToken.for_user(user)
    response = Response(
        {
            "access": str(refresh.access_token),
            "user": _serialize_user(user),
        }
    )
    return _set_refresh_cookie(response, refresh)
@_schema("Rafraichir le jeton d'acces", request=OpenApiTypes.OBJECT, tags=["Authentification"])
@api_view(["POST"])
@permission_classes([AllowAny])
@throttle_classes([TokenRefreshRateThrottle])
def refresh_token(request):
    """Issue a new access token from the refresh token.

    The refresh token is taken from the request body (``refresh``) or, failing
    that, from the refresh cookie. Returns 401 when no token is supplied or
    when the token is invalid or expired. When the serializer returns a
    rotated refresh token, it is written back to the cookie.
    """
    # Local import: these names are not part of the module-level imports.
    from rest_framework_simplejwt.exceptions import InvalidToken, TokenError

    refresh = request.data.get("refresh") or request.COOKIES.get(settings.JWT_REFRESH_COOKIE_NAME)
    if not refresh:
        return Response({"detail": "Refresh token absent."}, status=status.HTTP_401_UNAUTHORIZED)

    serializer = TokenRefreshSerializer(data={"refresh": refresh})
    try:
        serializer.is_valid(raise_exception=True)
    except TokenError as exc:
        # TokenError is not a DRF APIException; left uncaught it becomes a
        # 500. Convert it to a 401, as the stock simplejwt views do.
        raise InvalidToken(exc.args[0]) from exc

    response = Response({"access": serializer.validated_data["access"]})
    rotated_refresh = serializer.validated_data.get("refresh")
    if rotated_refresh:
        _set_refresh_cookie(response, rotated_refresh)
    return response
@_schema("Deconnecter l'utilisateur", tags=["Authentification"])
@api_view(["POST"])
@permission_classes([AllowAny])
def logout(request):
    """Answer with a confirmation message passed through ``_clear_refresh_cookie``."""
    confirmation = Response({"message": "Deconnexion effectuee."})
    return _clear_refresh_cookie(confirmation)
@_schema("Lire le profil de l'utilisateur connecte", tags=["Compte"])
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def me(request):
    """Return the serialized current user together with the profile name."""
    account_profile = _get_or_create_profile(request.user)
    body = {
        "user": _serialize_user(request.user),
        "profile": {
            "nom": account_profile.nom,
        },
    }
    return Response(body)
@_schema("Lire, modifier ou supprimer le profil", request=ProfileSerializer, tags=["Compte"])
@api_view(["GET", "PATCH", "DELETE"])
@permission_classes([IsAuthenticated])
def profile(request):
    """Read, update or delete the authenticated user's account.

    GET returns the user and profile name; PATCH applies a partial update
    through ``ProfileSerializer`` and returns the refreshed data; DELETE
    removes the account and echoes its identifier.
    """
    account = request.user
    account_profile = _get_or_create_profile(account)
    method = request.method

    def _snapshot(**leading):
        # Keys passed in ``leading`` come first, then the user/profile pair.
        return {
            **leading,
            "user": _serialize_user(account),
            "profile": {"nom": account_profile.nom},
        }

    if method == "GET":
        return Response(_snapshot())

    if method == "PATCH":
        updater = ProfileSerializer(instance=account, data=request.data, partial=True)
        updater.is_valid(raise_exception=True)
        updater.save()
        # Reload both rows so the response reflects what was persisted.
        account_profile.refresh_from_db()
        account.refresh_from_db()
        return Response(_snapshot(message="Profil mis a jour"))

    if method == "DELETE":
        deleted_id = account.id_user
        account.delete()
        return Response(
            {
                "message": "Compte supprime",
                "user_id": str(deleted_id),
            },
            status=status.HTTP_200_OK,
        )

    return Response(status=status.HTTP_405_METHOD_NOT_ALLOWED)
@_schema("Lire ou modifier les consentements", request=ConsentPreferenceSerializer, tags=["Compte"])
@api_view(["GET", "PATCH"])
@permission_classes([IsAuthenticated])
def consentements(request):
    """Expose the user's consents and record notification preference changes.

    PATCH appends a new ``notifications`` consent row only when the requested
    status differs from the most recent one (or when none exists yet).
    """
    account = request.user
    if request.method == "GET":
        return Response(_serialize_user_consents(account))

    preference = ConsentPreferenceSerializer(data=request.data)
    preference.is_valid(raise_exception=True)

    if preference.validated_data["notifications_accepted"]:
        wanted_status = "accepte"
    else:
        wanted_status = "refuse"

    previous = (
        account.consentements.filter(type="notifications")
        .order_by("-date_consentement")
        .first()
    )
    has_changed = previous is None or previous.statut != wanted_status
    if has_changed:
        Consentement.objects.create(
            utilisateur=account,
            type="notifications",
            statut=wanted_status,
        )

    message = "Consentements mis a jour" if has_changed else "Consentements inchanges"
    return Response({"message": message, **_serialize_user_consents(account)})
@_schema("Changer le mot de passe", request=PasswordChangeSerializer, tags=["Compte"])
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def change_password(request):
    """Replace the authenticated user's password.

    The current password must match; otherwise a 400 keyed on
    ``current_password`` is returned.
    """
    form = PasswordChangeSerializer(data=request.data)
    form.is_valid(raise_exception=True)
    fields = form.validated_data
    account = request.user

    current_matches = account.check_password(fields["current_password"])
    if not current_matches:
        return Response(
            {"current_password": "Mot de passe actuel incorrect."},
            status=status.HTTP_400_BAD_REQUEST,
        )

    account.set_password(fields["new_password"])
    account.save(update_fields=["password"])
    return Response({"message": "Mot de passe mis a jour"})
@_schema("Exporter les donnees du compte", tags=["Compte"])
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def export_account(request):
    """Export the user's account, reservations, sessions, consents and security logs."""
    account = request.user
    account_profile = _get_or_create_profile(account)

    bay_relations = ("emplacement", "emplacement__station", "emplacement__borne")
    reservations = account.reservations.select_related(*bay_relations).order_by("-date_debut")
    sessions = (
        ChargingSession.objects.filter(reservation__utilisateur=account)
        .select_related("reservation", *bay_relations)
        .order_by("-start_time")
    )

    def _iso_or_none(moment):
        return moment.isoformat() if moment else None

    user_section = {
        **_serialize_user(account),
        "date_joined": _iso_or_none(account.date_joined),
        "last_login": _iso_or_none(account.last_login),
    }
    reservation_rows = [_serialize_reservation(item) for item in reservations]
    session_rows = [_serialize_session(item) for item in sessions]
    consent_rows = [
        {
            "id": str(entry.id_consentement),
            "type": entry.type,
            "statut": entry.statut,
            "date_consentement": entry.date_consentement.isoformat(),
        }
        for entry in account.consentements.all().order_by("-date_consentement")
    ]
    security_rows = [
        {
            "id": str(entry.id_log),
            "action": entry.action,
            "ip": entry.ip,
            "statut": entry.statut,
            "date_action": entry.date_action.isoformat(),
        }
        for entry in account.journaux_securite.all().order_by("-date_action")
    ]

    return Response(
        {
            "user": user_section,
            "profile": {"nom": account_profile.nom},
            "reservations": reservation_rows,
            "sessions": session_rows,
            "consentements": consent_rows,
            "journaux_securite": security_rows,
        }
    )
@_schema("Lire le tableau de bord utilisateur", tags=["Tableau de bord"])
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def dashboard(request):
    """Return the user's dashboard: counters, active session and recent activity.

    History lists are capped at five entries each. "Upcoming" means a start
    date at or after now and not cancelled; the reservation history contains
    past or cancelled reservations.
    """
    account = request.user
    my_sessions = (
        ChargingSession.objects.filter(reservation__utilisateur=account)
        .select_related("reservation", "emplacement", "emplacement__station", "emplacement__borne")
        .order_by("-start_time")
    )
    my_reservations = (
        Reservation.objects.filter(utilisateur=account)
        .select_related(
            "emplacement",
            "emplacement__station",
            "emplacement__borne",
            "authentification",
            "authentification__type_auth",
        )
        .order_by("-date_debut")
    )

    open_sessions = my_sessions.filter(end_time__isnull=True)
    closed_sessions = my_sessions.filter(end_time__isnull=False)
    current_session = open_sessions.first()

    now = timezone.now()
    upcoming = my_reservations.filter(date_debut__gte=now).exclude(statut="annulee")
    archived = my_reservations.filter(date_debut__lt=now) | my_reservations.filter(statut="annulee")
    recent_archived = archived.distinct().order_by("-date_debut")[:5]

    stats = {
        "sessions_total": my_sessions.count(),
        "active_sessions": open_sessions.count(),
        "past_sessions": closed_sessions.count(),
        "reservations_total": my_reservations.count(),
        "reservations_upcoming": upcoming.count(),
    }
    return Response(
        {
            "user": _serialize_user(account),
            "stats": stats,
            "active_session": _serialize_session(current_session) if current_session else None,
            "recharge_history": [_serialize_session(item) for item in my_sessions[:5]],
            "reservations_history": [ReservationSerializer(item).data for item in recent_archived],
            "upcoming_reservations": [ReservationSerializer(item).data for item in upcoming[:5]],
        }
    )
@_schema("Lire la synthese administrateur", tags=["Administration"])
@api_view(["GET"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_overview(request):
    """Return global counters and the five most recent records of each kind."""
    users = User.objects
    reservations = Reservation.objects
    sessions = ChargingSession.objects

    stats = {
        "users_total": users.count(),
        "active_users": users.filter(is_active=True).count(),
        "stations_total": Station.objects.count(),
        "bornes_total": Borne.objects.count(),
        "emplacements_total": Emplacement.objects.count(),
        "reservations_total": reservations.count(),
        "reservations_upcoming": (
            reservations.filter(date_debut__gte=timezone.now()).exclude(statut="annulee").count()
        ),
        "reservations_cancelled": reservations.filter(statut="annulee").count(),
        "sessions_total": sessions.count(),
        "sessions_active": sessions.filter(end_time__isnull=True).count(),
        "sessions_archived": sessions.filter(end_time__isnull=False).count(),
    }

    latest_users = users.select_related("profil", "technicien_profile").order_by("-date_joined")[:5]
    latest_stations = Station.objects.order_by("-id_station")[:5]
    latest_bornes = Borne.objects.select_related("station").order_by("-id_borne")[:5]
    latest_emplacements = (
        Emplacement.objects.select_related("station", "borne").order_by("-id_emplacement")[:5]
    )

    return Response(
        {
            "stats": stats,
            "recent_users": [_serialize_user(item) for item in latest_users],
            "recent_stations": [_serialize_station(item) for item in latest_stations],
            "recent_bornes": [_serialize_borne(item) for item in latest_bornes],
            "recent_emplacements": [_serialize_emplacement(item) for item in latest_emplacements],
        }
    )
@_schema("Lire la supervision technique administrateur", tags=["Administration"])
@api_view(["GET"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_supervision(request):
    """Return the unscoped supervision payload built by ``build_admin_supervision_payload``."""
    supervision = build_admin_supervision_payload()
    return Response(supervision)
@_schema("Lire la supervision technique du technicien connecte", tags=["Technicien"])
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def technician_supervision(request):
    """Return supervision data for the caller.

    Staff users get the unscoped payload; technicians get the payload
    restricted to their stations; other accounts get a 404.
    """
    account = request.user
    if account.is_staff:
        return Response(build_admin_supervision_payload())

    technicien = _technician_for_user(account)
    if technicien is not None:
        scoped_station_ids = _station_ids_for_technician(technicien)
        return Response(build_admin_supervision_payload(station_ids=scoped_station_ids))

    return Response(
        {"detail": "Aucun profil technicien associe a ce compte."},
        status=status.HTTP_404_NOT_FOUND,
    )
@_schema("Prendre en charge une alerte", tags=["Technicien"])
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def technician_acknowledge_alert(request, alert_id):
    """Acknowledge one alert on behalf of the current technician.

    Returns 404 for an unknown alert, the error response produced by
    ``_technician_can_manage_alert`` when access is refused, and 400 when the
    alert workflow rejects the transition.
    """
    target = alert_queryset().filter(id_alerte=alert_id).first()
    if target is None:
        return Response({"detail": "Alerte introuvable."}, status=status.HTTP_404_NOT_FOUND)

    technicien, refusal = _technician_can_manage_alert(request.user, target)
    if refusal is not None:
        return refusal

    try:
        target = acknowledge_alert(target, technicien)
    except AlertWorkflowError:
        return Response(
            {"detail": ALERT_WORKFLOW_ERROR_DETAIL},
            status=status.HTTP_400_BAD_REQUEST,
        )
    return Response({"alert": serialize_alert_supervision(target)})
@_schema("Résoudre une alerte", tags=["Technicien"])
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def technician_resolve_alert(request, alert_id):
    """Resolve one alert on behalf of the current technician.

    An optional ``note`` from the request body is stripped and forwarded to
    the workflow. Returns 404 for an unknown alert, the error response
    produced by ``_technician_can_manage_alert`` when access is refused, and
    400 when the alert workflow rejects the transition.
    """
    target = alert_queryset().filter(id_alerte=alert_id).first()
    if target is None:
        return Response({"detail": "Alerte introuvable."}, status=status.HTTP_404_NOT_FOUND)

    technicien, refusal = _technician_can_manage_alert(request.user, target)
    if refusal is not None:
        return refusal

    raw_note = request.data.get("note") or ""
    note = str(raw_note).strip()
    try:
        target = resolve_alert(target, technicien, note=note)
    except AlertWorkflowError:
        return Response(
            {"detail": ALERT_WORKFLOW_ERROR_DETAIL},
            status=status.HTTP_400_BAD_REQUEST,
        )
    return Response({"alert": serialize_alert_supervision(target)})
@_schema("Lister les reservations administrateur", tags=["Administration"])
@api_view(["GET"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_reservations(request):
    """Return every reservation split into ``upcoming`` and ``archived``.

    A reservation is upcoming while its end date is not past and it is not
    cancelled; it is archived once ended or cancelled.
    """
    all_reservations = Reservation.objects.select_related(
        "utilisateur",
        "emplacement",
        "emplacement__station",
        "emplacement__borne",
        "authentification",
        "authentification__type_auth",
    ).order_by("-date_debut")

    now = timezone.now()
    still_running = all_reservations.filter(date_fin__gte=now).exclude(statut="annulee")
    finished = all_reservations.filter(date_fin__lt=now)
    cancelled = all_reservations.filter(statut="annulee")
    archived = (finished | cancelled).distinct()

    def _rows(queryset):
        return [ReservationSerializer(item).data for item in queryset]

    return Response({"upcoming": _rows(still_running), "archived": _rows(archived)})
@_schema("Lister les sessions de recharge administrateur", tags=["Administration"])
@api_view(["GET"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_sessions(request):
    """Return every charging session split into ``active`` and ``archived``.

    Each row is the serialized session enriched with the owner's pseudo
    (``"-"`` when the reservation has no user) and the reservation reference.
    """
    all_sessions = ChargingSession.objects.select_related(
        "reservation",
        "reservation__utilisateur",
        "emplacement",
        "emplacement__station",
        "emplacement__borne",
    ).order_by("-start_time")

    def _row(session):
        linked_reservation = session.reservation
        owner = linked_reservation.utilisateur
        return _serialize_session(session) | {
            "utilisateur_pseudo": owner.pseudo if owner else "-",
            "reference_code": linked_reservation.reference_code,
        }

    running = all_sessions.filter(end_time__isnull=True)
    ended = all_sessions.filter(end_time__isnull=False)
    return Response(
        {
            "active": [_row(item) for item in running],
            "archived": [_row(item) for item in ended],
        }
    )
@_schema("Lister les utilisateurs administrateur", tags=["Administration"], operation_id="admin_users_list", methods=["GET"])
@_schema("Creer un utilisateur administrateur", request=AdminUserUpdateSerializer, tags=["Administration"], operation_id="admin_users_create", methods=["POST"])
@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_users(request):
    """List users or create a user from the administration API.

    POST requires ``pseudo``, ``email`` and ``password``. Only a superuser may
    set any field listed in ``PRIVILEGED_USER_FIELDS``. The user and its
    profile are created in one transaction.

    Returns 201 with the serialized user, 400 on missing fields or an invalid
    status, 403 on a forbidden role change and 409 when the database rejects
    the creation.
    """
    # Local import: IntegrityError is not part of the module-level imports.
    from django.db import IntegrityError

    if request.method == "GET":
        users = User.objects.select_related("profil", "technicien_profile").order_by("-date_joined")
        return _list_response(request, users, serializer_class=AdminUserSerializer)

    payload = request.data
    required = ("pseudo", "email", "password")
    missing = [field for field in required if not payload.get(field)]
    if missing:
        return Response(
            {"detail": f"Champs manquants: {', '.join(missing)}"},
            status=status.HTTP_400_BAD_REQUEST,
        )

    if not request.user.is_superuser and any(
        _as_bool(payload.get(field, False)) for field in PRIVILEGED_USER_FIELDS
    ):
        return Response(
            {"detail": "Seul un super-administrateur peut modifier les roles administrateur."},
            status=status.HTTP_403_FORBIDDEN,
        )

    is_superuser = _as_bool(payload.get("is_superuser", False))
    # A superuser is always staff as well.
    is_staff = _as_bool(payload.get("is_staff", False)) or is_superuser
    user_status = payload.get("statut", UserStatus.ACTIVE)
    if user_status not in UserStatus.values:
        return Response(
            {"statut": ["Statut utilisateur invalide."]},
            status=status.HTTP_400_BAD_REQUEST,
        )

    try:
        # Create the user and its profile atomically so a failing profile
        # insert cannot leave an orphan account behind.
        with transaction.atomic():
            if is_superuser:
                user = User.objects.create_superuser(
                    pseudo=payload["pseudo"],
                    email=payload["email"],
                    password=payload["password"],
                    statut=user_status,
                    is_staff=is_staff,
                )
            else:
                user = User.objects.create_user(
                    pseudo=payload["pseudo"],
                    email=payload["email"],
                    password=payload["password"],
                    statut=user_status,
                    is_staff=is_staff,
                    is_superuser=False,
                )

            profile_name = payload.get("nom") or payload["pseudo"]
            ProfilUtilisateur.objects.create(
                utilisateur=user,
                nom=profile_name,
                telephone=payload.get("telephone", ""),
            )
    except IntegrityError:
        # NOTE(review): presumably a duplicate pseudo/email; confirm against
        # the unique constraints declared on the user model.
        return Response(
            {"detail": "Impossible de creer l'utilisateur : pseudo ou email deja utilise."},
            status=status.HTTP_409_CONFLICT,
        )

    return Response(AdminUserSerializer(user).data, status=status.HTTP_201_CREATED)
@_schema("Lire un utilisateur", tags=["Administration"], operation_id="admin_user_retrieve", methods=["GET"])
@_schema("Modifier un utilisateur", request=AdminUserUpdateSerializer, tags=["Administration"], operation_id="admin_user_update", methods=["PATCH"])
@_schema("Supprimer un utilisateur", responses={204: None}, tags=["Administration"], operation_id="admin_user_delete", methods=["DELETE"])
@api_view(["GET", "PATCH", "DELETE"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_user_detail(request, user_id):
    """Retrieve, update or delete one user from the administration API.

    Returns 404 for an unknown user. PATCH is subject to
    ``_forbid_privileged_role_change_response``. DELETE of a staff or
    superuser account by a non-superuser is refused with 403, in line with
    the rule that only a superuser may alter administrator roles.
    """
    try:
        user = User.objects.select_related("profil", "technicien_profile").get(id_user=user_id)
    except User.DoesNotExist:
        return Response({"detail": "Utilisateur introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if request.method == "GET":
        return Response(AdminUserSerializer(user).data)

    if request.method == "PATCH":
        role_error = _forbid_privileged_role_change_response(request, request.data)
        if role_error is not None:
            return role_error
        serializer = AdminUserUpdateSerializer(
            instance=user,
            data=request.data,
            partial=True,
        )
        serializer.is_valid(raise_exception=True)
        serializer.save()
        return Response(AdminUserSerializer(user).data)

    # DELETE: removing an administrator account is a role change in effect,
    # so plain staff must not be able to do it to another account.
    target_is_privileged = user.is_staff or user.is_superuser
    is_self = user.id_user == request.user.id_user
    if target_is_privileged and not is_self and not request.user.is_superuser:
        return Response(
            {"detail": "Seul un super-administrateur peut supprimer un compte administrateur."},
            status=status.HTTP_403_FORBIDDEN,
        )

    user.delete()
    return Response(status=status.HTTP_204_NO_CONTENT)
@_schema("Generer un mot de passe temporaire utilisateur", tags=["Administration"])
@api_view(["POST"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_reset_user_password(request, user_id):
    """Generate and set a new temporary password for a user.

    The temporary password is returned in the response, so a non-superuser
    must not be able to reset another staff or superuser account: that would
    hand over the credentials of a more privileged account. Such requests
    are refused with 403. Returns 404 for an unknown user.
    """
    try:
        user = User.objects.get(id_user=user_id)
    except User.DoesNotExist:
        return Response({"detail": "Utilisateur introuvable."}, status=status.HTTP_404_NOT_FOUND)

    target_is_privileged = user.is_staff or user.is_superuser
    is_self = user.id_user == request.user.id_user
    if target_is_privileged and not is_self and not request.user.is_superuser:
        return Response(
            {
                "detail": "Seul un super-administrateur peut reinitialiser le mot de passe "
                "d'un compte administrateur."
            },
            status=status.HTTP_403_FORBIDDEN,
        )

    temporary_password = _generate_temporary_password(user)
    user.set_password(temporary_password)
    user.save(update_fields=["password"])
    _log_security_event(
        request,
        user,
        f"admin:{request.user.pseudo}:password_reset",
    )
    return Response(
        {
            "message": "Mot de passe temporaire genere.",
            "temporary_password": temporary_password,
        }
    )
@_schema("Lister ou creer les badges RFID d'un utilisateur", request=OpenApiTypes.OBJECT, tags=["Administration"])
@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_user_auth_methods(request, user_id):
    """List active RFID badges for a user, or register a replacement badge.

    POST reads the badge value from ``auth_value`` (or ``value``), registers
    it through ``_ensure_rfid_auth`` and deactivates the user's other active
    RFID badges. Revocation details are merged into each badge's existing
    metadata rather than replacing it.

    Returns 201 with the new badge, 400 for an invalid badge, 404 for an
    unknown user and 409 when the badge belongs to another account.
    """
    try:
        user = User.objects.get(id_user=user_id)
    except User.DoesNotExist:
        return Response({"detail": "Utilisateur introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if request.method == "GET":
        methods = (
            Authentification.objects.filter(utilisateur=user, type_auth__libelle="RFID", is_active=True)
            .exclude(display_value="")
            .select_related("type_auth")
            .order_by("-expiration", "-created_at", "-id_auth")
        )
        return _list_response(request, methods, serializer_func=_serialize_admin_auth_method)

    # ``str()`` keeps a numeric JSON value from crashing on ``.strip()``.
    auth_value = str(request.data.get("auth_value") or request.data.get("value") or "").strip()
    try:
        auth_method = _ensure_rfid_auth(user, auth_value)
    except RfidCredentialConflict:
        return Response(
            {"auth_value": "Ce badge RFID est deja associe a un autre compte."},
            status=status.HTTP_409_CONFLICT,
        )
    except ValueError:
        return Response({"auth_value": "Badge RFID invalide."}, status=status.HTTP_400_BAD_REQUEST)

    now = timezone.now()
    revocation_details = {
        "revoked_at": now.isoformat(),
        "revoked_by": str(request.user.id_user),
        "revocation_source": "admin_replacement",
    }
    replaced_badges = Authentification.objects.filter(
        utilisateur=user,
        type_auth__libelle="RFID",
        is_active=True,
    ).exclude(id_auth=auth_method.id_auth)
    # Revoke badge by badge so each one keeps its own metadata; a bulk
    # ``update(metadata=...)`` would overwrite it.
    with transaction.atomic():
        for previous_badge in replaced_badges:
            previous_badge.is_active = False
            previous_badge.expiration = now
            previous_badge.metadata = {
                **(previous_badge.metadata or {}),
                **revocation_details,
            }
            previous_badge.save(update_fields=["is_active", "expiration", "metadata"])

    _log_security_event(
        request,
        user,
        f"admin:{request.user.pseudo}:rfid_registered:{auth_method.id_auth}",
    )
    return Response(
        {
            "message": "Badge RFID enregistre.",
            "auth_method": _serialize_admin_auth_method(auth_method),
        },
        status=status.HTTP_201_CREATED,
    )
@_schema("Revoquer un badge RFID utilisateur", responses={204: None}, tags=["Administration"])
@api_view(["POST", "DELETE"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_revoke_user_auth_method(request, user_id, auth_id):
    """Soft-revoke one active RFID badge of a user.

    The badge is deactivated, expired immediately and annotated with
    revocation metadata. Returns 204 on success and 404 when the user or the
    active badge cannot be found.
    """
    try:
        owner = User.objects.get(id_user=user_id)
    except User.DoesNotExist:
        return Response({"detail": "Utilisateur introuvable."}, status=status.HTTP_404_NOT_FOUND)

    active_badges = Authentification.objects.filter(
        id_auth=auth_id,
        utilisateur=owner,
        type_auth__libelle="RFID",
        is_active=True,
    )
    badge = active_badges.select_related("type_auth").first()
    if badge is None:
        return Response({"detail": "Badge RFID introuvable."}, status=status.HTTP_404_NOT_FOUND)

    badge.is_active = False
    badge.expiration = timezone.now()
    existing_metadata = badge.metadata or {}
    badge.metadata = {
        **existing_metadata,
        "revoked_at": timezone.now().isoformat(),
        "revoked_by": str(request.user.id_user),
        "revocation_source": "admin",
    }
    badge.save(update_fields=["is_active", "expiration", "metadata"])

    _log_security_event(
        request,
        owner,
        f"admin:{request.user.pseudo}:rfid_revoked:{badge.id_auth}",
    )
    return Response(status=status.HTTP_204_NO_CONTENT)
@_schema("Lister les stations", tags=["Administration"], operation_id="admin_stations_list", methods=["GET"])
@_schema("Creer une station", request=StationSerializer, tags=["Administration"], operation_id="admin_station_create", methods=["POST"])
@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_stations(request):
    """List stations ordered by name, or create one.

    A successful creation clears the station catalog cache and returns 201.
    """
    if request.method != "GET":
        form = StationSerializer(data=request.data)
        form.is_valid(raise_exception=True)
        created_station = form.save()
        _clear_station_catalog_cache()
        return Response(StationSerializer(created_station).data, status=status.HTTP_201_CREATED)

    ordered_stations = Station.objects.order_by("nom")
    return _list_response(request, ordered_stations, serializer_class=StationSerializer)
@_schema("Lire une station", tags=["Administration"], operation_id="admin_station_retrieve", methods=["GET"])
@_schema("Modifier une station", request=StationSerializer, tags=["Administration"], operation_id="admin_station_update", methods=["PATCH"])
@_schema("Supprimer une station", responses={204: None}, tags=["Administration"], operation_id="admin_station_delete", methods=["DELETE"])
@api_view(["GET", "PATCH", "DELETE"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_station_detail(request, station_id):
    """Retrieve, update or delete one station from the administration API.

    Returns 404 for an unknown station. PATCH and DELETE clear the station
    catalog cache after the change has been applied.
    """
    try:
        station = Station.objects.get(id_station=station_id)
    except Station.DoesNotExist:
        return Response({"detail": "Station introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if request.method == "GET":
        return Response(StationSerializer(station).data)

    if request.method == "PATCH":
        serializer = StationSerializer(instance=station, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        _clear_station_catalog_cache()
        return Response(StationSerializer(station).data)

    # Delete first, then invalidate: clearing the cache beforehand lets a
    # concurrent read re-cache the station that is about to disappear.
    station.delete()
    _clear_station_catalog_cache()
    return Response(status=status.HTTP_204_NO_CONTENT)
@_schema("Lister les bornes", tags=["Administration"], operation_id="admin_bornes_list", methods=["GET"])
@_schema("Creer une borne", request=BorneSerializer, tags=["Administration"], operation_id="admin_borne_create", methods=["POST"])
@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_bornes(request):
    """Administration endpoint for the collection of charging cabinets.

    GET lists the cabinets with their station selected and their bays
    prefetched, ordered by station name then cabinet number. POST validates
    the payload with ``BorneSerializer``, saves it, clears the station
    catalog cache and answers 201 with the serialized cabinet.
    """
    if request.method != "GET":
        form = BorneSerializer(data=request.data)
        form.is_valid(raise_exception=True)
        created = form.save()
        # Invalidate the station catalog cache after the write.
        _clear_station_catalog_cache()
        return Response(BorneSerializer(created).data, status=status.HTTP_201_CREATED)

    cabinets = Borne.objects.select_related("station")
    cabinets = cabinets.prefetch_related("emplacements")
    cabinets = cabinets.order_by("station__nom", "numero")
    return _list_response(request, cabinets, serializer_class=BorneSerializer)
@_schema("Lire une borne", tags=["Administration"], operation_id="admin_borne_retrieve", methods=["GET"])
@_schema("Modifier une borne", request=BorneSerializer, tags=["Administration"], operation_id="admin_borne_update", methods=["PATCH"])
@_schema("Supprimer une borne", responses={204: None}, tags=["Administration"], operation_id="admin_borne_delete", methods=["DELETE"])
@api_view(["GET", "PATCH", "DELETE"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_borne_detail(request, borne_id):
    """Retrieve, update or delete one physical charging cabinet.

    Args:
        request: DRF request; its method selects the operation.
        borne_id: Value matched against ``Borne.id_borne``.

    Returns:
        404 when the cabinet does not exist, the serialized cabinet for GET
        and PATCH, and an empty 204 response for DELETE.
    """
    try:
        borne = Borne.objects.select_related("station").prefetch_related("emplacements").get(id_borne=borne_id)
    except Borne.DoesNotExist:
        return Response({"detail": "Borne introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if request.method == "GET":
        return Response(BorneSerializer(borne).data)

    if request.method == "PATCH":
        serializer = BorneSerializer(instance=borne, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        _clear_station_catalog_cache()
        return Response(BorneSerializer(borne).data)

    # DELETE: invalidate the catalog cache only once the row is gone. Clearing
    # it first would let a concurrent catalog read re-cache the cabinet that
    # is about to be deleted.
    borne.delete()
    _clear_station_catalog_cache()
    return Response(status=status.HTTP_204_NO_CONTENT)
@_schema("Lister les emplacements", tags=["Administration"], operation_id="admin_emplacements_list", methods=["GET"])
@_schema("Creer un emplacement", request=EmplacementSerializer, tags=["Administration"], operation_id="admin_emplacement_create", methods=["POST"])
@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_emplacements(request):
    """Administration endpoint for the collection of charging bays.

    GET lists the bays with their station and cabinet selected, ordered by
    station name, cabinet number and bay number. POST validates the payload
    with ``EmplacementSerializer``, saves it, clears the station catalog
    cache and answers 201 with the serialized bay.
    """
    if request.method != "GET":
        form = EmplacementSerializer(data=request.data)
        form.is_valid(raise_exception=True)
        created = form.save()
        # Invalidate the station catalog cache after the write.
        _clear_station_catalog_cache()
        return Response(EmplacementSerializer(created).data, status=status.HTTP_201_CREATED)

    bays = Emplacement.objects.select_related("station", "borne")
    bays = bays.order_by("station__nom", "borne__numero", "numero")
    return _list_response(request, bays, serializer_class=EmplacementSerializer)
@_schema("Lire un emplacement", tags=["Administration"], operation_id="admin_emplacement_retrieve", methods=["GET"])
@_schema("Modifier un emplacement", request=EmplacementSerializer, tags=["Administration"], operation_id="admin_emplacement_update", methods=["PATCH"])
@_schema("Supprimer un emplacement", responses={204: None}, tags=["Administration"], operation_id="admin_emplacement_delete", methods=["DELETE"])
@api_view(["GET", "PATCH", "DELETE"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_emplacement_detail(request, emplacement_id):
    """Retrieve, update or delete one charging bay from the admin API.

    Args:
        request: DRF request; its method selects the operation.
        emplacement_id: Value matched against ``Emplacement.id_emplacement``.

    Returns:
        404 when the bay does not exist, the serialized bay for GET and
        PATCH, and an empty 204 response for DELETE.
    """
    try:
        emplacement = Emplacement.objects.select_related("station", "borne").get(id_emplacement=emplacement_id)
    except Emplacement.DoesNotExist:
        return Response({"detail": "Emplacement introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if request.method == "GET":
        return Response(EmplacementSerializer(emplacement).data)

    if request.method == "PATCH":
        serializer = EmplacementSerializer(instance=emplacement, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        _clear_station_catalog_cache()
        return Response(EmplacementSerializer(emplacement).data)

    # DELETE: invalidate the catalog cache only once the row is gone. Clearing
    # it first would let a concurrent catalog read re-cache the bay that is
    # about to be deleted.
    emplacement.delete()
    _clear_station_catalog_cache()
    return Response(status=status.HTTP_204_NO_CONTENT)
def _device_queryset():
    """Build the base ``Device`` queryset shared by the ESP32 admin views.

    The bay, its station and its cabinet are loaded with ``select_related``,
    and rows are ordered by station name, cabinet number, then bay number.
    """
    related_paths = (
        "emplacement",
        "emplacement__station",
        "emplacement__borne",
    )
    ordering = (
        "emplacement__station__nom",
        "emplacement__borne__numero",
        "emplacement__numero",
    )
    return Device.objects.select_related(*related_paths).order_by(*ordering)
@_schema("Lister les ESP32", tags=["Administration"], operation_id="admin_devices_list")
@api_view(["GET"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_devices(request):
    """Return the ESP32 inventory serialized with ``DeviceAdminSerializer``.

    The rows come from ``_device_queryset()`` and are handed to
    ``_list_response`` to build the response.
    """
    devices = _device_queryset()
    return _list_response(request, devices, serializer_class=DeviceAdminSerializer)
@_schema("Lire un ESP32", tags=["Administration"], operation_id="admin_device_retrieve", methods=["GET"])
@_schema("Modifier la politique firmware d'un ESP32", request=DeviceAdminSerializer, tags=["Administration"], operation_id="admin_device_update", methods=["PATCH"])
@api_view(["GET", "PATCH"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_device_detail(request, device_id):
    """Read one ESP32 record, or partially update it on PATCH.

    Answers 404 when no device matches ``device_id``. On PATCH the payload is
    validated and saved through ``DeviceAdminSerializer``; then
    ``update_firmware_compliance_status`` is called with a list it can fill
    with field names, and those fields are persisted if any were reported.
    Both methods answer with the serialized device.
    """
    try:
        device = _device_queryset().get(id_device=device_id)
    except Device.DoesNotExist:
        return Response({"detail": "ESP32 introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if request.method != "GET":
        form = DeviceAdminSerializer(instance=device, data=request.data, partial=True)
        form.is_valid(raise_exception=True)
        device = form.save()

        changed_fields = []
        update_firmware_compliance_status(device, update_fields=changed_fields)
        if changed_fields:
            # dict.fromkeys removes duplicate field names while keeping order.
            unique_fields = list(dict.fromkeys(changed_fields))
            device.save(update_fields=unique_fields)

    return Response(DeviceAdminSerializer(device).data)
@_schema("Planifier une mise a jour OTA ESP32", request=OpenApiTypes.OBJECT, tags=["Administration"])
@api_view(["POST"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_device_schedule_ota(request, device_id):
    """Register a firmware update target for one ESP32.

    Reads ``target_version`` and the optional ``update_url`` from the request
    body and passes them to ``schedule_ota_update``. Answers 404 when the
    device is unknown and 400 when ``schedule_ota_update`` raises
    ``ValueError``; otherwise the device is reloaded and returned serialized.
    """
    devices = _device_queryset()
    try:
        device = devices.get(id_device=device_id)
    except Device.DoesNotExist:
        return Response({"detail": "ESP32 introuvable."}, status=status.HTTP_404_NOT_FOUND)

    try:
        schedule_ota_update(
            device,
            target_version=request.data.get("target_version"),
            update_url=request.data.get("update_url", ""),
        )
    except ValueError:
        error_body = {"target_version": "La version cible du firmware est obligatoire."}
        return Response(error_body, status=status.HTTP_400_BAD_REQUEST)

    # Load the device again so the response reflects what was stored.
    refreshed = _device_queryset().get(id_device=device_id)
    return Response(DeviceAdminSerializer(refreshed).data)
@_schema("Lister les moyens d'authentification", tags=["Compte"])
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def auth_methods(request):
    """List saved RFID authentication methods for the authenticated user.

    Only active rows of type ``RFID`` with a non-empty ``display_value`` are
    returned, ordered by expiration then id, both descending. Each row is
    serialized with ``_serialize_auth_method``.
    """
    # The view is registered for GET only, so no per-method dispatch (and no
    # manual 405 fallback) is needed here.
    methods = (
        Authentification.objects.filter(utilisateur=request.user, type_auth__libelle="RFID", is_active=True)
        .exclude(display_value="")
        .select_related("type_auth")
        .order_by("-expiration", "-id_auth")
    )
    return _list_response(request, methods, serializer_func=_serialize_auth_method)
@_schema("Creer un moyen d'authentification", request=OpenApiTypes.OBJECT, tags=["Compte"])
@api_view(["POST"])
@permission_classes([IsAuthenticated])
@throttle_classes([AuthFactorRateThrottle])
def create_auth_method(request):
    """Create or refresh a saved RFID authentication method for the user.

    The mode is read from ``auth_mode`` (or ``type``) and the badge value from
    ``auth_value`` (or ``value``) in the request body.

    Returns:
        400 when the mode is not ``rfid`` or the badge value is invalid,
        409 when ``_ensure_rfid_auth`` reports the badge as owned by another
        account, otherwise 201 with the serialized method and the submitted
        value.
    """
    raw_mode = request.data.get("auth_mode") or request.data.get("type") or ""
    raw_value = request.data.get("auth_value") or request.data.get("value") or ""

    # JSON bodies can carry numbers, lists or objects here; only strings have
    # .strip(), so anything else is treated as invalid instead of crashing.
    auth_mode = raw_mode.strip().lower() if isinstance(raw_mode, str) else ""
    if auth_mode != "rfid":
        return Response(
            {"detail": "Seul RFID peut etre cree manuellement depuis le profil."},
            status=status.HTTP_400_BAD_REQUEST,
        )

    if not isinstance(raw_value, str):
        return Response({"auth_value": "Badge RFID invalide."}, status=status.HTTP_400_BAD_REQUEST)
    auth_value = raw_value.strip()

    try:
        auth_method = _ensure_rfid_auth(request.user, auth_value)
    except RfidCredentialConflict:
        return Response({"auth_value": "Ce badge RFID est deja associe a un autre compte."}, status=status.HTTP_409_CONFLICT)
    except ValueError:
        return Response({"auth_value": "Badge RFID invalide."}, status=status.HTTP_400_BAD_REQUEST)

    return Response(
        {
            "message": "Moyen d'authentification enregistre.",
            "auth_method": _serialize_auth_method(auth_method),
            "clear_value": auth_value,
        },
        status=status.HTTP_201_CREATED,
    )
@_schema("Supprimer un moyen d'authentification", responses={204: None}, tags=["Compte"])
@api_view(["DELETE"])
@permission_classes([IsAuthenticated])
def delete_auth_method(request, auth_id):
    """Revoke a saved RFID authentication method owned by the user.

    The row is not removed: it is marked inactive, its expiration is set to
    the revocation time and the revocation is recorded in ``metadata``.

    Returns:
        404 when no active RFID method with ``auth_id`` belongs to the user,
        otherwise an empty 204 response.
    """
    auth_method = Authentification.objects.filter(
        id_auth=auth_id,
        utilisateur=request.user,
        type_auth__libelle="RFID",
        is_active=True,
    ).first()
    if auth_method is None:
        return Response({"detail": "Badge RFID introuvable."}, status=status.HTTP_404_NOT_FOUND)

    # One timestamp for the whole revocation so that ``expiration`` and
    # ``metadata["revoked_at"]`` describe the same instant.
    revoked_at = timezone.now()
    auth_method.is_active = False
    auth_method.expiration = revoked_at
    auth_method.metadata = {
        **(auth_method.metadata or {}),
        "revoked_at": revoked_at.isoformat(),
        "revocation_source": "self_service",
    }
    auth_method.save(update_fields=["is_active", "expiration", "metadata"])
    _log_security_event(request, request.user, f"user:{request.user.pseudo}:rfid_revoked:{auth_method.id_auth}")
    return Response(status=status.HTTP_204_NO_CONTENT)
@_schema("Lister l'historique d'authentification", tags=["Compte"])
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def auth_history(request):
    """Return the reservation authentication history for the user.

    Reservations of the current user are listed from the most recent start
    date, each one described with its location and the authentication data
    attached to it.
    """
    reservations_qs = (
        Reservation.objects.filter(utilisateur=request.user)
        .select_related(
            "emplacement",
            "emplacement__station",
            "emplacement__borne",
            "authentification",
            "authentification__type_auth",
        )
        .order_by("-date_debut")
    )

    def _serialize_entry(reservation):
        """Build the history entry of one reservation."""
        auth_method = reservation.authentification
        if auth_method and auth_method.type_auth:
            auth_type = auth_method.type_auth.libelle
        else:
            auth_type = reservation.auth_choice.upper()
        # Guard against a method stored without an expiration, which would
        # otherwise fail on ``None.isoformat()``.
        expiration = auth_method.expiration if auth_method else None
        emplacement = reservation.emplacement
        return {
            "id_reservation": str(reservation.id_reservation),
            "reference_code": reservation.reference_code,
            "station_nom": emplacement.station.nom,
            "borne_nom": emplacement.borne.nom,
            "borne_numero": emplacement.borne.numero,
            "emplacement": emplacement.numero,
            "auth_mode": reservation.auth_choice,
            "auth_type": auth_type,
            "auth_value": auth_method.display_value if auth_method else "",
            "auth_expiration": expiration.isoformat() if expiration else None,
            "auth_source": "generated" if reservation.auth_choice in {"qr", "code"} else "saved",
            "date_debut": reservation.date_debut.isoformat(),
            "date_fin": reservation.date_fin.isoformat(),
            "statut": reservation.statut,
        }

    # Pass the queryset itself (as auth_methods does) rather than a fully
    # serialized list, so the whole history is not built up front.
    return _list_response(request, reservations_qs, serializer_func=_serialize_entry)
def _parse_availability_datetime(raw_value):
    """Parse an ISO 8601 query value into a datetime comparable with ``timezone.now()``.

    A trailing ``Z`` is accepted as UTC. When time zone support is enabled and
    the value carries no offset, it is interpreted in the current time zone.
    Raises ``ValueError`` when the value is not a valid ISO 8601 datetime.
    """
    parsed = datetime.fromisoformat(raw_value.replace("Z", "+00:00"))
    if settings.USE_TZ and timezone.is_naive(parsed):
        parsed = timezone.make_aware(parsed)
    return parsed


@_schema(
    "Lister les emplacements disponibles pour une reservation",
    parameters=[
        OpenApiParameter("station", OpenApiTypes.UUID, OpenApiParameter.QUERY, required=True),
        OpenApiParameter("date_debut", OpenApiTypes.DATETIME, OpenApiParameter.QUERY, required=True),
        OpenApiParameter("date_fin", OpenApiTypes.DATETIME, OpenApiParameter.QUERY, required=True),
    ],
    tags=["Reservations"],
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def reservation_availability(request):
    """List charging bays available for one station and time slot.

    Query parameters ``station``, ``date_debut`` and ``date_fin`` are all
    required. The slot must be aligned on 30-minute steps, start in the
    future and have an allowed duration.

    Returns:
        400 for a missing or invalid parameter, 404 when the station is
        unknown (including a malformed identifier), otherwise the station,
        availability counters and the list of available bays.
    """
    station_id = request.query_params.get("station")
    date_debut = request.query_params.get("date_debut")
    date_fin = request.query_params.get("date_fin")

    if not station_id:
        return Response({"detail": "Le parametre station est obligatoire."}, status=status.HTTP_400_BAD_REQUEST)
    if not date_debut or not date_fin:
        return Response({"detail": "Les dates sont obligatoires."}, status=status.HTTP_400_BAD_REQUEST)

    try:
        # Values without an offset would be naive and could not be compared
        # with timezone.now(); the helper makes them aware.
        date_debut_dt = _parse_availability_datetime(date_debut)
        date_fin_dt = _parse_availability_datetime(date_fin)
    except ValueError:
        return Response({"detail": "Format de date invalide."}, status=status.HTTP_400_BAD_REQUEST)

    if not _round_to_thirty_minutes(date_debut_dt) or not _round_to_thirty_minutes(date_fin_dt):
        return Response(
            {"detail": "Les horaires doivent etre alignes sur des tranches de 30 minutes."},
            status=status.HTTP_400_BAD_REQUEST,
        )
    if date_debut_dt < timezone.now():
        return Response(
            {"detail": "La reservation doit commencer dans le futur."},
            status=status.HTTP_400_BAD_REQUEST,
        )
    if not _duration_is_valid(date_debut_dt, date_fin_dt):
        return Response(
            {"detail": reservation_duration_error_message()},
            status=status.HTTP_400_BAD_REQUEST,
        )

    try:
        station = Station.objects.filter(id_station=station_id).first()
    except (ValidationError, ValueError):
        # A malformed identifier cannot match any station.
        station = None
    if not station:
        return Response({"detail": "Station introuvable."}, status=status.HTTP_404_NOT_FOUND)

    available_emplacements = _available_emplacements(station, date_debut_dt, date_fin_dt)
    return Response(
        {
            "station": _serialize_station(station),
            "available_count": len(available_emplacements),
            "total_count": station.emplacements.count(),
            "available": bool(available_emplacements),
            "duration_minutes": reservation_duration_minutes(date_debut_dt, date_fin_dt),
            "allowed_durations_minutes": list(RESERVATION_DURATION_MINUTES),
            "available_emplacements": [
                _serialize_emplacement(emplacement)
                for emplacement in available_emplacements
            ],
        }
    )
@_schema("Lister le catalogue des stations", tags=["Stations"])
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def stations_catalog(request):
    """Serve the station catalog, with cabinets and bays nested per station.

    The payload is read from the cache under ``STATIONS_CATALOG_CACHE_KEY``.
    On a miss it is rebuilt from the database (stations ordered by name) and
    stored for ``settings.STATIONS_CATALOG_CACHE_SECONDS`` seconds.
    """
    cached = cache.get(STATIONS_CATALOG_CACHE_KEY)
    if cached is not None:
        return Response(cached)

    bornes_prefetch = Prefetch(
        "bornes",
        queryset=Borne.objects.prefetch_related("emplacements").order_by("numero"),
    )
    emplacements_prefetch = Prefetch(
        "emplacements",
        queryset=Emplacement.objects.select_related("station", "borne").order_by("borne__numero", "numero"),
    )
    stations = Station.objects.prefetch_related(bornes_prefetch, emplacements_prefetch).order_by("nom")

    results = []
    for station in stations:
        entry = dict(_serialize_station(station))
        entry["bornes"] = [_serialize_borne(borne) for borne in station.bornes.all()]
        entry["emplacements"] = [
            _serialize_emplacement(emplacement) for emplacement in station.emplacements.all()
        ]
        results.append(entry)

    payload = {"results": results}
    cache.set(
        STATIONS_CATALOG_CACHE_KEY,
        payload,
        timeout=settings.STATIONS_CATALOG_CACHE_SECONDS,
    )
    return Response(payload)
@_schema("Lister ou creer des reservations", request=ReservationCreateSerializer, tags=["Reservations"])
@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated])
def reservations(request):
    """List the caller's reservations, or book a new one on POST.

    GET returns the user's reservations, most recent start date first.

    POST validates the payload with ``ReservationCreateSerializer``, checks
    the slot (30-minute alignment, start in the future, allowed duration),
    then, inside a transaction that locks the station row, picks the
    requested bay or the first available one, prepares the authentication
    method for the chosen mode and creates the reservation with status
    ``confirmee``. The 201 response carries the serialized reservation, the
    reservation code and, when a temporary credential was generated, its
    access payload.
    """
    if request.method == "GET":
        own_reservations = (
            Reservation.objects.filter(utilisateur=request.user)
            .select_related("emplacement", "emplacement__station", "emplacement__borne", "authentification", "authentification__type_auth")
            .order_by("-date_debut")
        )
        return _list_response(request, own_reservations, serializer_class=ReservationSerializer)

    form = ReservationCreateSerializer(data=request.data)
    form.is_valid(raise_exception=True)
    data = form.validated_data
    start = data["date_debut"]
    end = data["date_fin"]

    if not (_round_to_thirty_minutes(start) and _round_to_thirty_minutes(end)):
        return Response(
            {"detail": "Les horaires doivent etre alignes sur des tranches de 30 minutes."},
            status=status.HTTP_400_BAD_REQUEST,
        )
    if start < timezone.now():
        return Response(
            {"detail": "La reservation doit commencer dans le futur."},
            status=status.HTTP_400_BAD_REQUEST,
        )
    if not _duration_is_valid(start, end):
        return Response(
            {"detail": reservation_duration_error_message()},
            status=status.HTTP_400_BAD_REQUEST,
        )

    with transaction.atomic():
        # Lock the station row for the duration of the booking.
        station = Station.objects.select_for_update().filter(id_station=data["station"]).first()
        if not station:
            return Response({"detail": "Station introuvable."}, status=status.HTTP_404_NOT_FOUND)

        requested_emplacement_id = data.get("emplacement")
        if not requested_emplacement_id:
            candidates = _available_emplacements(station, start, end)
            if not candidates:
                return Response(
                    {"detail": "Aucun emplacement disponible dans cette station sur ce creneau."},
                    status=status.HTTP_409_CONFLICT,
                )
            emplacement = candidates[0]
        else:
            emplacement = Emplacement.objects.select_related("station").filter(
                id_emplacement=requested_emplacement_id,
                station=station,
            ).first()
            if not emplacement:
                return Response(
                    {"detail": "L'emplacement ne correspond pas a la station choisie."},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            if _reservation_overlaps(emplacement, start, end):
                return Response(
                    {"detail": "Cet emplacement n'est pas disponible sur cette plage horaire."},
                    status=status.HTTP_409_CONFLICT,
                )

        auth_mode = data["auth_mode"].lower()
        auth_method = None
        clear_value = None
        reservation_code = _generate_reservation_code()

        if auth_mode == "rfid":
            auth_method = _get_saved_rfid_auth(request.user)
            if auth_method is None:
                return Response(
                    {"auth_value": "Ajoute d'abord un badge RFID dans ton profil."},
                    status=status.HTTP_400_BAD_REQUEST,
                )
        else:
            if auth_mode == "code":
                # SMS codes need a phone number, verified when the setting asks for it.
                if not _user_phone_number(request.user):
                    return Response(
                        {"telephone": "Un numéro de téléphone est obligatoire pour recevoir un code SMS."},
                        status=status.HTTP_400_BAD_REQUEST,
                    )
                if getattr(settings, "SMS_REQUIRE_VERIFIED_PHONE", False) and not _user_phone_is_verified(request.user):
                    return Response(
                        {"telephone": "Le numéro de téléphone doit être vérifié pour choisir le mode code SMS."},
                        status=status.HTTP_400_BAD_REQUEST,
                    )
            auth_method, clear_value = _create_temporary_auth(
                request.user,
                auth_mode,
                end,
                reservation_code,
            )

        reservation = Reservation.objects.create(
            utilisateur=request.user,
            emplacement=emplacement,
            reference_code=reservation_code,
            authentification=auth_method,
            auth_choice=auth_mode,
            date_debut=start,
            date_fin=end,
            statut="confirmee",
        )

    # NOTE(review): the response is assembled once the atomic block has
    # exited — confirm this matches the intended transaction scope.
    response = {
        "message": "Reservation creee.",
        "reservation": ReservationSerializer(reservation).data,
    }
    if clear_value:
        access_payload = _build_access_payload(
            auth_mode,
            clear_value,
            auth_method,
            end,
        )
        response["access_payload"] = access_payload
        if auth_mode == "qr":
            _schedule_reservation_qr_email(request, reservation, auth_method, access_payload)
    response["reservation_code"] = reservation.reference_code
    return Response(response, status=status.HTTP_201_CREATED)
@_schema("Annuler une reservation", tags=["Reservations"])
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def cancel_reservation(request, reservation_id):
    """Cancel a reservation that belongs to the authenticated user.

    Answers 404 when the user owns no reservation with ``reservation_id``,
    400 when ``_cancel_reservation`` refuses the cancellation, and the
    serialized reservation otherwise.
    """
    owned = Reservation.objects.select_related("utilisateur", "emplacement", "emplacement__station", "emplacement__borne")
    try:
        reservation = owned.get(
            id_reservation=reservation_id,
            utilisateur=request.user,
        )
    except Reservation.DoesNotExist:
        return Response({"detail": "Reservation introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if _cancel_reservation(reservation):
        return Response(ReservationSerializer(reservation).data)

    return Response(
        {"detail": "Cette reservation ne peut plus etre annulee."},
        status=status.HTTP_400_BAD_REQUEST,
    )
@_schema("Afficher l'acces temporaire d'une reservation", tags=["Reservations"])
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def reservation_access_payload(request, reservation_id):
    """Return the temporary QR or code access of one of the user's reservations.

    Answers 404 when the reservation is not found for this user, and 400 when
    it uses RFID, is cancelled, finished or past its end date, or when
    ``_get_temporary_access_value`` yields no usable credential. Otherwise
    the reservation code and the access payload are returned.
    """
    lookup = Reservation.objects.select_related(
        "utilisateur",
        "emplacement",
        "emplacement__station",
        "emplacement__borne",
        "authentification",
        "authentification__type_auth",
    )
    try:
        reservation = lookup.get(
            id_reservation=reservation_id,
            utilisateur=request.user,
        )
    except Reservation.DoesNotExist:
        return Response({"detail": "Reservation introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if reservation.auth_choice not in {"qr", "code"}:
        return Response(
            {"detail": "Cette reservation utilise un badge RFID."},
            status=status.HTTP_400_BAD_REQUEST,
        )

    is_closed = reservation.statut in {"annulee", "terminee"} or reservation.date_fin < timezone.now()
    if is_closed:
        return Response(
            {"detail": "L'acces temporaire ne peut plus etre regenere pour cette reservation."},
            status=status.HTTP_400_BAD_REQUEST,
        )

    auth_method, access_value = _get_temporary_access_value(reservation)
    if auth_method is None or access_value is None:
        return Response(
            {"detail": "L'acces temporaire ne peut pas etre affiche. Regenerez-le depuis l'administration."},
            status=status.HTTP_400_BAD_REQUEST,
        )

    body = {
        "reservation_code": reservation.reference_code,
        "access_payload": _build_access_payload(
            reservation.auth_choice,
            access_value,
            auth_method,
            reservation.date_fin,
        ),
    }
    return Response(body)
@_schema("Envoyer le code SMS d'une reservation", tags=["Reservations"])
@api_view(["POST"])
@permission_classes([IsAuthenticated])
@throttle_classes([SmsRateThrottle])
def send_reservation_access_code(request, reservation_id):
    """Send the temporary code of one of the user's reservations by SMS.

    Answers 404 when the reservation is not found for this user, and 400 when
    it does not use the ``code`` mode, is cancelled, finished or past its end
    date, or has no usable credential. If ``_send_reservation_sms_code``
    returns an error response it is returned unchanged; otherwise the
    delivery details and the access payload are returned.
    """
    lookup = Reservation.objects.select_related(
        "utilisateur",
        "utilisateur__profil",
        "emplacement",
        "emplacement__station",
        "emplacement__borne",
        "authentification",
        "authentification__type_auth",
    )
    try:
        reservation = lookup.get(
            id_reservation=reservation_id,
            utilisateur=request.user,
        )
    except Reservation.DoesNotExist:
        return Response({"detail": "Reservation introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if reservation.auth_choice != "code":
        return Response(
            {"detail": "Cette reservation n'utilise pas de code SMS."},
            status=status.HTTP_400_BAD_REQUEST,
        )

    is_closed = reservation.statut in {"annulee", "terminee"} or reservation.date_fin < timezone.now()
    if is_closed:
        return Response(
            {"detail": "Le code SMS ne peut plus être envoyé pour cette réservation."},
            status=status.HTTP_400_BAD_REQUEST,
        )

    auth_method, access_value = _get_temporary_access_value(reservation)
    if auth_method is None or access_value is None:
        return Response(
            {"detail": "Le code SMS ne peut pas être envoyé. Régénérez l'accès depuis l'administration."},
            status=status.HTTP_400_BAD_REQUEST,
        )

    delivery, error_response = _send_reservation_sms_code(request, reservation, auth_method, access_value)
    if error_response is not None:
        return error_response

    sms_details = {
        "sent_at": delivery["sent_at"].isoformat(),
        "recipient_hint": delivery["recipient_hint"],
        "backend": delivery["backend"],
    }
    access_payload = _build_access_payload(
        reservation.auth_choice,
        access_value,
        auth_method,
        reservation.date_fin,
    )
    return Response(
        {
            "reservation_code": reservation.reference_code,
            "message": "Code SMS envoyé.",
            "sms": sms_details,
            "access_payload": access_payload,
        }
    )
@_schema("Annuler une reservation en administration", tags=["Administration"])
@api_view(["POST"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_cancel_reservation(request, reservation_id):
    """Cancel a reservation on behalf of the administration, whoever owns it.

    Answers 404 when no reservation matches ``reservation_id``, 400 when
    ``_cancel_reservation`` refuses the cancellation, and the serialized
    reservation otherwise.
    """
    lookup = Reservation.objects.select_related("utilisateur", "emplacement", "emplacement__station", "emplacement__borne")
    try:
        reservation = lookup.get(
            id_reservation=reservation_id,
        )
    except Reservation.DoesNotExist:
        return Response({"detail": "Reservation introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if _cancel_reservation(reservation):
        return Response(ReservationSerializer(reservation).data)

    return Response(
        {"detail": "Cette reservation ne peut plus etre annulee."},
        status=status.HTTP_400_BAD_REQUEST,
    )
@_schema("Regenerer l'acces temporaire d'une reservation en administration", tags=["Administration"])
@api_view(["POST"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_reservation_access_payload(request, reservation_id):
    """Regenerate the temporary credential of a reservation as an administrator.

    Answers 404 when the reservation does not exist and 400 when
    ``_temporary_access_can_be_rotated`` rejects it. Otherwise the credential
    is refreshed with ``_refresh_temporary_auth``, a security event is logged
    for the reservation's user (when there is one) and the serialized
    reservation is returned with a confirmation message.
    """
    lookup = Reservation.objects.select_related(
        "utilisateur",
        "emplacement",
        "emplacement__station",
        "emplacement__borne",
        "authentification",
        "authentification__type_auth",
    )
    try:
        reservation = lookup.get(id_reservation=reservation_id)
    except Reservation.DoesNotExist:
        return Response({"detail": "Reservation introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if not _temporary_access_can_be_rotated(reservation):
        return Response(
            {"detail": "L'acces temporaire ne peut pas etre regenere pour cette reservation."},
            status=status.HTTP_400_BAD_REQUEST,
        )

    _refresh_temporary_auth(reservation)

    owner = reservation.utilisateur
    if owner:
        event = f"admin:{request.user.pseudo}:temporary_access_regenerated:{reservation.reference_code}"
        _log_security_event(request, owner, event)

    return Response(
        {
            "reservation": ReservationSerializer(reservation).data,
            "message": "Acces temporaire regenere.",
        }
    )
@_schema("Revoquer l'acces temporaire d'une reservation en administration", tags=["Administration"])
@api_view(["POST", "DELETE"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_revoke_reservation_access(request, reservation_id):
    """Revoke the temporary credential attached to a QR/code reservation.

    The credential is marked inactive, its expiration is set to the
    revocation time and the revocation (time, administrator, source) is
    recorded in its ``metadata``.

    Returns:
        404 when the reservation does not exist or has no active temporary
        credential, 400 when the reservation uses RFID, otherwise the
        serialized reservation.
    """
    try:
        reservation = Reservation.objects.select_related(
            "utilisateur",
            "emplacement",
            "emplacement__station",
            "emplacement__borne",
            "authentification",
            "authentification__type_auth",
        ).get(id_reservation=reservation_id)
    except Reservation.DoesNotExist:
        return Response({"detail": "Reservation introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if reservation.auth_choice not in {"qr", "code"}:
        return Response(
            {"detail": "Cette reservation utilise un badge RFID."},
            status=status.HTTP_400_BAD_REQUEST,
        )
    if reservation.authentification is None or not reservation.authentification.is_active:
        return Response({"detail": "Aucun acces temporaire actif."}, status=status.HTTP_404_NOT_FOUND)

    # One timestamp for the whole revocation so that ``expiration`` and
    # ``metadata["revoked_at"]`` describe the same instant.
    revoked_at = timezone.now()
    auth_method = reservation.authentification
    auth_method.is_active = False
    auth_method.expiration = revoked_at
    auth_method.metadata = {
        **(auth_method.metadata or {}),
        "revoked_at": revoked_at.isoformat(),
        "revoked_by": str(request.user.id_user),
        "revocation_source": "admin",
    }
    auth_method.save(update_fields=["is_active", "expiration", "metadata"])

    if reservation.utilisateur:
        _log_security_event(
            request,
            reservation.utilisateur,
            f"admin:{request.user.pseudo}:temporary_access_revoked:{reservation.reference_code}",
        )
    return Response(ReservationSerializer(reservation).data)
@_schema("Lister les maintenances du technicien connecte", tags=["Technicien"])
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def technician_maintenances(request):
    """List maintenance interventions for the connected technician.

    A user with a technician profile only sees the interventions assigned to
    that profile. A staff user without a profile sees every intervention.
    Any other user gets a 404. Results are ordered by planned date.
    """
    technicien = _technician_for_user(request.user)
    has_profile = technicien is not None
    if not (has_profile or request.user.is_staff):
        return Response(
            {"detail": "Aucun profil technicien associe a ce compte."},
            status=status.HTTP_404_NOT_FOUND,
        )

    queryset = Maintenance.objects.select_related("station", "technicien")
    queryset = queryset.prefetch_related("station__emplacements")
    if has_profile:
        queryset = queryset.filter(technicien=technicien)

    return _list_response(
        request,
        queryset.order_by("date_planifiee"),
        serializer_class=TechnicianMaintenanceSerializer,
    )
@_schema("Lister les techniciens", tags=["Administration"], operation_id="admin_technicians_list", methods=["GET"])
@_schema("Creer un technicien", request=TechnicianSerializer, tags=["Administration"], operation_id="admin_technician_create", methods=["POST"])
@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_technicians(request):
    """List technicians or create a technician from the administration API.

    GET lists the technicians ordered by name. POST validates the payload
    with ``TechnicianSerializer`` and creates the technician, optionally
    linked to the user whose id is given in ``utilisateur``.

    Returns:
        404 when ``utilisateur`` is provided but matches no user (including a
        malformed identifier), otherwise 201 with the serialized technician.
    """
    if request.method == "GET":
        technicians = (
            Technicien.objects.select_related("utilisateur")
            .prefetch_related(_active_station_assignments_prefetch())
            .order_by("nom")
        )
        return _list_response(request, technicians, serializer_class=TechnicianSerializer)

    serializer = TechnicianSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)

    utilisateur = None
    utilisateur_id = request.data.get("utilisateur")
    if utilisateur_id:
        try:
            utilisateur = User.objects.filter(id_user=utilisateur_id).first()
        except (ValidationError, ValueError, TypeError):
            # A malformed identifier cannot match any user; report it like an
            # unknown one instead of failing with a server error.
            utilisateur = None
        if utilisateur is None:
            return Response({"utilisateur": "Utilisateur introuvable."}, status=status.HTTP_404_NOT_FOUND)

    technician = Technicien.objects.create(
        utilisateur=utilisateur,
        nom=serializer.validated_data["nom"],
        email=serializer.validated_data["email"],
        telephone=serializer.validated_data.get("telephone", ""),
    )
    return Response(TechnicianSerializer(technician).data, status=status.HTTP_201_CREATED)
@_schema("Lire un technicien", tags=["Administration"], operation_id="admin_technician_retrieve", methods=["GET"])
@_schema("Modifier un technicien", request=TechnicianSerializer, tags=["Administration"], operation_id="admin_technician_update", methods=["PATCH"])
@_schema("Supprimer un technicien", responses={204: None}, tags=["Administration"], operation_id="admin_technician_delete", methods=["DELETE"])
@api_view(["GET", "PATCH", "DELETE"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_technician_detail(request, technician_id):
    """Retrieve, update or delete one technician from the admin API.

    On PATCH, ``nom``, ``email`` and ``telephone`` are updated from the
    validated payload when present, and the linked user is replaced when a
    non-empty ``utilisateur`` id is supplied.

    Returns:
        404 when the technician, or the user named in ``utilisateur``, is not
        found (including a malformed user identifier), an empty 204 response
        for DELETE, otherwise the serialized technician.
    """
    try:
        technician = (
            Technicien.objects.select_related("utilisateur")
            .prefetch_related(_active_station_assignments_prefetch())
            .get(id_technicien=technician_id)
        )
    except Technicien.DoesNotExist:
        return Response({"detail": "Technicien introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if request.method == "GET":
        return Response(TechnicianSerializer(technician).data)

    if request.method == "DELETE":
        technician.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

    serializer = TechnicianSerializer(instance=technician, data=request.data, partial=True)
    serializer.is_valid(raise_exception=True)

    utilisateur_id = request.data.get("utilisateur")
    utilisateur = None
    if utilisateur_id:
        try:
            utilisateur = User.objects.filter(id_user=utilisateur_id).first()
        except (ValidationError, ValueError, TypeError):
            # A malformed identifier cannot match any user; report it like an
            # unknown one instead of failing with a server error.
            utilisateur = None
        if utilisateur is None:
            return Response({"utilisateur": "Utilisateur introuvable."}, status=status.HTTP_404_NOT_FOUND)

    technician.nom = serializer.validated_data.get("nom", technician.nom)
    technician.email = serializer.validated_data.get("email", technician.email)
    technician.telephone = serializer.validated_data.get("telephone", technician.telephone)
    if utilisateur is not None:
        technician.utilisateur = utilisateur
    technician.save()
    return Response(TechnicianSerializer(technician).data)
@_schema(
    "Lister les affectations technicien-station",
    tags=["Administration"],
    operation_id="admin_technician_station_assignments_list",
    methods=["GET"],
)
@_schema(
    "Affecter une station a un technicien",
    request=TechnicianStationAssignmentSerializer,
    tags=["Administration"],
    operation_id="admin_technician_station_assignment_create",
    methods=["POST"],
)
@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_technician_station_assignments(request):
    """List or create technician-to-station assignments.

    GET lists the assignments ordered by technician then station name,
    optionally filtered by the ``technicien`` and ``station`` query
    parameters. POST validates and saves a new assignment with
    ``TechnicianStationAssignmentSerializer``.

    Returns:
        400 when a filter identifier is malformed, the list response for GET,
        or 201 with the serialized assignment for POST.
    """
    if request.method == "GET":
        assignments = TechnicienStation.objects.select_related("technicien", "station").order_by(
            "technicien__nom",
            "station__nom",
        )
        technician_id = request.query_params.get("technicien")
        station_id = request.query_params.get("station")
        try:
            if technician_id:
                assignments = assignments.filter(technicien_id=technician_id)
            if station_id:
                assignments = assignments.filter(station_id=station_id)
        except (ValidationError, ValueError):
            # Identifiers that cannot be converted to the key type are
            # rejected while the filter is built; answer with a client error
            # instead of a server error.
            return Response(
                {"detail": "Parametre de filtre invalide."},
                status=status.HTTP_400_BAD_REQUEST,
            )
        return _list_response(request, assignments, serializer_class=TechnicianStationAssignmentSerializer)

    serializer = TechnicianStationAssignmentSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    assignment = serializer.save()
    # Reload with the related rows selected before serializing.
    assignment = TechnicienStation.objects.select_related("technicien", "station").get(
        id_assignment=assignment.id_assignment
    )
    return Response(TechnicianStationAssignmentSerializer(assignment).data, status=status.HTTP_201_CREATED)
@_schema(
    "Lire une affectation technicien-station",
    tags=["Administration"],
    operation_id="admin_technician_station_assignment_retrieve",
    methods=["GET"],
)
@_schema(
    "Modifier une affectation technicien-station",
    request=TechnicianStationAssignmentSerializer,
    tags=["Administration"],
    operation_id="admin_technician_station_assignment_update",
    methods=["PATCH"],
)
@_schema(
    "Supprimer une affectation technicien-station",
    responses={204: None},
    tags=["Administration"],
    operation_id="admin_technician_station_assignment_delete",
    methods=["DELETE"],
)
@api_view(["GET", "PATCH", "DELETE"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_technician_station_assignment_detail(request, assignment_id):
    """Read, partially update or delete one technician-station assignment.

    Answers 404 when no assignment matches ``assignment_id``, an empty 204
    response on DELETE, and the serialized assignment for GET and PATCH.
    """
    lookup = TechnicienStation.objects.select_related("technicien", "station")
    try:
        assignment = lookup.get(id_assignment=assignment_id)
    except TechnicienStation.DoesNotExist:
        return Response({"detail": "Affectation introuvable."}, status=status.HTTP_404_NOT_FOUND)

    method = request.method
    if method == "GET":
        return Response(TechnicianStationAssignmentSerializer(assignment).data)
    if method == "DELETE":
        assignment.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

    # PATCH: partial update through the serializer.
    form = TechnicianStationAssignmentSerializer(
        instance=assignment,
        data=request.data,
        partial=True,
    )
    form.is_valid(raise_exception=True)
    updated = form.save()
    return Response(TechnicianStationAssignmentSerializer(updated).data)
@_schema("Lister les maintenances", tags=["Administration"], operation_id="admin_maintenances_list", methods=["GET"])
@_schema(
    "Creer une maintenance",
    request=MaintenanceAdminSerializer,
    tags=["Administration"],
    operation_id="admin_maintenance_create",
    methods=["POST"],
)
@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_maintenances(request):
    """List maintenance records (GET) or create one (POST) from the admin API.

    The listing is ordered by ``date_planifiee`` descending and handed to
    ``_list_response``. A POST validates the body with
    ``MaintenanceAdminSerializer`` and answers 201 with the serialized record.
    """
    if request.method != "GET":
        form = MaintenanceAdminSerializer(data=request.data)
        form.is_valid(raise_exception=True)
        record = form.save()
        return Response(MaintenanceAdminSerializer(record).data, status=status.HTTP_201_CREATED)

    records = Maintenance.objects.select_related("station", "technicien").order_by("-date_planifiee")
    return _list_response(request, records, serializer_class=MaintenanceAdminSerializer)
@_schema("Lire une maintenance", tags=["Administration"], operation_id="admin_maintenance_retrieve", methods=["GET"])
@_schema(
    "Modifier une maintenance",
    request=MaintenanceAdminSerializer,
    tags=["Administration"],
    operation_id="admin_maintenance_update",
    methods=["PATCH"],
)
@_schema(
    "Supprimer une maintenance",
    responses={204: None},
    tags=["Administration"],
    operation_id="admin_maintenance_delete",
    methods=["DELETE"],
)
@api_view(["GET", "PATCH", "DELETE"])
@permission_classes([IsAuthenticated, IsStaffUser])
def admin_maintenance_detail(request, maintenance_id):
    """Read (GET), partially update (PATCH) or remove (DELETE) one maintenance.

    Answers 404 when no ``Maintenance`` row has ``id_maintenance`` equal to
    ``maintenance_id``, 204 after a delete, and the serialized record for GET
    and PATCH.
    """
    lookup = Maintenance.objects.select_related("station", "technicien")
    try:
        record = lookup.get(id_maintenance=maintenance_id)
    except Maintenance.DoesNotExist:
        return Response({"detail": "Maintenance introuvable."}, status=status.HTTP_404_NOT_FOUND)

    verb = request.method
    if verb == "DELETE":
        record.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

    if verb != "GET":
        # PATCH: save through the serializer, then reload the instance so the
        # response is built from the stored row.
        editor = MaintenanceAdminSerializer(instance=record, data=request.data, partial=True)
        editor.is_valid(raise_exception=True)
        editor.save()
        record.refresh_from_db()

    return Response(MaintenanceAdminSerializer(record).data)
@_schema("Creer une session mobile de borne", request=OpenApiTypes.OBJECT, tags=["Terminal"])
@api_view(["POST"])
@permission_classes([AllowAny])
@throttle_classes([TerminalRateThrottle])
def terminal_create_mobile_session(request):
    """Create a short-lived pairing session opened by a phone from the kiosk QR.

    The body may carry ``station_id`` and/or ``borne_id``. When a borne is
    given it must exist (404 otherwise) and, if a station is also given, belong
    to that station (409 otherwise). When only a station is given it must exist
    (404 otherwise). Both may be omitted, in which case the session is not
    bound to any station or borne.

    Returns 201 with the serialized session, its URL, and a QR rendering of
    that URL. Returns whatever ``_terminal_secret_response`` produced when it
    is not ``None``.
    """
    secret_error = _terminal_secret_response(request)
    if secret_error is not None:
        return secret_error

    # JSON bodies can carry non-string identifiers (e.g. integers). Coerce to
    # str before stripping so such a payload does not raise AttributeError.
    station_id = str(request.data.get("station_id") or "").strip()
    borne_id = str(request.data.get("borne_id") or "").strip()
    station = None
    borne = None

    if borne_id:
        borne = Borne.objects.select_related("station").filter(id_borne=borne_id).first()
        if borne is None:
            return Response({"detail": "Borne introuvable."}, status=status.HTTP_404_NOT_FOUND)
        if station_id and str(borne.station_id) != station_id:
            return Response(
                {"detail": "Cette borne est associee a une autre station."},
                status=status.HTTP_409_CONFLICT,
            )
        station = borne.station

    if station_id:
        # Reuse the station already resolved through the borne, if any.
        station = station or Station.objects.filter(id_station=station_id).first()
        if station is None:
            return Response({"detail": "Station introuvable."}, status=status.HTTP_404_NOT_FOUND)

    token = _generate_mobile_session_token()
    now = timezone.now()
    expires_at = now + timedelta(seconds=MOBILE_SESSION_TTL_SECONDS)
    payload = {
        "token": token,
        "status": "pending",
        "station_id": str(station.id_station) if station else None,
        "station_nom": station.nom if station else "",
        "borne_id": str(borne.id_borne) if borne else None,
        "borne_nom": borne.nom if borne else "",
        "borne_numero": borne.numero if borne else None,
        "created_at": now.isoformat(),
        "expires_at": expires_at.isoformat(),
    }
    _store_mobile_session(payload)

    session_url = _mobile_session_url(token)
    return Response(
        {
            "mobile_session": _serialize_mobile_session(payload),
            "url": session_url,
            "qr_payload": session_url,
            "qr_svg": _build_qr_svg(session_url),
        },
        status=status.HTTP_201_CREATED,
    )
@_schema("Lire une session mobile de borne", tags=["Mobile"])
@api_view(["GET"])
@permission_classes([IsAuthenticated])
@throttle_classes([MobileSessionRateThrottle])
def mobile_session_detail(request, token):
    """Return a kiosk pairing session and the user's reservations usable from it.

    Each reservation yielded by ``_startable_user_reservations`` is serialized
    and annotated with ``compatible_station``, ``compatible_borne`` and
    ``compatible_terminal`` flags (computed against the station/borne stored in
    the session payload) plus its own ``station_id`` and ``borne_id``.
    Answers 404 when the token has no payload.
    """
    pairing = _get_mobile_session_payload(token)
    if pairing is None:
        return Response({"detail": "Session mobile invalide ou expiree."}, status=status.HTTP_404_NOT_FOUND)

    kiosk_station = pairing.get("station_id")
    kiosk_borne = pairing.get("borne_id")

    entries = []
    for candidate in _startable_user_reservations(request.user, timezone.now()):
        entry = _serialize_reservation(candidate)
        own_station = str(candidate.emplacement.station_id)
        own_borne = str(candidate.emplacement.borne_id)
        # An unset kiosk station/borne is compatible with every reservation.
        station_ok = not kiosk_station or own_station == kiosk_station
        borne_ok = not kiosk_borne or own_borne == kiosk_borne
        entry["compatible_station"] = station_ok
        entry["compatible_borne"] = borne_ok
        entry["compatible_terminal"] = station_ok and borne_ok
        entry["station_id"] = own_station
        entry["borne_id"] = own_borne
        entries.append(entry)

    return Response(
        {
            "mobile_session": _serialize_mobile_session(pairing),
            "reservations": entries,
        }
    )
@_schema("Autoriser une reservation depuis le mobile", request=OpenApiTypes.OBJECT, tags=["Mobile"])
@api_view(["POST"])
@permission_classes([IsAuthenticated])
@throttle_classes([MobileSessionRateThrottle])
def mobile_session_authorize(request, token):
    """Authorize one user reservation for the kiosk paired with this token.

    Checks, in order: the token has a payload (404), ``reservation_id`` is
    present (400), the reservation belongs to the requesting user (404), it is
    among the reservations ``_startable_user_reservations`` returns right now
    (400), and it matches the station and borne stored in the payload when they
    are set (409). On success the payload is marked ``authorized``, stored
    again, a security event is logged, and the session and reservation are
    returned.
    """
    pairing = _get_mobile_session_payload(token)
    if pairing is None:
        return Response({"detail": "Session mobile invalide ou expiree."}, status=status.HTTP_404_NOT_FOUND)

    wanted_id = request.data.get("reservation_id")
    if not wanted_id:
        return Response({"detail": "reservation_id est obligatoire."}, status=status.HTTP_400_BAD_REQUEST)

    owned = _terminal_reservation_queryset().filter(id_reservation=wanted_id, utilisateur=request.user)
    reservation = owned.first()
    if reservation is None:
        return Response({"detail": "Reservation introuvable."}, status=status.HTTP_404_NOT_FOUND)

    startable_now = {
        candidate.id_reservation
        for candidate in _startable_user_reservations(request.user, timezone.now())
    }
    if reservation.id_reservation not in startable_now:
        return Response(
            {"detail": "Cette reservation ne peut pas encore etre demarree."},
            status=status.HTTP_400_BAD_REQUEST,
        )

    kiosk_station = pairing.get("station_id")
    if kiosk_station and str(reservation.emplacement.station_id) != kiosk_station:
        slot = reservation.emplacement
        return Response(
            {
                "detail": "Cette reservation est associee a une autre station.",
                "station": slot.station.nom,
                "station_id": str(slot.station_id),
                "emplacement": slot.numero,
            },
            status=status.HTTP_409_CONFLICT,
        )

    kiosk_borne = pairing.get("borne_id")
    if kiosk_borne and str(reservation.emplacement.borne_id) != kiosk_borne:
        slot = reservation.emplacement
        return Response(
            {
                "detail": "Cette reservation est associee a une autre borne.",
                "station": slot.station.nom,
                "station_id": str(slot.station_id),
                "borne": slot.borne.nom,
                "borne_id": str(slot.borne_id),
                "emplacement": slot.numero,
            },
            status=status.HTTP_409_CONFLICT,
        )

    now = timezone.now()
    pairing.update(
        status="authorized",
        authorized_at=now.isoformat(),
        authorized_by=str(request.user.id_user),
        authorized_user=request.user.pseudo,
        reservation_id=str(reservation.id_reservation),
    )
    _store_mobile_session(pairing)
    _log_security_event(
        request,
        request.user,
        f"user:{request.user.pseudo}:mobile_session_authorized:{reservation.reference_code}",
    )

    return Response(
        {
            "mobile_session": _serialize_mobile_session(pairing),
            "reservation": _serialize_reservation(reservation),
        }
    )
@_schema("Consulter une session mobile depuis le terminal", tags=["Terminal"])
@api_view(["GET"])
@permission_classes([AllowAny])
@throttle_classes([TerminalRateThrottle])
def terminal_mobile_session_status(request, token):
    """Return the current authorization state for a kiosk/mobile pairing token.

    The token comes from the URL. The terminal secret check runs first; when it
    yields a response, that response is returned as-is.
    """
    denied = _terminal_secret_response(request)
    return denied if denied is not None else _terminal_mobile_session_status_response(token)
@_schema("Consulter une session mobile depuis le terminal", request=OpenApiTypes.OBJECT, tags=["Terminal"])
@api_view(["POST"])
@permission_classes([AllowAny])
@throttle_classes([TerminalRateThrottle])
def terminal_mobile_session_status_from_body(request):
    """Return a mobile session state using a fixed terminal endpoint.

    Same as the URL-token variant, but the token is read from the ``token``
    field of the request body. Answers 400 when the token is missing or blank.
    """
    secret_error = _terminal_secret_response(request)
    if secret_error is not None:
        return secret_error

    # Coerce before stripping: a JSON body may carry a non-string token, which
    # would otherwise raise AttributeError instead of producing a clean error.
    token = str(request.data.get("token") or "").strip()
    if not token:
        return Response({"detail": "token est obligatoire."}, status=status.HTTP_400_BAD_REQUEST)
    return _terminal_mobile_session_status_response(token)
def _terminal_mobile_session_status_response(token):
    """Build the status response for one mobile session token.

    Answers 404 when the token has no payload. Otherwise returns the serialized
    session, plus the serialized reservation when the payload references one
    that can still be found.
    """
    pairing = _get_mobile_session_payload(token)
    if pairing is None:
        return Response({"detail": "Session mobile invalide ou expiree."}, status=status.HTTP_404_NOT_FOUND)

    body = {"mobile_session": _serialize_mobile_session(pairing)}

    linked_id = pairing.get("reservation_id")
    linked = (
        _terminal_reservation_queryset().filter(id_reservation=linked_id).first()
        if linked_id
        else None
    )
    if linked:
        body["reservation"] = _serialize_reservation(linked)

    return Response(body)
@_schema(
    "Consommer une autorisation mobile depuis le terminal",
    tags=["Terminal"],
    operation_id="terminal_mobile_session_consume_by_token",
)
@api_view(["POST"])
@permission_classes([AllowAny])
@throttle_classes([TerminalRateThrottle])
def terminal_consume_mobile_session(request, token):
    """Open the charging session authorized from the user's phone.

    The token comes from the URL. The terminal secret check runs first; when it
    yields a response, that response is returned as-is.
    """
    denied = _terminal_secret_response(request)
    return denied if denied is not None else _terminal_consume_mobile_session_response(request, token)
@_schema(
    "Consommer une autorisation mobile depuis le terminal",
    request=OpenApiTypes.OBJECT,
    tags=["Terminal"],
    operation_id="terminal_mobile_session_consume_from_body",
)
@api_view(["POST"])
@permission_classes([AllowAny])
@throttle_classes([TerminalRateThrottle])
def terminal_consume_mobile_session_from_body(request):
    """Consume a mobile authorization using a fixed terminal endpoint.

    Same as the URL-token variant, but the token is read from the ``token``
    field of the request body. Answers 400 when the token is missing or blank.
    """
    secret_error = _terminal_secret_response(request)
    if secret_error is not None:
        return secret_error

    # Coerce before stripping: a JSON body may carry a non-string token, which
    # would otherwise raise AttributeError instead of producing a clean error.
    token = str(request.data.get("token") or "").strip()
    if not token:
        return Response({"detail": "token est obligatoire."}, status=status.HTTP_400_BAD_REQUEST)
    return _terminal_consume_mobile_session_response(request, token)
def _terminal_consume_mobile_session_response(request, token):
    """Open the charging session authorized for one mobile session token.

    Checks, in order: the token has a payload (404); its status is
    ``authorized`` or ``consumed`` (409 otherwise); the referenced reservation
    exists (404); it is not cancelled and "now" is inside the reservation
    window widened by the start/end grace deltas, unless a session is already
    running (400); station and borne match (409); and
    ``_session_start_conflict_response`` reports no conflict.

    The charging session is then fetched or created, marked as
    access-validated, the reservation is switched to ``active``, and the
    payload is stored as ``consumed`` the first time through. A payload that is
    already ``consumed`` is accepted again and is not rewritten.
    """
    payload = _get_mobile_session_payload(token)
    if payload is None:
        return Response({"detail": "Session mobile invalide ou expiree."}, status=status.HTTP_404_NOT_FOUND)
    if payload.get("status") not in {"authorized", "consumed"}:
        return Response({"detail": "Session mobile en attente d'autorisation."}, status=status.HTTP_409_CONFLICT)

    reservation = _terminal_reservation_queryset().filter(id_reservation=payload.get("reservation_id")).first()
    if reservation is None:
        return Response({"detail": "Reservation introuvable."}, status=status.HTTP_404_NOT_FOUND)

    now = timezone.now()
    session = _get_reservation_session(reservation)
    session_is_active = session is not None and session.end_time is None
    start_grace = _start_grace_delta()
    end_grace = _end_grace_delta()

    if reservation.statut == "annulee":
        return Response({"detail": "La reservation est annulee."}, status=status.HTTP_400_BAD_REQUEST)
    # The window checks are skipped while a session is still open.
    if reservation.date_debut - start_grace > now and not session_is_active:
        return Response({"detail": "La reservation n'a pas encore commence."}, status=status.HTTP_400_BAD_REQUEST)
    if reservation.date_fin + end_grace < now and not session_is_active:
        return Response({"detail": "La fenetre de reservation est expiree."}, status=status.HTTP_400_BAD_REQUEST)

    # The station/borne stored in the payload wins; the request body is only a
    # fallback. Body values are coerced to str before stripping because JSON
    # may deliver non-string identifiers.
    station_id = payload.get("station_id") or str(request.data.get("station_id") or "").strip()
    if station_id and str(reservation.emplacement.station_id) != station_id:
        return Response(
            {
                "detail": "Cette reservation est associee a une autre station.",
                "station": reservation.emplacement.station.nom,
                "station_id": str(reservation.emplacement.station_id),
                "emplacement": reservation.emplacement.numero,
            },
            status=status.HTTP_409_CONFLICT,
        )

    borne_id = payload.get("borne_id") or str(request.data.get("borne_id") or "").strip()
    if borne_id and str(reservation.emplacement.borne_id) != borne_id:
        return Response(
            {
                "detail": "Cette reservation est associee a une autre borne.",
                "station": reservation.emplacement.station.nom,
                "station_id": str(reservation.emplacement.station_id),
                "borne": reservation.emplacement.borne.nom,
                "borne_id": str(reservation.emplacement.borne_id),
                "emplacement": reservation.emplacement.numero,
            },
            status=status.HTTP_409_CONFLICT,
        )

    conflict_response = _session_start_conflict_response(reservation, now)
    if conflict_response is not None:
        return conflict_response

    session, created = ChargingSession.objects.get_or_create(
        reservation=reservation,
        defaults={
            "emplacement": reservation.emplacement,
            "start_time": now,
        },
    )
    try:
        mark_access_validated(session, now=now)
    except ChargeSessionStateError:
        return Response(
            {"detail": CHARGE_SESSION_STATE_ERROR_DETAIL},
            status=status.HTTP_409_CONFLICT,
        )

    if reservation.statut != "active":
        reservation.statut = "active"
        reservation.save(update_fields=["statut"])

    if payload.get("status") != "consumed":
        payload.update(
            {
                "status": "consumed",
                "consumed_at": now.isoformat(),
                "session_id": str(session.id_session),
            }
        )
        _store_mobile_session(payload)

    return Response(
        {
            "message": "Autorisation mobile consommee.",
            "session_created": created,
            "mobile_session": _serialize_mobile_session(payload),
            "session": _serialize_session(session),
        }
    )
@_schema("Synchroniser le cache offline d'une borne", tags=["Terminal"])
@api_view(["GET"])
@permission_classes([AllowAny])
@throttle_classes([TerminalRateThrottle])
def terminal_offline_cache(request):
    """Return near-term reservations a kiosk may validate locally if backend is down.

    Selects reservations whose status is ``confirmee`` or ``active``, that end
    no earlier than "now" minus the end grace delta, start no later than "now"
    plus ``OFFLINE_CACHE_HORIZON_HOURS``, and have an active authentication
    method. The optional ``station_id`` and ``borne_id`` query parameters
    narrow the result and are echoed back in the response.
    """
    denied = _terminal_secret_response(request)
    if denied is not None:
        return denied

    params = request.query_params
    station_id = (params.get("station_id") or "").strip()
    borne_id = (params.get("borne_id") or "").strip()

    now = timezone.now()
    horizon = now + timedelta(hours=OFFLINE_CACHE_HORIZON_HOURS)
    end_grace = _end_grace_delta()

    cacheable = _terminal_reservation_queryset().filter(
        statut__in=["confirmee", "active"],
        date_fin__gte=now - end_grace,
        date_debut__lte=horizon,
        authentification__isnull=False,
        authentification__is_active=True,
    )
    cacheable = cacheable.order_by("date_debut", "id_reservation")

    # Apply each optional scope filter only when the caller supplied it.
    optional_scopes = (
        ("emplacement__station_id", station_id),
        ("emplacement__borne_id", borne_id),
    )
    for lookup, wanted in optional_scopes:
        if wanted:
            cacheable = cacheable.filter(**{lookup: wanted})

    results = []
    for reservation in cacheable:
        results.append(_serialize_offline_cached_reservation(reservation))

    return Response(
        {
            "generated_at": now.isoformat(),
            "expires_at": horizon.isoformat(),
            "station_id": station_id,
            "borne_id": borne_id,
            "start_grace_minutes": RESERVATION_START_GRACE_MINUTES,
            "end_grace_minutes": RESERVATION_END_GRACE_MINUTES,
            "results": results,
        }
    )
@_schema("Synchroniser les evenements offline d'une borne", request=OpenApiTypes.OBJECT, tags=["Terminal"])
@api_view(["POST"])
@permission_classes([AllowAny])
@throttle_classes([TerminalRateThrottle])
def terminal_offline_events(request):
    """Reconcile locally accepted kiosk events after a backend outage.

    The body must contain an ``events`` list (400 otherwise). Each event is
    handled independently: entries that are not dicts, lack an ``id``, or have
    an ``event_type`` other than ``offline_session_started`` are rejected; the
    others are passed to ``_reconcile_offline_session_started``.

    Returns 200 when nothing was rejected and 207 otherwise, with the
    ``accepted`` and ``rejected`` lists and the ids of the accepted events.
    """
    secret_error = _terminal_secret_response(request)
    if secret_error is not None:
        return secret_error

    events = request.data.get("events")
    # Coerce before stripping: JSON may deliver non-string identifiers, which
    # would otherwise raise AttributeError.
    station_id = str(request.data.get("station_id") or "").strip()
    borne_id = str(request.data.get("borne_id") or "").strip()
    if not isinstance(events, list):
        return Response({"detail": "events doit etre une liste."}, status=status.HTTP_400_BAD_REQUEST)

    accepted = []
    rejected = []
    for event in events:
        if not isinstance(event, dict):
            rejected.append({"event_id": None, "detail": "Evenement invalide."})
            continue

        event_id = event.get("id")
        event_type = event.get("event_type")
        if not event_id:
            rejected.append({"event_id": None, "detail": "id evenement manquant."})
            continue
        if event_type != "offline_session_started":
            rejected.append({"event_id": event_id, "detail": "Type evenement non supporte."})
            continue

        result, error = _reconcile_offline_session_started(request, event, station_id, borne_id=borne_id)
        if error:
            rejected.append({"event_id": event_id, "detail": error})
        else:
            accepted.append(result)

    return Response(
        {
            "accepted": accepted,
            "rejected": rejected,
            "synced_event_ids": [item["event_id"] for item in accepted if item.get("event_id")],
        },
        # Partial failure is reported as 207 so the kiosk can inspect both lists.
        status=status.HTTP_207_MULTI_STATUS if rejected else status.HTTP_200_OK,
    )
@_schema("Authentifier un acces depuis le terminal", request=OpenApiTypes.OBJECT, tags=["Terminal"])
@api_view(["POST"])
@permission_classes([AllowAny])
@throttle_classes([TerminalRateThrottle])
def terminal_authenticate(request):
    """Validate terminal access credentials and open a charging session.

    Two entry paths exist:

    * direct RFID: ``auth_mode`` is ``rfid``, ``auth_value`` is set and no
      reservation identifier is given; the reservation and auth method are
      resolved by ``_resolve_direct_rfid_reservation`` and the hash comparison
      below is skipped;
    * explicit reservation: ``reservation_code`` (or ``reservation_id``),
      ``auth_mode`` and ``auth_value`` are all required (400 otherwise).

    When ``_parse_qr_payload`` recognises ``auth_value``, the reservation code
    and token are taken from it and the mode defaults to ``qr``.

    The reservation must not be cancelled, "now" must fall inside its window
    widened by the grace deltas unless a session is already running, the mode
    must equal ``reservation.auth_choice``, the auth method must exist, be
    active and unexpired, and station/borne must match when supplied. SMS codes
    (mode ``code``) are subject to lockout, answered with 423.

    On success the charging session is fetched or created and marked as
    access-validated, the reservation becomes ``active``, and for ``qr`` /
    ``code`` modes the auth method's expiration is moved to session start plus
    ``DEFAULT_SESSION_MAX_MINUTES``.
    """
    secret_error = _terminal_secret_response(request)
    if secret_error is not None:
        return secret_error

    # Body fields are coerced to str before stripping: JSON may deliver
    # non-string values (e.g. a numeric SMS code or integer identifier), which
    # would otherwise raise AttributeError.
    reservation_id = request.data.get("reservation_id")
    reservation_code = str(request.data.get("reservation_code") or "").strip().upper()
    auth_mode = str(request.data.get("auth_mode") or "").strip().lower()
    auth_value = str(request.data.get("auth_value") or "").strip()
    station_id = str(request.data.get("station_id") or "").strip()
    borne_id = str(request.data.get("borne_id") or "").strip()

    parsed_qr = _parse_qr_payload(auth_value)
    if parsed_qr:
        reservation_code = reservation_code or parsed_qr["reservation_code"]
        auth_mode = auth_mode or "qr"
        auth_value = parsed_qr["token"]

    now = timezone.now()
    direct_rfid = auth_mode == "rfid" and auth_value and not (reservation_id or reservation_code)

    if direct_rfid:
        auth_method, reservation, error_response = _resolve_direct_rfid_reservation(auth_value, now)
        if error_response is not None:
            return error_response
    else:
        if not (reservation_id or reservation_code) or not auth_mode or not auth_value:
            return Response(
                {"detail": "reservation_code (ou reservation_id), auth_mode et auth_value sont obligatoires."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        reservation_qs = _terminal_reservation_queryset()
        if reservation_code:
            reservation = reservation_qs.filter(reference_code=reservation_code).first()
        else:
            reservation = reservation_qs.filter(id_reservation=reservation_id).first()
        if reservation is None:
            return Response({"detail": "Reservation introuvable."}, status=status.HTTP_404_NOT_FOUND)
        auth_method = reservation.authentification

    if reservation.statut == "annulee":
        return Response({"detail": "La reservation est annulee."}, status=status.HTTP_400_BAD_REQUEST)

    session = _get_reservation_session(reservation)
    session_is_active = session is not None and session.end_time is None
    start_grace = _start_grace_delta()
    end_grace = _end_grace_delta()

    # The window checks are skipped while a session is still open.
    if reservation.date_debut - start_grace > now and not session_is_active:
        return Response({"detail": "La reservation n'a pas encore commence."}, status=status.HTTP_400_BAD_REQUEST)
    if reservation.date_fin + end_grace < now and not session_is_active:
        return Response({"detail": "La fenetre de reservation est expiree."}, status=status.HTTP_400_BAD_REQUEST)

    expected_mode = reservation.auth_choice
    if expected_mode != auth_mode:
        return Response({"detail": "Mauvais moyen d'authentification."}, status=status.HTTP_400_BAD_REQUEST)
    if auth_method is None:
        return Response(
            {"detail": "Aucun moyen d'authentification associe a cette reservation."},
            status=status.HTTP_400_BAD_REQUEST,
        )
    if auth_mode == "code" and _sms_access_is_locked(auth_method):
        return Response(
            {"detail": "Code SMS temporaire verrouille apres trop d'essais."},
            status=status.HTTP_423_LOCKED,
        )
    if not auth_method.is_active:
        return Response({"detail": "Le moyen d'authentification est revoque."}, status=status.HTTP_400_BAD_REQUEST)
    if auth_method.expiration and auth_method.expiration + end_grace < now and not session_is_active:
        return Response({"detail": "Le moyen d'authentification est expire."}, status=status.HTTP_400_BAD_REQUEST)

    if station_id and str(reservation.emplacement.station_id) != station_id:
        return Response(
            {
                "detail": "Cette reservation est associee a une autre station.",
                "station": reservation.emplacement.station.nom,
                "station_id": str(reservation.emplacement.station_id),
                "emplacement": reservation.emplacement.numero,
            },
            status=status.HTTP_409_CONFLICT,
        )
    if borne_id and str(reservation.emplacement.borne_id) != borne_id:
        return Response(
            {
                "detail": "Cette reservation est associee a une autre borne.",
                "station": reservation.emplacement.station.nom,
                "station_id": str(reservation.emplacement.station_id),
                "borne": reservation.emplacement.borne.nom,
                "borne_id": str(reservation.emplacement.borne_id),
                "emplacement": reservation.emplacement.numero,
            },
            status=status.HTTP_409_CONFLICT,
        )

    conflict_response = _session_start_conflict_response(reservation, now)
    if conflict_response is not None:
        return conflict_response

    provided_hash = _hash_secret(auth_value)
    # Constant-time comparison so response timing does not reveal how much of
    # the stored hash matched. Both sides go through str() and are encoded so
    # compare_digest always receives bytes, even if a value is None or
    # non-ASCII; equal inputs still compare equal, as with the plain `!=`.
    if not direct_rfid and not hmac.compare_digest(
        str(provided_hash).encode("utf-8"),
        str(auth_method.hash_valeur).encode("utf-8"),
    ):
        if auth_mode == "code":
            # A truthy result means this failure triggered the lockout.
            if _record_sms_access_failure(auth_method):
                return Response(
                    {"detail": "Code SMS temporaire verrouille apres trop d'essais."},
                    status=status.HTTP_423_LOCKED,
                )
        return Response({"detail": "Authentification refusee."}, status=status.HTTP_401_UNAUTHORIZED)

    if auth_mode == "code":
        _reset_sms_access_failures(auth_method)

    session, created = ChargingSession.objects.get_or_create(
        reservation=reservation,
        defaults={
            "emplacement": reservation.emplacement,
            "start_time": now,
        },
    )
    try:
        mark_access_validated(session, now=now)
    except ChargeSessionStateError:
        return Response(
            {"detail": CHARGE_SESSION_STATE_ERROR_DETAIL},
            status=status.HTTP_409_CONFLICT,
        )

    if reservation.statut != "active":
        reservation.statut = "active"
        reservation.save(update_fields=["statut"])

    if auth_mode in {"qr", "code"}:
        # Re-anchor the credential's lifetime on the session start.
        auth_method.expiration = session.start_time + timedelta(minutes=DEFAULT_SESSION_MAX_MINUTES)
        auth_method.metadata = {
            **(auth_method.metadata or {}),
            "session_started_at": session.start_time.isoformat(),
            "session_max_minutes": DEFAULT_SESSION_MAX_MINUTES,
        }
        auth_method.save(update_fields=["expiration", "metadata"])

    return Response(
        {
            "message": "Authentification reussie.",
            "session_created": created,
            "session": _serialize_session(session),
            "auth_expiration": auth_method.expiration.isoformat() if auth_method.expiration else None,
        }
    )
@_schema("Clôturer une session de charge depuis le terminal", request=OpenApiTypes.OBJECT, tags=["Terminal"])
@api_view(["POST"])
@permission_classes([AllowAny])
@throttle_classes([TerminalRateThrottle])
def terminal_stop_session(request):
    """Close the active charging session after the ESP32 confirms a stop command.

    ``session_id`` is required (400 otherwise) and must reference an existing
    session (404 otherwise). When ``station_id`` or ``borne_id`` is supplied it
    must match the session's emplacement (409 otherwise).

    The response reports through ``session_closed`` whether
    ``_close_charging_session`` returned a truthy value, and carries the
    session as reloaded from the database.
    """
    secret_error = _terminal_secret_response(request)
    if secret_error is not None:
        return secret_error

    # Coerce before stripping: JSON may deliver non-string identifiers, which
    # would otherwise raise AttributeError.
    session_id = str(request.data.get("session_id") or "").strip()
    station_id = str(request.data.get("station_id") or "").strip()
    if not session_id:
        return Response({"detail": "session_id est obligatoire."}, status=status.HTTP_400_BAD_REQUEST)

    session = (
        ChargingSession.objects.select_related(
            "reservation",
            "emplacement",
            "emplacement__station",
            "emplacement__borne",
        )
        .filter(id_session=session_id)
        .first()
    )
    if session is None:
        return Response({"detail": "Session introuvable."}, status=status.HTTP_404_NOT_FOUND)

    if station_id and str(session.emplacement.station_id) != station_id:
        return Response(
            {
                "detail": "Cette session est associée à une autre station.",
                "station": session.emplacement.station.nom,
                "station_id": str(session.emplacement.station_id),
                "emplacement": session.emplacement.numero,
            },
            status=status.HTTP_409_CONFLICT,
        )

    borne_id = str(request.data.get("borne_id") or "").strip()
    if borne_id and str(session.emplacement.borne_id) != borne_id:
        return Response(
            {
                "detail": "Cette session est associee a une autre borne.",
                "station": session.emplacement.station.nom,
                "station_id": str(session.emplacement.station_id),
                "borne": session.emplacement.borne.nom,
                "borne_id": str(session.emplacement.borne_id),
                "emplacement": session.emplacement.numero,
            },
            status=status.HTTP_409_CONFLICT,
        )

    mark_stop_requested(session)
    closed = _close_charging_session(session)
    session.refresh_from_db()

    return Response(
        {
            "message": "Session de charge clôturée." if closed else "Session de charge déjà clôturée.",
            "session_closed": closed,
            "session": _serialize_session(session),
        }
    )