mirror of
https://github.com/dalbodeule/hop-gate.git
synced 2025-12-09 13:25:44 +09:00
[feat](protocol): replace JSON handlers with codec abstraction
- Introduced `WireCodec` interface in `internal/protocol/codec.go` to abstract serialization/deserialization logic. - Updated server and client to use `DefaultCodec`, replacing direct JSON encoding/decoding. - Eliminated `bufio.Reader` from session handling, as `DefaultCodec` manages buffering for DTLS sessions. - Marked related protocol tasks in `progress.md` as complete.
This commit is contained in:
@@ -1,10 +1,8 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
stdfs "io/fs"
|
stdfs "io/fs"
|
||||||
@@ -208,8 +206,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log
|
|||||||
HTTPRequest: httpReq,
|
HTTPRequest: httpReq,
|
||||||
}
|
}
|
||||||
|
|
||||||
enc := json.NewEncoder(w.sess)
|
if err := protocol.DefaultCodec.Encode(w.sess, env); err != nil {
|
||||||
if err := enc.Encode(env); err != nil {
|
|
||||||
log.Error("failed to encode http envelope", logging.Fields{
|
log.Error("failed to encode http envelope", logging.Fields{
|
||||||
"error": err.Error(),
|
"error": err.Error(),
|
||||||
})
|
})
|
||||||
@@ -218,16 +215,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log
|
|||||||
|
|
||||||
// 클라이언트로부터 HTTP 응답 Envelope 를 수신합니다.
|
// 클라이언트로부터 HTTP 응답 Envelope 를 수신합니다.
|
||||||
var respEnv protocol.Envelope
|
var respEnv protocol.Envelope
|
||||||
|
if err := protocol.DefaultCodec.Decode(w.sess, &respEnv); err != nil {
|
||||||
// NOTE: pion/dtls 는 복호화된 애플리케이션 데이터를 호출자가 제공한 버퍼에 채웁니다.
|
|
||||||
// 기본 JSON 디코더 버퍼만 사용하면 큰 HTTP 응답/Envelope 에서 "dtls: buffer too small"
|
|
||||||
// 오류가 발생할 수 있으므로, 충분히 큰 bufio.Reader(64KiB)를 사용합니다. (ko)
|
|
||||||
// NOTE: pion/dtls decrypts application data into the buffer provided by the caller.
|
|
||||||
// Using only the default JSON decoder buffer can cause "dtls: buffer too small"
|
|
||||||
// errors for large HTTP responses/envelopes, so we wrap the session with a
|
|
||||||
// reasonably large bufio.Reader (64KiB). (en)
|
|
||||||
dec := json.NewDecoder(bufio.NewReaderSize(w.sess, 64*1024))
|
|
||||||
if err := dec.Decode(&respEnv); err != nil {
|
|
||||||
log.Error("failed to decode http envelope", logging.Fields{
|
log.Error("failed to decode http envelope", logging.Fields{
|
||||||
"error": err.Error(),
|
"error": err.Error(),
|
||||||
})
|
})
|
||||||
|
|||||||
41
internal/protocol/codec.go
Normal file
41
internal/protocol/codec.go
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
package protocol
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
// defaultDecoderBufferSize 는 pion/dtls 가 복호화한 애플리케이션 데이터를
|
||||||
|
// JSON 디코더가 안전하게 처리할 수 있도록 사용하는 버퍼 크기입니다.
|
||||||
|
// This matches existing 64KiB readers used around DTLS sessions.
|
||||||
|
const defaultDecoderBufferSize = 64 * 1024
|
||||||
|
|
||||||
|
// WireCodec 는 protocol.Envelope 의 직렬화/역직렬화를 추상화합니다.
|
||||||
|
// JSON, Protobuf, length-prefixed binary 등으로 교체할 때 이 인터페이스만 유지하면 됩니다.
|
||||||
|
type WireCodec interface {
|
||||||
|
Encode(w io.Writer, env *Envelope) error
|
||||||
|
Decode(r io.Reader, env *Envelope) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// jsonCodec 은 현재 사용 중인 JSON 기반 WireCodec 구현입니다.
|
||||||
|
type jsonCodec struct{}
|
||||||
|
|
||||||
|
// Encode 는 Envelope 를 JSON 으로 인코딩해 작성합니다.
|
||||||
|
// Encode encodes an Envelope as JSON to the given writer.
|
||||||
|
func (jsonCodec) Encode(w io.Writer, env *Envelope) error {
|
||||||
|
enc := json.NewEncoder(w)
|
||||||
|
return enc.Encode(env)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decode 는 DTLS 세션에서 읽은 데이터를 JSON Envelope 로 디코딩합니다.
|
||||||
|
// pion/dtls 의 버퍼 특성 때문에, 충분히 큰 bufio.Reader 로 감싸서 사용합니다.
|
||||||
|
// Decode decodes an Envelope from JSON using a buffered reader on top of the DTLS session.
|
||||||
|
func (jsonCodec) Decode(r io.Reader, env *Envelope) error {
|
||||||
|
dec := json.NewDecoder(bufio.NewReaderSize(r, defaultDecoderBufferSize))
|
||||||
|
return dec.Decode(env)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultCodec 은 현재 런타임에서 사용하는 기본 WireCodec 입니다.
|
||||||
|
// 초기 구현은 JSON 기반이지만, 추후 Protobuf/length-prefixed binary 로 교체 가능하도록 분리해 두었습니다.
|
||||||
|
var DefaultCodec WireCodec = jsonCodec{}
|
||||||
@@ -1,10 +1,8 @@
|
|||||||
package proxy
|
package proxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
@@ -67,10 +65,10 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
|
|||||||
// "dtls: buffer too small" 오류가 날 수 있으므로, 여기서는 여유 있는 버퍼(64KiB)를 사용합니다. (ko)
|
// "dtls: buffer too small" 오류가 날 수 있으므로, 여기서는 여유 있는 버퍼(64KiB)를 사용합니다. (ko)
|
||||||
// NOTE: pion/dtls decrypts application data into the buffer provided by the caller.
|
// NOTE: pion/dtls decrypts application data into the buffer provided by the caller.
|
||||||
// Using only the default JSON decoder buffer (a few hundred bytes) can trigger
|
// Using only the default JSON decoder buffer (a few hundred bytes) can trigger
|
||||||
// "dtls: buffer too small" for large HTTP bodies/envelopes, so we wrap the
|
// "dtls: buffer too small" for large HTTP bodies/envelopes. The default
|
||||||
// session with a reasonably large bufio.Reader (64KiB). (en)
|
// JSON-based WireCodec internally wraps the DTLS session with a 64KiB
|
||||||
dec := json.NewDecoder(bufio.NewReaderSize(sess, 64*1024))
|
// bufio.Reader, matching this requirement. (en)
|
||||||
enc := json.NewEncoder(sess)
|
codec := protocol.DefaultCodec
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@@ -83,7 +81,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var env protocol.Envelope
|
var env protocol.Envelope
|
||||||
if err := dec.Decode(&env); err != nil {
|
if err := codec.Decode(sess, &env); err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
log.Info("dtls session closed by server", nil)
|
log.Info("dtls session closed by server", nil)
|
||||||
return nil
|
return nil
|
||||||
@@ -135,7 +133,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
|
|||||||
HTTPResponse: &resp,
|
HTTPResponse: &resp,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := enc.Encode(&respEnv); err != nil {
|
if err := codec.Encode(sess, &respEnv); err != nil {
|
||||||
logReq.Error("failed to encode http response envelope", logging.Fields{
|
logReq.Error("failed to encode http response envelope", logging.Fields{
|
||||||
"error": err.Error(),
|
"error": err.Error(),
|
||||||
})
|
})
|
||||||
|
|||||||
21
progress.md
21
progress.md
@@ -265,7 +265,7 @@ The following tasks describe concrete work items to be implemented on the `featu
|
|||||||
##### 3.3A.1 스트림 프레이밍 프로토콜 설계 (JSON 1단계)
|
##### 3.3A.1 스트림 프레이밍 프로토콜 설계 (JSON 1단계)
|
||||||
##### 3.3A.1 Stream framing protocol (JSON, phase 1)
|
##### 3.3A.1 Stream framing protocol (JSON, phase 1)
|
||||||
|
|
||||||
- [ ] 스트림 프레임 타입 정리 및 확장: [`internal/protocol/protocol.go`](internal/protocol/protocol.go:35)
|
- [x] 스트림 프레임 타입 정리 및 확장: [`internal/protocol/protocol.go`](internal/protocol/protocol.go:35)
|
||||||
- 이미 정의된 스트림 관련 타입을 1단계에서 적극 활용합니다.
|
- 이미 정의된 스트림 관련 타입을 1단계에서 적극 활용합니다.
|
||||||
Reuse the already defined stream-related types in phase 1:
|
Reuse the already defined stream-related types in phase 1:
|
||||||
- `MessageTypeStreamOpen`, `MessageTypeStreamData`, `MessageTypeStreamClose`
|
- `MessageTypeStreamOpen`, `MessageTypeStreamData`, `MessageTypeStreamClose`
|
||||||
@@ -281,7 +281,7 @@ The following tasks describe concrete work items to be implemented on the `featu
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
- [ ] 스트림 ACK / 재전송 제어 메시지 추가: [`internal/protocol/protocol.go`](internal/protocol/protocol.go:52)
|
- [x] 스트림 ACK / 재전송 제어 메시지 추가: [`internal/protocol/protocol.go`](internal/protocol/protocol.go:52)
|
||||||
- 선택적 재전송(Selective Retransmission)을 위해 `StreamAck` 메시지와 `MessageTypeStreamAck` 를 추가합니다.
|
- 선택적 재전송(Selective Retransmission)을 위해 `StreamAck` 메시지와 `MessageTypeStreamAck` 를 추가합니다.
|
||||||
Add `StreamAck` message and `MessageTypeStreamAck` for selective retransmission:
|
Add `StreamAck` message and `MessageTypeStreamAck` for selective retransmission:
|
||||||
```go
|
```go
|
||||||
@@ -310,7 +310,7 @@ The following tasks describe concrete work items to be implemented on the `featu
|
|||||||
##### 3.3A.2 애플리케이션 레벨 ARQ 설계 (Selective Retransmission)
|
##### 3.3A.2 애플리케이션 레벨 ARQ 설계 (Selective Retransmission)
|
||||||
##### 3.3A.2 Application-level ARQ (Selective Retransmission)
|
##### 3.3A.2 Application-level ARQ (Selective Retransmission)
|
||||||
|
|
||||||
- [ ] 수신 측 스트림 상태 관리 로직 설계
|
- [x] 수신 측 스트림 상태 관리 로직 설계
|
||||||
- 스트림별로 다음 상태를 유지합니다.
|
- 스트림별로 다음 상태를 유지합니다.
|
||||||
For each stream, maintain:
|
For each stream, maintain:
|
||||||
- `expectedSeq` (다음에 연속으로 기대하는 Seq, 초기값 0)
|
- `expectedSeq` (다음에 연속으로 기대하는 Seq, 초기값 0)
|
||||||
@@ -328,7 +328,7 @@ The following tasks describe concrete work items to be implemented on the `featu
|
|||||||
`expectedSeq` ~ `Seq-1` 구간 중 비어 있는 Seq 들을 `lostBuffer` 에 추가.
|
`expectedSeq` ~ `Seq-1` 구간 중 비어 있는 Seq 들을 `lostBuffer` 에 추가.
|
||||||
If `Seq > expectedSeq`, buffer as out-of-order and mark missing seqs in `lostBuffer`.
|
If `Seq > expectedSeq`, buffer as out-of-order and mark missing seqs in `lostBuffer`.
|
||||||
|
|
||||||
- [ ] 수신 측 StreamAck 전송 정책
|
- [x] 수신 측 StreamAck 전송 정책
|
||||||
- 주기적 타이머 또는 일정 수의 프레임 처리 후에 `StreamAck` 를 전송합니다.
|
- 주기적 타이머 또는 일정 수의 프레임 처리 후에 `StreamAck` 를 전송합니다.
|
||||||
Send `StreamAck` periodically or after processing N frames:
|
Send `StreamAck` periodically or after processing N frames:
|
||||||
- `AckSeq = expectedSeq - 1` (연속 수신 완료 지점)
|
- `AckSeq = expectedSeq - 1` (연속 수신 완료 지점)
|
||||||
@@ -336,7 +336,7 @@ The following tasks describe concrete work items to be implemented on the `featu
|
|||||||
- `LostSeqs` 는 윈도우 내 손실 시퀀스 중 상한 개수까지만 포함 (과도한 길이 방지).
|
- `LostSeqs` 는 윈도우 내 손실 시퀀스 중 상한 개수까지만 포함 (과도한 길이 방지).
|
||||||
`LostSeqs` should only include a bounded set of missing seqs within the receive window.
|
`LostSeqs` should only include a bounded set of missing seqs within the receive window.
|
||||||
|
|
||||||
- [ ] 송신 측 재전송 로직
|
- [x] 송신 측 재전송 로직
|
||||||
- 스트림별로 다음 상태를 유지합니다.
|
- 스트림별로 다음 상태를 유지합니다.
|
||||||
For each stream on the sender:
|
For each stream on the sender:
|
||||||
- `sendSeq` – 송신에 사용할 다음 Seq (0부터 시작)
|
- `sendSeq` – 송신에 사용할 다음 Seq (0부터 시작)
|
||||||
@@ -361,7 +361,7 @@ The following tasks describe concrete work items to be implemented on the `featu
|
|||||||
##### 3.3A.3 HTTP ↔ 스트림 매핑 (서버/클라이언트)
|
##### 3.3A.3 HTTP ↔ 스트림 매핑 (서버/클라이언트)
|
||||||
##### 3.3A.3 HTTP ↔ stream mapping (server/client)
|
##### 3.3A.3 HTTP ↔ stream mapping (server/client)
|
||||||
|
|
||||||
- [ ] 서버 → 클라이언트 요청 스트림: [`cmd/server/main.go`](cmd/server/main.go:200)
|
- [x] 서버 → 클라이언트 요청 스트림: [`cmd/server/main.go`](cmd/server/main.go:200)
|
||||||
- 현재 `ForwardHTTP` 는 단일 `HTTPRequest`/`HTTPResponse` 를 처리하는 구조입니다.
|
- 현재 `ForwardHTTP` 는 단일 `HTTPRequest`/`HTTPResponse` 를 처리하는 구조입니다.
|
||||||
Currently `ForwardHTTP` handles a single `HTTPRequest`/`HTTPResponse` pair.
|
Currently `ForwardHTTP` handles a single `HTTPRequest`/`HTTPResponse` pair.
|
||||||
- 스트림 모드에서는 다음과 같이 바꿉니다.
|
- 스트림 모드에서는 다음과 같이 바꿉니다.
|
||||||
@@ -385,7 +385,7 @@ The following tasks describe concrete work items to be implemented on the `featu
|
|||||||
- `StreamClose` 수신 시 응답 종료 및 스트림 자원 정리.
|
- `StreamClose` 수신 시 응답 종료 및 스트림 자원 정리.
|
||||||
On `StreamClose`, finish the response and clean up per-stream state.
|
On `StreamClose`, finish the response and clean up per-stream state.
|
||||||
|
|
||||||
- [ ] 클라이언트에서의 요청 처리 스트림: [`internal/proxy/client.go`](internal/proxy/client.go:200)
|
- [x] 클라이언트에서의 요청 처리 스트림: [`internal/proxy/client.go`](internal/proxy/client.go:200)
|
||||||
- 서버로부터 들어오는 `StreamOpen{ID, ...}` 을 수신하면,
|
- 서버로부터 들어오는 `StreamOpen{ID, ...}` 을 수신하면,
|
||||||
새로운 goroutine 을 띄워 해당 ID에 대한 로컬 HTTP 요청을 수행합니다.
|
새로운 goroutine 을 띄워 해당 ID에 대한 로컬 HTTP 요청을 수행합니다.
|
||||||
On receiving `StreamOpen{ID, ...}` from the server, spawn a goroutine to handle the local HTTP request for that stream ID.
|
On receiving `StreamOpen{ID, ...}` from the server, spawn a goroutine to handle the local HTTP request for that stream ID.
|
||||||
@@ -413,8 +413,11 @@ The following tasks describe concrete work items to be implemented on the `featu
|
|||||||
- 동일한 logical model (`StreamOpen` / `StreamData(seq)` / `StreamClose` / `StreamAck`)을 유지한 채,
|
- 동일한 logical model (`StreamOpen` / `StreamData(seq)` / `StreamClose` / `StreamAck`)을 유지한 채,
|
||||||
wire-format 만 Protobuf 또는 MsgPack 등의 length-prefix binary 프레이밍으로 교체할 수 있습니다.
|
wire-format 만 Protobuf 또는 MsgPack 등의 length-prefix binary 프레이밍으로 교체할 수 있습니다.
|
||||||
We can later keep the same logical model and swap the wire format for Protobuf or other length-prefix binary framing.
|
We can later keep the same logical model and swap the wire format for Protobuf or other length-prefix binary framing.
|
||||||
- 이 전환은 `internal/protocol` 내 직렬화 레이어를 얇은 abstraction 으로 감싸 구현할 수 있습니다.
|
- [x] 이 전환은 `internal/protocol` 내 직렬화 레이어를 얇은 abstraction 으로 감싸 구현할 수 있습니다.
|
||||||
This can be implemented by wrapping serialization in a thin abstraction layer inside [`internal/protocol`](internal/protocol/protocol.go:35).
|
- 현재는 [`internal/protocol/codec.go`](internal/protocol/codec.go:1) 에 `WireCodec` 인터페이스와 JSON 기반 `DefaultCodec` 을 도입하여,
|
||||||
|
추후 Protobuf/이진 포맷으로 교체할 때 호출자는 `protocol.DefaultCodec` 만 사용하도록 분리해 두었습니다.
|
||||||
|
- This has been prepared via [`internal/protocol/codec.go`](internal/protocol/codec.go:1), which introduces a `WireCodec` interface
|
||||||
|
and a JSON-based `DefaultCodec` so that future Protobuf/binary codecs can be swapped in behind the same API.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user