[feat](protocol): extend Protobuf codec with stream-based message support

- Added support for `StreamOpen`, `StreamData`, `StreamClose`, and `StreamAck` types in the Protobuf codec.
- Defined new pseudo-header constants for HTTP-over-stream tunneling.
- Introduced `StreamChunkSize` constant for MTU-safe payload sizes (4 KiB).
- Updated encoding and decoding logic to handle stream-based types seamlessly.
This commit is contained in:
dalbodeule
2025-12-08 21:25:26 +09:00
parent 99be2d2e31
commit eac39550e2
2 changed files with 171 additions and 5 deletions

View File

@@ -114,8 +114,7 @@ func (protobufCodec) Decode(r io.Reader, env *Envelope) error {
var DefaultCodec WireCodec = protobufCodec{} var DefaultCodec WireCodec = protobufCodec{}
// toProtoEnvelope 는 내부 Envelope 구조체를 Protobuf Envelope 로 변환합니다. // toProtoEnvelope 는 내부 Envelope 구조체를 Protobuf Envelope 로 변환합니다.
// 현재 구현은 MessageTypeHTTP (HTTPRequest/HTTPResponse) 만 지원하며, // 현재 구현은 HTTP 요청/응답 및 스트림 관련 타입(StreamOpen/StreamData/StreamClose/StreamAck)을 지원합니다.
// 스트림 관련 타입은 이후 스트림 터널링 구현 단계에서 확장합니다.
func toProtoEnvelope(env *Envelope) (*protocolpb.Envelope, error) { func toProtoEnvelope(env *Envelope) (*protocolpb.Envelope, error) {
switch env.Type { switch env.Type {
case MessageTypeHTTP: case MessageTypeHTTP:
@@ -164,15 +163,80 @@ func toProtoEnvelope(env *Envelope) (*protocolpb.Envelope, error) {
}, nil }, nil
} }
return nil, fmt.Errorf("protobuf codec: http envelope has neither request nor response") return nil, fmt.Errorf("protobuf codec: http envelope has neither request nor response")
case MessageTypeStreamOpen:
if env.StreamOpen == nil {
return nil, fmt.Errorf("protobuf codec: stream_open envelope missing payload")
}
so := env.StreamOpen
pbSO := &protocolpb.StreamOpen{
Id: string(so.ID),
ServiceName: so.Service,
TargetAddr: so.TargetAddr,
Header: make(map[string]*protocolpb.HeaderValues, len(so.Header)),
}
for k, vs := range so.Header {
hv := &protocolpb.HeaderValues{
Values: append([]string(nil), vs...),
}
pbSO.Header[k] = hv
}
return &protocolpb.Envelope{
Payload: &protocolpb.Envelope_StreamOpen{
StreamOpen: pbSO,
},
}, nil
case MessageTypeStreamData:
if env.StreamData == nil {
return nil, fmt.Errorf("protobuf codec: stream_data envelope missing payload")
}
sd := env.StreamData
pbSD := &protocolpb.StreamData{
Id: string(sd.ID),
Seq: sd.Seq,
Data: sd.Data,
}
return &protocolpb.Envelope{
Payload: &protocolpb.Envelope_StreamData{
StreamData: pbSD,
},
}, nil
case MessageTypeStreamClose:
if env.StreamClose == nil {
return nil, fmt.Errorf("protobuf codec: stream_close envelope missing payload")
}
sc := env.StreamClose
pbSC := &protocolpb.StreamClose{
Id: string(sc.ID),
Error: sc.Error,
}
return &protocolpb.Envelope{
Payload: &protocolpb.Envelope_StreamClose{
StreamClose: pbSC,
},
}, nil
case MessageTypeStreamAck:
if env.StreamAck == nil {
return nil, fmt.Errorf("protobuf codec: stream_ack envelope missing payload")
}
sa := env.StreamAck
pbSA := &protocolpb.StreamAck{
Id: string(sa.ID),
AckSeq: sa.AckSeq,
LostSeqs: append([]uint64(nil), sa.LostSeqs...),
WindowSize: sa.WindowSize,
}
return &protocolpb.Envelope{
Payload: &protocolpb.Envelope_StreamAck{
StreamAck: pbSA,
},
}, nil
default: default:
// 스트림 관련 타입은 아직 DTLS 스트림 터널링 구현 이전 단계이므로 지원하지 않습니다.
// Stream-based message types are not yet supported by the protobuf codec.
return nil, fmt.Errorf("protobuf codec: unsupported envelope type %q", env.Type) return nil, fmt.Errorf("protobuf codec: unsupported envelope type %q", env.Type)
} }
} }
// fromProtoEnvelope 는 Protobuf Envelope 를 내부 Envelope 구조체로 변환합니다. // fromProtoEnvelope 는 Protobuf Envelope 를 내부 Envelope 구조체로 변환합니다.
// 현재 구현은 HTTP 요청/응답 지원합니다. // 현재 구현은 HTTP 요청/응답 및 스트림 관련 타입(StreamOpen/StreamData/StreamClose/StreamAck)을 지원합니다.
func fromProtoEnvelope(pbEnv *protocolpb.Envelope, env *Envelope) error { func fromProtoEnvelope(pbEnv *protocolpb.Envelope, env *Envelope) error {
switch payload := pbEnv.Payload.(type) { switch payload := pbEnv.Payload.(type) {
case *protocolpb.Envelope_HttpRequest: case *protocolpb.Envelope_HttpRequest:
@@ -198,6 +262,10 @@ func fromProtoEnvelope(pbEnv *protocolpb.Envelope, env *Envelope) error {
Body: append([]byte(nil), req.Body...), Body: append([]byte(nil), req.Body...),
} }
env.HTTPResponse = nil env.HTTPResponse = nil
env.StreamOpen = nil
env.StreamData = nil
env.StreamClose = nil
env.StreamAck = nil
return nil return nil
case *protocolpb.Envelope_HttpResponse: case *protocolpb.Envelope_HttpResponse:
@@ -221,6 +289,90 @@ func fromProtoEnvelope(pbEnv *protocolpb.Envelope, env *Envelope) error {
Error: resp.Error, Error: resp.Error,
} }
env.HTTPRequest = nil env.HTTPRequest = nil
env.StreamOpen = nil
env.StreamData = nil
env.StreamClose = nil
env.StreamAck = nil
return nil
case *protocolpb.Envelope_StreamOpen:
so := payload.StreamOpen
if so == nil {
return fmt.Errorf("protobuf codec: stream_open payload is nil")
}
hdr := make(map[string][]string, len(so.Header))
for k, hv := range so.Header {
if hv == nil {
continue
}
hdr[k] = append([]string(nil), hv.Values...)
}
env.Type = MessageTypeStreamOpen
env.StreamOpen = &StreamOpen{
ID: StreamID(so.Id),
Service: so.ServiceName,
TargetAddr: so.TargetAddr,
Header: hdr,
}
env.StreamData = nil
env.StreamClose = nil
env.StreamAck = nil
env.HTTPRequest = nil
env.HTTPResponse = nil
return nil
case *protocolpb.Envelope_StreamData:
sd := payload.StreamData
if sd == nil {
return fmt.Errorf("protobuf codec: stream_data payload is nil")
}
env.Type = MessageTypeStreamData
env.StreamData = &StreamData{
ID: StreamID(sd.Id),
Seq: sd.Seq,
Data: append([]byte(nil), sd.Data...),
}
env.StreamOpen = nil
env.StreamClose = nil
env.StreamAck = nil
env.HTTPRequest = nil
env.HTTPResponse = nil
return nil
case *protocolpb.Envelope_StreamClose:
sc := payload.StreamClose
if sc == nil {
return fmt.Errorf("protobuf codec: stream_close payload is nil")
}
env.Type = MessageTypeStreamClose
env.StreamClose = &StreamClose{
ID: StreamID(sc.Id),
Error: sc.Error,
}
env.StreamOpen = nil
env.StreamData = nil
env.StreamAck = nil
env.HTTPRequest = nil
env.HTTPResponse = nil
return nil
case *protocolpb.Envelope_StreamAck:
sa := payload.StreamAck
if sa == nil {
return fmt.Errorf("protobuf codec: stream_ack payload is nil")
}
env.Type = MessageTypeStreamAck
env.StreamAck = &StreamAck{
ID: StreamID(sa.Id),
AckSeq: sa.AckSeq,
LostSeqs: append([]uint64(nil), sa.LostSeqs...),
WindowSize: sa.WindowSize,
}
env.StreamOpen = nil
env.StreamData = nil
env.StreamClose = nil
env.HTTPRequest = nil
env.HTTPResponse = nil
return nil return nil
default: default:

View File

@@ -32,6 +32,11 @@ type Response struct {
// MessageType 은 DTLS 위에서 교환되는 상위 레벨 메시지 종류를 나타냅니다. // MessageType 은 DTLS 위에서 교환되는 상위 레벨 메시지 종류를 나타냅니다.
type MessageType string type MessageType string
// StreamChunkSize 는 스트림 터널링 시 단일 StreamData 프레임에 담을 최대 payload 크기입니다.
// 현재 구현에서는 4KiB 로 고정하여 DTLS/UDP MTU 한계를 여유 있게 피하도록 합니다.
// StreamChunkSize is the maximum payload size per StreamData frame (4KiB).
const StreamChunkSize = 4 * 1024
const ( const (
// MessageTypeHTTP 는 기존 단일 HTTP 요청/응답 메시지를 의미합니다. // MessageTypeHTTP 는 기존 단일 HTTP 요청/응답 메시지를 의미합니다.
// 이 경우 HTTPRequest / HTTPResponse 필드를 사용합니다. // 이 경우 HTTPRequest / HTTPResponse 필드를 사용합니다.
@@ -76,6 +81,15 @@ type Envelope struct {
// StreamID 는 스트림(예: 특정 WebSocket 연결 또는 TCP 커넥션)을 구분하기 위한 식별자입니다. // StreamID 는 스트림(예: 특정 WebSocket 연결 또는 TCP 커넥션)을 구분하기 위한 식별자입니다.
type StreamID string type StreamID string
// HTTP-over-stream 터널링에서 사용되는 pseudo-header 키 상수입니다.
// These pseudo-header keys are used when tunneling HTTP over the stream protocol.
const (
HeaderKeyMethod = "X-HopGate-Method"
HeaderKeyURL = "X-HopGate-URL"
HeaderKeyHost = "X-HopGate-Host"
HeaderKeyStatus = "X-HopGate-Status"
)
// StreamOpen 은 새로운 스트림을 여는 요청을 나타냅니다. // StreamOpen 은 새로운 스트림을 여는 요청을 나타냅니다.
type StreamOpen struct { type StreamOpen struct {
ID StreamID `json:"id"` ID StreamID `json:"id"`