feat: add reply/forward/pin message flow across backend and web
Some checks failed
CI / test (push) Failing after 24s
Some checks failed
CI / test (push) Failing after 24s
- add reply_to/forwarded_from message fields and chat pinned_message field - add forward and pin APIs plus reply support in message create - wire web actions: Reply, Fwd, Pin and reply composer state - fix spam policy bug: allow repeated identical messages, keep rate limiting
This commit is contained in:
@@ -30,6 +30,7 @@ class Chat(Base):
|
||||
id: Mapped[int] = mapped_column(primary_key=True, index=True)
|
||||
type: Mapped[ChatType] = mapped_column(SAEnum(ChatType), nullable=False, index=True)
|
||||
title: Mapped[str | None] = mapped_column(String(255), nullable=True)
|
||||
pinned_message_id: Mapped[int | None] = mapped_column(ForeignKey("messages.id", ondelete="SET NULL"), nullable=True, index=True)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
|
||||
|
||||
members: Mapped[list["ChatMember"]] = relationship(back_populates="chat", cascade="all, delete-orphan")
|
||||
|
||||
@@ -8,6 +8,7 @@ from app.chats.schemas import (
|
||||
ChatMemberAddRequest,
|
||||
ChatMemberRead,
|
||||
ChatMemberRoleUpdateRequest,
|
||||
ChatPinMessageRequest,
|
||||
ChatRead,
|
||||
ChatTitleUpdateRequest,
|
||||
)
|
||||
@@ -17,6 +18,7 @@ from app.chats.service import (
|
||||
get_chat_for_user,
|
||||
get_chats_for_user,
|
||||
leave_chat_for_user,
|
||||
pin_chat_message_for_user,
|
||||
remove_chat_member_for_user,
|
||||
update_chat_member_role_for_user,
|
||||
update_chat_title_for_user,
|
||||
@@ -58,6 +60,7 @@ async def get_chat(
|
||||
id=chat.id,
|
||||
type=chat.type,
|
||||
title=chat.title,
|
||||
pinned_message_id=chat.pinned_message_id,
|
||||
created_at=chat.created_at,
|
||||
members=members,
|
||||
)
|
||||
@@ -127,3 +130,13 @@ async def leave_chat(
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> None:
|
||||
await leave_chat_for_user(db, chat_id=chat_id, user_id=current_user.id)
|
||||
|
||||
|
||||
@router.post("/{chat_id}/pin", response_model=ChatRead)
|
||||
async def pin_chat_message(
|
||||
chat_id: int,
|
||||
payload: ChatPinMessageRequest,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> ChatRead:
|
||||
return await pin_chat_message_for_user(db, chat_id=chat_id, user_id=current_user.id, payload=payload)
|
||||
|
||||
@@ -11,6 +11,7 @@ class ChatRead(BaseModel):
|
||||
id: int
|
||||
type: ChatType
|
||||
title: str | None = None
|
||||
pinned_message_id: int | None = None
|
||||
created_at: datetime
|
||||
|
||||
|
||||
@@ -43,3 +44,7 @@ class ChatMemberRoleUpdateRequest(BaseModel):
|
||||
|
||||
class ChatTitleUpdateRequest(BaseModel):
|
||||
title: str = Field(min_length=1, max_length=255)
|
||||
|
||||
|
||||
class ChatPinMessageRequest(BaseModel):
|
||||
message_id: int | None = None
|
||||
|
||||
@@ -4,7 +4,8 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.chats import repository
|
||||
from app.chats.models import Chat, ChatMember, ChatMemberRole, ChatType
|
||||
from app.chats.schemas import ChatCreateRequest, ChatTitleUpdateRequest
|
||||
from app.chats.schemas import ChatCreateRequest, ChatPinMessageRequest, ChatTitleUpdateRequest
|
||||
from app.messages.repository import get_message_by_id
|
||||
from app.users.repository import get_user_by_id
|
||||
|
||||
|
||||
@@ -211,3 +212,28 @@ async def leave_chat_for_user(db: AsyncSession, *, chat_id: int, user_id: int) -
|
||||
)
|
||||
await repository.delete_chat_member(db, membership)
|
||||
await db.commit()
|
||||
|
||||
|
||||
async def pin_chat_message_for_user(
|
||||
db: AsyncSession,
|
||||
*,
|
||||
chat_id: int,
|
||||
user_id: int,
|
||||
payload: ChatPinMessageRequest,
|
||||
) -> Chat:
|
||||
chat, membership = await _get_chat_and_membership(db, chat_id=chat_id, user_id=user_id)
|
||||
_ensure_group_or_channel(chat.type)
|
||||
_ensure_manage_permission(membership.role)
|
||||
if payload.message_id is None:
|
||||
chat.pinned_message_id = None
|
||||
await db.commit()
|
||||
await db.refresh(chat)
|
||||
return chat
|
||||
|
||||
message = await get_message_by_id(db, payload.message_id)
|
||||
if not message or message.chat_id != chat_id:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Message not found in chat")
|
||||
chat.pinned_message_id = message.id
|
||||
await db.commit()
|
||||
await db.refresh(chat)
|
||||
return chat
|
||||
|
||||
@@ -29,6 +29,16 @@ class Message(Base):
|
||||
id: Mapped[int] = mapped_column(primary_key=True, index=True)
|
||||
chat_id: Mapped[int] = mapped_column(ForeignKey("chats.id", ondelete="CASCADE"), nullable=False, index=True)
|
||||
sender_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True)
|
||||
reply_to_message_id: Mapped[int | None] = mapped_column(
|
||||
ForeignKey("messages.id", ondelete="SET NULL"),
|
||||
nullable=True,
|
||||
index=True,
|
||||
)
|
||||
forwarded_from_message_id: Mapped[int | None] = mapped_column(
|
||||
ForeignKey("messages.id", ondelete="SET NULL"),
|
||||
nullable=True,
|
||||
index=True,
|
||||
)
|
||||
type: Mapped[MessageType] = mapped_column(SAEnum(MessageType), nullable=False, default=MessageType.TEXT)
|
||||
text: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
|
||||
|
||||
@@ -10,10 +10,19 @@ async def create_message(
|
||||
*,
|
||||
chat_id: int,
|
||||
sender_id: int,
|
||||
reply_to_message_id: int | None,
|
||||
forwarded_from_message_id: int | None,
|
||||
message_type: MessageType,
|
||||
text: str | None,
|
||||
) -> Message:
|
||||
message = Message(chat_id=chat_id, sender_id=sender_id, type=message_type, text=text)
|
||||
message = Message(
|
||||
chat_id=chat_id,
|
||||
sender_id=sender_id,
|
||||
reply_to_message_id=reply_to_message_id,
|
||||
forwarded_from_message_id=forwarded_from_message_id,
|
||||
type=message_type,
|
||||
text=text,
|
||||
)
|
||||
db.add(message)
|
||||
await db.flush()
|
||||
return message
|
||||
|
||||
@@ -3,8 +3,8 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.auth.service import get_current_user
|
||||
from app.database.session import get_db
|
||||
from app.messages.schemas import MessageCreateRequest, MessageRead, MessageStatusUpdateRequest, MessageUpdateRequest
|
||||
from app.messages.service import create_chat_message, delete_message, get_messages, search_messages, update_message
|
||||
from app.messages.schemas import MessageCreateRequest, MessageForwardRequest, MessageRead, MessageStatusUpdateRequest, MessageUpdateRequest
|
||||
from app.messages.service import create_chat_message, delete_message, forward_message, get_messages, search_messages, update_message
|
||||
from app.realtime.schemas import MessageStatusPayload
|
||||
from app.realtime.service import realtime_gateway
|
||||
from app.users.models import User
|
||||
@@ -80,3 +80,15 @@ async def update_status(
|
||||
payload=MessageStatusPayload(chat_id=payload.chat_id, message_id=payload.message_id),
|
||||
event=payload.status,
|
||||
)
|
||||
|
||||
|
||||
@router.post("/{message_id}/forward", response_model=MessageRead, status_code=status.HTTP_201_CREATED)
|
||||
async def forward_message_endpoint(
|
||||
message_id: int,
|
||||
payload: MessageForwardRequest,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> MessageRead:
|
||||
message = await forward_message(db, source_message_id=message_id, sender_id=current_user.id, payload=payload)
|
||||
await realtime_gateway.publish_message_created(message=message, sender_id=current_user.id)
|
||||
return message
|
||||
|
||||
@@ -12,6 +12,8 @@ class MessageRead(BaseModel):
|
||||
id: int
|
||||
chat_id: int
|
||||
sender_id: int
|
||||
reply_to_message_id: int | None
|
||||
forwarded_from_message_id: int | None
|
||||
type: MessageType
|
||||
text: str | None
|
||||
created_at: datetime
|
||||
@@ -23,6 +25,7 @@ class MessageCreateRequest(BaseModel):
|
||||
type: MessageType = MessageType.TEXT
|
||||
text: str | None = Field(default=None, max_length=4096)
|
||||
client_message_id: str | None = Field(default=None, min_length=8, max_length=64)
|
||||
reply_to_message_id: int | None = None
|
||||
|
||||
|
||||
class MessageUpdateRequest(BaseModel):
|
||||
@@ -33,3 +36,7 @@ class MessageStatusUpdateRequest(BaseModel):
|
||||
chat_id: int
|
||||
message_id: int
|
||||
status: Literal["message_delivered", "message_read"]
|
||||
|
||||
|
||||
class MessageForwardRequest(BaseModel):
|
||||
target_chat_id: int
|
||||
|
||||
@@ -6,12 +6,16 @@ from app.chats.service import ensure_chat_membership
|
||||
from app.messages import repository
|
||||
from app.messages.models import Message
|
||||
from app.messages.spam_guard import enforce_message_spam_policy
|
||||
from app.messages.schemas import MessageCreateRequest, MessageStatusUpdateRequest, MessageUpdateRequest
|
||||
from app.messages.schemas import MessageCreateRequest, MessageForwardRequest, MessageStatusUpdateRequest, MessageUpdateRequest
|
||||
from app.notifications.service import dispatch_message_notifications
|
||||
|
||||
|
||||
async def create_chat_message(db: AsyncSession, *, sender_id: int, payload: MessageCreateRequest) -> Message:
|
||||
await ensure_chat_membership(db, chat_id=payload.chat_id, user_id=sender_id)
|
||||
if payload.reply_to_message_id is not None:
|
||||
reply_to = await repository.get_message_by_id(db, payload.reply_to_message_id)
|
||||
if not reply_to or reply_to.chat_id != payload.chat_id:
|
||||
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid reply target")
|
||||
if payload.client_message_id:
|
||||
existing = await repository.get_message_by_client_message_id(
|
||||
db,
|
||||
@@ -30,6 +34,8 @@ async def create_chat_message(db: AsyncSession, *, sender_id: int, payload: Mess
|
||||
db,
|
||||
chat_id=payload.chat_id,
|
||||
sender_id=sender_id,
|
||||
reply_to_message_id=payload.reply_to_message_id,
|
||||
forwarded_from_message_id=None,
|
||||
message_type=payload.type,
|
||||
text=payload.text,
|
||||
)
|
||||
@@ -167,3 +173,29 @@ async def mark_message_status(
|
||||
"last_delivered_message_id": receipt.last_delivered_message_id or 0,
|
||||
"last_read_message_id": receipt.last_read_message_id or 0,
|
||||
}
|
||||
|
||||
|
||||
async def forward_message(
|
||||
db: AsyncSession,
|
||||
*,
|
||||
source_message_id: int,
|
||||
sender_id: int,
|
||||
payload: MessageForwardRequest,
|
||||
) -> Message:
|
||||
source = await repository.get_message_by_id(db, source_message_id)
|
||||
if not source:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Source message not found")
|
||||
await ensure_chat_membership(db, chat_id=source.chat_id, user_id=sender_id)
|
||||
await ensure_chat_membership(db, chat_id=payload.target_chat_id, user_id=sender_id)
|
||||
forwarded = await repository.create_message(
|
||||
db,
|
||||
chat_id=payload.target_chat_id,
|
||||
sender_id=sender_id,
|
||||
reply_to_message_id=None,
|
||||
forwarded_from_message_id=source.id,
|
||||
message_type=source.type,
|
||||
text=source.text,
|
||||
)
|
||||
await db.commit()
|
||||
await db.refresh(forwarded)
|
||||
return forwarded
|
||||
|
||||
@@ -1,16 +1,9 @@
|
||||
import hashlib
|
||||
|
||||
from fastapi import HTTPException, status
|
||||
from redis.exceptions import RedisError
|
||||
|
||||
from app.config.settings import settings
|
||||
from app.utils.redis_client import get_redis_client
|
||||
|
||||
|
||||
def _hash_text(text: str) -> str:
|
||||
return hashlib.sha256(text.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
async def enforce_message_spam_policy(*, user_id: int, chat_id: int, text: str | None) -> None:
|
||||
redis = get_redis_client()
|
||||
rate_key = f"spam:msg_rate:{user_id}:{chat_id}"
|
||||
@@ -23,15 +16,5 @@ async def enforce_message_spam_policy(*, user_id: int, chat_id: int, text: str |
|
||||
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
|
||||
detail="Message rate limit exceeded for this chat.",
|
||||
)
|
||||
|
||||
normalized = (text or "").strip()
|
||||
if normalized:
|
||||
dup_key = f"spam:dup:{user_id}:{chat_id}:{_hash_text(normalized)}"
|
||||
if await redis.exists(dup_key):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
|
||||
detail="Duplicate message cooldown is active.",
|
||||
)
|
||||
await redis.set(dup_key, "1", ex=settings.duplicate_message_cooldown_seconds)
|
||||
except RedisError:
|
||||
return
|
||||
|
||||
@@ -25,6 +25,7 @@ class SendMessagePayload(BaseModel):
|
||||
text: str | None = Field(default=None, max_length=4096)
|
||||
temp_id: str | None = None
|
||||
client_message_id: str | None = Field(default=None, min_length=8, max_length=64)
|
||||
reply_to_message_id: int | None = None
|
||||
|
||||
|
||||
class ChatEventPayload(BaseModel):
|
||||
|
||||
@@ -86,6 +86,7 @@ class RealtimeGateway:
|
||||
type=payload.type,
|
||||
text=payload.text,
|
||||
client_message_id=payload.client_message_id or payload.temp_id,
|
||||
reply_to_message_id=payload.reply_to_message_id,
|
||||
),
|
||||
)
|
||||
await self.publish_message_created(
|
||||
|
||||
Reference in New Issue
Block a user