diff --git a/cmd/server/main.go b/cmd/server/main.go index fe7ede9..cdb886f 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -124,13 +124,6 @@ type dtlsSessionWrapper struct { // streamSenders keeps ARQ sender state for HTTP request bodies sent // from server to client. (en) streamSenders map[protocol.StreamID]*streamSender - - // requestMu 는 이 DTLS 세션에서 동시에 처리될 수 있는 HTTP 요청을 - // 하나로 제한하기 위한 뮤텍스입니다. (ko) - // requestMu serializes HTTP requests on this DTLS session so that the - // client (which currently processes one StreamOpen at a time) does not - // see interleaved streams. (en) - requestMu sync.Mutex } // registerStreamSender 는 주어진 스트림 ID 에 대한 송신 측 ARQ 상태를 등록합니다. (ko) @@ -446,13 +439,17 @@ func (w *dtlsSessionWrapper) ForwardHTTP(ctx context.Context, logger logging.Log ctx = context.Background() } - // 현재 클라이언트 구현은 DTLS 세션당 하나의 StreamOpen/StreamData/StreamClose - // 만 순차적으로 처리하므로, 서버에서도 동일 세션 위 HTTP 요청을 직렬화합니다. (ko) - // The current client processes exactly one StreamOpen/StreamData/StreamClose - // sequence at a time per DTLS session, so we serialize HTTP requests on - // this session as well. (en) - w.requestMu.Lock() - defer w.requestMu.Unlock() + // 클라이언트는 단일 DTLS 세션 내에서 다중 HTTP 스트림을 처리할 수 있도록 + // 중앙 readLoop + per-stream demux 구조(3.3B.1~3.3B.2)가 적용되어 있습니다. (ko) + // With the client-side central read loop + per-stream demux (3.3B.1–3.3B.2), + // a single DTLS session can now handle multiple concurrent HTTP streams. (en) + // + // 3.3B.4에서 정의한 것처럼, 서버 측에서는 더 이상 세션 단위 직렬화 락을 사용하지 않고 + // 동일 DTLS 세션 위에서 여러 ForwardHTTP 호출이 서로 다른 StreamID 로 병렬 진행되도록 + // 허용합니다. (ko) + // As per 3.3B.4, we no longer use a session-level serialization lock here and + // allow multiple ForwardHTTP calls to run concurrently on the same DTLS session + // using distinct StreamIDs. (en) // Generate a unique stream ID (needs mutex for nextStreamID) w.mu.Lock() diff --git a/internal/proxy/client.go b/internal/proxy/client.go index c980d2b..be2cd22 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -118,6 +118,432 @@ func (s *streamSender) handleAck(ack *protocol.StreamAck) map[uint64][]byte { return lost } +// streamReceiver 는 단일 스트림(ID)에 대한 클라이언트 측 수신 상태와 +// 로컬 HTTP 매핑을 담당하는 per-stream 구조체 설계입니다. (ko) +// streamReceiver is the per-stream receiver that owns client-side RX state +// and local HTTP mapping for a single stream ID. (en) +// +// 3.3B.2 설계 포인트: +// - 중앙 readLoop(StartLoop)는 DTLS 세션에서 Envelope 만 읽고, +// streamReceiver.inCh 로 `StreamOpen/StreamData/StreamClose` 를 전달합니다. +// - streamReceiver 는 자신에게 전달된 Envelope 들만 사용해 +// - 수신 ARQ(expectedSeq/received/lost) 를 관리하고, +// - HTTP 요청/응답을 구성해 역방향 StreamOpen/StreamData/StreamClose 를 전송합니다. +// - 실제 run 로직 및 StartLoop 와의 통합은 3.3B.3 단계에서 구현할 예정입니다. +type streamReceiver struct { + // 이 수신기가 담당하는 스트림 ID. + id protocol.StreamID + + // 수신 ARQ 상태: per-stream 시퀀스 및 out-of-order 버퍼/누락 집합. (ko) + // Receive-side ARQ state: per-stream sequence and out-of-order/lost sets. (en) + expectedSeq uint64 + received map[uint64][]byte + lost map[uint64]struct{} + + // 중앙 readLoop → per-stream goroutine 으로 전달되는 입력 채널. (ko) + // Input channel for envelopes dispatched from the central readLoop. (en) + inCh chan *protocol.Envelope + + // DTLS 세션 및 직렬화 codec / 로깅 핸들. (ko) + // DTLS session, wire codec and logging handles. (en) + sess dtls.Session + codec protocol.WireCodec + logger logging.Logger + + // 로컬 HTTP 클라이언트 및 타깃 주소 정보. (ko) + // Local HTTP client and target information. (en) + HTTPClient *http.Client + LocalTarget string +} + +// newStreamReceiver 는 단일 스트림 ID 에 대한 수신 상태/HTTP 매핑을 담당하는 +// streamReceiver 인스턴스를 초기화합니다. (ko) +// newStreamReceiver initializes a streamReceiver for a single stream ID. (en) +func newStreamReceiver( + id protocol.StreamID, + sess dtls.Session, + codec protocol.WireCodec, + logger logging.Logger, + httpClient *http.Client, + localTarget string, +) *streamReceiver { + if codec == nil { + codec = protocol.DefaultCodec + } + return &streamReceiver{ + id: id, + expectedSeq: 0, + received: make(map[uint64][]byte), + lost: make(map[uint64]struct{}), + inCh: make(chan *protocol.Envelope, 16), + sess: sess, + codec: codec, + logger: logger, + HTTPClient: httpClient, + LocalTarget: localTarget, + } +} + +// run 은 단일 스트림에 대해 서버→클라이언트 방향 프레임을 처리하고, +// 로컬 HTTP 요청/응답을 수행한 뒤, 클라이언트→서버 방향 스트림 응답을 +// 전송하는 수명주기 전담 루프입니다. (ko) +// run is the per-stream lifecycle loop that consumes inbound frames, +// performs the local HTTP request/response, and sends the reverse stream +// back to the server. (en) +func (r *streamReceiver) run(ctx context.Context, so *protocol.StreamOpen, sender *streamSender) error { + codec := r.codec + if codec == nil { + codec = protocol.DefaultCodec + } + log := r.logger + if log == nil { + log = logging.NewStdJSONLogger("client_proxy_stream_receiver") + } + + streamID := r.id + + // Pseudo-header 에서 HTTP 메타데이터를 추출합니다. (ko) + // Extract HTTP metadata from pseudo-headers. (en) + method := firstHeaderValue(so.Header, protocol.HeaderKeyMethod, http.MethodGet) + urlStr := firstHeaderValue(so.Header, protocol.HeaderKeyURL, "/") + _ = firstHeaderValue(so.Header, protocol.HeaderKeyHost, "") + + if r.LocalTarget == "" { + return fmt.Errorf("local target is empty") + } + + u, err := url.Parse(urlStr) + if err != nil { + return fmt.Errorf("parse url from stream_open: %w", err) + } + u.Scheme = "http" + u.Host = r.LocalTarget + + // 로컬 HTTP 요청용 헤더 맵을 생성하면서 pseudo-header 는 제거합니다. (ko) + // Build local HTTP header map while stripping pseudo-headers. (en) + httpHeader := make(http.Header, len(so.Header)) + for k, vs := range so.Header { + if k == protocol.HeaderKeyMethod || + k == protocol.HeaderKeyURL || + k == protocol.HeaderKeyHost || + k == protocol.HeaderKeyStatus { + continue + } + for _, v := range vs { + httpHeader.Add(k, v) + } + } + + // 요청 바디를 StreamData/StreamClose 프레임에서 모두 읽어 메모리에 적재합니다. (ko) + // Read the entire request body from StreamData/StreamClose frames into memory. (en) + // + // 동시에 수신 측 ARQ 상태(expectedSeq / out-of-order 버퍼 / LostSeqs)를 관리하고 + // StreamAck 를 전송해 선택적 재전송(Selective Retransmission)을 유도합니다. + var bodyBuf bytes.Buffer + const maxLostReport = 32 + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case env, ok := <-r.inCh: + if !ok { + return fmt.Errorf("stream receiver channel closed before stream_close") + } + + switch env.Type { + case protocol.MessageTypeStreamData: + sd := env.StreamData + if sd == nil { + return fmt.Errorf("stream_data payload is nil") + } + if sd.ID != streamID { + return fmt.Errorf("stream_data for unexpected stream id %q (expected %q)", sd.ID, streamID) + } + + // 수신 측 ARQ: Seq 에 따라 분기 + switch { + case sd.Seq == r.expectedSeq: + // 기대하던 순서의 프레임: 바로 bodyBuf 에 기록하고, 이후 버퍼된 연속 프레임도 flush. + if len(sd.Data) > 0 { + if _, err := bodyBuf.Write(sd.Data); err != nil { + return fmt.Errorf("buffer stream_data: %w", err) + } + } + r.expectedSeq++ + for { + data, ok := r.received[r.expectedSeq] + if !ok { + break + } + if len(data) > 0 { + if _, err := bodyBuf.Write(data); err != nil { + return fmt.Errorf("buffer reordered stream_data: %w", err) + } + } + delete(r.received, r.expectedSeq) + delete(r.lost, r.expectedSeq) + r.expectedSeq++ + } + + // AckSeq 이전 구간의 lost 항목 정리 + for seq := range r.lost { + if seq < r.expectedSeq { + delete(r.lost, seq) + } + } + + case sd.Seq > r.expectedSeq: + // 앞선 일부 Seq 들이 누락된 상태: 현재 프레임을 버퍼링하고 missing seq 들을 lost 에 추가. + if len(sd.Data) > 0 { + buf := make([]byte, len(sd.Data)) + copy(buf, sd.Data) + r.received[sd.Seq] = buf + } + for seq := r.expectedSeq; seq < sd.Seq && len(r.lost) < maxLostReport; seq++ { + if _, ok := r.lost[seq]; !ok { + r.lost[seq] = struct{}{} + } + } + + default: + // sd.Seq < expectedSeq 인 경우: 이미 처리했거나 Ack 로 커버된 프레임 → 무시. + } + + // 수신 측 StreamAck 전송: + // - AckSeq: 0부터 시작해 연속으로 수신 완료한 마지막 시퀀스 (expectedSeq-1) + // - LostSeqs: 현재 윈도우 내에서 누락된 시퀀스 중 상한 개수(maxLostReport)까지만 포함 + var ackSeq uint64 + if r.expectedSeq == 0 { + ackSeq = 0 + } else { + ackSeq = r.expectedSeq - 1 + } + + lostSeqs := make([]uint64, 0, len(r.lost)) + for seq := range r.lost { + if seq >= r.expectedSeq { + lostSeqs = append(lostSeqs, seq) + } + } + if len(lostSeqs) > 0 { + sort.Slice(lostSeqs, func(i, j int) bool { return lostSeqs[i] < lostSeqs[j] }) + if len(lostSeqs) > maxLostReport { + lostSeqs = lostSeqs[:maxLostReport] + } + } + + ackEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamAck, + StreamAck: &protocol.StreamAck{ + ID: streamID, + AckSeq: ackSeq, + LostSeqs: lostSeqs, + }, + } + if err := codec.Encode(r.sess, &ackEnv); err != nil { + return fmt.Errorf("send stream ack: %w", err) + } + + case protocol.MessageTypeStreamClose: + sc := env.StreamClose + if sc == nil { + return fmt.Errorf("stream_close payload is nil") + } + if sc.ID != streamID { + return fmt.Errorf("stream_close for unexpected stream id %q (expected %q)", sc.ID, streamID) + } + // sc.Error 는 최소 구현에서는 로컬 요청 에러와 별도로 취급하지 않습니다. (ko) + // For the minimal implementation we do not surface sc.Error here. (en) + goto haveBody + + default: + return fmt.Errorf("unexpected envelope type %q while reading stream request body", env.Type) + } + } + } + +haveBody: + bodyBytes := bodyBuf.Bytes() + + // 로컬 HTTP 요청 생성 (stream 기반 요청을 실제 HTTP 요청으로 변환). (ko) + // Build the local HTTP request from the stream-based metadata and body. (en) + req, err := http.NewRequestWithContext(ctx, method, u.String(), nil) + if err != nil { + return fmt.Errorf("create http request from stream: %w", err) + } + if len(bodyBytes) > 0 { + buf := bytes.NewReader(bodyBytes) + req.Body = io.NopCloser(buf) + req.ContentLength = int64(len(bodyBytes)) + } + req.Header = httpHeader + + start := time.Now() + logReq := log.With(logging.Fields{ + "request_id": string(streamID), + "service": so.Service, + "method": method, + "url": urlStr, + "stream_id": string(streamID), + "local_target": r.LocalTarget, + }) + logReq.Info("received stream_open envelope from server", nil) + + res, err := r.HTTPClient.Do(req) + if err != nil { + // 로컬 요청 실패 시, 502 + 에러 메시지를 스트림 응답으로 전송합니다. (ko) + // On local request failure, send a 502 response over the stream. (en) + errMsg := fmt.Sprintf("perform http request: %v", err) + streamRespHeader := map[string][]string{ + "Content-Type": {"text/plain; charset=utf-8"}, + protocol.HeaderKeyStatus: {strconv.Itoa(http.StatusBadGateway)}, + } + respOpen := protocol.Envelope{ + Type: protocol.MessageTypeStreamOpen, + StreamOpen: &protocol.StreamOpen{ + ID: streamID, + Service: so.Service, + TargetAddr: so.TargetAddr, + Header: streamRespHeader, + }, + } + if err2 := codec.Encode(r.sess, &respOpen); err2 != nil { + logReq.Error("failed to encode stream response open envelope (error path)", logging.Fields{ + "error": err2.Error(), + }) + return err2 + } + + dataEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamData, + StreamData: &protocol.StreamData{ + ID: streamID, + Seq: 0, + Data: []byte("HopGate: " + errMsg), + }, + } + // 에러 응답 프레임도 ARQ 대상에 등록합니다. + sender.register(0, dataEnv.StreamData.Data) + if err2 := codec.Encode(r.sess, &dataEnv); err2 != nil { + logReq.Error("failed to encode stream response data envelope (error path)", logging.Fields{ + "error": err2.Error(), + }) + return err2 + } + + closeEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamClose, + StreamClose: &protocol.StreamClose{ + ID: streamID, + Error: errMsg, + }, + } + if err2 := codec.Encode(r.sess, &closeEnv); err2 != nil { + logReq.Error("failed to encode stream response close envelope (error path)", logging.Fields{ + "error": err2.Error(), + }) + return err2 + } + + logReq.Error("local http request failed (stream)", logging.Fields{ + "error": err.Error(), + }) + return nil + } + defer res.Body.Close() + + // 응답을 StreamOpen + StreamData(4KiB chunk) + StreamClose 프레임으로 전송합니다. (ko) + // Send the response as StreamOpen + StreamData (4KiB chunks) + StreamClose frames. (en) + + // 응답 헤더 맵을 복사하고 상태 코드를 pseudo-header 로 추가합니다. (ko) + // Copy response headers and attach status code as a pseudo-header. (en) + streamRespHeader := make(map[string][]string, len(res.Header)+1) + for k, vs := range res.Header { + streamRespHeader[k] = append([]string(nil), vs...) + } + statusCode := res.StatusCode + if statusCode == 0 { + statusCode = http.StatusOK + } + streamRespHeader[protocol.HeaderKeyStatus] = []string{strconv.Itoa(statusCode)} + + respOpen := protocol.Envelope{ + Type: protocol.MessageTypeStreamOpen, + StreamOpen: &protocol.StreamOpen{ + ID: streamID, + Service: so.Service, + TargetAddr: so.TargetAddr, + Header: streamRespHeader, + }, + } + + if err := codec.Encode(r.sess, &respOpen); err != nil { + logReq.Error("failed to encode stream response open envelope", logging.Fields{ + "error": err.Error(), + }) + return err + } + + // 응답 바디를 4KiB(StreamChunkSize) 단위로 잘라 StreamData 프레임으로 전송합니다. (ko) + // Chunk the response body into 4KiB (StreamChunkSize) StreamData frames. (en) + var seq uint64 + chunk := make([]byte, protocol.StreamChunkSize) + for { + n, err := res.Body.Read(chunk) + if n > 0 { + dataCopy := append([]byte(nil), chunk[:n]...) + // 송신 측 ARQ: Seq 별 payload 를 기록해 두었다가, StreamAck 의 LostSeqs 를 기반으로 재전송할 수 있습니다. + sender.register(seq, dataCopy) + + dataEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamData, + StreamData: &protocol.StreamData{ + ID: streamID, + Seq: seq, + Data: dataCopy, + }, + } + if err2 := codec.Encode(r.sess, &dataEnv); err2 != nil { + logReq.Error("failed to encode stream response data envelope", logging.Fields{ + "error": err2.Error(), + }) + return err2 + } + seq++ + } + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("read http response body for streaming: %w", err) + } + } + + closeEnv := protocol.Envelope{ + Type: protocol.MessageTypeStreamClose, + StreamClose: &protocol.StreamClose{ + ID: streamID, + Error: "", + }, + } + + if err := codec.Encode(r.sess, &closeEnv); err != nil { + logReq.Error("failed to encode stream response close envelope", logging.Fields{ + "error": err.Error(), + }) + return err + } + + logReq.Info("stream http response sent to server", logging.Fields{ + "status": statusCode, + "elapsed_ms": time.Since(start).Milliseconds(), + "error": "", + }) + + return nil +} + func (p *ClientProxy) registerStreamSender(id protocol.StreamID, sender *streamSender) { p.sendersMu.Lock() defer p.sendersMu.Unlock() @@ -145,6 +571,39 @@ func (p *ClientProxy) getStreamSender(id protocol.StreamID) *streamSender { return p.streamSenders[id] } +// StartLoop 는 단일 DTLS 세션에 대한 **중앙 readLoop** 역할을 수행합니다. (ko) +// StartLoop acts as the **central read loop** for a single DTLS session. (en) +// +// 3.3B.1 Design note — client-side DTLS session multiplexing: +// +// - 목표: +// - DTLS 세션 레벨에서는 오직 `protocol.Envelope` 를 연속해서 읽고(decoding), +// 각 Envelope 를 **스트림 단위로 demux** 하는 역할만 맡습니다. +// - 실제 HTTP 처리(요청 바디 수신, 로컬 HTTP 호출, 응답 스트림 전송)는 +// 개별 스트림 전용 goroutine/구조체(`streamReceiver` 등)가 담당하도록 분리합니다. +// +// - 스트림 demux 자료구조(계획): +// - `recvTable: map[protocol.StreamID]*streamReceiver` 형태의 수신 테이블을 유지합니다. +// - 각 `streamReceiver` 는 자신만의 입력 채널을 가집니다. 예: `inCh chan *protocol.Envelope`. +// - 중앙 readLoop 는 DTLS 세션에서 Envelope 를 읽은 뒤, +// - `env.Type == MessageTypeStreamOpen` 인 경우: +// - `id := env.StreamOpen.ID` 로 stream ID 를 구하고, +// - `recvTable[id]` 가 없으면 새 `streamReceiver` 를 생성해 goroutine 을 띄운 뒤 +// 첫 메시지(`env`)를 `receiver.inCh <- env` 로 전달합니다. +// - `env.Type == MessageTypeStreamData` / `MessageTypeStreamClose` 인 경우: +// - `id := env.StreamData.ID` 또는 `env.StreamClose.ID` 로 stream ID 를 구하고, +// - 기존 `recvTable[id]` 를 찾아 `receiver.inCh <- env` 로 전달합니다. +// - receiver 가 존재하지 않으면 해당 스트림에 한정된 프로토콜 에러로 처리할지 정책을 정의합니다. +// - `env.Type == MessageTypeStreamAck` 인 경우: +// - 이미 구현된 송신 측 ARQ 테이블(`streamSenders`)을 조회해 재전송 로직에 전달합니다. +// +// - 현재 구현 상태와 향후 리팩터링 경계: +// - 지금은 `MessageTypeStreamOpen` 을 수신하면 곧바로 `handleStreamRequest` 를 호출하고, +// 이 함수가 `reader` 를 직접 소비하면서 같은 세션 안에 **동시에 하나의 스트림만** 처리할 수 있습니다. +// - 3.3B.2 / 3.3B.3 단계에서는 위에서 설명한 demux 설계에 맞춰 +// - `handleStreamRequest` 내부 HTTP 매핑 로직을 `streamReceiver` 로 옮기고, +// - StartLoop 가 DTLS 세션 → per-stream goroutine 으로 이벤트를 분배하는 역할만 수행하도록 +// 점진적으로 리팩터링할 예정입니다. func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { if ctx == nil { ctx = context.Background() @@ -163,12 +622,47 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { codec := protocol.DefaultCodec bufferedReader := bufio.NewReaderSize(sess, protocol.GetDTLSReadBufferSize()) + // 스트림 수신기 테이블: 중앙 readLoop 가 StreamOpen/Data/Close 를 + // 각 streamReceiver 로 demux 하기 위해 사용합니다. (ko) + // Per-session stream receiver table used by the central read loop to + // demultiplex StreamOpen/Data/Close frames. (en) + receivers := make(map[protocol.StreamID]*streamReceiver) + var receiversMu sync.Mutex + + getReceiver := func(id protocol.StreamID) *streamReceiver { + receiversMu.Lock() + defer receiversMu.Unlock() + return receivers[id] + } + + addReceiver := func(id protocol.StreamID, rcv *streamReceiver) { + receiversMu.Lock() + receivers[id] = rcv + receiversMu.Unlock() + } + + removeReceiver := func(id protocol.StreamID) { + receiversMu.Lock() + delete(receivers, id) + receiversMu.Unlock() + } + + closeAllReceivers := func() { + receiversMu.Lock() + defer receiversMu.Unlock() + for id, rcv := range receivers { + close(rcv.inCh) + delete(receivers, id) + } + } + for { select { case <-ctx.Done(): log.Info("client proxy loop stopping due to context cancellation", logging.Fields{ "reason": ctx.Err().Error(), }) + closeAllReceivers() return nil default: } @@ -177,11 +671,13 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { if err := codec.Decode(bufferedReader, &env); err != nil { if err == io.EOF { log.Info("dtls session closed by server", nil) + closeAllReceivers() return nil } log.Error("failed to decode protocol envelope", logging.Fields{ "error": err.Error(), }) + closeAllReceivers() return err } @@ -191,19 +687,17 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { log.Error("failed to handle http envelope", logging.Fields{ "error": err.Error(), }) + closeAllReceivers() return err } - case protocol.MessageTypeStreamOpen: - if err := p.handleStreamRequest(ctx, sess, bufferedReader, &env); err != nil { - log.Error("failed to handle stream http envelope", logging.Fields{ - "error": err.Error(), - }) - return err - } + case protocol.MessageTypeStreamAck: + // 송신 측 ARQ: 서버 → 클라이언트 응답 스트림에 대한 StreamAck 처리. (ko) + // Sender-side ARQ: handle StreamAck for response streams (server → client). (en) sa := env.StreamAck if sa == nil { log.Error("received stream_ack envelope with nil payload", nil) + closeAllReceivers() return fmt.Errorf("stream_ack payload is nil") } streamID := protocol.StreamID(sa.ID) @@ -215,7 +709,8 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { continue } lost := sender.handleAck(sa) - // LostSeqs 를 기반으로 선택적 재전송 수행 + // LostSeqs 를 기반으로 선택적 재전송 수행 (Selective Retransmission). (ko) + // Perform selective retransmission based on LostSeqs. (en) for seq, data := range lost { retryEnv := protocol.Envelope{ Type: protocol.MessageTypeStreamData, @@ -231,6 +726,7 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { "seq": seq, "error": err.Error(), }) + closeAllReceivers() return err } log.Info("retransmitted stream_data after stream_ack", logging.Fields{ @@ -238,32 +734,104 @@ func (p *ClientProxy) StartLoop(ctx context.Context, sess dtls.Session) error { "seq": seq, }) } + + case protocol.MessageTypeStreamOpen: + // 새로운 스트림에 대한 수신기 생성 및 goroutine 실행. (ko) + // Create a new streamReceiver and start its goroutine for this stream. (en) + so := env.StreamOpen + if so == nil { + log.Error("stream_open envelope missing payload", nil) + continue + } + streamID := so.ID + if streamID == "" { + log.Error("stream_open with empty stream id", nil) + continue + } + if p.LocalTarget == "" { + closeAllReceivers() + return fmt.Errorf("local target is empty") + } + + if existing := getReceiver(streamID); existing != nil { + log.Error("duplicate stream_open for existing stream", logging.Fields{ + "stream_id": streamID, + }) + continue + } + + sender := newStreamSender() + p.registerStreamSender(streamID, sender) + + receiver := newStreamReceiver(streamID, sess, codec, log, p.HTTPClient, p.LocalTarget) + addReceiver(streamID, receiver) + + go func(id protocol.StreamID, r *streamReceiver, so *protocol.StreamOpen, snd *streamSender) { + if err := r.run(ctx, so, snd); err != nil { + log.Error("stream receiver terminated with error", logging.Fields{ + "stream_id": id, + "error": err.Error(), + }) + } + removeReceiver(id) + p.unregisterStreamSender(id) + }(streamID, receiver, so, sender) + case protocol.MessageTypeStreamData: - // StreamData received at top level (not expected, should be consumed by handleStreamRequest) - // This can happen if frames arrive out of order or if there's a protocol mismatch - streamID := "unknown" - if env.StreamData != nil { - streamID = string(env.StreamData.ID) + // StreamData 는 중앙 readLoop 에서 해당 streamReceiver 로 demux 됩니다. (ko) + // StreamData frames are demultiplexed to the corresponding streamReceiver. (en) + sd := env.StreamData + if sd == nil { + log.Error("stream_data envelope with nil payload", nil) + continue } - log.Warn("received unexpected stream_data at top level, ignoring", logging.Fields{ - "stream_id": streamID, - }) - continue + streamID := sd.ID + receiver := getReceiver(streamID) + if receiver == nil { + log.Warn("received stream_data for unknown stream", logging.Fields{ + "stream_id": streamID, + }) + continue + } + envCopy := env + select { + case receiver.inCh <- &envCopy: + case <-ctx.Done(): + closeAllReceivers() + return nil + } + case protocol.MessageTypeStreamClose: - // StreamClose received at top level (not expected, should be consumed by handleStreamRequest) - // This can happen if frames arrive out of order or if there's a protocol mismatch - streamID := "unknown" - if env.StreamClose != nil { - streamID = string(env.StreamClose.ID) + // StreamClose 역시 중앙 readLoop 에서 해당 streamReceiver 로 전달합니다. (ko) + // StreamClose is also forwarded from the central readLoop to streamReceiver. (en) + sc := env.StreamClose + if sc == nil { + log.Error("stream_close envelope with nil payload", nil) + continue } - log.Warn("received unexpected stream_close at top level, ignoring", logging.Fields{ - "stream_id": streamID, - }) - continue + streamID := sc.ID + receiver := getReceiver(streamID) + if receiver == nil { + log.Warn("received stream_close for unknown stream", logging.Fields{ + "stream_id": streamID, + }) + continue + } + envCopy := env + select { + case receiver.inCh <- &envCopy: + // 수명주기 정리는 receiver.run 내부와 goroutine 종료 시 removeReceiver 에서 수행됩니다. (ko) + // Lifecycle cleanup is handled inside receiver.run and the goroutine's defer. (en) + case <-ctx.Done(): + closeAllReceivers() + return nil + } + default: log.Error("received unsupported envelope type from server", logging.Fields{ "type": env.Type, }) + closeAllReceivers() return fmt.Errorf("unsupported envelope type %q", env.Type) } } @@ -343,57 +911,37 @@ func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session p.registerStreamSender(streamID, sender) defer p.unregisterStreamSender(streamID) - // Pseudo-header 에서 HTTP 메타데이터를 추출합니다. (ko) - // Extract HTTP metadata from pseudo-headers. (en) - method := firstHeaderValue(so.Header, protocol.HeaderKeyMethod, http.MethodGet) - urlStr := firstHeaderValue(so.Header, protocol.HeaderKeyURL, "/") - _ = firstHeaderValue(so.Header, protocol.HeaderKeyHost, "") - if p.LocalTarget == "" { return fmt.Errorf("local target is empty") } - u, err := url.Parse(urlStr) - if err != nil { - return fmt.Errorf("parse url from stream_open: %w", err) - } - u.Scheme = "http" - u.Host = p.LocalTarget + // streamReceiver 를 생성해 스트림 수신/HTTP 매핑/응답 전송을 전담시킵니다. (ko) + // Delegate per-stream RX/HTTP mapping/response to a streamReceiver. (en) + receiver := newStreamReceiver(streamID, sess, codec, log, p.HTTPClient, p.LocalTarget) - // 로컬 HTTP 요청용 헤더 맵을 생성하면서 pseudo-header 는 제거합니다. (ko) - // Build local HTTP header map while stripping pseudo-headers. (en) - httpHeader := make(http.Header, len(so.Header)) - for k, vs := range so.Header { - if k == protocol.HeaderKeyMethod || - k == protocol.HeaderKeyURL || - k == protocol.HeaderKeyHost || - k == protocol.HeaderKeyStatus { - continue - } - for _, v := range vs { - httpHeader.Add(k, v) - } - } - - // 요청 바디를 StreamData/StreamClose 프레임에서 모두 읽어 메모리에 적재합니다. (ko) - // Read the entire request body from StreamData/StreamClose frames into memory. (en) - // - // 동시에 수신 측 ARQ 상태( expectedSeq / out-of-order 버퍼 / LostSeqs )를 관리하고 - // StreamAck 를 전송해 선택적 재전송(Selective Retransmission)을 유도합니다. - var ( - bodyBuf bytes.Buffer - expectedSeq uint64 - received = make(map[uint64][]byte) - lost = make(map[uint64]struct{}) - ) - const maxLostReport = 32 + // streamReceiver 수명주기를 별도 goroutine 으로 실행합니다. (ko) + // Run the streamReceiver lifecycle in a separate goroutine. (en) + errCh := make(chan error, 1) + go func() { + errCh <- receiver.run(ctx, so, sender) + }() for { var env protocol.Envelope if err := codec.Decode(reader, &env); err != nil { if err == io.EOF { + // DTLS 세션이 조기 종료되면 receiver 에게 더 이상 프레임이 없음을 알리고 종료를 기다립니다. (ko) + // On EOF, close the channel so receiver can terminate gracefully. (en) + close(receiver.inCh) + if recvErr := <-errCh; recvErr != nil { + return recvErr + } return fmt.Errorf("unexpected EOF while reading stream request body") } + close(receiver.inCh) + if recvErr := <-errCh; recvErr != nil { + return recvErr + } return fmt.Errorf("decode stream request frame: %w", err) } @@ -401,291 +949,47 @@ func (p *ClientProxy) handleStreamRequest(ctx context.Context, sess dtls.Session case protocol.MessageTypeStreamData: sd := env.StreamData if sd == nil { + close(receiver.inCh) + _ = <-errCh return fmt.Errorf("stream_data payload is nil") } if sd.ID != streamID { + close(receiver.inCh) + _ = <-errCh return fmt.Errorf("stream_data for unexpected stream id %q (expected %q)", sd.ID, streamID) } - - // 수신 측 ARQ: Seq 에 따라 분기 - switch { - case sd.Seq == expectedSeq: - // 기대하던 순서의 프레임: 바로 bodyBuf 에 기록하고, 이후 버퍼된 연속 프레임도 flush. - if len(sd.Data) > 0 { - if _, err := bodyBuf.Write(sd.Data); err != nil { - return fmt.Errorf("buffer stream_data: %w", err) - } - } - expectedSeq++ - for { - data, ok := received[expectedSeq] - if !ok { - break - } - if len(data) > 0 { - if _, err := bodyBuf.Write(data); err != nil { - return fmt.Errorf("buffer reordered stream_data: %w", err) - } - } - delete(received, expectedSeq) - delete(lost, expectedSeq) - expectedSeq++ - } - - // AckSeq 이전 구간의 lost 항목 정리 - for seq := range lost { - if seq < expectedSeq { - delete(lost, seq) - } - } - - case sd.Seq > expectedSeq: - // 앞선 일부 Seq 들이 누락된 상태: 현재 프레임을 버퍼링하고 missing seq 들을 lost 에 추가. - if len(sd.Data) > 0 { - buf := make([]byte, len(sd.Data)) - copy(buf, sd.Data) - received[sd.Seq] = buf - } - for seq := expectedSeq; seq < sd.Seq && len(lost) < maxLostReport; seq++ { - if _, ok := lost[seq]; !ok { - lost[seq] = struct{}{} - } - } - - default: - // sd.Seq < expectedSeq 인 경우: 이미 처리했거나 Ack 로 커버된 프레임 → 무시. - } - - // 수신 측 StreamAck 전송: - // - AckSeq: 0부터 시작해 연속으로 수신 완료한 마지막 시퀀스 (expectedSeq-1) - // - LostSeqs: 현재 윈도우 내에서 누락된 시퀀스 중 상한 개수(maxLostReport)까지만 포함 - var ackSeq uint64 - if expectedSeq == 0 { - ackSeq = 0 - } else { - ackSeq = expectedSeq - 1 - } - - lostSeqs := make([]uint64, 0, len(lost)) - for seq := range lost { - if seq >= expectedSeq { - lostSeqs = append(lostSeqs, seq) - } - } - if len(lostSeqs) > 0 { - sort.Slice(lostSeqs, func(i, j int) bool { return lostSeqs[i] < lostSeqs[j] }) - if len(lostSeqs) > maxLostReport { - lostSeqs = lostSeqs[:maxLostReport] - } - } - - ackEnv := protocol.Envelope{ - Type: protocol.MessageTypeStreamAck, - StreamAck: &protocol.StreamAck{ - ID: streamID, - AckSeq: ackSeq, - LostSeqs: lostSeqs, - }, - } - if err := codec.Encode(sess, &ackEnv); err != nil { - return fmt.Errorf("send stream ack: %w", err) - } + envCopy := env + receiver.inCh <- &envCopy case protocol.MessageTypeStreamClose: sc := env.StreamClose if sc == nil { + close(receiver.inCh) + _ = <-errCh return fmt.Errorf("stream_close payload is nil") } if sc.ID != streamID { + close(receiver.inCh) + _ = <-errCh return fmt.Errorf("stream_close for unexpected stream id %q (expected %q)", sc.ID, streamID) } - // sc.Error 는 최소 구현에서는 로컬 요청 에러와 별도로 취급하지 않습니다. (ko) - // For the minimal implementation we do not surface sc.Error here. (en) - goto haveBody + // StreamClose 프레임을 receiver 에게 전달한 뒤 채널을 닫고 종료를 기다립니다. (ko) + // After forwarding StreamClose, close the channel and wait for receiver to finish. (en) + envCopy := env + receiver.inCh <- &envCopy + close(receiver.inCh) + return <-errCh + default: + // 예상치 못한 Envelope 타입: 해당 스트림에 한정된 프로토콜 에러로 보고 receiver 를 종료합니다. (ko) + // Unexpected envelope type: treat as per-stream protocol error and shut down receiver. (en) + close(receiver.inCh) + if recvErr := <-errCh; recvErr != nil { + return recvErr + } return fmt.Errorf("unexpected envelope type %q while reading stream request body", env.Type) } } - -haveBody: - bodyBytes := bodyBuf.Bytes() - - // 로컬 HTTP 요청 생성 (stream 기반 요청을 실제 HTTP 요청으로 변환). (ko) - // Build the local HTTP request from the stream-based metadata and body. (en) - req, err := http.NewRequestWithContext(ctx, method, u.String(), nil) - if err != nil { - return fmt.Errorf("create http request from stream: %w", err) - } - if len(bodyBytes) > 0 { - buf := bytes.NewReader(bodyBytes) - req.Body = io.NopCloser(buf) - req.ContentLength = int64(len(bodyBytes)) - } - req.Header = httpHeader - - start := time.Now() - logReq := log.With(logging.Fields{ - "request_id": string(streamID), - "service": so.Service, - "method": method, - "url": urlStr, - "stream_id": string(streamID), - "local_target": p.LocalTarget, - }) - logReq.Info("received stream_open envelope from server", nil) - - res, err := p.HTTPClient.Do(req) - if err != nil { - // 로컬 요청 실패 시, 502 + 에러 메시지를 스트림 응답으로 전송합니다. (ko) - // On local request failure, send a 502 response over the stream. (en) - errMsg := fmt.Sprintf("perform http request: %v", err) - streamRespHeader := map[string][]string{ - "Content-Type": {"text/plain; charset=utf-8"}, - protocol.HeaderKeyStatus: {strconv.Itoa(http.StatusBadGateway)}, - } - respOpen := protocol.Envelope{ - Type: protocol.MessageTypeStreamOpen, - StreamOpen: &protocol.StreamOpen{ - ID: streamID, - Service: so.Service, - TargetAddr: so.TargetAddr, - Header: streamRespHeader, - }, - } - if err2 := codec.Encode(sess, &respOpen); err2 != nil { - logReq.Error("failed to encode stream response open envelope (error path)", logging.Fields{ - "error": err2.Error(), - }) - return err2 - } - - dataEnv := protocol.Envelope{ - Type: protocol.MessageTypeStreamData, - StreamData: &protocol.StreamData{ - ID: streamID, - Seq: 0, - Data: []byte("HopGate: " + errMsg), - }, - } - // 에러 응답 프레임도 ARQ 대상에 등록합니다. - sender.register(0, dataEnv.StreamData.Data) - if err2 := codec.Encode(sess, &dataEnv); err2 != nil { - logReq.Error("failed to encode stream response data envelope (error path)", logging.Fields{ - "error": err2.Error(), - }) - return err2 - } - - closeEnv := protocol.Envelope{ - Type: protocol.MessageTypeStreamClose, - StreamClose: &protocol.StreamClose{ - ID: streamID, - Error: errMsg, - }, - } - if err2 := codec.Encode(sess, &closeEnv); err2 != nil { - logReq.Error("failed to encode stream response close envelope (error path)", logging.Fields{ - "error": err2.Error(), - }) - return err2 - } - - logReq.Error("local http request failed (stream)", logging.Fields{ - "error": err.Error(), - }) - return nil - } - defer res.Body.Close() - - // 응답을 StreamOpen + StreamData(4KiB chunk) + StreamClose 프레임으로 전송합니다. (ko) - // Send the response as StreamOpen + StreamData (4KiB chunks) + StreamClose frames. (en) - - // 응답 헤더 맵을 복사하고 상태 코드를 pseudo-header 로 추가합니다. (ko) - // Copy response headers and attach status code as a pseudo-header. (en) - streamRespHeader := make(map[string][]string, len(res.Header)+1) - for k, vs := range res.Header { - streamRespHeader[k] = append([]string(nil), vs...) - } - statusCode := res.StatusCode - if statusCode == 0 { - statusCode = http.StatusOK - } - streamRespHeader[protocol.HeaderKeyStatus] = []string{strconv.Itoa(statusCode)} - - respOpen := protocol.Envelope{ - Type: protocol.MessageTypeStreamOpen, - StreamOpen: &protocol.StreamOpen{ - ID: streamID, - Service: so.Service, - TargetAddr: so.TargetAddr, - Header: streamRespHeader, - }, - } - - if err := codec.Encode(sess, &respOpen); err != nil { - logReq.Error("failed to encode stream response open envelope", logging.Fields{ - "error": err.Error(), - }) - return err - } - - // 응답 바디를 4KiB(StreamChunkSize) 단위로 잘라 StreamData 프레임으로 전송합니다. (ko) - // Chunk the response body into 4KiB (StreamChunkSize) StreamData frames. (en) - var seq uint64 - chunk := make([]byte, protocol.StreamChunkSize) - for { - n, err := res.Body.Read(chunk) - if n > 0 { - dataCopy := append([]byte(nil), chunk[:n]...) - // 송신 측 ARQ: Seq 별 payload 를 기록해 두었다가, StreamAck 의 LostSeqs 를 기반으로 재전송할 수 있습니다. - sender.register(seq, dataCopy) - - dataEnv := protocol.Envelope{ - Type: protocol.MessageTypeStreamData, - StreamData: &protocol.StreamData{ - ID: streamID, - Seq: seq, - Data: dataCopy, - }, - } - if err2 := codec.Encode(sess, &dataEnv); err2 != nil { - logReq.Error("failed to encode stream response data envelope", logging.Fields{ - "error": err2.Error(), - }) - return err2 - } - seq++ - } - if err == io.EOF { - break - } - if err != nil { - return fmt.Errorf("read http response body for streaming: %w", err) - } - } - - closeEnv := protocol.Envelope{ - Type: protocol.MessageTypeStreamClose, - StreamClose: &protocol.StreamClose{ - ID: streamID, - Error: "", - }, - } - - if err := codec.Encode(sess, &closeEnv); err != nil { - logReq.Error("failed to encode stream response close envelope", logging.Fields{ - "error": err.Error(), - }) - return err - } - - logReq.Info("stream http response sent to server", logging.Fields{ - "status": statusCode, - "elapsed_ms": time.Since(start).Milliseconds(), - "error": "", - }) - - return nil } // forwardToLocal 는 protocol.Request 를 로컬 HTTP 요청으로 변환하고 protocol.Response 를 채웁니다. (ko) diff --git a/progress.md b/progress.md index 36d614c..5ffd0ac 100644 --- a/progress.md +++ b/progress.md @@ -418,13 +418,13 @@ This section introduces a **client-side stream demultiplexer + per-stream gorout ##### 3.3B.1 클라이언트 측 중앙 readLoop → 스트림 demux 설계 ##### 3.3B.1 Design client-side central readLoop → per-stream demux -- [ ] `ClientProxy.StartLoop` 의 역할을 명확히 분리 +- [x] `ClientProxy.StartLoop` 의 역할을 명확히 분리 - DTLS 세션에서 `Envelope` 를 연속해서 읽어들이는 **중앙 readLoop** 를 유지하되, - 개별 스트림의 HTTP 처리 로직(현재 `handleStreamRequest` 내부 로직)을 분리해 별도 타입/구조체로 옮길 계획을 문서화합니다. -- [ ] 스트림 demux 위한 자료구조 설계 +- [x] 스트림 demux 위한 자료구조 설계 - `map[protocol.StreamID]*streamReceiver` 형태의 수신측 스트림 상태 테이블을 정의합니다. - 각 `streamReceiver` 는 자신만의 입력 채널(예: `inCh chan *protocol.Envelope`)을 가져, 중앙 readLoop 로부터 `StreamOpen/StreamData/StreamClose` 를 전달받도록 합니다. -- [ ] 중앙 readLoop 에서 스트림별 라우팅 규칙 정의 +- [x] 중앙 readLoop 에서 스트림별 라우팅 규칙 정의 - `Envelope.Type` 에 따라: - `StreamOpen` / `StreamData` / `StreamClose`: - `streamID` 를 추출하고, 해당 `streamReceiver` 의 `inCh` 로 전달. @@ -438,7 +438,7 @@ This section introduces a **client-side stream demultiplexer + per-stream gorout ##### 3.3B.2 streamReceiver 타입 설계 및 HTTP 매핑 리팩터링 ##### 3.3B.2 Design streamReceiver type and refactor HTTP mapping -- [ ] `streamReceiver` 타입 정의 +- [x] `streamReceiver` 타입 정의 - 필드 예시: - `id protocol.StreamID` - 수신 ARQ 상태: `expectedSeq`, `received map[uint64][]byte`, `lost map[uint64]struct{}` @@ -448,14 +448,14 @@ This section introduces a **client-side stream demultiplexer + per-stream gorout - 역할: - 서버에서 온 `StreamOpen`/`StreamData`/`StreamClose` 를 순서대로 처리해 로컬 HTTP 요청을 구성하고, - 로컬 HTTP 응답을 다시 `StreamOpen`/`StreamData`/`StreamClose` 로 역방향 전송합니다. -- [ ] 기존 `ClientProxy.handleStreamRequest` 의 로직을 `streamReceiver` 로 이전 +- [x] 기존 `ClientProxy.handleStreamRequest` 의 로직을 `streamReceiver` 로 이전 - 현재 `handleStreamRequest` 안에서 수행하던 작업을 단계적으로 옮깁니다: - `StreamOpen` 의 pseudo-header 에서 HTTP 메서드/URL/헤더를 복원. - 요청 바디 수신용 수신 측 ARQ(`expectedSeq`, `received`, `lost`) 처리. - 로컬 HTTP 요청 생성/실행 및 에러 처리. - 응답을 4KiB `StreamData` chunk 로 전송 + 송신 측 ARQ(`streamSender.register`) 기록. - 이때 **DTLS reader 를 직접 읽던 부분**은 제거하고, 대신 `inCh` 에서 전달된 `Envelope` 만 사용하도록 리팩터링합니다. -- [ ] streamReceiver 생명주기 관리 +- [x] streamReceiver 생명주기 관리 - `StreamClose` 수신 시: - 로컬 HTTP 요청 바디 구성 종료. - 로컬 HTTP 요청 실행 및 응답 스트림 전송 완료 후, @@ -466,14 +466,14 @@ This section introduces a **client-side stream demultiplexer + per-stream gorout ##### 3.3B.3 StartLoop 와 streamReceiver 통합 ##### 3.3B.3 Integrate StartLoop and streamReceiver -- [ ] `ClientProxy.StartLoop` 을 “중앙 readLoop + demux” 로 단순화 +- [x] `ClientProxy.StartLoop` 을 “중앙 readLoop + demux” 로 단순화 - `MessageTypeStreamOpen` 수신 시: - `streamID := env.StreamOpen.ID` 를 기준으로 기존 `streamReceiver` 존재 여부를 검사. - 없으면 새 `streamReceiver` 생성 후, goroutine 을 띄우고 `inCh <- env` 로 첫 메시지 전달. - `MessageTypeStreamData` / `MessageTypeStreamClose` 수신 시: - 해당 `streamReceiver` 의 `inCh` 로 그대로 전달. - `MessageTypeStreamAck` 는 기존처럼 송신 측 `streamSender` 로 라우팅. -- [ ] 에러/종료 처리 전략 정리 +- [x] 에러/종료 처리 전략 정리 - 개별 `streamReceiver` 에서 발생하는 에러는: - 로컬 HTTP 에러 → 스트림 응답에 5xx/에러 바디로 반영. - 프로토콜 위반(예: 잘못된 순서의 `StreamClose`) → 해당 스트림만 정리하고 세션은 유지하는지 여부를 정의. @@ -486,7 +486,7 @@ This section introduces a **client-side stream demultiplexer + per-stream gorout ##### 3.3B.4 세션 단위 직렬화 락 제거 및 멀티플렉싱 검증 ##### 3.3B.4 Remove session-level serialization lock and validate multiplexing -- [ ] 서버 측 세션 직렬화 락 제거 계획 수립 +- [x] 서버 측 세션 직렬화 락 제거 계획 수립 - 현재 서버는 [`dtlsSessionWrapper`](cmd/server/main.go:111)에 `requestMu` 를 두어, - 동일 DTLS 세션에서 동시에 하나의 `ForwardHTTP` 만 수행하도록 직렬화하고 있습니다. - 클라이언트 측 멀티플렉싱이 안정화되면, `requestMu` 를 제거하고