room 기능을 구현해보자.

 

이전 글과 server.py는 변한게 없다.

# sockets.py
import socketio

sio_server = socketio.AsyncServer(
    async_mode = 'asgi',
    cors_allowed_origins=[]
)

sio_app = socketio.ASGIApp(
    socketio_server=sio_server,
    socketio_path='/ws/socket.io'
)

@sio_server.on('connect')
async def connect(sid, environ, auth):
    print(f'{sid}: connected')

@sio_server.on('enter_room')
async def begin_chat(sid, room):
    print(f'{sid}: enterd "{room}"')
    await sio_server.enter_room(sid, room)
    
@sio_server.on('exit_room')
async def exit_chat(sid, room):
    await sio_server.leave_room(sid, room)

@sio_server.on('disconnect')
async def disconnect(sid):
    print(f'{sid}: disconnected')
# client.py
import socketio
import asyncio

sio_client = socketio.AsyncClient()

@sio_client.event
async def connect():
    print('I\'m connected')
    await sio_client.emit('enter_room', 'my_room')

@sio_client.event
async def disconnect():
    print('I\'m disconnected')

async def main():
    await sio_client.connect(
        url='http://localhost:8000/ws',
        socketio_path='/ws/socket.io'
    )
    await sio_client.disconnect()

asyncio.run(main())

터미널 세션 두 개를 열어 client를 두 명 만들어보자

server
client

 

FastAPI log를 확인해보면 sid가 다른 두 명의 클라이언트가 my_room 이라는 방에 들어왔다가 나간걸 확인할 수 있다.

클라이언트가 정말 같은 방에 있는지 확인하기 위해

서버에서 my_room에 있는 클라이언트들에게 메시지를 보내보자.

 

# socket.py

@sio_server.on('chat_message')
async def chat_message(sid, data):
    message = data['message']
    room = data['room']
    print(f'{sid} in [{room}]: {message}')
    await sio_server.emit('server_response', message, room=room)
# client.py

@sio_client.event
async def connect():
    print('I\'m connected')
    await sio_client.emit('enter_room', 'my_room')
    await sio_client.emit('chat_message', {'room': 'my_room', 'message': 'I\'m in "my_room"!'})
    
@sio_client.event
async def server_response(data):
    print('Server response:', data)
    
async def main():
    await sio_client.connect(
        url='http://localhost:8000/ws',
        socketio_path='/ws/socket.io'
    )
    await asyncio.sleep(30)
    await sio_client.disconnect()

 

서버에서 메시지를 보내는 로직을 추가하고

client 두 명을 테스트하기 위해 30초 sleep 또한 추가했다.

 

server
첫 번째 client 실행                                        두 번째 client 실행

 

두 번째 client를 실행했을 때 첫 번째 client에 I'm in "my_room"!이 한 번 더 출력됨을 확인할 수 있다.

 

이렇게 원하는 room을 만들고 접속하여 room 안에 client들이 통신할 수 있다.

 


 

지금까지는

uvicorn server:app --host 0.0.0.0 --port 8000 --reload 처럼 FastAPI의 worker가 하나였다.

 

worker가 여러 개 띄우고 결과를 보자.

gunicorn -k uvicorn.workers.UvicornWorker -w 5 -b 0.0.0.0:8000 server:app --reload

gunicorn을 사용해 worker를 5개 띄우고 client 2개를 실행해보자.

server
첫 번째 client 실행                                        두 번째 client 실행

 

첫 번째 실행한 client에 두 번째 client의 출력이 뜨지 않았다.

이유는 각각 독립적으로 실행되기 때문이다.

 

FastAPI의 워커를 여러 개 띄우면 각 워커는 고유한 프로세스로 실행된다.

이는 각 프로세스가 자체 메모리 공간을 가지고 독립적으로 실행된다는 것을 의미한다.

따라서, 각각의 워커는 고유한 Socket.IO 서버 인스턴스를 가지며, 트래픽은 운영체제에 의해 랜덤하게 워커로 할당된다.

인스턴스들은 서로의 상태를 공유하지 않는다.

 

이를 해결하기 위해 여러 워커 간의 상태를 공유할 수 있는 방법이 필요하다.

Socket.IO에는 이러한 환경을 위해 외부 메시지 브로커를 사용하는 방법이 있다.

 

메시지 브로커를 사용해 다른 워커/프로세스 간에 메시지를 중계할 수 있으며

이를 통해 모든 클라이언트가 동일한 my_room에 있는 것처럼 동작하게 할 수 있다.

 

 

외부 메시지 브로커로는 RabbitMQ를 사용하려고 한다.

 

# sockets.py
mgr = socketio.AsyncAioPikaManager('amqp://guest:guest@rabbitmq:5672/vhost')
sio_server = socketio.AsyncServer(
    async_mode = 'asgi',
    cors_allowed_origins=[],
    client_manager=mgr
)

 

aio-pika를 설치한 후

manager를 추가해주면 끝이다.

 

마찬가지로 worker를 5개 띄우고 client를 2개 띄워 RMQ와 함께 확인해보자.

 

server
첫 번째 client 실행                                        두 번째 client 실행

 

첫 번째 실행한 client 출력에 두 번째 client의 출력이 보인다.

client 별로 FastAPI에서 처리되는 worker가 다르더라도 같은 my_room에서 메시지가 오고가고 있다.

 

Rabbit MQ 웹을  확인해보자.

RMQ exchange

 

socketio exchange가 생겼다.

이를 통해 room에 있는 client 모두에게 서버 메시지를 전송할 수 있는 것이다.