diff --git a/cmd/client/main.go b/cmd/client/main.go index a00185d..a84c8b8 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -1,20 +1,28 @@ package main import ( + "bytes" "context" "crypto/tls" "crypto/x509" "flag" + "fmt" "io" "net" + "net/http" + "net/url" "os" + "strconv" "strings" + "sync" + "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "github.com/dalbodeule/hop-gate/internal/config" "github.com/dalbodeule/hop-gate/internal/logging" + "github.com/dalbodeule/hop-gate/internal/protocol" protocolpb "github.com/dalbodeule/hop-gate/internal/protocol/pb" ) @@ -146,8 +154,412 @@ func runGRPCTunnelClient(ctx context.Context, logger logging.Logger, finalCfg *c "api_key_mask": maskAPIKey(finalCfg.ClientAPIKey), }) - // 수신 루프: 현재는 수신된 Envelope 의 타입만 로그에 남기고 종료하지 않습니다. (ko) - // Receive loop: currently only logs envelope payload types and keeps the tunnel open. (en) + // 로컬 HTTP 프록시용 HTTP 클라이언트 구성. (ko) + // HTTP client used to forward requests to the local target. (en) + httpClient := &http.Client{ + Timeout: 30 * time.Second, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + } + + // 서버→클라이언트 방향 StreamOpen/StreamData/StreamClose 를 + // HTTP 요청 단위로 모으기 위한 per-stream 상태 테이블입니다. (ko) + // Per-stream state table to assemble HTTP requests from StreamOpen/Data/Close. (en) + type inboundStream struct { + open *protocolpb.StreamOpen + body bytes.Buffer + } + + streams := make(map[string]*inboundStream) + var streamsMu sync.Mutex + + // gRPC 스트림에 대한 Send 는 동시 호출이 안전하지 않으므로, sendMu 로 직렬화합니다. (ko) + // gRPC streaming Send is not safe for concurrent calls; protect with a mutex. (en) + var sendMu sync.Mutex + sendEnv := func(e *protocolpb.Envelope) error { + sendMu.Lock() + defer sendMu.Unlock() + return stream.Send(e) + } + + // 서버에서 전달된 StreamOpen/StreamData/StreamClose 를 로컬 HTTP 요청으로 변환하고, + // 응답을 StreamOpen/StreamData/StreamClose 로 다시 서버에 전송하는 헬퍼입니다. (ko) + // handleStream forwards a single logical HTTP request to the local target + // and sends the response back as StreamOpen/StreamData/StreamClose frames. (en) + handleStream := func(so *protocolpb.StreamOpen, body []byte) { + go func() { + streamID := strings.TrimSpace(so.Id) + if streamID == "" { + log.Error("inbound stream has empty id", logging.Fields{}) + return + } + + if finalCfg.LocalTarget == "" { + log.Error("local target is empty; cannot forward request", logging.Fields{ + "stream_id": streamID, + }) + return + } + + // Pseudo-headers 에서 메서드/URL/Host 추출. (ko) + // Extract method/URL/host from pseudo-headers. (en) + method := http.MethodGet + if hv, ok := so.Header[protocol.HeaderKeyMethod]; ok && hv != nil && len(hv.Values) > 0 && strings.TrimSpace(hv.Values[0]) != "" { + method = hv.Values[0] + } + urlStr := "/" + if hv, ok := so.Header[protocol.HeaderKeyURL]; ok && hv != nil && len(hv.Values) > 0 && strings.TrimSpace(hv.Values[0]) != "" { + urlStr = hv.Values[0] + } + + u, err := url.Parse(urlStr) + if err != nil { + errMsg := fmt.Sprintf("parse url from stream_open: %v", err) + log.Error("failed to parse url from stream_open", logging.Fields{ + "stream_id": streamID, + "error": err.Error(), + }) + + respHeader := map[string]*protocolpb.HeaderValues{ + "Content-Type": { + Values: []string{"text/plain; charset=utf-8"}, + }, + protocol.HeaderKeyStatus: { + Values: []string{strconv.Itoa(http.StatusBadGateway)}, + }, + } + respOpen := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamOpen{ + StreamOpen: &protocolpb.StreamOpen{ + Id: streamID, + ServiceName: so.ServiceName, + TargetAddr: so.TargetAddr, + Header: respHeader, + }, + }, + } + if err2 := sendEnv(respOpen); err2 != nil { + log.Error("failed to send error stream_open from client", logging.Fields{ + "stream_id": streamID, + "error": err2.Error(), + }) + return + } + + dataEnv := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamData{ + StreamData: &protocolpb.StreamData{ + Id: streamID, + Seq: 0, + Data: []byte("HopGate client: " + errMsg), + }, + }, + } + if err2 := sendEnv(dataEnv); err2 != nil { + log.Error("failed to send error stream_data from client", logging.Fields{ + "stream_id": streamID, + "error": err2.Error(), + }) + return + } + + closeEnv := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamClose{ + StreamClose: &protocolpb.StreamClose{ + Id: streamID, + Error: errMsg, + }, + }, + } + if err2 := sendEnv(closeEnv); err2 != nil { + log.Error("failed to send error stream_close from client", logging.Fields{ + "stream_id": streamID, + "error": err2.Error(), + }) + } + return + } + u.Scheme = "http" + u.Host = finalCfg.LocalTarget + + // 로컬 HTTP 요청용 헤더 구성 (pseudo-headers 제거). (ko) + // Build local HTTP headers, stripping pseudo-headers. (en) + httpHeader := make(http.Header, len(so.Header)) + for k, hv := range so.Header { + if k == protocol.HeaderKeyMethod || + k == protocol.HeaderKeyURL || + k == protocol.HeaderKeyHost || + k == protocol.HeaderKeyStatus { + continue + } + if hv == nil { + continue + } + for _, v := range hv.Values { + httpHeader.Add(k, v) + } + } + + var reqBody io.Reader + if len(body) > 0 { + reqBody = bytes.NewReader(body) + } + + req, err := http.NewRequestWithContext(ctx, method, u.String(), reqBody) + if err != nil { + errMsg := fmt.Sprintf("create http request from stream: %v", err) + log.Error("failed to create local http request", logging.Fields{ + "stream_id": streamID, + "error": err.Error(), + }) + + respHeader := map[string]*protocolpb.HeaderValues{ + "Content-Type": { + Values: []string{"text/plain; charset=utf-8"}, + }, + protocol.HeaderKeyStatus: { + Values: []string{strconv.Itoa(http.StatusBadGateway)}, + }, + } + respOpen := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamOpen{ + StreamOpen: &protocolpb.StreamOpen{ + Id: streamID, + ServiceName: so.ServiceName, + TargetAddr: so.TargetAddr, + Header: respHeader, + }, + }, + } + if err2 := sendEnv(respOpen); err2 != nil { + log.Error("failed to send error stream_open from client", logging.Fields{ + "stream_id": streamID, + "error": err2.Error(), + }) + return + } + + dataEnv := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamData{ + StreamData: &protocolpb.StreamData{ + Id: streamID, + Seq: 0, + Data: []byte("HopGate client: " + errMsg), + }, + }, + } + if err2 := sendEnv(dataEnv); err2 != nil { + log.Error("failed to send error stream_data from client", logging.Fields{ + "stream_id": streamID, + "error": err2.Error(), + }) + return + } + + closeEnv := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamClose{ + StreamClose: &protocolpb.StreamClose{ + Id: streamID, + Error: errMsg, + }, + }, + } + if err2 := sendEnv(closeEnv); err2 != nil { + log.Error("failed to send error stream_close from client", logging.Fields{ + "stream_id": streamID, + "error": err2.Error(), + }) + } + return + } + req.Header = httpHeader + if len(body) > 0 { + req.ContentLength = int64(len(body)) + } + + start := time.Now() + logReq := log.With(logging.Fields{ + "component": "grpc_client_proxy", + "stream_id": streamID, + "service": so.ServiceName, + "method": method, + "url": urlStr, + "local_target": finalCfg.LocalTarget, + }) + logReq.Info("forwarding stream http request to local target", nil) + + res, err := httpClient.Do(req) + if err != nil { + errMsg := fmt.Sprintf("perform local http request: %v", err) + logReq.Error("local http request failed", logging.Fields{ + "error": err.Error(), + }) + + respHeader := map[string]*protocolpb.HeaderValues{ + "Content-Type": { + Values: []string{"text/plain; charset=utf-8"}, + }, + protocol.HeaderKeyStatus: { + Values: []string{strconv.Itoa(http.StatusBadGateway)}, + }, + } + respOpen := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamOpen{ + StreamOpen: &protocolpb.StreamOpen{ + Id: streamID, + ServiceName: so.ServiceName, + TargetAddr: so.TargetAddr, + Header: respHeader, + }, + }, + } + if err2 := sendEnv(respOpen); err2 != nil { + logReq.Error("failed to send error stream_open from client", logging.Fields{ + "error": err2.Error(), + }) + return + } + + dataEnv := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamData{ + StreamData: &protocolpb.StreamData{ + Id: streamID, + Seq: 0, + Data: []byte("HopGate client: " + errMsg), + }, + }, + } + if err2 := sendEnv(dataEnv); err2 != nil { + logReq.Error("failed to send error stream_data from client", logging.Fields{ + "error": err2.Error(), + }) + return + } + + closeEnv := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamClose{ + StreamClose: &protocolpb.StreamClose{ + Id: streamID, + Error: errMsg, + }, + }, + } + if err2 := sendEnv(closeEnv); err2 != nil { + logReq.Error("failed to send error stream_close from client", logging.Fields{ + "error": err2.Error(), + }) + } + return + } + defer res.Body.Close() + + // 응답 헤더 맵을 복사하고 상태 코드를 pseudo-header 로 추가합니다. (ko) + // Copy response headers and attach status code as a pseudo-header. (en) + respHeader := make(map[string]*protocolpb.HeaderValues, len(res.Header)+1) + for k, vs := range res.Header { + hv := &protocolpb.HeaderValues{ + Values: append([]string(nil), vs...), + } + respHeader[k] = hv + } + statusCode := res.StatusCode + if statusCode == 0 { + statusCode = http.StatusOK + } + respHeader[protocol.HeaderKeyStatus] = &protocolpb.HeaderValues{ + Values: []string{strconv.Itoa(statusCode)}, + } + + respOpen := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamOpen{ + StreamOpen: &protocolpb.StreamOpen{ + Id: streamID, + ServiceName: so.ServiceName, + TargetAddr: so.TargetAddr, + Header: respHeader, + }, + }, + } + if err := sendEnv(respOpen); err != nil { + logReq.Error("failed to send stream response open envelope from client", logging.Fields{ + "error": err.Error(), + }) + return + } + + // 응답 바디를 4KiB(StreamChunkSize) 단위로 잘라 StreamData 프레임으로 전송합니다. (ko) + // Chunk the response body into 4KiB (StreamChunkSize) StreamData frames. (en) + buf := make([]byte, protocol.StreamChunkSize) + var seq uint64 + for { + n, err := res.Body.Read(buf) + if n > 0 { + dataCopy := append([]byte(nil), buf[:n]...) + dataEnv := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamData{ + StreamData: &protocolpb.StreamData{ + Id: streamID, + Seq: seq, + Data: dataCopy, + }, + }, + } + if err2 := sendEnv(dataEnv); err2 != nil { + logReq.Error("failed to send stream response data envelope from client", logging.Fields{ + "error": err2.Error(), + }) + return + } + seq++ + } + if err == io.EOF { + break + } + if err != nil { + logReq.Error("failed to read local http response body", logging.Fields{ + "error": err.Error(), + }) + break + } + } + + closeEnv := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamClose{ + StreamClose: &protocolpb.StreamClose{ + Id: streamID, + Error: "", + }, + }, + } + if err := sendEnv(closeEnv); err != nil { + logReq.Error("failed to send stream response close envelope from client", logging.Fields{ + "error": err.Error(), + }) + return + } + + logReq.Info("stream http response sent from client", logging.Fields{ + "status": statusCode, + "elapsed_ms": time.Since(start).Milliseconds(), + "error": "", + }) + }() + } + + // 수신 루프: 서버에서 들어오는 StreamOpen/StreamData/StreamClose 를 + // 로컬 HTTP 요청으로 변환하고 응답을 다시 터널로 전송합니다. (ko) + // Receive loop: convert incoming StreamOpen/StreamData/StreamClose into local + // HTTP requests and send responses back over the tunnel. (en) for { if ctx.Err() != nil { log.Info("context cancelled, closing grpc tunnel client", logging.Fields{ @@ -169,27 +581,114 @@ func runGRPCTunnelClient(ctx context.Context, logger logging.Logger, finalCfg *c } payloadType := "unknown" - switch in.Payload.(type) { + switch payload := in.Payload.(type) { case *protocolpb.Envelope_HttpRequest: payloadType = "http_request" case *protocolpb.Envelope_HttpResponse: payloadType = "http_response" case *protocolpb.Envelope_StreamOpen: payloadType = "stream_open" + + so := payload.StreamOpen + if so == nil { + log.Error("received stream_open with nil payload on grpc tunnel client", logging.Fields{}) + continue + } + streamID := strings.TrimSpace(so.Id) + if streamID == "" { + log.Error("received stream_open with empty stream id on grpc tunnel client", logging.Fields{}) + continue + } + + streamsMu.Lock() + if _, exists := streams[streamID]; exists { + log.Error("received duplicate stream_open for existing stream on grpc tunnel client", logging.Fields{ + "stream_id": streamID, + }) + streamsMu.Unlock() + continue + } + streams[streamID] = &inboundStream{open: so} + streamsMu.Unlock() + case *protocolpb.Envelope_StreamData: payloadType = "stream_data" + + sd := payload.StreamData + if sd == nil { + log.Error("received stream_data with nil payload on grpc tunnel client", logging.Fields{}) + continue + } + streamID := strings.TrimSpace(sd.Id) + if streamID == "" { + log.Error("received stream_data with empty stream id on grpc tunnel client", logging.Fields{}) + continue + } + + streamsMu.Lock() + st := streams[streamID] + streamsMu.Unlock() + if st == nil { + log.Warn("received stream_data for unknown stream on grpc tunnel client", logging.Fields{ + "stream_id": streamID, + }) + continue + } + if len(sd.Data) > 0 { + if _, err := st.body.Write(sd.Data); err != nil { + log.Error("failed to buffer stream_data body on grpc tunnel client", logging.Fields{ + "stream_id": streamID, + "error": err.Error(), + }) + } + } + case *protocolpb.Envelope_StreamClose: payloadType = "stream_close" + + sc := payload.StreamClose + if sc == nil { + log.Error("received stream_close with nil payload on grpc tunnel client", logging.Fields{}) + continue + } + streamID := strings.TrimSpace(sc.Id) + if streamID == "" { + log.Error("received stream_close with empty stream id on grpc tunnel client", logging.Fields{}) + continue + } + + streamsMu.Lock() + st := streams[streamID] + if st != nil { + delete(streams, streamID) + } + streamsMu.Unlock() + if st == nil { + log.Warn("received stream_close for unknown stream on grpc tunnel client", logging.Fields{ + "stream_id": streamID, + }) + continue + } + + // 현재까지 수신한 메타데이터/바디를 사용해 로컬 HTTP 요청을 수행하고, + // 응답을 다시 터널로 전송합니다. (ko) + // Use the accumulated metadata/body to perform the local HTTP request and + // send the response back over the tunnel. (en) + bodyCopy := append([]byte(nil), st.body.Bytes()...) + handleStream(st.open, bodyCopy) + case *protocolpb.Envelope_StreamAck: payloadType = "stream_ack" + // 현재 gRPC 터널에서는 StreamAck 를 사용하지 않습니다. (ko) + // StreamAck is currently unused for gRPC tunnels. (en) + + default: + payloadType = fmt.Sprintf("unknown(%T)", in.Payload) } log.Info("received envelope on grpc tunnel client", logging.Fields{ "payload_type": payloadType, }) - - // 이후 단계에서 여기서 HTTP 프록시와의 연동(요청/응답 처리)을 구현할 예정입니다. (ko) - // Future 3.3 work will hook HTTP proxy logic here. (en) } }