mirror of
https://github.com/dalbodeule/hop-gate.git
synced 2025-12-12 14:50:09 +09:00
Merge pull request #19 from dalbodeule/copilot/fix-dtls-buffer-error
Fix DTLS buffer size, concurrent request handling, and client frame robustness
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
@@ -11,6 +12,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -33,10 +35,138 @@ import (
|
|||||||
// 기본값 "dev" 는 로컬 개발용입니다.
|
// 기본값 "dev" 는 로컬 개발용입니다.
|
||||||
var version = "dev"
|
var version = "dev"
|
||||||
|
|
||||||
|
// pendingRequest tracks a request waiting for its response
|
||||||
|
type pendingRequest struct {
|
||||||
|
streamID protocol.StreamID
|
||||||
|
respCh chan *protocol.Envelope
|
||||||
|
doneCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// streamSender 는 특정 스트림에 대해 전송한 StreamData 프레임의 payload 를
|
||||||
|
// 시퀀스 번호별로 보관하여, peer 로부터의 StreamAck 를 기반으로 선택적 재전송을
|
||||||
|
// 수행하기 위한 송신 측 ARQ 상태를 나타냅니다. (ko)
|
||||||
|
// streamSender keeps outstanding StreamData payloads per sequence number so that
|
||||||
|
// they can be selectively retransmitted based on StreamAck from the peer. (en)
|
||||||
|
type streamSender struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
outstanding map[uint64][]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStreamSender() *streamSender {
|
||||||
|
return &streamSender{
|
||||||
|
outstanding: make(map[uint64][]byte),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *streamSender) register(seq uint64, data []byte) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if s.outstanding == nil {
|
||||||
|
s.outstanding = make(map[uint64][]byte)
|
||||||
|
}
|
||||||
|
buf := make([]byte, len(data))
|
||||||
|
copy(buf, data)
|
||||||
|
s.outstanding[seq] = buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleAck 는 주어진 StreamAck 를 적용하여 AckSeq 이하의 프레임을 정리하고,
|
||||||
|
// LostSeqs 중 아직 outstanding 에 남아 있는 시퀀스의 payload 를 복사하여
|
||||||
|
// 재전송 대상 목록으로 반환합니다. (ko)
|
||||||
|
// handleAck applies the given StreamAck, removes frames up to AckSeq, and
|
||||||
|
// returns copies of payloads for LostSeqs that are still outstanding so that
|
||||||
|
// they can be retransmitted. (en)
|
||||||
|
func (s *streamSender) handleAck(ack *protocol.StreamAck) map[uint64][]byte {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if s.outstanding == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 연속 수신 완료 구간(seq <= AckSeq)은 outstanding 에서 제거합니다.
|
||||||
|
for seq := range s.outstanding {
|
||||||
|
if seq <= ack.AckSeq {
|
||||||
|
delete(s.outstanding, seq)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// LostSeqs 가 비어 있으면 재전송할 것이 없습니다.
|
||||||
|
if len(ack.LostSeqs) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// LostSeqs 중 아직 outstanding 에 남아 있는 것만 재전송 대상으로 선택합니다.
|
||||||
|
lost := make(map[uint64][]byte, len(ack.LostSeqs))
|
||||||
|
for _, seq := range ack.LostSeqs {
|
||||||
|
if data, ok := s.outstanding[seq]; ok {
|
||||||
|
buf := make([]byte, len(data))
|
||||||
|
copy(buf, data)
|
||||||
|
lost[seq] = buf
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return lost
|
||||||
|
}
|
||||||
|
|
||||||
type dtlsSessionWrapper struct {
|
type dtlsSessionWrapper struct {
|
||||||
sess dtls.Session
|
sess dtls.Session
|
||||||
|
bufferedReader *bufio.Reader
|
||||||
|
codec protocol.WireCodec
|
||||||
|
logger logging.Logger
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
nextStreamID uint64
|
nextStreamID uint64
|
||||||
|
pending map[protocol.StreamID]*pendingRequest
|
||||||
|
readerDone chan struct{}
|
||||||
|
|
||||||
|
// streamSenders 는 서버 → 클라이언트 방향 HTTP 요청 바디 전송에 대한
|
||||||
|
// 송신 측 ARQ 상태를 보관합니다. (ko)
|
||||||
|
// streamSenders keeps ARQ sender state for HTTP request bodies sent
|
||||||
|
// from server to client. (en)
|
||||||
|
streamSenders map[protocol.StreamID]*streamSender
|
||||||
|
|
||||||
|
// requestMu 는 이 DTLS 세션에서 동시에 처리될 수 있는 HTTP 요청을
|
||||||
|
// 하나로 제한하기 위한 뮤텍스입니다. (ko)
|
||||||
|
// requestMu serializes HTTP requests on this DTLS session so that the
|
||||||
|
// client (which currently processes one StreamOpen at a time) does not
|
||||||
|
// see interleaved streams. (en)
|
||||||
|
requestMu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// registerStreamSender 는 주어진 스트림 ID 에 대한 송신 측 ARQ 상태를 등록합니다. (ko)
|
||||||
|
// registerStreamSender registers the sender-side ARQ state for a given stream ID. (en)
|
||||||
|
func (w *dtlsSessionWrapper) registerStreamSender(id protocol.StreamID, sender *streamSender) {
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
|
if w.streamSenders == nil {
|
||||||
|
w.streamSenders = make(map[protocol.StreamID]*streamSender)
|
||||||
|
}
|
||||||
|
w.streamSenders[id] = sender
|
||||||
|
}
|
||||||
|
|
||||||
|
// unregisterStreamSender 는 더 이상 사용하지 않는 스트림 ID 에 대한 송신 측 ARQ 상태를 제거합니다. (ko)
|
||||||
|
// unregisterStreamSender removes the sender-side ARQ state for a stream ID that is no longer used. (en)
|
||||||
|
func (w *dtlsSessionWrapper) unregisterStreamSender(id protocol.StreamID) {
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
|
if w.streamSenders == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
delete(w.streamSenders, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getStreamSender 는 주어진 스트림 ID 에 대한 송신 측 ARQ 상태를 반환합니다. (ko)
|
||||||
|
// getStreamSender returns the sender-side ARQ state for the given stream ID, if any. (en)
|
||||||
|
func (w *dtlsSessionWrapper) getStreamSender(id protocol.StreamID) *streamSender {
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
|
if w.streamSenders == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return w.streamSenders[id]
|
||||||
}
|
}
|
||||||
|
|
||||||
func getEnvOrPanic(logger logging.Logger, key string) string {
|
func getEnvOrPanic(logger logging.Logger, key string) string {
|
||||||
@@ -174,22 +304,188 @@ func parseExpectedIPsFromEnv(logger logging.Logger, envKey string) []net.IP {
|
|||||||
|
|
||||||
// ForwardHTTP 는 HTTP 요청을 DTLS 세션 위의 StreamOpen/StreamData/StreamClose 프레임으로 전송하고,
|
// ForwardHTTP 는 HTTP 요청을 DTLS 세션 위의 StreamOpen/StreamData/StreamClose 프레임으로 전송하고,
|
||||||
// 역방향 스트림 응답을 수신해 protocol.Response 로 반환합니다. (ko)
|
// 역방향 스트림 응답을 수신해 protocol.Response 로 반환합니다. (ko)
|
||||||
|
// readLoop continuously reads from the DTLS session and dispatches incoming frames
|
||||||
|
// to the appropriate pending request based on stream ID. It also handles
|
||||||
|
// application-level ARQ (StreamAck) for request bodies sent from server to client. (en)
|
||||||
|
func (w *dtlsSessionWrapper) readLoop() {
|
||||||
|
defer close(w.readerDone)
|
||||||
|
|
||||||
|
for {
|
||||||
|
var env protocol.Envelope
|
||||||
|
if err := w.codec.Decode(w.bufferedReader, &env); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
w.logger.Info("dtls session closed", nil)
|
||||||
|
} else {
|
||||||
|
w.logger.Error("failed to decode envelope in read loop", logging.Fields{
|
||||||
|
"error": err.Error(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
// Notify all pending requests of the error by closing their response channels.
|
||||||
|
// The doneCh will be closed by each ForwardHTTP's defer.
|
||||||
|
w.mu.Lock()
|
||||||
|
for _, pending := range w.pending {
|
||||||
|
close(pending.respCh)
|
||||||
|
}
|
||||||
|
w.pending = make(map[protocol.StreamID]*pendingRequest)
|
||||||
|
w.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1) StreamAck 처리: 서버 → 클라이언트 방향 요청 바디 전송에 대한 ARQ. (ko)
|
||||||
|
// 1) Handle StreamAck: application-level ARQ for request bodies
|
||||||
|
// sent from server to client. (en)
|
||||||
|
if env.Type == protocol.MessageTypeStreamAck {
|
||||||
|
sa := env.StreamAck
|
||||||
|
if sa == nil {
|
||||||
|
w.logger.Warn("received stream_ack envelope with nil payload", logging.Fields{})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
streamID := sa.ID
|
||||||
|
sender := w.getStreamSender(streamID)
|
||||||
|
if sender == nil {
|
||||||
|
w.logger.Warn("received stream_ack for unknown stream ID", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lost := sender.handleAck(sa)
|
||||||
|
for seq, data := range lost {
|
||||||
|
retryEnv := protocol.Envelope{
|
||||||
|
Type: protocol.MessageTypeStreamData,
|
||||||
|
StreamData: &protocol.StreamData{
|
||||||
|
ID: streamID,
|
||||||
|
Seq: seq,
|
||||||
|
Data: data,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := w.codec.Encode(w.sess, &retryEnv); err != nil {
|
||||||
|
w.logger.Error("failed to retransmit stream_data after stream_ack", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
"seq": seq,
|
||||||
|
"error": err.Error(),
|
||||||
|
})
|
||||||
|
// 세션 쓰기 오류가 발생하면 루프를 종료하여 상위에서 세션 종료를 유도합니다. (ko)
|
||||||
|
// On write error, stop the loop so that the caller can tear down the session. (en)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// StreamAck 는 애플리케이션 페이로드를 포함하지 않으므로 pending 에 전달하지 않습니다. (ko)
|
||||||
|
// StreamAck carries no application payload, so it is not forwarded to pending requests. (en)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2) StreamOpen / StreamData / StreamClose 에 대해 stream ID 를 산출하고,
|
||||||
|
// 해당 pending 요청으로 전달합니다. (ko)
|
||||||
|
// 2) For StreamOpen / StreamData / StreamClose, determine the stream ID
|
||||||
|
// and forward to the corresponding pending request. (en)
|
||||||
|
var streamID protocol.StreamID
|
||||||
|
switch env.Type {
|
||||||
|
case protocol.MessageTypeStreamOpen:
|
||||||
|
if env.StreamOpen != nil {
|
||||||
|
streamID = env.StreamOpen.ID
|
||||||
|
}
|
||||||
|
case protocol.MessageTypeStreamData:
|
||||||
|
if env.StreamData != nil {
|
||||||
|
streamID = env.StreamData.ID
|
||||||
|
}
|
||||||
|
case protocol.MessageTypeStreamClose:
|
||||||
|
if env.StreamClose != nil {
|
||||||
|
streamID = env.StreamClose.ID
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
w.logger.Warn("received unexpected envelope type in read loop", logging.Fields{
|
||||||
|
"type": env.Type,
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if streamID == "" {
|
||||||
|
w.logger.Warn("received envelope with empty stream ID", logging.Fields{
|
||||||
|
"type": env.Type,
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find the pending request for this stream ID
|
||||||
|
w.mu.Lock()
|
||||||
|
pending := w.pending[streamID]
|
||||||
|
w.mu.Unlock()
|
||||||
|
|
||||||
|
if pending == nil {
|
||||||
|
w.logger.Warn("received envelope for unknown stream ID", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
"type": env.Type,
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the envelope to the waiting request
|
||||||
|
select {
|
||||||
|
case pending.respCh <- &env:
|
||||||
|
// Successfully delivered
|
||||||
|
case <-pending.doneCh:
|
||||||
|
// Request was cancelled or timed out
|
||||||
|
w.logger.Warn("pending request already closed", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
})
|
||||||
|
default:
|
||||||
|
// Channel buffer full - shouldn't happen with proper sizing
|
||||||
|
w.logger.Warn("response channel buffer full, dropping frame", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
"type": env.Type,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ForwardHTTP forwards an HTTP request over the DTLS session using StreamOpen/StreamData/StreamClose
|
// ForwardHTTP forwards an HTTP request over the DTLS session using StreamOpen/StreamData/StreamClose
|
||||||
// frames and reconstructs the reverse stream into a protocol.Response. (en)
|
// frames and reconstructs the reverse stream into a protocol.Response. (en)
|
||||||
|
// This method now supports concurrent requests by using a channel-based multiplexing approach.
|
||||||
func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Logger, req *http.Request, serviceName string) (*protocol.Response, error) {
|
func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Logger, req *http.Request, serviceName string) (*protocol.Response, error) {
|
||||||
w.mu.Lock()
|
|
||||||
defer w.mu.Unlock()
|
|
||||||
|
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
|
|
||||||
codec := protocol.DefaultCodec
|
// 현재 클라이언트 구현은 DTLS 세션당 하나의 StreamOpen/StreamData/StreamClose
|
||||||
|
// 만 순차적으로 처리하므로, 서버에서도 동일 세션 위 HTTP 요청을 직렬화합니다. (ko)
|
||||||
|
// The current client processes exactly one StreamOpen/StreamData/StreamClose
|
||||||
|
// sequence at a time per DTLS session, so we serialize HTTP requests on
|
||||||
|
// this session as well. (en)
|
||||||
|
w.requestMu.Lock()
|
||||||
|
defer w.requestMu.Unlock()
|
||||||
|
|
||||||
// 세션 내에서 고유한 StreamID 를 생성합니다. (ko)
|
// Generate a unique stream ID (needs mutex for nextStreamID)
|
||||||
// Generate a unique StreamID for this HTTP request within the DTLS session. (en)
|
w.mu.Lock()
|
||||||
streamID := w.nextHTTPStreamID()
|
streamID := w.nextHTTPStreamID()
|
||||||
|
|
||||||
|
// Channel buffer size for response frames to avoid blocking readLoop.
|
||||||
|
// A typical HTTP response has: 1 StreamOpen + N StreamData + 1 StreamClose frames.
|
||||||
|
// With 4KB chunks, even large responses stay within this buffer.
|
||||||
|
const responseChannelBuffer = 16
|
||||||
|
|
||||||
|
// Create a pending request to receive responses
|
||||||
|
pending := &pendingRequest{
|
||||||
|
streamID: streamID,
|
||||||
|
respCh: make(chan *protocol.Envelope, responseChannelBuffer),
|
||||||
|
doneCh: make(chan struct{}),
|
||||||
|
}
|
||||||
|
w.pending[streamID] = pending
|
||||||
|
w.mu.Unlock()
|
||||||
|
|
||||||
|
// 서버 → 클라이언트 방향 요청 바디 전송에 대한 송신 측 ARQ 상태를 준비합니다. (ko)
|
||||||
|
// Prepare ARQ sender state for the request body sent from server to client. (en)
|
||||||
|
sender := newStreamSender()
|
||||||
|
w.registerStreamSender(streamID, sender)
|
||||||
|
|
||||||
|
// Ensure cleanup on exit
|
||||||
|
defer func() {
|
||||||
|
w.mu.Lock()
|
||||||
|
delete(w.pending, streamID)
|
||||||
|
w.mu.Unlock()
|
||||||
|
close(pending.doneCh)
|
||||||
|
w.unregisterStreamSender(streamID)
|
||||||
|
}()
|
||||||
|
|
||||||
log := logger.With(logging.Fields{
|
log := logger.With(logging.Fields{
|
||||||
"component": "http_to_dtls",
|
"component": "http_to_dtls",
|
||||||
"request_id": string(streamID),
|
"request_id": string(streamID),
|
||||||
@@ -231,7 +527,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log
|
|||||||
Header: hdr,
|
Header: hdr,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := codec.Encode(w.sess, openEnv); err != nil {
|
if err := w.codec.Encode(w.sess, openEnv); err != nil {
|
||||||
log.Error("failed to encode stream_open envelope", logging.Fields{
|
log.Error("failed to encode stream_open envelope", logging.Fields{
|
||||||
"error": err.Error(),
|
"error": err.Error(),
|
||||||
})
|
})
|
||||||
@@ -247,6 +543,10 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log
|
|||||||
n, err := req.Body.Read(buf)
|
n, err := req.Body.Read(buf)
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
dataCopy := append([]byte(nil), buf[:n]...)
|
dataCopy := append([]byte(nil), buf[:n]...)
|
||||||
|
// 송신 측 ARQ: Seq 별 payload 를 기록해 두었다가, 클라이언트의 StreamAck 를 기반으로 재전송합니다. (ko)
|
||||||
|
// Sender-side ARQ: record payload per Seq so it can be retransmitted based on StreamAck from the client. (en)
|
||||||
|
sender.register(seq, dataCopy)
|
||||||
|
|
||||||
dataEnv := &protocol.Envelope{
|
dataEnv := &protocol.Envelope{
|
||||||
Type: protocol.MessageTypeStreamData,
|
Type: protocol.MessageTypeStreamData,
|
||||||
StreamData: &protocol.StreamData{
|
StreamData: &protocol.StreamData{
|
||||||
@@ -255,7 +555,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log
|
|||||||
Data: dataCopy,
|
Data: dataCopy,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err2 := codec.Encode(w.sess, dataEnv); err2 != nil {
|
if err2 := w.codec.Encode(w.sess, dataEnv); err2 != nil {
|
||||||
log.Error("failed to encode stream_data envelope", logging.Fields{
|
log.Error("failed to encode stream_data envelope", logging.Fields{
|
||||||
"error": err2.Error(),
|
"error": err2.Error(),
|
||||||
})
|
})
|
||||||
@@ -281,7 +581,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log
|
|||||||
Error: "",
|
Error: "",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := codec.Encode(w.sess, closeReqEnv); err != nil {
|
if err := w.codec.Encode(w.sess, closeReqEnv); err != nil {
|
||||||
log.Error("failed to encode request stream_close envelope", logging.Fields{
|
log.Error("failed to encode request stream_close envelope", logging.Fields{
|
||||||
"error": err.Error(),
|
"error": err.Error(),
|
||||||
})
|
})
|
||||||
@@ -289,24 +589,41 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 클라이언트로부터 역방향 스트림 응답을 수신합니다. (ko)
|
// 클라이언트로부터 역방향 스트림 응답을 수신합니다. (ko)
|
||||||
// Receive reverse stream response (StreamOpen + StreamData* + StreamClose). (en)
|
// Receive reverse stream response (StreamOpen + StreamData* + StreamClose) via the readLoop. (en)
|
||||||
var (
|
var (
|
||||||
resp protocol.Response
|
resp protocol.Response
|
||||||
bodyBuf bytes.Buffer
|
bodyBuf bytes.Buffer
|
||||||
gotOpen bool
|
gotOpen bool
|
||||||
statusCode = http.StatusOK
|
statusCode = http.StatusOK
|
||||||
|
|
||||||
|
// 응답 바디(클라이언트 → 서버)에 대한 수신 측 ARQ 상태입니다. (ko)
|
||||||
|
// ARQ receiver state for the response body (client → server). (en)
|
||||||
|
expectedSeq uint64
|
||||||
|
received = make(map[uint64][]byte)
|
||||||
|
lost = make(map[uint64]struct{})
|
||||||
)
|
)
|
||||||
|
const maxLostReport = 32
|
||||||
|
|
||||||
resp.RequestID = string(streamID)
|
resp.RequestID = string(streamID)
|
||||||
resp.Header = make(map[string][]string)
|
resp.Header = make(map[string][]string)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
var env protocol.Envelope
|
select {
|
||||||
if err := codec.Decode(w.sess, &env); err != nil {
|
case <-ctx.Done():
|
||||||
log.Error("failed to decode stream response envelope", logging.Fields{
|
log.Error("context cancelled while waiting for response", logging.Fields{
|
||||||
"error": err.Error(),
|
"error": ctx.Err().Error(),
|
||||||
})
|
})
|
||||||
return nil, err
|
return nil, ctx.Err()
|
||||||
|
|
||||||
|
case <-w.readerDone:
|
||||||
|
log.Error("dtls session closed while waiting for response", nil)
|
||||||
|
return nil, fmt.Errorf("dtls session closed")
|
||||||
|
|
||||||
|
case env, ok := <-pending.respCh:
|
||||||
|
if !ok {
|
||||||
|
// Channel closed, session is dead
|
||||||
|
log.Error("response channel closed unexpectedly", nil)
|
||||||
|
return nil, fmt.Errorf("response channel closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
switch env.Type {
|
switch env.Type {
|
||||||
@@ -315,9 +632,6 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log
|
|||||||
if so == nil {
|
if so == nil {
|
||||||
return nil, fmt.Errorf("stream_open response payload is nil")
|
return nil, fmt.Errorf("stream_open response payload is nil")
|
||||||
}
|
}
|
||||||
if so.ID != streamID {
|
|
||||||
return nil, fmt.Errorf("unexpected stream_open for id %q (expected %q)", so.ID, streamID)
|
|
||||||
}
|
|
||||||
// 상태 코드 및 헤더 복원 (pseudo-header 제거). (ko)
|
// 상태 코드 및 헤더 복원 (pseudo-header 제거). (ko)
|
||||||
// Restore status code and headers (strip pseudo-headers). (en)
|
// Restore status code and headers (strip pseudo-headers). (en)
|
||||||
statusStr := firstHeaderValue(so.Header, protocol.HeaderKeyStatus, strconv.Itoa(http.StatusOK))
|
statusStr := firstHeaderValue(so.Header, protocol.HeaderKeyStatus, strconv.Itoa(http.StatusOK))
|
||||||
@@ -340,23 +654,101 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log
|
|||||||
if sd == nil {
|
if sd == nil {
|
||||||
return nil, fmt.Errorf("stream_data response payload is nil")
|
return nil, fmt.Errorf("stream_data response payload is nil")
|
||||||
}
|
}
|
||||||
if sd.ID != streamID {
|
|
||||||
return nil, fmt.Errorf("unexpected stream_data for id %q (expected %q)", sd.ID, streamID)
|
// 수신 측 ARQ: Seq 에 따라 분기하고, 연속 구간을 bodyBuf 에 순서대로 기록합니다. (ko)
|
||||||
}
|
// Receiver-side ARQ: handle Seq and append contiguous data to bodyBuf in order. (en)
|
||||||
|
switch {
|
||||||
|
case sd.Seq == expectedSeq:
|
||||||
if len(sd.Data) > 0 {
|
if len(sd.Data) > 0 {
|
||||||
if _, err := bodyBuf.Write(sd.Data); err != nil {
|
if _, err := bodyBuf.Write(sd.Data); err != nil {
|
||||||
return nil, fmt.Errorf("buffer stream_data response: %w", err)
|
return nil, fmt.Errorf("buffer stream_data response: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
expectedSeq++
|
||||||
|
for {
|
||||||
|
data, ok := received[expectedSeq]
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if len(data) > 0 {
|
||||||
|
if _, err := bodyBuf.Write(data); err != nil {
|
||||||
|
return nil, fmt.Errorf("buffer reordered stream_data response: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete(received, expectedSeq)
|
||||||
|
delete(lost, expectedSeq)
|
||||||
|
expectedSeq++
|
||||||
|
}
|
||||||
|
|
||||||
|
// AckSeq 이전 구간의 lost 항목 정리
|
||||||
|
for seq := range lost {
|
||||||
|
if seq < expectedSeq {
|
||||||
|
delete(lost, seq)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case sd.Seq > expectedSeq:
|
||||||
|
// 앞선 일부 Seq 들이 누락된 상태: 현재 프레임을 버퍼링하고 missing seq 들을 lost 에 추가. (ko)
|
||||||
|
// Missing earlier Seq: buffer this frame and mark missing seqs as lost. (en)
|
||||||
|
if len(sd.Data) > 0 {
|
||||||
|
bufCopy := make([]byte, len(sd.Data))
|
||||||
|
copy(bufCopy, sd.Data)
|
||||||
|
received[sd.Seq] = bufCopy
|
||||||
|
}
|
||||||
|
for seq := expectedSeq; seq < sd.Seq && len(lost) < maxLostReport; seq++ {
|
||||||
|
if _, ok := lost[seq]; !ok {
|
||||||
|
lost[seq] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
// sd.Seq < expectedSeq 인 경우: 이미 처리했거나 Ack 로 커버된 프레임 → 무시. (ko)
|
||||||
|
// sd.Seq < expectedSeq: already processed/acked frame → ignore. (en)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 수신 측 StreamAck 전송:
|
||||||
|
// - AckSeq: 0부터 시작해 연속으로 수신 완료한 마지막 시퀀스 (expectedSeq-1)
|
||||||
|
// - LostSeqs: 현재 윈도우 내에서 누락된 시퀀스 중 상한 개수(maxLostReport)까지만 포함 (ko)
|
||||||
|
// Send receiver-side StreamAck:
|
||||||
|
// - AckSeq: last contiguously received sequence starting from 0 (expectedSeq-1)
|
||||||
|
// - LostSeqs: up to maxLostReport missing sequences in the current window. (en)
|
||||||
|
var ackSeq uint64
|
||||||
|
if expectedSeq == 0 {
|
||||||
|
ackSeq = 0
|
||||||
|
} else {
|
||||||
|
ackSeq = expectedSeq - 1
|
||||||
|
}
|
||||||
|
|
||||||
|
lostSeqs := make([]uint64, 0, len(lost))
|
||||||
|
for seq := range lost {
|
||||||
|
if seq >= expectedSeq {
|
||||||
|
lostSeqs = append(lostSeqs, seq)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(lostSeqs) > 0 {
|
||||||
|
sort.Slice(lostSeqs, func(i, j int) bool { return lostSeqs[i] < lostSeqs[j] })
|
||||||
|
if len(lostSeqs) > maxLostReport {
|
||||||
|
lostSeqs = lostSeqs[:maxLostReport]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ackEnv := protocol.Envelope{
|
||||||
|
Type: protocol.MessageTypeStreamAck,
|
||||||
|
StreamAck: &protocol.StreamAck{
|
||||||
|
ID: streamID,
|
||||||
|
AckSeq: ackSeq,
|
||||||
|
LostSeqs: lostSeqs,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := w.codec.Encode(w.sess, &ackEnv); err != nil {
|
||||||
|
return nil, fmt.Errorf("send stream ack: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
case protocol.MessageTypeStreamClose:
|
case protocol.MessageTypeStreamClose:
|
||||||
sc := env.StreamClose
|
sc := env.StreamClose
|
||||||
if sc == nil {
|
if sc == nil {
|
||||||
return nil, fmt.Errorf("stream_close response payload is nil")
|
return nil, fmt.Errorf("stream_close response payload is nil")
|
||||||
}
|
}
|
||||||
if sc.ID != streamID {
|
|
||||||
return nil, fmt.Errorf("unexpected stream_close for id %q (expected %q)", sc.ID, streamID)
|
|
||||||
}
|
|
||||||
// 스트림 종료: 지금까지 수신한 헤더/바디로 protocol.Response 를 완성합니다. (ko)
|
// 스트림 종료: 지금까지 수신한 헤더/바디로 protocol.Response 를 완성합니다. (ko)
|
||||||
// Stream finished: complete protocol.Response using collected headers/body. (en)
|
// Stream finished: complete protocol.Response using collected headers/body. (en)
|
||||||
resp.Status = statusCode
|
resp.Status = statusCode
|
||||||
@@ -376,6 +768,7 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log
|
|||||||
return nil, fmt.Errorf("unexpected envelope type %q in stream response", env.Type)
|
return nil, fmt.Errorf("unexpected envelope type %q in stream response", env.Type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// nextHTTPStreamID 는 DTLS 세션 내 HTTP 요청에 사용할 고유 StreamID 를 생성합니다. (ko)
|
// nextHTTPStreamID 는 DTLS 세션 내 HTTP 요청에 사용할 고유 StreamID 를 생성합니다. (ko)
|
||||||
@@ -502,7 +895,19 @@ func registerSessionForDomain(domain string, sess dtls.Session, logger logging.L
|
|||||||
if d == "" {
|
if d == "" {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w := &dtlsSessionWrapper{sess: sess}
|
w := &dtlsSessionWrapper{
|
||||||
|
sess: sess,
|
||||||
|
bufferedReader: bufio.NewReaderSize(sess, protocol.GetDTLSReadBufferSize()),
|
||||||
|
codec: protocol.DefaultCodec,
|
||||||
|
logger: logger.With(logging.Fields{"component": "dtls_session_wrapper", "domain": d}),
|
||||||
|
pending: make(map[protocol.StreamID]*pendingRequest),
|
||||||
|
readerDone: make(chan struct{}),
|
||||||
|
streamSenders: make(map[protocol.StreamID]*streamSender),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start background reader goroutine to demultiplex incoming responses
|
||||||
|
go w.readLoop()
|
||||||
|
|
||||||
sessionsMu.Lock()
|
sessionsMu.Lock()
|
||||||
sessionsByDomain[d] = w
|
sessionsByDomain[d] = w
|
||||||
sessionsMu.Unlock()
|
sessionsMu.Unlock()
|
||||||
|
|||||||
@@ -16,6 +16,16 @@ import (
|
|||||||
// This matches existing 64KiB readers used around DTLS sessions (used by the JSON codec).
|
// This matches existing 64KiB readers used around DTLS sessions (used by the JSON codec).
|
||||||
const defaultDecoderBufferSize = 64 * 1024
|
const defaultDecoderBufferSize = 64 * 1024
|
||||||
|
|
||||||
|
// dtlsReadBufferSize 는 pion/dtls 내부 버퍼 한계에 맞춘 읽기 버퍼 크기입니다.
|
||||||
|
// pion/dtls 의 UnpackDatagram 함수는 8KB (8,192 bytes) 의 기본 수신 버퍼를 사용합니다.
|
||||||
|
// DTLS는 UDP 기반이므로 한 번의 Read()에서 전체 datagram을 읽어야 하며,
|
||||||
|
// 이 크기를 초과하는 DTLS 레코드는 처리되지 않습니다.
|
||||||
|
// dtlsReadBufferSize matches the pion/dtls internal buffer limit.
|
||||||
|
// pion/dtls's UnpackDatagram function uses an 8KB (8,192 bytes) receive buffer.
|
||||||
|
// Since DTLS is UDP-based, the entire datagram must be read in a single Read() call,
|
||||||
|
// and DTLS records exceeding this size cannot be processed.
|
||||||
|
const dtlsReadBufferSize = 8 * 1024 // 8KB
|
||||||
|
|
||||||
// maxProtoEnvelopeBytes 는 단일 Protobuf Envelope 의 최대 크기에 대한 보수적 상한입니다.
|
// maxProtoEnvelopeBytes 는 단일 Protobuf Envelope 의 최대 크기에 대한 보수적 상한입니다.
|
||||||
// 아직 하드 리미트로 사용하지는 않지만, 향후 방어적 체크에 사용할 수 있습니다.
|
// 아직 하드 리미트로 사용하지는 않지만, 향후 방어적 체크에 사용할 수 있습니다.
|
||||||
const maxProtoEnvelopeBytes = 512 * 1024 // 512KiB, 충분히 여유 있는 값
|
const maxProtoEnvelopeBytes = 512 * 1024 // 512KiB, 충분히 여유 있는 값
|
||||||
@@ -141,6 +151,14 @@ func (protobufCodec) Decode(r io.Reader, env *Envelope) error {
|
|||||||
// 서버와 클라이언트가 모두 이 버전을 사용해야 wire-format 이 일치합니다.
|
// 서버와 클라이언트가 모두 이 버전을 사용해야 wire-format 이 일치합니다.
|
||||||
var DefaultCodec WireCodec = protobufCodec{}
|
var DefaultCodec WireCodec = protobufCodec{}
|
||||||
|
|
||||||
|
// GetDTLSReadBufferSize 는 DTLS 세션 읽기에 사용할 버퍼 크기를 반환합니다.
|
||||||
|
// 이 값은 pion/dtls 내부 버퍼 한계(8KB)에 맞춰져 있습니다.
|
||||||
|
// GetDTLSReadBufferSize returns the buffer size to use for reading from DTLS sessions.
|
||||||
|
// This value is aligned with pion/dtls's internal buffer limit (8KB).
|
||||||
|
func GetDTLSReadBufferSize() int {
|
||||||
|
return dtlsReadBufferSize
|
||||||
|
}
|
||||||
|
|
||||||
// toProtoEnvelope 는 내부 Envelope 구조체를 Protobuf Envelope 로 변환합니다.
|
// toProtoEnvelope 는 내부 Envelope 구조체를 Protobuf Envelope 로 변환합니다.
|
||||||
// 현재 구현은 HTTP 요청/응답 및 스트림 관련 타입(StreamOpen/StreamData/StreamClose/StreamAck)을 지원합니다.
|
// 현재 구현은 HTTP 요청/응답 및 스트림 관련 타입(StreamOpen/StreamData/StreamClose/StreamAck)을 지원합니다.
|
||||||
func toProtoEnvelope(env *Envelope) (*protocolpb.Envelope, error) {
|
func toProtoEnvelope(env *Envelope) (*protocolpb.Envelope, error) {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package protocol
|
package protocol
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"io"
|
"io"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -80,9 +81,11 @@ func TestProtobufCodecDatagramBehavior(t *testing.T) {
|
|||||||
t.Fatalf("Message too short: %d bytes", len(msg))
|
t.Fatalf("Message too short: %d bytes", len(msg))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode the envelope
|
// Decode the envelope using a buffered reader (as we do in actual code)
|
||||||
|
// to handle datagram-based reading properly
|
||||||
|
reader := bufio.NewReaderSize(conn, GetDTLSReadBufferSize())
|
||||||
var decodedEnv Envelope
|
var decodedEnv Envelope
|
||||||
if err := codec.Decode(conn, &decodedEnv); err != nil {
|
if err := codec.Decode(reader, &decodedEnv); err != nil {
|
||||||
t.Fatalf("Failed to decode envelope: %v", err)
|
t.Fatalf("Failed to decode envelope: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -132,9 +135,10 @@ func TestProtobufCodecStreamData(t *testing.T) {
|
|||||||
t.Fatalf("Expected 1 message, got %d", len(conn.messages))
|
t.Fatalf("Expected 1 message, got %d", len(conn.messages))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode
|
// Decode using a buffered reader (as we do in actual code)
|
||||||
|
reader := bufio.NewReaderSize(conn, GetDTLSReadBufferSize())
|
||||||
var decodedEnv Envelope
|
var decodedEnv Envelope
|
||||||
if err := codec.Decode(conn, &decodedEnv); err != nil {
|
if err := codec.Decode(reader, &decodedEnv); err != nil {
|
||||||
t.Fatalf("Failed to decode StreamData: %v", err)
|
t.Fatalf("Failed to decode StreamData: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -208,10 +212,11 @@ func TestProtobufCodecMultipleMessages(t *testing.T) {
|
|||||||
t.Fatalf("Expected %d messages, got %d", len(envelopes), len(conn.messages))
|
t.Fatalf("Expected %d messages, got %d", len(envelopes), len(conn.messages))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode and verify all messages
|
// Decode and verify all messages using a buffered reader (as we do in actual code)
|
||||||
|
reader := bufio.NewReaderSize(conn, GetDTLSReadBufferSize())
|
||||||
for i := 0; i < len(envelopes); i++ {
|
for i := 0; i < len(envelopes); i++ {
|
||||||
var decoded Envelope
|
var decoded Envelope
|
||||||
if err := codec.Decode(conn, &decoded); err != nil {
|
if err := codec.Decode(reader, &decoded); err != nil {
|
||||||
t.Fatalf("Failed to decode message %d: %v", i, err)
|
t.Fatalf("Failed to decode message %d: %v", i, err)
|
||||||
}
|
}
|
||||||
if decoded.Type != envelopes[i].Type {
|
if decoded.Type != envelopes[i].Type {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package proxy
|
package proxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -151,14 +152,16 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
|
|||||||
log := p.Logger
|
log := p.Logger
|
||||||
|
|
||||||
// NOTE: pion/dtls 는 복호화된 애플리케이션 데이터를 호출자가 제공한 버퍼에 채워 넣습니다.
|
// NOTE: pion/dtls 는 복호화된 애플리케이션 데이터를 호출자가 제공한 버퍼에 채워 넣습니다.
|
||||||
// 기본 JSON 디코더 버퍼(수백 바이트 수준)만 사용하면 큰 HTTP 바디/Envelope 에서
|
// DTLS는 UDP 기반이므로 한 번의 Read()에서 전체 datagram을 읽어야 하며,
|
||||||
// "dtls: buffer too small" 오류가 날 수 있으므로, 여기서는 여유 있는 버퍼(64KiB)를 사용합니다. (ko)
|
// pion/dtls 내부 버퍼 한계(8KB)를 초과하는 메시지는 "dtls: buffer too small" 오류를 발생시킵니다.
|
||||||
|
// 이를 방지하기 위해 DTLS 세션을 bufio.Reader로 감싸서 datagram을 완전히 읽어들인 후 파싱합니다. (ko)
|
||||||
// NOTE: pion/dtls decrypts application data into the buffer provided by the caller.
|
// NOTE: pion/dtls decrypts application data into the buffer provided by the caller.
|
||||||
// Using only the default JSON decoder buffer (a few hundred bytes) can trigger
|
// Since DTLS is UDP-based, the entire datagram must be read in a single Read() call,
|
||||||
// "dtls: buffer too small" for large HTTP bodies/envelopes. The default
|
// and messages exceeding pion/dtls's internal buffer limit (8KB) will trigger
|
||||||
// JSON-based WireCodec internally wraps the DTLS session with a 64KiB
|
// "dtls: buffer too small" errors. To prevent this, we wrap the DTLS session with
|
||||||
// bufio.Reader, matching this requirement. (en)
|
// a bufio.Reader to fully read the datagram before parsing. (en)
|
||||||
codec := protocol.DefaultCodec
|
codec := protocol.DefaultCodec
|
||||||
|
bufferedReader := bufio.NewReaderSize(sess, protocol.GetDTLSReadBufferSize())
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@@ -171,7 +174,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var env protocol.Envelope
|
var env protocol.Envelope
|
||||||
if err := codec.Decode(sess, &env); err != nil {
|
if err := codec.Decode(bufferedReader, &env); err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
log.Info("dtls session closed by server", nil)
|
log.Info("dtls session closed by server", nil)
|
||||||
return nil
|
return nil
|
||||||
@@ -191,7 +194,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
case protocol.MessageTypeStreamOpen:
|
case protocol.MessageTypeStreamOpen:
|
||||||
if err := p.handleStreamRequest(ctx, sess, &env); err != nil {
|
if err := p.handleStreamRequest(ctx, sess, bufferedReader, &env); err != nil {
|
||||||
log.Error("failed to handle stream http envelope", logging.Fields{
|
log.Error("failed to handle stream http envelope", logging.Fields{
|
||||||
"error": err.Error(),
|
"error": err.Error(),
|
||||||
})
|
})
|
||||||
@@ -235,6 +238,28 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
|
|||||||
"seq": seq,
|
"seq": seq,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
case protocol.MessageTypeStreamData:
|
||||||
|
// StreamData received at top level (not expected, should be consumed by handleStreamRequest)
|
||||||
|
// This can happen if frames arrive out of order or if there's a protocol mismatch
|
||||||
|
streamID := "unknown"
|
||||||
|
if env.StreamData != nil {
|
||||||
|
streamID = string(env.StreamData.ID)
|
||||||
|
}
|
||||||
|
log.Warn("received unexpected stream_data at top level, ignoring", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
case protocol.MessageTypeStreamClose:
|
||||||
|
// StreamClose received at top level (not expected, should be consumed by handleStreamRequest)
|
||||||
|
// This can happen if frames arrive out of order or if there's a protocol mismatch
|
||||||
|
streamID := "unknown"
|
||||||
|
if env.StreamClose != nil {
|
||||||
|
streamID = string(env.StreamClose.ID)
|
||||||
|
}
|
||||||
|
log.Warn("received unexpected stream_close at top level, ignoring", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
})
|
||||||
|
continue
|
||||||
default:
|
default:
|
||||||
log.Error("received unsupported envelope type from server", logging.Fields{
|
log.Error("received unsupported envelope type from server", logging.Fields{
|
||||||
"type": env.Type,
|
"type": env.Type,
|
||||||
@@ -303,7 +328,7 @@ func (p *ClientProxy) handleHTTPEnvelope(ctx context.Context, sess dtls.Session,
|
|||||||
|
|
||||||
// handleStreamRequest 는 StreamOpen/StreamData/StreamClose 기반 HTTP 요청/응답 스트림을 처리합니다. (ko)
|
// handleStreamRequest 는 StreamOpen/StreamData/StreamClose 기반 HTTP 요청/응답 스트림을 처리합니다. (ko)
|
||||||
// handleStreamRequest handles an HTTP request/response exchange using StreamOpen/StreamData/StreamClose frames. (en)
|
// handleStreamRequest handles an HTTP request/response exchange using StreamOpen/StreamData/StreamClose frames. (en)
|
||||||
func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session, openEnv *protocol.Envelope) error {
|
func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session, reader io.Reader, openEnv *protocol.Envelope) error {
|
||||||
codec := protocol.DefaultCodec
|
codec := protocol.DefaultCodec
|
||||||
log := p.Logger
|
log := p.Logger
|
||||||
|
|
||||||
@@ -365,7 +390,7 @@ func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
var env protocol.Envelope
|
var env protocol.Envelope
|
||||||
if err := codec.Decode(sess, &env); err != nil {
|
if err := codec.Decode(reader, &env); err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
return fmt.Errorf("unexpected EOF while reading stream request body")
|
return fmt.Errorf("unexpected EOF while reading stream request body")
|
||||||
}
|
}
|
||||||
|
|||||||
104
progress.md
104
progress.md
@@ -403,6 +403,110 @@ The following tasks describe concrete work items to be implemented on the `featu
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
##### 3.3B DTLS Session Multiplexing / 세션 내 다중 HTTP 요청 처리
|
||||||
|
|
||||||
|
현재 구현은 클라이언트 측에서 단일 DTLS 세션 내에 **동시에 하나의 HTTP 요청 스트림만** 처리할 수 있습니다.
|
||||||
|
`ClientProxy.handleStreamRequest` 가 DTLS 세션의 reader 를 직접 소비하기 때문에, 동일 세션에서 두 번째 `StreamOpen` 이 섞여 들어오면 프로토콜 위반으로 간주되고 세션이 끊어집니다.
|
||||||
|
이 섹션은 **클라이언트 측 스트림 demux + per-stream goroutine 구조**를 도입해, 하나의 DTLS 세션 안에서 여러 HTTP 요청을 안전하게 병렬 처리하기 위한 단계입니다.
|
||||||
|
|
||||||
|
Currently, the client can effectively handle **only one HTTP request stream at a time per DTLS session**.
|
||||||
|
Because `ClientProxy.handleStreamRequest` directly consumes the DTLS session reader, an additional `StreamOpen` for a different stream interleaving on the same session is treated as a protocol error and tears down the session.
|
||||||
|
This section introduces a **client-side stream demultiplexer + per-stream goroutines** to safely support multiple concurrent HTTP requests within a single DTLS session.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
##### 3.3B.1 클라이언트 측 중앙 readLoop → 스트림 demux 설계
|
||||||
|
##### 3.3B.1 Design client-side central readLoop → per-stream demux
|
||||||
|
|
||||||
|
- [ ] `ClientProxy.StartLoop` 의 역할을 명확히 분리
|
||||||
|
- DTLS 세션에서 `Envelope` 를 연속해서 읽어들이는 **중앙 readLoop** 를 유지하되,
|
||||||
|
- 개별 스트림의 HTTP 처리 로직(현재 `handleStreamRequest` 내부 로직)을 분리해 별도 타입/구조체로 옮길 계획을 문서화합니다.
|
||||||
|
- [ ] 스트림 demux 위한 자료구조 설계
|
||||||
|
- `map[protocol.StreamID]*streamReceiver` 형태의 수신측 스트림 상태 테이블을 정의합니다.
|
||||||
|
- 각 `streamReceiver` 는 자신만의 입력 채널(예: `inCh chan *protocol.Envelope`)을 가져, 중앙 readLoop 로부터 `StreamOpen/StreamData/StreamClose` 를 전달받도록 합니다.
|
||||||
|
- [ ] 중앙 readLoop 에서 스트림별 라우팅 규칙 정의
|
||||||
|
- `Envelope.Type` 에 따라:
|
||||||
|
- `StreamOpen` / `StreamData` / `StreamClose`:
|
||||||
|
- `streamID` 를 추출하고, 해당 `streamReceiver` 의 `inCh` 로 전달.
|
||||||
|
- `StreamOpen` 수신 시에는 아직 없는 경우 `streamReceiver` 를 생성 후 등록.
|
||||||
|
- `StreamAck`:
|
||||||
|
- 송신 측 ARQ(`streamSender`) 용 테이블(이미 구현된 구조)을 찾아 재전송 로직으로 전달.
|
||||||
|
- 이 설계를 통해 중앙 readLoop 는 **DTLS 세션 → 스트림 단위 이벤트 분배**만 담당하도록 제한합니다.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
##### 3.3B.2 streamReceiver 타입 설계 및 HTTP 매핑 리팩터링
|
||||||
|
##### 3.3B.2 Design streamReceiver type and refactor HTTP mapping
|
||||||
|
|
||||||
|
- [ ] `streamReceiver` 타입 정의
|
||||||
|
- 필드 예시:
|
||||||
|
- `id protocol.StreamID`
|
||||||
|
- 수신 ARQ 상태: `expectedSeq`, `received map[uint64][]byte`, `lost map[uint64]struct{}`
|
||||||
|
- 입력 채널: `inCh chan *protocol.Envelope`
|
||||||
|
- DTLS 세션/codec/logging 핸들: `sess dtls.Session`, `codec protocol.WireCodec`, `logger logging.Logger`
|
||||||
|
- 로컬 HTTP 호출 관련: `HTTPClient *http.Client`, `LocalTarget string`
|
||||||
|
- 역할:
|
||||||
|
- 서버에서 온 `StreamOpen`/`StreamData`/`StreamClose` 를 순서대로 처리해 로컬 HTTP 요청을 구성하고,
|
||||||
|
- 로컬 HTTP 응답을 다시 `StreamOpen`/`StreamData`/`StreamClose` 로 역방향 전송합니다.
|
||||||
|
- [ ] 기존 `ClientProxy.handleStreamRequest` 의 로직을 `streamReceiver` 로 이전
|
||||||
|
- 현재 `handleStreamRequest` 안에서 수행하던 작업을 단계적으로 옮깁니다:
|
||||||
|
- `StreamOpen` 의 pseudo-header 에서 HTTP 메서드/URL/헤더를 복원.
|
||||||
|
- 요청 바디 수신용 수신 측 ARQ(`expectedSeq`, `received`, `lost`) 처리.
|
||||||
|
- 로컬 HTTP 요청 생성/실행 및 에러 처리.
|
||||||
|
- 응답을 4KiB `StreamData` chunk 로 전송 + 송신 측 ARQ(`streamSender.register`) 기록.
|
||||||
|
- 이때 **DTLS reader 를 직접 읽던 부분**은 제거하고, 대신 `inCh` 에서 전달된 `Envelope` 만 사용하도록 리팩터링합니다.
|
||||||
|
- [ ] streamReceiver 생명주기 관리
|
||||||
|
- `StreamClose` 수신 시:
|
||||||
|
- 로컬 HTTP 요청 바디 구성 종료.
|
||||||
|
- 로컬 HTTP 요청 실행 및 응답 스트림 전송 완료 후,
|
||||||
|
- `streamReceivers[streamID]` 에서 자신을 제거하고 goroutine 을 종료하는 정책을 명확히 정의합니다.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
##### 3.3B.3 StartLoop 와 streamReceiver 통합
|
||||||
|
##### 3.3B.3 Integrate StartLoop and streamReceiver
|
||||||
|
|
||||||
|
- [ ] `ClientProxy.StartLoop` 을 “중앙 readLoop + demux” 로 단순화
|
||||||
|
- `MessageTypeStreamOpen` 수신 시:
|
||||||
|
- `streamID := env.StreamOpen.ID` 를 기준으로 기존 `streamReceiver` 존재 여부를 검사.
|
||||||
|
- 없으면 새 `streamReceiver` 생성 후, goroutine 을 띄우고 `inCh <- env` 로 첫 메시지 전달.
|
||||||
|
- `MessageTypeStreamData` / `MessageTypeStreamClose` 수신 시:
|
||||||
|
- 해당 `streamReceiver` 의 `inCh` 로 그대로 전달.
|
||||||
|
- `MessageTypeStreamAck` 는 기존처럼 송신 측 `streamSender` 로 라우팅.
|
||||||
|
- [ ] 에러/종료 처리 전략 정리
|
||||||
|
- 개별 `streamReceiver` 에서 발생하는 에러는:
|
||||||
|
- 로컬 HTTP 에러 → 스트림 응답에 5xx/에러 바디로 반영.
|
||||||
|
- 프로토콜 위반(예: 잘못된 순서의 `StreamClose`) → 해당 스트림만 정리하고 세션은 유지하는지 여부를 정의.
|
||||||
|
- DTLS 세션 레벨 에러(EOF, decode 실패 등)는:
|
||||||
|
- 모든 `streamReceiver` 의 `inCh` 를 닫고,
|
||||||
|
- 이후 클라이언트 전체 루프를 종료하는 방향으로 합의합니다.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
##### 3.3B.4 세션 단위 직렬화 락 제거 및 멀티플렉싱 검증
|
||||||
|
##### 3.3B.4 Remove session-level serialization lock and validate multiplexing
|
||||||
|
|
||||||
|
- [ ] 서버 측 세션 직렬화 락 제거 계획 수립
|
||||||
|
- 현재 서버는 [`dtlsSessionWrapper`](cmd/server/main.go:111)에 `requestMu` 를 두어,
|
||||||
|
- 동일 DTLS 세션에서 동시에 하나의 `ForwardHTTP` 만 수행하도록 직렬화하고 있습니다.
|
||||||
|
- 클라이언트 측 멀티플렉싱이 안정화되면, `requestMu` 를 제거하고
|
||||||
|
- 하나의 세션 안에서 여러 HTTP 요청이 각기 다른 `StreamID` 로 병렬 진행되도록 허용합니다.
|
||||||
|
- [ ] E2E 멀티플렉싱 테스트 시나리오 정의
|
||||||
|
- 하나의 DTLS 세션 위에서:
|
||||||
|
- 동시에 여러 정적 리소스(`/css`, `/js`, `/img`) 요청.
|
||||||
|
- 큰 응답(수 MB 파일)과 작은 응답(API JSON)이 섞여 있는 시나리오.
|
||||||
|
- 기대 동작:
|
||||||
|
- 어떤 요청이 느리더라도, 다른 요청이 세션 내부 큐잉 때문에 과도하게 지연되지 않고 병렬로 완료되는지 확인.
|
||||||
|
- 클라이언트/서버 로그에 프로토콜 위반(`unexpected envelope type ...`) 이 더 이상 발생하지 않는지 확인.
|
||||||
|
- [ ] 관측성/메트릭에 멀티플렉싱 관련 라벨/필드 추가(선택)
|
||||||
|
- 필요 시:
|
||||||
|
- 세션당 동시 활성 스트림 수,
|
||||||
|
- 스트림 수명(요청-응답 왕복 시간),
|
||||||
|
- 세션 내 스트림 에러 수
|
||||||
|
를 관찰할 수 있는 메트릭/로그 필드를 설계합니다.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
### 3.4 ACME Integration / ACME 연동
|
### 3.4 ACME Integration / ACME 연동
|
||||||
|
|
||||||
- [x] [`internal/acme/acme.go`](internal/acme/acme.go) 실제 구현
|
- [x] [`internal/acme/acme.go`](internal/acme/acme.go) 실제 구현
|
||||||
|
|||||||
Reference in New Issue
Block a user