跳至主要內容
Skip to content

Streaming 與長連接:Chunked 傳輸實戰

HTTP Streaming 讓伺服器可以逐步發送資料,而不需要等待全部資料準備完成。本篇將深入解析這個強大的技術。


一、 Chunked Transfer Encoding

1.1 什麼是分塊傳輸?

傳統 HTTP 需要先計算 Content-Length:

http
HTTP/1.1 200 OK
Content-Length: 12345

[12345 bytes of data]

分塊傳輸允許逐步發送:

http
HTTP/1.1 200 OK
Transfer-Encoding: chunked

4
Wiki
5
pedia
b
 in chunks.
0

1.2 格式結構

[chunk size in hex]\r\n
[chunk data]\r\n
...
0\r\n
\r\n

二、 後端實作

2.1 Express 串流回應

javascript
// Stream ten plain-text chunks to the client, one every 500 ms.
app.get("/stream", (req, res) => {
  res.setHeader("Content-Type", "text/plain");
  res.setHeader("Transfer-Encoding", "chunked");

  let sent = 0;
  const timer = setInterval(() => {
    sent += 1;
    res.write(`Chunk ${sent}\n`);

    // Stop after the 10th chunk and terminate the chunked body.
    if (sent >= 10) {
      clearInterval(timer);
      res.end();
    }
  }, 500);

  // Stop producing data if the client disconnects mid-stream.
  req.on("close", () => clearInterval(timer));
});

2.2 JSON 串流(NDJSON)

NDJSON(Newline-Delimited JSON)是一種串流 JSON 格式:

json
{"id": 1, "name": "Alice"}
{"id": 2, "name": "Bob"}
{"id": 3, "name": "Charlie"}
javascript
// Stream every user as newline-delimited JSON (NDJSON) without
// buffering the whole collection in memory.
app.get("/users/stream", async (req, res) => {
  res.setHeader("Content-Type", "application/x-ndjson");

  // Database cursor yields one document at a time.
  const cursor = User.find().cursor();

  cursor.on("data", (user) => {
    res.write(JSON.stringify(user) + "\n");
  });

  cursor.on("end", () => {
    res.end();
  });

  cursor.on("error", (err) => {
    // BUG FIX: once the first chunk is written the 200 status and
    // headers are already on the wire, so res.status(500) has no effect
    // (Express just logs "headers already sent"). Report a real 500
    // only while that is still possible; otherwise abort the socket so
    // the client sees a failed transfer instead of a silently truncated
    // 200 response.
    console.error("Cursor error:", err);
    if (!res.headersSent) {
      res.status(500).end();
    } else {
      res.destroy();
    }
  });

  // Release the database cursor if the client disconnects early.
  req.on("close", () => {
    cursor.close();
  });
});

2.3 大量資料匯出

javascript
const { Parser } = require("json2csv");

// Export all users as CSV, streaming row by row so the full data set
// is never held in memory at once.
app.get("/export/users", async (req, res) => {
  res.setHeader("Content-Type", "text/csv");
  res.setHeader("Content-Disposition", "attachment; filename=users.csv");
  res.setHeader("Transfer-Encoding", "chunked");

  const cursor = User.find().cursor();

  // Created lazily on the first row so the column list can be derived
  // from the actual document shape.
  let headerParser = null;
  let rowParser = null;

  cursor.on("data", (user) => {
    if (headerParser === null) {
      // Pin the column order from the first document and reuse it for
      // every subsequent row.
      const fields = Object.keys(user.toJSON());
      headerParser = new Parser({ fields });
      rowParser = new Parser({ fields, header: false });
      // First write: header line plus the first data row.
      res.write(headerParser.parse([user]));
    } else {
      // BUG FIX: the original built rows with Object.values().join(","),
      // which performs no CSV quoting/escaping — any value containing a
      // comma, quote, or newline corrupted the file, and the column
      // order was not guaranteed to match the header. Reusing json2csv
      // keeps every row properly escaped and aligned.
      res.write("\n" + rowParser.parse([user]));
    }
  });

  cursor.on("end", () => res.end());

  cursor.on("error", (err) => {
    // Headers are already sent once streaming begins; a 500 status can
    // only be set before the first write — otherwise abort the socket.
    console.error("CSV export failed:", err);
    if (!res.headersSent) {
      res.status(500).end();
    } else {
      res.destroy();
    }
  });

  // Stop reading from the database if the client goes away.
  req.on("close", () => cursor.close());
});

三、 前端接收串流

3.1 Fetch + ReadableStream

