Streaming 與長連接:Chunked 傳輸實戰
HTTP Streaming 讓伺服器可以逐步發送資料,而不需要等待全部資料準備完成。本篇將深入解析這個強大的技術。
一、 Chunked Transfer Encoding
1.1 什麼是分塊傳輸?
傳統 HTTP 需要先計算 Content-Length:
http
HTTP/1.1 200 OK
Content-Length: 12345
[12345 bytes of data]
分塊傳輸允許逐步發送:
http
HTTP/1.1 200 OK
Transfer-Encoding: chunked
4
Wiki
5
pedia
e
in chunks.
0
1.2 格式結構
[chunk size in hex]\r\n
[chunk data]\r\n
...
0\r\n
\r\n
二、 後端實作
2.1 Express 串流回應
javascript
app.get("/stream", (req, res) => {
res.setHeader("Content-Type", "text/plain");
res.setHeader("Transfer-Encoding", "chunked");
let count = 0;
const interval = setInterval(() => {
count++;
res.write(`Chunk ${count}\n`);
if (count >= 10) {
clearInterval(interval);
res.end();
}
}, 500);
req.on("close", () => {
clearInterval(interval);
});
});2.2 JSON 串流(NDJSON)
NDJSON(Newline-Delimited JSON)是一種串流 JSON 格式:
json
{"id": 1, "name": "Alice"}
{"id": 2, "name": "Bob"}
{"id": 3, "name": "Charlie"}javascript
app.get("/users/stream", async (req, res) => {
res.setHeader("Content-Type", "application/x-ndjson");
// 使用資料庫游標
const cursor = User.find().cursor();
cursor.on("data", (user) => {
res.write(JSON.stringify(user) + "\n");
});
cursor.on("end", () => {
res.end();
});
cursor.on("error", (err) => {
res.status(500).end();
});
req.on("close", () => {
cursor.close();
});
});2.3 大量資料匯出
javascript
const { Parser } = require("json2csv");
app.get("/export/users", async (req, res) => {
res.setHeader("Content-Type", "text/csv");
res.setHeader("Content-Disposition", "attachment; filename=users.csv");
res.setHeader("Transfer-Encoding", "chunked");
const parser = new Parser();
const cursor = User.find().cursor();
let isFirst = true;
cursor.on("data", (user) => {
if (isFirst) {
// 寫入標頭
res.write(parser.parse([user]));
isFirst = false;
} else {
// 只寫資料行(不含標頭)
const line = Object.values(user.toJSON()).join(",");
res.write("\n" + line);
}
});
cursor.on("end", () => res.end());
cursor.on("error", () => res.status(500).end());
});三、 前端接收串流
3.1 Fetch + ReadableStream
javascript
async function streamData() {
const response = await fetch("/stream");
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log("Stream complete");
break;
}
const text = decoder.decode(value, { stream: true });
console.log("Received:", text);
}
}3.2 處理 NDJSON
javascript
async function streamNDJSON(url, onData) {
const response = await fetch(url);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 處理完整的行
const lines = buffer.split("\n");
buffer = lines.pop(); // 保留不完整的最後一行
for (const line of lines) {
if (line.trim()) {
const data = JSON.parse(line);
onData(data);
}
}
}
// 處理剩餘的 buffer
if (buffer.trim()) {
const data = JSON.parse(buffer);
onData(data);
}
}
// 使用
streamNDJSON("/users/stream", (user) => {
console.log("User:", user);
appendToList(user);
});3.3 進度顯示
javascript
async function streamWithProgress(url) {
const response = await fetch(url);
// 取得總大小(如果有)
const contentLength = response.headers.get("Content-Length");
const total = contentLength ? parseInt(contentLength) : null;
const reader = response.body.getReader();
let received = 0;
const chunks = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
received += value.length;
if (total) {
const progress = ((received / total) * 100).toFixed(1);
console.log(`Progress: ${progress}%`);
} else {
console.log(`Received: ${received} bytes`);
}
}
// 合併所有塊
const allChunks = new Uint8Array(received);
let position = 0;
for (const chunk of chunks) {
allChunks.set(chunk, position);
position += chunk.length;
}
return new TextDecoder().decode(allChunks);
}四、 AI/LLM 應用場景
4.1 串流文字生成
現代 AI API(如 OpenAI)使用串流返回生成的文字:
javascript
// 後端代理
app.post("/api/chat", async (req, res) => {
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
const response = await fetch("https://api.openai.com/v1/chat/completions", {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: "gpt-4",
messages: req.body.messages,
stream: true,
}),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
res.write(text);
}
res.end();
});4.2 前端逐字顯示
javascript
async function streamChat(messages) {
const response = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ messages }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 解析 SSE 格式
const lines = buffer.split("\n");
buffer = lines.pop();
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data === "[DONE]") continue;
try {
const parsed = JSON.parse(data);
const content = parsed.choices[0]?.delta?.content;
if (content) {
// 逐字顯示
appendToChat(content);
}
} catch (e) {}
}
}
}
}五、 長連接管理
5.1 Keep-Alive 與 Streaming
javascript
app.get("/long-stream", (req, res) => {
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
// 防止代理超時
const keepAlive = setInterval(() => {
res.write(": keep-alive\n\n");
}, 30000);
// 業務邏輯
const dataStream = createDataStream();
dataStream.on("data", (data) => {
res.write(`data: ${JSON.stringify(data)}\n\n`);
});
req.on("close", () => {
clearInterval(keepAlive);
dataStream.destroy();
});
});5.2 超時處理
nginx
# Nginx config for proxied streaming endpoints.
# ("//" is not valid nginx comment syntax — comments use "#".)
location /stream {
    proxy_pass http://backend;
    proxy_buffering off;          # deliver chunks immediately, never buffer
    proxy_read_timeout 3600s;     # 1 hour
    proxy_send_timeout 3600s;
}
5.3 斷線重連
javascript
class StreamClient {
constructor(url) {
this.url = url;
this.retryDelay = 1000;
this.maxRetries = 5;
this.retries = 0;
}
async connect(onData) {
try {
const response = await fetch(this.url);
const reader = response.body.getReader();
this.retries = 0; // 重置重試計數
while (true) {
const { done, value } = await reader.read();
if (done) break;
onData(value);
}
} catch (error) {
console.error("Stream error:", error);
this.reconnect(onData);
}
}
reconnect(onData) {
if (this.retries >= this.maxRetries) {
console.error("Max retries reached");
return;
}
this.retries++;
const delay = this.retryDelay * Math.pow(2, this.retries - 1);
console.log(`Reconnecting in ${delay}ms...`);
setTimeout(() => this.connect(onData), delay);
}
}六、 效能優化
6.1 背壓處理(Backpressure)
javascript
const { pipeline } = require("stream");
app.get("/large-data", (req, res) => {
const dataStream = createLargeDataStream();
// 使用 pipeline 自動處理背壓
pipeline(dataStream, res, (err) => {
if (err) {
console.error("Pipeline failed:", err);
}
});
});6.2 壓縮串流
javascript
const zlib = require("zlib");
app.get("/compressed-stream", (req, res) => {
const acceptEncoding = req.headers["accept-encoding"] || "";
let stream = createDataStream();
if (acceptEncoding.includes("gzip")) {
res.setHeader("Content-Encoding", "gzip");
stream = stream.pipe(zlib.createGzip());
}
stream.pipe(res);
});總結
| 概念 | 說明 |
|---|---|
| Chunked | 分塊傳輸編碼 |
| Transfer-Encoding | 指定傳輸編碼 |
| NDJSON | 換行分隔的 JSON |
| ReadableStream | 前端串流 API |
| Backpressure | 背壓控制 |
> **使用場景**:
- 大量資料匯出
- AI 文字生成
- 即時日誌
- 長時間任務進度
進階挑戰
- 實作一個串流式的資料庫備份功能。
- 比較 SSE 和 Chunked Streaming 的效能差異。
- 研究 HTTP/2 的串流機制與 HTTP/1.1 的差異。