p0: harden realtime reconciliation and revoke-all token invalidation
All checks were successful
CI / test (push) Successful in 23s
All checks were successful
CI / test (push) Successful in 23s
This commit is contained in:
26
alembic/versions/0022_user_access_revoked_before.py
Normal file
26
alembic/versions/0022_user_access_revoked_before.py
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
"""add access token revoke marker for users
|
||||||
|
|
||||||
|
Revision ID: 0022_user_access_revoked_before
|
||||||
|
Revises: 0021_chat_avatar_url
|
||||||
|
Create Date: 2026-03-08
|
||||||
|
"""
|
||||||
|
|
||||||
|
from collections.abc import Sequence
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = "0022_user_access_revoked_before"
|
||||||
|
down_revision: str | None = "0021_chat_avatar_url"
|
||||||
|
branch_labels: str | Sequence[str] | None = None
|
||||||
|
depends_on: str | Sequence[str] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
op.add_column("users", sa.Column("access_revoked_before", sa.DateTime(timezone=True), nullable=True))
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
op.drop_column("users", "access_revoked_before")
|
||||||
@@ -168,8 +168,11 @@ async def revoke_session(jti: str, current_user: User = Depends(get_current_user
|
|||||||
|
|
||||||
|
|
||||||
@router.delete("/sessions", status_code=status.HTTP_204_NO_CONTENT)
|
@router.delete("/sessions", status_code=status.HTTP_204_NO_CONTENT)
|
||||||
async def revoke_all_sessions(current_user: User = Depends(get_current_user)) -> None:
|
async def revoke_all_sessions(
|
||||||
await revoke_all_user_sessions(user_id=current_user.id)
|
db: AsyncSession = Depends(get_db),
|
||||||
|
current_user: User = Depends(get_current_user),
|
||||||
|
) -> None:
|
||||||
|
await revoke_all_user_sessions(db, user_id=current_user.id)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/2fa/setup", response_model=TwoFactorSetupRead)
|
@router.post("/2fa/setup", response_model=TwoFactorSetupRead)
|
||||||
|
|||||||
@@ -212,8 +212,30 @@ async def revoke_user_session(*, user_id: int, jti: str) -> None:
|
|||||||
await revoke_refresh_token_jti(jti=jti)
|
await revoke_refresh_token_jti(jti=jti)
|
||||||
|
|
||||||
|
|
||||||
async def revoke_all_user_sessions(*, user_id: int) -> None:
|
async def revoke_all_user_sessions(db: AsyncSession, *, user_id: int) -> None:
|
||||||
await revoke_all_refresh_sessions_for_user(user_id=user_id)
|
await revoke_all_refresh_sessions_for_user(user_id=user_id)
|
||||||
|
user = await get_user_by_id(db, user_id)
|
||||||
|
if user:
|
||||||
|
user.access_revoked_before = datetime.now(timezone.utc)
|
||||||
|
await db.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def _token_issued_at(payload: dict) -> datetime | None:
|
||||||
|
raw_iat = payload.get("iat")
|
||||||
|
if isinstance(raw_iat, datetime):
|
||||||
|
return raw_iat if raw_iat.tzinfo else raw_iat.replace(tzinfo=timezone.utc)
|
||||||
|
if isinstance(raw_iat, (int, float)):
|
||||||
|
try:
|
||||||
|
return datetime.fromtimestamp(float(raw_iat), tz=timezone.utc)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
if isinstance(raw_iat, str):
|
||||||
|
try:
|
||||||
|
parsed = datetime.fromisoformat(raw_iat.replace("Z", "+00:00"))
|
||||||
|
return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_request_metadata(request: Request) -> tuple[str | None, str | None]:
|
def get_request_metadata(request: Request) -> tuple[str | None, str | None]:
|
||||||
@@ -320,6 +342,15 @@ async def get_current_user(token: str = Depends(oauth2_scheme), db: AsyncSession
|
|||||||
user = await get_user_by_id(db, int(user_id))
|
user = await get_user_by_id(db, int(user_id))
|
||||||
if not user:
|
if not user:
|
||||||
raise credentials_error
|
raise credentials_error
|
||||||
|
issued_at = _token_issued_at(payload)
|
||||||
|
if user.access_revoked_before is not None and issued_at is not None:
|
||||||
|
revoked_before = (
|
||||||
|
user.access_revoked_before
|
||||||
|
if user.access_revoked_before.tzinfo
|
||||||
|
else user.access_revoked_before.replace(tzinfo=timezone.utc)
|
||||||
|
)
|
||||||
|
if issued_at <= revoked_before:
|
||||||
|
raise credentials_error
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
||||||
@@ -339,6 +370,15 @@ async def get_current_user_for_ws(token: str, db: AsyncSession) -> User:
|
|||||||
user = await get_user_by_id(db, int(user_id))
|
user = await get_user_by_id(db, int(user_id))
|
||||||
if not user:
|
if not user:
|
||||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
|
||||||
|
issued_at = _token_issued_at(payload)
|
||||||
|
if user.access_revoked_before is not None and issued_at is not None:
|
||||||
|
revoked_before = (
|
||||||
|
user.access_revoked_before
|
||||||
|
if user.access_revoked_before.tzinfo
|
||||||
|
else user.access_revoked_before.replace(tzinfo=timezone.utc)
|
||||||
|
)
|
||||||
|
if issued_at <= revoked_before:
|
||||||
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token was revoked")
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ class User(Base):
|
|||||||
nullable=False,
|
nullable=False,
|
||||||
)
|
)
|
||||||
last_seen_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True)
|
last_seen_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True)
|
||||||
|
access_revoked_before: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
|
||||||
|
|
||||||
memberships: Mapped[list["ChatMember"]] = relationship(back_populates="user", cascade="all, delete-orphan")
|
memberships: Mapped[list["ChatMember"]] = relationship(back_populates="user", cascade="all, delete-orphan")
|
||||||
sent_messages: Mapped[list["Message"]] = relationship(back_populates="sender")
|
sent_messages: Mapped[list["Message"]] = relationship(back_populates="sender")
|
||||||
|
|||||||
@@ -508,7 +508,8 @@ Response: `204`
|
|||||||
### DELETE `/api/v1/auth/sessions`
|
### DELETE `/api/v1/auth/sessions`
|
||||||
|
|
||||||
Auth required.
|
Auth required.
|
||||||
Response: `204`
|
Response: `204`
|
||||||
|
Behavior: revokes all refresh sessions and invalidates all access tokens issued before this request.
|
||||||
|
|
||||||
### POST `/api/v1/auth/2fa/setup`
|
### POST `/api/v1/auth/2fa/setup`
|
||||||
|
|
||||||
|
|||||||
@@ -236,5 +236,6 @@ Validation/runtime error during WS processing:
|
|||||||
- Send periodic `ping` and expect `pong`.
|
- Send periodic `ping` and expect `pong`.
|
||||||
- Reconnect with exponential backoff.
|
- Reconnect with exponential backoff.
|
||||||
- On `chat_updated`, refresh chat metadata via REST (`GET /api/v1/chats` or `GET /api/v1/chats/{chat_id}`).
|
- On `chat_updated`, refresh chat metadata via REST (`GET /api/v1/chats` or `GET /api/v1/chats/{chat_id}`).
|
||||||
|
- On reconnect/visibility restore, reconcile state by reloading already-opened chats/messages via REST
|
||||||
|
to recover missed `message_deleted`/delivery updates after transient disconnects or backend restarts.
|
||||||
- Use REST message history endpoints for pagination; WS is realtime transport, not history source.
|
- Use REST message history endpoints for pagination; WS is realtime transport, not history source.
|
||||||
|
|
||||||
|
|||||||
@@ -134,7 +134,7 @@ export function useRealtime() {
|
|||||||
const chatId = Number(event.payload.chat_id);
|
const chatId = Number(event.payload.chat_id);
|
||||||
if (Number.isFinite(chatId)) {
|
if (Number.isFinite(chatId)) {
|
||||||
scheduleReloadChats();
|
scheduleReloadChats();
|
||||||
if (chatStore.activeChatId === chatId) {
|
if (chatStore.activeChatId === chatId || (chatStore.messagesByChat[chatId]?.length ?? 0) > 0) {
|
||||||
void chatStore.loadMessages(chatId);
|
void chatStore.loadMessages(chatId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -282,8 +282,15 @@ export function useRealtime() {
|
|||||||
const storeBefore = useChatStore.getState();
|
const storeBefore = useChatStore.getState();
|
||||||
await storeBefore.loadChats();
|
await storeBefore.loadChats();
|
||||||
const storeAfter = useChatStore.getState();
|
const storeAfter = useChatStore.getState();
|
||||||
|
const loadedChatIds = Object.keys(storeAfter.messagesByChat)
|
||||||
|
.map((value) => Number(value))
|
||||||
|
.filter((value) => Number.isFinite(value) && (storeAfter.messagesByChat[value]?.length ?? 0) > 0);
|
||||||
|
const uniqueChatIds = new Set<number>(loadedChatIds);
|
||||||
if (storeAfter.activeChatId) {
|
if (storeAfter.activeChatId) {
|
||||||
await storeAfter.loadMessages(storeAfter.activeChatId);
|
uniqueChatIds.add(storeAfter.activeChatId);
|
||||||
|
}
|
||||||
|
for (const chatId of uniqueChatIds) {
|
||||||
|
await storeAfter.loadMessages(chatId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user