From 5e94dd7aa99015118fe29d29b19c9fa5e503f1ee Mon Sep 17 00:00:00 2001 From: dalbodeule <11470513+dalbodeule@users.noreply.github.com> Date: Mon, 8 Dec 2025 23:05:45 +0900 Subject: [PATCH] [feat](server, client): implement streaming-based HTTP tunnel with DTLS sessions - Replaced single-envelope HTTP handling with stream-based tunneling (`StreamOpen`, `StreamData`, and `StreamClose`) for HTTP-over-DTLS. - Added unique StreamID generation for per-session HTTP requests. - Improved client and server logic for handling chunked body transmissions and reverse stream responses. - Enhanced pseudo-header handling for HTTP metadata in tunneling. - Updated error handling for local HTTP failures, ensuring proper stream-based responses. --- cmd/server/main.go | 244 +++++++++++++++++++----- internal/proxy/client.go | 396 ++++++++++++++++++++++++++++++++++----- 2 files changed, 544 insertions(+), 96 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 46c9118..b898f2b 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "context" "crypto/tls" "fmt" @@ -29,8 +30,9 @@ import ( ) type dtlsSessionWrapper struct { - sess dtls.Session - mu sync.Mutex + sess dtls.Session + mu sync.Mutex + nextStreamID uint64 } // canonicalizeDomainForDNS 는 DTLS 핸드셰이크에서 전달된 도메인 문자열을 @@ -155,8 +157,10 @@ func parseExpectedIPsFromEnv(logger logging.Logger, envKey string) []net.IP { return result } -// ForwardHTTP 는 단일 HTTP 요청을 DTLS 세션으로 포워딩하고 응답을 돌려받습니다. -// ForwardHTTP forwards a single HTTP request over the DTLS session and returns the response. +// ForwardHTTP 는 HTTP 요청을 DTLS 세션 위의 StreamOpen/StreamData/StreamClose 프레임으로 전송하고, +// 역방향 스트림 응답을 수신해 protocol.Response 로 반환합니다. (ko) +// ForwardHTTP forwards an HTTP request over the DTLS session using StreamOpen/StreamData/StreamClose +// frames and reconstructs the reverse stream into a protocol.Response. (en) func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Logger, req *http.Request, serviceName string) (*protocol.Response, error) { w.mu.Lock() defer w.mu.Unlock() @@ -165,78 +169,220 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log ctx = context.Background() } - // 요청 본문 읽기 - var body []byte - if req.Body != nil { - b, err := io.ReadAll(req.Body) - if err != nil { - return nil, err - } - body = b - } + codec := protocol.DefaultCodec - // 간단한 RequestID 생성 (실제 서비스에서는 UUID 등을 사용하는 것이 좋음) - requestID := time.Now().UTC().Format("20060102T150405.000000000") - - httpReq := &protocol.Request{ - RequestID: requestID, - ClientID: "", // TODO: 클라이언트 식별자 도입 시 채우기 - ServiceName: serviceName, - Method: req.Method, - URL: req.URL.String(), - Header: req.Header.Clone(), - Body: body, - } + // 세션 내에서 고유한 StreamID 를 생성합니다. (ko) + // Generate a unique StreamID for this HTTP request within the DTLS session. (en) + streamID := w.nextHTTPStreamID() log := logger.With(logging.Fields{ "component": "http_to_dtls", - "request_id": requestID, + "request_id": string(streamID), "method": req.Method, "url": req.URL.String(), }) - log.Info("forwarding http request over dtls", logging.Fields{ + log.Info("forwarding http request over dtls (stream mode)", logging.Fields{ "host": req.Host, "scheme": req.URL.Scheme, }) - // HTTP 요청을 Envelope 로 감싸서 전송합니다. - env := &protocol.Envelope{ - Type: protocol.MessageTypeHTTP, - HTTPRequest: httpReq, + // 요청 헤더를 복사하고 pseudo-header 로 HTTP 메타데이터를 추가합니다. (ko) + // Copy request headers and attach HTTP metadata as pseudo-headers. (en) + hdr := make(map[string][]string, len(req.Header)+3) + for k, vs := range req.Header { + hdr[k] = append([]string(nil), vs...) + } + hdr[protocol.HeaderKeyMethod] = []string{req.Method} + if req.URL != nil { + hdr[protocol.HeaderKeyURL] = []string{req.URL.String()} + } + host := req.Host + if host == "" && req.URL != nil { + host = req.URL.Host + } + if host != "" { + hdr[protocol.HeaderKeyHost] = []string{host} } - if err := protocol.DefaultCodec.Encode(w.sess, env); err != nil { - log.Error("failed to encode http envelope", logging.Fields{ + // StreamOpen 전송: 어떤 서비스로 라우팅해야 하는지와 초기 헤더를 전달합니다. (ko) + // Send StreamOpen to indicate which service to route to and initial headers. (en) + openEnv := &protocol.Envelope{ + Type: protocol.MessageTypeStreamOpen, + StreamOpen: &protocol.StreamOpen{ + ID: streamID, + Service: serviceName, + TargetAddr: "", + Header: hdr, + }, + } + if err := codec.Encode(w.sess, openEnv); err != nil { + log.Error("failed to encode stream_open envelope", logging.Fields{ "error": err.Error(), }) return nil, err } - // 클라이언트로부터 HTTP 응답 Envelope 를 수신합니다. - var respEnv protocol.Envelope - if err := protocol.DefaultCodec.Decode(w.sess, &respEnv); err != nil { - log.Error("failed to decode http envelope", logging.Fields{ + // 요청 바디를 4KiB(StreamChunkSize) 단위로 잘라 StreamData 프레임으로 전송합니다. (ko) + // Chunk the request body into 4KiB (StreamChunkSize) StreamData frames. (en) + var seq uint64 + if req.Body != nil { + buf := make([]byte, protocol.StreamChunkSize) + for { + n, err := req.Body.Read(buf) + if n > 0 { + dataCopy := append([]byte(nil), buf[:n]...) + dataEnv := &protocol.Envelope{ + Type: protocol.MessageTypeStreamData, + StreamData: &protocol.StreamData{ + ID: streamID, + Seq: seq, + Data: dataCopy, + }, + } + if err2 := codec.Encode(w.sess, dataEnv); err2 != nil { + log.Error("failed to encode stream_data envelope", logging.Fields{ + "error": err2.Error(), + }) + return nil, err2 + } + seq++ + } + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("read http request body for streaming: %w", err) + } + } + } + + // 바디 종료를 알리는 StreamClose 를 전송합니다. (ko) + // Send StreamClose to mark the end of the request body. (en) + closeReqEnv := &protocol.Envelope{ + Type: protocol.MessageTypeStreamClose, + StreamClose: &protocol.StreamClose{ + ID: streamID, + Error: "", + }, + } + if err := codec.Encode(w.sess, closeReqEnv); err != nil { + log.Error("failed to encode request stream_close envelope", logging.Fields{ "error": err.Error(), }) return nil, err } - if respEnv.Type != protocol.MessageTypeHTTP || respEnv.HTTPResponse == nil { - log.Error("received non-http envelope from client", logging.Fields{ - "type": respEnv.Type, - }) - return nil, fmt.Errorf("unexpected envelope type %q or empty http_response", respEnv.Type) + // 클라이언트로부터 역방향 스트림 응답을 수신합니다. (ko) + // Receive reverse stream response (StreamOpen + StreamData* + StreamClose). (en) + var ( + resp protocol.Response + bodyBuf bytes.Buffer + gotOpen bool + statusCode = http.StatusOK + ) + + resp.RequestID = string(streamID) + resp.Header = make(map[string][]string) + + for { + var env protocol.Envelope + if err := codec.Decode(w.sess, &env); err != nil { + log.Error("failed to decode stream response envelope", logging.Fields{ + "error": err.Error(), + }) + return nil, err + } + + switch env.Type { + case protocol.MessageTypeStreamOpen: + so := env.StreamOpen + if so == nil { + return nil, fmt.Errorf("stream_open response payload is nil") + } + if so.ID != streamID { + return nil, fmt.Errorf("unexpected stream_open for id %q (expected %q)", so.ID, streamID) + } + // 상태 코드 및 헤더 복원 (pseudo-header 제거). (ko) + // Restore status code and headers (strip pseudo-headers). (en) + statusStr := firstHeaderValue(so.Header, protocol.HeaderKeyStatus, strconv.Itoa(http.StatusOK)) + if sc, err := strconv.Atoi(statusStr); err == nil && sc > 0 { + statusCode = sc + } + for k, vs := range so.Header { + if k == protocol.HeaderKeyMethod || + k == protocol.HeaderKeyURL || + k == protocol.HeaderKeyHost || + k == protocol.HeaderKeyStatus { + continue + } + resp.Header[k] = append([]string(nil), vs...) + } + gotOpen = true + + case protocol.MessageTypeStreamData: + sd := env.StreamData + if sd == nil { + return nil, fmt.Errorf("stream_data response payload is nil") + } + if sd.ID != streamID { + return nil, fmt.Errorf("unexpected stream_data for id %q (expected %q)", sd.ID, streamID) + } + if len(sd.Data) > 0 { + if _, err := bodyBuf.Write(sd.Data); err != nil { + return nil, fmt.Errorf("buffer stream_data response: %w", err) + } + } + + case protocol.MessageTypeStreamClose: + sc := env.StreamClose + if sc == nil { + return nil, fmt.Errorf("stream_close response payload is nil") + } + if sc.ID != streamID { + return nil, fmt.Errorf("unexpected stream_close for id %q (expected %q)", sc.ID, streamID) + } + // 스트림 종료: 지금까지 수신한 헤더/바디로 protocol.Response 를 완성합니다. (ko) + // Stream finished: complete protocol.Response using collected headers/body. (en) + resp.Status = statusCode + resp.Body = bodyBuf.Bytes() + resp.Error = sc.Error + + log.Info("received stream http response over dtls", logging.Fields{ + "status": resp.Status, + "error": resp.Error, + }) + if !gotOpen { + return nil, fmt.Errorf("received stream_close without prior stream_open for stream %q", streamID) + } + return &resp, nil + + default: + return nil, fmt.Errorf("unexpected envelope type %q in stream response", env.Type) + } } +} - protoResp := respEnv.HTTPResponse +// nextHTTPStreamID 는 DTLS 세션 내 HTTP 요청에 사용할 고유 StreamID 를 생성합니다. (ko) +// nextHTTPStreamID generates a unique StreamID for HTTP requests on this DTLS session. (en) +func (w *dtlsSessionWrapper) nextHTTPStreamID() protocol.StreamID { + id := w.nextStreamID + w.nextStreamID++ + return protocol.StreamID(fmt.Sprintf("http-%d", id)) +} - log.Info("received dtls response", logging.Fields{ - "status": protoResp.Status, - "error": protoResp.Error, - }) - - return protoResp, nil +// firstHeaderValue 는 map[string][]string 형태의 헤더에서 첫 번째 값을 반환하고, +// 값이 없으면 기본값을 반환합니다. (ko) +// firstHeaderValue returns the first value for a header key in map[string][]string, +// or the provided default if the key is missing or empty. (en) +func firstHeaderValue(hdr map[string][]string, key, def string) string { + if hdr == nil { + return def + } + if vs, ok := hdr[key]; ok && len(vs) > 0 { + return vs[0] + } + return def } var ( diff --git a/internal/proxy/client.go b/internal/proxy/client.go index e6308df..ff02f63 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "net/url" + "strconv" "time" "github.com/dalbodeule/hop-gate/internal/dtls" @@ -50,10 +51,11 @@ func NewClientProxy(logger logging.Logger, localTarget string) *ClientProxy { } } -// StartLoop 는 DTLS 세션에서 protocol.Envelope 를 읽고, HTTP 요청의 경우 로컬 HTTP 요청을 수행한 뒤 -// protocol.Envelope(HTTP 응답 포함)을 다시 세션으로 쓰는 루프를 실행합니다. (ko) -// StartLoop reads protocol.Envelope messages from the DTLS session; for HTTP messages it -// performs local HTTP requests and writes back HTTP responses wrapped in an Envelope. (en) +// StartLoop 는 DTLS 세션에서 protocol.Envelope 를 읽고, HTTP/스트림 요청의 경우 로컬 HTTP 요청을 수행한 뒤 +// protocol.Envelope(HTTP/스트림 응답 포함)을 다시 세션으로 쓰는 루프를 실행합니다. (ko) +// StartLoop reads protocol.Envelope messages from the DTLS session; for HTTP/stream +// messages it performs local HTTP requests and writes back responses over the DTLS +// tunnel. (en) func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { if ctx == nil { ctx = context.Background() @@ -92,60 +94,348 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { return err } - // 현재는 HTTP 타입만 지원하며, 그 외 타입은 에러로 처리합니다. - if env.Type != protocol.MessageTypeHTTP || env.HTTPRequest == nil { + switch env.Type { + case protocol.MessageTypeHTTP: + if err := p.handleHTTPEnvelope(ctx, sess, &env); err != nil { + log.Error("failed to handle http envelope", logging.Fields{ + "error": err.Error(), + }) + return err + } + case protocol.MessageTypeStreamOpen: + if err := p.handleStreamRequest(ctx, sess, &env); err != nil { + log.Error("failed to handle stream http envelope", logging.Fields{ + "error": err.Error(), + }) + return err + } + default: log.Error("received unsupported envelope type from server", logging.Fields{ "type": env.Type, }) - return fmt.Errorf("unsupported envelope type %q or missing http_request", env.Type) + return fmt.Errorf("unsupported envelope type %q", env.Type) } + } +} - req := env.HTTPRequest +// handleHTTPEnvelope 는 기존 단일 HTTP 요청/응답 Envelope 경로를 처리합니다. (ko) +// handleHTTPEnvelope handles the legacy single HTTP request/response envelope path. (en) +func (p *ClientProxy) handleHTTPEnvelope(ctx context.Context, sess dtls.Session, env *protocol.Envelope) error { + if env.HTTPRequest == nil { + return fmt.Errorf("http envelope missing http_request payload") + } - start := time.Now() - logReq := log.With(logging.Fields{ - "request_id": req.RequestID, - "service": req.ServiceName, - "method": req.Method, - "url": req.URL, - "client_id": req.ClientID, - "local_target": p.LocalTarget, - }) - logReq.Info("received http envelope from server", nil) + req := env.HTTPRequest + log := p.Logger + start := time.Now() - resp := protocol.Response{ - RequestID: req.RequestID, - Header: make(map[string][]string), - } + logReq := log.With(logging.Fields{ + "request_id": req.RequestID, + "service": req.ServiceName, + "method": req.Method, + "url": req.URL, + "client_id": req.ClientID, + "local_target": p.LocalTarget, + }) + logReq.Info("received http envelope from server", nil) - // 로컬 HTTP 요청 수행 - if err := p.forwardToLocal(ctx, req, &resp); err != nil { - resp.Status = http.StatusBadGateway - resp.Error = err.Error() - logReq.Error("local http request failed", logging.Fields{ - "error": err.Error(), - }) - } + resp := protocol.Response{ + RequestID: req.RequestID, + Header: make(map[string][]string), + } - // HTTP 응답을 Envelope 로 감싸서 서버로 전송합니다. - respEnv := protocol.Envelope{ - Type: protocol.MessageTypeHTTP, - HTTPResponse: &resp, - } - - if err := codec.Encode(sess, &respEnv); err != nil { - logReq.Error("failed to encode http response envelope", logging.Fields{ - "error": err.Error(), - }) - return err - } - - logReq.Info("http response envelope sent to server", logging.Fields{ - "status": resp.Status, - "elapsed_ms": time.Since(start).Milliseconds(), - "error": resp.Error, + // 로컬 HTTP 요청 수행 + if err := p.forwardToLocal(ctx, req, &resp); err != nil { + resp.Status = http.StatusBadGateway + resp.Error = err.Error() + logReq.Error("local http request failed", logging.Fields{ + "error": err.Error(), }) } + + // HTTP 응답을 Envelope 로 감싸서 서버로 전송합니다. + respEnv := protocol.Envelope{ + Type: protocol.MessageTypeHTTP, + HTTPResponse: &resp, + } + + if err := protocol.DefaultCodec.Encode(sess, &respEnv); err != nil { + logReq.Error("failed to encode http response envelope", logging.Fields{ + "error": err.Error(), + }) + return err + } + + logReq.Info("http response envelope sent to server", logging.Fields{ + "status": resp.Status, + "elapsed_ms": time.Since(start).Milliseconds(), + "error": resp.Error, + }) + + return nil +} + +// handleStreamRequest 는 StreamOpen/StreamData/StreamClose 기반 HTTP 요청/응답 스트림을 처리합니다. (ko) +// handleStreamRequest handles an HTTP request/response exchange using StreamOpen/StreamData/StreamClose frames. (en) +func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session, openEnv *protocol.Envelope) error { + codec := protocol.DefaultCodec + log := p.Logger + + so := openEnv.StreamOpen + if so == nil { + return fmt.Errorf("stream_open envelope missing payload") + } + + streamID := so.ID + + // Pseudo-header 에서 HTTP 메타데이터를 추출합니다. (ko) + // Extract HTTP metadata from pseudo-headers. (en) + method := firstHeaderValue(so.Header, protocol.HeaderKeyMethod, http.MethodGet) + urlStr := firstHeaderValue(so.Header, protocol.HeaderKeyURL, "/") + _ = firstHeaderValue(so.Header, protocol.HeaderKeyHost, "") + + if p.LocalTarget == "" { + return fmt.Errorf("local target is empty") + } + + u, err := url.Parse(urlStr) + if err != nil { + return fmt.Errorf("parse url from stream_open: %w", err) + } + u.Scheme = "http" + u.Host = p.LocalTarget + + // 로컬 HTTP 요청용 헤더 맵을 생성하면서 pseudo-header 는 제거합니다. (ko) + // Build local HTTP header map while stripping pseudo-headers. (en) + httpHeader := make(http.Header, len(so.Header)) + for k, vs := range so.Header { + if k == protocol.HeaderKeyMethod || + k == protocol.HeaderKeyURL || + k == protocol.HeaderKeyHost || + k == protocol.HeaderKeyStatus { + continue + } + for _, v := range vs { + httpHeader.Add(k, v) + } + } + + // 요청 바디를 StreamData/StreamClose 프레임에서 모두 읽어 메모리에 적재합니다. (ko) + // Read the entire request body from StreamData/StreamClose frames into memory. (en) + var bodyBuf bytes.Buffer + for { + var env protocol.Envelope + if err := codec.Decode(sess, &env); err != nil { + if err == io.EOF { + return fmt.Errorf("unexpected EOF while reading stream request body") + } + return fmt.Errorf("decode stream request frame: %w", err) + } + + switch env.Type { + case protocol.MessageTypeStreamData: + sd := env.StreamData + if sd == nil { + return fmt.Errorf("stream_data payload is nil") + } + if sd.ID != streamID { + return fmt.Errorf("stream_data for unexpected stream id %q (expected %q)", sd.ID, streamID) + } + if len(sd.Data) > 0 { + if _, err := bodyBuf.Write(sd.Data); err != nil { + return fmt.Errorf("buffer stream_data: %w", err) + } + } + case protocol.MessageTypeStreamClose: + sc := env.StreamClose + if sc == nil { + return fmt.Errorf("stream_close payload is nil") + } + if sc.ID != streamID { + return fmt.Errorf("stream_close for unexpected stream id %q (expected %q)", sc.ID, streamID) + } + // sc.Error 는 최소 구현에서는 로컬 요청 에러와 별도로 취급하지 않습니다. (ko) + // For the minimal implementation we do not surface sc.Error here. (en) + goto haveBody + default: + return fmt.Errorf("unexpected envelope type %q while reading stream request body", env.Type) + } + } + +haveBody: + bodyBytes := bodyBuf.Bytes() + + // 로컬 HTTP 요청 생성 (stream 기반 요청을 실제 HTTP 요청으로 변환). (ko) + // Build the local HTTP request from the stream-based metadata and body. (en) + req, err := http.NewRequestWithContext(ctx, method, u.String(), nil) + if err != nil { + return fmt.Errorf("create http request from stream: %w", err) + } + if len(bodyBytes) > 0 { + buf := bytes.NewReader(bodyBytes) + req.Body = io.NopCloser(buf) + req.ContentLength = int64(len(bodyBytes)) + } + req.Header = httpHeader + + start := time.Now() + logReq := log.With(logging.Fields{ + "request_id": string(streamID), + "service": so.Service, + "method": method, + "url": urlStr, + "stream_id": string(streamID), + "local_target": p.LocalTarget, + }) + logReq.Info("received stream_open envelope from server", nil) + + res, err := p.HTTPClient.Do(req) + if err != nil { + // 로컬 요청 실패 시, 502 + 에러 메시지를 스트림 응답으로 전송합니다. (ko) + // On local request failure, send a 502 response over the stream. (en) + errMsg := fmt.Sprintf("perform http request: %v", err) + streamRespHeader := map[string][]string{ + "Content-Type": {"text/plain; charset=utf-8"}, + protocol.HeaderKeyStatus: {strconv.Itoa(http.StatusBadGateway)}, + } + respOpen := protocol.Envelope{ + Type: protocol.MessageTypeStreamOpen, + StreamOpen: &protocol.StreamOpen{ + ID: streamID, + Service: so.Service, + TargetAddr: so.TargetAddr, + Header: streamRespHeader, + }, + } + if err2 := codec.Encode(sess, &respOpen); err2 != nil { + logReq.Error("failed to encode stream response open envelope (error path)", logging.Fields{ + "error": err2.Error(), + }) + return err2 + } + + dataEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamData, + StreamData: &protocol.StreamData{ + ID: streamID, + Seq: 0, + Data: []byte("HopGate: " + errMsg), + }, + } + if err2 := codec.Encode(sess, &dataEnv); err2 != nil { + logReq.Error("failed to encode stream response data envelope (error path)", logging.Fields{ + "error": err2.Error(), + }) + return err2 + } + + closeEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamClose, + StreamClose: &protocol.StreamClose{ + ID: streamID, + Error: errMsg, + }, + } + if err2 := codec.Encode(sess, &closeEnv); err2 != nil { + logReq.Error("failed to encode stream response close envelope (error path)", logging.Fields{ + "error": err2.Error(), + }) + return err2 + } + + logReq.Error("local http request failed (stream)", logging.Fields{ + "error": err.Error(), + }) + return nil + } + defer res.Body.Close() + + // 응답을 StreamOpen + StreamData(4KiB chunk) + StreamClose 프레임으로 전송합니다. (ko) + // Send the response as StreamOpen + StreamData (4KiB chunks) + StreamClose frames. (en) + + // 응답 헤더 맵을 복사하고 상태 코드를 pseudo-header 로 추가합니다. (ko) + // Copy response headers and attach status code as a pseudo-header. (en) + streamRespHeader := make(map[string][]string, len(res.Header)+1) + for k, vs := range res.Header { + streamRespHeader[k] = append([]string(nil), vs...) + } + statusCode := res.StatusCode + if statusCode == 0 { + statusCode = http.StatusOK + } + streamRespHeader[protocol.HeaderKeyStatus] = []string{strconv.Itoa(statusCode)} + + respOpen := protocol.Envelope{ + Type: protocol.MessageTypeStreamOpen, + StreamOpen: &protocol.StreamOpen{ + ID: streamID, + Service: so.Service, + TargetAddr: so.TargetAddr, + Header: streamRespHeader, + }, + } + + if err := codec.Encode(sess, &respOpen); err != nil { + logReq.Error("failed to encode stream response open envelope", logging.Fields{ + "error": err.Error(), + }) + return err + } + + // 응답 바디를 4KiB(StreamChunkSize) 단위로 잘라 StreamData 프레임으로 전송합니다. (ko) + // Chunk the response body into 4KiB (StreamChunkSize) StreamData frames. (en) + var seq uint64 + chunk := make([]byte, protocol.StreamChunkSize) + for { + n, err := res.Body.Read(chunk) + if n > 0 { + dataCopy := append([]byte(nil), chunk[:n]...) + dataEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamData, + StreamData: &protocol.StreamData{ + ID: streamID, + Seq: seq, + Data: dataCopy, + }, + } + if err2 := codec.Encode(sess, &dataEnv); err2 != nil { + logReq.Error("failed to encode stream response data envelope", logging.Fields{ + "error": err2.Error(), + }) + return err2 + } + seq++ + } + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("read http response body for streaming: %w", err) + } + } + + closeEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamClose, + StreamClose: &protocol.StreamClose{ + ID: streamID, + Error: "", + }, + } + + if err := codec.Encode(sess, &closeEnv); err != nil { + logReq.Error("failed to encode stream response close envelope", logging.Fields{ + "error": err.Error(), + }) + return err + } + + logReq.Info("stream http response sent to server", logging.Fields{ + "status": statusCode, + "elapsed_ms": time.Since(start).Milliseconds(), + "error": "", + }) + + return nil } // forwardToLocal 는 protocol.Request 를 로컬 HTTP 요청으로 변환하고 protocol.Response 를 채웁니다. (ko) @@ -240,3 +530,15 @@ func (p *ClientProxy) forwardToLocal(ctx context.Context, preq *protocol.Request return nil } + +// firstHeaderValue 는 주어진 키의 첫 번째 헤더 값을 반환하고, 없으면 기본값을 반환합니다. (ko) +// firstHeaderValue returns the first header value for a key, or a default if absent. (en) +func firstHeaderValue(hdr map[string][]string, key, def string) string { + if hdr == nil { + return def + } + if vs, ok := hdr[key]; ok && len(vs) > 0 { + return vs[0] + } + return def +}