""" Fonctions d'authentification et de sécurité """ from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session import os from dotenv import load_dotenv from database import get_db from models import User load_dotenv() # Configuration JWT SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-change-in-production-min-32-chars") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 * 24 * 60 # 30 jours # Configuration du hachage de mot de passe pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") security = HTTPBearer() def verify_password(plain_password: str, hashed_password: str) -> bool: """Vérifier un mot de passe""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Hacher un mot de passe""" return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """Créer un token JWT""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_token(token: str) -> Optional[dict]: """Vérifier et décoder un token JWT""" try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload except JWTError: return None def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ) -> User: """Obtenir l'utilisateur actuel à partir du token JWT""" token = credentials.credentials payload = verify_token(token) if payload is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token invalide ou expiré", headers={"WWW-Authenticate": "Bearer"}, ) user_id: str = payload.get("sub") if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token invalide", headers={"WWW-Authenticate": "Bearer"}, ) user = db.query(User).filter(User.id == int(user_id)).first() if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Utilisateur non trouvé", headers={"WWW-Authenticate": "Bearer"}, ) return user