# Files
# dify/api/services/entities/auth_entities.py
#
# 186 lines
# 6.1 KiB
# Python

from enum import StrEnum, auto
from typing import Annotated, Literal
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from libs.helper import EmailStr
from libs.password import valid_password
class LoginFailureReason(StrEnum):
"""Bounded reason codes for failed login audit logs."""
ACCOUNT_BANNED = auto()
ACCOUNT_IN_FREEZE = auto()
ACCOUNT_NOT_FOUND = auto()
EMAIL_CODE_EMAIL_MISMATCH = auto()
INVALID_CREDENTIALS = auto()
INVALID_EMAIL_CODE = auto()
INVALID_EMAIL_CODE_TOKEN = auto()
INVALID_INVITATION_EMAIL = auto()
LOGIN_RATE_LIMITED = auto()
class ChangeEmailPhase(StrEnum):
"""Change-email token state machine.
Allowed transitions:
`OLD_EMAIL -> OLD_EMAIL_VERIFIED -> NEW_EMAIL -> NEW_EMAIL_VERIFIED`
The flow starts by sending a code to the current email address. Only a
token in `OLD_EMAIL_VERIFIED` may request the new-email code, and only a
token in `NEW_EMAIL_VERIFIED` may perform the final email reset.
"""
OLD_EMAIL = "old_email"
OLD_EMAIL_VERIFIED = "old_email_verified"
NEW_EMAIL = "new_email"
NEW_EMAIL_VERIFIED = "new_email_verified"
class LoginPayloadBase(BaseModel):
    """Base payload carrying an email address and a password.

    `email` is validated as `EmailStr`; `password` is accepted as any string,
    with no validator attached in this class.
    """

    email: EmailStr
    password: str
class ForgotPasswordSendPayload(BaseModel):
    """Payload for requesting a forgot-password message.

    `email` is required and validated as `EmailStr`. `language` is optional
    and defaults to `None`.
    """

    email: EmailStr
    # NOTE(review): presumably selects the language of the outgoing message;
    # confirm against the caller.
    language: str | None = None
class ForgotPasswordCheckPayload(BaseModel):
    """Payload for checking a forgot-password verification code.

    Carries the email address, the code to check and the token the code is
    checked against. Only `token` is constrained to be non-empty; `code` has
    no length constraint in this model.
    """

    email: EmailStr
    code: str
    token: str = Field(min_length=1)
class ForgotPasswordResetPayload(BaseModel):
    """Payload for the final forgot-password reset step.

    `token` must be non-empty. `new_password` and `password_confirm` are each
    passed through `valid_password` independently. This model does not check
    that the two values are equal.
    """

    token: str = Field(min_length=1)
    new_password: str
    password_confirm: str

    @field_validator("new_password", "password_confirm")
    @classmethod
    def validate_password(cls, value: str) -> str:
        """Run `valid_password` on the field and store whatever it returns.

        NOTE(review): presumably `valid_password` raises when the value
        violates the password policy; confirm in `libs.password`.
        """
        return valid_password(value)
class ChangeEmailTokenBase(BaseModel):
    """Stored change-email token payload.

    The discriminator lives in `email_change_phase`; callers use the concrete
    model type to decide which transitions are legal.

    The full progression is:

    `old_email -> old_email_verified -> new_email -> new_email_verified`

    Every state is bound to the initiating `account_id` so the change-email
    flow cannot be replayed across accounts.
    """

    token_type: Literal["change_email"] = "change_email"
    account_id: str = Field(min_length=1)
    email: EmailStr
    old_email: EmailStr
    code: str = Field(min_length=1)

    # Reject unknown keys instead of silently ignoring them.
    model_config = ConfigDict(extra="forbid")

    def to_token_manager_payload(self) -> dict[str, str]:
        """Return the model dump without `token_type`, `account_id` and `email`.

        NOTE(review): presumably those three keys are supplied by the token
        manager itself when the token is stored; confirm against the caller.
        """
        return self.model_dump(exclude={"token_type", "account_id", "email"})

    def is_bound_to_account(self, account_id: str) -> bool:
        """Return True when this token's `account_id` equals `account_id`."""
        return self.account_id == account_id
