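A Gentle Knock on SSE
At work I often get requests to configure WebSocket. This week a developer raised a question about SSE, which felt somewhat unfamiliar to me. So what is SSE?
Wikipedia explains SSE like this: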
Server-Sent Events (SSE) is a server push technology enabling a client to receive automatic updates from a server via an HTTP connection, and describes how servers can initiate data transmission towards clients once an initial client connection has been established.

A few very well written articles:
what-is-sse-server-sent-events-and-how-do-they-work
SSE is a one-way channel: only the server can send data to the browser.
A simple implementation
Taking Ruan Yifeng's example:
var http = require("http");

const PORT = process.env.PORT || 8844;
const HOST = process.env.HOST || "127.0.0.1";

http.createServer(function (req, res) {
  const url = new URL(req.url, `http://${req.headers.host}`);
  const path = url.pathname;
  if (path === "/stream") {
    res.writeHead(200, {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
      "Access-Control-Allow-Origin": '*',
    });
    res.write("retry: 10000\n");
    res.write("event: connecttime\n");
    res.write("data: " + new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' }) + "\n\n");
    let intervalId = setInterval(() => {
      res.write("data: " + new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' }) + "\n\n");
    }, 1000);
    req.on("close", () => {
      clearInterval(intervalId);
      console.log("Client disconnected");
    });
    // Log the connection
    console.log(`Client connected to ${path} at ${new Date()}`);
  } else {
    res.writeHead(404, { "Content-Type": "text/plain" });
    res.end("Not Found");
  }
}).listen(PORT, HOST, () => {
  console.log(`Server running at http://${HOST}:${PORT}/`);
});
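The res.write calls in the server above emit the text/event-stream wire format by hand: optional retry: and event: fields, one or more data: lines, then a blank line to end the event. As a rough sketch of the same framing in Python (the helper name format_sse is my own, not part of any library), it could look like this:

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None, retry: Optional[int] = None) -> str:
    """Frame one message in the text/event-stream wire format (hypothetical helper)."""
    lines = []
    if retry is not None:
        # Reconnection delay in milliseconds, as in "retry: 10000" above
        lines.append(f"retry: {retry}")
    if event is not None:
        # Custom event name; without it the browser treats the event as "message"
        lines.append(f"event: {event}")
    # Each line of a multi-line payload needs its own "data:" field
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    print(format_sse("hello", event="connecttime", retry=10000), end="")
```

Splitting the payload on newlines matters because a raw newline inside a data: line would otherwise end the field early.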
The response headers of an SSE request contain Content-Type: text/event-stream

When using SSE, the browser first creates an EventSource instance, which opens a connection to the server.
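To get a feel for what EventSource does with the bytes it receives, here is a minimal Python sketch of a parser for the text/event-stream format. It is a simplification under my own assumptions: the function name parse_sse is hypothetical, and retry and id are simply kept as fields on the returned dict instead of changing reconnection state the way a real browser would.

```python
from typing import Dict, Iterable, Iterator, List


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per event from an iterable of text/event-stream lines."""
    event: Dict[str, str] = {}
    data_parts: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event collected so far
            if data_parts:
                event["data"] = "\n".join(data_parts)
                event.setdefault("event", "message")  # default event type
                yield event
            event, data_parts = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "data":
            data_parts.append(value)
        elif field in ("event", "id", "retry"):
            event[field] = value


if __name__ == "__main__":
    stream = "retry: 10000\nevent: connecttime\ndata: t0\n\ndata: t1\n\n"
    for evt in parse_sse(stream.splitlines()):
        print(evt)
```

Feeding it the kind of output the Node server produces gives one connecttime event followed by plain message events.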

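A more complex application
Server-Sent Event (SSE): Implementing a GPT Scenario
The approach described in this article is somewhat cumbersome to implement. Instead, I ran ollama on my own machine with the llama3 model, following the "Generate a response" section of https://github.com/ollama/ollama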
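For reference, the generate endpoint can be exercised without any framework. The sketch below uses only the Python standard library and assumes Ollama is listening on its default local address with the llama3:latest model already pulled; the function names build_request and stream_generate are my own.

```python
import json
import urllib.request
from typing import Iterator

OLLAMA_URL = "http://localhost:11434/api/generate"  # assumed default local address


def build_request(prompt: str, model: str = "llama3:latest") -> urllib.request.Request:
    """Build the POST request for Ollama's generate endpoint."""
    body = json.dumps({"model": model, "prompt": prompt, "stream": True}).encode("utf-8")
    return urllib.request.Request(
        OLLAMA_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def stream_generate(prompt: str, model: str = "llama3:latest") -> Iterator[str]:
    """Yield text fragments as they arrive (the body is one JSON object per line)."""
    with urllib.request.urlopen(build_request(prompt, model)) as resp:
        for raw in resp:
            if not raw.strip():
                continue
            chunk = json.loads(raw)
            yield chunk.get("response", "")
            if chunk.get("done"):
                break


if __name__ == "__main__":
    # Needs a running local Ollama server; prints the answer as it streams in
    for piece in stream_generate("Why is the sky blue?"):
        print(piece, end="", flush=True)
    print()
```

Each line of the response carries a "response" fragment, and the last one has "done" set to true, which is what the back-end code further down relies on.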

With the help of AI coding tools (https://bolt.new and https://v0.dev), I generated the front-end code.

The back-end code is as follows:
import logging
import json
from fastapi import FastAPI, HTTPException, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
import httpx
import nest_asyncio
import uvicorn
from typing import Optional

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
nest_asyncio.apply()

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class ContentNeedSummary(BaseModel):
    content: str


async def get_token_header():
    # Placeholder auth dependency: always lets the request through
    return True


OLLAMA_URL = "http://localhost:11434/api/generate"


async def create_chat_completion(content):
    async with httpx.AsyncClient() as client:
        try:
            logger.info(f"Sending request to Ollama: {content}")
            # client.stream() reads lines as they arrive;
            # client.post() would buffer the whole body before returning
            async with client.stream(
                "POST",
                OLLAMA_URL,
                json={
                    "model": "llama3:latest",
                    "prompt": f"{content}",
                    "stream": True
                },
                timeout=30.0
            ) as response:
                response.raise_for_status()
                accumulated_response = ""
                chunk_size = 100  # Adjust this value to control how often data is sent
                async for line in response.aiter_lines():
                    if line:
                        try:
                            data = json.loads(line)
                            if 'response' in data:
                                accumulated_response += data['response']
                                if len(accumulated_response) >= chunk_size:
                                    yield accumulated_response
                                    accumulated_response = ""
                            if data.get('done', False):
                                # The remainder is flushed once, after the loop
                                break
                        except json.JSONDecodeError:
                            logger.error(f"Error decoding JSON: {line}")
                if accumulated_response:  # Send any final remaining data
                    yield accumulated_response
        except httpx.HTTPStatusError as exc:
            logger.error(f"HTTP error occurred: {exc}")
            raise HTTPException(
                status_code=exc.response.status_code, detail=str(exc))
        except httpx.RequestError as exc:
            logger.error(
                f"An error occurred while requesting {exc.request.url!r}.")
            raise HTTPException(status_code=500, detail=str(exc))


@app.post("/gpt/summary")
@app.get("/gpt/summary")
async def summary(
    content_need_summary: Optional[ContentNeedSummary] = None,
    content: Optional[str] = Query(None),
    user=Depends(get_token_header)
):
    try:
        if content_need_summary:
            # POST: the text comes from the JSON body
            content_to_summarize = content_need_summary.content
        elif content:
            # GET: the text comes from the ?content= query parameter
            content_to_summarize = content
        else:
            raise HTTPException(status_code=400, detail="No content provided")
        logger.info(
            f"Received request for summary: {content_to_summarize[:50]}...")
        return EventSourceResponse(create_chat_completion(content_to_summarize))
    except HTTPException:
        # Let deliberate HTTP errors (such as the 400 above) pass through unchanged
        raise
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        loop="asyncio"
    )
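The batching idea inside create_chat_completion (collect Ollama's small fragments and only emit once roughly chunk_size characters have built up) is easier to see in isolation. Below is a minimal synchronous sketch of that idea; batch_responses is a hypothetical helper that takes the newline-delimited JSON lines as plain strings, so it can be tried without a running model.

```python
import json
from typing import Iterable, Iterator


def batch_responses(lines: Iterable[str], chunk_size: int = 100) -> Iterator[str]:
    """Group the "response" fragments of NDJSON lines into larger text chunks."""
    buffer = ""
    for line in lines:
        if not line:
            continue
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            continue  # this sketch just skips malformed lines
        buffer += data.get("response", "")
        if len(buffer) >= chunk_size:
            yield buffer
            buffer = ""
        if data.get("done", False):
            break
    # Flush whatever is left exactly once
    if buffer:
        yield buffer


if __name__ == "__main__":
    sample = [
        json.dumps({"response": "ab", "done": False}),
        json.dumps({"response": "cd", "done": False}),
        json.dumps({"response": "e", "done": True}),
    ]
    print(list(batch_responses(sample, chunk_size=4)))
```

A smaller chunk_size makes the page update more often at the cost of more events; a larger one does the opposite.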
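Once started, the back end keeps sending data to the front-end page over a single connection. It only stops after a manual close, at which point the network connection is closed as well, as shown in the figure below: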


This felt a bit odd and not like a normal chatbot, so let's implement a chatbot instead. The interface looks much more comfortable.

The back-end code is as follows:
#chat.py
# coding:utf8
from flask import Flask, render_template, request, abort
import requests
import os
import logging
import time
import json

app = Flask(__name__)

# Configuration could also be read from environment variables or a config file
OLLAMA_API_URL = "http://localhost:11434/api/generate"
OLLAMA_MODEL_NAME = "llama3:latest"

# Set up logging
logging.basicConfig(level=logging.INFO)


def chat_with_ollama(message):
    if not message.strip():
        return "Error: Message cannot be empty"
    url = OLLAMA_API_URL
    data = {"model": OLLAMA_MODEL_NAME, "prompt": message}
    response = requests.post(url, json=data, timeout=5)
    # Ollama streams by default, so the body is one JSON object per line
    json_objs = response.text.strip().split('\n')
    json_list = [json.loads(obj) for obj in json_objs]
    final_response = ""
    for item in json_list:
        # Fragments carry their own spacing, so join them without a separator
        final_response += item.get("response", "")
    return final_response


@app.route("/")
def index():
    return render_template("index.html")  # relative path


@app.route("/chat", methods=["POST"])
def chat():
    message = request.form.get("message", "").strip()
    if not message:
        return "Error: Message cannot be empty"
    response = chat_with_ollama(message)
    return response


if __name__ == "__main__":
    app.run(debug=True)
The front-end code is as follows:
#templates/index.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Chatbot</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 0;
            display: flex;
            flex-direction: column;
            align-items: center;
            min-height: 100vh;
            background-color: #f4f4f4;
        }
        #chatbox {
            width: 80%;
            max-width: 600px;
            border: 1px solid #ccc;
            padding: 10px;
            background-color: white;
            margin-bottom: 10px;
            overflow-y: auto;
            height: 300px;
            display: flex;
            flex-direction: column;
        }
        .input-container {
            width: 80%;
            max-width: 600px;
            display: flex;
            gap: 5px;
            /* Keep some space between the input box and the button */
        }
        #message {
            flex-grow: 1;
            /* The input box takes up the remaining space */
            padding: 10px;
            border: 1px solid #ccc;
            border-radius: 4px;
        }
        button {
            padding: 10px 20px;
            border: none;
            background-color: #007bff;
            color: white;
            cursor: pointer;
            border-radius: 4px;
        }
        button:hover {
            background-color: #0056b3;
        }
        .message {
            margin: 5px 0;
            padding: 8px 12px;
            border-radius: 12px;
            max-width: 70%;
        }
        .user-message {
            background-color: #dcf8c6;
            align-self: flex-end;
        }
        .ollama-message {
            background-color: #e5e5ea;
            align-self: flex-start;
        }
        .error {
            color: red;
        }
    </style>
</head>
<body>
    <h1>Chatbot</h1>
    <div id="chatbox"></div>
    <div class="input-container">
        <input type="text" id="message" placeholder="Type your message...">
        <button onclick="sendMessage()">Send</button>
    </div>
    <script>
        function appendMessage(className, text) {
            // Use textContent so user input and model output are never parsed as HTML
            var div = document.createElement("div");
            div.className = className;
            div.textContent = text;
            document.getElementById("chatbox").appendChild(div);
            // Scroll to the bottom
            scrollToBottom();
        }
        function sendMessage() {
            var message = document.getElementById("message").value.trim();
            if (!message) {
                alert("Message cannot be empty");
                return;
            }
            // Show the user's message
            appendMessage("message user-message", "You: " + message);
            // Clear the input box
            document.getElementById("message").value = "";
            // Send the request
            fetch("/chat", {
                method: "POST",
                headers: {
                    "Content-Type": "application/x-www-form-urlencoded"
                },
                body: "message=" + encodeURIComponent(message)
            })
                .then(response => {
                    if (!response.ok) {
                        throw new Error("Network response was not ok");
                    }
                    return response.text();
                })
                .then(data => {
                    // Show Ollama's response
                    appendMessage("message ollama-message", "Ollama: " + data);
                })
                .catch(error => {
                    // Show the error message
                    appendMessage("error", "Error: " + error.message);
                });
        }
        function scrollToBottom() {
            var chatbox = document.getElementById("chatbox");
            chatbox.scrollTop = chatbox.scrollHeight;
        }
    </script>
</body>
</html>
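Final thoughts
SSE rarely comes up in my day-to-day work. The scenarios I can think of are those where the latest data is pushed to the client so that the client does not have to keep polling: stock prices, sports scores, news, notifications and the like. All of this is also easy to implement with WebSocket, which additionally lets both sides talk to each other. The choice ultimately depends on the business requirements; for lightweight applications, SSE is the quicker option.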