[FastAPI] room 기능 구현 및 worker간 공유 (RabbitMQ)
room 기능을 구현해보자.
이전 글과 server.py는 변한게 없다.
# sockets.py
import socketio
sio_server = socketio.AsyncServer(
async_mode = 'asgi',
cors_allowed_origins=[]
)
sio_app = socketio.ASGIApp(
socketio_server=sio_server,
socketio_path='/ws/socket.io'
)
@sio_server.on('connect')
async def connect(sid, environ, auth):
print(f'{sid}: connected')
@sio_server.on('enter_room')
async def begin_chat(sid, room):
print(f'{sid}: enterd "{room}"')
await sio_server.enter_room(sid, room)
@sio_server.on('exit_room')
async def exit_chat(sid, room):
await sio_server.leave_room(sid, room)
@sio_server.on('disconnect')
async def disconnect(sid):
print(f'{sid}: disconnected')
# client.py
import socketio
import asyncio
sio_client = socketio.AsyncClient()
@sio_client.event
async def connect():
print('I\'m connected')
await sio_client.emit('enter_room', 'my_room')
@sio_client.event
async def disconnect():
print('I\'m disconnected')
async def main():
await sio_client.connect(
url='http://localhost:8000/ws',
socketio_path='/ws/socket.io'
)
await sio_client.disconnect()
asyncio.run(main())
터미널 세션 두 개를 열어 client를 두 명 만들어보자
FastAPI log를 확인해보면 sid가 다른 두 명의 클라이언트가 my_room 이라는 방에 들어왔다가 나간걸 확인할 수 있다.
클라이언트가 정말 같은 방에 있는지 확인하기 위해
서버에서 my_room에 있는 클라이언트들에게 메시지를 보내보자.
# socket.py
@sio_server.on('chat_message')
async def chat_message(sid, data):
message = data['message']
room = data['room']
print(f'{sid} in [{room}]: {message}')
await sio_server.emit('server_response', message, room=room)
# client.py
@sio_client.event
async def connect():
print('I\'m connected')
await sio_client.emit('enter_room', 'my_room')
await sio_client.emit('chat_message', {'room': 'my_room', 'message': 'I\'m in "my_room"!'})
@sio_client.event
async def server_response(data):
print('Server response:', data)
async def main():
await sio_client.connect(
url='http://localhost:8000/ws',
socketio_path='/ws/socket.io'
)
await asyncio.sleep(30)
await sio_client.disconnect()
서버에서 메시지를 보내는 로직을 추가하고
client 두 명을 테스트하기 위해 30초 sleep 또한 추가했다.
두 번째 client를 실행했을 때 첫 번째 client에 I'm in "my_room"!이 한 번 더 출력됨을 확인할 수 있다.
이렇게 원하는 room을 만들고 접속하여 room 안에 client들이 통신할 수 있다.
지금까지는
uvicorn server:app --host 0.0.0.0 --port 8000 --reload 처럼 FastAPI의 worker가 하나였다.
worker가 여러 개 띄우고 결과를 보자.
gunicorn -k uvicorn.workers.UvicornWorker -w 5 -b 0.0.0.0:8000 server:app --reload
gunicorn을 사용해 worker를 5개 띄우고 client 2개를 실행해보자.
첫 번째 실행한 client에 두 번째 client의 출력이 뜨지 않았다.
이유는 각각 독립적으로 실행되기 때문이다.
FastAPI의 워커를 여러 개 띄우면 각 워커는 고유한 프로세스로 실행된다.
이는 각 프로세스가 자체 메모리 공간을 가지고 독립적으로 실행된다는 것을 의미한다.
따라서, 각각의 워커는 고유한 Socket.IO 서버 인스턴스를 가지며, 트래픽은 운영체제에 의해 랜덤하게 워커로 할당된다.
이 인스턴스들은 서로의 상태를 공유하지 않는다.
이를 해결하기 위해 여러 워커 간의 상태를 공유할 수 있는 방법이 필요하다.
Socket.IO에는 이러한 환경을 위해 외부 메시지 브로커를 사용하는 방법이 있다.
메시지 브로커를 사용해 다른 워커/프로세스 간에 메시지를 중계할 수 있으며
이를 통해 모든 클라이언트가 동일한 my_room에 있는 것처럼 동작하게 할 수 있다.
외부 메시지 브로커로는 RabbitMQ를 사용하려고 한다.
# sockets.py
mgr = socketio.AsyncAioPikaManager('amqp://guest:guest@rabbitmq:5672/vhost')
sio_server = socketio.AsyncServer(
async_mode = 'asgi',
cors_allowed_origins=[],
client_manager=mgr
)
aio-pika를 설치한 후
manager를 추가해주면 끝이다.
마찬가지로 worker를 5개 띄우고 client를 2개 띄워 RMQ와 함께 확인해보자.
첫 번째 실행한 client 출력에 두 번째 client의 출력이 보인다.
client 별로 FastAPI에서 처리되는 worker가 다르더라도 같은 my_room에서 메시지가 오고가고 있다.
Rabbit MQ 웹을 확인해보자.
socketio exchange가 생겼다.
이를 통해 room에 있는 client 모두에게 서버 메시지를 전송할 수 있는 것이다.
'WebFramework > [FastAPI]' 카테고리의 다른 글
[FastAPI] Gunicorn threads 옵션 (0) | 2024.11.06 |
---|---|
[FastAPI] SocketIO Admin UI (0) | 2024.01.25 |
[FastAPI] SocketIO 마운트해서 사용하기 (0) | 2024.01.24 |
[FastAPI] BackgroundTasks, Celery (0) | 2024.01.09 |
[FastAPI] 중첩된 JSON 모델(Nested JSON Models) 사용 (0) | 2023.09.25 |