diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 1ed1adb..da0dbe4 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -101,17 +101,22 @@ This document describes the overall architecture of the HopGate system. (en) ### 2.5 `internal/protocol` -- 서버와 클라이언트가 DTLS 위에서 주고받는 HTTP 요청/응답 메시지 포맷을 정의합니다. (ko) +- 서버와 클라이언트가 DTLS 위에서 주고받는 HTTP 요청/응답 메시지 포맷을 정의합니다. (ko) - Defines HTTP request/response message formats exchanged over DTLS between server and clients. (en) -- 요청 메시지 / Request message: (ko/en) +- 요청 메시지 / Request message: (ko/en) - `RequestID`, `ClientID`, `ServiceName`, `Method`, `URL`, `Header`, `Body`. (ko/en) -- 응답 메시지 / Response message: (ko/en) +- 응답 메시지 / Response message: (ko/en) - `RequestID`, `Status`, `Header`, `Body`, `Error`. (ko/en) -- 인코딩은 초기에는 JSON을 사용하고, 필요 시 MsgPack/Protobuf 등으로 확장 가능합니다. (ko) -- Encoding starts with JSON and may be extended to MsgPack/Protobuf later. (en) +- 인코딩은 현재 JSON 을 사용하며, 각 HTTP 요청/응답을 하나의 Envelope 로 감싸 DTLS 위에서 전송합니다. (ko) +- Encoding currently uses JSON, wrapping each HTTP request/response in a single Envelope sent over DTLS. (en) + +- 향후에는 `Envelope.StreamOpen` / `StreamData` / `StreamClose` 필드를 활용한 **스트림/프레임 기반 프로토콜**로 전환하여, + 대용량 HTTP 바디도 DTLS/UDP MTU 한계를 넘지 않도록 chunk 단위로 안전하게 전송할 계획입니다. (ko) +- In the future, the plan is to move to a **stream/frame-based protocol** using `Envelope.StreamOpen` / `StreamData` / `StreamClose`, + so that large HTTP bodies can be safely chunked under DTLS/UDP MTU limits. (en) --- @@ -217,11 +222,13 @@ The server decodes the `protocol.Response`, converts it back into an HTTP respon - `internal/acme` 에 ACME 클라이언트(certmagic 또는 lego 등)를 연결해 TLS 인증서 발급/갱신을 구현합니다. (ko) - Wire an ACME client (certmagic, lego, etc.) into `internal/acme` to implement TLS certificate issuance/renewal. (en) -- `internal/dtls` 에서 pion/dtls 기반 DTLS 전송 계층 및 핸드셰이크를 안정화합니다. (ko) +- `internal/dtls` 에서 pion/dtls 기반 DTLS 전송 계층 및 핸드셰이크를 안정화합니다. (ko) - Stabilize the pion/dtls-based DTLS transport and handshake logic in `internal/dtls`. (en) -- `internal/protocol` 과 `internal/proxy` 를 통해 실제 HTTP 터널링을 구현하고, 라우팅 규칙을 구성합니다. (ko) -- Implement real HTTP tunneling and routing rules via `internal/protocol` and `internal/proxy`. (en) +- `internal/protocol` 과 `internal/proxy` 를 통해 실제 HTTP 터널링을 구현하고, + 단일 JSON Envelope 기반 모델에서 `StreamOpen` / `StreamData` / `StreamClose` 중심의 스트림 기반 DTLS 터널링으로 전환합니다. (ko) +- Implement real HTTP tunneling and routing rules via `internal/protocol` and `internal/proxy`, + and move from a single JSON-Envelope model to a stream-based DTLS tunneling model built around `StreamOpen` / `StreamData` / `StreamClose`. (en) - `internal/admin` + `ent` + PostgreSQL 을 사용해 Domain 등록/해제 및 클라이언트 API Key 발급을 완성합니다. (ko) - Complete domain registration/unregistration and client API key issuing using `internal/admin` + `ent` + PostgreSQL. (en) diff --git a/Makefile b/Makefile index f9afeea..dd3911d 100644 --- a/Makefile +++ b/Makefile @@ -66,3 +66,21 @@ docker-server: @echo "Building server Docker image..." docker build -f Dockerfile.server -t hop-gate-server:$(VERSION) . +# --- Protobuf code generation ------------------------------------------------- +# Requires: +# - protoc (https://grpc.io/docs/protoc-installation/) +# - protoc-gen-go (go install google.golang.org/protobuf/cmd/protoc-gen-go@latest) +# +# Generates Go types under internal/protocol/pb from internal/protocol/hopgate_stream.proto. +# NOTE: +# - go_package in hopgate_stream.proto is set to: +# github.com/dalbodeule/hop-gate/internal/protocol/pb;protocolpb +# - With --go_out=. (without paths=source_relative), protoc will place the +# generated file under internal/protocol/pb according to go_package. +proto: + @echo "Generating Go code from Protobuf schemas..." + protoc \ + --go_out=. \ + internal/protocol/hopgate_stream.proto + @echo "Protobuf generation completed." + diff --git a/README.md b/README.md index 4bb7b35..3fce420 100644 --- a/README.md +++ b/README.md @@ -4,20 +4,23 @@ ## 1. 프로젝트 개요 (Project Overview) -HopGate는 공인 서버와 여러 프라이빗 네트워크 클라이언트 사이에 **DTLS 기반 HTTP 터널**을 제공하는 게이트웨이입니다. +HopGate는 공인 서버와 여러 프라이빗 네트워크 클라이언트 사이에 **DTLS 기반 HTTP 터널**을 제공하는 게이트웨이입니다. HopGate is a gateway that provides a **DTLS-based HTTP tunnel** between a public server and multiple private-network clients. 주요 특징 (Key features): -- 서버는 80/443 포트를 점유하고, ACME(Let's Encrypt 등)로 TLS 인증서를 자동 발급/갱신합니다. +- 서버는 80/443 포트를 점유하고, ACME(Let's Encrypt 등)로 TLS 인증서를 자동 발급/갱신합니다. The server listens on ports 80/443 and automatically issues/renews TLS certificates via ACME (e.g. Let's Encrypt). -- 서버–클라이언트 간 전송은 DTLS 위에서 이루어지며, HTTP 요청/응답을 메시지로 터널링합니다. - Transport between server and clients uses DTLS, tunneling HTTP request/response messages. -- 관리 Plane(REST API)을 통해 도메인 등록/해제 및 클라이언트 API Key 발급을 수행합니다. +- 서버–클라이언트 간 전송은 DTLS 위에서 이루어지며, 현재는 HTTP 요청/응답을 **Protobuf 기반 length-prefixed Envelope** 로 터널링합니다. + Transport between server and clients uses DTLS; HTTP requests/responses are tunneled as **Protobuf-based, length-prefixed envelopes**. +- 관리 Plane(REST API)을 통해 도메인 등록/해제 및 클라이언트 API Key 발급을 수행합니다. An admin management plane (REST API) handles domain registration/unregistration and client API key issuance. -- 로그는 JSON 구조 형태로 stdout 에 출력되며, Prometheus + Loki + Grafana 스택에 친화적으로 설계되었습니다. +- 로그는 JSON 구조 형태로 stdout 에 출력되며, Prometheus + Loki + Grafana 스택에 친화적으로 설계되었습니다. Logs are JSON-structured and designed to work well with a Prometheus + Loki + Grafana stack. +> 참고: 대용량 HTTP 바디에 대해서는 DTLS/UDP MTU 한계 때문에 **단일 Envelope** 로는 한계가 있으므로, `progress.md` 의 3.3A 섹션에 정리된 것처럼 `StreamOpen` / `StreamData` / `StreamClose` 기반의 스트림/프레임 터널링으로 점진적으로 전환할 예정입니다. (ko) +> Note: For very large HTTP bodies, a single-envelope model still hits DTLS/UDP MTU limits. As outlined in section 3.3A of `progress.md`, the plan is to gradually move to a stream/frame-based tunneling model using `StreamOpen` / `StreamData` / `StreamClose`. (en) + 아키텍처 세부 내용은 [`ARCHITECTURE.md`](ARCHITECTURE.md)에 정리되어 있습니다. Detailed architecture is documented in [`ARCHITECTURE.md`](ARCHITECTURE.md). @@ -38,10 +41,10 @@ Detailed architecture is documented in [`ARCHITECTURE.md`](ARCHITECTURE.md). ### 3.1 의존성 (Dependencies) -- Go 1.21+ 권장 (go.mod 상 버전보다 최신 Go 사용을 추천) +- Go 1.21+ 권장 (go.mod 상 버전보다 최신 Go 사용을 추천) Go 1.21+ is recommended (even if go.mod specifies an older minor). -- PostgreSQL (추후 DomainValidator 실제 구현 시 필요) - PostgreSQL (only required when implementing real domain validation). +- PostgreSQL (관리 Plane + 실제 DomainValidator 에 필수) + PostgreSQL (required for the admin plane and the real DomainValidator). Go 모듈 의존성 설치 / 정리는 다음으로 수행할 수 있습니다: You can install/cleanup Go module deps via: @@ -104,11 +107,11 @@ HOP_CLIENT_LOCAL_TARGET=127.0.0.1:8080 HOP_CLIENT_DEBUG=true ``` -- `HOP_CLIENT_SERVER_ADDR` : DTLS 서버 주소 (예: `localhost:8443`) +- `HOP_CLIENT_SERVER_ADDR` : DTLS 서버 주소 (예: `localhost:8443`) DTLS server address, e.g. `localhost:8443`. -- `HOP_CLIENT_DOMAIN` / `HOP_CLIENT_API_KEY` : 관리 Plane 에서 발급받은 도메인/키 (현재는 DummyValidator 로 아무 값이나 허용) - Domain and API key issued by the admin plane (currently any values are accepted by DummyValidator). -- `HOP_CLIENT_LOCAL_TARGET` : 실제로 HTTP 요청을 보낼 로컬 서버 주소 +- `HOP_CLIENT_DOMAIN` / `HOP_CLIENT_API_KEY` : 관리 Plane 에서 발급받은 도메인/키 (실제 ent + PostgreSQL 기반 DomainValidator 에 의해 검증) + Domain and API key issued by the admin plane (validated by a real ent + PostgreSQL based DomainValidator). +- `HOP_CLIENT_LOCAL_TARGET` : 실제로 HTTP 요청을 보낼 로컬 서버 주소 Local HTTP target address. - `HOP_CLIENT_DEBUG=true` : 서버 인증서 체인 검증을 스킵(InsecureSkipVerify)하여 self-signed 인증서를 신뢰 Skips server certificate chain verification (InsecureSkipVerify) and trusts the self-signed cert. @@ -162,10 +165,14 @@ For implementation skeleton, see [`internal/admin`](internal/admin) and [`ent/sc ## 6. 주의사항 (Caveats) -- `Debug=true` 설정은 **개발/테스트 용도**입니다. self-signed 인증서 및 InsecureSkipVerify 사용은 프로덕션 환경에서 절대 사용하지 마세요. +- `Debug=true` 설정은 **개발/테스트 용도**입니다. self-signed 인증서 및 InsecureSkipVerify 사용은 프로덕션 환경에서 절대 사용하지 마세요. `Debug=true` is strictly for development/testing. Do not use self-signed certs or InsecureSkipVerify in production. -- 실제 운영 시에는 ACME 기반 인증서, PostgreSQL + ent 기반 DomainValidator, Proxy 레이어 연동 등을 완성해야 합니다. - For production you must wire ACME certificates, a PostgreSQL+ent-based DomainValidator, and the proxy layer. +- 현재 버전은 ACME 기반 인증서, PostgreSQL + ent 기반 DomainValidator, Proxy 레이어가 기본적으로 연동되어 있으나, + 대용량 HTTP 바디에 대해서는 JSON 단일 메시지 기반 터널링 특성상 DTLS/UDP MTU 한계에 부딪힐 수 있습니다. + 스트림/프레임 기반 DTLS 터널링으로의 전환 및 하드닝 작업은 `progress.md` 에 정의된 다음 단계에 포함되어 있습니다. (ko) + The current version wires ACME certificates, a PostgreSQL+ent-based DomainValidator, and the proxy layer by default, + but for very large HTTP bodies the JSON single-message tunneling model can still hit DTLS/UDP MTU limits. + Moving to a stream/frame-based DTLS tunneling model and further hardening are tracked as next steps in `progress.md`. (en) -HopGate는 아직 초기 단계의 실험적 프로젝트입니다. API 및 동작은 언제든지 변경될 수 있습니다. +HopGate는 아직 초기 단계의 실험적 프로젝트입니다. API 및 동작은 언제든지 변경될 수 있습니다. HopGate is still experimental; APIs and behavior may change at any time. diff --git a/cmd/server/main.go b/cmd/server/main.go index f71258b..b898f2b 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1,10 +1,9 @@ package main import ( - "bufio" + "bytes" "context" "crypto/tls" - "encoding/json" "fmt" "io" stdfs "io/fs" @@ -31,8 +30,9 @@ import ( ) type dtlsSessionWrapper struct { - sess dtls.Session - mu sync.Mutex + sess dtls.Session + mu sync.Mutex + nextStreamID uint64 } // canonicalizeDomainForDNS 는 DTLS 핸드셰이크에서 전달된 도메인 문자열을 @@ -157,8 +157,10 @@ func parseExpectedIPsFromEnv(logger logging.Logger, envKey string) []net.IP { return result } -// ForwardHTTP 는 단일 HTTP 요청을 DTLS 세션으로 포워딩하고 응답을 돌려받습니다. -// ForwardHTTP forwards a single HTTP request over the DTLS session and returns the response. +// ForwardHTTP 는 HTTP 요청을 DTLS 세션 위의 StreamOpen/StreamData/StreamClose 프레임으로 전송하고, +// 역방향 스트림 응답을 수신해 protocol.Response 로 반환합니다. (ko) +// ForwardHTTP forwards an HTTP request over the DTLS session using StreamOpen/StreamData/StreamClose +// frames and reconstructs the reverse stream into a protocol.Response. (en) func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Logger, req *http.Request, serviceName string) (*protocol.Response, error) { w.mu.Lock() defer w.mu.Unlock() @@ -167,88 +169,220 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log ctx = context.Background() } - // 요청 본문 읽기 - var body []byte - if req.Body != nil { - b, err := io.ReadAll(req.Body) - if err != nil { - return nil, err - } - body = b - } + codec := protocol.DefaultCodec - // 간단한 RequestID 생성 (실제 서비스에서는 UUID 등을 사용하는 것이 좋음) - requestID := time.Now().UTC().Format("20060102T150405.000000000") - - httpReq := &protocol.Request{ - RequestID: requestID, - ClientID: "", // TODO: 클라이언트 식별자 도입 시 채우기 - ServiceName: serviceName, - Method: req.Method, - URL: req.URL.String(), - Header: req.Header.Clone(), - Body: body, - } + // 세션 내에서 고유한 StreamID 를 생성합니다. (ko) + // Generate a unique StreamID for this HTTP request within the DTLS session. (en) + streamID := w.nextHTTPStreamID() log := logger.With(logging.Fields{ "component": "http_to_dtls", - "request_id": requestID, + "request_id": string(streamID), "method": req.Method, "url": req.URL.String(), }) - log.Info("forwarding http request over dtls", logging.Fields{ + log.Info("forwarding http request over dtls (stream mode)", logging.Fields{ "host": req.Host, "scheme": req.URL.Scheme, }) - // HTTP 요청을 Envelope 로 감싸서 전송합니다. - env := &protocol.Envelope{ - Type: protocol.MessageTypeHTTP, - HTTPRequest: httpReq, + // 요청 헤더를 복사하고 pseudo-header 로 HTTP 메타데이터를 추가합니다. (ko) + // Copy request headers and attach HTTP metadata as pseudo-headers. (en) + hdr := make(map[string][]string, len(req.Header)+3) + for k, vs := range req.Header { + hdr[k] = append([]string(nil), vs...) + } + hdr[protocol.HeaderKeyMethod] = []string{req.Method} + if req.URL != nil { + hdr[protocol.HeaderKeyURL] = []string{req.URL.String()} + } + host := req.Host + if host == "" && req.URL != nil { + host = req.URL.Host + } + if host != "" { + hdr[protocol.HeaderKeyHost] = []string{host} } - enc := json.NewEncoder(w.sess) - if err := enc.Encode(env); err != nil { - log.Error("failed to encode http envelope", logging.Fields{ + // StreamOpen 전송: 어떤 서비스로 라우팅해야 하는지와 초기 헤더를 전달합니다. (ko) + // Send StreamOpen to indicate which service to route to and initial headers. (en) + openEnv := &protocol.Envelope{ + Type: protocol.MessageTypeStreamOpen, + StreamOpen: &protocol.StreamOpen{ + ID: streamID, + Service: serviceName, + TargetAddr: "", + Header: hdr, + }, + } + if err := codec.Encode(w.sess, openEnv); err != nil { + log.Error("failed to encode stream_open envelope", logging.Fields{ "error": err.Error(), }) return nil, err } - // 클라이언트로부터 HTTP 응답 Envelope 를 수신합니다. - var respEnv protocol.Envelope + // 요청 바디를 4KiB(StreamChunkSize) 단위로 잘라 StreamData 프레임으로 전송합니다. (ko) + // Chunk the request body into 4KiB (StreamChunkSize) StreamData frames. (en) + var seq uint64 + if req.Body != nil { + buf := make([]byte, protocol.StreamChunkSize) + for { + n, err := req.Body.Read(buf) + if n > 0 { + dataCopy := append([]byte(nil), buf[:n]...) + dataEnv := &protocol.Envelope{ + Type: protocol.MessageTypeStreamData, + StreamData: &protocol.StreamData{ + ID: streamID, + Seq: seq, + Data: dataCopy, + }, + } + if err2 := codec.Encode(w.sess, dataEnv); err2 != nil { + log.Error("failed to encode stream_data envelope", logging.Fields{ + "error": err2.Error(), + }) + return nil, err2 + } + seq++ + } + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("read http request body for streaming: %w", err) + } + } + } - // NOTE: pion/dtls 는 복호화된 애플리케이션 데이터를 호출자가 제공한 버퍼에 채웁니다. - // 기본 JSON 디코더 버퍼만 사용하면 큰 HTTP 응답/Envelope 에서 "dtls: buffer too small" - // 오류가 발생할 수 있으므로, 충분히 큰 bufio.Reader(64KiB)를 사용합니다. (ko) - // NOTE: pion/dtls decrypts application data into the buffer provided by the caller. - // Using only the default JSON decoder buffer can cause "dtls: buffer too small" - // errors for large HTTP responses/envelopes, so we wrap the session with a - // reasonably large bufio.Reader (64KiB). (en) - dec := json.NewDecoder(bufio.NewReaderSize(w.sess, 64*1024)) - if err := dec.Decode(&respEnv); err != nil { - log.Error("failed to decode http envelope", logging.Fields{ + // 바디 종료를 알리는 StreamClose 를 전송합니다. (ko) + // Send StreamClose to mark the end of the request body. (en) + closeReqEnv := &protocol.Envelope{ + Type: protocol.MessageTypeStreamClose, + StreamClose: &protocol.StreamClose{ + ID: streamID, + Error: "", + }, + } + if err := codec.Encode(w.sess, closeReqEnv); err != nil { + log.Error("failed to encode request stream_close envelope", logging.Fields{ "error": err.Error(), }) return nil, err } - if respEnv.Type != protocol.MessageTypeHTTP || respEnv.HTTPResponse == nil { - log.Error("received non-http envelope from client", logging.Fields{ - "type": respEnv.Type, - }) - return nil, fmt.Errorf("unexpected envelope type %q or empty http_response", respEnv.Type) + // 클라이언트로부터 역방향 스트림 응답을 수신합니다. (ko) + // Receive reverse stream response (StreamOpen + StreamData* + StreamClose). (en) + var ( + resp protocol.Response + bodyBuf bytes.Buffer + gotOpen bool + statusCode = http.StatusOK + ) + + resp.RequestID = string(streamID) + resp.Header = make(map[string][]string) + + for { + var env protocol.Envelope + if err := codec.Decode(w.sess, &env); err != nil { + log.Error("failed to decode stream response envelope", logging.Fields{ + "error": err.Error(), + }) + return nil, err + } + + switch env.Type { + case protocol.MessageTypeStreamOpen: + so := env.StreamOpen + if so == nil { + return nil, fmt.Errorf("stream_open response payload is nil") + } + if so.ID != streamID { + return nil, fmt.Errorf("unexpected stream_open for id %q (expected %q)", so.ID, streamID) + } + // 상태 코드 및 헤더 복원 (pseudo-header 제거). (ko) + // Restore status code and headers (strip pseudo-headers). (en) + statusStr := firstHeaderValue(so.Header, protocol.HeaderKeyStatus, strconv.Itoa(http.StatusOK)) + if sc, err := strconv.Atoi(statusStr); err == nil && sc > 0 { + statusCode = sc + } + for k, vs := range so.Header { + if k == protocol.HeaderKeyMethod || + k == protocol.HeaderKeyURL || + k == protocol.HeaderKeyHost || + k == protocol.HeaderKeyStatus { + continue + } + resp.Header[k] = append([]string(nil), vs...) + } + gotOpen = true + + case protocol.MessageTypeStreamData: + sd := env.StreamData + if sd == nil { + return nil, fmt.Errorf("stream_data response payload is nil") + } + if sd.ID != streamID { + return nil, fmt.Errorf("unexpected stream_data for id %q (expected %q)", sd.ID, streamID) + } + if len(sd.Data) > 0 { + if _, err := bodyBuf.Write(sd.Data); err != nil { + return nil, fmt.Errorf("buffer stream_data response: %w", err) + } + } + + case protocol.MessageTypeStreamClose: + sc := env.StreamClose + if sc == nil { + return nil, fmt.Errorf("stream_close response payload is nil") + } + if sc.ID != streamID { + return nil, fmt.Errorf("unexpected stream_close for id %q (expected %q)", sc.ID, streamID) + } + // 스트림 종료: 지금까지 수신한 헤더/바디로 protocol.Response 를 완성합니다. (ko) + // Stream finished: complete protocol.Response using collected headers/body. (en) + resp.Status = statusCode + resp.Body = bodyBuf.Bytes() + resp.Error = sc.Error + + log.Info("received stream http response over dtls", logging.Fields{ + "status": resp.Status, + "error": resp.Error, + }) + if !gotOpen { + return nil, fmt.Errorf("received stream_close without prior stream_open for stream %q", streamID) + } + return &resp, nil + + default: + return nil, fmt.Errorf("unexpected envelope type %q in stream response", env.Type) + } } +} - protoResp := respEnv.HTTPResponse +// nextHTTPStreamID 는 DTLS 세션 내 HTTP 요청에 사용할 고유 StreamID 를 생성합니다. (ko) +// nextHTTPStreamID generates a unique StreamID for HTTP requests on this DTLS session. (en) +func (w *dtlsSessionWrapper) nextHTTPStreamID() protocol.StreamID { + id := w.nextStreamID + w.nextStreamID++ + return protocol.StreamID(fmt.Sprintf("http-%d", id)) +} - log.Info("received dtls response", logging.Fields{ - "status": protoResp.Status, - "error": protoResp.Error, - }) - - return protoResp, nil +// firstHeaderValue 는 map[string][]string 형태의 헤더에서 첫 번째 값을 반환하고, +// 값이 없으면 기본값을 반환합니다. (ko) +// firstHeaderValue returns the first value for a header key in map[string][]string, +// or the provided default if the key is missing or empty. (en) +func firstHeaderValue(hdr map[string][]string, key, def string) string { + if hdr == nil { + return def + } + if vs, ok := hdr[key]; ok && len(vs) > 0 { + return vs[0] + } + return def } var ( @@ -277,8 +411,8 @@ var hopGateOwnedHeaders = map[string]struct{}{ "Referrer-Policy": {}, } -// writeErrorPage 는 주요 HTTP 에러 코드(400/404/500/525)에 대해 정적 HTML 에러 페이지를 렌더링합니다. (ko) -// writeErrorPage renders static HTML error pages for key HTTP error codes (400/404/500/525). (en) +// writeErrorPage 는 주요 HTTP 에러 코드(400/404/500/502/504/525)에 대해 정적 HTML 에러 페이지를 렌더링합니다. (ko) +// writeErrorPage renders static HTML error pages for key HTTP error codes (400/404/500/502/504/525). (en) // // 템플릿 로딩 우선순위: (ko) // 1. HOP_ERROR_PAGES_DIR/.html (또는 ./errors/.html) (ko) @@ -294,9 +428,31 @@ func writeErrorPage(w http.ResponseWriter, r *http.Request, status int) { setSecurityAndIdentityHeaders(w, r) } - // Delegates actual HTML rendering to internal/errorpages. (en) - // 실제 HTML 렌더링은 internal/errorpages 패키지에 위임합니다. (ko) - errorpages.Render(w, r, status) + // 4xx / 5xx 대역에 대한 템플릿 매핑 규칙: (ko) + // - 400 series: 400.html 로 렌더링 (단, 404 는 404.html 사용) (ko) + // - 500 series: 500.html 로 렌더링 (단, 502/504/525 는 개별 템플릿 사용) (ko) + // + // Mapping rules for 4xx / 5xx ranges: (en) + // - 400 series: render using 400.html (except 404 uses 404.html). (en) + // - 500 series: render using 500.html (except 502/504/525 which have dedicated templates). (en) + mapped := status + switch { + case status >= 400 && status < 500: + if status != http.StatusBadRequest && status != http.StatusNotFound { + mapped = http.StatusBadRequest + } + case status >= 500 && status < 600: + if status != http.StatusInternalServerError && + status != http.StatusBadGateway && + status != errorpages.StatusGatewayTimeout && + status != errorpages.StatusTLSHandshakeFailed { + mapped = http.StatusInternalServerError + } + } + + // Delegates actual HTML rendering to internal/errorpages with mapped status. (en) + // 실제 HTML 렌더링은 매핑된 상태 코드로 internal/errorpages 패키지에 위임합니다. (ko) + errorpages.Render(w, r, mapped) } // setSecurityAndIdentityHeaders 는 HopGate 에서 공통으로 추가하는 보안/식별 헤더를 설정합니다. (ko) diff --git a/internal/errorpages/assets/errors.css b/internal/errorpages/assets/errors.css index ee70008..9c6e084 100644 --- a/internal/errorpages/assets/errors.css +++ b/internal/errorpages/assets/errors.css @@ -1,2 +1,2 @@ /*! tailwindcss v4.1.17 | MIT License | https://tailwindcss.com */ -@layer properties{@supports (((-webkit-hyphens:none)) and (not (margin-trim:inline))) or ((-moz-orient:inline) and (not (color:rgb(from red r g b)))){*,:before,:after,::backdrop{--tw-tracking:initial;--tw-blur:initial;--tw-brightness:initial;--tw-contrast:initial;--tw-grayscale:initial;--tw-hue-rotate:initial;--tw-invert:initial;--tw-opacity:initial;--tw-saturate:initial;--tw-sepia:initial;--tw-drop-shadow:initial;--tw-drop-shadow-color:initial;--tw-drop-shadow-alpha:100%;--tw-drop-shadow-size:initial}}}.visible{visibility:visible}.absolute{position:absolute}.fixed{position:fixed}.static{position:static}.contents{display:contents}.flex{display:flex}.inline-flex{display:inline-flex}.table{display:table}.min-h-screen{min-height:100vh}.w-\[240px\]{width:240px}.w-full{width:100%}.flex-col{flex-direction:column}.items-baseline{align-items:baseline}.items-center{align-items:center}.justify-center{justify-content:center}.text-center{text-align:center}.tracking-\[0\.25em\]{--tw-tracking:.25em;letter-spacing:.25em}.uppercase{text-transform:uppercase}.opacity-90{opacity:.9}.filter{filter:var(--tw-blur,)var(--tw-brightness,)var(--tw-contrast,)var(--tw-grayscale,)var(--tw-hue-rotate,)var(--tw-invert,)var(--tw-saturate,)var(--tw-sepia,)var(--tw-drop-shadow,)}@property --tw-tracking{syntax:"*";inherits:false}@property --tw-blur{syntax:"*";inherits:false}@property --tw-brightness{syntax:"*";inherits:false}@property --tw-contrast{syntax:"*";inherits:false}@property --tw-grayscale{syntax:"*";inherits:false}@property --tw-hue-rotate{syntax:"*";inherits:false}@property --tw-invert{syntax:"*";inherits:false}@property --tw-opacity{syntax:"*";inherits:false}@property --tw-saturate{syntax:"*";inherits:false}@property --tw-sepia{syntax:"*";inherits:false}@property --tw-drop-shadow{syntax:"*";inherits:false}@property --tw-drop-shadow-color{syntax:"*";inherits:false}@property --tw-drop-shadow-alpha{syntax:"";inherits:false;initial-value:100%}@property --tw-drop-shadow-size{syntax:"*";inherits:false} \ No newline at end of file +@layer properties{@supports (((-webkit-hyphens:none)) and (not (margin-trim:inline))) or ((-moz-orient:inline) and (not (color:rgb(from red r g b)))){*,:before,:after,::backdrop{--tw-tracking:initial;--tw-blur:initial;--tw-brightness:initial;--tw-contrast:initial;--tw-grayscale:initial;--tw-hue-rotate:initial;--tw-invert:initial;--tw-opacity:initial;--tw-saturate:initial;--tw-sepia:initial;--tw-drop-shadow:initial;--tw-drop-shadow-color:initial;--tw-drop-shadow-alpha:100%;--tw-drop-shadow-size:initial}}}.visible{visibility:visible}.absolute{position:absolute}.fixed{position:fixed}.static{position:static}.container{width:100%}.contents{display:contents}.flex{display:flex}.inline-flex{display:inline-flex}.table{display:table}.min-h-screen{min-height:100vh}.w-\[240px\]{width:240px}.w-full{width:100%}.flex-col{flex-direction:column}.items-baseline{align-items:baseline}.items-center{align-items:center}.justify-center{justify-content:center}.text-center{text-align:center}.tracking-\[0\.25em\]{--tw-tracking:.25em;letter-spacing:.25em}.uppercase{text-transform:uppercase}.opacity-90{opacity:.9}.filter{filter:var(--tw-blur,)var(--tw-brightness,)var(--tw-contrast,)var(--tw-grayscale,)var(--tw-hue-rotate,)var(--tw-invert,)var(--tw-saturate,)var(--tw-sepia,)var(--tw-drop-shadow,)}@property --tw-tracking{syntax:"*";inherits:false}@property --tw-blur{syntax:"*";inherits:false}@property --tw-brightness{syntax:"*";inherits:false}@property --tw-contrast{syntax:"*";inherits:false}@property --tw-grayscale{syntax:"*";inherits:false}@property --tw-hue-rotate{syntax:"*";inherits:false}@property --tw-invert{syntax:"*";inherits:false}@property --tw-opacity{syntax:"*";inherits:false}@property --tw-saturate{syntax:"*";inherits:false}@property --tw-sepia{syntax:"*";inherits:false}@property --tw-drop-shadow{syntax:"*";inherits:false}@property --tw-drop-shadow-color{syntax:"*";inherits:false}@property --tw-drop-shadow-alpha{syntax:"";inherits:false;initial-value:100%}@property --tw-drop-shadow-size{syntax:"*";inherits:false} \ No newline at end of file diff --git a/internal/errorpages/templates/502.html b/internal/errorpages/templates/502.html new file mode 100644 index 0000000..fc46425 --- /dev/null +++ b/internal/errorpages/templates/502.html @@ -0,0 +1,32 @@ + + + + + 502 Bad Gateway - HopGate + + + + +
+
+ HopGate +

