diff --git a/cmd/server/main.go b/cmd/server/main.go index 356cbeb..46c9118 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1,10 +1,8 @@ package main import ( - "bufio" "context" "crypto/tls" - "encoding/json" "fmt" "io" stdfs "io/fs" @@ -208,8 +206,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log HTTPRequest: httpReq, } - enc := json.NewEncoder(w.sess) - if err := enc.Encode(env); err != nil { + if err := protocol.DefaultCodec.Encode(w.sess, env); err != nil { log.Error("failed to encode http envelope", logging.Fields{ "error": err.Error(), }) @@ -218,16 +215,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log // 클라이언트로부터 HTTP 응답 Envelope 를 수신합니다. var respEnv protocol.Envelope - - // NOTE: pion/dtls 는 복호화된 애플리케이션 데이터를 호출자가 제공한 버퍼에 채웁니다. - // 기본 JSON 디코더 버퍼만 사용하면 큰 HTTP 응답/Envelope 에서 "dtls: buffer too small" - // 오류가 발생할 수 있으므로, 충분히 큰 bufio.Reader(64KiB)를 사용합니다. (ko) - // NOTE: pion/dtls decrypts application data into the buffer provided by the caller. - // Using only the default JSON decoder buffer can cause "dtls: buffer too small" - // errors for large HTTP responses/envelopes, so we wrap the session with a - // reasonably large bufio.Reader (64KiB). (en) - dec := json.NewDecoder(bufio.NewReaderSize(w.sess, 64*1024)) - if err := dec.Decode(&respEnv); err != nil { + if err := protocol.DefaultCodec.Decode(w.sess, &respEnv); err != nil { log.Error("failed to decode http envelope", logging.Fields{ "error": err.Error(), }) diff --git a/internal/protocol/codec.go b/internal/protocol/codec.go new file mode 100644 index 0000000..2df490c --- /dev/null +++ b/internal/protocol/codec.go @@ -0,0 +1,41 @@ +package protocol + +import ( + "bufio" + "encoding/json" + "io" +) + +// defaultDecoderBufferSize 는 pion/dtls 가 복호화한 애플리케이션 데이터를 +// JSON 디코더가 안전하게 처리할 수 있도록 사용하는 버퍼 크기입니다. +// This matches existing 64KiB readers used around DTLS sessions. +const defaultDecoderBufferSize = 64 * 1024 + +// WireCodec 는 protocol.Envelope 의 직렬화/역직렬화를 추상화합니다. +// JSON, Protobuf, length-prefixed binary 등으로 교체할 때 이 인터페이스만 유지하면 됩니다. +type WireCodec interface { + Encode(w io.Writer, env *Envelope) error + Decode(r io.Reader, env *Envelope) error +} + +// jsonCodec 은 현재 사용 중인 JSON 기반 WireCodec 구현입니다. +type jsonCodec struct{} + +// Encode 는 Envelope 를 JSON 으로 인코딩해 작성합니다. +// Encode encodes an Envelope as JSON to the given writer. +func (jsonCodec) Encode(w io.Writer, env *Envelope) error { + enc := json.NewEncoder(w) + return enc.Encode(env) +} + +// Decode 는 DTLS 세션에서 읽은 데이터를 JSON Envelope 로 디코딩합니다. +// pion/dtls 의 버퍼 특성 때문에, 충분히 큰 bufio.Reader 로 감싸서 사용합니다. +// Decode decodes an Envelope from JSON using a buffered reader on top of the DTLS session. +func (jsonCodec) Decode(r io.Reader, env *Envelope) error { + dec := json.NewDecoder(bufio.NewReaderSize(r, defaultDecoderBufferSize)) + return dec.Decode(env) +} + +// DefaultCodec 은 현재 런타임에서 사용하는 기본 WireCodec 입니다. +// 초기 구현은 JSON 기반이지만, 추후 Protobuf/length-prefixed binary 로 교체 가능하도록 분리해 두었습니다. +var DefaultCodec WireCodec = jsonCodec{} diff --git a/internal/proxy/client.go b/internal/proxy/client.go index b50dc52..f7215a6 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -1,10 +1,8 @@ package proxy import ( - "bufio" "bytes" "context" - "encoding/json" "fmt" "io" "net" @@ -67,10 +65,10 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { // "dtls: buffer too small" 오류가 날 수 있으므로, 여기서는 여유 있는 버퍼(64KiB)를 사용합니다. (ko) // NOTE: pion/dtls decrypts application data into the buffer provided by the caller. // Using only the default JSON decoder buffer (a few hundred bytes) can trigger - // "dtls: buffer too small" for large HTTP bodies/envelopes, so we wrap the - // session with a reasonably large bufio.Reader (64KiB). (en) - dec := json.NewDecoder(bufio.NewReaderSize(sess, 64*1024)) - enc := json.NewEncoder(sess) + // "dtls: buffer too small" for large HTTP bodies/envelopes. The default + // JSON-based WireCodec internally wraps the DTLS session with a 64KiB + // bufio.Reader, matching this requirement. (en) + codec := protocol.DefaultCodec for { select { @@ -83,7 +81,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { } var env protocol.Envelope - if err := dec.Decode(&env); err != nil { + if err := codec.Decode(sess, &env); err != nil { if err == io.EOF { log.Info("dtls session closed by server", nil) return nil @@ -135,7 +133,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { HTTPResponse: &resp, } - if err := enc.Encode(&respEnv); err != nil { + if err := codec.Encode(sess, &respEnv); err != nil { logReq.Error("failed to encode http response envelope", logging.Fields{ "error": err.Error(), }) diff --git a/progress.md b/progress.md index 4a5254e..445708c 100644 --- a/progress.md +++ b/progress.md @@ -265,7 +265,7 @@ The following tasks describe concrete work items to be implemented on the `featu ##### 3.3A.1 스트림 프레이밍 프로토콜 설계 (JSON 1단계) ##### 3.3A.1 Stream framing protocol (JSON, phase 1) -- [ ] 스트림 프레임 타입 정리 및 확장: [`internal/protocol/protocol.go`](internal/protocol/protocol.go:35) +- [x] 스트림 프레임 타입 정리 및 확장: [`internal/protocol/protocol.go`](internal/protocol/protocol.go:35) - 이미 정의된 스트림 관련 타입을 1단계에서 적극 활용합니다. Reuse the already defined stream-related types in phase 1: - `MessageTypeStreamOpen`, `MessageTypeStreamData`, `MessageTypeStreamClose` @@ -281,7 +281,7 @@ The following tasks describe concrete work items to be implemented on the `featu } ``` -- [ ] 스트림 ACK / 재전송 제어 메시지 추가: [`internal/protocol/protocol.go`](internal/protocol/protocol.go:52) +- [x] 스트림 ACK / 재전송 제어 메시지 추가: [`internal/protocol/protocol.go`](internal/protocol/protocol.go:52) - 선택적 재전송(Selective Retransmission)을 위해 `StreamAck` 메시지와 `MessageTypeStreamAck` 를 추가합니다. Add `StreamAck` message and `MessageTypeStreamAck` for selective retransmission: ```go @@ -310,7 +310,7 @@ The following tasks describe concrete work items to be implemented on the `featu ##### 3.3A.2 애플리케이션 레벨 ARQ 설계 (Selective Retransmission) ##### 3.3A.2 Application-level ARQ (Selective Retransmission) -- [ ] 수신 측 스트림 상태 관리 로직 설계 +- [x] 수신 측 스트림 상태 관리 로직 설계 - 스트림별로 다음 상태를 유지합니다. For each stream, maintain: - `expectedSeq` (다음에 연속으로 기대하는 Seq, 초기값 0) @@ -328,7 +328,7 @@ The following tasks describe concrete work items to be implemented on the `featu `expectedSeq` ~ `Seq-1` 구간 중 비어 있는 Seq 들을 `lostBuffer` 에 추가. If `Seq > expectedSeq`, buffer as out-of-order and mark missing seqs in `lostBuffer`. -- [ ] 수신 측 StreamAck 전송 정책 +- [x] 수신 측 StreamAck 전송 정책 - 주기적 타이머 또는 일정 수의 프레임 처리 후에 `StreamAck` 를 전송합니다. Send `StreamAck` periodically or after processing N frames: - `AckSeq = expectedSeq - 1` (연속 수신 완료 지점) @@ -336,7 +336,7 @@ The following tasks describe concrete work items to be implemented on the `featu - `LostSeqs` 는 윈도우 내 손실 시퀀스 중 상한 개수까지만 포함 (과도한 길이 방지). `LostSeqs` should only include a bounded set of missing seqs within the receive window. -- [ ] 송신 측 재전송 로직 +- [x] 송신 측 재전송 로직 - 스트림별로 다음 상태를 유지합니다. For each stream on the sender: - `sendSeq` – 송신에 사용할 다음 Seq (0부터 시작) @@ -361,7 +361,7 @@ The following tasks describe concrete work items to be implemented on the `featu ##### 3.3A.3 HTTP ↔ 스트림 매핑 (서버/클라이언트) ##### 3.3A.3 HTTP ↔ stream mapping (server/client) -- [ ] 서버 → 클라이언트 요청 스트림: [`cmd/server/main.go`](cmd/server/main.go:200) +- [x] 서버 → 클라이언트 요청 스트림: [`cmd/server/main.go`](cmd/server/main.go:200) - 현재 `ForwardHTTP` 는 단일 `HTTPRequest`/`HTTPResponse` 를 처리하는 구조입니다. Currently `ForwardHTTP` handles a single `HTTPRequest`/`HTTPResponse` pair. - 스트림 모드에서는 다음과 같이 바꿉니다. @@ -385,7 +385,7 @@ The following tasks describe concrete work items to be implemented on the `featu - `StreamClose` 수신 시 응답 종료 및 스트림 자원 정리. On `StreamClose`, finish the response and clean up per-stream state. -- [ ] 클라이언트에서의 요청 처리 스트림: [`internal/proxy/client.go`](internal/proxy/client.go:200) +- [x] 클라이언트에서의 요청 처리 스트림: [`internal/proxy/client.go`](internal/proxy/client.go:200) - 서버로부터 들어오는 `StreamOpen{ID, ...}` 을 수신하면, 새로운 goroutine 을 띄워 해당 ID에 대한 로컬 HTTP 요청을 수행합니다. On receiving `StreamOpen{ID, ...}` from the server, spawn a goroutine to handle the local HTTP request for that stream ID. @@ -413,8 +413,11 @@ The following tasks describe concrete work items to be implemented on the `featu - 동일한 logical model (`StreamOpen` / `StreamData(seq)` / `StreamClose` / `StreamAck`)을 유지한 채, wire-format 만 Protobuf 또는 MsgPack 등의 length-prefix binary 프레이밍으로 교체할 수 있습니다. We can later keep the same logical model and swap the wire format for Protobuf or other length-prefix binary framing. - - 이 전환은 `internal/protocol` 내 직렬화 레이어를 얇은 abstraction 으로 감싸 구현할 수 있습니다. - This can be implemented by wrapping serialization in a thin abstraction layer inside [`internal/protocol`](internal/protocol/protocol.go:35). +- [x] 이 전환은 `internal/protocol` 내 직렬화 레이어를 얇은 abstraction 으로 감싸 구현할 수 있습니다. + - 현재는 [`internal/protocol/codec.go`](internal/protocol/codec.go:1) 에 `WireCodec` 인터페이스와 JSON 기반 `DefaultCodec` 을 도입하여, + 추후 Protobuf/이진 포맷으로 교체할 때 호출자는 `protocol.DefaultCodec` 만 사용하도록 분리해 두었습니다. + - This has been prepared via [`internal/protocol/codec.go`](internal/protocol/codec.go:1), which introduces a `WireCodec` interface + and a JSON-based `DefaultCodec` so that future Protobuf/binary codecs can be swapped in behind the same API. ---