class _ChangeEmailOldAddressMixin(ChangeEmailTokenBase):
    """States whose `email` must still be the account's current address."""

    @model_validator(mode="after")
    def validate_old_address_binding(self) -> "_ChangeEmailOldAddressMixin":
        """Require `email` and `old_email` to match, ignoring case.

        Runs after field validation. Both addresses are lower-cased before
        comparison.

        Raises:
            ValueError: if the two addresses differ.
        """
        if self.email.lower() != self.old_email.lower():
            raise ValueError("old-email token payload must bind email to old_email")
        return self
class ChangeEmailOldEmailToken(_ChangeEmailOldAddressMixin):
    """Phase-1 token minted when sending a code to the old email address.

    This token proves only that the flow started for the current account. It
    must not unlock the new-email send step or the final reset step until the
    old-email verification code has been checked.
    """

    email_change_phase: Literal[ChangeEmailPhase.OLD_EMAIL] = ChangeEmailPhase.OLD_EMAIL

    def promote(self) -> "ChangeEmailOldEmailVerifiedToken":
        """Advance to the state that is allowed to request the new-email code.

        Builds a new `ChangeEmailOldEmailVerifiedToken` from every field of
        this token except `email_change_phase`, which is set to
        `OLD_EMAIL_VERIFIED`. This instance is not modified.
        """
        return ChangeEmailOldEmailVerifiedToken(
            **self.model_dump(exclude={"email_change_phase"}),
            email_change_phase=ChangeEmailPhase.OLD_EMAIL_VERIFIED,
        )
class ChangeEmailOldEmailVerifiedToken(_ChangeEmailOldAddressMixin):
    """Token returned after the old email verification code succeeds.

    The token used to request a new-email code must come from this state. This
    blocks the GHSA-4q3w-q5mc-45rq bypass where a phase-1 token was replayed to
    skip the old-email verification step.
    """

    email_change_phase: Literal[ChangeEmailPhase.OLD_EMAIL_VERIFIED] = ChangeEmailPhase.OLD_EMAIL_VERIFIED
class ChangeEmailNewEmailToken(ChangeEmailTokenBase):
    """Token minted when sending a code to the target new email address.

    At this point the account binding is already fixed, but the new address has
    not been verified yet, so the token may only be promoted by a successful
    new-email verification code check.
    """

    email_change_phase: Literal[ChangeEmailPhase.NEW_EMAIL] = ChangeEmailPhase.NEW_EMAIL

    def promote(self) -> "ChangeEmailNewEmailVerifiedToken":
        """Advance to the only state that may perform the final email reset.

        Builds a new `ChangeEmailNewEmailVerifiedToken` from every field of
        this token except `email_change_phase`, which is set to
        `NEW_EMAIL_VERIFIED`. This instance is not modified.
        """
        return ChangeEmailNewEmailVerifiedToken(
            **self.model_dump(exclude={"email_change_phase"}),
            email_change_phase=ChangeEmailPhase.NEW_EMAIL_VERIFIED,
        )
class ChangeEmailNewEmailVerifiedToken(ChangeEmailTokenBase):
    """Final verified token for the change-email flow.

    Only this state may change the account email, and the reset endpoint must
    additionally require that the request's `new_email` matches this token's
    `email` so a verified token for address A cannot be replayed for address B.
    """

    email_change_phase: Literal[ChangeEmailPhase.NEW_EMAIL_VERIFIED] = ChangeEmailPhase.NEW_EMAIL_VERIFIED
# Pending states: a code has been sent but not yet verified, so these tokens
# can still move forward. The concrete model is selected by the
# `email_change_phase` discriminator.
ChangeEmailPendingTokenData = Annotated[
    ChangeEmailOldEmailToken | ChangeEmailNewEmailToken,
    Field(discriminator="email_change_phase"),
]

# Verified states: tokens for which a verification step has already succeeded.
ChangeEmailVerifiedTokenData = Annotated[
    ChangeEmailOldEmailVerifiedToken | ChangeEmailNewEmailVerifiedToken,
    Field(discriminator="email_change_phase"),
]

# All four states of the change-email token state machine, listed in flow
# order.
ChangeEmailTokenData = Annotated[
    ChangeEmailOldEmailToken
    | ChangeEmailOldEmailVerifiedToken
    | ChangeEmailNewEmailToken
    | ChangeEmailNewEmailVerifiedToken,
    Field(discriminator="email_change_phase"),
]