A simple API Gateway implementation that reverse-proxies requests to other services according to config.yaml, with rate limiting, JWT verification, logging, and metrics.
Architecture

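A request sent by a client passes through several middlewares and then reaches the proxy handler, which reverse-proxies it to the upstream service specified in config.yaml.
- main.go: assembles the handler and middlewares with the chain-of-responsibility pattern, then starts the HTTP server and the metrics/health endpoints
- handler/proxy.go: the core feature of the project, reverse-proxying requests to the upstream
- handler/health.go: health check
- middleware/*: every additional feature is implemented as a middleware that runs when the request comes in and when the response goes out, covering rate limiting, logging, the circuit breaker, JWT verification, and so on
- helper/*: helper functions for the middlewares
- logger/logger.go: zap logger with lumberjack log rotation

In addition, /metrics returns Prometheus metrics.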
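The chain-of-responsibility assembly can be sketched roughly as below. This is an illustration only, not the project's actual main.go: `Middleware`, `Chain`, `tag`, and `chainOrder` are hypothetical names, and the toy middlewares merely record the order in which they run.

```go
package main

import (
	"net/http"
	"net/http/httptest"
)

// Middleware wraps a handler and returns a new handler (hypothetical type).
type Middleware func(http.Handler) http.Handler

// Chain wraps h so that the first middleware listed is the outermost one,
// i.e. the first to see the request and the last to see the response.
func Chain(h http.Handler, mws ...Middleware) http.Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag is a toy middleware that appends its name to a response header,
// which makes the execution order visible.
func tag(name string) Middleware {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.Header().Add("X-Chain", name)
			next.ServeHTTP(w, r)
		})
	}
}

// chainOrder sends one request through recovery -> upstream -> proxy
// and returns the order in which the stages ran.
func chainOrder() []string {
	proxy := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Add("X-Chain", "proxy")
		w.WriteHeader(http.StatusOK)
	})
	h := Chain(proxy, tag("recovery"), tag("upstream"))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/order/1", nil))
	return rec.Header().Values("X-Chain")
}
```

Listing the outermost middleware first keeps the call site in the same order the request travels, which is why the loop wraps from the end of the slice.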
ReverseProxy
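// upstream URL; NewSingleHostReverseProxy expects a *url.URL, not a string
target, err := url.Parse("http://order.example.com")
if err != nil {
	http.Error(w, "invalid upstream URL", http.StatusInternalServerError)
	return
}
proxy := httputil.NewSingleHostReverseProxy(target)
proxy.ServeHTTP(w, r)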
config.yaml defines which upstream each request path is forwarded to, for example:
- /order/* -> http://order.example.com
- /user/* -> http://user.example.com
Metric
Besides the Go runtime metric collector and the process metric collector that ship with the prometheus package, four additional metric collectors are defined:
- gateway_requests_total{upstream,operation,uri}: counter; how many requests the rate limiter accepted (accept) and rejected (reject)
- http_requests_total{upstream,method,code,uri}: counter; total number of requests received
- http_request_duration_seconds{upstream,uri}: histogram; request duration in seconds, with a lowest bucket bound of 0.1, a factor of 5, and 6 buckets in total
- circuit_breaker_open{upstream}: number of times the circuit breaker was open
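To see what a histogram with a lowest bound of 0.1, a factor of 5, and 6 buckets actually covers, the arithmetic can be written out. The function below is a stand-in written for this illustration, not the Prometheus client itself, and `exponentialBuckets` is a hypothetical name.

```go
package main

// exponentialBuckets returns count upper bounds: start, start*factor,
// start*factor^2, and so on. It only reproduces the arithmetic behind the
// bucket description; it is not part of any metrics library.
func exponentialBuckets(start, factor float64, count int) []float64 {
	buckets := make([]float64, count)
	for i := range buckets {
		buckets[i] = start
		start *= factor
	}
	return buckets
}
```

Calling `exponentialBuckets(0.1, 5, 6)` gives upper bounds of roughly 0.1, 0.5, 2.5, 12.5, 62.5, and 312.5 seconds, so most of the resolution sits in the sub-second to few-second range.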
Middleware
Recovery
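This is the outermost middleware. It prevents a panic in any other middleware or in the proxy from taking down the whole service.
When it recovers from a panic, the logger records the error and the gateway responds with a JSON 500 Internal Server Error.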
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	defer func() {
		if err := recover(); err != nil {
			// named log so the variable does not shadow the logger package
			log := logger.NewLogger()
			log.Error(
				"Panic recovered.",
				zap.String("err", fmt.Sprintf("%v", err)),
			)
			helper.JSONResponse(w, http.StatusInternalServerError, map[string]string{}, map[string]string{
				"msg": "Internal Server Error",
			})
		}
	}()

	m.next.ServeHTTP(w, r)
})
Upstream
This middleware resolves the upstream from the request URL and stores its config in the request context. If no upstream matches, it returns 404.
// helper
type upstreamContextKey struct{}

// upstreamCtxKey is an unexported key, so other packages cannot collide with it
var upstreamCtxKey upstreamContextKey

func WithUpstream(ctx context.Context, upstream *Upstream) context.Context {
	return context.WithValue(ctx, upstreamCtxKey, upstream)
}

// upstream middleware
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	upstreamKey := upstreamKeyFromRequestPath(r.URL.Path)
	upstream, ok := helper.Upstreams[upstreamKey]
	if ok {
		r = r.WithContext(helper.WithUpstream(r.Context(), &upstream))
		m.next.ServeHTTP(w, r)
		return
	}
	http.NotFound(w, r)
})
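The post does not show `upstreamKeyFromRequestPath`. One plausible version, assuming the upstream map is keyed by the first path segment (so that `/order/*` maps to the key `order`), could look like this. The keying scheme is my assumption, not something the source confirms.

```go
package main

import "strings"

// upstreamKeyFromRequestPath returns the first path segment, which this
// sketch assumes is the key used in the upstream map:
// "/order/123" -> "order", "/user" -> "user", "/" -> "".
func upstreamKeyFromRequestPath(path string) string {
	trimmed := strings.TrimPrefix(path, "/")
	key, _, _ := strings.Cut(trimmed, "/")
	return key
}
```

An empty key simply misses the map lookup, so a request to `/` falls through to the 404 branch without special handling.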
CORS
Sets the CORS-related response headers. For a preflight request it returns 204 No Content.
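Since no code is shown for this middleware, here is a minimal sketch of the behavior just described. The header values are placeholders I picked for illustration (a real gateway would likely read them from config), and `cors` and `preflightStatus` are hypothetical names.

```go
package main

import (
	"net/http"
	"net/http/httptest"
)

// cors sets permissive CORS headers and short-circuits preflight requests.
// The allowed origin, methods, and headers below are placeholder values.
func cors(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		h := w.Header()
		h.Set("Access-Control-Allow-Origin", "*")
		h.Set("Access-Control-Allow-Methods", "GET, POST, PUT, PATCH, DELETE, OPTIONS")
		h.Set("Access-Control-Allow-Headers", "Authorization, Content-Type")

		// A preflight is an OPTIONS request carrying Access-Control-Request-Method.
		if r.Method == http.MethodOptions && r.Header.Get("Access-Control-Request-Method") != "" {
			w.WriteHeader(http.StatusNoContent)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// preflightStatus runs a preflight request through cors and reports the
// status code and whether the wrapped handler was reached.
func preflightStatus() (status int, reachedNext bool) {
	next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		reachedNext = true
	})
	req := httptest.NewRequest(http.MethodOptions, "/order/1", nil)
	req.Header.Set("Origin", "https://app.example.com")
	req.Header.Set("Access-Control-Request-Method", "POST")
	rec := httptest.NewRecorder()
	cors(next).ServeHTTP(rec, req)
	return rec.Code, reachedNext
}
```

Answering the preflight inside the middleware means the upstream never sees OPTIONS probes, which also keeps them out of the rate limiter if CORS sits in front of it.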
Rate Limit
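Because an API Gateway may face sudden traffic spikes, rate limiting uses golang.org/x/time/rate, which implements a token bucket, rather than a leaky bucket.
For simplicity each request consumes one token; tokens are not charged by request size in bytes.
config.yaml sets how many tokens are added to the bucket per second and the bucket's capacity.
Both accepted and rejected requests are recorded as metrics.
A rejected request gets a 429 response with a Retry-After header that states how long to wait until a token becomes available.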
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	upstream := helper.GetUpstream(r.Context())
	reservation := m.getLimiter(upstream).Reserve()
	// Reserve succeeds even when no token is free yet; a positive delay
	// means the request would have to wait, so treat it as a rejection.
	if delay := reservation.Delay(); !reservation.OK() || delay > 0 {
		reservation.Cancel() // hand the reserved token back to the limiter
		// record the reject metric
		helper.GatewayRequestTotal.WithLabelValues(upstream.Name, "reject", r.URL.Path).Inc()
		// respond with 429; Retry-After says how long until a token is available
		helper.JSONResponse(
			w,
			http.StatusTooManyRequests,
			map[string]string{
				"Retry-After": strconv.Itoa(int(math.Ceil(delay.Seconds()))),
			},
			map[string]string{
				"msg": "Too Many Requests",
			},
		)
		return
	}
	// record the accept metric
	helper.GatewayRequestTotal.WithLabelValues(upstream.Name, "accept", r.URL.Path).Inc()
	m.next.ServeHTTP(w, r)
})
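To make the token-bucket behavior concrete (a burst is allowed up to the capacity, then requests are throttled to the refill rate), here is a toy bucket built only on the standard library. It is not golang.org/x/time/rate and not the project's code; `tokenBucket`, `allow`, and `burstDemo` are hypothetical names, and the type is deliberately not safe for concurrent use.

```go
package main

import "time"

// tokenBucket is a toy, non-thread-safe token bucket for illustration only.
type tokenBucket struct {
	rate   float64   // tokens added per second
	burst  float64   // bucket capacity
	tokens float64   // tokens currently available
	last   time.Time // last time the bucket was refilled
}

func newTokenBucket(rate, burst float64, now time.Time) *tokenBucket {
	return &tokenBucket{rate: rate, burst: burst, tokens: burst, last: now}
}

// allow refills the bucket for the time elapsed since the last call and
// takes one token if available. When it refuses, it also returns how long
// the caller should wait until one token is available.
func (b *tokenBucket) allow(now time.Time) (ok bool, retryAfter time.Duration) {
	b.tokens += now.Sub(b.last).Seconds() * b.rate
	if b.tokens > b.burst {
		b.tokens = b.burst
	}
	b.last = now
	if b.tokens >= 1 {
		b.tokens--
		return true, 0
	}
	missing := 1 - b.tokens
	return false, time.Duration(missing / b.rate * float64(time.Second))
}

// burstDemo fires 3 requests at the same instant against a bucket with
// rate 1/s and burst 2, then one more request a second later.
func burstDemo() []bool {
	start := time.Unix(0, 0)
	b := newTokenBucket(1, 2, start)
	var results []bool
	for i := 0; i < 3; i++ {
		ok, _ := b.allow(start)
		results = append(results, ok)
	}
	ok, _ := b.allow(start.Add(time.Second))
	return append(results, ok)
}
```

In `burstDemo` the first two requests drain the burst, the third is refused, and the fourth succeeds once a second's worth of refill has arrived. A leaky bucket would instead smooth the first two requests out over time, which is the difference that matters for spiky gateway traffic.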
Circuit Breaker
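The circuit breaker implementation follows Martin Fowler's article on circuit breakers.
The setup here is simple: when the upstream responds with a status code >= 500 more than 5 times, the circuit breaker opens for 30 seconds. After 30 seconds it becomes half-open, and the next incoming request is used as a trial call to the upstream. If the trial succeeds, the circuit breaker closes and reverse proxying resumes.
Because requests are handled concurrently, the failure count, the last failure time, and the trial-call flag all use sync/atomic operations in place of a lock.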
// helper
type CircuitBreaker struct {
	failureCount     atomic.Uint32
	failureThreshold uint32
	resetTimeout     time.Duration
	lastFailureTime  atomic.Pointer[time.Time]
	halfOpenInFlight atomic.Bool // true while a trial call is in progress
}

// GetState derives the state from the failure count and last failure time.
func (c *CircuitBreaker) GetState() CircuitBreakerState {
	if c.failureCount.Load() < c.failureThreshold {
		return Closed
	}

	lastFailureTime := c.lastFailureTime.Load()
	if lastFailureTime == nil {
		return Open
	}

	if time.Since(*lastFailureTime) > c.resetTimeout {
		return HalfOpen
	}
	return Open
}

// Reset closes the breaker after a successful call.
func (c *CircuitBreaker) Reset() {
	c.failureCount.Store(0)
	c.lastFailureTime.Store(nil)
	c.halfOpenInFlight.Store(false)
}

func (c *CircuitBreaker) RecordFailure() {
	c.failureCount.Add(1)
	now := time.Now()
	c.lastFailureTime.Store(&now)
}

// TrialCallStart reports whether the caller won the right to make the trial call.
func (c *CircuitBreaker) TrialCallStart() bool {
	return c.halfOpenInFlight.CompareAndSwap(false, true)
}

func (c *CircuitBreaker) TrialCallOver() {
	c.halfOpenInFlight.Store(false)
}
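The listing uses `CircuitBreakerState`, `Closed`, `Open`, and `HalfOpen` without showing their declaration. A plausible shape, which is my assumption and not the project's actual code, is a small integer enum; the `String` method is an extra I added for readable logs.

```go
package main

// CircuitBreakerState enumerates the three states returned by GetState.
// The original post does not show this declaration; this is one plausible shape.
type CircuitBreakerState int

const (
	Closed   CircuitBreakerState = iota // requests flow to the upstream
	Open                                // requests are rejected immediately
	HalfOpen                            // a single trial request is allowed
)

// String makes the state readable in logs and metrics labels.
func (s CircuitBreakerState) String() string {
	switch s {
	case Closed:
		return "closed"
	case Open:
		return "open"
	case HalfOpen:
		return "half-open"
	default:
		return "unknown"
	}
}
```

Making `Closed` the zero value means a freshly allocated breaker starts in the safe, pass-through state.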
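Back to the middleware:
When the circuit breaker is open, or half-open without having acquired the trial-call lock, the middleware records the metric and returns a "server busy" response.
When the circuit breaker is closed, or half-open with the trial-call lock acquired, the call to the upstream is allowed. If the upstream returns a status code >= 500, the failure count and time are recorded; otherwise the circuit breaker is reset.
When the circuit breaker is half-open, a deferred function releases the lock.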
// middleware
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	upstream := helper.GetUpstream(r.Context()).Name
	circuitBreaker := m.CircuitBreakers[upstream]
	if circuitBreaker == nil {
		m.next.ServeHTTP(w, r)
		return
	}

	state := circuitBreaker.GetState()
	if state == helper.Open {
		// count requests rejected while the circuit breaker is open
		helper.CircuitBreakerOpen.WithLabelValues(upstream).Inc()
		// return the "server busy" response
		helper.JSONResponse(
			w,
			http.StatusServiceUnavailable,
			map[string]string{},
			map[string]string{"msg": "Server is overloaded. Please try later."},
		)
		return
	}

	// half-open, but another request already holds the trial-call lock
	if state == helper.HalfOpen && !circuitBreaker.TrialCallStart() {
		helper.CircuitBreakerOpen.WithLabelValues(upstream).Inc()
		// return the "server busy" response
		helper.JSONResponse(
			w,
			http.StatusServiceUnavailable,
			map[string]string{},
			map[string]string{"msg": "Server is overloaded. Please try later."},
		)
		return
	}

	if state == helper.HalfOpen {
		defer circuitBreaker.TrialCallOver()
	}

	// closed, or half-open with the trial-call lock acquired: call the upstream
	rw := helper.NewResponseWriter(w)
	m.next.ServeHTTP(rw, r)

	if rw.StatusCode >= http.StatusInternalServerError {
		// upstream returned a status code >= 500: record the failure count and time
		circuitBreaker.RecordFailure()
		return
	}

	// reset the circuit breaker
	circuitBreaker.Reset()
})
Log
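Logs both the request and the response. A request and its response share the same trace_id so they are easy to trace.
Because http.ResponseWriter offers no direct way to read the HTTP status code or the body, I followed a Stack Overflow answer and created a struct that embeds http.ResponseWriter and exposes the status code and body as exported fields.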
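import "net/http"

// ResponseWriter records the status code and body written by later handlers.
type ResponseWriter struct {
	http.ResponseWriter
	StatusCode int
	Body       []byte
}

func (r *ResponseWriter) WriteHeader(code int) {
	r.StatusCode = code
	r.ResponseWriter.WriteHeader(code)
}

func (r *ResponseWriter) Write(b []byte) (int, error) {
	// Write may be called several times and b may be reused by the caller,
	// so append a copy instead of keeping only the last slice.
	r.Body = append(r.Body, b...)
	return r.ResponseWriter.Write(b)
}

func NewResponseWriter(w http.ResponseWriter) *ResponseWriter {
	// net/http sends 200 when WriteHeader is never called, so default to it.
	return &ResponseWriter{ResponseWriter: w, StatusCode: http.StatusOK}
}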
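How the wrapper and the shared trace_id fit together in a log middleware might look like the sketch below. It is stdlib-only and uses a callback in place of the zap logger so it stays self-contained; `statusRecorder`, `newTraceID`, `logEntry`, `logging`, and `loggingDemo` are all hypothetical names, and the trace_id format (16 random bytes as hex) is my own choice.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"net/http"
	"net/http/httptest"
)

// statusRecorder is a cut-down stand-in for the ResponseWriter wrapper.
type statusRecorder struct {
	http.ResponseWriter
	status int
}

func (s *statusRecorder) WriteHeader(code int) {
	s.status = code
	s.ResponseWriter.WriteHeader(code)
}

// newTraceID returns 16 random bytes encoded as 32 hex characters.
func newTraceID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err) // crypto/rand failing is not recoverable here
	}
	return hex.EncodeToString(b)
}

// logEntry is what this sketch records for each phase.
type logEntry struct {
	TraceID string
	Phase   string // "request" or "response"
	Status  int    // only set for the response phase
}

// logging emits one entry before and one after calling next, both
// carrying the same trace ID.
func logging(next http.Handler, sink func(logEntry)) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		id := newTraceID()
		sink(logEntry{TraceID: id, Phase: "request"})
		rec := &statusRecorder{ResponseWriter: w, status: http.StatusOK}
		next.ServeHTTP(rec, r)
		sink(logEntry{TraceID: id, Phase: "response", Status: rec.status})
	})
}

// loggingDemo runs one request through logging and returns the entries.
func loggingDemo() []logEntry {
	var entries []logEntry
	next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusTeapot)
	})
	h := logging(next, func(e logEntry) { entries = append(entries, e) })
	h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/user/1", nil))
	return entries
}
```

Generating the ID once per request and closing over it is what ties the two log lines together; nothing needs to be stored on the request itself unless later middlewares also want to log it.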
Authenticate
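Reads the JWT from the request and verifies its signature with the app key (that is, the JWT secret).
The resulting claim decides what happens next:
- Parsing the JWT fails, the JWT is missing, verification fails, or there is no expiry time: return 401
- The JWT has expired: return 403
- The JWT scope does not match the upstream service: return 403
- The JWT is valid: set UserId, Email, and Scope in the request headers and pass the request on, so the upstream does not have to parse the JWT again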
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	upstream := helper.GetUpstream(r.Context())
	// only upstreams with auth enabled require a JWT
	if upstream != nil && upstream.Auth {
		userClaim, ok := m.getUserClaim(r)
		if !ok {
			helper.JSONResponse(w, http.StatusUnauthorized, map[string]string{}, map[string]string{
				"msg": "Unauthorized",
			})
			return
		}
		// token expired
		if userClaim.ExpiresAt.Before(time.Now()) {
			helper.JSONResponse(w, http.StatusForbidden, map[string]string{}, map[string]string{
				"msg": "Token expired",
			})
			return
		}
		// Scope is a comma-separated list of upstream names
		if !slices.Contains(strings.Split(userClaim.Scope, ","), upstream.Name) {
			helper.JSONResponse(w, http.StatusForbidden, map[string]string{}, map[string]string{
				"msg": "Forbidden",
			})
			return
		}
		m.setClaimToHeader(userClaim, r)
	}
	m.next.ServeHTTP(w, r)
})
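The scope check and the header forwarding can be isolated into a small sketch. The `UserClaim` shape, the header names (taken literally from the UserId, Email, Scope fields mentioned in the list), and the helpers `hasScope`, `setClaimToHeader`, and `forwardedUser` are assumptions for illustration; the project's real claim type is not shown in the post.

```go
package main

import (
	"net/http"
	"slices"
	"strings"
)

// UserClaim holds the fields that get forwarded (hypothetical shape).
type UserClaim struct {
	UserID string
	Email  string
	Scope  string // comma-separated upstream names, e.g. "order,user"
}

// hasScope reports whether the claim grants access to the named upstream.
func hasScope(c UserClaim, upstream string) bool {
	return slices.Contains(strings.Split(c.Scope, ","), upstream)
}

// setClaimToHeader writes the identity headers onto the outgoing request.
// Set replaces any value the client sent under the same name, so a caller
// cannot smuggle in its own identity through these headers.
func setClaimToHeader(c UserClaim, r *http.Request) {
	r.Header.Set("UserId", c.UserID)
	r.Header.Set("Email", c.Email)
	r.Header.Set("Scope", c.Scope)
}

// forwardedUser shows that a client-supplied UserId header is overwritten.
func forwardedUser() string {
	r, err := http.NewRequest(http.MethodGet, "http://gateway.local/order/1", nil)
	if err != nil {
		panic(err)
	}
	r.Header.Set("UserId", "spoofed")
	setClaimToHeader(UserClaim{UserID: "42", Email: "a@example.com", Scope: "order,user"}, r)
	return r.Header.Get("UserId")
}
```

One thing worth keeping in mind with this design: the upstream trusts these headers, so it should only be reachable through the gateway.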
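Afterword
This is a simple project written with the standard library and the standard extension packages as much as possible. There is still plenty of room for improvement, such as reloading the config, charging rate-limit tokens by bytes, and supporting other network protocols.
I also learned some concepts I had never come across before, such as rate limiting and circuit breakers. And I was shocked that http.ResponseWriter offers no direct way to read the HTTP status code; I am shocked again every time I think about it.
Finally, the complete code is available in my GitHub repository.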