Files
hop-gate/internal/proxy/client.go
dalbodeule 5c3be0a3bb [feat](client): implement application-level ARQ with selective retransmission
- Added `StreamAck`-based selective retransmission logic for reliable stream delivery.
- Introduced per-stream ARQ states (`expectedSeq`, `lost`, `received`) for out-of-order handling and lost frame tracking.
- Implemented mechanisms to send `StreamAck` with `AckSeq` and `LostSeqs` attributes in response to `StreamData`.
- Enhanced retransmission logic for unacknowledged frames in `streamSender`, ensuring robust recovery for lost data.
- Updated progress notes in `progress.md` to reflect ARQ implementation.
2025-12-09 00:15:03 +09:00

770 lines
24 KiB
Go

package proxy
import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"sort"
"strconv"
"sync"
"time"
"github.com/dalbodeule/hop-gate/internal/dtls"
"github.com/dalbodeule/hop-gate/internal/logging"
"github.com/dalbodeule/hop-gate/internal/protocol"
)
// ClientProxy 는 서버로부터 받은 요청을 로컬 HTTP 서비스로 전달하는 클라이언트 측 프록시입니다. (ko)
// ClientProxy forwards requests from the server to local HTTP services. (en)
type ClientProxy struct {
HTTPClient *http.Client
Logger logging.Logger
LocalTarget string // e.g. "127.0.0.1:8080"
sendersMu sync.Mutex
streamSenders map[protocol.StreamID]*streamSender
}
// NewClientProxy 는 기본 HTTP 클라이언트 및 로거를 사용해 ClientProxy 를 생성합니다. (ko)
// NewClientProxy creates a ClientProxy with a default HTTP client and logger. (en)
func NewClientProxy(logger logging.Logger, localTarget string) *ClientProxy {
if logger == nil {
logger = logging.NewStdJSONLogger("client_proxy")
}
return &ClientProxy{
HTTPClient: &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
},
Logger: logger.With(logging.Fields{"component": "client_proxy"}),
LocalTarget: localTarget,
streamSenders: make(map[protocol.StreamID]*streamSender),
}
}
// StartLoop 는 DTLS 세션에서 protocol.Envelope 를 읽고, HTTP/스트림 요청의 경우 로컬 HTTP 요청을 수행한 뒤
// protocol.Envelope(HTTP/스트림 응답 포함)을 다시 세션으로 쓰는 루프를 실행합니다. (ko)
// StartLoop reads protocol.Envelope messages from the DTLS session; for HTTP/stream
// messages it performs local HTTP requests and writes back responses over the DTLS
// tunnel. (en)
type streamSender struct {
mu sync.Mutex
outstanding map[uint64][]byte
}
func newStreamSender() *streamSender {
return &streamSender{
outstanding: make(map[uint64][]byte),
}
}
func (s *streamSender) register(seq uint64, data []byte) {
s.mu.Lock()
defer s.mu.Unlock()
if s.outstanding == nil {
s.outstanding = make(map[uint64][]byte)
}
buf := make([]byte, len(data))
copy(buf, data)
s.outstanding[seq] = buf
}
func (s *streamSender) handleAck(ack *protocol.StreamAck) map[uint64][]byte {
s.mu.Lock()
defer s.mu.Unlock()
if s.outstanding == nil {
return nil
}
// 연속 수신 완료 구간(seq <= AckSeq)은 outstanding 에서 제거합니다.
for seq := range s.outstanding {
if seq <= ack.AckSeq {
delete(s.outstanding, seq)
}
}
// LostSeqs 가 비어 있으면 재전송할 것이 없습니다.
if len(ack.LostSeqs) == 0 {
return nil
}
// LostSeqs 에 포함된 시퀀스 중, 아직 outstanding 에 남아 있는 것들만 재전송 대상으로 선택합니다.
lost := make(map[uint64][]byte, len(ack.LostSeqs))
for _, seq := range ack.LostSeqs {
if data, ok := s.outstanding[seq]; ok {
buf := make([]byte, len(data))
copy(buf, data)
lost[seq] = buf
}
}
return lost
}
func (p *ClientProxy) registerStreamSender(id protocol.StreamID, sender *streamSender) {
p.sendersMu.Lock()
defer p.sendersMu.Unlock()
if p.streamSenders == nil {
p.streamSenders = make(map[protocol.StreamID]*streamSender)
}
p.streamSenders[id] = sender
}
func (p *ClientProxy) unregisterStreamSender(id protocol.StreamID) {
p.sendersMu.Lock()
defer p.sendersMu.Unlock()
if p.streamSenders == nil {
return
}
delete(p.streamSenders, id)
}
func (p *ClientProxy) getStreamSender(id protocol.StreamID) *streamSender {
p.sendersMu.Lock()
defer p.sendersMu.Unlock()
if p.streamSenders == nil {
return nil
}
return p.streamSenders[id]
}
func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
if ctx == nil {
ctx = context.Background()
}
log := p.Logger
// NOTE: pion/dtls 는 복호화된 애플리케이션 데이터를 호출자가 제공한 버퍼에 채워 넣습니다.
// 기본 JSON 디코더 버퍼(수백 바이트 수준)만 사용하면 큰 HTTP 바디/Envelope 에서
// "dtls: buffer too small" 오류가 날 수 있으므로, 여기서는 여유 있는 버퍼(64KiB)를 사용합니다. (ko)
// NOTE: pion/dtls decrypts application data into the buffer provided by the caller.
// Using only the default JSON decoder buffer (a few hundred bytes) can trigger
// "dtls: buffer too small" for large HTTP bodies/envelopes. The default
// JSON-based WireCodec internally wraps the DTLS session with a 64KiB
// bufio.Reader, matching this requirement. (en)
codec := protocol.DefaultCodec
for {
select {
case <-ctx.Done():
log.Info("client proxy loop stopping due to context cancellation", logging.Fields{
"reason": ctx.Err().Error(),
})
return nil
default:
}
var env protocol.Envelope
if err := codec.Decode(sess, &env); err != nil {
if err == io.EOF {
log.Info("dtls session closed by server", nil)
return nil
}
log.Error("failed to decode protocol envelope", logging.Fields{
"error": err.Error(),
})
return err
}
switch env.Type {
case protocol.MessageTypeHTTP:
if err := p.handleHTTPEnvelope(ctx, sess, &env); err != nil {
log.Error("failed to handle http envelope", logging.Fields{
"error": err.Error(),
})
return err
}
case protocol.MessageTypeStreamOpen:
if err := p.handleStreamRequest(ctx, sess, &env); err != nil {
log.Error("failed to handle stream http envelope", logging.Fields{
"error": err.Error(),
})
return err
}
case protocol.MessageTypeStreamAck:
sa := env.StreamAck
if sa == nil {
log.Error("received stream_ack envelope with nil payload", nil)
return fmt.Errorf("stream_ack payload is nil")
}
streamID := protocol.StreamID(sa.ID)
sender := p.getStreamSender(streamID)
if sender == nil {
log.Warn("received stream_ack for unknown stream", logging.Fields{
"stream_id": sa.ID,
})
continue
}
lost := sender.handleAck(sa)
// LostSeqs 를 기반으로 선택적 재전송 수행
for seq, data := range lost {
retryEnv := protocol.Envelope{
Type: protocol.MessageTypeStreamData,
StreamData: &protocol.StreamData{
ID: streamID,
Seq: seq,
Data: data,
},
}
if err := codec.Encode(sess, &retryEnv); err != nil {
log.Error("failed to retransmit stream_data after stream_ack", logging.Fields{
"stream_id": streamID,
"seq": seq,
"error": err.Error(),
})
return err
}
log.Info("retransmitted stream_data after stream_ack", logging.Fields{
"stream_id": streamID,
"seq": seq,
})
}
default:
log.Error("received unsupported envelope type from server", logging.Fields{
"type": env.Type,
})
return fmt.Errorf("unsupported envelope type %q", env.Type)
}
}
}
// handleHTTPEnvelope 는 기존 단일 HTTP 요청/응답 Envelope 경로를 처리합니다. (ko)
// handleHTTPEnvelope handles the legacy single HTTP request/response envelope path. (en)
func (p *ClientProxy) handleHTTPEnvelope(ctx context.Context, sess dtls.Session, env *protocol.Envelope) error {
if env.HTTPRequest == nil {
return fmt.Errorf("http envelope missing http_request payload")
}
req := env.HTTPRequest
log := p.Logger
start := time.Now()
logReq := log.With(logging.Fields{
"request_id": req.RequestID,
"service": req.ServiceName,
"method": req.Method,
"url": req.URL,
"client_id": req.ClientID,
"local_target": p.LocalTarget,
})
logReq.Info("received http envelope from server", nil)
resp := protocol.Response{
RequestID: req.RequestID,
Header: make(map[string][]string),
}
// 로컬 HTTP 요청 수행
if err := p.forwardToLocal(ctx, req, &resp); err != nil {
resp.Status = http.StatusBadGateway
resp.Error = err.Error()
logReq.Error("local http request failed", logging.Fields{
"error": err.Error(),
})
}
// HTTP 응답을 Envelope 로 감싸서 서버로 전송합니다.
respEnv := protocol.Envelope{
Type: protocol.MessageTypeHTTP,
HTTPResponse: &resp,
}
if err := protocol.DefaultCodec.Encode(sess, &respEnv); err != nil {
logReq.Error("failed to encode http response envelope", logging.Fields{
"error": err.Error(),
})
return err
}
logReq.Info("http response envelope sent to server", logging.Fields{
"status": resp.Status,
"elapsed_ms": time.Since(start).Milliseconds(),
"error": resp.Error,
})
return nil
}
// handleStreamRequest 는 StreamOpen/StreamData/StreamClose 기반 HTTP 요청/응답 스트림을 처리합니다. (ko)
// handleStreamRequest handles an HTTP request/response exchange using StreamOpen/StreamData/StreamClose frames. (en)
func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session, openEnv *protocol.Envelope) error {
codec := protocol.DefaultCodec
log := p.Logger
so := openEnv.StreamOpen
if so == nil {
return fmt.Errorf("stream_open envelope missing payload")
}
streamID := so.ID
// 이 스트림에 대한 송신 측 ARQ 상태를 준비하고, StartLoop 에서 들어오는 StreamAck 와 연동합니다.
sender := newStreamSender()
p.registerStreamSender(streamID, sender)
defer p.unregisterStreamSender(streamID)
// Pseudo-header 에서 HTTP 메타데이터를 추출합니다. (ko)
// Extract HTTP metadata from pseudo-headers. (en)
method := firstHeaderValue(so.Header, protocol.HeaderKeyMethod, http.MethodGet)
urlStr := firstHeaderValue(so.Header, protocol.HeaderKeyURL, "/")
_ = firstHeaderValue(so.Header, protocol.HeaderKeyHost, "")
if p.LocalTarget == "" {
return fmt.Errorf("local target is empty")
}
u, err := url.Parse(urlStr)
if err != nil {
return fmt.Errorf("parse url from stream_open: %w", err)
}
u.Scheme = "http"
u.Host = p.LocalTarget
// 로컬 HTTP 요청용 헤더 맵을 생성하면서 pseudo-header 는 제거합니다. (ko)
// Build local HTTP header map while stripping pseudo-headers. (en)
httpHeader := make(http.Header, len(so.Header))
for k, vs := range so.Header {
if k == protocol.HeaderKeyMethod ||
k == protocol.HeaderKeyURL ||
k == protocol.HeaderKeyHost ||
k == protocol.HeaderKeyStatus {
continue
}
for _, v := range vs {
httpHeader.Add(k, v)
}
}
// 요청 바디를 StreamData/StreamClose 프레임에서 모두 읽어 메모리에 적재합니다. (ko)
// Read the entire request body from StreamData/StreamClose frames into memory. (en)
//
// 동시에 수신 측 ARQ 상태( expectedSeq / out-of-order 버퍼 / LostSeqs )를 관리하고
// StreamAck 를 전송해 선택적 재전송(Selective Retransmission)을 유도합니다.
var (
bodyBuf bytes.Buffer
expectedSeq uint64
received = make(map[uint64][]byte)
lost = make(map[uint64]struct{})
)
const maxLostReport = 32
for {
var env protocol.Envelope
if err := codec.Decode(sess, &env); err != nil {
if err == io.EOF {
return fmt.Errorf("unexpected EOF while reading stream request body")
}
return fmt.Errorf("decode stream request frame: %w", err)
}
switch env.Type {
case protocol.MessageTypeStreamData:
sd := env.StreamData
if sd == nil {
return fmt.Errorf("stream_data payload is nil")
}
if sd.ID != streamID {
return fmt.Errorf("stream_data for unexpected stream id %q (expected %q)", sd.ID, streamID)
}
// 수신 측 ARQ: Seq 에 따라 분기
switch {
case sd.Seq == expectedSeq:
// 기대하던 순서의 프레임: 바로 bodyBuf 에 기록하고, 이후 버퍼된 연속 프레임도 flush.
if len(sd.Data) > 0 {
if _, err := bodyBuf.Write(sd.Data); err != nil {
return fmt.Errorf("buffer stream_data: %w", err)
}
}
expectedSeq++
for {
data, ok := received[expectedSeq]
if !ok {
break
}
if len(data) > 0 {
if _, err := bodyBuf.Write(data); err != nil {
return fmt.Errorf("buffer reordered stream_data: %w", err)
}
}
delete(received, expectedSeq)
delete(lost, expectedSeq)
expectedSeq++
}
// AckSeq 이전 구간의 lost 항목 정리
for seq := range lost {
if seq < expectedSeq {
delete(lost, seq)
}
}
case sd.Seq > expectedSeq:
// 앞선 일부 Seq 들이 누락된 상태: 현재 프레임을 버퍼링하고 missing seq 들을 lost 에 추가.
if len(sd.Data) > 0 {
buf := make([]byte, len(sd.Data))
copy(buf, sd.Data)
received[sd.Seq] = buf
}
for seq := expectedSeq; seq < sd.Seq && len(lost) < maxLostReport; seq++ {
if _, ok := lost[seq]; !ok {
lost[seq] = struct{}{}
}
}
default:
// sd.Seq < expectedSeq 인 경우: 이미 처리했거나 Ack 로 커버된 프레임 → 무시.
}
// 수신 측 StreamAck 전송:
// - AckSeq: 0부터 시작해 연속으로 수신 완료한 마지막 시퀀스 (expectedSeq-1)
// - LostSeqs: 현재 윈도우 내에서 누락된 시퀀스 중 상한 개수(maxLostReport)까지만 포함
var ackSeq uint64
if expectedSeq == 0 {
ackSeq = 0
} else {
ackSeq = expectedSeq - 1
}
lostSeqs := make([]uint64, 0, len(lost))
for seq := range lost {
if seq >= expectedSeq {
lostSeqs = append(lostSeqs, seq)
}
}
if len(lostSeqs) > 0 {
sort.Slice(lostSeqs, func(i, j int) bool { return lostSeqs[i] < lostSeqs[j] })
if len(lostSeqs) > maxLostReport {
lostSeqs = lostSeqs[:maxLostReport]
}
}
ackEnv := protocol.Envelope{
Type: protocol.MessageTypeStreamAck,
StreamAck: &protocol.StreamAck{
ID: streamID,
AckSeq: ackSeq,
LostSeqs: lostSeqs,
},
}
if err := codec.Encode(sess, &ackEnv); err != nil {
return fmt.Errorf("send stream ack: %w", err)
}
case protocol.MessageTypeStreamClose:
sc := env.StreamClose
if sc == nil {
return fmt.Errorf("stream_close payload is nil")
}
if sc.ID != streamID {
return fmt.Errorf("stream_close for unexpected stream id %q (expected %q)", sc.ID, streamID)
}
// sc.Error 는 최소 구현에서는 로컬 요청 에러와 별도로 취급하지 않습니다. (ko)
// For the minimal implementation we do not surface sc.Error here. (en)
goto haveBody
default:
return fmt.Errorf("unexpected envelope type %q while reading stream request body", env.Type)
}
}
haveBody:
bodyBytes := bodyBuf.Bytes()
// 로컬 HTTP 요청 생성 (stream 기반 요청을 실제 HTTP 요청으로 변환). (ko)
// Build the local HTTP request from the stream-based metadata and body. (en)
req, err := http.NewRequestWithContext(ctx, method, u.String(), nil)
if err != nil {
return fmt.Errorf("create http request from stream: %w", err)
}
if len(bodyBytes) > 0 {
buf := bytes.NewReader(bodyBytes)
req.Body = io.NopCloser(buf)
req.ContentLength = int64(len(bodyBytes))
}
req.Header = httpHeader
start := time.Now()
logReq := log.With(logging.Fields{
"request_id": string(streamID),
"service": so.Service,
"method": method,
"url": urlStr,
"stream_id": string(streamID),
"local_target": p.LocalTarget,
})
logReq.Info("received stream_open envelope from server", nil)
res, err := p.HTTPClient.Do(req)
if err != nil {
// 로컬 요청 실패 시, 502 + 에러 메시지를 스트림 응답으로 전송합니다. (ko)
// On local request failure, send a 502 response over the stream. (en)
errMsg := fmt.Sprintf("perform http request: %v", err)
streamRespHeader := map[string][]string{
"Content-Type": {"text/plain; charset=utf-8"},
protocol.HeaderKeyStatus: {strconv.Itoa(http.StatusBadGateway)},
}
respOpen := protocol.Envelope{
Type: protocol.MessageTypeStreamOpen,
StreamOpen: &protocol.StreamOpen{
ID: streamID,
Service: so.Service,
TargetAddr: so.TargetAddr,
Header: streamRespHeader,
},
}
if err2 := codec.Encode(sess, &respOpen); err2 != nil {
logReq.Error("failed to encode stream response open envelope (error path)", logging.Fields{
"error": err2.Error(),
})
return err2
}
dataEnv := protocol.Envelope{
Type: protocol.MessageTypeStreamData,
StreamData: &protocol.StreamData{
ID: streamID,
Seq: 0,
Data: []byte("HopGate: " + errMsg),
},
}
// 에러 응답 프레임도 ARQ 대상에 등록합니다.
sender.register(0, dataEnv.StreamData.Data)
if err2 := codec.Encode(sess, &dataEnv); err2 != nil {
logReq.Error("failed to encode stream response data envelope (error path)", logging.Fields{
"error": err2.Error(),
})
return err2
}
closeEnv := protocol.Envelope{
Type: protocol.MessageTypeStreamClose,
StreamClose: &protocol.StreamClose{
ID: streamID,
Error: errMsg,
},
}
if err2 := codec.Encode(sess, &closeEnv); err2 != nil {
logReq.Error("failed to encode stream response close envelope (error path)", logging.Fields{
"error": err2.Error(),
})
return err2
}
logReq.Error("local http request failed (stream)", logging.Fields{
"error": err.Error(),
})
return nil
}
defer res.Body.Close()
// 응답을 StreamOpen + StreamData(4KiB chunk) + StreamClose 프레임으로 전송합니다. (ko)
// Send the response as StreamOpen + StreamData (4KiB chunks) + StreamClose frames. (en)
// 응답 헤더 맵을 복사하고 상태 코드를 pseudo-header 로 추가합니다. (ko)
// Copy response headers and attach status code as a pseudo-header. (en)
streamRespHeader := make(map[string][]string, len(res.Header)+1)
for k, vs := range res.Header {
streamRespHeader[k] = append([]string(nil), vs...)
}
statusCode := res.StatusCode
if statusCode == 0 {
statusCode = http.StatusOK
}
streamRespHeader[protocol.HeaderKeyStatus] = []string{strconv.Itoa(statusCode)}
respOpen := protocol.Envelope{
Type: protocol.MessageTypeStreamOpen,
StreamOpen: &protocol.StreamOpen{
ID: streamID,
Service: so.Service,
TargetAddr: so.TargetAddr,
Header: streamRespHeader,
},
}
if err := codec.Encode(sess, &respOpen); err != nil {
logReq.Error("failed to encode stream response open envelope", logging.Fields{
"error": err.Error(),
})
return err
}
// 응답 바디를 4KiB(StreamChunkSize) 단위로 잘라 StreamData 프레임으로 전송합니다. (ko)
// Chunk the response body into 4KiB (StreamChunkSize) StreamData frames. (en)
var seq uint64
chunk := make([]byte, protocol.StreamChunkSize)
for {
n, err := res.Body.Read(chunk)
if n > 0 {
dataCopy := append([]byte(nil), chunk[:n]...)
// 송신 측 ARQ: Seq 별 payload 를 기록해 두었다가, StreamAck 의 LostSeqs 를 기반으로 재전송할 수 있습니다.
sender.register(seq, dataCopy)
dataEnv := protocol.Envelope{
Type: protocol.MessageTypeStreamData,
StreamData: &protocol.StreamData{
ID: streamID,
Seq: seq,
Data: dataCopy,
},
}
if err2 := codec.Encode(sess, &dataEnv); err2 != nil {
logReq.Error("failed to encode stream response data envelope", logging.Fields{
"error": err2.Error(),
})
return err2
}
seq++
}
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("read http response body for streaming: %w", err)
}
}
closeEnv := protocol.Envelope{
Type: protocol.MessageTypeStreamClose,
StreamClose: &protocol.StreamClose{
ID: streamID,
Error: "",
},
}
if err := codec.Encode(sess, &closeEnv); err != nil {
logReq.Error("failed to encode stream response close envelope", logging.Fields{
"error": err.Error(),
})
return err
}
logReq.Info("stream http response sent to server", logging.Fields{
"status": statusCode,
"elapsed_ms": time.Since(start).Milliseconds(),
"error": "",
})
return nil
}
// forwardToLocal 는 protocol.Request 를 로컬 HTTP 요청으로 변환하고 protocol.Response 를 채웁니다. (ko)
// forwardToLocal converts a protocol.Request into a local HTTP request and fills protocol.Response. (en)
func (p *ClientProxy) forwardToLocal(ctx context.Context, preq *protocol.Request, presp *protocol.Response) error {
if p.LocalTarget == "" {
return fmt.Errorf("local target is empty")
}
// 요청 URL을 local target 기준으로 재구성
u, err := url.Parse(preq.URL)
if err != nil {
return fmt.Errorf("parse url: %w", err)
}
u.Scheme = "http"
u.Host = p.LocalTarget
req, err := http.NewRequestWithContext(ctx, preq.Method, u.String(), nil)
if err != nil {
return fmt.Errorf("create http request: %w", err)
}
// Body 설정 (원본 바이트를 그대로 사용)
if len(preq.Body) > 0 {
buf := bytes.NewReader(preq.Body)
req.Body = io.NopCloser(buf)
req.ContentLength = int64(len(preq.Body))
}
// 헤더 복사
for k, vs := range preq.Header {
for _, v := range vs {
req.Header.Add(k, v)
}
}
res, err := p.HTTPClient.Do(req)
if err != nil {
return fmt.Errorf("perform http request: %w", err)
}
defer res.Body.Close()
presp.Status = res.StatusCode
for k, vs := range res.Header {
presp.Header[k] = append([]string(nil), vs...)
}
// DTLS over UDP has an upper bound on packet size (~64KiB). 전체 HTTP 바디를
// 하나의 Envelope 로 감싸 전송하는 현재 설계에서는, 바디가 너무 크면
// OS 레벨에서 "message too long" (EMSGSIZE) 가 발생할 수 있습니다. (ko)
//
// 이를 피하기 위해, 터널링 가능한 **단일 HTTP 바디** 크기에 상한을 두고,
// 이를 초과하는 응답은 502 Bad Gateway + HopGate 전용 에러 메시지로 대체합니다. (ko)
//
// DTLS over UDP has an upper bound on datagram size (~64KiB). With the current
// single-envelope design, very large bodies can still trigger "message too long"
// (EMSGSIZE) at the OS level. To avoid this, we cap the tunneled HTTP body size
// and replace oversized responses with a 502 Bad Gateway + HopGate-specific
// error body. (en)
//
// Protobuf 기반 터널링에서는 향후 StreamData(4KiB) 단위로 나누어 전송할 예정이지만,
// 그 전 단계에서도 body 자체를 4KiB( StreamChunkSize )로 하드 리밋하여
// Proto message body 필드가 지나치게 커지지 않도록 합니다. (ko)
//
// Even before full stream tunneling is implemented, we hard-limit the protobuf
// body field to 4KiB (StreamChunkSize) so that individual messages remain small. (en)
const maxTunnelBodyBytes = protocol.StreamChunkSize
limited := &io.LimitedReader{
R: res.Body,
N: maxTunnelBodyBytes + 1, // read up to limit+1 to detect overflow
}
body, err := io.ReadAll(limited)
if err != nil {
return fmt.Errorf("read http response body: %w", err)
}
if len(body) > maxTunnelBodyBytes {
// 응답 바디가 너무 커서 DTLS/UDP 로 안전하게 전송하기 어렵기 때문에,
// 원본 바디 대신 HopGate 에러 응답으로 대체합니다. (ko)
//
// The response body is too large to be safely tunneled over DTLS/UDP.
// Replace it with a HopGate error response instead of attempting to
// send an oversized datagram. (en)
presp.Status = http.StatusBadGateway
presp.Header = map[string][]string{
"Content-Type": {"text/plain; charset=utf-8"},
}
presp.Body = []byte("HopGate: response body too large for DTLS tunnel (over max_tunnel_body_bytes)")
presp.Error = "response body too large for DTLS tunnel"
return nil
}
presp.Body = body
return nil
}
// firstHeaderValue 는 주어진 키의 첫 번째 헤더 값을 반환하고, 없으면 기본값을 반환합니다. (ko)
// firstHeaderValue returns the first header value for a key, or a default if absent. (en)
func firstHeaderValue(hdr map[string][]string, key, def string) string {
if hdr == nil {
return def
}
if vs, ok := hdr[key]; ok && len(vs) > 0 {
return vs[0]
}
return def
}