Source code for back_chat.routes.streaming_routes

"""
WebSocket endpoints for real-time messaging and notifications.

This module provides WebSocket routes for chat message exchange and
user connection tracking. Authentication is required for both endpoints
via a query parameter token.

Routes:
- `/messages`: WebSocket for real-time chat messaging.
- `/notifications`: WebSocket for real-time notifications.
- `/connected_users`: HTTP endpoint to retrieve the list of connected clients.
"""

import uuid

from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query

from ..configuration import MANAGER
from ..descriptors import MessageType
from ..middleware.auth_websocket import WebSocketAuthMiddleware
from ..models import Message
from ..models.schemas import NotificationSchema, MessageSchema

ws_router = APIRouter()
websocket_auth = WebSocketAuthMiddleware()


@ws_router.websocket("/messages")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)):
    """
    WebSocket endpoint for real-time chat messaging.

    Clients must authenticate using a token passed as a query parameter.
    Once connected, messages are broadcasted to all users and stored in the
    database.

    :param websocket: WebSocket connection instance.
    :param token: Authentication token passed as a query parameter.
    """
    client_id = websocket_auth.is_auth(token)
    if client_id == '':
        return websocket_auth.unauthorised(websocket)

    client_id = client_id['sub'][:9] + f"{uuid.uuid4().hex[:4]}"
    await MANAGER.connect(client_id, websocket)

    msg_ = MessageSchema(user_id=client_id)
    msg_.connection_msg()
    await MANAGER.broadcast(msg_.to_json())

    try:
        while True:
            data = await websocket.receive_text()
            message_data = MessageSchema.parse_raw(data)

            if message_data.mtype == MessageType.MESSAGE.value:
                message_data.user_id = (
                    message_data.user_id if message_data.user_id != "null"
                    else client_id
                )
                Message.create(
                    user_id=message_data.user_id,
                    content=message_data.content
                )
                await MANAGER.broadcast(message_data.to_json())
    except WebSocketDisconnect:
        MANAGER.disconnect(client_id)
        msg_ = MessageSchema(user_id=client_id)
        msg_.disconnection_msg()
        await MANAGER.broadcast(msg_.to_json())


[docs] @ws_router.get("/connected_users") async def get_connected_users(): """ Retrieve the list of currently connected WebSocket client IDs. :return: Dictionary containing a list of connected user IDs. """ users = MANAGER.get_connected_users() return {"connected_users": users}
[docs] @ws_router.websocket("/notifications") async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)): """ WebSocket endpoint for receiving real-time notifications. Clients must authenticate using a token passed as a query parameter. The connection is registered with a combination of client ID and IP:port. Incoming messages are parsed as notifications. :param websocket: WebSocket connection instance. :param token: Authentication token passed as a query parameter. """ client_id = websocket_auth.is_auth(token) if client_id == '': return websocket_auth.unauthorised(websocket) ipp_ = websocket.client.host + str(websocket.client.port) name_connection = client_id + ipp_ await MANAGER.connect(name_connection, websocket) try: while True: data = await websocket.receive_text() message_data = NotificationSchema.parse_raw(data) print(message_data) # Uncomment below to publish to RabbitMQ exchange # await RABBITMQ_MANAGER.publish_message_to_exchange( # EXCHANGE_NAME, message_data.json()) except WebSocketDisconnect: MANAGER.disconnect(name_connection)