diff --git a/cmd/server/main.go b/cmd/server/main.go index 5f7f08f..117a492 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "bytes" "context" "crypto/tls" @@ -34,9 +35,10 @@ import ( var version = "dev" type dtlsSessionWrapper struct { - sess dtls.Session - mu sync.Mutex - nextStreamID uint64 + sess dtls.Session + bufferedReader *bufio.Reader + mu sync.Mutex + nextStreamID uint64 } func getEnvOrPanic(logger logging.Logger, key string) string { @@ -302,7 +304,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log for { var env protocol.Envelope - if err := codec.Decode(w.sess, &env); err != nil { + if err := codec.Decode(w.bufferedReader, &env); err != nil { log.Error("failed to decode stream response envelope", logging.Fields{ "error": err.Error(), }) @@ -502,7 +504,10 @@ func registerSessionForDomain(domain string, sess dtls.Session, logger logging.L if d == "" { return } - w := &dtlsSessionWrapper{sess: sess} + w := &dtlsSessionWrapper{ + sess: sess, + bufferedReader: bufio.NewReaderSize(sess, protocol.GetDTLSReadBufferSize()), + } sessionsMu.Lock() sessionsByDomain[d] = w sessionsMu.Unlock() diff --git a/internal/protocol/codec.go b/internal/protocol/codec.go index ad84634..5a3940f 100644 --- a/internal/protocol/codec.go +++ b/internal/protocol/codec.go @@ -16,6 +16,16 @@ import ( // This matches existing 64KiB readers used around DTLS sessions (used by the JSON codec). const defaultDecoderBufferSize = 64 * 1024 +// dtlsReadBufferSize 는 pion/dtls 내부 버퍼 한계에 맞춘 읽기 버퍼 크기입니다. +// pion/dtls 의 UnpackDatagram 함수는 8KB (8,192 bytes) 의 기본 수신 버퍼를 사용합니다. +// DTLS는 UDP 기반이므로 한 번의 Read()에서 전체 datagram을 읽어야 하며, +// 이 크기를 초과하는 DTLS 레코드는 처리되지 않습니다. +// dtlsReadBufferSize matches the pion/dtls internal buffer limit. +// pion/dtls's UnpackDatagram function uses an 8KB (8,192 bytes) receive buffer. +// Since DTLS is UDP-based, the entire datagram must be read in a single Read() call, +// and DTLS records exceeding this size cannot be processed. +const dtlsReadBufferSize = 8 * 1024 // 8KB + // maxProtoEnvelopeBytes 는 단일 Protobuf Envelope 의 최대 크기에 대한 보수적 상한입니다. // 아직 하드 리미트로 사용하지는 않지만, 향후 방어적 체크에 사용할 수 있습니다. const maxProtoEnvelopeBytes = 512 * 1024 // 512KiB, 충분히 여유 있는 값 @@ -141,6 +151,14 @@ func (protobufCodec) Decode(r io.Reader, env *Envelope) error { // 서버와 클라이언트가 모두 이 버전을 사용해야 wire-format 이 일치합니다. var DefaultCodec WireCodec = protobufCodec{} +// GetDTLSReadBufferSize 는 DTLS 세션 읽기에 사용할 버퍼 크기를 반환합니다. +// 이 값은 pion/dtls 내부 버퍼 한계(8KB)에 맞춰져 있습니다. +// GetDTLSReadBufferSize returns the buffer size to use for reading from DTLS sessions. +// This value is aligned with pion/dtls's internal buffer limit (8KB). +func GetDTLSReadBufferSize() int { + return dtlsReadBufferSize +} + // toProtoEnvelope 는 내부 Envelope 구조체를 Protobuf Envelope 로 변환합니다. // 현재 구현은 HTTP 요청/응답 및 스트림 관련 타입(StreamOpen/StreamData/StreamClose/StreamAck)을 지원합니다. func toProtoEnvelope(env *Envelope) (*protocolpb.Envelope, error) { diff --git a/internal/protocol/codec_test.go b/internal/protocol/codec_test.go index b2b6001..6ca0fff 100644 --- a/internal/protocol/codec_test.go +++ b/internal/protocol/codec_test.go @@ -1,6 +1,7 @@ package protocol import ( + "bufio" "bytes" "io" "testing" @@ -80,9 +81,11 @@ func TestProtobufCodecDatagramBehavior(t *testing.T) { t.Fatalf("Message too short: %d bytes", len(msg)) } - // Decode the envelope + // Decode the envelope using a buffered reader (as we do in actual code) + // to handle datagram-based reading properly + reader := bufio.NewReaderSize(conn, GetDTLSReadBufferSize()) var decodedEnv Envelope - if err := codec.Decode(conn, &decodedEnv); err != nil { + if err := codec.Decode(reader, &decodedEnv); err != nil { t.Fatalf("Failed to decode envelope: %v", err) } @@ -132,9 +135,10 @@ func TestProtobufCodecStreamData(t *testing.T) { t.Fatalf("Expected 1 message, got %d", len(conn.messages)) } - // Decode + // Decode using a buffered reader (as we do in actual code) + reader := bufio.NewReaderSize(conn, GetDTLSReadBufferSize()) var decodedEnv Envelope - if err := codec.Decode(conn, &decodedEnv); err != nil { + if err := codec.Decode(reader, &decodedEnv); err != nil { t.Fatalf("Failed to decode StreamData: %v", err) } @@ -208,10 +212,11 @@ func TestProtobufCodecMultipleMessages(t *testing.T) { t.Fatalf("Expected %d messages, got %d", len(envelopes), len(conn.messages)) } - // Decode and verify all messages + // Decode and verify all messages using a buffered reader (as we do in actual code) + reader := bufio.NewReaderSize(conn, GetDTLSReadBufferSize()) for i := 0; i < len(envelopes); i++ { var decoded Envelope - if err := codec.Decode(conn, &decoded); err != nil { + if err := codec.Decode(reader, &decoded); err != nil { t.Fatalf("Failed to decode message %d: %v", i, err) } if decoded.Type != envelopes[i].Type { diff --git a/internal/proxy/client.go b/internal/proxy/client.go index c43be7a..7682316 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -1,6 +1,7 @@ package proxy import ( + "bufio" "bytes" "context" "fmt" @@ -151,14 +152,16 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { log := p.Logger // NOTE: pion/dtls 는 복호화된 애플리케이션 데이터를 호출자가 제공한 버퍼에 채워 넣습니다. - // 기본 JSON 디코더 버퍼(수백 바이트 수준)만 사용하면 큰 HTTP 바디/Envelope 에서 - // "dtls: buffer too small" 오류가 날 수 있으므로, 여기서는 여유 있는 버퍼(64KiB)를 사용합니다. (ko) + // DTLS는 UDP 기반이므로 한 번의 Read()에서 전체 datagram을 읽어야 하며, + // pion/dtls 내부 버퍼 한계(8KB)를 초과하는 메시지는 "dtls: buffer too small" 오류를 발생시킵니다. + // 이를 방지하기 위해 DTLS 세션을 bufio.Reader로 감싸서 datagram을 완전히 읽어들인 후 파싱합니다. (ko) // NOTE: pion/dtls decrypts application data into the buffer provided by the caller. - // Using only the default JSON decoder buffer (a few hundred bytes) can trigger - // "dtls: buffer too small" for large HTTP bodies/envelopes. The default - // JSON-based WireCodec internally wraps the DTLS session with a 64KiB - // bufio.Reader, matching this requirement. (en) + // Since DTLS is UDP-based, the entire datagram must be read in a single Read() call, + // and messages exceeding pion/dtls's internal buffer limit (8KB) will trigger + // "dtls: buffer too small" errors. To prevent this, we wrap the DTLS session with + // a bufio.Reader to fully read the datagram before parsing. (en) codec := protocol.DefaultCodec + bufferedReader := bufio.NewReaderSize(sess, protocol.GetDTLSReadBufferSize()) for { select { @@ -171,7 +174,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { } var env protocol.Envelope - if err := codec.Decode(sess, &env); err != nil { + if err := codec.Decode(bufferedReader, &env); err != nil { if err == io.EOF { log.Info("dtls session closed by server", nil) return nil @@ -191,7 +194,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { return err } case protocol.MessageTypeStreamOpen: - if err := p.handleStreamRequest(ctx, sess, &env); err != nil { + if err := p.handleStreamRequest(ctx, sess, bufferedReader, &env); err != nil { log.Error("failed to handle stream http envelope", logging.Fields{ "error": err.Error(), }) @@ -303,7 +306,7 @@ func (p *ClientProxy) handleHTTPEnvelope(ctx context.Context, sess dtls.Session, // handleStreamRequest 는 StreamOpen/StreamData/StreamClose 기반 HTTP 요청/응답 스트림을 처리합니다. (ko) // handleStreamRequest handles an HTTP request/response exchange using StreamOpen/StreamData/StreamClose frames. (en) -func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session, openEnv *protocol.Envelope) error { +func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session, reader io.Reader, openEnv *protocol.Envelope) error { codec := protocol.DefaultCodec log := p.Logger @@ -365,7 +368,7 @@ func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session for { var env protocol.Envelope - if err := codec.Decode(sess, &env); err != nil { + if err := codec.Decode(reader, &env); err != nil { if err == io.EOF { return fmt.Errorf("unexpected EOF while reading stream request body") }