From 412b59f420d9f72857ad6bd1f149b750312b1fa9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 9 Dec 2025 13:59:02 +0000 Subject: [PATCH 1/8] Initial plan From 1292df33e521a7a606786680055ff7ac582f0148 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 9 Dec 2025 14:07:15 +0000 Subject: [PATCH 2/8] Fix DTLS buffer size issue by wrapping sessions with buffered readers - Add dtlsReadBufferSize constant (8KB) matching pion/dtls limits - Wrap DTLS sessions with bufio.Reader in client and server code - Update tests to use buffered readers for datagram-based connections - All tests passing successfully Co-authored-by: dalbodeule <11470513+dalbodeule@users.noreply.github.com> --- cmd/server/main.go | 15 ++++++++++----- internal/protocol/codec.go | 18 ++++++++++++++++++ internal/protocol/codec_test.go | 17 +++++++++++------ internal/proxy/client.go | 23 +++++++++++++---------- 4 files changed, 52 insertions(+), 21 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 5f7f08f..117a492 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "bytes" "context" "crypto/tls" @@ -34,9 +35,10 @@ import ( var version = "dev" type dtlsSessionWrapper struct { - sess dtls.Session - mu sync.Mutex - nextStreamID uint64 + sess dtls.Session + bufferedReader *bufio.Reader + mu sync.Mutex + nextStreamID uint64 } func getEnvOrPanic(logger logging.Logger, key string) string { @@ -302,7 +304,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log for { var env protocol.Envelope - if err := codec.Decode(w.sess, &env); err != nil { + if err := codec.Decode(w.bufferedReader, &env); err != nil { log.Error("failed to decode stream response envelope", logging.Fields{ "error": err.Error(), }) @@ -502,7 +504,10 @@ func registerSessionForDomain(domain string, sess dtls.Session, logger logging.L if d == "" { return } - w := &dtlsSessionWrapper{sess: sess} + w := &dtlsSessionWrapper{ + sess: sess, + bufferedReader: bufio.NewReaderSize(sess, protocol.GetDTLSReadBufferSize()), + } sessionsMu.Lock() sessionsByDomain[d] = w sessionsMu.Unlock() diff --git a/internal/protocol/codec.go b/internal/protocol/codec.go index ad84634..5a3940f 100644 --- a/internal/protocol/codec.go +++ b/internal/protocol/codec.go @@ -16,6 +16,16 @@ import ( // This matches existing 64KiB readers used around DTLS sessions (used by the JSON codec). const defaultDecoderBufferSize = 64 * 1024 +// dtlsReadBufferSize 는 pion/dtls 내부 버퍼 한계에 맞춘 읽기 버퍼 크기입니다. +// pion/dtls 의 UnpackDatagram 함수는 8KB (8,192 bytes) 의 기본 수신 버퍼를 사용합니다. +// DTLS는 UDP 기반이므로 한 번의 Read()에서 전체 datagram을 읽어야 하며, +// 이 크기를 초과하는 DTLS 레코드는 처리되지 않습니다. +// dtlsReadBufferSize matches the pion/dtls internal buffer limit. +// pion/dtls's UnpackDatagram function uses an 8KB (8,192 bytes) receive buffer. +// Since DTLS is UDP-based, the entire datagram must be read in a single Read() call, +// and DTLS records exceeding this size cannot be processed. +const dtlsReadBufferSize = 8 * 1024 // 8KB + // maxProtoEnvelopeBytes 는 단일 Protobuf Envelope 의 최대 크기에 대한 보수적 상한입니다. // 아직 하드 리미트로 사용하지는 않지만, 향후 방어적 체크에 사용할 수 있습니다. const maxProtoEnvelopeBytes = 512 * 1024 // 512KiB, 충분히 여유 있는 값 @@ -141,6 +151,14 @@ func (protobufCodec) Decode(r io.Reader, env *Envelope) error { // 서버와 클라이언트가 모두 이 버전을 사용해야 wire-format 이 일치합니다. var DefaultCodec WireCodec = protobufCodec{} +// GetDTLSReadBufferSize 는 DTLS 세션 읽기에 사용할 버퍼 크기를 반환합니다. +// 이 값은 pion/dtls 내부 버퍼 한계(8KB)에 맞춰져 있습니다. +// GetDTLSReadBufferSize returns the buffer size to use for reading from DTLS sessions. +// This value is aligned with pion/dtls's internal buffer limit (8KB). +func GetDTLSReadBufferSize() int { + return dtlsReadBufferSize +} + // toProtoEnvelope 는 내부 Envelope 구조체를 Protobuf Envelope 로 변환합니다. // 현재 구현은 HTTP 요청/응답 및 스트림 관련 타입(StreamOpen/StreamData/StreamClose/StreamAck)을 지원합니다. func toProtoEnvelope(env *Envelope) (*protocolpb.Envelope, error) { diff --git a/internal/protocol/codec_test.go b/internal/protocol/codec_test.go index b2b6001..6ca0fff 100644 --- a/internal/protocol/codec_test.go +++ b/internal/protocol/codec_test.go @@ -1,6 +1,7 @@ package protocol import ( + "bufio" "bytes" "io" "testing" @@ -80,9 +81,11 @@ func TestProtobufCodecDatagramBehavior(t *testing.T) { t.Fatalf("Message too short: %d bytes", len(msg)) } - // Decode the envelope + // Decode the envelope using a buffered reader (as we do in actual code) + // to handle datagram-based reading properly + reader := bufio.NewReaderSize(conn, GetDTLSReadBufferSize()) var decodedEnv Envelope - if err := codec.Decode(conn, &decodedEnv); err != nil { + if err := codec.Decode(reader, &decodedEnv); err != nil { t.Fatalf("Failed to decode envelope: %v", err) } @@ -132,9 +135,10 @@ func TestProtobufCodecStreamData(t *testing.T) { t.Fatalf("Expected 1 message, got %d", len(conn.messages)) } - // Decode + // Decode using a buffered reader (as we do in actual code) + reader := bufio.NewReaderSize(conn, GetDTLSReadBufferSize()) var decodedEnv Envelope - if err := codec.Decode(conn, &decodedEnv); err != nil { + if err := codec.Decode(reader, &decodedEnv); err != nil { t.Fatalf("Failed to decode StreamData: %v", err) } @@ -208,10 +212,11 @@ func TestProtobufCodecMultipleMessages(t *testing.T) { t.Fatalf("Expected %d messages, got %d", len(envelopes), len(conn.messages)) } - // Decode and verify all messages + // Decode and verify all messages using a buffered reader (as we do in actual code) + reader := bufio.NewReaderSize(conn, GetDTLSReadBufferSize()) for i := 0; i < len(envelopes); i++ { var decoded Envelope - if err := codec.Decode(conn, &decoded); err != nil { + if err := codec.Decode(reader, &decoded); err != nil { t.Fatalf("Failed to decode message %d: %v", i, err) } if decoded.Type != envelopes[i].Type { diff --git a/internal/proxy/client.go b/internal/proxy/client.go index c43be7a..7682316 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -1,6 +1,7 @@ package proxy import ( + "bufio" "bytes" "context" "fmt" @@ -151,14 +152,16 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { log := p.Logger // NOTE: pion/dtls 는 복호화된 애플리케이션 데이터를 호출자가 제공한 버퍼에 채워 넣습니다. - // 기본 JSON 디코더 버퍼(수백 바이트 수준)만 사용하면 큰 HTTP 바디/Envelope 에서 - // "dtls: buffer too small" 오류가 날 수 있으므로, 여기서는 여유 있는 버퍼(64KiB)를 사용합니다. (ko) + // DTLS는 UDP 기반이므로 한 번의 Read()에서 전체 datagram을 읽어야 하며, + // pion/dtls 내부 버퍼 한계(8KB)를 초과하는 메시지는 "dtls: buffer too small" 오류를 발생시킵니다. + // 이를 방지하기 위해 DTLS 세션을 bufio.Reader로 감싸서 datagram을 완전히 읽어들인 후 파싱합니다. (ko) // NOTE: pion/dtls decrypts application data into the buffer provided by the caller. - // Using only the default JSON decoder buffer (a few hundred bytes) can trigger - // "dtls: buffer too small" for large HTTP bodies/envelopes. The default - // JSON-based WireCodec internally wraps the DTLS session with a 64KiB - // bufio.Reader, matching this requirement. (en) + // Since DTLS is UDP-based, the entire datagram must be read in a single Read() call, + // and messages exceeding pion/dtls's internal buffer limit (8KB) will trigger + // "dtls: buffer too small" errors. To prevent this, we wrap the DTLS session with + // a bufio.Reader to fully read the datagram before parsing. (en) codec := protocol.DefaultCodec + bufferedReader := bufio.NewReaderSize(sess, protocol.GetDTLSReadBufferSize()) for { select { @@ -171,7 +174,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { } var env protocol.Envelope - if err := codec.Decode(sess, &env); err != nil { + if err := codec.Decode(bufferedReader, &env); err != nil { if err == io.EOF { log.Info("dtls session closed by server", nil) return nil @@ -191,7 +194,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { return err } case protocol.MessageTypeStreamOpen: - if err := p.handleStreamRequest(ctx, sess, &env); err != nil { + if err := p.handleStreamRequest(ctx, sess, bufferedReader, &env); err != nil { log.Error("failed to handle stream http envelope", logging.Fields{ "error": err.Error(), }) @@ -303,7 +306,7 @@ func (p *ClientProxy) handleHTTPEnvelope(ctx context.Context, sess dtls.Session, // handleStreamRequest 는 StreamOpen/StreamData/StreamClose 기반 HTTP 요청/응답 스트림을 처리합니다. (ko) // handleStreamRequest handles an HTTP request/response exchange using StreamOpen/StreamData/StreamClose frames. (en) -func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session, openEnv *protocol.Envelope) error { +func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session, reader io.Reader, openEnv *protocol.Envelope) error { codec := protocol.DefaultCodec log := p.Logger @@ -365,7 +368,7 @@ func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session for { var env protocol.Envelope - if err := codec.Decode(sess, &env); err != nil { + if err := codec.Decode(reader, &env); err != nil { if err == io.EOF { return fmt.Errorf("unexpected EOF while reading stream request body") } From ff38ef2828e55d9a3e482cace39ac37789c89b5f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 9 Dec 2025 14:22:29 +0000 Subject: [PATCH 3/8] Fix concurrent request handling with stream multiplexing - Add channel-based multiplexing to handle concurrent HTTP requests - Implement background readLoop to dispatch responses to correct streams - Remove mutex bottleneck that was serializing all requests - Fixes "unexpected stream_data/stream_open for id" errors with concurrent requests Co-authored-by: dalbodeule <11470513+dalbodeule@users.noreply.github.com> --- cmd/server/main.go | 279 ++++++++++++++++++++++++++++++++------------- 1 file changed, 201 insertions(+), 78 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 117a492..18a25c3 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -34,11 +34,31 @@ import ( // 기본값 "dev" 는 로컬 개발용입니다. var version = "dev" +// streamResponse collects the complete response for a single HTTP stream request +type streamResponse struct { + statusCode int + header map[string][]string + body bytes.Buffer + err error +} + +// pendingRequest tracks a request waiting for its response +type pendingRequest struct { + streamID protocol.StreamID + respCh chan *protocol.Envelope + doneCh chan struct{} +} + type dtlsSessionWrapper struct { sess dtls.Session bufferedReader *bufio.Reader - mu sync.Mutex - nextStreamID uint64 + codec protocol.WireCodec + logger logging.Logger + + mu sync.Mutex + nextStreamID uint64 + pending map[protocol.StreamID]*pendingRequest + readerDone chan struct{} } func getEnvOrPanic(logger logging.Logger, key string) string { @@ -176,21 +196,114 @@ func parseExpectedIPsFromEnv(logger logging.Logger, envKey string) []net.IP { // ForwardHTTP 는 HTTP 요청을 DTLS 세션 위의 StreamOpen/StreamData/StreamClose 프레임으로 전송하고, // 역방향 스트림 응답을 수신해 protocol.Response 로 반환합니다. (ko) +// readLoop continuously reads from the DTLS session and dispatches incoming frames +// to the appropriate pending request based on stream ID +func (w *dtlsSessionWrapper) readLoop() { + defer close(w.readerDone) + + for { + var env protocol.Envelope + if err := w.codec.Decode(w.bufferedReader, &env); err != nil { + if err == io.EOF { + w.logger.Info("dtls session closed", nil) + } else { + w.logger.Error("failed to decode envelope in read loop", logging.Fields{ + "error": err.Error(), + }) + } + // Notify all pending requests of the error + w.mu.Lock() + for _, pending := range w.pending { + close(pending.respCh) + } + w.pending = make(map[protocol.StreamID]*pendingRequest) + w.mu.Unlock() + return + } + + // Determine the stream ID from the envelope + var streamID protocol.StreamID + switch env.Type { + case protocol.MessageTypeStreamOpen: + if env.StreamOpen != nil { + streamID = env.StreamOpen.ID + } + case protocol.MessageTypeStreamData: + if env.StreamData != nil { + streamID = env.StreamData.ID + } + case protocol.MessageTypeStreamClose: + if env.StreamClose != nil { + streamID = env.StreamClose.ID + } + default: + w.logger.Warn("received unexpected envelope type in read loop", logging.Fields{ + "type": env.Type, + }) + continue + } + + if streamID == "" { + w.logger.Warn("received envelope with empty stream ID", logging.Fields{ + "type": env.Type, + }) + continue + } + + // Find the pending request for this stream ID + w.mu.Lock() + pending := w.pending[streamID] + w.mu.Unlock() + + if pending == nil { + w.logger.Warn("received envelope for unknown stream ID", logging.Fields{ + "stream_id": streamID, + "type": env.Type, + }) + continue + } + + // Send the envelope to the waiting request + select { + case pending.respCh <- &env: + // Successfully delivered + case <-pending.doneCh: + // Request was cancelled or timed out + w.logger.Warn("pending request already closed", logging.Fields{ + "stream_id": streamID, + }) + } + } +} + // ForwardHTTP forwards an HTTP request over the DTLS session using StreamOpen/StreamData/StreamClose // frames and reconstructs the reverse stream into a protocol.Response. (en) +// This method now supports concurrent requests by using a channel-based multiplexing approach. func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Logger, req *http.Request, serviceName string) (*protocol.Response, error) { - w.mu.Lock() - defer w.mu.Unlock() - if ctx == nil { ctx = context.Background() } - codec := protocol.DefaultCodec - - // 세션 내에서 고유한 StreamID 를 생성합니다. (ko) - // Generate a unique StreamID for this HTTP request within the DTLS session. (en) + // Generate a unique stream ID (needs mutex for nextStreamID) + w.mu.Lock() streamID := w.nextHTTPStreamID() + + // Create a pending request to receive responses + pending := &pendingRequest{ + streamID: streamID, + respCh: make(chan *protocol.Envelope, 16), // Buffered to avoid blocking readLoop + doneCh: make(chan struct{}), + } + w.pending[streamID] = pending + w.mu.Unlock() + + // Ensure cleanup on exit + defer func() { + w.mu.Lock() + delete(w.pending, streamID) + w.mu.Unlock() + close(pending.doneCh) + }() log := logger.With(logging.Fields{ "component": "http_to_dtls", @@ -233,7 +346,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log Header: hdr, }, } - if err := codec.Encode(w.sess, openEnv); err != nil { + if err := w.codec.Encode(w.sess, openEnv); err != nil { log.Error("failed to encode stream_open envelope", logging.Fields{ "error": err.Error(), }) @@ -257,7 +370,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log Data: dataCopy, }, } - if err2 := codec.Encode(w.sess, dataEnv); err2 != nil { + if err2 := w.codec.Encode(w.sess, dataEnv); err2 != nil { log.Error("failed to encode stream_data envelope", logging.Fields{ "error": err2.Error(), }) @@ -283,7 +396,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log Error: "", }, } - if err := codec.Encode(w.sess, closeReqEnv); err != nil { + if err := w.codec.Encode(w.sess, closeReqEnv); err != nil { log.Error("failed to encode request stream_close envelope", logging.Fields{ "error": err.Error(), }) @@ -291,7 +404,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log } // 클라이언트로부터 역방향 스트림 응답을 수신합니다. (ko) - // Receive reverse stream response (StreamOpen + StreamData* + StreamClose). (en) + // Receive reverse stream response (StreamOpen + StreamData* + StreamClose) via the readLoop. (en) var ( resp protocol.Response bodyBuf bytes.Buffer @@ -303,79 +416,81 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log resp.Header = make(map[string][]string) for { - var env protocol.Envelope - if err := codec.Decode(w.bufferedReader, &env); err != nil { - log.Error("failed to decode stream response envelope", logging.Fields{ - "error": err.Error(), + select { + case <-ctx.Done(): + log.Error("context cancelled while waiting for response", logging.Fields{ + "error": ctx.Err().Error(), }) - return nil, err - } + return nil, ctx.Err() - switch env.Type { - case protocol.MessageTypeStreamOpen: - so := env.StreamOpen - if so == nil { - return nil, fmt.Errorf("stream_open response payload is nil") + case <-w.readerDone: + log.Error("dtls session closed while waiting for response", nil) + return nil, fmt.Errorf("dtls session closed") + + case env, ok := <-pending.respCh: + if !ok { + // Channel closed, session is dead + log.Error("response channel closed unexpectedly", nil) + return nil, fmt.Errorf("response channel closed") } - if so.ID != streamID { - return nil, fmt.Errorf("unexpected stream_open for id %q (expected %q)", so.ID, streamID) - } - // 상태 코드 및 헤더 복원 (pseudo-header 제거). (ko) - // Restore status code and headers (strip pseudo-headers). (en) - statusStr := firstHeaderValue(so.Header, protocol.HeaderKeyStatus, strconv.Itoa(http.StatusOK)) - if sc, err := strconv.Atoi(statusStr); err == nil && sc > 0 { - statusCode = sc - } - for k, vs := range so.Header { - if k == protocol.HeaderKeyMethod || - k == protocol.HeaderKeyURL || - k == protocol.HeaderKeyHost || - k == protocol.HeaderKeyStatus { - continue + + switch env.Type { + case protocol.MessageTypeStreamOpen: + so := env.StreamOpen + if so == nil { + return nil, fmt.Errorf("stream_open response payload is nil") } - resp.Header[k] = append([]string(nil), vs...) - } - gotOpen = true - - case protocol.MessageTypeStreamData: - sd := env.StreamData - if sd == nil { - return nil, fmt.Errorf("stream_data response payload is nil") - } - if sd.ID != streamID { - return nil, fmt.Errorf("unexpected stream_data for id %q (expected %q)", sd.ID, streamID) - } - if len(sd.Data) > 0 { - if _, err := bodyBuf.Write(sd.Data); err != nil { - return nil, fmt.Errorf("buffer stream_data response: %w", err) + // 상태 코드 및 헤더 복원 (pseudo-header 제거). (ko) + // Restore status code and headers (strip pseudo-headers). (en) + statusStr := firstHeaderValue(so.Header, protocol.HeaderKeyStatus, strconv.Itoa(http.StatusOK)) + if sc, err := strconv.Atoi(statusStr); err == nil && sc > 0 { + statusCode = sc } - } + for k, vs := range so.Header { + if k == protocol.HeaderKeyMethod || + k == protocol.HeaderKeyURL || + k == protocol.HeaderKeyHost || + k == protocol.HeaderKeyStatus { + continue + } + resp.Header[k] = append([]string(nil), vs...) + } + gotOpen = true - case protocol.MessageTypeStreamClose: - sc := env.StreamClose - if sc == nil { - return nil, fmt.Errorf("stream_close response payload is nil") - } - if sc.ID != streamID { - return nil, fmt.Errorf("unexpected stream_close for id %q (expected %q)", sc.ID, streamID) - } - // 스트림 종료: 지금까지 수신한 헤더/바디로 protocol.Response 를 완성합니다. (ko) - // Stream finished: complete protocol.Response using collected headers/body. (en) - resp.Status = statusCode - resp.Body = bodyBuf.Bytes() - resp.Error = sc.Error + case protocol.MessageTypeStreamData: + sd := env.StreamData + if sd == nil { + return nil, fmt.Errorf("stream_data response payload is nil") + } + if len(sd.Data) > 0 { + if _, err := bodyBuf.Write(sd.Data); err != nil { + return nil, fmt.Errorf("buffer stream_data response: %w", err) + } + } - log.Info("received stream http response over dtls", logging.Fields{ - "status": resp.Status, - "error": resp.Error, - }) - if !gotOpen { - return nil, fmt.Errorf("received stream_close without prior stream_open for stream %q", streamID) - } - return &resp, nil + case protocol.MessageTypeStreamClose: + sc := env.StreamClose + if sc == nil { + return nil, fmt.Errorf("stream_close response payload is nil") + } + // 스트림 종료: 지금까지 수신한 헤더/바디로 protocol.Response 를 완성합니다. (ko) + // Stream finished: complete protocol.Response using collected headers/body. (en) + resp.Status = statusCode + resp.Body = bodyBuf.Bytes() + resp.Error = sc.Error - default: - return nil, fmt.Errorf("unexpected envelope type %q in stream response", env.Type) + log.Info("received stream http response over dtls", logging.Fields{ + "status": resp.Status, + "error": resp.Error, + }) + if !gotOpen { + return nil, fmt.Errorf("received stream_close without prior stream_open for stream %q", streamID) + } + return &resp, nil + + default: + return nil, fmt.Errorf("unexpected envelope type %q in stream response", env.Type) + } } } } @@ -507,7 +622,15 @@ func registerSessionForDomain(domain string, sess dtls.Session, logger logging.L w := &dtlsSessionWrapper{ sess: sess, bufferedReader: bufio.NewReaderSize(sess, protocol.GetDTLSReadBufferSize()), + codec: protocol.DefaultCodec, + logger: logger.With(logging.Fields{"component": "dtls_session_wrapper", "domain": d}), + pending: make(map[protocol.StreamID]*pendingRequest), + readerDone: make(chan struct{}), } + + // Start background reader goroutine to demultiplex incoming responses + go w.readLoop() + sessionsMu.Lock() sessionsByDomain[d] = w sessionsMu.Unlock() From 887c5fcdff591327a0ad3f13ddb82beb5ca3576d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 9 Dec 2025 14:24:56 +0000 Subject: [PATCH 4/8] Address code review feedback: cleanup unused code and improve channel safety - Remove unused streamResponse struct - Add named constant for response channel buffer size - Add default case to prevent blocking in readLoop select - Fix potential double-close of channels during cleanup - Improve comments explaining concurrency patterns Co-authored-by: dalbodeule <11470513+dalbodeule@users.noreply.github.com> --- cmd/server/main.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 18a25c3..241a99b 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -34,14 +34,6 @@ import ( // 기본값 "dev" 는 로컬 개발용입니다. var version = "dev" -// streamResponse collects the complete response for a single HTTP stream request -type streamResponse struct { - statusCode int - header map[string][]string - body bytes.Buffer - err error -} - // pendingRequest tracks a request waiting for its response type pendingRequest struct { streamID protocol.StreamID @@ -211,7 +203,8 @@ func (w *dtlsSessionWrapper) readLoop() { "error": err.Error(), }) } - // Notify all pending requests of the error + // Notify all pending requests of the error by closing their response channels + // The doneCh will be closed by each ForwardHTTP's defer w.mu.Lock() for _, pending := range w.pending { close(pending.respCh) @@ -272,6 +265,12 @@ func (w *dtlsSessionWrapper) readLoop() { w.logger.Warn("pending request already closed", logging.Fields{ "stream_id": streamID, }) + default: + // Channel buffer full - shouldn't happen with proper sizing + w.logger.Warn("response channel buffer full, dropping frame", logging.Fields{ + "stream_id": streamID, + "type": env.Type, + }) } } } @@ -288,10 +287,15 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log w.mu.Lock() streamID := w.nextHTTPStreamID() + // Channel buffer size for response frames to avoid blocking readLoop. + // A typical HTTP response has: 1 StreamOpen + N StreamData + 1 StreamClose frames. + // With 4KB chunks, even large responses stay within this buffer. + const responseChannelBuffer = 16 + // Create a pending request to receive responses pending := &pendingRequest{ streamID: streamID, - respCh: make(chan *protocol.Envelope, 16), // Buffered to avoid blocking readLoop + respCh: make(chan *protocol.Envelope, responseChannelBuffer), doneCh: make(chan struct{}), } w.pending[streamID] = pending From 56916c75f4d07fd93f99f57cc1ab73e9ca08b720 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 9 Dec 2025 14:44:31 +0000 Subject: [PATCH 5/8] Fix client handling of unexpected stream frames at top level - Add handling for StreamData and StreamClose at top level in client StartLoop - Log warning and continue instead of crashing on unexpected frames - Fixes "unsupported envelope type stream_close" error when frames arrive out of order Co-authored-by: dalbodeule <11470513+dalbodeule@users.noreply.github.com> --- internal/proxy/client.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/internal/proxy/client.go b/internal/proxy/client.go index 7682316..d92b96a 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -238,6 +238,30 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { "seq": seq, }) } + case protocol.MessageTypeStreamData: + // StreamData received at top level (not expected, should be consumed by handleStreamRequest) + // This can happen if frames arrive out of order or if there's a protocol mismatch + log.Warn("received unexpected stream_data at top level, ignoring", logging.Fields{ + "stream_id": func() string { + if env.StreamData != nil { + return string(env.StreamData.ID) + } + return "unknown" + }(), + }) + continue + case protocol.MessageTypeStreamClose: + // StreamClose received at top level (not expected, should be consumed by handleStreamRequest) + // This can happen if frames arrive out of order or if there's a protocol mismatch + log.Warn("received unexpected stream_close at top level, ignoring", logging.Fields{ + "stream_id": func() string { + if env.StreamClose != nil { + return string(env.StreamClose.ID) + } + return "unknown" + }(), + }) + continue default: log.Error("received unsupported envelope type from server", logging.Fields{ "type": env.Type, From 446a265fa2d98094dc9a1528591cd0da896d47de Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 9 Dec 2025 14:46:00 +0000 Subject: [PATCH 6/8] Improve code readability in client stream frame handlers - Extract stream IDs before logging calls for better readability - Remove unnecessary anonymous functions - Address code review feedback Co-authored-by: dalbodeule <11470513+dalbodeule@users.noreply.github.com> --- internal/proxy/client.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/internal/proxy/client.go b/internal/proxy/client.go index d92b96a..c980d2b 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -241,25 +241,23 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { case protocol.MessageTypeStreamData: // StreamData received at top level (not expected, should be consumed by handleStreamRequest) // This can happen if frames arrive out of order or if there's a protocol mismatch + streamID := "unknown" + if env.StreamData != nil { + streamID = string(env.StreamData.ID) + } log.Warn("received unexpected stream_data at top level, ignoring", logging.Fields{ - "stream_id": func() string { - if env.StreamData != nil { - return string(env.StreamData.ID) - } - return "unknown" - }(), + "stream_id": streamID, }) continue case protocol.MessageTypeStreamClose: // StreamClose received at top level (not expected, should be consumed by handleStreamRequest) // This can happen if frames arrive out of order or if there's a protocol mismatch + streamID := "unknown" + if env.StreamClose != nil { + streamID = string(env.StreamClose.ID) + } log.Warn("received unexpected stream_close at top level, ignoring", logging.Fields{ - "stream_id": func() string { - if env.StreamClose != nil { - return string(env.StreamClose.ID) - } - return "unknown" - }(), + "stream_id": streamID, }) continue default: From 05dfff21f667ad7f87c5c5147b5a83d2d925181c Mon Sep 17 00:00:00 2001 From: dalbodeule <11470513+dalbodeule@users.noreply.github.com> Date: Wed, 10 Dec 2025 01:12:58 +0900 Subject: [PATCH 7/8] [feat](server, protocol): add sender and receiver ARQ for reliable HTTP stream delivery - Implemented application-level ARQ with selective retransmission for server-to-client streams, leveraging `StreamAck` logic. - Added sender-side ARQ state in `streamSender` for tracking and resending unacknowledged frames. - Introduced receiver-side ARQ with `AckSeq` and `LostSeqs` for handling out-of-order and lost frames. - Enhanced `dtlsSessionWrapper` to support ARQ management and seamless stream-based DTLS tunneling. --- cmd/server/main.go | 274 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 266 insertions(+), 8 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 241a99b..0a89d85 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -12,6 +12,7 @@ import ( "net/http" "os" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -41,6 +42,72 @@ type pendingRequest struct { doneCh chan struct{} } +// streamSender 는 특정 스트림에 대해 전송한 StreamData 프레임의 payload 를 +// 시퀀스 번호별로 보관하여, peer 로부터의 StreamAck 를 기반으로 선택적 재전송을 +// 수행하기 위한 송신 측 ARQ 상태를 나타냅니다. (ko) +// streamSender keeps outstanding StreamData payloads per sequence number so that +// they can be selectively retransmitted based on StreamAck from the peer. (en) +type streamSender struct { + mu sync.Mutex + outstanding map[uint64][]byte +} + +func newStreamSender() *streamSender { + return &streamSender{ + outstanding: make(map[uint64][]byte), + } +} + +func (s *streamSender) register(seq uint64, data []byte) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.outstanding == nil { + s.outstanding = make(map[uint64][]byte) + } + buf := make([]byte, len(data)) + copy(buf, data) + s.outstanding[seq] = buf +} + +// handleAck 는 주어진 StreamAck 를 적용하여 AckSeq 이하의 프레임을 정리하고, +// LostSeqs 중 아직 outstanding 에 남아 있는 시퀀스의 payload 를 복사하여 +// 재전송 대상 목록으로 반환합니다. (ko) +// handleAck applies the given StreamAck, removes frames up to AckSeq, and +// returns copies of payloads for LostSeqs that are still outstanding so that +// they can be retransmitted. (en) +func (s *streamSender) handleAck(ack *protocol.StreamAck) map[uint64][]byte { + s.mu.Lock() + defer s.mu.Unlock() + + if s.outstanding == nil { + return nil + } + + // 연속 수신 완료 구간(seq <= AckSeq)은 outstanding 에서 제거합니다. + for seq := range s.outstanding { + if seq <= ack.AckSeq { + delete(s.outstanding, seq) + } + } + + // LostSeqs 가 비어 있으면 재전송할 것이 없습니다. + if len(ack.LostSeqs) == 0 { + return nil + } + + // LostSeqs 중 아직 outstanding 에 남아 있는 것만 재전송 대상으로 선택합니다. + lost := make(map[uint64][]byte, len(ack.LostSeqs)) + for _, seq := range ack.LostSeqs { + if data, ok := s.outstanding[seq]; ok { + buf := make([]byte, len(data)) + copy(buf, data) + lost[seq] = buf + } + } + return lost +} + type dtlsSessionWrapper struct { sess dtls.Session bufferedReader *bufio.Reader @@ -51,6 +118,48 @@ type dtlsSessionWrapper struct { nextStreamID uint64 pending map[protocol.StreamID]*pendingRequest readerDone chan struct{} + + // streamSenders 는 서버 → 클라이언트 방향 HTTP 요청 바디 전송에 대한 + // 송신 측 ARQ 상태를 보관합니다. (ko) + // streamSenders keeps ARQ sender state for HTTP request bodies sent + // from server to client. (en) + streamSenders map[protocol.StreamID]*streamSender +} + +// registerStreamSender 는 주어진 스트림 ID 에 대한 송신 측 ARQ 상태를 등록합니다. (ko) +// registerStreamSender registers the sender-side ARQ state for a given stream ID. (en) +func (w *dtlsSessionWrapper) registerStreamSender(id protocol.StreamID, sender *streamSender) { + w.mu.Lock() + defer w.mu.Unlock() + + if w.streamSenders == nil { + w.streamSenders = make(map[protocol.StreamID]*streamSender) + } + w.streamSenders[id] = sender +} + +// unregisterStreamSender 는 더 이상 사용하지 않는 스트림 ID 에 대한 송신 측 ARQ 상태를 제거합니다. (ko) +// unregisterStreamSender removes the sender-side ARQ state for a stream ID that is no longer used. (en) +func (w *dtlsSessionWrapper) unregisterStreamSender(id protocol.StreamID) { + w.mu.Lock() + defer w.mu.Unlock() + + if w.streamSenders == nil { + return + } + delete(w.streamSenders, id) +} + +// getStreamSender 는 주어진 스트림 ID 에 대한 송신 측 ARQ 상태를 반환합니다. (ko) +// getStreamSender returns the sender-side ARQ state for the given stream ID, if any. (en) +func (w *dtlsSessionWrapper) getStreamSender(id protocol.StreamID) *streamSender { + w.mu.Lock() + defer w.mu.Unlock() + + if w.streamSenders == nil { + return nil + } + return w.streamSenders[id] } func getEnvOrPanic(logger logging.Logger, key string) string { @@ -189,7 +298,8 @@ func parseExpectedIPsFromEnv(logger logging.Logger, envKey string) []net.IP { // ForwardHTTP 는 HTTP 요청을 DTLS 세션 위의 StreamOpen/StreamData/StreamClose 프레임으로 전송하고, // 역방향 스트림 응답을 수신해 protocol.Response 로 반환합니다. (ko) // readLoop continuously reads from the DTLS session and dispatches incoming frames -// to the appropriate pending request based on stream ID +// to the appropriate pending request based on stream ID. It also handles +// application-level ARQ (StreamAck) for request bodies sent from server to client. (en) func (w *dtlsSessionWrapper) readLoop() { defer close(w.readerDone) @@ -203,8 +313,8 @@ func (w *dtlsSessionWrapper) readLoop() { "error": err.Error(), }) } - // Notify all pending requests of the error by closing their response channels - // The doneCh will be closed by each ForwardHTTP's defer + // Notify all pending requests of the error by closing their response channels. + // The doneCh will be closed by each ForwardHTTP's defer. w.mu.Lock() for _, pending := range w.pending { close(pending.respCh) @@ -214,7 +324,53 @@ func (w *dtlsSessionWrapper) readLoop() { return } - // Determine the stream ID from the envelope + // 1) StreamAck 처리: 서버 → 클라이언트 방향 요청 바디 전송에 대한 ARQ. (ko) + // 1) Handle StreamAck: application-level ARQ for request bodies + // sent from server to client. (en) + if env.Type == protocol.MessageTypeStreamAck { + sa := env.StreamAck + if sa == nil { + w.logger.Warn("received stream_ack envelope with nil payload", logging.Fields{}) + continue + } + streamID := sa.ID + sender := w.getStreamSender(streamID) + if sender == nil { + w.logger.Warn("received stream_ack for unknown stream ID", logging.Fields{ + "stream_id": streamID, + }) + continue + } + lost := sender.handleAck(sa) + for seq, data := range lost { + retryEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamData, + StreamData: &protocol.StreamData{ + ID: streamID, + Seq: seq, + Data: data, + }, + } + if err := w.codec.Encode(w.sess, &retryEnv); err != nil { + w.logger.Error("failed to retransmit stream_data after stream_ack", logging.Fields{ + "stream_id": streamID, + "seq": seq, + "error": err.Error(), + }) + // 세션 쓰기 오류가 발생하면 루프를 종료하여 상위에서 세션 종료를 유도합니다. (ko) + // On write error, stop the loop so that the caller can tear down the session. (en) + return + } + } + // StreamAck 는 애플리케이션 페이로드를 포함하지 않으므로 pending 에 전달하지 않습니다. (ko) + // StreamAck carries no application payload, so it is not forwarded to pending requests. (en) + continue + } + + // 2) StreamOpen / StreamData / StreamClose 에 대해 stream ID 를 산출하고, + // 해당 pending 요청으로 전달합니다. (ko) + // 2) For StreamOpen / StreamData / StreamClose, determine the stream ID + // and forward to the corresponding pending request. (en) var streamID protocol.StreamID switch env.Type { case protocol.MessageTypeStreamOpen: @@ -286,7 +442,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log // Generate a unique stream ID (needs mutex for nextStreamID) w.mu.Lock() streamID := w.nextHTTPStreamID() - + // Channel buffer size for response frames to avoid blocking readLoop. // A typical HTTP response has: 1 StreamOpen + N StreamData + 1 StreamClose frames. // With 4KB chunks, even large responses stay within this buffer. @@ -301,12 +457,18 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log w.pending[streamID] = pending w.mu.Unlock() + // 서버 → 클라이언트 방향 요청 바디 전송에 대한 송신 측 ARQ 상태를 준비합니다. (ko) + // Prepare ARQ sender state for the request body sent from server to client. (en) + sender := newStreamSender() + w.registerStreamSender(streamID, sender) + // Ensure cleanup on exit defer func() { w.mu.Lock() delete(w.pending, streamID) w.mu.Unlock() close(pending.doneCh) + w.unregisterStreamSender(streamID) }() log := logger.With(logging.Fields{ @@ -366,6 +528,10 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log n, err := req.Body.Read(buf) if n > 0 { dataCopy := append([]byte(nil), buf[:n]...) + // 송신 측 ARQ: Seq 별 payload 를 기록해 두었다가, 클라이언트의 StreamAck 를 기반으로 재전송합니다. (ko) + // Sender-side ARQ: record payload per Seq so it can be retransmitted based on StreamAck from the client. (en) + sender.register(seq, dataCopy) + dataEnv := &protocol.Envelope{ Type: protocol.MessageTypeStreamData, StreamData: &protocol.StreamData{ @@ -414,7 +580,14 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log bodyBuf bytes.Buffer gotOpen bool statusCode = http.StatusOK + + // 응답 바디(클라이언트 → 서버)에 대한 수신 측 ARQ 상태입니다. (ko) + // ARQ receiver state for the response body (client → server). (en) + expectedSeq uint64 + received = make(map[uint64][]byte) + lost = make(map[uint64]struct{}) ) + const maxLostReport = 32 resp.RequestID = string(streamID) resp.Header = make(map[string][]string) @@ -466,10 +639,94 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log if sd == nil { return nil, fmt.Errorf("stream_data response payload is nil") } - if len(sd.Data) > 0 { - if _, err := bodyBuf.Write(sd.Data); err != nil { - return nil, fmt.Errorf("buffer stream_data response: %w", err) + + // 수신 측 ARQ: Seq 에 따라 분기하고, 연속 구간을 bodyBuf 에 순서대로 기록합니다. (ko) + // Receiver-side ARQ: handle Seq and append contiguous data to bodyBuf in order. (en) + switch { + case sd.Seq == expectedSeq: + if len(sd.Data) > 0 { + if _, err := bodyBuf.Write(sd.Data); err != nil { + return nil, fmt.Errorf("buffer stream_data response: %w", err) + } } + expectedSeq++ + for { + data, ok := received[expectedSeq] + if !ok { + break + } + if len(data) > 0 { + if _, err := bodyBuf.Write(data); err != nil { + return nil, fmt.Errorf("buffer reordered stream_data response: %w", err) + } + } + delete(received, expectedSeq) + delete(lost, expectedSeq) + expectedSeq++ + } + + // AckSeq 이전 구간의 lost 항목 정리 + for seq := range lost { + if seq < expectedSeq { + delete(lost, seq) + } + } + + case sd.Seq > expectedSeq: + // 앞선 일부 Seq 들이 누락된 상태: 현재 프레임을 버퍼링하고 missing seq 들을 lost 에 추가. (ko) + // Missing earlier Seq: buffer this frame and mark missing seqs as lost. (en) + if len(sd.Data) > 0 { + bufCopy := make([]byte, len(sd.Data)) + copy(bufCopy, sd.Data) + received[sd.Seq] = bufCopy + } + for seq := expectedSeq; seq < sd.Seq && len(lost) < maxLostReport; seq++ { + if _, ok := lost[seq]; !ok { + lost[seq] = struct{}{} + } + } + + default: + // sd.Seq < expectedSeq 인 경우: 이미 처리했거나 Ack 로 커버된 프레임 → 무시. (ko) + // sd.Seq < expectedSeq: already processed/acked frame → ignore. (en) + } + + // 수신 측 StreamAck 전송: + // - AckSeq: 0부터 시작해 연속으로 수신 완료한 마지막 시퀀스 (expectedSeq-1) + // - LostSeqs: 현재 윈도우 내에서 누락된 시퀀스 중 상한 개수(maxLostReport)까지만 포함 (ko) + // Send receiver-side StreamAck: + // - AckSeq: last contiguously received sequence starting from 0 (expectedSeq-1) + // - LostSeqs: up to maxLostReport missing sequences in the current window. (en) + var ackSeq uint64 + if expectedSeq == 0 { + ackSeq = 0 + } else { + ackSeq = expectedSeq - 1 + } + + lostSeqs := make([]uint64, 0, len(lost)) + for seq := range lost { + if seq >= expectedSeq { + lostSeqs = append(lostSeqs, seq) + } + } + if len(lostSeqs) > 0 { + sort.Slice(lostSeqs, func(i, j int) bool { return lostSeqs[i] < lostSeqs[j] }) + if len(lostSeqs) > maxLostReport { + lostSeqs = lostSeqs[:maxLostReport] + } + } + + ackEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamAck, + StreamAck: &protocol.StreamAck{ + ID: streamID, + AckSeq: ackSeq, + LostSeqs: lostSeqs, + }, + } + if err := w.codec.Encode(w.sess, &ackEnv); err != nil { + return nil, fmt.Errorf("send stream ack: %w", err) } case protocol.MessageTypeStreamClose: @@ -630,6 +887,7 @@ func registerSessionForDomain(domain string, sess dtls.Session, logger logging.L logger: logger.With(logging.Fields{"component": "dtls_session_wrapper", "domain": d}), pending: make(map[protocol.StreamID]*pendingRequest), readerDone: make(chan struct{}), + streamSenders: make(map[protocol.StreamID]*streamSender), } // Start background reader goroutine to demultiplex incoming responses From 661f8b6413bd4ec0bdf609e4d8ee7619d4df422c Mon Sep 17 00:00:00 2001 From: dalbodeule <11470513+dalbodeule@users.noreply.github.com> Date: Wed, 10 Dec 2025 01:25:56 +0900 Subject: [PATCH 8/8] [feat](server): serialize HTTP requests per DTLS session with session-level mutex - Added `requestMu` mutex in `dtlsSessionWrapper` to serialize HTTP request handling per DTLS session. - Prevents interleaved HTTP request streams on clients that process one stream at a time. - Updated `ForwardHTTP` logic to lock and unlock around HTTP request handling for safe serialization. - Documented behavior and rationale in `progress.md` for future multiplexing enhancements. --- cmd/server/main.go | 15 +++++++ progress.md | 104 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 119 insertions(+) diff --git a/cmd/server/main.go b/cmd/server/main.go index 0a89d85..fe7ede9 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -124,6 +124,13 @@ type dtlsSessionWrapper struct { // streamSenders keeps ARQ sender state for HTTP request bodies sent // from server to client. (en) streamSenders map[protocol.StreamID]*streamSender + + // requestMu 는 이 DTLS 세션에서 동시에 처리될 수 있는 HTTP 요청을 + // 하나로 제한하기 위한 뮤텍스입니다. (ko) + // requestMu serializes HTTP requests on this DTLS session so that the + // client (which currently processes one StreamOpen at a time) does not + // see interleaved streams. (en) + requestMu sync.Mutex } // registerStreamSender 는 주어진 스트림 ID 에 대한 송신 측 ARQ 상태를 등록합니다. (ko) @@ -439,6 +446,14 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log ctx = context.Background() } + // 현재 클라이언트 구현은 DTLS 세션당 하나의 StreamOpen/StreamData/StreamClose + // 만 순차적으로 처리하므로, 서버에서도 동일 세션 위 HTTP 요청을 직렬화합니다. (ko) + // The current client processes exactly one StreamOpen/StreamData/StreamClose + // sequence at a time per DTLS session, so we serialize HTTP requests on + // this session as well. (en) + w.requestMu.Lock() + defer w.requestMu.Unlock() + // Generate a unique stream ID (needs mutex for nextStreamID) w.mu.Lock() streamID := w.nextHTTPStreamID() diff --git a/progress.md b/progress.md index c396244..36d614c 100644 --- a/progress.md +++ b/progress.md @@ -402,6 +402,110 @@ The following tasks describe concrete work items to be implemented on the `featu In [`internal/protocol/codec.go`](internal/protocol/codec.go:130), the `WireCodec` abstraction and Protobuf-based `DefaultCodec` allow callers to use only `protocol.DefaultCodec` while JSON remains as an auxiliary codec. --- + +##### 3.3B DTLS Session Multiplexing / 세션 내 다중 HTTP 요청 처리 + +현재 구현은 클라이언트 측에서 단일 DTLS 세션 내에 **동시에 하나의 HTTP 요청 스트림만** 처리할 수 있습니다. +`ClientProxy.handleStreamRequest` 가 DTLS 세션의 reader 를 직접 소비하기 때문에, 동일 세션에서 두 번째 `StreamOpen` 이 섞여 들어오면 프로토콜 위반으로 간주되고 세션이 끊어집니다. +이 섹션은 **클라이언트 측 스트림 demux + per-stream goroutine 구조**를 도입해, 하나의 DTLS 세션 안에서 여러 HTTP 요청을 안전하게 병렬 처리하기 위한 단계입니다. + +Currently, the client can effectively handle **only one HTTP request stream at a time per DTLS session**. +Because `ClientProxy.handleStreamRequest` directly consumes the DTLS session reader, an additional `StreamOpen` for a different stream interleaving on the same session is treated as a protocol error and tears down the session. +This section introduces a **client-side stream demultiplexer + per-stream goroutines** to safely support multiple concurrent HTTP requests within a single DTLS session. + +--- + +##### 3.3B.1 클라이언트 측 중앙 readLoop → 스트림 demux 설계 +##### 3.3B.1 Design client-side central readLoop → per-stream demux + +- [ ] `ClientProxy.StartLoop` 의 역할을 명확히 분리 + - DTLS 세션에서 `Envelope` 를 연속해서 읽어들이는 **중앙 readLoop** 를 유지하되, + - 개별 스트림의 HTTP 처리 로직(현재 `handleStreamRequest` 내부 로직)을 분리해 별도 타입/구조체로 옮길 계획을 문서화합니다. +- [ ] 스트림 demux 위한 자료구조 설계 + - `map[protocol.StreamID]*streamReceiver` 형태의 수신측 스트림 상태 테이블을 정의합니다. + - 각 `streamReceiver` 는 자신만의 입력 채널(예: `inCh chan *protocol.Envelope`)을 가져, 중앙 readLoop 로부터 `StreamOpen/StreamData/StreamClose` 를 전달받도록 합니다. +- [ ] 중앙 readLoop 에서 스트림별 라우팅 규칙 정의 + - `Envelope.Type` 에 따라: + - `StreamOpen` / `StreamData` / `StreamClose`: + - `streamID` 를 추출하고, 해당 `streamReceiver` 의 `inCh` 로 전달. + - `StreamOpen` 수신 시에는 아직 없는 경우 `streamReceiver` 를 생성 후 등록. + - `StreamAck`: + - 송신 측 ARQ(`streamSender`) 용 테이블(이미 구현된 구조)을 찾아 재전송 로직으로 전달. + - 이 설계를 통해 중앙 readLoop 는 **DTLS 세션 → 스트림 단위 이벤트 분배**만 담당하도록 제한합니다. + +--- + +##### 3.3B.2 streamReceiver 타입 설계 및 HTTP 매핑 리팩터링 +##### 3.3B.2 Design streamReceiver type and refactor HTTP mapping + +- [ ] `streamReceiver` 타입 정의 + - 필드 예시: + - `id protocol.StreamID` + - 수신 ARQ 상태: `expectedSeq`, `received map[uint64][]byte`, `lost map[uint64]struct{}` + - 입력 채널: `inCh chan *protocol.Envelope` + - DTLS 세션/codec/logging 핸들: `sess dtls.Session`, `codec protocol.WireCodec`, `logger logging.Logger` + - 로컬 HTTP 호출 관련: `HTTPClient *http.Client`, `LocalTarget string` + - 역할: + - 서버에서 온 `StreamOpen`/`StreamData`/`StreamClose` 를 순서대로 처리해 로컬 HTTP 요청을 구성하고, + - 로컬 HTTP 응답을 다시 `StreamOpen`/`StreamData`/`StreamClose` 로 역방향 전송합니다. +- [ ] 기존 `ClientProxy.handleStreamRequest` 의 로직을 `streamReceiver` 로 이전 + - 현재 `handleStreamRequest` 안에서 수행하던 작업을 단계적으로 옮깁니다: + - `StreamOpen` 의 pseudo-header 에서 HTTP 메서드/URL/헤더를 복원. + - 요청 바디 수신용 수신 측 ARQ(`expectedSeq`, `received`, `lost`) 처리. + - 로컬 HTTP 요청 생성/실행 및 에러 처리. + - 응답을 4KiB `StreamData` chunk 로 전송 + 송신 측 ARQ(`streamSender.register`) 기록. + - 이때 **DTLS reader 를 직접 읽던 부분**은 제거하고, 대신 `inCh` 에서 전달된 `Envelope` 만 사용하도록 리팩터링합니다. +- [ ] streamReceiver 생명주기 관리 + - `StreamClose` 수신 시: + - 로컬 HTTP 요청 바디 구성 종료. + - 로컬 HTTP 요청 실행 및 응답 스트림 전송 완료 후, + - `streamReceivers[streamID]` 에서 자신을 제거하고 goroutine 을 종료하는 정책을 명확히 정의합니다. + +--- + +##### 3.3B.3 StartLoop 와 streamReceiver 통합 +##### 3.3B.3 Integrate StartLoop and streamReceiver + +- [ ] `ClientProxy.StartLoop` 을 “중앙 readLoop + demux” 로 단순화 + - `MessageTypeStreamOpen` 수신 시: + - `streamID := env.StreamOpen.ID` 를 기준으로 기존 `streamReceiver` 존재 여부를 검사. + - 없으면 새 `streamReceiver` 생성 후, goroutine 을 띄우고 `inCh <- env` 로 첫 메시지 전달. + - `MessageTypeStreamData` / `MessageTypeStreamClose` 수신 시: + - 해당 `streamReceiver` 의 `inCh` 로 그대로 전달. + - `MessageTypeStreamAck` 는 기존처럼 송신 측 `streamSender` 로 라우팅. +- [ ] 에러/종료 처리 전략 정리 + - 개별 `streamReceiver` 에서 발생하는 에러는: + - 로컬 HTTP 에러 → 스트림 응답에 5xx/에러 바디로 반영. + - 프로토콜 위반(예: 잘못된 순서의 `StreamClose`) → 해당 스트림만 정리하고 세션은 유지하는지 여부를 정의. + - DTLS 세션 레벨 에러(EOF, decode 실패 등)는: + - 모든 `streamReceiver` 의 `inCh` 를 닫고, + - 이후 클라이언트 전체 루프를 종료하는 방향으로 합의합니다. + +--- + +##### 3.3B.4 세션 단위 직렬화 락 제거 및 멀티플렉싱 검증 +##### 3.3B.4 Remove session-level serialization lock and validate multiplexing + +- [ ] 서버 측 세션 직렬화 락 제거 계획 수립 + - 현재 서버는 [`dtlsSessionWrapper`](cmd/server/main.go:111)에 `requestMu` 를 두어, + - 동일 DTLS 세션에서 동시에 하나의 `ForwardHTTP` 만 수행하도록 직렬화하고 있습니다. + - 클라이언트 측 멀티플렉싱이 안정화되면, `requestMu` 를 제거하고 + - 하나의 세션 안에서 여러 HTTP 요청이 각기 다른 `StreamID` 로 병렬 진행되도록 허용합니다. +- [ ] E2E 멀티플렉싱 테스트 시나리오 정의 + - 하나의 DTLS 세션 위에서: + - 동시에 여러 정적 리소스(`/css`, `/js`, `/img`) 요청. + - 큰 응답(수 MB 파일)과 작은 응답(API JSON)이 섞여 있는 시나리오. + - 기대 동작: + - 어떤 요청이 느리더라도, 다른 요청이 세션 내부 큐잉 때문에 과도하게 지연되지 않고 병렬로 완료되는지 확인. + - 클라이언트/서버 로그에 프로토콜 위반(`unexpected envelope type ...`) 이 더 이상 발생하지 않는지 확인. +- [ ] 관측성/메트릭에 멀티플렉싱 관련 라벨/필드 추가(선택) + - 필요 시: + - 세션당 동시 활성 스트림 수, + - 스트림 수명(요청-응답 왕복 시간), + - 세션 내 스트림 에러 수 + 를 관찰할 수 있는 메트릭/로그 필드를 설계합니다. + +--- ### 3.4 ACME Integration / ACME 연동