mirror of
https://github.com/dalbodeule/hop-gate.git
synced 2025-12-09 13:25:44 +09:00
[feat](server, client): implement streaming-based HTTP tunnel with DTLS sessions
- Replaced single-envelope HTTP handling with stream-based tunneling (`StreamOpen`, `StreamData`, and `StreamClose`) for HTTP-over-DTLS. - Added unique StreamID generation for per-session HTTP requests. - Improved client and server logic for handling chunked body transmissions and reverse stream responses. - Enhanced pseudo-header handling for HTTP metadata in tunneling. - Updated error handling for local HTTP failures, ensuring proper stream-based responses.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
@@ -29,8 +30,9 @@ import (
|
||||
)
|
||||
|
||||
type dtlsSessionWrapper struct {
|
||||
sess dtls.Session
|
||||
mu sync.Mutex
|
||||
sess dtls.Session
|
||||
mu sync.Mutex
|
||||
nextStreamID uint64
|
||||
}
|
||||
|
||||
// canonicalizeDomainForDNS 는 DTLS 핸드셰이크에서 전달된 도메인 문자열을
|
||||
@@ -155,8 +157,10 @@ func parseExpectedIPsFromEnv(logger logging.Logger, envKey string) []net.IP {
|
||||
return result
|
||||
}
|
||||
|
||||
// ForwardHTTP 는 단일 HTTP 요청을 DTLS 세션으로 포워딩하고 응답을 돌려받습니다.
|
||||
// ForwardHTTP forwards a single HTTP request over the DTLS session and returns the response.
|
||||
// ForwardHTTP 는 HTTP 요청을 DTLS 세션 위의 StreamOpen/StreamData/StreamClose 프레임으로 전송하고,
|
||||
// 역방향 스트림 응답을 수신해 protocol.Response 로 반환합니다. (ko)
|
||||
// ForwardHTTP forwards an HTTP request over the DTLS session using StreamOpen/StreamData/StreamClose
|
||||
// frames and reconstructs the reverse stream into a protocol.Response. (en)
|
||||
func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Logger, req *http.Request, serviceName string) (*protocol.Response, error) {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
@@ -165,78 +169,220 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
// 요청 본문 읽기
|
||||
var body []byte
|
||||
if req.Body != nil {
|
||||
b, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
body = b
|
||||
}
|
||||
codec := protocol.DefaultCodec
|
||||
|
||||
// 간단한 RequestID 생성 (실제 서비스에서는 UUID 등을 사용하는 것이 좋음)
|
||||
requestID := time.Now().UTC().Format("20060102T150405.000000000")
|
||||
|
||||
httpReq := &protocol.Request{
|
||||
RequestID: requestID,
|
||||
ClientID: "", // TODO: 클라이언트 식별자 도입 시 채우기
|
||||
ServiceName: serviceName,
|
||||
Method: req.Method,
|
||||
URL: req.URL.String(),
|
||||
Header: req.Header.Clone(),
|
||||
Body: body,
|
||||
}
|
||||
// 세션 내에서 고유한 StreamID 를 생성합니다. (ko)
|
||||
// Generate a unique StreamID for this HTTP request within the DTLS session. (en)
|
||||
streamID := w.nextHTTPStreamID()
|
||||
|
||||
log := logger.With(logging.Fields{
|
||||
"component": "http_to_dtls",
|
||||
"request_id": requestID,
|
||||
"request_id": string(streamID),
|
||||
"method": req.Method,
|
||||
"url": req.URL.String(),
|
||||
})
|
||||
|
||||
log.Info("forwarding http request over dtls", logging.Fields{
|
||||
log.Info("forwarding http request over dtls (stream mode)", logging.Fields{
|
||||
"host": req.Host,
|
||||
"scheme": req.URL.Scheme,
|
||||
})
|
||||
|
||||
// HTTP 요청을 Envelope 로 감싸서 전송합니다.
|
||||
env := &protocol.Envelope{
|
||||
Type: protocol.MessageTypeHTTP,
|
||||
HTTPRequest: httpReq,
|
||||
// 요청 헤더를 복사하고 pseudo-header 로 HTTP 메타데이터를 추가합니다. (ko)
|
||||
// Copy request headers and attach HTTP metadata as pseudo-headers. (en)
|
||||
hdr := make(map[string][]string, len(req.Header)+3)
|
||||
for k, vs := range req.Header {
|
||||
hdr[k] = append([]string(nil), vs...)
|
||||
}
|
||||
hdr[protocol.HeaderKeyMethod] = []string{req.Method}
|
||||
if req.URL != nil {
|
||||
hdr[protocol.HeaderKeyURL] = []string{req.URL.String()}
|
||||
}
|
||||
host := req.Host
|
||||
if host == "" && req.URL != nil {
|
||||
host = req.URL.Host
|
||||
}
|
||||
if host != "" {
|
||||
hdr[protocol.HeaderKeyHost] = []string{host}
|
||||
}
|
||||
|
||||
if err := protocol.DefaultCodec.Encode(w.sess, env); err != nil {
|
||||
log.Error("failed to encode http envelope", logging.Fields{
|
||||
// StreamOpen 전송: 어떤 서비스로 라우팅해야 하는지와 초기 헤더를 전달합니다. (ko)
|
||||
// Send StreamOpen to indicate which service to route to and initial headers. (en)
|
||||
openEnv := &protocol.Envelope{
|
||||
Type: protocol.MessageTypeStreamOpen,
|
||||
StreamOpen: &protocol.StreamOpen{
|
||||
ID: streamID,
|
||||
Service: serviceName,
|
||||
TargetAddr: "",
|
||||
Header: hdr,
|
||||
},
|
||||
}
|
||||
if err := codec.Encode(w.sess, openEnv); err != nil {
|
||||
log.Error("failed to encode stream_open envelope", logging.Fields{
|
||||
"error": err.Error(),
|
||||
})
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 클라이언트로부터 HTTP 응답 Envelope 를 수신합니다.
|
||||
var respEnv protocol.Envelope
|
||||
if err := protocol.DefaultCodec.Decode(w.sess, &respEnv); err != nil {
|
||||
log.Error("failed to decode http envelope", logging.Fields{
|
||||
// 요청 바디를 4KiB(StreamChunkSize) 단위로 잘라 StreamData 프레임으로 전송합니다. (ko)
|
||||
// Chunk the request body into 4KiB (StreamChunkSize) StreamData frames. (en)
|
||||
var seq uint64
|
||||
if req.Body != nil {
|
||||
buf := make([]byte, protocol.StreamChunkSize)
|
||||
for {
|
||||
n, err := req.Body.Read(buf)
|
||||
if n > 0 {
|
||||
dataCopy := append([]byte(nil), buf[:n]...)
|
||||
dataEnv := &protocol.Envelope{
|
||||
Type: protocol.MessageTypeStreamData,
|
||||
StreamData: &protocol.StreamData{
|
||||
ID: streamID,
|
||||
Seq: seq,
|
||||
Data: dataCopy,
|
||||
},
|
||||
}
|
||||
if err2 := codec.Encode(w.sess, dataEnv); err2 != nil {
|
||||
log.Error("failed to encode stream_data envelope", logging.Fields{
|
||||
"error": err2.Error(),
|
||||
})
|
||||
return nil, err2
|
||||
}
|
||||
seq++
|
||||
}
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read http request body for streaming: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 바디 종료를 알리는 StreamClose 를 전송합니다. (ko)
|
||||
// Send StreamClose to mark the end of the request body. (en)
|
||||
closeReqEnv := &protocol.Envelope{
|
||||
Type: protocol.MessageTypeStreamClose,
|
||||
StreamClose: &protocol.StreamClose{
|
||||
ID: streamID,
|
||||
Error: "",
|
||||
},
|
||||
}
|
||||
if err := codec.Encode(w.sess, closeReqEnv); err != nil {
|
||||
log.Error("failed to encode request stream_close envelope", logging.Fields{
|
||||
"error": err.Error(),
|
||||
})
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if respEnv.Type != protocol.MessageTypeHTTP || respEnv.HTTPResponse == nil {
|
||||
log.Error("received non-http envelope from client", logging.Fields{
|
||||
"type": respEnv.Type,
|
||||
})
|
||||
return nil, fmt.Errorf("unexpected envelope type %q or empty http_response", respEnv.Type)
|
||||
// 클라이언트로부터 역방향 스트림 응답을 수신합니다. (ko)
|
||||
// Receive reverse stream response (StreamOpen + StreamData* + StreamClose). (en)
|
||||
var (
|
||||
resp protocol.Response
|
||||
bodyBuf bytes.Buffer
|
||||
gotOpen bool
|
||||
statusCode = http.StatusOK
|
||||
)
|
||||
|
||||
resp.RequestID = string(streamID)
|
||||
resp.Header = make(map[string][]string)
|
||||
|
||||
for {
|
||||
var env protocol.Envelope
|
||||
if err := codec.Decode(w.sess, &env); err != nil {
|
||||
log.Error("failed to decode stream response envelope", logging.Fields{
|
||||
"error": err.Error(),
|
||||
})
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch env.Type {
|
||||
case protocol.MessageTypeStreamOpen:
|
||||
so := env.StreamOpen
|
||||
if so == nil {
|
||||
return nil, fmt.Errorf("stream_open response payload is nil")
|
||||
}
|
||||
if so.ID != streamID {
|
||||
return nil, fmt.Errorf("unexpected stream_open for id %q (expected %q)", so.ID, streamID)
|
||||
}
|
||||
// 상태 코드 및 헤더 복원 (pseudo-header 제거). (ko)
|
||||
// Restore status code and headers (strip pseudo-headers). (en)
|
||||
statusStr := firstHeaderValue(so.Header, protocol.HeaderKeyStatus, strconv.Itoa(http.StatusOK))
|
||||
if sc, err := strconv.Atoi(statusStr); err == nil && sc > 0 {
|
||||
statusCode = sc
|
||||
}
|
||||
for k, vs := range so.Header {
|
||||
if k == protocol.HeaderKeyMethod ||
|
||||
k == protocol.HeaderKeyURL ||
|
||||
k == protocol.HeaderKeyHost ||
|
||||
k == protocol.HeaderKeyStatus {
|
||||
continue
|
||||
}
|
||||
resp.Header[k] = append([]string(nil), vs...)
|
||||
}
|
||||
gotOpen = true
|
||||
|
||||
case protocol.MessageTypeStreamData:
|
||||
sd := env.StreamData
|
||||
if sd == nil {
|
||||
return nil, fmt.Errorf("stream_data response payload is nil")
|
||||
}
|
||||
if sd.ID != streamID {
|
||||
return nil, fmt.Errorf("unexpected stream_data for id %q (expected %q)", sd.ID, streamID)
|
||||
}
|
||||
if len(sd.Data) > 0 {
|
||||
if _, err := bodyBuf.Write(sd.Data); err != nil {
|
||||
return nil, fmt.Errorf("buffer stream_data response: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
case protocol.MessageTypeStreamClose:
|
||||
sc := env.StreamClose
|
||||
if sc == nil {
|
||||
return nil, fmt.Errorf("stream_close response payload is nil")
|
||||
}
|
||||
if sc.ID != streamID {
|
||||
return nil, fmt.Errorf("unexpected stream_close for id %q (expected %q)", sc.ID, streamID)
|
||||
}
|
||||
// 스트림 종료: 지금까지 수신한 헤더/바디로 protocol.Response 를 완성합니다. (ko)
|
||||
// Stream finished: complete protocol.Response using collected headers/body. (en)
|
||||
resp.Status = statusCode
|
||||
resp.Body = bodyBuf.Bytes()
|
||||
resp.Error = sc.Error
|
||||
|
||||
log.Info("received stream http response over dtls", logging.Fields{
|
||||
"status": resp.Status,
|
||||
"error": resp.Error,
|
||||
})
|
||||
if !gotOpen {
|
||||
return nil, fmt.Errorf("received stream_close without prior stream_open for stream %q", streamID)
|
||||
}
|
||||
return &resp, nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected envelope type %q in stream response", env.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protoResp := respEnv.HTTPResponse
|
||||
// nextHTTPStreamID 는 DTLS 세션 내 HTTP 요청에 사용할 고유 StreamID 를 생성합니다. (ko)
|
||||
// nextHTTPStreamID generates a unique StreamID for HTTP requests on this DTLS session. (en)
|
||||
func (w *dtlsSessionWrapper) nextHTTPStreamID() protocol.StreamID {
|
||||
id := w.nextStreamID
|
||||
w.nextStreamID++
|
||||
return protocol.StreamID(fmt.Sprintf("http-%d", id))
|
||||
}
|
||||
|
||||
log.Info("received dtls response", logging.Fields{
|
||||
"status": protoResp.Status,
|
||||
"error": protoResp.Error,
|
||||
})
|
||||
|
||||
return protoResp, nil
|
||||
// firstHeaderValue 는 map[string][]string 형태의 헤더에서 첫 번째 값을 반환하고,
|
||||
// 값이 없으면 기본값을 반환합니다. (ko)
|
||||
// firstHeaderValue returns the first value for a header key in map[string][]string,
|
||||
// or the provided default if the key is missing or empty. (en)
|
||||
func firstHeaderValue(hdr map[string][]string, key, def string) string {
|
||||
if hdr == nil {
|
||||
return def
|
||||
}
|
||||
if vs, ok := hdr[key]; ok && len(vs) > 0 {
|
||||
return vs[0]
|
||||
}
|
||||
return def
|
||||
}
|
||||
|
||||
var (
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/dalbodeule/hop-gate/internal/dtls"
|
||||
@@ -50,10 +51,11 @@ func NewClientProxy(logger logging.Logger, localTarget string) *ClientProxy {
|
||||
}
|
||||
}
|
||||
|
||||
// StartLoop 는 DTLS 세션에서 protocol.Envelope 를 읽고, HTTP 요청의 경우 로컬 HTTP 요청을 수행한 뒤
|
||||
// protocol.Envelope(HTTP 응답 포함)을 다시 세션으로 쓰는 루프를 실행합니다. (ko)
|
||||
// StartLoop reads protocol.Envelope messages from the DTLS session; for HTTP messages it
|
||||
// performs local HTTP requests and writes back HTTP responses wrapped in an Envelope. (en)
|
||||
// StartLoop 는 DTLS 세션에서 protocol.Envelope 를 읽고, HTTP/스트림 요청의 경우 로컬 HTTP 요청을 수행한 뒤
|
||||
// protocol.Envelope(HTTP/스트림 응답 포함)을 다시 세션으로 쓰는 루프를 실행합니다. (ko)
|
||||
// StartLoop reads protocol.Envelope messages from the DTLS session; for HTTP/stream
|
||||
// messages it performs local HTTP requests and writes back responses over the DTLS
|
||||
// tunnel. (en)
|
||||
func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
@@ -92,60 +94,348 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// 현재는 HTTP 타입만 지원하며, 그 외 타입은 에러로 처리합니다.
|
||||
if env.Type != protocol.MessageTypeHTTP || env.HTTPRequest == nil {
|
||||
switch env.Type {
|
||||
case protocol.MessageTypeHTTP:
|
||||
if err := p.handleHTTPEnvelope(ctx, sess, &env); err != nil {
|
||||
log.Error("failed to handle http envelope", logging.Fields{
|
||||
"error": err.Error(),
|
||||
})
|
||||
return err
|
||||
}
|
||||
case protocol.MessageTypeStreamOpen:
|
||||
if err := p.handleStreamRequest(ctx, sess, &env); err != nil {
|
||||
log.Error("failed to handle stream http envelope", logging.Fields{
|
||||
"error": err.Error(),
|
||||
})
|
||||
return err
|
||||
}
|
||||
default:
|
||||
log.Error("received unsupported envelope type from server", logging.Fields{
|
||||
"type": env.Type,
|
||||
})
|
||||
return fmt.Errorf("unsupported envelope type %q or missing http_request", env.Type)
|
||||
return fmt.Errorf("unsupported envelope type %q", env.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
req := env.HTTPRequest
|
||||
// handleHTTPEnvelope 는 기존 단일 HTTP 요청/응답 Envelope 경로를 처리합니다. (ko)
|
||||
// handleHTTPEnvelope handles the legacy single HTTP request/response envelope path. (en)
|
||||
func (p *ClientProxy) handleHTTPEnvelope(ctx context.Context, sess dtls.Session, env *protocol.Envelope) error {
|
||||
if env.HTTPRequest == nil {
|
||||
return fmt.Errorf("http envelope missing http_request payload")
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
logReq := log.With(logging.Fields{
|
||||
"request_id": req.RequestID,
|
||||
"service": req.ServiceName,
|
||||
"method": req.Method,
|
||||
"url": req.URL,
|
||||
"client_id": req.ClientID,
|
||||
"local_target": p.LocalTarget,
|
||||
})
|
||||
logReq.Info("received http envelope from server", nil)
|
||||
req := env.HTTPRequest
|
||||
log := p.Logger
|
||||
start := time.Now()
|
||||
|
||||
resp := protocol.Response{
|
||||
RequestID: req.RequestID,
|
||||
Header: make(map[string][]string),
|
||||
}
|
||||
logReq := log.With(logging.Fields{
|
||||
"request_id": req.RequestID,
|
||||
"service": req.ServiceName,
|
||||
"method": req.Method,
|
||||
"url": req.URL,
|
||||
"client_id": req.ClientID,
|
||||
"local_target": p.LocalTarget,
|
||||
})
|
||||
logReq.Info("received http envelope from server", nil)
|
||||
|
||||
// 로컬 HTTP 요청 수행
|
||||
if err := p.forwardToLocal(ctx, req, &resp); err != nil {
|
||||
resp.Status = http.StatusBadGateway
|
||||
resp.Error = err.Error()
|
||||
logReq.Error("local http request failed", logging.Fields{
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
resp := protocol.Response{
|
||||
RequestID: req.RequestID,
|
||||
Header: make(map[string][]string),
|
||||
}
|
||||
|
||||
// HTTP 응답을 Envelope 로 감싸서 서버로 전송합니다.
|
||||
respEnv := protocol.Envelope{
|
||||
Type: protocol.MessageTypeHTTP,
|
||||
HTTPResponse: &resp,
|
||||
}
|
||||
|
||||
if err := codec.Encode(sess, &respEnv); err != nil {
|
||||
logReq.Error("failed to encode http response envelope", logging.Fields{
|
||||
"error": err.Error(),
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
logReq.Info("http response envelope sent to server", logging.Fields{
|
||||
"status": resp.Status,
|
||||
"elapsed_ms": time.Since(start).Milliseconds(),
|
||||
"error": resp.Error,
|
||||
// 로컬 HTTP 요청 수행
|
||||
if err := p.forwardToLocal(ctx, req, &resp); err != nil {
|
||||
resp.Status = http.StatusBadGateway
|
||||
resp.Error = err.Error()
|
||||
logReq.Error("local http request failed", logging.Fields{
|
||||
"error": err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
// HTTP 응답을 Envelope 로 감싸서 서버로 전송합니다.
|
||||
respEnv := protocol.Envelope{
|
||||
Type: protocol.MessageTypeHTTP,
|
||||
HTTPResponse: &resp,
|
||||
}
|
||||
|
||||
if err := protocol.DefaultCodec.Encode(sess, &respEnv); err != nil {
|
||||
logReq.Error("failed to encode http response envelope", logging.Fields{
|
||||
"error": err.Error(),
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
logReq.Info("http response envelope sent to server", logging.Fields{
|
||||
"status": resp.Status,
|
||||
"elapsed_ms": time.Since(start).Milliseconds(),
|
||||
"error": resp.Error,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleStreamRequest 는 StreamOpen/StreamData/StreamClose 기반 HTTP 요청/응답 스트림을 처리합니다. (ko)
|
||||
// handleStreamRequest handles an HTTP request/response exchange using StreamOpen/StreamData/StreamClose frames. (en)
|
||||
func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session, openEnv *protocol.Envelope) error {
|
||||
codec := protocol.DefaultCodec
|
||||
log := p.Logger
|
||||
|
||||
so := openEnv.StreamOpen
|
||||
if so == nil {
|
||||
return fmt.Errorf("stream_open envelope missing payload")
|
||||
}
|
||||
|
||||
streamID := so.ID
|
||||
|
||||
// Pseudo-header 에서 HTTP 메타데이터를 추출합니다. (ko)
|
||||
// Extract HTTP metadata from pseudo-headers. (en)
|
||||
method := firstHeaderValue(so.Header, protocol.HeaderKeyMethod, http.MethodGet)
|
||||
urlStr := firstHeaderValue(so.Header, protocol.HeaderKeyURL, "/")
|
||||
_ = firstHeaderValue(so.Header, protocol.HeaderKeyHost, "")
|
||||
|
||||
if p.LocalTarget == "" {
|
||||
return fmt.Errorf("local target is empty")
|
||||
}
|
||||
|
||||
u, err := url.Parse(urlStr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse url from stream_open: %w", err)
|
||||
}
|
||||
u.Scheme = "http"
|
||||
u.Host = p.LocalTarget
|
||||
|
||||
// 로컬 HTTP 요청용 헤더 맵을 생성하면서 pseudo-header 는 제거합니다. (ko)
|
||||
// Build local HTTP header map while stripping pseudo-headers. (en)
|
||||
httpHeader := make(http.Header, len(so.Header))
|
||||
for k, vs := range so.Header {
|
||||
if k == protocol.HeaderKeyMethod ||
|
||||
k == protocol.HeaderKeyURL ||
|
||||
k == protocol.HeaderKeyHost ||
|
||||
k == protocol.HeaderKeyStatus {
|
||||
continue
|
||||
}
|
||||
for _, v := range vs {
|
||||
httpHeader.Add(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// 요청 바디를 StreamData/StreamClose 프레임에서 모두 읽어 메모리에 적재합니다. (ko)
|
||||
// Read the entire request body from StreamData/StreamClose frames into memory. (en)
|
||||
var bodyBuf bytes.Buffer
|
||||
for {
|
||||
var env protocol.Envelope
|
||||
if err := codec.Decode(sess, &env); err != nil {
|
||||
if err == io.EOF {
|
||||
return fmt.Errorf("unexpected EOF while reading stream request body")
|
||||
}
|
||||
return fmt.Errorf("decode stream request frame: %w", err)
|
||||
}
|
||||
|
||||
switch env.Type {
|
||||
case protocol.MessageTypeStreamData:
|
||||
sd := env.StreamData
|
||||
if sd == nil {
|
||||
return fmt.Errorf("stream_data payload is nil")
|
||||
}
|
||||
if sd.ID != streamID {
|
||||
return fmt.Errorf("stream_data for unexpected stream id %q (expected %q)", sd.ID, streamID)
|
||||
}
|
||||
if len(sd.Data) > 0 {
|
||||
if _, err := bodyBuf.Write(sd.Data); err != nil {
|
||||
return fmt.Errorf("buffer stream_data: %w", err)
|
||||
}
|
||||
}
|
||||
case protocol.MessageTypeStreamClose:
|
||||
sc := env.StreamClose
|
||||
if sc == nil {
|
||||
return fmt.Errorf("stream_close payload is nil")
|
||||
}
|
||||
if sc.ID != streamID {
|
||||
return fmt.Errorf("stream_close for unexpected stream id %q (expected %q)", sc.ID, streamID)
|
||||
}
|
||||
// sc.Error 는 최소 구현에서는 로컬 요청 에러와 별도로 취급하지 않습니다. (ko)
|
||||
// For the minimal implementation we do not surface sc.Error here. (en)
|
||||
goto haveBody
|
||||
default:
|
||||
return fmt.Errorf("unexpected envelope type %q while reading stream request body", env.Type)
|
||||
}
|
||||
}
|
||||
|
||||
haveBody:
|
||||
bodyBytes := bodyBuf.Bytes()
|
||||
|
||||
// 로컬 HTTP 요청 생성 (stream 기반 요청을 실제 HTTP 요청으로 변환). (ko)
|
||||
// Build the local HTTP request from the stream-based metadata and body. (en)
|
||||
req, err := http.NewRequestWithContext(ctx, method, u.String(), nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create http request from stream: %w", err)
|
||||
}
|
||||
if len(bodyBytes) > 0 {
|
||||
buf := bytes.NewReader(bodyBytes)
|
||||
req.Body = io.NopCloser(buf)
|
||||
req.ContentLength = int64(len(bodyBytes))
|
||||
}
|
||||
req.Header = httpHeader
|
||||
|
||||
start := time.Now()
|
||||
logReq := log.With(logging.Fields{
|
||||
"request_id": string(streamID),
|
||||
"service": so.Service,
|
||||
"method": method,
|
||||
"url": urlStr,
|
||||
"stream_id": string(streamID),
|
||||
"local_target": p.LocalTarget,
|
||||
})
|
||||
logReq.Info("received stream_open envelope from server", nil)
|
||||
|
||||
res, err := p.HTTPClient.Do(req)
|
||||
if err != nil {
|
||||
// 로컬 요청 실패 시, 502 + 에러 메시지를 스트림 응답으로 전송합니다. (ko)
|
||||
// On local request failure, send a 502 response over the stream. (en)
|
||||
errMsg := fmt.Sprintf("perform http request: %v", err)
|
||||
streamRespHeader := map[string][]string{
|
||||
"Content-Type": {"text/plain; charset=utf-8"},
|
||||
protocol.HeaderKeyStatus: {strconv.Itoa(http.StatusBadGateway)},
|
||||
}
|
||||
respOpen := protocol.Envelope{
|
||||
Type: protocol.MessageTypeStreamOpen,
|
||||
StreamOpen: &protocol.StreamOpen{
|
||||
ID: streamID,
|
||||
Service: so.Service,
|
||||
TargetAddr: so.TargetAddr,
|
||||
Header: streamRespHeader,
|
||||
},
|
||||
}
|
||||
if err2 := codec.Encode(sess, &respOpen); err2 != nil {
|
||||
logReq.Error("failed to encode stream response open envelope (error path)", logging.Fields{
|
||||
"error": err2.Error(),
|
||||
})
|
||||
return err2
|
||||
}
|
||||
|
||||
dataEnv := protocol.Envelope{
|
||||
Type: protocol.MessageTypeStreamData,
|
||||
StreamData: &protocol.StreamData{
|
||||
ID: streamID,
|
||||
Seq: 0,
|
||||
Data: []byte("HopGate: " + errMsg),
|
||||
},
|
||||
}
|
||||
if err2 := codec.Encode(sess, &dataEnv); err2 != nil {
|
||||
logReq.Error("failed to encode stream response data envelope (error path)", logging.Fields{
|
||||
"error": err2.Error(),
|
||||
})
|
||||
return err2
|
||||
}
|
||||
|
||||
closeEnv := protocol.Envelope{
|
||||
Type: protocol.MessageTypeStreamClose,
|
||||
StreamClose: &protocol.StreamClose{
|
||||
ID: streamID,
|
||||
Error: errMsg,
|
||||
},
|
||||
}
|
||||
if err2 := codec.Encode(sess, &closeEnv); err2 != nil {
|
||||
logReq.Error("failed to encode stream response close envelope (error path)", logging.Fields{
|
||||
"error": err2.Error(),
|
||||
})
|
||||
return err2
|
||||
}
|
||||
|
||||
logReq.Error("local http request failed (stream)", logging.Fields{
|
||||
"error": err.Error(),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
// 응답을 StreamOpen + StreamData(4KiB chunk) + StreamClose 프레임으로 전송합니다. (ko)
|
||||
// Send the response as StreamOpen + StreamData (4KiB chunks) + StreamClose frames. (en)
|
||||
|
||||
// 응답 헤더 맵을 복사하고 상태 코드를 pseudo-header 로 추가합니다. (ko)
|
||||
// Copy response headers and attach status code as a pseudo-header. (en)
|
||||
streamRespHeader := make(map[string][]string, len(res.Header)+1)
|
||||
for k, vs := range res.Header {
|
||||
streamRespHeader[k] = append([]string(nil), vs...)
|
||||
}
|
||||
statusCode := res.StatusCode
|
||||
if statusCode == 0 {
|
||||
statusCode = http.StatusOK
|
||||
}
|
||||
streamRespHeader[protocol.HeaderKeyStatus] = []string{strconv.Itoa(statusCode)}
|
||||
|
||||
respOpen := protocol.Envelope{
|
||||
Type: protocol.MessageTypeStreamOpen,
|
||||
StreamOpen: &protocol.StreamOpen{
|
||||
ID: streamID,
|
||||
Service: so.Service,
|
||||
TargetAddr: so.TargetAddr,
|
||||
Header: streamRespHeader,
|
||||
},
|
||||
}
|
||||
|
||||
if err := codec.Encode(sess, &respOpen); err != nil {
|
||||
logReq.Error("failed to encode stream response open envelope", logging.Fields{
|
||||
"error": err.Error(),
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// 응답 바디를 4KiB(StreamChunkSize) 단위로 잘라 StreamData 프레임으로 전송합니다. (ko)
|
||||
// Chunk the response body into 4KiB (StreamChunkSize) StreamData frames. (en)
|
||||
var seq uint64
|
||||
chunk := make([]byte, protocol.StreamChunkSize)
|
||||
for {
|
||||
n, err := res.Body.Read(chunk)
|
||||
if n > 0 {
|
||||
dataCopy := append([]byte(nil), chunk[:n]...)
|
||||
dataEnv := protocol.Envelope{
|
||||
Type: protocol.MessageTypeStreamData,
|
||||
StreamData: &protocol.StreamData{
|
||||
ID: streamID,
|
||||
Seq: seq,
|
||||
Data: dataCopy,
|
||||
},
|
||||
}
|
||||
if err2 := codec.Encode(sess, &dataEnv); err2 != nil {
|
||||
logReq.Error("failed to encode stream response data envelope", logging.Fields{
|
||||
"error": err2.Error(),
|
||||
})
|
||||
return err2
|
||||
}
|
||||
seq++
|
||||
}
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("read http response body for streaming: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
closeEnv := protocol.Envelope{
|
||||
Type: protocol.MessageTypeStreamClose,
|
||||
StreamClose: &protocol.StreamClose{
|
||||
ID: streamID,
|
||||
Error: "",
|
||||
},
|
||||
}
|
||||
|
||||
if err := codec.Encode(sess, &closeEnv); err != nil {
|
||||
logReq.Error("failed to encode stream response close envelope", logging.Fields{
|
||||
"error": err.Error(),
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
logReq.Info("stream http response sent to server", logging.Fields{
|
||||
"status": statusCode,
|
||||
"elapsed_ms": time.Since(start).Milliseconds(),
|
||||
"error": "",
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// forwardToLocal 는 protocol.Request 를 로컬 HTTP 요청으로 변환하고 protocol.Response 를 채웁니다. (ko)
|
||||
@@ -240,3 +530,15 @@ func (p *ClientProxy) forwardToLocal(ctx context.Context, preq *protocol.Request
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// firstHeaderValue 는 주어진 키의 첫 번째 헤더 값을 반환하고, 없으면 기본값을 반환합니다. (ko)
|
||||
// firstHeaderValue returns the first header value for a key, or a default if absent. (en)
|
||||
func firstHeaderValue(hdr map[string][]string, key, def string) string {
|
||||
if hdr == nil {
|
||||
return def
|
||||
}
|
||||
if vs, ok := hdr[key]; ok && len(vs) > 0 {
|
||||
return vs[0]
|
||||
}
|
||||
return def
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user