A Gentle Knock on SSE


Configuring WebSocket is a common request at work, but this week a developer raised a question about SSE, which felt a bit unfamiliar. So what is SSE?

Wikipedia describes SSE like this:

Server-Sent Events (SSE) is a server push technology enabling a client to receive automatic updates from a server via an HTTP connection, and describes how servers can initiate data transmission towards clients once an initial client connection has been established.

There are a few very well-written articles on the topic:

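SSE is a one-way channel: only the server can send data to the browser. The browser cannot send messages back over the same connection.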

Simple implementation

Using Ruan Yifeng's example:

const http = require("http");
const PORT = process.env.PORT || 8844;
const HOST = process.env.HOST || "127.0.0.1";

http.createServer(function (req, res) {
    const url = new URL(req.url, `http://${req.headers.host}`);
    const path = url.pathname;

    if (path === "/stream") {
        res.writeHead(200, {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": '*',
        });
        res.write("retry: 10000\n");
        res.write("event: connecttime\n");
        res.write("data: " + new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' }) + "\n\n");

        let intervalId = setInterval(() => {
            res.write("data: " + new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' }) + "\n\n");
        }, 1000);

        req.on("close", () => {
            clearInterval(intervalId);
            console.log("Client disconnected");
        });

        // Log the connection
        console.log(`Client connected to ${path} at ${new Date()}`);
    } else {
        res.writeHead(404, { "Content-Type": "text/plain" });
        res.end("Not Found");
    }
}).listen(PORT, HOST, () => {
    console.log(`Server running at http://${HOST}:${PORT}/`);
});
The response headers of an SSE request contain Content-Type: text/event-stream.
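The body of such a response is plain text in the event-stream format. Each event is a group of `field: value` lines (`retry`, `event`, `id`, `data`) terminated by a blank line, which is exactly what the Node server above writes with `res.write`. Here is a minimal Python sketch of that serialization. The function name `format_sse_event` is hypothetical and not part of any library.

```python
from typing import Optional


def format_sse_event(data: str, event: Optional[str] = None,
                     event_id: Optional[str] = None,
                     retry: Optional[int] = None) -> str:
    """Serialize one event in the text/event-stream format."""
    lines = []
    if retry is not None:
        # Reconnection delay in milliseconds, like "retry: 10000" in the Node example
        lines.append(f"retry: {retry}")
    if event is not None:
        # Custom event type; without it the browser dispatches a "message" event
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    # A multi-line payload becomes several data: lines; the client rejoins them with "\n"
    for line in data.split("\n"):
        lines.append(f"data: {line}")
    # The blank line produced by the trailing "\n\n" terminates the event
    return "\n".join(lines) + "\n\n"
```

For example, `format_sse_event("hello", event="connecttime", retry=10000)` produces the same three lines that the Node server sends when a client first connects.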

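To use SSE, the browser first creates an EventSource instance, which opens a connection to the server. EventSource can only issue GET requests.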
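On the client side, EventSource reads the stream line by line and dispatches an event at each blank line. The Python sketch below mimics the core of those parsing rules. It is a simplified subset of the HTML specification's event-stream interpretation that ignores the BOM and `\r` line endings. `parse_sse` and `SSEEvent` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass
class SSEEvent:
    type: str
    data: str
    last_event_id: str
    retry: Optional[int]


def parse_sse(lines: Iterable[str]) -> Iterator[SSEEvent]:
    """Turn event-stream lines (without line terminators) into events."""
    event_type, data, last_id, retry = "", "", "", None
    for line in lines:
        if line == "":
            # Blank line: dispatch the buffered event, but only if it carried data
            if data:
                yield SSEEvent(event_type or "message", data[:-1], last_id, retry)
            # Type and data reset per event; last id and retry persist
            event_type, data = "", ""
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, sep, value = line.partition(":")
        if sep and value.startswith(" "):
            value = value[1:]  # exactly one leading space is stripped
        if field == "event":
            event_type = value
        elif field == "data":
            data += value + "\n"
        elif field == "id" and "\0" not in value:
            last_id = value
        elif field == "retry" and value.isascii() and value.isdigit():
            retry = int(value)
        # Unknown fields are ignored; an unterminated event at EOF is dropped
```

Fed the Node server's output, the first event comes out with type `connecttime`. The per-second updates arrive as plain `message` events, so a browser page would need `addEventListener("connecttime", ...)` in addition to `onmessage` to see both.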

A more complex application

Server-Sent Event (SSE): implementing a GPT-style scenario

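The approach described in that article is somewhat cumbersome to implement. Instead, I ran the llama3 model with Ollama on my own computer and used the "Generate a response" API documented at https://github.com/ollama/ollama.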
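With `"stream": true`, Ollama's generate endpoint (`POST /api/generate`, port 11434 by default) replies with newline-delimited JSON. Each line is an object whose `response` field holds the next piece of text, and the last object has `"done": true`. Below is a standard-library-only sketch that consumes that stream incrementally. The helper names are hypothetical. The model name and URL are taken from the back-end code, and `stream_generate` assumes a running local Ollama instance.

```python
import json
import urllib.request
from typing import Iterable, Iterator, Union


def iter_ollama_text(lines: Iterable[Union[bytes, str]]) -> Iterator[str]:
    """Yield the text pieces from Ollama's NDJSON stream until done is true."""
    for raw in lines:
        raw = raw.strip()
        if not raw:
            continue
        chunk = json.loads(raw)  # json.loads accepts both bytes and str
        if chunk.get("response"):
            yield chunk["response"]
        if chunk.get("done"):
            break


def stream_generate(prompt: str, model: str = "llama3:latest",
                    url: str = "http://localhost:11434/api/generate") -> Iterator[str]:
    """POST a prompt to a local Ollama server and yield text as it is generated."""
    body = json.dumps({"model": model, "prompt": prompt, "stream": True}).encode()
    req = urllib.request.Request(url, data=body,
                                 headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req, timeout=60) as resp:
        # The HTTP response object can be iterated line by line as data arrives
        yield from iter_ollama_text(resp)
```

With Ollama running, `for piece in stream_generate("Why is the sky blue?"): print(piece, end="", flush=True)` prints the answer as it is generated. Each of those pieces is what the SSE back end forwards to the browser.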

I used the AI coding tools https://bolt.new and https://v0.dev to produce the front-end code.

The back-end code is as follows:
import logging
import json
from fastapi import FastAPI, HTTPException, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
import httpx
import nest_asyncio
import uvicorn
from typing import Optional

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Only needed when running inside an already-running event loop (e.g. Jupyter)
nest_asyncio.apply()

app = FastAPI()

# Allow the front-end page, served from another origin, to call this API
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class ContentNeedSummary(BaseModel):
    content: str


async def get_token_header():
    # Placeholder auth dependency: always succeeds
    return True

OLLAMA_URL = "http://localhost:11434/api/generate"


async def create_chat_completion(content):
    """Stream Ollama's output and yield it in chunks of at least chunk_size characters."""
    chunk_size = 100  # Adjust this value to control how often data is sent
    accumulated_response = ""
    async with httpx.AsyncClient() as client:
        try:
            logger.info(f"Sending request to Ollama: {content}")
            # client.stream() hands over lines as Ollama produces them;
            # client.post() would wait for the whole body before returning
            async with client.stream(
                "POST",
                OLLAMA_URL,
                json={
                    "model": "llama3:latest",
                    "prompt": content,
                    "stream": True
                },
                timeout=30.0
            ) as response:
                response.raise_for_status()

                async for line in response.aiter_lines():
                    if not line:
                        continue
                    try:
                        data = json.loads(line)
                    except json.JSONDecodeError:
                        logger.error(f"Error decoding JSON: {line}")
                        continue
                    accumulated_response += data.get('response', '')
                    if len(accumulated_response) >= chunk_size:
                        yield accumulated_response
                        accumulated_response = ""
                    if data.get('done', False):
                        break

            if accumulated_response:  # Send the remaining data exactly once
                yield accumulated_response

        except httpx.HTTPStatusError as exc:
            # The SSE response headers are already sent, so an HTTPException can
            # no longer change the status code; report the error as an event.
            # "server-error" avoids clashing with EventSource's built-in error event.
            logger.error(f"HTTP error occurred: {exc}")
            yield {"event": "server-error", "data": str(exc)}
        except httpx.RequestError as exc:
            logger.error(
                f"An error occurred while requesting {exc.request.url!r}.")
            yield {"event": "server-error", "data": str(exc)}


# The browser's EventSource can only send GET requests, so an EventSource client
# must use GET /gpt/summary?content=...; POST with a JSON body is accepted as well
@app.post("/gpt/summary")
@app.get("/gpt/summary")
async def summary(
    content_need_summary: Optional[ContentNeedSummary] = None,
    content: Optional[str] = Query(None),
    user=Depends(get_token_header)
):
    if content_need_summary:
        # Prefer the JSON body when one was sent
        content_to_summarize = content_need_summary.content
    elif content:
        content_to_summarize = content
    else:
        # Raised outside any catch-all handler so the client receives a 400
        raise HTTPException(status_code=400, detail="No content provided")

    logger.info(
        f"Received request for summary: {content_to_summarize[:50]}...")
    # The generator only starts running after the response headers are sent
    return EventSourceResponse(create_chat_completion(content_to_summarize))

if __name__ == "__main__":
    # "main:app" assumes this file is saved as main.py
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        loop="asyncio"  # nest_asyncio cannot patch uvloop, so use the plain asyncio loop
    )
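The heart of `create_chat_completion` is an accumulate-and-flush loop. Pieces are buffered until at least `chunk_size` characters are available, and whatever is left over is sent exactly once at the end. Isolated from httpx and Ollama, the pattern looks like this. `rechunk` is a hypothetical name used only for illustration.

```python
from typing import Iterable, Iterator


def rechunk(pieces: Iterable[str], chunk_size: int = 100) -> Iterator[str]:
    """Regroup small text pieces into chunks of at least chunk_size characters."""
    buffer = ""
    for piece in pieces:
        buffer += piece
        if len(buffer) >= chunk_size:
            yield buffer
            buffer = ""
    # Flush the tail exactly once; yielding it both in a "done" branch and
    # again after the loop would send the last chunk twice
    if buffer:
        yield buffer
```

A smaller `chunk_size` gives a more typewriter-like display at the cost of more SSE events. A larger one sends fewer, bigger updates.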

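Once started, the back end keeps sending data to the front-end page over the SSE connection. It only stops after close() is called manually, at which point the network connection is dropped as well, as shown in the figure below. This is most likely EventSource's built-in reconnection at work: when the generator finishes, the server closes the response, and the browser reconnects after a short delay. The same GET request then triggers a fresh generation.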
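If the repeated stream is caused by EventSource's automatic reconnection, one common way to stop it is for the server to finish with an explicit end marker that the page handles with something like `source.addEventListener("end", () => source.close())`. The sketch below assumes such a page. The event name `end`, the `[DONE]` payload and the helper `with_end_event` are all hypothetical. The dict shape with `event`/`data` keys is the form that `sse_starlette`'s `EventSourceResponse` accepts from a generator.

```python
from typing import AsyncIterable, AsyncIterator, Dict, Union


async def with_end_event(
    chunks: AsyncIterable[Union[str, Dict[str, str]]]
) -> AsyncIterator[Dict[str, str]]:
    """Wrap a text generator so the stream ends with an explicit "end" event."""
    async for chunk in chunks:
        # Pass through chunks that are already event dicts (e.g. error events);
        # wrap plain text as a default "message" event
        yield chunk if isinstance(chunk, dict) else {"event": "message", "data": chunk}
    # Hypothetical marker telling the client to call EventSource.close()
    # instead of letting the browser reconnect and repeat the request
    yield {"event": "end", "data": "[DONE]"}
```

In the FastAPI route this would be used as `EventSourceResponse(with_end_event(create_chat_completion(content_to_summarize)))`.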

That felt a bit odd and did not look like a normal chatbot, so next I built a proper chatbot. Its interface is much more pleasant to use.

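The back-end code is as follows. Note that this version does not use SSE: the page sends an ordinary POST request and receives the whole reply at once.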
#chat.py
# coding:utf8
from flask import Flask, render_template, request, abort
import requests
import os
import logging
import time
import json

app = Flask(__name__)

# These settings could also be read from environment variables or a config file

OLLAMA_API_URL = "http://localhost:11434/api/generate"
OLLAMA_MODEL_NAME = "llama3:latest"

# Set up logging
logging.basicConfig(level=logging.INFO)


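def chat_with_ollama(message):
    if not message.strip():
        return "Error: Message cannot be empty"
    url = OLLAMA_API_URL
    data = {"model": OLLAMA_MODEL_NAME, "prompt": message}

    # Ollama streams by default, so the body is newline-delimited JSON.
    # The first request may also have to load the model, so allow more
    # than a few seconds between received bytes.
    response = requests.post(url, json=data, timeout=60)
    response.raise_for_status()

    final_response = ""
    for line in response.text.strip().split('\n'):
        if not line:
            continue
        item = json.loads(line)
        # Pieces already carry their own spacing; joining them with " "
        # would insert stray spaces and split words apart
        final_response += item.get("response", "")

    return final_response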


@app.route("/")
def index():
    return render_template("index.html")  # resolved relative to the templates/ folder


@app.route("/chat", methods=["POST"])
def chat():
    message = request.form.get("message", "").strip()
    if not message:
        return "Error: Message cannot be empty"

    response = chat_with_ollama(message)
    return response


if __name__ == "__main__":
    app.run(debug=True)
The front-end code is as follows:
<!-- templates/index.html -->
<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Chatbot</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 0;
            display: flex;
            flex-direction: column;
            align-items: center;
            min-height: 100vh;
            background-color: #f4f4f4;
        }

        #chatbox {
            width: 80%;
            max-width: 600px;
            border: 1px solid #ccc;
            padding: 10px;
            background-color: white;
            margin-bottom: 10px;
            overflow-y: auto;
            height: 300px;
            display: flex;
            flex-direction: column;
        }

        .input-container {
            width: 80%;
            max-width: 600px;
            display: flex;
            gap: 5px;
            /* Keep some space between the input box and the button */
        }

        #message {
            flex-grow: 1;
            /* Let the input box fill the remaining space */
            padding: 10px;
            border: 1px solid #ccc;
            border-radius: 4px;
        }

        button {
            padding: 10px 20px;
            border: none;
            background-color: #007bff;
            color: white;
            cursor: pointer;
            border-radius: 4px;
        }

        button:hover {
            background-color: #0056b3;
        }

        .message {
            margin: 5px 0;
            padding: 8px 12px;
            border-radius: 12px;
            max-width: 70%;
        }

        .user-message {
            background-color: #dcf8c6;
            align-self: flex-end;
        }

        .ollama-message {
            background-color: #e5e5ea;
            align-self: flex-start;
        }

        .error {
            color: red;
        }
    </style>
</head>

<body>
    <h1>Chatbot</h1>
    <div id="chatbox"></div>
    <div class="input-container">
        <input type="text" id="message" placeholder="Type your message...">
        <button onclick="sendMessage()">Send</button>
    </div>

    <script>
        function appendMessage(className, text) {
            // Use textContent rather than innerHTML so that user input and
            // model output are shown as plain text and cannot inject HTML
            var div = document.createElement("div");
            div.className = className;
            div.textContent = text;
            document.getElementById("chatbox").appendChild(div);

            // Scroll to the bottom
            scrollToBottom();
        }

        function sendMessage() {
            var input = document.getElementById("message");
            var message = input.value.trim();
            if (!message) {
                alert("Message cannot be empty");
                return;
            }

            // Show the user's message
            appendMessage("message user-message", "You: " + message);

            // Clear the input box
            input.value = "";

            // Send the request
            fetch("/chat", {
                method: "POST",
                headers: {
                    "Content-Type": "application/x-www-form-urlencoded"
                },
                body: "message=" + encodeURIComponent(message)
            })
                .then(response => {
                    if (!response.ok) {
                        throw new Error("Network response was not ok");
                    }
                    return response.text();
                })
                .then(data => {
                    // Show Ollama's reply
                    appendMessage("message ollama-message", "Ollama: " + data);
                })
                .catch(error => {
                    // Show the error message
                    appendMessage("error", "Error: " + error.message);
                });
        }

        function scrollToBottom() {
            var chatbox = document.getElementById("chatbox");
            chatbox.scrollTop = chatbox.scrollHeight;
        }
    </script>
</body>

</html>

写在最后

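SSE rarely comes up in my day-to-day work. The use cases I can think of all involve pushing the latest data to the client without the client having to poll repeatedly: stock prices, live sports scores, news, notifications and the like. These are also easy to build with WebSocket, which additionally allows two-way interaction, so the choice depends on the business requirements. For lightweight applications SSE is the quicker option, since it runs over plain HTTP and the browser's EventSource handles reconnection for you.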