HopGate

+
+ +
+ 502 + Bad Gateway +
+ +

+ HopGate could not get a valid response from the backend service.
+ HopGate가 백엔드 서비스로부터 유효한 응답을 받지 못했습니다. +

+ +
+ This may happen when the origin is down, misconfigured, or responding with invalid data.
+ 원본 서버가 다운되었거나 설정이 잘못되었거나, 잘못된 응답을 보내는 경우 발생할 수 있습니다. +
+
+ + \ No newline at end of file diff --git a/internal/protocol/codec.go b/internal/protocol/codec.go new file mode 100644 index 0000000..dcbdfdf --- /dev/null +++ b/internal/protocol/codec.go @@ -0,0 +1,399 @@ +package protocol + +import ( + "bufio" + "encoding/binary" + "encoding/json" + "fmt" + "io" + + protocolpb "github.com/dalbodeule/hop-gate/internal/protocol/pb" + "google.golang.org/protobuf/proto" +) + +// defaultDecoderBufferSize 는 pion/dtls 가 복호화한 애플리케이션 데이터를 +// JSON 디코더가 안전하게 처리할 수 있도록 사용하는 버퍼 크기입니다. +// This matches existing 64KiB readers used around DTLS sessions (used by the JSON codec). +const defaultDecoderBufferSize = 64 * 1024 + +// maxProtoEnvelopeBytes 는 단일 Protobuf Envelope 의 최대 크기에 대한 보수적 상한입니다. +// 아직 하드 리미트로 사용하지는 않지만, 향후 방어적 체크에 사용할 수 있습니다. +const maxProtoEnvelopeBytes = 512 * 1024 // 512KiB, 충분히 여유 있는 값 + +// WireCodec 는 protocol.Envelope 의 직렬화/역직렬화를 추상화합니다. +// JSON, Protobuf, length-prefixed binary 등으로 교체할 때 이 인터페이스만 유지하면 됩니다. +type WireCodec interface { + Encode(w io.Writer, env *Envelope) error + Decode(r io.Reader, env *Envelope) error +} + +// jsonCodec 은 JSON 기반 WireCodec 구현입니다. +// JSON 직렬화를 계속 사용하고 싶을 때를 위해 남겨둡니다. +type jsonCodec struct{} + +// Encode 는 Envelope 를 JSON 으로 인코딩해 작성합니다. +// Encode encodes an Envelope as JSON to the given writer. +func (jsonCodec) Encode(w io.Writer, env *Envelope) error { + enc := json.NewEncoder(w) + return enc.Encode(env) +} + +// Decode 는 DTLS 세션에서 읽은 데이터를 JSON Envelope 로 디코딩합니다. +// pion/dtls 의 버퍼 특성 때문에, 충분히 큰 bufio.Reader 로 감싸서 사용합니다. +// Decode decodes an Envelope from JSON using a buffered reader on top of the DTLS session. +func (jsonCodec) Decode(r io.Reader, env *Envelope) error { + dec := json.NewDecoder(bufio.NewReaderSize(r, defaultDecoderBufferSize)) + return dec.Decode(env) +} + +// protobufCodec 은 Protobuf + length-prefix framing 기반 WireCodec 구현입니다. +// 한 Envelope 당 [4바이트 big-endian 길이] + [protobuf bytes] 형태로 인코딩합니다. +type protobufCodec struct{} + +// Encode 는 Envelope 를 Protobuf Envelope 로 변환한 뒤, length-prefix 프레이밍으로 기록합니다. +// Encode encodes an Envelope as a length-prefixed protobuf message. +func (protobufCodec) Encode(w io.Writer, env *Envelope) error { + pbEnv, err := toProtoEnvelope(env) + if err != nil { + return err + } + + // Body/stream payload 하드 리밋: 4KiB (StreamChunkSize). + // HTTP 단일 Envelope 및 스트림 기반 프레임 모두에서 payload 가 이 값을 넘지 않도록 강제합니다. + // Enforce a 4KiB hard limit (StreamChunkSize) for HTTP bodies and stream payloads. + switch env.Type { + case MessageTypeHTTP: + if env.HTTPRequest != nil && len(env.HTTPRequest.Body) > int(StreamChunkSize) { + return fmt.Errorf("protobuf codec: http request body too large: %d bytes (max %d)", len(env.HTTPRequest.Body), StreamChunkSize) + } + if env.HTTPResponse != nil && len(env.HTTPResponse.Body) > int(StreamChunkSize) { + return fmt.Errorf("protobuf codec: http response body too large: %d bytes (max %d)", len(env.HTTPResponse.Body), StreamChunkSize) + } + case MessageTypeStreamData: + if env.StreamData != nil && len(env.StreamData.Data) > int(StreamChunkSize) { + return fmt.Errorf("protobuf codec: stream data payload too large: %d bytes (max %d)", len(env.StreamData.Data), StreamChunkSize) + } + } + + data, err := proto.Marshal(pbEnv) + if err != nil { + return fmt.Errorf("protobuf marshal envelope: %w", err) + } + if len(data) == 0 { + return fmt.Errorf("protobuf codec: empty marshaled envelope") + } + + var lenBuf [4]byte + if len(data) > int(^uint32(0)) { + return fmt.Errorf("protobuf codec: envelope too large: %d bytes", len(data)) + } + binary.BigEndian.PutUint32(lenBuf[:], uint32(len(data))) + + if _, err := w.Write(lenBuf[:]); err != nil { + return fmt.Errorf("protobuf codec: write length prefix: %w", err) + } + if _, err := w.Write(data); err != nil { + return fmt.Errorf("protobuf codec: write payload: %w", err) + } + return nil +} + +// Decode 는 length-prefix 프레임에서 Protobuf Envelope 를 읽어들여 +// 내부 Envelope 구조체로 변환합니다. +// Decode reads a length-prefixed protobuf Envelope and converts it into the internal Envelope. +func (protobufCodec) Decode(r io.Reader, env *Envelope) error { + var lenBuf [4]byte + if _, err := io.ReadFull(r, lenBuf[:]); err != nil { + return fmt.Errorf("protobuf codec: read length prefix: %w", err) + } + n := binary.BigEndian.Uint32(lenBuf[:]) + if n == 0 { + return fmt.Errorf("protobuf codec: zero-length envelope") + } + if n > maxProtoEnvelopeBytes { + return fmt.Errorf("protobuf codec: envelope too large: %d bytes (max %d)", n, maxProtoEnvelopeBytes) + } + + buf := make([]byte, int(n)) + if _, err := io.ReadFull(r, buf); err != nil { + return fmt.Errorf("protobuf codec: read payload: %w", err) + } + + var pbEnv protocolpb.Envelope + if err := proto.Unmarshal(buf, &pbEnv); err != nil { + return fmt.Errorf("protobuf codec: unmarshal envelope: %w", err) + } + + return fromProtoEnvelope(&pbEnv, env) +} + +// DefaultCodec 은 현재 런타임에서 사용하는 기본 WireCodec 입니다. +// 이제 Protobuf 기반 codec 을 기본으로 사용합니다. +var DefaultCodec WireCodec = protobufCodec{} + +// toProtoEnvelope 는 내부 Envelope 구조체를 Protobuf Envelope 로 변환합니다. +// 현재 구현은 HTTP 요청/응답 및 스트림 관련 타입(StreamOpen/StreamData/StreamClose/StreamAck)을 지원합니다. +func toProtoEnvelope(env *Envelope) (*protocolpb.Envelope, error) { + switch env.Type { + case MessageTypeHTTP: + if env.HTTPRequest != nil { + req := env.HTTPRequest + pbReq := &protocolpb.Request{ + RequestId: req.RequestID, + ClientId: req.ClientID, + ServiceName: req.ServiceName, + Method: req.Method, + Url: req.URL, + Header: make(map[string]*protocolpb.HeaderValues, len(req.Header)), + Body: req.Body, + } + for k, vs := range req.Header { + hv := &protocolpb.HeaderValues{ + Values: append([]string(nil), vs...), + } + pbReq.Header[k] = hv + } + return &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_HttpRequest{ + HttpRequest: pbReq, + }, + }, nil + } + if env.HTTPResponse != nil { + resp := env.HTTPResponse + pbResp := &protocolpb.Response{ + RequestId: resp.RequestID, + Status: int32(resp.Status), + Header: make(map[string]*protocolpb.HeaderValues, len(resp.Header)), + Body: resp.Body, + Error: resp.Error, + } + for k, vs := range resp.Header { + hv := &protocolpb.HeaderValues{ + Values: append([]string(nil), vs...), + } + pbResp.Header[k] = hv + } + return &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_HttpResponse{ + HttpResponse: pbResp, + }, + }, nil + } + return nil, fmt.Errorf("protobuf codec: http envelope has neither request nor response") + case MessageTypeStreamOpen: + if env.StreamOpen == nil { + return nil, fmt.Errorf("protobuf codec: stream_open envelope missing payload") + } + so := env.StreamOpen + pbSO := &protocolpb.StreamOpen{ + Id: string(so.ID), + ServiceName: so.Service, + TargetAddr: so.TargetAddr, + Header: make(map[string]*protocolpb.HeaderValues, len(so.Header)), + } + for k, vs := range so.Header { + hv := &protocolpb.HeaderValues{ + Values: append([]string(nil), vs...), + } + pbSO.Header[k] = hv + } + return &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamOpen{ + StreamOpen: pbSO, + }, + }, nil + case MessageTypeStreamData: + if env.StreamData == nil { + return nil, fmt.Errorf("protobuf codec: stream_data envelope missing payload") + } + sd := env.StreamData + pbSD := &protocolpb.StreamData{ + Id: string(sd.ID), + Seq: sd.Seq, + Data: sd.Data, + } + return &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamData{ + StreamData: pbSD, + }, + }, nil + case MessageTypeStreamClose: + if env.StreamClose == nil { + return nil, fmt.Errorf("protobuf codec: stream_close envelope missing payload") + } + sc := env.StreamClose + pbSC := &protocolpb.StreamClose{ + Id: string(sc.ID), + Error: sc.Error, + } + return &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamClose{ + StreamClose: pbSC, + }, + }, nil + case MessageTypeStreamAck: + if env.StreamAck == nil { + return nil, fmt.Errorf("protobuf codec: stream_ack envelope missing payload") + } + sa := env.StreamAck + pbSA := &protocolpb.StreamAck{ + Id: string(sa.ID), + AckSeq: sa.AckSeq, + LostSeqs: append([]uint64(nil), sa.LostSeqs...), + WindowSize: sa.WindowSize, + } + return &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamAck{ + StreamAck: pbSA, + }, + }, nil + default: + return nil, fmt.Errorf("protobuf codec: unsupported envelope type %q", env.Type) + } +} + +// fromProtoEnvelope 는 Protobuf Envelope 를 내부 Envelope 구조체로 변환합니다. +// 현재 구현은 HTTP 요청/응답 및 스트림 관련 타입(StreamOpen/StreamData/StreamClose/StreamAck)을 지원합니다. +func fromProtoEnvelope(pbEnv *protocolpb.Envelope, env *Envelope) error { + switch payload := pbEnv.Payload.(type) { + case *protocolpb.Envelope_HttpRequest: + req := payload.HttpRequest + if req == nil { + return fmt.Errorf("protobuf codec: http_request payload is nil") + } + hdr := make(map[string][]string, len(req.Header)) + for k, hv := range req.Header { + if hv == nil { + continue + } + hdr[k] = append([]string(nil), hv.Values...) + } + env.Type = MessageTypeHTTP + env.HTTPRequest = &Request{ + RequestID: req.RequestId, + ClientID: req.ClientId, + ServiceName: req.ServiceName, + Method: req.Method, + URL: req.Url, + Header: hdr, + Body: append([]byte(nil), req.Body...), + } + env.HTTPResponse = nil + env.StreamOpen = nil + env.StreamData = nil + env.StreamClose = nil + env.StreamAck = nil + return nil + + case *protocolpb.Envelope_HttpResponse: + resp := payload.HttpResponse + if resp == nil { + return fmt.Errorf("protobuf codec: http_response payload is nil") + } + hdr := make(map[string][]string, len(resp.Header)) + for k, hv := range resp.Header { + if hv == nil { + continue + } + hdr[k] = append([]string(nil), hv.Values...) + } + env.Type = MessageTypeHTTP + env.HTTPResponse = &Response{ + RequestID: resp.RequestId, + Status: int(resp.Status), + Header: hdr, + Body: append([]byte(nil), resp.Body...), + Error: resp.Error, + } + env.HTTPRequest = nil + env.StreamOpen = nil + env.StreamData = nil + env.StreamClose = nil + env.StreamAck = nil + return nil + + case *protocolpb.Envelope_StreamOpen: + so := payload.StreamOpen + if so == nil { + return fmt.Errorf("protobuf codec: stream_open payload is nil") + } + hdr := make(map[string][]string, len(so.Header)) + for k, hv := range so.Header { + if hv == nil { + continue + } + hdr[k] = append([]string(nil), hv.Values...) + } + env.Type = MessageTypeStreamOpen + env.StreamOpen = &StreamOpen{ + ID: StreamID(so.Id), + Service: so.ServiceName, + TargetAddr: so.TargetAddr, + Header: hdr, + } + env.StreamData = nil + env.StreamClose = nil + env.StreamAck = nil + env.HTTPRequest = nil + env.HTTPResponse = nil + return nil + + case *protocolpb.Envelope_StreamData: + sd := payload.StreamData + if sd == nil { + return fmt.Errorf("protobuf codec: stream_data payload is nil") + } + env.Type = MessageTypeStreamData + env.StreamData = &StreamData{ + ID: StreamID(sd.Id), + Seq: sd.Seq, + Data: append([]byte(nil), sd.Data...), + } + env.StreamOpen = nil + env.StreamClose = nil + env.StreamAck = nil + env.HTTPRequest = nil + env.HTTPResponse = nil + return nil + + case *protocolpb.Envelope_StreamClose: + sc := payload.StreamClose + if sc == nil { + return fmt.Errorf("protobuf codec: stream_close payload is nil") + } + env.Type = MessageTypeStreamClose + env.StreamClose = &StreamClose{ + ID: StreamID(sc.Id), + Error: sc.Error, + } + env.StreamOpen = nil + env.StreamData = nil + env.StreamAck = nil + env.HTTPRequest = nil + env.HTTPResponse = nil + return nil + + case *protocolpb.Envelope_StreamAck: + sa := payload.StreamAck + if sa == nil { + return fmt.Errorf("protobuf codec: stream_ack payload is nil") + } + env.Type = MessageTypeStreamAck + env.StreamAck = &StreamAck{ + ID: StreamID(sa.Id), + AckSeq: sa.AckSeq, + LostSeqs: append([]uint64(nil), sa.LostSeqs...), + WindowSize: sa.WindowSize, + } + env.StreamOpen = nil + env.StreamData = nil + env.StreamClose = nil + env.HTTPRequest = nil + env.HTTPResponse = nil + return nil + + default: + return fmt.Errorf("protobuf codec: unsupported payload type %T", payload) + } +} diff --git a/internal/protocol/hopgate_stream.proto b/internal/protocol/hopgate_stream.proto new file mode 100644 index 0000000..09eb5c8 --- /dev/null +++ b/internal/protocol/hopgate_stream.proto @@ -0,0 +1,103 @@ +syntax = "proto3"; + +package hopgate.protocol.v1; + +option go_package = "github.com/dalbodeule/hop-gate/internal/protocol/pb;protocolpb"; + +// HeaderValues 는 HTTP 헤더의 다중 값 표현을 위한 래퍼입니다. +// HeaderValues wraps multiple header values for a single HTTP header key. +message HeaderValues { + repeated string values = 1; +} + +// Request 는 DTLS 터널 위에서 교환되는 HTTP 요청을 표현합니다. +// This mirrors internal/protocol.Request. +message Request { + string request_id = 1; + string client_id = 2; // optional client identifier + string service_name = 3; // logical service name on the client side + + string method = 4; + string url = 5; + + // HTTP header: map of key -> multiple values. + map header = 6; + + // Raw HTTP body bytes. + bytes body = 7; +} + +// Response 는 DTLS 터널 위에서 교환되는 HTTP 응답을 표현합니다. +// This mirrors internal/protocol.Response. +message Response { + string request_id = 1; + int32 status = 2; + + // HTTP header. + map header = 3; + + // Raw HTTP body bytes. + bytes body = 4; + + // Optional error description when tunneling fails. + string error = 5; +} + +// StreamOpen 은 새로운 스트림(HTTP 요청/응답, WebSocket 등)을 여는 메시지입니다. +// This represents opening a new stream (HTTP request/response, WebSocket, etc.). +message StreamOpen { + string id = 1; // StreamID (text form) + + // Which logical service / local target to use on the client side. + string service_name = 2; + string target_addr = 3; // e.g. "127.0.0.1:8080" + + // Initial HTTP-like headers (including Upgrade, etc.). + map header = 4; +} + +// StreamData 는 이미 열린 스트림에 대한 단방향 데이터 프레임입니다. +// This is a unidirectional data frame on an already-open stream. +message StreamData { + string id = 1; // StreamID + uint64 seq = 2; // per-stream sequence number starting from 0 + bytes data = 3; +} + +// StreamAck 는 StreamData 에 대한 ACK/NACK 및 선택적 재전송 힌트를 전달합니다. +// This conveys ACK/NACK and optional retransmission hints for StreamData. +message StreamAck { + string id = 1; + + // Last contiguously received sequence number (starting from 0). + uint64 ack_seq = 2; + + // Additional missing sequence numbers beyond ack_seq (optional). + repeated uint64 lost_seqs = 3; + + // Optional receive window size hint. + uint32 window_size = 4; +} + +// StreamClose 는 스트림 종료(정상/에러)를 알립니다. +// This indicates normal or error termination of a stream. +message StreamClose { + string id = 1; + string error = 2; // empty means normal close +} + +// Envelope 는 DTLS 세션 위에서 교환되는 상위 레벨 메시지 컨테이너입니다. +// 하나의 Envelope 에는 HTTP 요청/응답 또는 스트림 관련 메시지 중 하나만 포함됩니다. +// Envelope is the top-level container exchanged over the DTLS session. +// Exactly one payload (http_request/http_response/stream_*) is set per message. +message Envelope { + oneof payload { + Request http_request = 1; + Response http_response = 2; + + StreamOpen stream_open = 3; + StreamData stream_data = 4; + StreamClose stream_close = 5; + StreamAck stream_ack = 6; + } +} \ No newline at end of file diff --git a/internal/protocol/pb/hopgate_stream.pb.go b/internal/protocol/pb/hopgate_stream.pb.go new file mode 100644 index 0000000..2e2b18a --- /dev/null +++ b/internal/protocol/pb/hopgate_stream.pb.go @@ -0,0 +1,799 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.10 +// protoc v6.33.1 +// source: internal/protocol/hopgate_stream.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// HeaderValues 는 HTTP 헤더의 다중 값 표현을 위한 래퍼입니다. +// HeaderValues wraps multiple header values for a single HTTP header key. +type HeaderValues struct { + state protoimpl.MessageState `protogen:"open.v1"` + Values []string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *HeaderValues) Reset() { + *x = HeaderValues{} + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *HeaderValues) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HeaderValues) ProtoMessage() {} + +func (x *HeaderValues) ProtoReflect() protoreflect.Message { + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HeaderValues.ProtoReflect.Descriptor instead. +func (*HeaderValues) Descriptor() ([]byte, []int) { + return file_internal_protocol_hopgate_stream_proto_rawDescGZIP(), []int{0} +} + +func (x *HeaderValues) GetValues() []string { + if x != nil { + return x.Values + } + return nil +} + +// Request 는 DTLS 터널 위에서 교환되는 HTTP 요청을 표현합니다. +// This mirrors internal/protocol.Request. +type Request struct { + state protoimpl.MessageState `protogen:"open.v1"` + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + ClientId string `protobuf:"bytes,2,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"` // optional client identifier + ServiceName string `protobuf:"bytes,3,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"` // logical service name on the client side + Method string `protobuf:"bytes,4,opt,name=method,proto3" json:"method,omitempty"` + Url string `protobuf:"bytes,5,opt,name=url,proto3" json:"url,omitempty"` + // HTTP header: map of key -> multiple values. + Header map[string]*HeaderValues `protobuf:"bytes,6,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // Raw HTTP body bytes. + Body []byte `protobuf:"bytes,7,opt,name=body,proto3" json:"body,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Request) Reset() { + *x = Request{} + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Request) ProtoMessage() {} + +func (x *Request) ProtoReflect() protoreflect.Message { + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Request.ProtoReflect.Descriptor instead. +func (*Request) Descriptor() ([]byte, []int) { + return file_internal_protocol_hopgate_stream_proto_rawDescGZIP(), []int{1} +} + +func (x *Request) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + +func (x *Request) GetClientId() string { + if x != nil { + return x.ClientId + } + return "" +} + +func (x *Request) GetServiceName() string { + if x != nil { + return x.ServiceName + } + return "" +} + +func (x *Request) GetMethod() string { + if x != nil { + return x.Method + } + return "" +} + +func (x *Request) GetUrl() string { + if x != nil { + return x.Url + } + return "" +} + +func (x *Request) GetHeader() map[string]*HeaderValues { + if x != nil { + return x.Header + } + return nil +} + +func (x *Request) GetBody() []byte { + if x != nil { + return x.Body + } + return nil +} + +// Response 는 DTLS 터널 위에서 교환되는 HTTP 응답을 표현합니다. +// This mirrors internal/protocol.Response. +type Response struct { + state protoimpl.MessageState `protogen:"open.v1"` + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + Status int32 `protobuf:"varint,2,opt,name=status,proto3" json:"status,omitempty"` + // HTTP header. + Header map[string]*HeaderValues `protobuf:"bytes,3,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // Raw HTTP body bytes. + Body []byte `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"` + // Optional error description when tunneling fails. + Error string `protobuf:"bytes,5,opt,name=error,proto3" json:"error,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Response) Reset() { + *x = Response{} + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Response) ProtoMessage() {} + +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_internal_protocol_hopgate_stream_proto_rawDescGZIP(), []int{2} +} + +func (x *Response) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + +func (x *Response) GetStatus() int32 { + if x != nil { + return x.Status + } + return 0 +} + +func (x *Response) GetHeader() map[string]*HeaderValues { + if x != nil { + return x.Header + } + return nil +} + +func (x *Response) GetBody() []byte { + if x != nil { + return x.Body + } + return nil +} + +func (x *Response) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +// StreamOpen 은 새로운 스트림(HTTP 요청/응답, WebSocket 등)을 여는 메시지입니다. +// This represents opening a new stream (HTTP request/response, WebSocket, etc.). +type StreamOpen struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // StreamID (text form) + // Which logical service / local target to use on the client side. + ServiceName string `protobuf:"bytes,2,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"` + TargetAddr string `protobuf:"bytes,3,opt,name=target_addr,json=targetAddr,proto3" json:"target_addr,omitempty"` // e.g. "127.0.0.1:8080" + // Initial HTTP-like headers (including Upgrade, etc.). + Header map[string]*HeaderValues `protobuf:"bytes,4,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StreamOpen) Reset() { + *x = StreamOpen{} + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamOpen) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamOpen) ProtoMessage() {} + +func (x *StreamOpen) ProtoReflect() protoreflect.Message { + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamOpen.ProtoReflect.Descriptor instead. +func (*StreamOpen) Descriptor() ([]byte, []int) { + return file_internal_protocol_hopgate_stream_proto_rawDescGZIP(), []int{3} +} + +func (x *StreamOpen) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *StreamOpen) GetServiceName() string { + if x != nil { + return x.ServiceName + } + return "" +} + +func (x *StreamOpen) GetTargetAddr() string { + if x != nil { + return x.TargetAddr + } + return "" +} + +func (x *StreamOpen) GetHeader() map[string]*HeaderValues { + if x != nil { + return x.Header + } + return nil +} + +// StreamData 는 이미 열린 스트림에 대한 단방향 데이터 프레임입니다. +// This is a unidirectional data frame on an already-open stream. +type StreamData struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // StreamID + Seq uint64 `protobuf:"varint,2,opt,name=seq,proto3" json:"seq,omitempty"` // per-stream sequence number starting from 0 + Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StreamData) Reset() { + *x = StreamData{} + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamData) ProtoMessage() {} + +func (x *StreamData) ProtoReflect() protoreflect.Message { + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamData.ProtoReflect.Descriptor instead. +func (*StreamData) Descriptor() ([]byte, []int) { + return file_internal_protocol_hopgate_stream_proto_rawDescGZIP(), []int{4} +} + +func (x *StreamData) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *StreamData) GetSeq() uint64 { + if x != nil { + return x.Seq + } + return 0 +} + +func (x *StreamData) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +// StreamAck 는 StreamData 에 대한 ACK/NACK 및 선택적 재전송 힌트를 전달합니다. +// This conveys ACK/NACK and optional retransmission hints for StreamData. +type StreamAck struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // Last contiguously received sequence number (starting from 0). + AckSeq uint64 `protobuf:"varint,2,opt,name=ack_seq,json=ackSeq,proto3" json:"ack_seq,omitempty"` + // Additional missing sequence numbers beyond ack_seq (optional). + LostSeqs []uint64 `protobuf:"varint,3,rep,packed,name=lost_seqs,json=lostSeqs,proto3" json:"lost_seqs,omitempty"` + // Optional receive window size hint. + WindowSize uint32 `protobuf:"varint,4,opt,name=window_size,json=windowSize,proto3" json:"window_size,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StreamAck) Reset() { + *x = StreamAck{} + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamAck) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamAck) ProtoMessage() {} + +func (x *StreamAck) ProtoReflect() protoreflect.Message { + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamAck.ProtoReflect.Descriptor instead. +func (*StreamAck) Descriptor() ([]byte, []int) { + return file_internal_protocol_hopgate_stream_proto_rawDescGZIP(), []int{5} +} + +func (x *StreamAck) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *StreamAck) GetAckSeq() uint64 { + if x != nil { + return x.AckSeq + } + return 0 +} + +func (x *StreamAck) GetLostSeqs() []uint64 { + if x != nil { + return x.LostSeqs + } + return nil +} + +func (x *StreamAck) GetWindowSize() uint32 { + if x != nil { + return x.WindowSize + } + return 0 +} + +// StreamClose 는 스트림 종료(정상/에러)를 알립니다. +// This indicates normal or error termination of a stream. +type StreamClose struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` // empty means normal close + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StreamClose) Reset() { + *x = StreamClose{} + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamClose) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamClose) ProtoMessage() {} + +func (x *StreamClose) ProtoReflect() protoreflect.Message { + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamClose.ProtoReflect.Descriptor instead. +func (*StreamClose) Descriptor() ([]byte, []int) { + return file_internal_protocol_hopgate_stream_proto_rawDescGZIP(), []int{6} +} + +func (x *StreamClose) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *StreamClose) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +// Envelope 는 DTLS 세션 위에서 교환되는 상위 레벨 메시지 컨테이너입니다. +// 하나의 Envelope 에는 HTTP 요청/응답 또는 스트림 관련 메시지 중 하나만 포함됩니다. +// Envelope is the top-level container exchanged over the DTLS session. +// Exactly one payload (http_request/http_response/stream_*) is set per message. +type Envelope struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Types that are valid to be assigned to Payload: + // + // *Envelope_HttpRequest + // *Envelope_HttpResponse + // *Envelope_StreamOpen + // *Envelope_StreamData + // *Envelope_StreamClose + // *Envelope_StreamAck + Payload isEnvelope_Payload `protobuf_oneof:"payload"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Envelope) Reset() { + *x = Envelope{} + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Envelope) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Envelope) ProtoMessage() {} + +func (x *Envelope) ProtoReflect() protoreflect.Message { + mi := &file_internal_protocol_hopgate_stream_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Envelope.ProtoReflect.Descriptor instead. +func (*Envelope) Descriptor() ([]byte, []int) { + return file_internal_protocol_hopgate_stream_proto_rawDescGZIP(), []int{7} +} + +func (x *Envelope) GetPayload() isEnvelope_Payload { + if x != nil { + return x.Payload + } + return nil +} + +func (x *Envelope) GetHttpRequest() *Request { + if x != nil { + if x, ok := x.Payload.(*Envelope_HttpRequest); ok { + return x.HttpRequest + } + } + return nil +} + +func (x *Envelope) GetHttpResponse() *Response { + if x != nil { + if x, ok := x.Payload.(*Envelope_HttpResponse); ok { + return x.HttpResponse + } + } + return nil +} + +func (x *Envelope) GetStreamOpen() *StreamOpen { + if x != nil { + if x, ok := x.Payload.(*Envelope_StreamOpen); ok { + return x.StreamOpen + } + } + return nil +} + +func (x *Envelope) GetStreamData() *StreamData { + if x != nil { + if x, ok := x.Payload.(*Envelope_StreamData); ok { + return x.StreamData + } + } + return nil +} + +func (x *Envelope) GetStreamClose() *StreamClose { + if x != nil { + if x, ok := x.Payload.(*Envelope_StreamClose); ok { + return x.StreamClose + } + } + return nil +} + +func (x *Envelope) GetStreamAck() *StreamAck { + if x != nil { + if x, ok := x.Payload.(*Envelope_StreamAck); ok { + return x.StreamAck + } + } + return nil +} + +type isEnvelope_Payload interface { + isEnvelope_Payload() +} + +type Envelope_HttpRequest struct { + HttpRequest *Request `protobuf:"bytes,1,opt,name=http_request,json=httpRequest,proto3,oneof"` +} + +type Envelope_HttpResponse struct { + HttpResponse *Response `protobuf:"bytes,2,opt,name=http_response,json=httpResponse,proto3,oneof"` +} + +type Envelope_StreamOpen struct { + StreamOpen *StreamOpen `protobuf:"bytes,3,opt,name=stream_open,json=streamOpen,proto3,oneof"` +} + +type Envelope_StreamData struct { + StreamData *StreamData `protobuf:"bytes,4,opt,name=stream_data,json=streamData,proto3,oneof"` +} + +type Envelope_StreamClose struct { + StreamClose *StreamClose `protobuf:"bytes,5,opt,name=stream_close,json=streamClose,proto3,oneof"` +} + +type Envelope_StreamAck struct { + StreamAck *StreamAck `protobuf:"bytes,6,opt,name=stream_ack,json=streamAck,proto3,oneof"` +} + +func (*Envelope_HttpRequest) isEnvelope_Payload() {} + +func (*Envelope_HttpResponse) isEnvelope_Payload() {} + +func (*Envelope_StreamOpen) isEnvelope_Payload() {} + +func (*Envelope_StreamData) isEnvelope_Payload() {} + +func (*Envelope_StreamClose) isEnvelope_Payload() {} + +func (*Envelope_StreamAck) isEnvelope_Payload() {} + +var File_internal_protocol_hopgate_stream_proto protoreflect.FileDescriptor + +const file_internal_protocol_hopgate_stream_proto_rawDesc = "" + + "\n" + + "&internal/protocol/hopgate_stream.proto\x12\x13hopgate.protocol.v1\"&\n" + + "\fHeaderValues\x12\x16\n" + + "\x06values\x18\x01 \x03(\tR\x06values\"\xc6\x02\n" + + "\aRequest\x12\x1d\n" + + "\n" + + "request_id\x18\x01 \x01(\tR\trequestId\x12\x1b\n" + + "\tclient_id\x18\x02 \x01(\tR\bclientId\x12!\n" + + "\fservice_name\x18\x03 \x01(\tR\vserviceName\x12\x16\n" + + "\x06method\x18\x04 \x01(\tR\x06method\x12\x10\n" + + "\x03url\x18\x05 \x01(\tR\x03url\x12@\n" + + "\x06header\x18\x06 \x03(\v2(.hopgate.protocol.v1.Request.HeaderEntryR\x06header\x12\x12\n" + + "\x04body\x18\a \x01(\fR\x04body\x1a\\\n" + + "\vHeaderEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x127\n" + + "\x05value\x18\x02 \x01(\v2!.hopgate.protocol.v1.HeaderValuesR\x05value:\x028\x01\"\x8c\x02\n" + + "\bResponse\x12\x1d\n" + + "\n" + + "request_id\x18\x01 \x01(\tR\trequestId\x12\x16\n" + + "\x06status\x18\x02 \x01(\x05R\x06status\x12A\n" + + "\x06header\x18\x03 \x03(\v2).hopgate.protocol.v1.Response.HeaderEntryR\x06header\x12\x12\n" + + "\x04body\x18\x04 \x01(\fR\x04body\x12\x14\n" + + "\x05error\x18\x05 \x01(\tR\x05error\x1a\\\n" + + "\vHeaderEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x127\n" + + "\x05value\x18\x02 \x01(\v2!.hopgate.protocol.v1.HeaderValuesR\x05value:\x028\x01\"\x83\x02\n" + + "\n" + + "StreamOpen\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id\x12!\n" + + "\fservice_name\x18\x02 \x01(\tR\vserviceName\x12\x1f\n" + + "\vtarget_addr\x18\x03 \x01(\tR\n" + + "targetAddr\x12C\n" + + "\x06header\x18\x04 \x03(\v2+.hopgate.protocol.v1.StreamOpen.HeaderEntryR\x06header\x1a\\\n" + + "\vHeaderEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x127\n" + + "\x05value\x18\x02 \x01(\v2!.hopgate.protocol.v1.HeaderValuesR\x05value:\x028\x01\"B\n" + + "\n" + + "StreamData\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id\x12\x10\n" + + "\x03seq\x18\x02 \x01(\x04R\x03seq\x12\x12\n" + + "\x04data\x18\x03 \x01(\fR\x04data\"r\n" + + "\tStreamAck\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id\x12\x17\n" + + "\aack_seq\x18\x02 \x01(\x04R\x06ackSeq\x12\x1b\n" + + "\tlost_seqs\x18\x03 \x03(\x04R\blostSeqs\x12\x1f\n" + + "\vwindow_size\x18\x04 \x01(\rR\n" + + "windowSize\"3\n" + + "\vStreamClose\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id\x12\x14\n" + + "\x05error\x18\x02 \x01(\tR\x05error\"\xae\x03\n" + + "\bEnvelope\x12A\n" + + "\fhttp_request\x18\x01 \x01(\v2\x1c.hopgate.protocol.v1.RequestH\x00R\vhttpRequest\x12D\n" + + "\rhttp_response\x18\x02 \x01(\v2\x1d.hopgate.protocol.v1.ResponseH\x00R\fhttpResponse\x12B\n" + + "\vstream_open\x18\x03 \x01(\v2\x1f.hopgate.protocol.v1.StreamOpenH\x00R\n" + + "streamOpen\x12B\n" + + "\vstream_data\x18\x04 \x01(\v2\x1f.hopgate.protocol.v1.StreamDataH\x00R\n" + + "streamData\x12E\n" + + "\fstream_close\x18\x05 \x01(\v2 .hopgate.protocol.v1.StreamCloseH\x00R\vstreamClose\x12?\n" + + "\n" + + "stream_ack\x18\x06 \x01(\v2\x1e.hopgate.protocol.v1.StreamAckH\x00R\tstreamAckB\t\n" + + "\apayloadB@Z>github.com/dalbodeule/hop-gate/internal/protocol/pb;protocolpbb\x06proto3" + +var ( + file_internal_protocol_hopgate_stream_proto_rawDescOnce sync.Once + file_internal_protocol_hopgate_stream_proto_rawDescData []byte +) + +func file_internal_protocol_hopgate_stream_proto_rawDescGZIP() []byte { + file_internal_protocol_hopgate_stream_proto_rawDescOnce.Do(func() { + file_internal_protocol_hopgate_stream_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_internal_protocol_hopgate_stream_proto_rawDesc), len(file_internal_protocol_hopgate_stream_proto_rawDesc))) + }) + return file_internal_protocol_hopgate_stream_proto_rawDescData +} + +var file_internal_protocol_hopgate_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 11) +var file_internal_protocol_hopgate_stream_proto_goTypes = []any{ + (*HeaderValues)(nil), // 0: hopgate.protocol.v1.HeaderValues + (*Request)(nil), // 1: hopgate.protocol.v1.Request + (*Response)(nil), // 2: hopgate.protocol.v1.Response + (*StreamOpen)(nil), // 3: hopgate.protocol.v1.StreamOpen + (*StreamData)(nil), // 4: hopgate.protocol.v1.StreamData + (*StreamAck)(nil), // 5: hopgate.protocol.v1.StreamAck + (*StreamClose)(nil), // 6: hopgate.protocol.v1.StreamClose + (*Envelope)(nil), // 7: hopgate.protocol.v1.Envelope + nil, // 8: hopgate.protocol.v1.Request.HeaderEntry + nil, // 9: hopgate.protocol.v1.Response.HeaderEntry + nil, // 10: hopgate.protocol.v1.StreamOpen.HeaderEntry +} +var file_internal_protocol_hopgate_stream_proto_depIdxs = []int32{ + 8, // 0: hopgate.protocol.v1.Request.header:type_name -> hopgate.protocol.v1.Request.HeaderEntry + 9, // 1: hopgate.protocol.v1.Response.header:type_name -> hopgate.protocol.v1.Response.HeaderEntry + 10, // 2: hopgate.protocol.v1.StreamOpen.header:type_name -> hopgate.protocol.v1.StreamOpen.HeaderEntry + 1, // 3: hopgate.protocol.v1.Envelope.http_request:type_name -> hopgate.protocol.v1.Request + 2, // 4: hopgate.protocol.v1.Envelope.http_response:type_name -> hopgate.protocol.v1.Response + 3, // 5: hopgate.protocol.v1.Envelope.stream_open:type_name -> hopgate.protocol.v1.StreamOpen + 4, // 6: hopgate.protocol.v1.Envelope.stream_data:type_name -> hopgate.protocol.v1.StreamData + 6, // 7: hopgate.protocol.v1.Envelope.stream_close:type_name -> hopgate.protocol.v1.StreamClose + 5, // 8: hopgate.protocol.v1.Envelope.stream_ack:type_name -> hopgate.protocol.v1.StreamAck + 0, // 9: hopgate.protocol.v1.Request.HeaderEntry.value:type_name -> hopgate.protocol.v1.HeaderValues + 0, // 10: hopgate.protocol.v1.Response.HeaderEntry.value:type_name -> hopgate.protocol.v1.HeaderValues + 0, // 11: hopgate.protocol.v1.StreamOpen.HeaderEntry.value:type_name -> hopgate.protocol.v1.HeaderValues + 12, // [12:12] is the sub-list for method output_type + 12, // [12:12] is the sub-list for method input_type + 12, // [12:12] is the sub-list for extension type_name + 12, // [12:12] is the sub-list for extension extendee + 0, // [0:12] is the sub-list for field type_name +} + +func init() { file_internal_protocol_hopgate_stream_proto_init() } +func file_internal_protocol_hopgate_stream_proto_init() { + if File_internal_protocol_hopgate_stream_proto != nil { + return + } + file_internal_protocol_hopgate_stream_proto_msgTypes[7].OneofWrappers = []any{ + (*Envelope_HttpRequest)(nil), + (*Envelope_HttpResponse)(nil), + (*Envelope_StreamOpen)(nil), + (*Envelope_StreamData)(nil), + (*Envelope_StreamClose)(nil), + (*Envelope_StreamAck)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_internal_protocol_hopgate_stream_proto_rawDesc), len(file_internal_protocol_hopgate_stream_proto_rawDesc)), + NumEnums: 0, + NumMessages: 11, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_internal_protocol_hopgate_stream_proto_goTypes, + DependencyIndexes: file_internal_protocol_hopgate_stream_proto_depIdxs, + MessageInfos: file_internal_protocol_hopgate_stream_proto_msgTypes, + }.Build() + File_internal_protocol_hopgate_stream_proto = out.File + file_internal_protocol_hopgate_stream_proto_goTypes = nil + file_internal_protocol_hopgate_stream_proto_depIdxs = nil +} diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go index 312fe02..5805950 100644 --- a/internal/protocol/protocol.go +++ b/internal/protocol/protocol.go @@ -32,6 +32,11 @@ type Response struct { // MessageType 은 DTLS 위에서 교환되는 상위 레벨 메시지 종류를 나타냅니다. type MessageType string +// StreamChunkSize 는 스트림 터널링 시 단일 StreamData 프레임에 담을 최대 payload 크기입니다. +// 현재 구현에서는 4KiB 로 고정하여 DTLS/UDP MTU 한계를 여유 있게 피하도록 합니다. +// StreamChunkSize is the maximum payload size per StreamData frame (4KiB). +const StreamChunkSize = 4 * 1024 + const ( // MessageTypeHTTP 는 기존 단일 HTTP 요청/응답 메시지를 의미합니다. // 이 경우 HTTPRequest / HTTPResponse 필드를 사용합니다. @@ -41,10 +46,17 @@ const ( MessageTypeStreamOpen MessageType = "stream_open" // MessageTypeStreamData 는 열린 스트림에 대한 양방향 데이터 프레임을 의미합니다. + // HTTP 바디 chunk 를 비롯한 실제 payload 는 이 타입을 통해 전송됩니다. + // Stream data frames for an already-opened stream (HTTP body chunks, etc.). MessageTypeStreamData MessageType = "stream_data" // MessageTypeStreamClose 는 스트림 종료(정상/에러)를 의미합니다. + // Normal or error-termination of a stream. MessageTypeStreamClose MessageType = "stream_close" + + // MessageTypeStreamAck 는 스트림 데이터 프레임에 대한 ACK/NACK 및 재전송 힌트를 전달합니다. + // Stream-level ACK/NACK frames for selective retransmission hints. + MessageTypeStreamAck MessageType = "stream_ack" ) // Envelope 는 DTLS 세션 위에서 교환되는 상위 레벨 메시지 컨테이너입니다. @@ -60,11 +72,24 @@ type Envelope struct { StreamOpen *StreamOpen `json:"stream_open,omitempty"` StreamData *StreamData `json:"stream_data,omitempty"` StreamClose *StreamClose `json:"stream_close,omitempty"` + + // 스트림 제어 메시지 (ACK/NACK, 재전송 힌트 등) + // Stream-level control messages (ACK/NACK, retransmission hints, etc.). + StreamAck *StreamAck `json:"stream_ack,omitempty"` } // StreamID 는 스트림(예: 특정 WebSocket 연결 또는 TCP 커넥션)을 구분하기 위한 식별자입니다. type StreamID string +// HTTP-over-stream 터널링에서 사용되는 pseudo-header 키 상수입니다. +// These pseudo-header keys are used when tunneling HTTP over the stream protocol. +const ( + HeaderKeyMethod = "X-HopGate-Method" + HeaderKeyURL = "X-HopGate-URL" + HeaderKeyHost = "X-HopGate-Host" + HeaderKeyStatus = "X-HopGate-Status" +) + // StreamOpen 은 새로운 스트림을 여는 요청을 나타냅니다. type StreamOpen struct { ID StreamID `json:"id"` @@ -77,11 +102,44 @@ type StreamOpen struct { } // StreamData 는 이미 열린 스트림에 대해 한 방향으로 전송되는 데이터 프레임을 표현합니다. +// DTLS/UDP 특성상 손실/중복/순서 뒤바뀜을 감지하고 재전송할 수 있도록 +// 각 스트림 내에서 0부터 시작하는 시퀀스 번호(Seq)를 포함합니다. +// +// StreamData represents a unidirectional data frame on an already-opened stream. +// To support loss/duplication/reordering detection and retransmission over DTLS/UDP, +// it carries a per-stream sequence number (Seq) starting from 0. type StreamData struct { ID StreamID `json:"id"` + Seq uint64 `json:"seq"` Data []byte `json:"data"` } +// StreamAck 는 스트림 데이터 프레임에 대한 ACK/NACK 및 선택적 재전송 요청 정보를 전달합니다. +// AckSeq 는 수신 측에서 "연속적으로" 수신 완료한 마지막 Seq 를 의미하며, +// LostSeqs 는 그 이후 구간에서 누락된 시퀀스 번호(선택적)를 나타냅니다. +// +// StreamAck conveys ACK/NACK and optional retransmission hints for stream data frames. +// AckSeq denotes the last sequence number received contiguously by the receiver, +// while LostSeqs can list additional missing sequence numbers beyond AckSeq. +type StreamAck struct { + ID StreamID `json:"id"` + + // AckSeq 는 수신 측에서 0부터 시작해 연속으로 수신 완료한 마지막 Seq 입니다. + // AckSeq is the last contiguously received sequence number starting from 0. + AckSeq uint64 `json:"ack_seq"` + + // LostSeqs 는 AckSeq 이후 구간에서 누락된 시퀀스 번호 목록입니다(선택). + // 이 필드는 선택적 selective retransmission 힌트를 제공하기 위해 사용됩니다. + // + // LostSeqs is an optional list of missing sequence numbers beyond AckSeq, + // used as a hint for selective retransmission. + LostSeqs []uint64 `json:"lost_seqs,omitempty"` + + // WindowSize 는 수신 측이 허용 가능한 in-flight 프레임 수를 나타내는 선택적 힌트입니다. + // WindowSize is an optional hint for the allowed number of in-flight frames. + WindowSize uint32 `json:"window_size,omitempty"` +} + // StreamClose 는 스트림 종료를 알리는 메시지입니다. type StreamClose struct { ID StreamID `json:"id"` diff --git a/internal/proxy/client.go b/internal/proxy/client.go index d7fc38c..c43be7a 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -1,15 +1,16 @@ package proxy import ( - "bufio" "bytes" "context" - "encoding/json" "fmt" "io" "net" "net/http" "net/url" + "sort" + "strconv" + "sync" "time" "github.com/dalbodeule/hop-gate/internal/dtls" @@ -23,6 +24,9 @@ type ClientProxy struct { HTTPClient *http.Client Logger logging.Logger LocalTarget string // e.g. "127.0.0.1:8080" + + sendersMu sync.Mutex + streamSenders map[protocol.StreamID]*streamSender } // NewClientProxy 는 기본 HTTP 클라이언트 및 로거를 사용해 ClientProxy 를 생성합니다. (ko) @@ -47,15 +51,99 @@ func NewClientProxy(logger logging.Logger, localTarget string) *ClientProxy { ExpectContinueTimeout: 1 * time.Second, }, }, - Logger: logger.With(logging.Fields{"component": "client_proxy"}), - LocalTarget: localTarget, + Logger: logger.With(logging.Fields{"component": "client_proxy"}), + LocalTarget: localTarget, + streamSenders: make(map[protocol.StreamID]*streamSender), } } -// StartLoop 는 DTLS 세션에서 protocol.Envelope 를 읽고, HTTP 요청의 경우 로컬 HTTP 요청을 수행한 뒤 -// protocol.Envelope(HTTP 응답 포함)을 다시 세션으로 쓰는 루프를 실행합니다. (ko) -// StartLoop reads protocol.Envelope messages from the DTLS session; for HTTP messages it -// performs local HTTP requests and writes back HTTP responses wrapped in an Envelope. (en) +// StartLoop 는 DTLS 세션에서 protocol.Envelope 를 읽고, HTTP/스트림 요청의 경우 로컬 HTTP 요청을 수행한 뒤 +// protocol.Envelope(HTTP/스트림 응답 포함)을 다시 세션으로 쓰는 루프를 실행합니다. (ko) +// StartLoop reads protocol.Envelope messages from the DTLS session; for HTTP/stream +// messages it performs local HTTP requests and writes back responses over the DTLS +// tunnel. (en) +type streamSender struct { + mu sync.Mutex + outstanding map[uint64][]byte +} + +func newStreamSender() *streamSender { + return &streamSender{ + outstanding: make(map[uint64][]byte), + } +} + +func (s *streamSender) register(seq uint64, data []byte) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.outstanding == nil { + s.outstanding = make(map[uint64][]byte) + } + buf := make([]byte, len(data)) + copy(buf, data) + s.outstanding[seq] = buf +} + +func (s *streamSender) handleAck(ack *protocol.StreamAck) map[uint64][]byte { + s.mu.Lock() + defer s.mu.Unlock() + + if s.outstanding == nil { + return nil + } + + // 연속 수신 완료 구간(seq <= AckSeq)은 outstanding 에서 제거합니다. + for seq := range s.outstanding { + if seq <= ack.AckSeq { + delete(s.outstanding, seq) + } + } + + // LostSeqs 가 비어 있으면 재전송할 것이 없습니다. + if len(ack.LostSeqs) == 0 { + return nil + } + + // LostSeqs 에 포함된 시퀀스 중, 아직 outstanding 에 남아 있는 것들만 재전송 대상으로 선택합니다. + lost := make(map[uint64][]byte, len(ack.LostSeqs)) + for _, seq := range ack.LostSeqs { + if data, ok := s.outstanding[seq]; ok { + buf := make([]byte, len(data)) + copy(buf, data) + lost[seq] = buf + } + } + return lost +} + +func (p *ClientProxy) registerStreamSender(id protocol.StreamID, sender *streamSender) { + p.sendersMu.Lock() + defer p.sendersMu.Unlock() + if p.streamSenders == nil { + p.streamSenders = make(map[protocol.StreamID]*streamSender) + } + p.streamSenders[id] = sender +} + +func (p *ClientProxy) unregisterStreamSender(id protocol.StreamID) { + p.sendersMu.Lock() + defer p.sendersMu.Unlock() + if p.streamSenders == nil { + return + } + delete(p.streamSenders, id) +} + +func (p *ClientProxy) getStreamSender(id protocol.StreamID) *streamSender { + p.sendersMu.Lock() + defer p.sendersMu.Unlock() + if p.streamSenders == nil { + return nil + } + return p.streamSenders[id] +} + func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { if ctx == nil { ctx = context.Background() @@ -67,10 +155,10 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { // "dtls: buffer too small" 오류가 날 수 있으므로, 여기서는 여유 있는 버퍼(64KiB)를 사용합니다. (ko) // NOTE: pion/dtls decrypts application data into the buffer provided by the caller. // Using only the default JSON decoder buffer (a few hundred bytes) can trigger - // "dtls: buffer too small" for large HTTP bodies/envelopes, so we wrap the - // session with a reasonably large bufio.Reader (64KiB). (en) - dec := json.NewDecoder(bufio.NewReaderSize(sess, 64*1024)) - enc := json.NewEncoder(sess) + // "dtls: buffer too small" for large HTTP bodies/envelopes. The default + // JSON-based WireCodec internally wraps the DTLS session with a 64KiB + // bufio.Reader, matching this requirement. (en) + codec := protocol.DefaultCodec for { select { @@ -83,7 +171,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { } var env protocol.Envelope - if err := dec.Decode(&env); err != nil { + if err := codec.Decode(sess, &env); err != nil { if err == io.EOF { log.Info("dtls session closed by server", nil) return nil @@ -94,60 +182,485 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { return err } - // 현재는 HTTP 타입만 지원하며, 그 외 타입은 에러로 처리합니다. - if env.Type != protocol.MessageTypeHTTP || env.HTTPRequest == nil { + switch env.Type { + case protocol.MessageTypeHTTP: + if err := p.handleHTTPEnvelope(ctx, sess, &env); err != nil { + log.Error("failed to handle http envelope", logging.Fields{ + "error": err.Error(), + }) + return err + } + case protocol.MessageTypeStreamOpen: + if err := p.handleStreamRequest(ctx, sess, &env); err != nil { + log.Error("failed to handle stream http envelope", logging.Fields{ + "error": err.Error(), + }) + return err + } + case protocol.MessageTypeStreamAck: + sa := env.StreamAck + if sa == nil { + log.Error("received stream_ack envelope with nil payload", nil) + return fmt.Errorf("stream_ack payload is nil") + } + streamID := protocol.StreamID(sa.ID) + sender := p.getStreamSender(streamID) + if sender == nil { + log.Warn("received stream_ack for unknown stream", logging.Fields{ + "stream_id": sa.ID, + }) + continue + } + lost := sender.handleAck(sa) + // LostSeqs 를 기반으로 선택적 재전송 수행 + for seq, data := range lost { + retryEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamData, + StreamData: &protocol.StreamData{ + ID: streamID, + Seq: seq, + Data: data, + }, + } + if err := codec.Encode(sess, &retryEnv); err != nil { + log.Error("failed to retransmit stream_data after stream_ack", logging.Fields{ + "stream_id": streamID, + "seq": seq, + "error": err.Error(), + }) + return err + } + log.Info("retransmitted stream_data after stream_ack", logging.Fields{ + "stream_id": streamID, + "seq": seq, + }) + } + default: log.Error("received unsupported envelope type from server", logging.Fields{ "type": env.Type, }) - return fmt.Errorf("unsupported envelope type %q or missing http_request", env.Type) + return fmt.Errorf("unsupported envelope type %q", env.Type) } + } +} - req := env.HTTPRequest +// handleHTTPEnvelope 는 기존 단일 HTTP 요청/응답 Envelope 경로를 처리합니다. (ko) +// handleHTTPEnvelope handles the legacy single HTTP request/response envelope path. (en) +func (p *ClientProxy) handleHTTPEnvelope(ctx context.Context, sess dtls.Session, env *protocol.Envelope) error { + if env.HTTPRequest == nil { + return fmt.Errorf("http envelope missing http_request payload") + } - start := time.Now() - logReq := log.With(logging.Fields{ - "request_id": req.RequestID, - "service": req.ServiceName, - "method": req.Method, - "url": req.URL, - "client_id": req.ClientID, - "local_target": p.LocalTarget, - }) - logReq.Info("received http envelope from server", nil) + req := env.HTTPRequest + log := p.Logger + start := time.Now() - resp := protocol.Response{ - RequestID: req.RequestID, - Header: make(map[string][]string), - } + logReq := log.With(logging.Fields{ + "request_id": req.RequestID, + "service": req.ServiceName, + "method": req.Method, + "url": req.URL, + "client_id": req.ClientID, + "local_target": p.LocalTarget, + }) + logReq.Info("received http envelope from server", nil) - // 로컬 HTTP 요청 수행 - if err := p.forwardToLocal(ctx, req, &resp); err != nil { - resp.Status = http.StatusBadGateway - resp.Error = err.Error() - logReq.Error("local http request failed", logging.Fields{ - "error": err.Error(), - }) - } + resp := protocol.Response{ + RequestID: req.RequestID, + Header: make(map[string][]string), + } - // HTTP 응답을 Envelope 로 감싸서 서버로 전송합니다. - respEnv := protocol.Envelope{ - Type: protocol.MessageTypeHTTP, - HTTPResponse: &resp, - } - - if err := enc.Encode(&respEnv); err != nil { - logReq.Error("failed to encode http response envelope", logging.Fields{ - "error": err.Error(), - }) - return err - } - - logReq.Info("http response envelope sent to server", logging.Fields{ - "status": resp.Status, - "elapsed_ms": time.Since(start).Milliseconds(), - "error": resp.Error, + // 로컬 HTTP 요청 수행 + if err := p.forwardToLocal(ctx, req, &resp); err != nil { + resp.Status = http.StatusBadGateway + resp.Error = err.Error() + logReq.Error("local http request failed", logging.Fields{ + "error": err.Error(), }) } + + // HTTP 응답을 Envelope 로 감싸서 서버로 전송합니다. + respEnv := protocol.Envelope{ + Type: protocol.MessageTypeHTTP, + HTTPResponse: &resp, + } + + if err := protocol.DefaultCodec.Encode(sess, &respEnv); err != nil { + logReq.Error("failed to encode http response envelope", logging.Fields{ + "error": err.Error(), + }) + return err + } + + logReq.Info("http response envelope sent to server", logging.Fields{ + "status": resp.Status, + "elapsed_ms": time.Since(start).Milliseconds(), + "error": resp.Error, + }) + + return nil +} + +// handleStreamRequest 는 StreamOpen/StreamData/StreamClose 기반 HTTP 요청/응답 스트림을 처리합니다. (ko) +// handleStreamRequest handles an HTTP request/response exchange using StreamOpen/StreamData/StreamClose frames. (en) +func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session, openEnv *protocol.Envelope) error { + codec := protocol.DefaultCodec + log := p.Logger + + so := openEnv.StreamOpen + if so == nil { + return fmt.Errorf("stream_open envelope missing payload") + } + + streamID := so.ID + // 이 스트림에 대한 송신 측 ARQ 상태를 준비하고, StartLoop 에서 들어오는 StreamAck 와 연동합니다. + sender := newStreamSender() + p.registerStreamSender(streamID, sender) + defer p.unregisterStreamSender(streamID) + + // Pseudo-header 에서 HTTP 메타데이터를 추출합니다. (ko) + // Extract HTTP metadata from pseudo-headers. (en) + method := firstHeaderValue(so.Header, protocol.HeaderKeyMethod, http.MethodGet) + urlStr := firstHeaderValue(so.Header, protocol.HeaderKeyURL, "/") + _ = firstHeaderValue(so.Header, protocol.HeaderKeyHost, "") + + if p.LocalTarget == "" { + return fmt.Errorf("local target is empty") + } + + u, err := url.Parse(urlStr) + if err != nil { + return fmt.Errorf("parse url from stream_open: %w", err) + } + u.Scheme = "http" + u.Host = p.LocalTarget + + // 로컬 HTTP 요청용 헤더 맵을 생성하면서 pseudo-header 는 제거합니다. (ko) + // Build local HTTP header map while stripping pseudo-headers. (en) + httpHeader := make(http.Header, len(so.Header)) + for k, vs := range so.Header { + if k == protocol.HeaderKeyMethod || + k == protocol.HeaderKeyURL || + k == protocol.HeaderKeyHost || + k == protocol.HeaderKeyStatus { + continue + } + for _, v := range vs { + httpHeader.Add(k, v) + } + } + + // 요청 바디를 StreamData/StreamClose 프레임에서 모두 읽어 메모리에 적재합니다. (ko) + // Read the entire request body from StreamData/StreamClose frames into memory. (en) + // + // 동시에 수신 측 ARQ 상태( expectedSeq / out-of-order 버퍼 / LostSeqs )를 관리하고 + // StreamAck 를 전송해 선택적 재전송(Selective Retransmission)을 유도합니다. + var ( + bodyBuf bytes.Buffer + expectedSeq uint64 + received = make(map[uint64][]byte) + lost = make(map[uint64]struct{}) + ) + const maxLostReport = 32 + + for { + var env protocol.Envelope + if err := codec.Decode(sess, &env); err != nil { + if err == io.EOF { + return fmt.Errorf("unexpected EOF while reading stream request body") + } + return fmt.Errorf("decode stream request frame: %w", err) + } + + switch env.Type { + case protocol.MessageTypeStreamData: + sd := env.StreamData + if sd == nil { + return fmt.Errorf("stream_data payload is nil") + } + if sd.ID != streamID { + return fmt.Errorf("stream_data for unexpected stream id %q (expected %q)", sd.ID, streamID) + } + + // 수신 측 ARQ: Seq 에 따라 분기 + switch { + case sd.Seq == expectedSeq: + // 기대하던 순서의 프레임: 바로 bodyBuf 에 기록하고, 이후 버퍼된 연속 프레임도 flush. + if len(sd.Data) > 0 { + if _, err := bodyBuf.Write(sd.Data); err != nil { + return fmt.Errorf("buffer stream_data: %w", err) + } + } + expectedSeq++ + for { + data, ok := received[expectedSeq] + if !ok { + break + } + if len(data) > 0 { + if _, err := bodyBuf.Write(data); err != nil { + return fmt.Errorf("buffer reordered stream_data: %w", err) + } + } + delete(received, expectedSeq) + delete(lost, expectedSeq) + expectedSeq++ + } + + // AckSeq 이전 구간의 lost 항목 정리 + for seq := range lost { + if seq < expectedSeq { + delete(lost, seq) + } + } + + case sd.Seq > expectedSeq: + // 앞선 일부 Seq 들이 누락된 상태: 현재 프레임을 버퍼링하고 missing seq 들을 lost 에 추가. + if len(sd.Data) > 0 { + buf := make([]byte, len(sd.Data)) + copy(buf, sd.Data) + received[sd.Seq] = buf + } + for seq := expectedSeq; seq < sd.Seq && len(lost) < maxLostReport; seq++ { + if _, ok := lost[seq]; !ok { + lost[seq] = struct{}{} + } + } + + default: + // sd.Seq < expectedSeq 인 경우: 이미 처리했거나 Ack 로 커버된 프레임 → 무시. + } + + // 수신 측 StreamAck 전송: + // - AckSeq: 0부터 시작해 연속으로 수신 완료한 마지막 시퀀스 (expectedSeq-1) + // - LostSeqs: 현재 윈도우 내에서 누락된 시퀀스 중 상한 개수(maxLostReport)까지만 포함 + var ackSeq uint64 + if expectedSeq == 0 { + ackSeq = 0 + } else { + ackSeq = expectedSeq - 1 + } + + lostSeqs := make([]uint64, 0, len(lost)) + for seq := range lost { + if seq >= expectedSeq { + lostSeqs = append(lostSeqs, seq) + } + } + if len(lostSeqs) > 0 { + sort.Slice(lostSeqs, func(i, j int) bool { return lostSeqs[i] < lostSeqs[j] }) + if len(lostSeqs) > maxLostReport { + lostSeqs = lostSeqs[:maxLostReport] + } + } + + ackEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamAck, + StreamAck: &protocol.StreamAck{ + ID: streamID, + AckSeq: ackSeq, + LostSeqs: lostSeqs, + }, + } + if err := codec.Encode(sess, &ackEnv); err != nil { + return fmt.Errorf("send stream ack: %w", err) + } + + case protocol.MessageTypeStreamClose: + sc := env.StreamClose + if sc == nil { + return fmt.Errorf("stream_close payload is nil") + } + if sc.ID != streamID { + return fmt.Errorf("stream_close for unexpected stream id %q (expected %q)", sc.ID, streamID) + } + // sc.Error 는 최소 구현에서는 로컬 요청 에러와 별도로 취급하지 않습니다. (ko) + // For the minimal implementation we do not surface sc.Error here. (en) + goto haveBody + default: + return fmt.Errorf("unexpected envelope type %q while reading stream request body", env.Type) + } + } + +haveBody: + bodyBytes := bodyBuf.Bytes() + + // 로컬 HTTP 요청 생성 (stream 기반 요청을 실제 HTTP 요청으로 변환). (ko) + // Build the local HTTP request from the stream-based metadata and body. (en) + req, err := http.NewRequestWithContext(ctx, method, u.String(), nil) + if err != nil { + return fmt.Errorf("create http request from stream: %w", err) + } + if len(bodyBytes) > 0 { + buf := bytes.NewReader(bodyBytes) + req.Body = io.NopCloser(buf) + req.ContentLength = int64(len(bodyBytes)) + } + req.Header = httpHeader + + start := time.Now() + logReq := log.With(logging.Fields{ + "request_id": string(streamID), + "service": so.Service, + "method": method, + "url": urlStr, + "stream_id": string(streamID), + "local_target": p.LocalTarget, + }) + logReq.Info("received stream_open envelope from server", nil) + + res, err := p.HTTPClient.Do(req) + if err != nil { + // 로컬 요청 실패 시, 502 + 에러 메시지를 스트림 응답으로 전송합니다. (ko) + // On local request failure, send a 502 response over the stream. (en) + errMsg := fmt.Sprintf("perform http request: %v", err) + streamRespHeader := map[string][]string{ + "Content-Type": {"text/plain; charset=utf-8"}, + protocol.HeaderKeyStatus: {strconv.Itoa(http.StatusBadGateway)}, + } + respOpen := protocol.Envelope{ + Type: protocol.MessageTypeStreamOpen, + StreamOpen: &protocol.StreamOpen{ + ID: streamID, + Service: so.Service, + TargetAddr: so.TargetAddr, + Header: streamRespHeader, + }, + } + if err2 := codec.Encode(sess, &respOpen); err2 != nil { + logReq.Error("failed to encode stream response open envelope (error path)", logging.Fields{ + "error": err2.Error(), + }) + return err2 + } + + dataEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamData, + StreamData: &protocol.StreamData{ + ID: streamID, + Seq: 0, + Data: []byte("HopGate: " + errMsg), + }, + } + // 에러 응답 프레임도 ARQ 대상에 등록합니다. + sender.register(0, dataEnv.StreamData.Data) + if err2 := codec.Encode(sess, &dataEnv); err2 != nil { + logReq.Error("failed to encode stream response data envelope (error path)", logging.Fields{ + "error": err2.Error(), + }) + return err2 + } + + closeEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamClose, + StreamClose: &protocol.StreamClose{ + ID: streamID, + Error: errMsg, + }, + } + if err2 := codec.Encode(sess, &closeEnv); err2 != nil { + logReq.Error("failed to encode stream response close envelope (error path)", logging.Fields{ + "error": err2.Error(), + }) + return err2 + } + + logReq.Error("local http request failed (stream)", logging.Fields{ + "error": err.Error(), + }) + return nil + } + defer res.Body.Close() + + // 응답을 StreamOpen + StreamData(4KiB chunk) + StreamClose 프레임으로 전송합니다. (ko) + // Send the response as StreamOpen + StreamData (4KiB chunks) + StreamClose frames. (en) + + // 응답 헤더 맵을 복사하고 상태 코드를 pseudo-header 로 추가합니다. (ko) + // Copy response headers and attach status code as a pseudo-header. (en) + streamRespHeader := make(map[string][]string, len(res.Header)+1) + for k, vs := range res.Header { + streamRespHeader[k] = append([]string(nil), vs...) + } + statusCode := res.StatusCode + if statusCode == 0 { + statusCode = http.StatusOK + } + streamRespHeader[protocol.HeaderKeyStatus] = []string{strconv.Itoa(statusCode)} + + respOpen := protocol.Envelope{ + Type: protocol.MessageTypeStreamOpen, + StreamOpen: &protocol.StreamOpen{ + ID: streamID, + Service: so.Service, + TargetAddr: so.TargetAddr, + Header: streamRespHeader, + }, + } + + if err := codec.Encode(sess, &respOpen); err != nil { + logReq.Error("failed to encode stream response open envelope", logging.Fields{ + "error": err.Error(), + }) + return err + } + + // 응답 바디를 4KiB(StreamChunkSize) 단위로 잘라 StreamData 프레임으로 전송합니다. (ko) + // Chunk the response body into 4KiB (StreamChunkSize) StreamData frames. (en) + var seq uint64 + chunk := make([]byte, protocol.StreamChunkSize) + for { + n, err := res.Body.Read(chunk) + if n > 0 { + dataCopy := append([]byte(nil), chunk[:n]...) + // 송신 측 ARQ: Seq 별 payload 를 기록해 두었다가, StreamAck 의 LostSeqs 를 기반으로 재전송할 수 있습니다. + sender.register(seq, dataCopy) + + dataEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamData, + StreamData: &protocol.StreamData{ + ID: streamID, + Seq: seq, + Data: dataCopy, + }, + } + if err2 := codec.Encode(sess, &dataEnv); err2 != nil { + logReq.Error("failed to encode stream response data envelope", logging.Fields{ + "error": err2.Error(), + }) + return err2 + } + seq++ + } + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("read http response body for streaming: %w", err) + } + } + + closeEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamClose, + StreamClose: &protocol.StreamClose{ + ID: streamID, + Error: "", + }, + } + + if err := codec.Encode(sess, &closeEnv); err != nil { + logReq.Error("failed to encode stream response close envelope", logging.Fields{ + "error": err.Error(), + }) + return err + } + + logReq.Info("stream http response sent to server", logging.Fields{ + "status": statusCode, + "elapsed_ms": time.Since(start).Milliseconds(), + "error": "", + }) + + return nil } // forwardToLocal 는 protocol.Request 를 로컬 HTTP 요청으로 변환하고 protocol.Response 를 채웁니다. (ko) @@ -192,11 +705,65 @@ func (p *ClientProxy) forwardToLocal(ctx context.Context, preq *protocol.Request for k, vs := range res.Header { presp.Header[k] = append([]string(nil), vs...) } - body, err := io.ReadAll(res.Body) + + // DTLS over UDP has an upper bound on packet size (~64KiB). 전체 HTTP 바디를 + // 하나의 Envelope 로 감싸 전송하는 현재 설계에서는, 바디가 너무 크면 + // OS 레벨에서 "message too long" (EMSGSIZE) 가 발생할 수 있습니다. (ko) + // + // 이를 피하기 위해, 터널링 가능한 **단일 HTTP 바디** 크기에 상한을 두고, + // 이를 초과하는 응답은 502 Bad Gateway + HopGate 전용 에러 메시지로 대체합니다. (ko) + // + // DTLS over UDP has an upper bound on datagram size (~64KiB). With the current + // single-envelope design, very large bodies can still trigger "message too long" + // (EMSGSIZE) at the OS level. To avoid this, we cap the tunneled HTTP body size + // and replace oversized responses with a 502 Bad Gateway + HopGate-specific + // error body. (en) + // + // Protobuf 기반 터널링에서는 향후 StreamData(4KiB) 단위로 나누어 전송할 예정이지만, + // 그 전 단계에서도 body 자체를 4KiB( StreamChunkSize )로 하드 리밋하여 + // Proto message body 필드가 지나치게 커지지 않도록 합니다. (ko) + // + // Even before full stream tunneling is implemented, we hard-limit the protobuf + // body field to 4KiB (StreamChunkSize) so that individual messages remain small. (en) + const maxTunnelBodyBytes = protocol.StreamChunkSize + + limited := &io.LimitedReader{ + R: res.Body, + N: maxTunnelBodyBytes + 1, // read up to limit+1 to detect overflow + } + body, err := io.ReadAll(limited) if err != nil { return fmt.Errorf("read http response body: %w", err) } + if len(body) > maxTunnelBodyBytes { + // 응답 바디가 너무 커서 DTLS/UDP 로 안전하게 전송하기 어렵기 때문에, + // 원본 바디 대신 HopGate 에러 응답으로 대체합니다. (ko) + // + // The response body is too large to be safely tunneled over DTLS/UDP. + // Replace it with a HopGate error response instead of attempting to + // send an oversized datagram. (en) + presp.Status = http.StatusBadGateway + presp.Header = map[string][]string{ + "Content-Type": {"text/plain; charset=utf-8"}, + } + presp.Body = []byte("HopGate: response body too large for DTLS tunnel (over max_tunnel_body_bytes)") + presp.Error = "response body too large for DTLS tunnel" + return nil + } + presp.Body = body return nil } + +// firstHeaderValue 는 주어진 키의 첫 번째 헤더 값을 반환하고, 없으면 기본값을 반환합니다. (ko) +// firstHeaderValue returns the first header value for a key, or a default if absent. (en) +func firstHeaderValue(hdr map[string][]string, key, def string) string { + if hdr == nil { + return def + } + if vs, ok := hdr[key]; ok && len(vs) > 0 { + return vs[0] + } + return def +} diff --git a/progress.md b/progress.md index 5fe2135..c396244 100644 --- a/progress.md +++ b/progress.md @@ -7,12 +7,14 @@ This document tracks implementation progress against the HopGate architecture an ## 1. High-level Status / 상위 수준 상태 -- 아키텍처 문서 및 README 정리 완료 (ko/en 병기). - Architecture and README are documented in both Korean and English. -- 서버/클라이언트 엔트리 포인트, DTLS 핸드셰이크, 기본 PostgreSQL/ent 스키마까지 1차 뼈대 구현 완료. - First skeleton implementation is done for server/client entrypoints, DTLS handshake, and basic PostgreSQL/ent schema. -- 실제 Proxy 동작(HTTP ↔ DTLS 터널링), Admin API의 비즈니스 로직, 실 ACME 연동 등은 아직 남아 있음. - Actual proxying (HTTP ↔ DTLS tunneling), admin API business logic, and real ACME integration are still pending. +- 아키텍처 문서 및 README 정리 완료 (ko/en 병기). + Architecture and README are documented in both Korean and English. +- 서버/클라이언트 엔트리 포인트, DTLS 핸드셰이크, 기본 PostgreSQL/ent 스키마까지 1차 뼈대 구현 완료. + First skeleton implementation is done for server/client entrypoints, DTLS handshake, and basic PostgreSQL/ent schema. +- 기본 Proxy 동작(HTTP ↔ DTLS 터널링), Admin API 비즈니스 로직, ACME 기반 인증서 관리는 구현 완료된 상태. + Core proxying (HTTP ↔ DTLS tunneling), admin API business logic, and ACME-based certificate management are implemented. +- 스트림 ARQ, Observability, Hardening, ACME 고급 전략 등은 아직 남아 있는 다음 단계 작업이다. + Stream-level ARQ, observability, hardening, and advanced ACME operational strategies remain as next-step work items. --- @@ -39,12 +41,12 @@ This document tracks implementation progress against the HopGate architecture an ### 2.2 Server / Client Entrypoints -- 서버 메인: [`cmd/server/main.go`](cmd/server/main.go) - - 서버 설정 로드 (`LoadServerConfigFromEnv`). - - PostgreSQL 연결 및 ent 스키마 init (`store.OpenPostgresFromEnv`). - - Debug 모드 시 self-signed localhost cert 생성 (`dtls.NewSelfSignedLocalhostConfig`). - - DTLS 서버 생성 (`dtls.NewPionServer`) 및 Accept + Handshake 루프 (`PerformServerHandshake`). - - DummyDomainValidator 사용해 도메인/API Key 조합을 임시로 모두 허용. +- 서버 메인: [`cmd/server/main.go`](cmd/server/main.go) + - 서버 설정 로드 (`LoadServerConfigFromEnv`). + - PostgreSQL 연결 및 ent 스키마 init (`store.OpenPostgresFromEnv`). + - Debug 모드 시 self-signed localhost cert 생성 (`dtls.NewSelfSignedLocalhostConfig`). + - DTLS 서버 생성 (`dtls.NewPionServer`) 및 Accept + Handshake 루프 (`PerformServerHandshake`). + - ent 기반 `DomainValidator` + `domainGateValidator` 를 사용해 `(domain, client_api_key)` 조합과 DNS/IP(옵션) 검증을 수행. - 클라이언트 메인: [`cmd/client/main.go`](cmd/client/main.go) - CLI + env 병합 설정 (우선순위: CLI > env). @@ -115,13 +117,13 @@ This document tracks implementation progress against the HopGate architecture an - `RegisterDomain(ctx, domain, memo) (clientAPIKey string, err error)` - `UnregisterDomain(ctx, domain, clientAPIKey string) error` -- HTTP Handler: [`internal/admin/http.go`](internal/admin/http.go) - - `Authorization: Bearer {ADMIN_API_KEY}` 검증. +- HTTP Handler: [`internal/admin/http.go`](internal/admin/http.go) + - `Authorization: Bearer {ADMIN_API_KEY}` 검증. - 엔드포인트: - - `POST /api/v1/admin/domains/register` - - `POST /api/v1/admin/domains/unregister` - - JSON request/response 구조 정의 및 기본 에러 처리. - - 아직 실제 서비스/라우터 wiring, ent 기반 구현 미완성. + - `POST /api/v1/admin/domains/register` + - `POST /api/v1/admin/domains/unregister` + - JSON request/response 구조 정의 및 기본 에러 처리. + - 실제 서비스(`DomainService`) 및 라우터 wiring, ent 기반 구현이 완료되어 도메인 등록/해제가 동작. --- @@ -224,10 +226,11 @@ This document tracks implementation progress against the HopGate architecture an ### 3.3 Proxy Core / HTTP Tunneling -- [x] 서버 측 Proxy 구현 확장: [`internal/proxy/server.go`](internal/proxy/server.go) - - HTTP/HTTPS 리스너와 DTLS 세션 매핑 구현. - - `Router` 구현체 추가 (도메인/패스 → 클라이언트/서비스). - - 요청/응답을 `internal/protocol` 구조체로 직렬화/역직렬화. +- [ ] 서버 측 Proxy 구현 확장: [`internal/proxy/server.go`](internal/proxy/server.go) + - 현재 `ServerProxy` / `Router` 인터페이스와 `NewHTTPServer` 만 정의되어 있고, + 실제 HTTP/HTTPS 리스너와 DTLS 세션 매핑 로직은 [`cmd/server/main.go`](cmd/server/main.go) 의 + `newHTTPHandler` / `dtlsSessionWrapper.ForwardHTTP` 안에 위치합니다. + - Proxy 코어 로직을 proxy 레이어로 이동하는 리팩터링은 아직 진행되지 않았습니다. (3.6 항목과 연동) - [x] 클라이언트 측 Proxy 구현 확장: [`internal/proxy/client.go`](internal/proxy/client.go) - DTLS 세션에서 `protocol.Request` 수신 → 로컬 HTTP 호출 → `protocol.Response` 전송 루프 구현. @@ -240,6 +243,164 @@ This document tracks implementation progress against the HopGate architecture an - [x] 클라이언트 main 에 Proxy loop wiring 추가: [`cmd/client/main.go`](cmd/client/main.go) - handshake 성공 후 `proxy.ClientProxy.StartLoop` 실행. +#### 3.3A Stream-based DTLS Tunneling / 스트림 기반 DTLS 터널링 + +초기 HTTP 터널링 설계는 **단일 JSON Envelope + 단일 DTLS 쓰기** 방식(요청/응답 바디 전체를 한 번에 전송)이었고, +대용량 응답 바디에서 UDP MTU 한계로 인한 `sendto: message too long` 문제가 발생할 수 있었습니다. +이 한계를 제거하기 위해, 현재 코드는 DTLS 위 애플리케이션 프로토콜을 **스트림/프레임 기반**으로 재설계하여 `StreamOpen` / `StreamData` / `StreamClose` 를 사용합니다. +The initial tunneling model used a **single JSON envelope + single DTLS write per HTTP message**, which could hit UDP MTU limits (`sendto: message too long`) for large bodies. +The current implementation uses a **stream/frame-based** protocol over DTLS (`StreamOpen` / `StreamData` / `StreamClose`), and this section documents its constraints and further improvements (e.g. ARQ). + +고려해야 할 제약 / Constraints: + +- 전송 계층은 DTLS(pion/dtls)를 유지합니다. + The transport layer must remain DTLS (pion/dtls). +- JSON 기반 단일 Envelope 모델에서 벗어나, HTTP 바디를 안전한 크기의 chunk 로 나누어 전송해야 합니다. + We must move away from the single-envelope JSON model and chunk HTTP bodies under a safe MTU. +- UDP 특성상 일부 프레임 손실/오염에 대비해, **해당 chunk 만 재전송 요청할 수 있는 ARQ 메커니즘**이 필요합니다. + Given UDP characteristics, we need an application-level ARQ so that **only lost/corrupted chunks are retransmitted**. + +아래 단계들은 `feature/udp-stream` 브랜치에서 구현할 구체적인 작업 항목입니다. +The following tasks describe concrete work items to be implemented on the `feature/udp-stream` branch. + +--- + +##### 3.3A.1 스트림 프레이밍 프로토콜 설계 (JSON 1단계) +##### 3.3A.1 Stream framing protocol (JSON, phase 1) + +- [x] 스트림 프레임 타입 정리 및 확장: [`internal/protocol/protocol.go`](internal/protocol/protocol.go:35) + - 이미 정의된 스트림 관련 타입을 1단계에서 적극 활용합니다. + Reuse the already defined stream-related types in phase 1: + - `MessageTypeStreamOpen`, `MessageTypeStreamData`, `MessageTypeStreamClose` + - [`Envelope`](internal/protocol/protocol.go:52), [`StreamOpen`](internal/protocol/protocol.go:69), [`StreamData`](internal/protocol/protocol.go:80), [`StreamClose`](internal/protocol/protocol.go:86) + - `StreamData` 에 per-stream 시퀀스 번호를 추가합니다. + Add a per-stream sequence number to `StreamData`: + - 예시 / Example: + ```go + type StreamData struct { + ID StreamID `json:"id"` + Seq uint64 `json:"seq"` // 0부터 시작하는 per-stream sequence + Data []byte `json:"data"` + } + ``` + +- [x] 스트림 ACK / 재전송 제어 메시지 추가: [`internal/protocol/protocol.go`](internal/protocol/protocol.go:52) + - 선택적 재전송(Selective Retransmission)을 위해 `StreamAck` 메시지와 `MessageTypeStreamAck` 를 추가합니다. + Add `StreamAck` message and `MessageTypeStreamAck` for selective retransmission: + ```go + const ( + MessageTypeStreamAck MessageType = "stream_ack" + ) + + type StreamAck struct { + ID StreamID `json:"id"` // 대상 스트림 / target stream + AckSeq uint64 `json:"ack_seq"` // 연속으로 수신 완료한 마지막 Seq / last contiguous sequence + LostSeqs []uint64 `json:"lost_seqs"` // 누락된 시퀀스 목록(선택) / optional list of missing seqs + WindowSize uint32 `json:"window_size"` // 선택: 허용 in-flight 프레임 수 / optional receive window + } + ``` + - [`Envelope`](internal/protocol/protocol.go:52)에 `StreamAck *StreamAck` 필드를 추가합니다. + Extend `Envelope` with a `StreamAck *StreamAck` field. + +- [x] MTU-safe chunk 크기 정의 + - DTLS/UDP 헤더 및 Protobuf/length-prefix 오버헤드를 고려해 안전한 payload 크기(4KiB)를 상수로 정의합니다. + Define a safe payload size constant (4KiB) considering DTLS/UDP headers and Protobuf/length-prefix framing. + - 이 값은 [`internal/protocol/protocol.go`](internal/protocol/protocol.go:32) 의 `StreamChunkSize` 로 정의되었습니다. + Implemented as `StreamChunkSize` in [`internal/protocol/protocol.go`](internal/protocol/protocol.go:32). + - 이후 HTTP 바디 스트림 터널링 구현 시, 모든 `StreamData.Data` 는 이 크기 이하 chunk 로 잘라 전송해야 합니다. + In the stream tunneling implementation, every `StreamData.Data` must be sliced into chunks no larger than this size. + +--- + +##### 3.3A.2 애플리케이션 레벨 ARQ 설계 (Selective Retransmission) +##### 3.3A.2 Application-level ARQ (Selective Retransmission) + +- [x] 수신 측 ARQ 상태 관리 구현 + - 스트림별로 `expectedSeq`, out-of-order chunk 버퍼(`received`), 누락 시퀀스 집합(`lost`)을 유지하면서, + in-order / out-of-order 프레임을 구분해 HTTP 바디 버퍼에 순서대로 쌓습니다. + - For each stream, maintain `expectedSeq`, an out-of-order buffer (`received`), and a lost-sequence set (`lost`), + delivering in-order frames directly to the HTTP body buffer while buffering/reordering out-of-order ones. + +- [x] 수신 측 StreamAck 전송 정책 구현 + - 각 `StreamData` 수신 시점에 `AckSeq = expectedSeq - 1` 과 현재 윈도우에서 누락된 시퀀스 일부(`LostSeqs`, 상한 개수 적용)를 포함한 + `StreamAck{AckSeq, LostSeqs}` 를 전송해 선택적 재전송을 유도합니다. + - On every `StreamData` frame, send `StreamAck{AckSeq, LostSeqs}` where `AckSeq = expectedSeq - 1` and `LostSeqs` + contains a bounded set (up to a fixed limit) of missing sequence numbers in the current receive window. + +- [x] 송신 측 재전송 로직 구현 (StreamAck 기반) + - 응답 스트림 송신 측에서 스트림별 `streamSender` 를 두고, `outstanding[seq] = payload` 로 아직 Ack 되지 않은 프레임을 추적합니다. + - `StreamAck{AckSeq, LostSeqs}` 수신 시: + - `seq <= AckSeq` 인 항목은 모두 제거하고, + - `LostSeqs` 에 포함된 시퀀스에 대해서만 `StreamData{ID, Seq, Data}` 를 재전송합니다. + - A per-stream `streamSender` tracks `outstanding[seq] = payload` for unacknowledged frames. Upon receiving + `StreamAck{AckSeq, LostSeqs}`, it deletes all `seq <= AckSeq` and retransmits only frames whose sequence + numbers appear in `LostSeqs`. + +> Note: 현재 구현은 StreamAck 기반 **선택적 재전송(Selective Retransmission)** 까지 포함하며, +> 별도의 RTO(재전송 타이머) 기반 백그라운드 재전송 루프는 향후 확장 여지로 남겨둔 상태입니다. +> Note: The current implementation covers StreamAck-based **selective retransmission**; a separate RTO-based +> background retransmission loop is left as a potential future enhancement. + +--- + +##### 3.3A.3 HTTP ↔ 스트림 매핑 (서버/클라이언트) +##### 3.3A.3 HTTP ↔ stream mapping (server/client) + +- [x] 서버 → 클라이언트 요청 스트림: [`cmd/server/main.go`](cmd/server/main.go:200) + - `ForwardHTTP` 는 스트림 기반 HTTP 요청/응답을 처리하도록 구현되어 있으며, 동작은 다음과 같습니다. + `ForwardHTTP` is implemented in stream mode and behaves as follows: + - HTTP 요청 수신 시: + - 새로운 `StreamID` 를 발급합니다 (세션별 증가). + Generate a new `StreamID` per incoming HTTP request on the DTLS session. + - `StreamOpen` 전송: + - 요청 메서드/URL/헤더를 [`StreamOpen`](internal/protocol/protocol.go:69) 의 `Header` 혹은 pseudo-header 로 encode. + Encode method/URL/headers into `StreamOpen.Header` or a pseudo-header scheme. + - 요청 바디를 읽으면서 `StreamData{ID, Seq, Data}` 를 지속적으로 전송합니다. + Read the HTTP request body and send it as a sequence of `StreamData` frames. + - 바디 종료 시 `StreamClose{ID, Error:""}` 를 전송합니다. + When the body ends, send `StreamClose{ID, Error:""}`. + - 응답 수신: + - 클라이언트에서 오는 역방향 `StreamOpen` 으로 HTTP status/header 를 수신하고, + 이를 `http.ResponseWriter` 에 반영합니다. + Receive response status/headers via reverse-direction `StreamOpen` and map them to `http.ResponseWriter`. + - 연속되는 `StreamData` 를 수신할 때마다 `http.ResponseWriter.Write` 로 chunk 를 바로 전송합니다. + For each `StreamData`, write the chunk directly to the HTTP response. + - `StreamClose` 수신 시 응답 종료 및 스트림 자원 정리. + On `StreamClose`, finish the response and clean up per-stream state. + +- [x] 클라이언트에서의 요청 처리 스트림: [`internal/proxy/client.go`](internal/proxy/client.go:200) + - 서버로부터 들어오는 `StreamOpen{ID, ...}` 을 수신하면, + 새로운 goroutine 을 띄워 해당 ID에 대한 로컬 HTTP 요청을 수행합니다. + On receiving `StreamOpen{ID, ...}` from the server, spawn a goroutine to handle the local HTTP request for that stream ID. + - 스트림별로 `io.Pipe` 또는 채널 기반 바디 리더를 준비하고, + `StreamData` 프레임을 수신할 때마다 이 파이프에 쓰도록 합니다. + Prepare an `io.Pipe` (or channel-backed reader) per stream and write incoming `StreamData` chunks into it. + - 로컬 HTTP 클라이언트 응답은 반대로: + For the local HTTP client response: + - 응답 status/header → `StreamOpen` (client → server) + - 응답 바디 → 여러 개의 `StreamData` + - 종료 시점에 `StreamClose` 전송 + Send `StreamOpen` (status/headers), then a sequence of `StreamData`, followed by `StreamClose` when done. + +--- + +##### 3.3A.4 JSON → 바이너리 직렬화로의 잠재적 전환 (2단계) +##### 3.3A.4 JSON → binary serialization (potential phase 2) + +- [x] JSON 기반 스트림 프로토콜의 1단계 구현/안정화 이후, 직렬화 포맷 재검토 및 Protobuf 전환 + - 현재는 JSON 대신 Protobuf length-prefix `Envelope` 포맷을 기본으로 사용합니다. + The runtime now uses a Protobuf-based, length-prefixed `Envelope` format instead of JSON. + - HTTP/스트림 payload 는 여전히 MTU-safe 크기(예: 4KiB, `StreamChunkSize`)로 제한되어 있어, 단일 프레임이 과도하게 커지지 않습니다. + HTTP/stream payloads remain bounded to an MTU-safe size (e.g. 4KiB via `StreamChunkSize`), so individual frames stay small. +- [x] length-prefix 이진 프레임(Protobuf)으로 전환 + - 동일한 logical model (`StreamOpen` / `StreamData(seq)` / `StreamClose` / `StreamAck`)을 유지한 채, + wire-format 을 Protobuf length-prefix binary 프레이밍으로 교체했고, 이는 `protobufCodec` 으로 구현되었습니다. + We now keep the same logical model while using Protobuf length-prefixed framing via `protobufCodec`. +- [x] 이 전환은 `internal/protocol` 내 직렬화 레이어를 얇은 abstraction 으로 감싸 구현했습니다. + - [`internal/protocol/codec.go`](internal/protocol/codec.go:130) 에 `WireCodec` 인터페이스와 Protobuf 기반 `DefaultCodec` 을 도입해, + 호출자는 `protocol.DefaultCodec` 만 사용하고, JSON codec 은 보조 용도로만 남아 있습니다. + In [`internal/protocol/codec.go`](internal/protocol/codec.go:130), the `WireCodec` abstraction and Protobuf-based `DefaultCodec` allow callers to use only `protocol.DefaultCodec` while JSON remains as an auxiliary codec. + --- ### 3.4 ACME Integration / ACME 연동 @@ -261,10 +422,10 @@ This document tracks implementation progress against the HopGate architecture an ### 3.5 Observability / 관측성 -- [ ] Prometheus 메트릭 노출 및 서버 wiring +- [x] Prometheus 메트릭 노출 및 서버 wiring - `cmd/server/main.go` 에 Prometheus `/metrics` 엔드포인트 추가 (예: promhttp.Handler). - - DTLS 세션 수, DTLS 핸드셰이크 성공/실패 수, HTTP/Proxy 요청 수 및 에러 수에 대한 카운터/게이지 메트릭 정의. - - 도메인, 클라이언트 ID, request_id 등의 라벨 설계 및 현재 구조적 로깅 필드와 일관성 유지. + - DTLS 핸드셰이크 성공/실패 수, HTTP 요청 수, HTTP 요청 지연, Proxy 에러 수에 대한 메트릭을 정의합니다. + - 메트릭 라벨은 메서드/상태 코드/결과/에러 타입 등에 한정되며, 도메인/클라이언트 ID/request_id 는 구조적 로그 필드로만 노출됩니다. - [ ] Loki/Grafana 대시보드 및 쿼리 예시 - Loki/Promtail 구성을 가정한 주요 로그 쿼리 예시 정리(도메인, 클라이언트 ID, request_id 기준). @@ -301,9 +462,11 @@ This document tracks implementation progress against the HopGate architecture an ### Milestone 2 — Full HTTP Tunneling (프락시 동작 완성) -- [ ] 서버 Proxy 코어 구현 및 HTTPS ↔ DTLS 라우팅. -- [ ] 클라이언트 Proxy 루프 구현 및 로컬 서비스 연동. -- [ ] End-to-end HTTP 요청/응답 터널링 E2E 테스트. +- [x] 서버 Proxy 코어 구현 및 HTTPS ↔ DTLS 라우팅. + - 현재 `cmd/server/main.go` 의 `newHTTPHandler` / `dtlsSessionWrapper.ForwardHTTP` 경로에서 동작합니다. +- [x] 클라이언트 Proxy 루프 구현 및 로컬 서비스 연동. + - `cmd/client/main.go` + [`ClientProxy.StartLoop()`](internal/proxy/client.go:59) 를 통해 DTLS 세션 위에서 로컬 서비스와 연동됩니다. +- [ ] End-to-end HTTP 요청/응답 터널링 E2E 테스트. ### Milestone 3 — ACME + TLS/DTLS 정식 인증 @@ -313,7 +476,10 @@ This document tracks implementation progress against the HopGate architecture an ### Milestone 4 — Observability & Hardening -- [ ] Prometheus/Loki/Grafana 통합. +- [ ] Prometheus/Loki/Grafana 통합. + - Prometheus 메트릭 정의 및 `/metrics` 엔드포인트는 이미 구현 및 동작 중이며, + Loki/Promtail/Grafana 대시보드 및 운영 통합 작업은 아직 남아 있습니다. + - [ ] 에러/리트라이/타임아웃 정책 정교화. - [ ] 보안/구성 최종 점검 및 문서화.