Fix DTLS buffer size issue by wrapping sessions with buffered readers

- Add dtlsReadBufferSize constant (8KB) matching pion/dtls limits
- Wrap DTLS sessions with bufio.Reader in client and server code
- Update tests to use buffered readers for datagram-based connections
- All tests passing successfully

Co-authored-by: dalbodeule <11470513+dalbodeule@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2025-12-09 14:07:15 +00:00
parent 412b59f420
commit 1292df33e5
4 changed files with 52 additions and 21 deletions

View File

@@ -1,6 +1,7 @@
package main package main
import ( import (
"bufio"
"bytes" "bytes"
"context" "context"
"crypto/tls" "crypto/tls"
@@ -35,6 +36,7 @@ var version = "dev"
type dtlsSessionWrapper struct { type dtlsSessionWrapper struct {
sess dtls.Session sess dtls.Session
bufferedReader *bufio.Reader
mu sync.Mutex mu sync.Mutex
nextStreamID uint64 nextStreamID uint64
} }
@@ -302,7 +304,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log
for { for {
var env protocol.Envelope var env protocol.Envelope
if err := codec.Decode(w.sess, &env); err != nil { if err := codec.Decode(w.bufferedReader, &env); err != nil {
log.Error("failed to decode stream response envelope", logging.Fields{ log.Error("failed to decode stream response envelope", logging.Fields{
"error": err.Error(), "error": err.Error(),
}) })
@@ -502,7 +504,10 @@ func registerSessionForDomain(domain string, sess dtls.Session, logger logging.L
if d == "" { if d == "" {
return return
} }
w := &dtlsSessionWrapper{sess: sess} w := &dtlsSessionWrapper{
sess: sess,
bufferedReader: bufio.NewReaderSize(sess, protocol.GetDTLSReadBufferSize()),
}
sessionsMu.Lock() sessionsMu.Lock()
sessionsByDomain[d] = w sessionsByDomain[d] = w
sessionsMu.Unlock() sessionsMu.Unlock()

View File

@@ -16,6 +16,16 @@ import (
// This matches existing 64KiB readers used around DTLS sessions (used by the JSON codec). // This matches existing 64KiB readers used around DTLS sessions (used by the JSON codec).
const defaultDecoderBufferSize = 64 * 1024 const defaultDecoderBufferSize = 64 * 1024
// dtlsReadBufferSize 는 pion/dtls 내부 버퍼 한계에 맞춘 읽기 버퍼 크기입니다.
// pion/dtls 의 UnpackDatagram 함수는 8KB (8,192 bytes) 의 기본 수신 버퍼를 사용합니다.
// DTLS는 UDP 기반이므로 한 번의 Read()에서 전체 datagram을 읽어야 하며,
// 이 크기를 초과하는 DTLS 레코드는 처리되지 않습니다.
// dtlsReadBufferSize matches the pion/dtls internal buffer limit.
// pion/dtls's UnpackDatagram function uses an 8KB (8,192 bytes) receive buffer.
// Since DTLS is UDP-based, the entire datagram must be read in a single Read() call,
// and DTLS records exceeding this size cannot be processed.
const dtlsReadBufferSize = 8 * 1024 // 8KB
// maxProtoEnvelopeBytes 는 단일 Protobuf Envelope 의 최대 크기에 대한 보수적 상한입니다. // maxProtoEnvelopeBytes 는 단일 Protobuf Envelope 의 최대 크기에 대한 보수적 상한입니다.
// 아직 하드 리미트로 사용하지는 않지만, 향후 방어적 체크에 사용할 수 있습니다. // 아직 하드 리미트로 사용하지는 않지만, 향후 방어적 체크에 사용할 수 있습니다.
const maxProtoEnvelopeBytes = 512 * 1024 // 512KiB, 충분히 여유 있는 값 const maxProtoEnvelopeBytes = 512 * 1024 // 512KiB, 충분히 여유 있는 값
@@ -141,6 +151,14 @@ func (protobufCodec) Decode(r io.Reader, env *Envelope) error {
// 서버와 클라이언트가 모두 이 버전을 사용해야 wire-format 이 일치합니다. // 서버와 클라이언트가 모두 이 버전을 사용해야 wire-format 이 일치합니다.
var DefaultCodec WireCodec = protobufCodec{} var DefaultCodec WireCodec = protobufCodec{}
// GetDTLSReadBufferSize 는 DTLS 세션 읽기에 사용할 버퍼 크기를 반환합니다.
// 이 값은 pion/dtls 내부 버퍼 한계(8KB)에 맞춰져 있습니다.
// GetDTLSReadBufferSize returns the buffer size to use for reading from DTLS sessions.
// This value is aligned with pion/dtls's internal buffer limit (8KB).
func GetDTLSReadBufferSize() int {
return dtlsReadBufferSize
}
// toProtoEnvelope 는 내부 Envelope 구조체를 Protobuf Envelope 로 변환합니다. // toProtoEnvelope 는 내부 Envelope 구조체를 Protobuf Envelope 로 변환합니다.
// 현재 구현은 HTTP 요청/응답 및 스트림 관련 타입(StreamOpen/StreamData/StreamClose/StreamAck)을 지원합니다. // 현재 구현은 HTTP 요청/응답 및 스트림 관련 타입(StreamOpen/StreamData/StreamClose/StreamAck)을 지원합니다.
func toProtoEnvelope(env *Envelope) (*protocolpb.Envelope, error) { func toProtoEnvelope(env *Envelope) (*protocolpb.Envelope, error) {

View File

@@ -1,6 +1,7 @@
package protocol package protocol
import ( import (
"bufio"
"bytes" "bytes"
"io" "io"
"testing" "testing"
@@ -80,9 +81,11 @@ func TestProtobufCodecDatagramBehavior(t *testing.T) {
t.Fatalf("Message too short: %d bytes", len(msg)) t.Fatalf("Message too short: %d bytes", len(msg))
} }
// Decode the envelope // Decode the envelope using a buffered reader (as we do in actual code)
// to handle datagram-based reading properly
reader := bufio.NewReaderSize(conn, GetDTLSReadBufferSize())
var decodedEnv Envelope var decodedEnv Envelope
if err := codec.Decode(conn, &decodedEnv); err != nil { if err := codec.Decode(reader, &decodedEnv); err != nil {
t.Fatalf("Failed to decode envelope: %v", err) t.Fatalf("Failed to decode envelope: %v", err)
} }
@@ -132,9 +135,10 @@ func TestProtobufCodecStreamData(t *testing.T) {
t.Fatalf("Expected 1 message, got %d", len(conn.messages)) t.Fatalf("Expected 1 message, got %d", len(conn.messages))
} }
// Decode // Decode using a buffered reader (as we do in actual code)
reader := bufio.NewReaderSize(conn, GetDTLSReadBufferSize())
var decodedEnv Envelope var decodedEnv Envelope
if err := codec.Decode(conn, &decodedEnv); err != nil { if err := codec.Decode(reader, &decodedEnv); err != nil {
t.Fatalf("Failed to decode StreamData: %v", err) t.Fatalf("Failed to decode StreamData: %v", err)
} }
@@ -208,10 +212,11 @@ func TestProtobufCodecMultipleMessages(t *testing.T) {
t.Fatalf("Expected %d messages, got %d", len(envelopes), len(conn.messages)) t.Fatalf("Expected %d messages, got %d", len(envelopes), len(conn.messages))
} }
// Decode and verify all messages // Decode and verify all messages using a buffered reader (as we do in actual code)
reader := bufio.NewReaderSize(conn, GetDTLSReadBufferSize())
for i := 0; i < len(envelopes); i++ { for i := 0; i < len(envelopes); i++ {
var decoded Envelope var decoded Envelope
if err := codec.Decode(conn, &decoded); err != nil { if err := codec.Decode(reader, &decoded); err != nil {
t.Fatalf("Failed to decode message %d: %v", i, err) t.Fatalf("Failed to decode message %d: %v", i, err)
} }
if decoded.Type != envelopes[i].Type { if decoded.Type != envelopes[i].Type {

View File

@@ -1,6 +1,7 @@
package proxy package proxy
import ( import (
"bufio"
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
@@ -151,14 +152,16 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
log := p.Logger log := p.Logger
// NOTE: pion/dtls 는 복호화된 애플리케이션 데이터를 호출자가 제공한 버퍼에 채워 넣습니다. // NOTE: pion/dtls 는 복호화된 애플리케이션 데이터를 호출자가 제공한 버퍼에 채워 넣습니다.
// 기본 JSON 디코더 버퍼(수백 바이트 수준)만 사용하면 큰 HTTP 바디/Envelope 에서 // DTLS는 UDP 기반이므로 한 번의 Read()에서 전체 datagram을 읽어야 하며,
// "dtls: buffer too small" 오류가 날 수 있으므로, 여기서는 여유 있는 버퍼(64KiB)를 사용합니다. (ko) // pion/dtls 내부 버퍼 한계(8KB)를 초과하는 메시지는 "dtls: buffer too small" 오류를 발생시킵니다.
// 이를 방지하기 위해 DTLS 세션을 bufio.Reader로 감싸서 datagram을 완전히 읽어들인 후 파싱합니다. (ko)
// NOTE: pion/dtls decrypts application data into the buffer provided by the caller. // NOTE: pion/dtls decrypts application data into the buffer provided by the caller.
// Using only the default JSON decoder buffer (a few hundred bytes) can trigger // Since DTLS is UDP-based, the entire datagram must be read in a single Read() call,
// "dtls: buffer too small" for large HTTP bodies/envelopes. The default // and messages exceeding pion/dtls's internal buffer limit (8KB) will trigger
// JSON-based WireCodec internally wraps the DTLS session with a 64KiB // "dtls: buffer too small" errors. To prevent this, we wrap the DTLS session with
// bufio.Reader, matching this requirement. (en) // a bufio.Reader to fully read the datagram before parsing. (en)
codec := protocol.DefaultCodec codec := protocol.DefaultCodec
bufferedReader := bufio.NewReaderSize(sess, protocol.GetDTLSReadBufferSize())
for { for {
select { select {
@@ -171,7 +174,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
} }
var env protocol.Envelope var env protocol.Envelope
if err := codec.Decode(sess, &env); err != nil { if err := codec.Decode(bufferedReader, &env); err != nil {
if err == io.EOF { if err == io.EOF {
log.Info("dtls session closed by server", nil) log.Info("dtls session closed by server", nil)
return nil return nil
@@ -191,7 +194,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
return err return err
} }
case protocol.MessageTypeStreamOpen: case protocol.MessageTypeStreamOpen:
if err := p.handleStreamRequest(ctx, sess, &env); err != nil { if err := p.handleStreamRequest(ctx, sess, bufferedReader, &env); err != nil {
log.Error("failed to handle stream http envelope", logging.Fields{ log.Error("failed to handle stream http envelope", logging.Fields{
"error": err.Error(), "error": err.Error(),
}) })
@@ -303,7 +306,7 @@ func (p *ClientProxy) handleHTTPEnvelope(ctx context.Context, sess dtls.Session,
// handleStreamRequest 는 StreamOpen/StreamData/StreamClose 기반 HTTP 요청/응답 스트림을 처리합니다. (ko) // handleStreamRequest 는 StreamOpen/StreamData/StreamClose 기반 HTTP 요청/응답 스트림을 처리합니다. (ko)
// handleStreamRequest handles an HTTP request/response exchange using StreamOpen/StreamData/StreamClose frames. (en) // handleStreamRequest handles an HTTP request/response exchange using StreamOpen/StreamData/StreamClose frames. (en)
func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session, openEnv *protocol.Envelope) error { func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session, reader io.Reader, openEnv *protocol.Envelope) error {
codec := protocol.DefaultCodec codec := protocol.DefaultCodec
log := p.Logger log := p.Logger
@@ -365,7 +368,7 @@ func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session
for { for {
var env protocol.Envelope var env protocol.Envelope
if err := codec.Decode(sess, &env); err != nil { if err := codec.Decode(reader, &env); err != nil {
if err == io.EOF { if err == io.EOF {
return fmt.Errorf("unexpected EOF while reading stream request body") return fmt.Errorf("unexpected EOF while reading stream request body")
} }