javascript
// Read the /stream response incrementally and log each chunk as it
// arrives instead of waiting for the full body.
async function streamData() {
  const response = await fetch("/stream");
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();

    if (done) {
      // BUG FIX: flush the decoder. With { stream: true } it may hold
      // back bytes of a multi-byte UTF-8 sequence split across chunks;
      // a final argument-less decode() emits them.
      const tail = decoder.decode();
      if (tail) console.log("Received:", tail);
      console.log("Stream complete");
      break;
    }

    // stream: true keeps partial multi-byte sequences buffered until
    // the next chunk arrives.
    const text = decoder.decode(value, { stream: true });
    console.log("Received:", text);
  }
}

3.2 處理 NDJSON

javascript
/**
 * Consume an NDJSON response, invoking onData(parsedObject) for every
 * complete JSON line as soon as it arrives.
 *
 * @param {string} url - endpoint returning application/x-ndjson
 * @param {(data: any) => void} onData - called once per parsed record
 */
async function streamNDJSON(url, onData) {
  const response = await fetch(url);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();

    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // Split on newlines; everything before the last "\n" is complete.
    const lines = buffer.split("\n");
    buffer = lines.pop(); // keep the (possibly partial) trailing line

    for (const line of lines) {
      if (line.trim()) {
        onData(JSON.parse(line));
      }
    }
  }

  // BUG FIX: flush the decoder — bytes of a multi-byte character split
  // across the final chunk boundary are only emitted by this call.
  buffer += decoder.decode();

  // Deliver the last record if the stream did not end with a newline.
  if (buffer.trim()) {
    onData(JSON.parse(buffer));
  }
}

// Usage: append each user to the page as soon as its line is parsed.
streamNDJSON("/users/stream", (user) => {
  console.log("User:", user);
  appendToList(user);
});

3.3 進度顯示

javascript
/**
 * Download url while reporting progress, then return the body as text.
 * Progress is a percentage when Content-Length is available, otherwise
 * a running byte count.
 *
 * @param {string} url
 * @returns {Promise<string>} the full decoded response body
 */
async function streamWithProgress(url) {
  const response = await fetch(url);

  // NOTE: when the response is compressed (Content-Encoding), the
  // Content-Length header describes the *encoded* size while the chunk
  // lengths below are decoded bytes, so the percentage is approximate.
  const contentLength = response.headers.get("Content-Length");
  // BUG FIX: always pass an explicit radix to parseInt.
  const total = contentLength ? Number.parseInt(contentLength, 10) : null;

  const reader = response.body.getReader();
  let received = 0;
  const chunks = [];

  while (true) {
    const { done, value } = await reader.read();

    if (done) break;

    chunks.push(value);
    received += value.length;

    if (total) {
      const progress = ((received / total) * 100).toFixed(1);
      console.log(`Progress: ${progress}%`);
    } else {
      console.log(`Received: ${received} bytes`);
    }
  }

  // Concatenate the collected chunks into one buffer before decoding,
  // so multi-byte characters split across chunks decode correctly.
  const allChunks = new Uint8Array(received);
  let position = 0;
  for (const chunk of chunks) {
    allChunks.set(chunk, position);
    position += chunk.length;
  }

  return new TextDecoder().decode(allChunks);
}

四、 AI/LLM 應用場景

4.1 串流文字生成

現代 AI API(如 OpenAI)使用串流返回生成的文字:

javascript
// 後端代理
app.post("/api/chat", async (req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");

  const response = await fetch("https://api.openai.com/v1/chat/completions", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${OPENAI_API_KEY}`,
    },
    body: JSON.stringify({
      model: "gpt-4",
      messages: req.body.messages,
      stream: true,
    }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const text = decoder.decode(value);
    res.write(text);
  }

  res.end();
});

4.2 前端逐字顯示

javascript
/**
 * Send the chat history to our proxy and render the reply token by
 * token as the SSE stream arrives.
 *
 * @param {Array<object>} messages - chat history to send
 */
async function streamChat(messages) {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages }),
  });

  // BUG FIX: fetch resolves on 4xx/5xx too; fail fast instead of
  // silently trying to parse an error page as SSE.
  if (!response.ok) {
    throw new Error(`Chat request failed: ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // SSE events are newline-delimited; keep the partial last line.
    const lines = buffer.split("\n");
    buffer = lines.pop();

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = line.slice(6);
        if (data === "[DONE]") continue;

        try {
          const parsed = JSON.parse(data);
          const content = parsed.choices[0]?.delta?.content;
          if (content) {
            appendToChat(content); // render incrementally, token by token
          }
        } catch (err) {
          // BUG FIX: the original swallowed all errors with an empty
          // catch. Parsing is still best-effort (a data line may be a
          // fragment), but we log instead of hiding the problem.
          console.warn("Skipping unparsable SSE line:", err);
        }
      }
    }
  }
}

