"""Authentication helpers.

Provides password hashing/verification, JWT access-token creation, and the
FastAPI dependencies that resolve and authorize the current user.
"""

from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from .config import settings
from .db import get_db
from .models import User, UserRole

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security_scheme = HTTPBearer()


def verify_password(plain_password: str, password_hash: str) -> bool:
    """Return whether *plain_password* matches *password_hash* per ``pwd_context``."""
    return pwd_context.verify(plain_password, password_hash)


def get_password_hash(password: str) -> str:
    """Return the hash of *password* produced by ``pwd_context``."""
    return pwd_context.hash(password)


def create_access_token(subject: str, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT whose ``sub`` claim is *subject*.

    Args:
        subject: Value stored in the ``sub`` claim.
        expires_delta: Lifetime of the token. When ``None``, defaults to
            ``settings.access_token_expire_minutes`` minutes.

    Returns:
        The JWT encoded with ``settings.jwt_secret_key`` and
        ``settings.jwt_algorithm``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.access_token_expire_minutes)
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {"sub": subject, "exp": expire}
    return jwt.encode(to_encode, settings.jwt_secret_key, algorithm=settings.jwt_algorithm)


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 error that carries the Bearer authentication challenge."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security_scheme),
    db: Session = Depends(get_db),
) -> User:
    """Resolve the ``User`` identified by the request's bearer token.

    The token is decoded with ``settings.jwt_secret_key``; its ``sub`` claim
    is matched against ``User.email``.

    Raises:
        HTTPException: 401 if the token cannot be decoded, has no ``sub``
            claim, or no user with that email exists.
    """
    token = credentials.credentials
    try:
        # Keep the try body minimal: only decoding can raise JWTError.
        payload = jwt.decode(token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm])
    except JWTError as exc:
        raise _unauthorized("Invalid token") from exc

    subject: Optional[str] = payload.get("sub")
    if subject is None:
        raise _unauthorized("Invalid token")

    user = db.query(User).filter(User.email == subject).first()
    if not user:
        raise _unauthorized("User not found")
    return user


def require_admin(user: User = Depends(get_current_user)) -> User:
    """Return *user* if their role is ``UserRole.ADMIN``.

    Raises:
        HTTPException: 403 if the user's role is not ``UserRole.ADMIN``.
    """
    if user.role != UserRole.ADMIN:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin only")
    return user