From eac39550e22e953d1cd51f93feb871989e78883c Mon Sep 17 00:00:00 2001 From: dalbodeule <11470513+dalbodeule@users.noreply.github.com> Date: Mon, 8 Dec 2025 21:25:26 +0900 Subject: [PATCH] [feat](protocol): extend Protobuf codec with stream-based message support - Added support for `StreamOpen`, `StreamData`, `StreamClose`, and `StreamAck` types in the Protobuf codec. - Defined new pseudo-header constants for HTTP-over-stream tunneling. - Introduced `StreamChunkSize` constant for MTU-safe payload sizes (4 KiB). - Updated encoding and decoding logic to handle stream-based types seamlessly. --- internal/protocol/codec.go | 162 ++++++++++++++++++++++++++++++++-- internal/protocol/protocol.go | 14 +++ 2 files changed, 171 insertions(+), 5 deletions(-) diff --git a/internal/protocol/codec.go b/internal/protocol/codec.go index d7e06c3..c982614 100644 --- a/internal/protocol/codec.go +++ b/internal/protocol/codec.go @@ -114,8 +114,7 @@ func (protobufCodec) Decode(r io.Reader, env *Envelope) error { var DefaultCodec WireCodec = protobufCodec{} // toProtoEnvelope 는 내부 Envelope 구조체를 Protobuf Envelope 로 변환합니다. -// 현재 구현은 MessageTypeHTTP (HTTPRequest/HTTPResponse) 만 지원하며, -// 스트림 관련 타입은 이후 스트림 터널링 구현 단계에서 확장합니다. +// 현재 구현은 HTTP 요청/응답 및 스트림 관련 타입(StreamOpen/StreamData/StreamClose/StreamAck)을 지원합니다. func toProtoEnvelope(env *Envelope) (*protocolpb.Envelope, error) { switch env.Type { case MessageTypeHTTP: @@ -164,15 +163,80 @@ func toProtoEnvelope(env *Envelope) (*protocolpb.Envelope, error) { }, nil } return nil, fmt.Errorf("protobuf codec: http envelope has neither request nor response") + case MessageTypeStreamOpen: + if env.StreamOpen == nil { + return nil, fmt.Errorf("protobuf codec: stream_open envelope missing payload") + } + so := env.StreamOpen + pbSO := &protocolpb.StreamOpen{ + Id: string(so.ID), + ServiceName: so.Service, + TargetAddr: so.TargetAddr, + Header: make(map[string]*protocolpb.HeaderValues, len(so.Header)), + } + for k, vs := range so.Header { + hv := &protocolpb.HeaderValues{ + Values: append([]string(nil), vs...), + } + pbSO.Header[k] = hv + } + return &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamOpen{ + StreamOpen: pbSO, + }, + }, nil + case MessageTypeStreamData: + if env.StreamData == nil { + return nil, fmt.Errorf("protobuf codec: stream_data envelope missing payload") + } + sd := env.StreamData + pbSD := &protocolpb.StreamData{ + Id: string(sd.ID), + Seq: sd.Seq, + Data: sd.Data, + } + return &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamData{ + StreamData: pbSD, + }, + }, nil + case MessageTypeStreamClose: + if env.StreamClose == nil { + return nil, fmt.Errorf("protobuf codec: stream_close envelope missing payload") + } + sc := env.StreamClose + pbSC := &protocolpb.StreamClose{ + Id: string(sc.ID), + Error: sc.Error, + } + return &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamClose{ + StreamClose: pbSC, + }, + }, nil + case MessageTypeStreamAck: + if env.StreamAck == nil { + return nil, fmt.Errorf("protobuf codec: stream_ack envelope missing payload") + } + sa := env.StreamAck + pbSA := &protocolpb.StreamAck{ + Id: string(sa.ID), + AckSeq: sa.AckSeq, + LostSeqs: append([]uint64(nil), sa.LostSeqs...), + WindowSize: sa.WindowSize, + } + return &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamAck{ + StreamAck: pbSA, + }, + }, nil default: - // 스트림 관련 타입은 아직 DTLS 스트림 터널링 구현 이전 단계이므로 지원하지 않습니다. - // Stream-based message types are not yet supported by the protobuf codec. return nil, fmt.Errorf("protobuf codec: unsupported envelope type %q", env.Type) } } // fromProtoEnvelope 는 Protobuf Envelope 를 내부 Envelope 구조체로 변환합니다. -// 현재 구현은 HTTP 요청/응답만 지원합니다. +// 현재 구현은 HTTP 요청/응답 및 스트림 관련 타입(StreamOpen/StreamData/StreamClose/StreamAck)을 지원합니다. func fromProtoEnvelope(pbEnv *protocolpb.Envelope, env *Envelope) error { switch payload := pbEnv.Payload.(type) { case *protocolpb.Envelope_HttpRequest: @@ -198,6 +262,10 @@ func fromProtoEnvelope(pbEnv *protocolpb.Envelope, env *Envelope) error { Body: append([]byte(nil), req.Body...), } env.HTTPResponse = nil + env.StreamOpen = nil + env.StreamData = nil + env.StreamClose = nil + env.StreamAck = nil return nil case *protocolpb.Envelope_HttpResponse: @@ -221,6 +289,90 @@ func fromProtoEnvelope(pbEnv *protocolpb.Envelope, env *Envelope) error { Error: resp.Error, } env.HTTPRequest = nil + env.StreamOpen = nil + env.StreamData = nil + env.StreamClose = nil + env.StreamAck = nil + return nil + + case *protocolpb.Envelope_StreamOpen: + so := payload.StreamOpen + if so == nil { + return fmt.Errorf("protobuf codec: stream_open payload is nil") + } + hdr := make(map[string][]string, len(so.Header)) + for k, hv := range so.Header { + if hv == nil { + continue + } + hdr[k] = append([]string(nil), hv.Values...) + } + env.Type = MessageTypeStreamOpen + env.StreamOpen = &StreamOpen{ + ID: StreamID(so.Id), + Service: so.ServiceName, + TargetAddr: so.TargetAddr, + Header: hdr, + } + env.StreamData = nil + env.StreamClose = nil + env.StreamAck = nil + env.HTTPRequest = nil + env.HTTPResponse = nil + return nil + + case *protocolpb.Envelope_StreamData: + sd := payload.StreamData + if sd == nil { + return fmt.Errorf("protobuf codec: stream_data payload is nil") + } + env.Type = MessageTypeStreamData + env.StreamData = &StreamData{ + ID: StreamID(sd.Id), + Seq: sd.Seq, + Data: append([]byte(nil), sd.Data...), + } + env.StreamOpen = nil + env.StreamClose = nil + env.StreamAck = nil + env.HTTPRequest = nil + env.HTTPResponse = nil + return nil + + case *protocolpb.Envelope_StreamClose: + sc := payload.StreamClose + if sc == nil { + return fmt.Errorf("protobuf codec: stream_close payload is nil") + } + env.Type = MessageTypeStreamClose + env.StreamClose = &StreamClose{ + ID: StreamID(sc.Id), + Error: sc.Error, + } + env.StreamOpen = nil + env.StreamData = nil + env.StreamAck = nil + env.HTTPRequest = nil + env.HTTPResponse = nil + return nil + + case *protocolpb.Envelope_StreamAck: + sa := payload.StreamAck + if sa == nil { + return fmt.Errorf("protobuf codec: stream_ack payload is nil") + } + env.Type = MessageTypeStreamAck + env.StreamAck = &StreamAck{ + ID: StreamID(sa.Id), + AckSeq: sa.AckSeq, + LostSeqs: append([]uint64(nil), sa.LostSeqs...), + WindowSize: sa.WindowSize, + } + env.StreamOpen = nil + env.StreamData = nil + env.StreamClose = nil + env.HTTPRequest = nil + env.HTTPResponse = nil return nil default: diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go index fb7446b..5805950 100644 --- a/internal/protocol/protocol.go +++ b/internal/protocol/protocol.go @@ -32,6 +32,11 @@ type Response struct { // MessageType 은 DTLS 위에서 교환되는 상위 레벨 메시지 종류를 나타냅니다. type MessageType string +// StreamChunkSize 는 스트림 터널링 시 단일 StreamData 프레임에 담을 최대 payload 크기입니다. +// 현재 구현에서는 4KiB 로 고정하여 DTLS/UDP MTU 한계를 여유 있게 피하도록 합니다. +// StreamChunkSize is the maximum payload size per StreamData frame (4KiB). +const StreamChunkSize = 4 * 1024 + const ( // MessageTypeHTTP 는 기존 단일 HTTP 요청/응답 메시지를 의미합니다. // 이 경우 HTTPRequest / HTTPResponse 필드를 사용합니다. @@ -76,6 +81,15 @@ type Envelope struct { // StreamID 는 스트림(예: 특정 WebSocket 연결 또는 TCP 커넥션)을 구분하기 위한 식별자입니다. type StreamID string +// HTTP-over-stream 터널링에서 사용되는 pseudo-header 키 상수입니다. +// These pseudo-header keys are used when tunneling HTTP over the stream protocol. +const ( + HeaderKeyMethod = "X-HopGate-Method" + HeaderKeyURL = "X-HopGate-URL" + HeaderKeyHost = "X-HopGate-Host" + HeaderKeyStatus = "X-HopGate-Status" +) + // StreamOpen 은 새로운 스트림을 여는 요청을 나타냅니다. type StreamOpen struct { ID StreamID `json:"id"`