Initial commit: API Anime Tracker avec authentification et synchronisation

This commit is contained in:
ɧσℓσ
2025-12-01 22:22:15 +01:00
commit 467cf313e4
11 changed files with 820 additions and 0 deletions

264
main.py Normal file
View File

@@ -0,0 +1,264 @@
"""
API Anime Tracker - Synchronisation multi-appareils
"""
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
from typing import Optional, List
import os
from dotenv import load_dotenv
from database import get_db, init_db
from models import User, Anime
from schemas import (
UserCreate, UserLogin, UserResponse, Token,
AnimeCreate, AnimeUpdate, AnimeResponse
)
from auth import (
verify_password, get_password_hash,
create_access_token, verify_token,
get_current_user
)
load_dotenv()
app = FastAPI(
title="Anime Tracker API",
description="API de synchronisation pour l'extension Anime Tracker",
version="1.0.0"
)
# CORS pour permettre les requêtes depuis l'extension Chrome
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # En production, spécifier les domaines autorisés
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
security = HTTPBearer()
# Initialiser la base de données au démarrage
@app.on_event("startup")
async def startup_event():
init_db()
# ==================== AUTHENTIFICATION ====================
@app.post("/api/auth/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(user_data: UserCreate, db: Session = Depends(get_db)):
"""Inscription d'un nouvel utilisateur"""
# Vérifier si l'utilisateur existe déjà
existing_user = db.query(User).filter(User.email == user_data.email).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Un utilisateur avec cet email existe déjà"
)
# Vérifier si le nom d'utilisateur existe déjà
existing_username = db.query(User).filter(User.username == user_data.username).first()
if existing_username:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Ce nom d'utilisateur est déjà pris"
)
# Créer le nouvel utilisateur
hashed_password = get_password_hash(user_data.password)
new_user = User(
username=user_data.username,
email=user_data.email,
hashed_password=hashed_password
)
db.add(new_user)
db.commit()
db.refresh(new_user)
return UserResponse(
id=new_user.id,
username=new_user.username,
email=new_user.email,
created_at=new_user.created_at
)
@app.post("/api/auth/login", response_model=Token)
async def login(credentials: UserLogin, db: Session = Depends(get_db)):
"""Connexion d'un utilisateur"""
# Trouver l'utilisateur par email ou username
user = db.query(User).filter(
(User.email == credentials.email_or_username) |
(User.username == credentials.email_or_username)
).first()
if not user or not verify_password(credentials.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Email/username ou mot de passe incorrect",
headers={"WWW-Authenticate": "Bearer"},
)
# Créer le token JWT
access_token = create_access_token(data={"sub": str(user.id)})
return Token(access_token=access_token, token_type="bearer")
@app.get("/api/auth/me", response_model=UserResponse)
async def get_current_user_info(current_user: User = Depends(get_current_user)):
"""Récupérer les informations de l'utilisateur connecté"""
return UserResponse(
id=current_user.id,
username=current_user.username,
email=current_user.email,
created_at=current_user.created_at
)
# ==================== ANIMES ====================
@app.get("/api/animes", response_model=List[AnimeResponse])
async def get_animes(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Récupérer tous les animés de l'utilisateur"""
animes = db.query(Anime).filter(Anime.user_id == current_user.id).all()
return [AnimeResponse.from_orm(anime) for anime in animes]
@app.post("/api/animes", response_model=AnimeResponse, status_code=status.HTTP_201_CREATED)
async def create_anime(
anime_data: AnimeCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Créer ou mettre à jour un animé"""
# Vérifier si l'animé existe déjà pour cet utilisateur
existing_anime = db.query(Anime).filter(
Anime.user_id == current_user.id,
Anime.anime_id == anime_data.anime_id
).first()
if existing_anime:
# Mettre à jour l'animé existant
for key, value in anime_data.dict(exclude_unset=True).items():
setattr(existing_anime, key, value)
existing_anime.last_updated = datetime.utcnow()
db.commit()
db.refresh(existing_anime)
return AnimeResponse.from_orm(existing_anime)
else:
# Créer un nouvel animé
new_anime = Anime(
user_id=current_user.id,
**anime_data.dict()
)
db.add(new_anime)
db.commit()
db.refresh(new_anime)
return AnimeResponse.from_orm(new_anime)
@app.put("/api/animes/{anime_id}", response_model=AnimeResponse)
async def update_anime(
anime_id: str,
anime_data: AnimeUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Mettre à jour un animé spécifique"""
anime = db.query(Anime).filter(
Anime.user_id == current_user.id,
Anime.anime_id == anime_id
).first()
if not anime:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Animé non trouvé"
)
for key, value in anime_data.dict(exclude_unset=True).items():
setattr(anime, key, value)
anime.last_updated = datetime.utcnow()
db.commit()
db.refresh(anime)
return AnimeResponse.from_orm(anime)
@app.delete("/api/animes/{anime_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_anime(
anime_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Supprimer un animé"""
anime = db.query(Anime).filter(
Anime.user_id == current_user.id,
Anime.anime_id == anime_id
).first()
if not anime:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Animé non trouvé"
)
db.delete(anime)
db.commit()
return None
@app.post("/api/animes/sync", response_model=List[AnimeResponse])
async def sync_animes(
animes_data: List[AnimeCreate],
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Synchroniser plusieurs animés en une seule requête"""
synced_animes = []
for anime_data in animes_data:
existing_anime = db.query(Anime).filter(
Anime.user_id == current_user.id,
Anime.anime_id == anime_data.anime_id
).first()
if existing_anime:
# Mettre à jour
for key, value in anime_data.dict(exclude_unset=True).items():
setattr(existing_anime, key, value)
existing_anime.last_updated = datetime.utcnow()
else:
# Créer
new_anime = Anime(
user_id=current_user.id,
**anime_data.dict()
)
db.add(new_anime)
existing_anime = new_anime
synced_animes.append(existing_anime)
db.commit()
for anime in synced_animes:
db.refresh(anime)
return [AnimeResponse.from_orm(anime) for anime in synced_animes]
@app.get("/api/health")
async def health_check():
"""Vérification de l'état de l'API"""
return {"status": "ok", "message": "API Anime Tracker is running"}