五、 長連接管理

5.1 Keep-Alive 與 Streaming

javascript
// Long-lived SSE stream with periodic keep-alive comments so that
// intermediary proxies do not time the connection out.
app.get("/long-stream", (req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  // An SSE comment line (leading ":") is ignored by clients but resets
  // proxy idle timers; sent every 30 seconds.
  const heartbeat = setInterval(() => {
    res.write(": keep-alive\n\n");
  }, 30000);

  // Forward application events as SSE data frames.
  const dataStream = createDataStream();
  dataStream.on("data", (data) => {
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  });

  // Tear everything down when the client disconnects.
  req.on("close", () => {
    clearInterval(heartbeat);
    dataStream.destroy();
  });
});

5.2 超時處理

nginx
// Nginx 配置
location /stream {
    proxy_pass http://backend;
    proxy_buffering off;
    proxy_read_timeout 3600s;  # 1 小時
    proxy_send_timeout 3600s;
}

5.3 斷線重連

javascript
// Streaming client with exponential-backoff reconnection.
class StreamClient {
  /**
   * @param {string} url - stream endpoint to connect to
   */
  constructor(url) {
    this.url = url;
    this.retryDelay = 1000; // base delay in ms; doubles per attempt
    this.maxRetries = 5;
    this.retries = 0;
  }

  /**
   * Open the stream and feed every raw chunk to onData. Reconnects
   * with exponential backoff on network errors or HTTP error statuses.
   * @param {(chunk: Uint8Array) => void} onData
   */
  async connect(onData) {
    try {
      const response = await fetch(this.url);

      // BUG FIX: fetch resolves even for 4xx/5xx responses, so the
      // original treated e.g. a 503 as a successful connection and
      // never retried. Route HTTP errors into the reconnect path.
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }

      const reader = response.body.getReader();

      this.retries = 0; // connection succeeded — reset the backoff

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        onData(value);
      }
    } catch (error) {
      console.error("Stream error:", error);
      this.reconnect(onData);
    }
  }

  // Schedule another connect() attempt, doubling the delay each time
  // (1s, 2s, 4s, ...) up to maxRetries attempts.
  reconnect(onData) {
    if (this.retries >= this.maxRetries) {
      console.error("Max retries reached");
      return;
    }

    this.retries++;
    const delay = this.retryDelay * Math.pow(2, this.retries - 1);

    console.log(`Reconnecting in ${delay}ms...`);
    setTimeout(() => this.connect(onData), delay);
  }
}

六、 效能優化

6.1 背壓處理(Backpressure)

javascript
const { pipeline } = require("stream");

// Stream a large payload to the client. pipeline() propagates
// backpressure automatically (pausing the source while the socket's
// buffer is full) and tears both streams down on error.
app.get("/large-data", (req, res) => {
  const source = createLargeDataStream();

  pipeline(source, res, (err) => {
    if (err) console.error("Pipeline failed:", err);
  });
});

6.2 壓縮串流

javascript
const zlib = require("zlib");

// Gzip the stream on the fly when the client advertises support.
app.get("/compressed-stream", (req, res) => {
  const acceptEncoding = req.headers["accept-encoding"] || "";

  let stream = createDataStream();

  const clientSupportsGzip = acceptEncoding.includes("gzip");
  if (clientSupportsGzip) {
    res.setHeader("Content-Encoding", "gzip");
    stream = stream.pipe(zlib.createGzip());
  }

  stream.pipe(res);
});

總結

| 概念 | 說明 |
| --- | --- |
| Chunked | 分塊傳輸編碼 |
| Transfer-Encoding | 指定傳輸編碼 |
| NDJSON | 換行分隔的 JSON |
| ReadableStream | 前端串流 API |
| Backpressure | 背壓控制 |

> **使用場景**:

  • 大量資料匯出
  • AI 文字生成
  • 即時日誌
  • 長時間任務進度

進階挑戰

  1. 實作一個串流式的資料庫備份功能。
  2. 比較 SSE 和 Chunked Streaming 的效能差異。
  3. 研究 HTTP/2 的串流機制與 HTTP/1.1 的差異。

延伸閱讀與資源