[feat](client): implement application-level ARQ with selective retransmission

- Added `StreamAck`-based selective retransmission logic for reliable stream delivery.
- Introduced per-stream ARQ states (`expectedSeq`, `lost`, `received`) for out-of-order handling and lost frame tracking.
- Implemented mechanisms to send `StreamAck` with `AckSeq` and `LostSeqs` attributes in response to `StreamData`.
- Enhanced retransmission logic for unacknowledged frames in `streamSender`, ensuring robust recovery for lost data.
- Updated progress notes in `progress.md` to reflect ARQ implementation.
This commit is contained in:
dalbodeule
2025-12-09 00:15:03 +09:00
parent 5e94dd7aa9
commit 5c3be0a3bb
2 changed files with 309 additions and 99 deletions

View File

@@ -8,7 +8,9 @@ import (
"net"
"net/http"
"net/url"
"sort"
"strconv"
"sync"
"time"
"github.com/dalbodeule/hop-gate/internal/dtls"
@@ -22,6 +24,9 @@ type ClientProxy struct {
HTTPClient *http.Client
Logger logging.Logger
LocalTarget string // e.g. "127.0.0.1:8080"
sendersMu sync.Mutex
streamSenders map[protocol.StreamID]*streamSender
}
// NewClientProxy 는 기본 HTTP 클라이언트 및 로거를 사용해 ClientProxy 를 생성합니다. (ko)
@@ -46,8 +51,9 @@ func NewClientProxy(logger logging.Logger, localTarget string) *ClientProxy {
ExpectContinueTimeout: 1 * time.Second,
},
},
Logger: logger.With(logging.Fields{"component": "client_proxy"}),
LocalTarget: localTarget,
Logger: logger.With(logging.Fields{"component": "client_proxy"}),
LocalTarget: localTarget,
streamSenders: make(map[protocol.StreamID]*streamSender),
}
}
@@ -56,6 +62,88 @@ func NewClientProxy(logger logging.Logger, localTarget string) *ClientProxy {
// StartLoop reads protocol.Envelope messages from the DTLS session; for HTTP/stream
// messages it performs local HTTP requests and writes back responses over the DTLS
// tunnel. (en)
type streamSender struct {
mu sync.Mutex
outstanding map[uint64][]byte
}
func newStreamSender() *streamSender {
return &streamSender{
outstanding: make(map[uint64][]byte),
}
}
func (s *streamSender) register(seq uint64, data []byte) {
s.mu.Lock()
defer s.mu.Unlock()
if s.outstanding == nil {
s.outstanding = make(map[uint64][]byte)
}
buf := make([]byte, len(data))
copy(buf, data)
s.outstanding[seq] = buf
}
func (s *streamSender) handleAck(ack *protocol.StreamAck) map[uint64][]byte {
s.mu.Lock()
defer s.mu.Unlock()
if s.outstanding == nil {
return nil
}
// 연속 수신 완료 구간(seq <= AckSeq)은 outstanding 에서 제거합니다.
for seq := range s.outstanding {
if seq <= ack.AckSeq {
delete(s.outstanding, seq)
}
}
// LostSeqs 가 비어 있으면 재전송할 것이 없습니다.
if len(ack.LostSeqs) == 0 {
return nil
}
// LostSeqs 에 포함된 시퀀스 중, 아직 outstanding 에 남아 있는 것들만 재전송 대상으로 선택합니다.
lost := make(map[uint64][]byte, len(ack.LostSeqs))
for _, seq := range ack.LostSeqs {
if data, ok := s.outstanding[seq]; ok {
buf := make([]byte, len(data))
copy(buf, data)
lost[seq] = buf
}
}
return lost
}
func (p *ClientProxy) registerStreamSender(id protocol.StreamID, sender *streamSender) {
p.sendersMu.Lock()
defer p.sendersMu.Unlock()
if p.streamSenders == nil {
p.streamSenders = make(map[protocol.StreamID]*streamSender)
}
p.streamSenders[id] = sender
}
func (p *ClientProxy) unregisterStreamSender(id protocol.StreamID) {
p.sendersMu.Lock()
defer p.sendersMu.Unlock()
if p.streamSenders == nil {
return
}
delete(p.streamSenders, id)
}
func (p *ClientProxy) getStreamSender(id protocol.StreamID) *streamSender {
p.sendersMu.Lock()
defer p.sendersMu.Unlock()
if p.streamSenders == nil {
return nil
}
return p.streamSenders[id]
}
func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
if ctx == nil {
ctx = context.Background()
@@ -109,6 +197,44 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error {
})
return err
}
case protocol.MessageTypeStreamAck:
sa := env.StreamAck
if sa == nil {
log.Error("received stream_ack envelope with nil payload", nil)
return fmt.Errorf("stream_ack payload is nil")
}
streamID := protocol.StreamID(sa.ID)
sender := p.getStreamSender(streamID)
if sender == nil {
log.Warn("received stream_ack for unknown stream", logging.Fields{
"stream_id": sa.ID,
})
continue
}
lost := sender.handleAck(sa)
// LostSeqs 를 기반으로 선택적 재전송 수행
for seq, data := range lost {
retryEnv := protocol.Envelope{
Type: protocol.MessageTypeStreamData,
StreamData: &protocol.StreamData{
ID: streamID,
Seq: seq,
Data: data,
},
}
if err := codec.Encode(sess, &retryEnv); err != nil {
log.Error("failed to retransmit stream_data after stream_ack", logging.Fields{
"stream_id": streamID,
"seq": seq,
"error": err.Error(),
})
return err
}
log.Info("retransmitted stream_data after stream_ack", logging.Fields{
"stream_id": streamID,
"seq": seq,
})
}
default:
log.Error("received unsupported envelope type from server", logging.Fields{
"type": env.Type,
@@ -187,6 +313,10 @@ func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session
}
streamID := so.ID
// 이 스트림에 대한 송신 측 ARQ 상태를 준비하고, StartLoop 에서 들어오는 StreamAck 와 연동합니다.
sender := newStreamSender()
p.registerStreamSender(streamID, sender)
defer p.unregisterStreamSender(streamID)
// Pseudo-header 에서 HTTP 메타데이터를 추출합니다. (ko)
// Extract HTTP metadata from pseudo-headers. (en)
@@ -222,7 +352,17 @@ func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session
// 요청 바디를 StreamData/StreamClose 프레임에서 모두 읽어 메모리에 적재합니다. (ko)
// Read the entire request body from StreamData/StreamClose frames into memory. (en)
var bodyBuf bytes.Buffer
//
// 동시에 수신 측 ARQ 상태( expectedSeq / out-of-order 버퍼 / LostSeqs )를 관리하고
// StreamAck 를 전송해 선택적 재전송(Selective Retransmission)을 유도합니다.
var (
bodyBuf bytes.Buffer
expectedSeq uint64
received = make(map[uint64][]byte)
lost = make(map[uint64]struct{})
)
const maxLostReport = 32
for {
var env protocol.Envelope
if err := codec.Decode(sess, &env); err != nil {
@@ -241,11 +381,91 @@ func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session
if sd.ID != streamID {
return fmt.Errorf("stream_data for unexpected stream id %q (expected %q)", sd.ID, streamID)
}
if len(sd.Data) > 0 {
if _, err := bodyBuf.Write(sd.Data); err != nil {
return fmt.Errorf("buffer stream_data: %w", err)
// 수신 측 ARQ: Seq 에 따라 분기
switch {
case sd.Seq == expectedSeq:
// 기대하던 순서의 프레임: 바로 bodyBuf 에 기록하고, 이후 버퍼된 연속 프레임도 flush.
if len(sd.Data) > 0 {
if _, err := bodyBuf.Write(sd.Data); err != nil {
return fmt.Errorf("buffer stream_data: %w", err)
}
}
expectedSeq++
for {
data, ok := received[expectedSeq]
if !ok {
break
}
if len(data) > 0 {
if _, err := bodyBuf.Write(data); err != nil {
return fmt.Errorf("buffer reordered stream_data: %w", err)
}
}
delete(received, expectedSeq)
delete(lost, expectedSeq)
expectedSeq++
}
// AckSeq 이전 구간의 lost 항목 정리
for seq := range lost {
if seq < expectedSeq {
delete(lost, seq)
}
}
case sd.Seq > expectedSeq:
// 앞선 일부 Seq 들이 누락된 상태: 현재 프레임을 버퍼링하고 missing seq 들을 lost 에 추가.
if len(sd.Data) > 0 {
buf := make([]byte, len(sd.Data))
copy(buf, sd.Data)
received[sd.Seq] = buf
}
for seq := expectedSeq; seq < sd.Seq && len(lost) < maxLostReport; seq++ {
if _, ok := lost[seq]; !ok {
lost[seq] = struct{}{}
}
}
default:
// sd.Seq < expectedSeq 인 경우: 이미 처리했거나 Ack 로 커버된 프레임 → 무시.
}
// 수신 측 StreamAck 전송:
// - AckSeq: 0부터 시작해 연속으로 수신 완료한 마지막 시퀀스 (expectedSeq-1)
// - LostSeqs: 현재 윈도우 내에서 누락된 시퀀스 중 상한 개수(maxLostReport)까지만 포함
var ackSeq uint64
if expectedSeq == 0 {
ackSeq = 0
} else {
ackSeq = expectedSeq - 1
}
lostSeqs := make([]uint64, 0, len(lost))
for seq := range lost {
if seq >= expectedSeq {
lostSeqs = append(lostSeqs, seq)
}
}
if len(lostSeqs) > 0 {
sort.Slice(lostSeqs, func(i, j int) bool { return lostSeqs[i] < lostSeqs[j] })
if len(lostSeqs) > maxLostReport {
lostSeqs = lostSeqs[:maxLostReport]
}
}
ackEnv := protocol.Envelope{
Type: protocol.MessageTypeStreamAck,
StreamAck: &protocol.StreamAck{
ID: streamID,
AckSeq: ackSeq,
LostSeqs: lostSeqs,
},
}
if err := codec.Encode(sess, &ackEnv); err != nil {
return fmt.Errorf("send stream ack: %w", err)
}
case protocol.MessageTypeStreamClose:
sc := env.StreamClose
if sc == nil {
@@ -322,6 +542,8 @@ haveBody:
Data: []byte("HopGate: " + errMsg),
},
}
// 에러 응답 프레임도 ARQ 대상에 등록합니다.
sender.register(0, dataEnv.StreamData.Data)
if err2 := codec.Encode(sess, &dataEnv); err2 != nil {
logReq.Error("failed to encode stream response data envelope (error path)", logging.Fields{
"error": err2.Error(),
@@ -390,6 +612,9 @@ haveBody:
n, err := res.Body.Read(chunk)
if n > 0 {
dataCopy := append([]byte(nil), chunk[:n]...)
// 송신 측 ARQ: Seq 별 payload 를 기록해 두었다가, StreamAck 의 LostSeqs 를 기반으로 재전송할 수 있습니다.
sender.register(seq, dataCopy)
dataEnv := protocol.Envelope{
Type: protocol.MessageTypeStreamData,
StreamData: &protocol.StreamData{