feat(auth): add change password functionality for authenticated users

This commit is contained in:
Davidson Gomes 2025-05-07 20:18:09 -03:00
parent 0e3043779d
commit d17f241967
6 changed files with 181 additions and 19 deletions

View File

@ -10,6 +10,7 @@ from src.schemas.user import (
ForgotPassword,
PasswordReset,
MessageResponse,
ChangePassword,
)
from src.services.user_service import (
authenticate_user,
@ -18,6 +19,7 @@ from src.services.user_service import (
resend_verification,
forgot_password,
reset_password,
change_password,
)
from src.services.auth_service import (
create_access_token,
@ -240,3 +242,35 @@ async def get_current_user(
HTTPException: If the user is not authenticated
"""
return current_user
@router.post("/change-password", response_model=MessageResponse)
async def change_user_password(
password_data: ChangePassword,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
Change the password of the authenticated user
Args:
password_data: Current and new password
db: Database session
current_user: Authenticated user
Returns:
MessageResponse: Success message
Raises:
HTTPException: If the current password is invalid
"""
success, message = change_password(
db, current_user.id, password_data.current_password, password_data.new_password
)
if not success:
logger.warning(f"Failed to change password for user: {current_user.email}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
logger.info(f"Password changed successfully for user: {current_user.email}")
return {"message": message}

View File

@ -14,10 +14,12 @@ from src.schemas.schemas import (
Client,
ClientCreate,
)
from src.schemas.user import UserCreate
from src.schemas.user import UserCreate, TokenResponse
from src.services import (
client_service,
)
from src.services.auth_service import create_access_token
from src.models.models import User
import logging
logger = logging.getLogger(__name__)
@ -138,3 +140,49 @@ async def delete_client(
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Client not found"
)
@router.post("/{client_id}/impersonate", response_model=TokenResponse)
async def impersonate_client(
client_id: uuid.UUID,
db: Session = Depends(get_db),
payload: dict = Depends(get_jwt_token),
):
"""
Allows an administrator to obtain a token to impersonate a client
Args:
client_id: ID of the client to impersonate
db: Database session
payload: JWT payload of the administrator
Returns:
TokenResponse: Access token for the client
Raises:
HTTPException: If the user is not an administrator or the client does not exist
"""
# Verify if the user is an administrator
await verify_admin(payload)
# Search for the client
client = client_service.get_client(db, client_id)
if not client:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Client not found"
)
user = client_service.get_client_user(db, client_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User associated with the client not found",
)
access_token = create_access_token(user)
logger.info(
f"Administrator {payload.get('sub')} impersonated client {client.name} (ID: {client_id})"
)
return {"access_token": access_token, "token_type": "bearer"}

View File

@ -1,7 +1,7 @@
from pydantic import BaseModel, EmailStr
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime
from uuid import UUID
import uuid
class UserBase(BaseModel):
@ -9,8 +9,8 @@ class UserBase(BaseModel):
class UserCreate(UserBase):
password: str
name: str # For client creation
password: str = Field(..., min_length=8, description="User password")
name: str = Field(..., description="User's name")
class AdminUserCreate(UserBase):
@ -18,18 +18,18 @@ class AdminUserCreate(UserBase):
name: str
class UserLogin(BaseModel):
email: EmailStr
class UserLogin(UserBase):
password: str
class UserResponse(UserBase):
id: UUID
client_id: Optional[UUID] = None
id: uuid.UUID
is_active: bool
email_verified: bool
is_admin: bool
created_at: datetime
client_id: Optional[uuid.UUID] = None
email_verified: bool
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
@ -41,20 +41,26 @@ class TokenResponse(BaseModel):
class TokenData(BaseModel):
sub: str # user email
exp: datetime
user_id: Optional[uuid.UUID] = None
sub: Optional[str] = None
is_admin: bool
client_id: Optional[UUID] = None
client_id: Optional[uuid.UUID] = None
exp: datetime
class PasswordReset(BaseModel):
token: str
new_password: str
new_password: str = Field(..., min_length=8, description="New password")
class ForgotPassword(BaseModel):
email: EmailStr
class ChangePassword(BaseModel):
current_password: str = Field(..., description="Current password for verification")
new_password: str = Field(..., min_length=8, description="New password to set")
class MessageResponse(BaseModel):
message: str

View File

@ -57,7 +57,6 @@ async def get_current_user(
logger.warning(f"Token expired for {email}")
raise credentials_exception
# Create TokenData object
token_data = TokenData(
sub=email,
exp=datetime.fromtimestamp(exp),
@ -70,9 +69,9 @@ async def get_current_user(
raise credentials_exception
# Search for user in the database
user = get_user_by_email(db, email=token_data.sub)
user = get_user_by_email(db, email=email)
if user is None:
logger.warning(f"User not found for email: {token_data.sub}")
logger.warning(f"User not found for email: {email}")
raise credentials_exception
if not user.is_active:
@ -146,6 +145,7 @@ def create_access_token(user: User) -> str:
# Data to be included in the token
token_data = {
"sub": user.email,
"user_id": str(user.id),
"is_admin": user.is_admin,
}

View File

@ -1,7 +1,7 @@
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from fastapi import HTTPException, status
from src.models.models import Client
from src.models.models import Client, User
from src.schemas.schemas import ClientCreate
from src.schemas.user import UserCreate
from src.services.user_service import create_user
@ -148,3 +148,28 @@ def create_client_with_user(
db.rollback()
logger.error(f"Unexpected error creating client with user: {str(e)}")
return None, f"Unexpected error: {str(e)}"
def get_client_user(db: Session, client_id: uuid.UUID) -> Optional[User]:
"""
Search for the user associated with a client
Args:
db: Database session
client_id: ID of the client
Returns:
Optional[User]: User associated with the client or None
"""
try:
user = db.query(User).filter(User.client_id == client_id).first()
if not user:
logger.warning(f"User not found for client: {client_id}")
return None
return user
except SQLAlchemyError as e:
logger.error(f"Error searching for user for client {client_id}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error searching for user for client",
)

View File

@ -437,3 +437,52 @@ def deactivate_user(db: Session, user_id: uuid.UUID) -> Tuple[bool, str]:
except Exception as e:
logger.error(f"Unexpected error deactivating user: {str(e)}")
return False, f"Unexpected error: {str(e)}"
def change_password(
db: Session, user_id: uuid.UUID, current_password: str, new_password: str
) -> Tuple[bool, str]:
"""
Changes the password of an authenticated user
Args:
db: Database session
user_id: ID of the user
current_password: Current password for verification
new_password: New password to set
Returns:
Tuple[bool, str]: Tuple with operation status and message
"""
try:
# Search for user by ID
user = db.query(User).filter(User.id == user_id).first()
if not user:
logger.warning(
f"Attempt to change password for non-existent user: {user_id}"
)
return False, "User not found"
# Verify current password
if not verify_password(current_password, user.password_hash):
logger.warning(
f"Attempt to change password with invalid current password for user: {user.email}"
)
return False, "Current password is incorrect"
# Update password
user.password_hash = get_password_hash(new_password)
db.commit()
logger.info(f"Password changed successfully for user: {user.email}")
return True, "Password changed successfully"
except SQLAlchemyError as e:
db.rollback()
logger.error(f"Error changing password: {str(e)}")
return False, f"Error changing password: {str(e)}"
except Exception as e:
logger.error(f"Unexpected error changing password: {str(e)}")
return False, f"Unexpected error: {str(e)}"