Server-Sent Eventsでリアルタイムチャットを実装する話

チャットシステムを開発していると、生成AIのように処理結果が逐次的に生成されるケースに遭遇します。
従来の方式では、すべてのメッセージが生成完了してから一括表示されるため、特に長文の場合はユーザーを長時間待たせることになってしまいます。

この課題を解決する方法が、生成されたテキストを逐次的に表示する実装です。
そして、このストリーミング表示を実現する技術が「text/event-stream」です。

本記事では、React + Pythonを使ってServer-Sent Events (text/event-stream)を実装する方法について解説します。

操作はシンプルです
POST /chat/ask
=> GET /chat/stream

フロントエンド側(TypeScript)はこちらです
送信ボタン後の処理を書いています

// 質問送信
const handleSend = async () => {
  if (!input.trim() || streaming) return;
  setChat((prev) => [...prev, { role: "user", content: input }]);
  setInput("");
  setStreaming(true);

  // サーバーにPOST(仮実装: /chat/ask で質問を送信)
  await fetch("http://localhost:8000/chat/ask", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ question: input }),
  });

  // 回答ストリーム受信
  let answer = "";
  const eventSource = new EventSource("http://localhost:8000/chat/stream");
  controllerRef.current = eventSource;
  eventSource.onmessage = (event) => {
    answer += event.data;
    setChat((prev) => {
      // 直近のassistantの回答を更新
      if (prev.length > 0 && prev[prev.length - 1].role === "assistant") {
        return [...prev.slice(0, -1), { role: "assistant", content: answer }];
      } else {
        return [...prev, { role: "assistant", content: answer }];
      }
    });
  };
  eventSource.onerror = () => {
    eventSource.close();
    setStreaming(false);
  };
  eventSource.onopen = () => {
    // ストリーム開始時に空のassistant回答を追加
    setChat((prev) => [...prev, { role: "assistant", content: "" }]);
  };
};

バックエンド側(Python)はこの様な実装です

# 質問を一時保存(簡易実装)
last_question = ""

@app.post("/chat/ask")
async def chat_ask(data: dict = Body(...)):
    global last_question
    last_question = data.get("question", "")
    return {"status": "ok"}

@app.get("/chat/stream")
async def chat_stream(request: Request):
    async def event_generator():
        global last_question
        if not last_question:
            return
        # ここでlast_questionを使って回答を生成
        answer = f"「{last_question}」というご質問ですね。\nこれはサンプルのストリーミング回答です。\n---\n"
        for c in answer:
            if await request.is_disconnected():
                return
            yield {
                "event": "message",
                "data": c
            }
            await asyncio.sleep(0.05)
        last_question = ""  # 1回使ったらリセット
    return EventSourceResponse(event_generator()) 

終いに

今回は、逐次的にチャットメッセージを表示するシンプルなサンプル実装を紹介しました。

この実装方法を調べる過程で、複雑な処理を含むサンプルは多く見つかりましたが、シンプルなEcho処理だけのサンプルは意外と少ない印象でした。
そこで、基本的な実装パターンを自分用のメモも兼ねて記事として残しておくことにしました。

なお、本記事の実装はあくまで一例です。より効率的な方法として、例えば POST /ask のレスポンス自体をストリーム形式にすることで、単一のAPIエンドポイントで完結させることも可能かもしれません。
現在の実装では、サーバー側で last_question というリソースを占有してしまう設計になっているため、改善の余地があります。

今後、よりシンプルで実装しやすい方法を見つけた際には、改めて記事にする予定です。