diff --git a/cmd/server/main.go b/cmd/server/main.go index 81e98e5..bb74c30 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -20,7 +20,9 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" "github.com/dalbodeule/hop-gate/internal/acme" "github.com/dalbodeule/hop-gate/internal/admin" @@ -793,11 +795,439 @@ func firstHeaderValue(hdr map[string][]string, key, def string) string { return def } +// firstHeaderValueFromPB 는 map[string]*HeaderValues 형태의 헤더에서 첫 번째 값을 반환하고, +// 값이 없으면 기본값을 반환합니다. (ko) +// firstHeaderValueFromPB returns the first value for a header key in +// map[string]*protocolpb.HeaderValues, or the provided default if the key is +// missing or empty. (en) +func firstHeaderValueFromPB(hdr map[string]*protocolpb.HeaderValues, key, def string) string { + if hdr == nil { + return def + } + if hv, ok := hdr[key]; ok && hv != nil && len(hv.Values) > 0 { + return hv.Values[0] + } + return def +} + +// newGRPCTunnelSession 는 단일 OpenTunnel bi-di 스트림에 대한 gRPC 터널 세션을 생성합니다. (ko) +// newGRPCTunnelSession constructs a grpcTunnelSession for a single OpenTunnel +// bi-directional stream. (en) +func newGRPCTunnelSession(stream protocolpb.HopGateTunnel_OpenTunnelServer, logger logging.Logger) *grpcTunnelSession { + if logger == nil { + logger = logging.NewStdJSONLogger("grpc_tunnel_session") + } + return &grpcTunnelSession{ + stream: stream, + logger: logger, + pending: make(map[string]*grpcPendingRequest), + readerDone: make(chan struct{}), + } +} + +func (t *grpcTunnelSession) send(env *protocolpb.Envelope) error { + t.sendMu.Lock() + defer t.sendMu.Unlock() + return t.stream.Send(env) +} + +func (t *grpcTunnelSession) nextHTTPStreamID() string { + t.mu.Lock() + id := t.nextStreamID + t.nextStreamID++ + t.mu.Unlock() + return fmt.Sprintf("http-%d", id) +} + +// recvLoop 는 OpenTunnel gRPC 스트림에서 Envelope 를 지속적으로 읽어 +// HTTP 요청별 pending 테이블로 전달합니다. (ko) +// recvLoop continuously reads Envelope messages from the OpenTunnel gRPC stream +// and dispatches them to per-request pending tables. (en) +func (t *grpcTunnelSession) recvLoop() error { + defer close(t.readerDone) + + for { + env, err := t.stream.Recv() + if err != nil { + if err == io.EOF { + t.logger.Info("grpc tunnel session closed by client", nil) + return nil + } + t.logger.Error("grpc tunnel receive error", logging.Fields{ + "error": err.Error(), + }) + return err + } + + var streamID string + switch payload := env.Payload.(type) { + case *protocolpb.Envelope_StreamOpen: + if payload.StreamOpen != nil { + streamID = payload.StreamOpen.Id + } + case *protocolpb.Envelope_StreamData: + if payload.StreamData != nil { + streamID = payload.StreamData.Id + } + case *protocolpb.Envelope_StreamClose: + if payload.StreamClose != nil { + streamID = payload.StreamClose.Id + } + case *protocolpb.Envelope_StreamAck: + // StreamAck 는 gRPC 터널에서는 사용하지 않습니다. HTTP/2 가 신뢰성/순서를 보장합니다. (ko) + // StreamAck is currently unused for gRPC tunnels; HTTP/2 already + // guarantees reliable, ordered delivery. (en) + continue + default: + t.logger.Warn("received unsupported envelope payload on grpc tunnel session", logging.Fields{ + "payload_type": fmt.Sprintf("%T", env.Payload), + }) + continue + } + + if streamID == "" { + t.logger.Warn("received envelope with empty stream id on grpc tunnel session", logging.Fields{}) + continue + } + + t.mu.Lock() + pending := t.pending[streamID] + t.mu.Unlock() + + if pending == nil { + t.logger.Warn("received envelope for unknown stream id on grpc tunnel session", logging.Fields{ + "stream_id": streamID, + }) + continue + } + + select { + case pending.respCh <- env: + case <-pending.doneCh: + t.logger.Warn("pending grpc tunnel request already closed", logging.Fields{ + "stream_id": streamID, + }) + default: + t.logger.Warn("grpc tunnel response channel buffer full, dropping frame", logging.Fields{ + "stream_id": streamID, + }) + } + } +} + +// ForwardHTTP 는 HTTP 요청을 gRPC 터널 위의 StreamOpen/StreamData/StreamClose 프레임으로 전송하고, +// 역방향 스트림 응답을 수신해 protocol.Response 로 반환합니다. (ko) +// ForwardHTTP forwards an HTTP request over the gRPC tunnel using +// StreamOpen/StreamData/StreamClose frames and reconstructs the reverse +// stream into a protocol.Response. (en) +func (t *grpcTunnelSession) ForwardHTTP(ctx context.Context, logger logging.Logger, req *http.Request, serviceName string) (*protocol.Response, error) { + if ctx == nil { + ctx = context.Background() + } + + // Generate a unique stream ID for this HTTP request. + streamID := t.nextHTTPStreamID() + + // Channel buffer size for response frames to avoid blocking recvLoop. + const responseChannelBuffer = 16 + + pending := &grpcPendingRequest{ + streamID: streamID, + respCh: make(chan *protocolpb.Envelope, responseChannelBuffer), + doneCh: make(chan struct{}), + } + + t.mu.Lock() + if t.pending == nil { + t.pending = make(map[string]*grpcPendingRequest) + } + t.pending[streamID] = pending + t.mu.Unlock() + + // Ensure cleanup on exit. + defer func() { + t.mu.Lock() + delete(t.pending, streamID) + t.mu.Unlock() + close(pending.doneCh) + }() + + log := logger.With(logging.Fields{ + "component": "http_to_tunnel", + "request_id": streamID, + "method": req.Method, + "url": req.URL.String(), + }) + + log.Info("forwarding http request over grpc tunnel", logging.Fields{ + "host": req.Host, + "scheme": req.URL.Scheme, + }) + + // Build request headers and pseudo-headers. + hdr := make(map[string]*protocolpb.HeaderValues, len(req.Header)+3) + addHeaderValues := func(key string, values []string) { + if len(values) == 0 { + return + } + hv, ok := hdr[key] + if !ok || hv == nil { + hv = &protocolpb.HeaderValues{} + hdr[key] = hv + } + hv.Values = append(hv.Values, values...) + } + + for k, vs := range req.Header { + addHeaderValues(k, vs) + } + addHeaderValues(protocol.HeaderKeyMethod, []string{req.Method}) + if req.URL != nil { + addHeaderValues(protocol.HeaderKeyURL, []string{req.URL.String()}) + } + host := req.Host + if host == "" && req.URL != nil { + host = req.URL.Host + } + if host != "" { + addHeaderValues(protocol.HeaderKeyHost, []string{host}) + } + + // Send StreamOpen specifying the logical service and headers. + open := &protocolpb.StreamOpen{ + Id: streamID, + ServiceName: serviceName, + TargetAddr: "", + Header: hdr, + } + openEnv := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamOpen{StreamOpen: open}, + } + if err := t.send(openEnv); err != nil { + log.Error("failed to send stream_open on grpc tunnel", logging.Fields{ + "error": err.Error(), + }) + return nil, err + } + + // Send request body as StreamData frames. + var seq uint64 + if req.Body != nil { + buf := make([]byte, protocol.StreamChunkSize) + for { + n, err := req.Body.Read(buf) + if n > 0 { + dataCopy := append([]byte(nil), buf[:n]...) + dataEnv := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamData{ + StreamData: &protocolpb.StreamData{ + Id: streamID, + Seq: seq, + Data: dataCopy, + }, + }, + } + if err2 := t.send(dataEnv); err2 != nil { + log.Error("failed to send stream_data on grpc tunnel", logging.Fields{ + "error": err2.Error(), + }) + return nil, err2 + } + seq++ + } + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("read http request body for streaming: %w", err) + } + } + } + + // Send StreamClose to mark the end of the request body. + closeEnv := &protocolpb.Envelope{ + Payload: &protocolpb.Envelope_StreamClose{ + StreamClose: &protocolpb.StreamClose{ + Id: streamID, + Error: "", + }, + }, + } + if err := t.send(closeEnv); err != nil { + log.Error("failed to send request stream_close on grpc tunnel", logging.Fields{ + "error": err.Error(), + }) + return nil, err + } + + // Receive reverse stream response (StreamOpen + StreamData* + StreamClose). + var ( + resp protocol.Response + bodyBuf bytes.Buffer + gotOpen bool + statusCode = http.StatusOK + ) + + resp.RequestID = streamID + resp.Header = make(map[string][]string) + + for { + select { + case <-ctx.Done(): + log.Error("context cancelled while waiting for response", logging.Fields{ + "error": ctx.Err().Error(), + }) + return nil, ctx.Err() + + case <-t.readerDone: + log.Error("grpc tunnel closed while waiting for response", nil) + return nil, fmt.Errorf("grpc tunnel closed") + + case env, ok := <-pending.respCh: + if !ok { + log.Error("grpc tunnel response channel closed unexpectedly", nil) + return nil, fmt.Errorf("grpc tunnel response channel closed") + } + + switch payload := env.Payload.(type) { + case *protocolpb.Envelope_StreamOpen: + so := payload.StreamOpen + if so == nil { + return nil, fmt.Errorf("stream_open response payload is nil") + } + statusStr := firstHeaderValueFromPB(so.Header, protocol.HeaderKeyStatus, strconv.Itoa(http.StatusOK)) + if sc, err := strconv.Atoi(statusStr); err == nil && sc > 0 { + statusCode = sc + } + for k, hv := range so.Header { + if k == protocol.HeaderKeyMethod || + k == protocol.HeaderKeyURL || + k == protocol.HeaderKeyHost || + k == protocol.HeaderKeyStatus { + continue + } + if hv == nil || len(hv.Values) == 0 { + continue + } + resp.Header[k] = append([]string(nil), hv.Values...) + } + gotOpen = true + + case *protocolpb.Envelope_StreamData: + sd := payload.StreamData + if sd == nil { + return nil, fmt.Errorf("stream_data response payload is nil") + } + if len(sd.Data) > 0 { + if _, err := bodyBuf.Write(sd.Data); err != nil { + return nil, fmt.Errorf("buffer stream_data response: %w", err) + } + } + + case *protocolpb.Envelope_StreamClose: + sc := payload.StreamClose + if sc == nil { + return nil, fmt.Errorf("stream_close response payload is nil") + } + // Complete the protocol.Response using collected headers/body. (en) + resp.Status = statusCode + resp.Body = bodyBuf.Bytes() + resp.Error = sc.Error + + log.Info("received stream http response over grpc tunnel", logging.Fields{ + "status": resp.Status, + "error": resp.Error, + }) + if !gotOpen { + return nil, fmt.Errorf("received stream_close without prior stream_open for stream %q", streamID) + } + return &resp, nil + + default: + return nil, fmt.Errorf("unexpected envelope payload type %T in stream response", env.Payload) + } + } + } +} + var ( sessionsMu sync.RWMutex sessionsByDomain = make(map[string]*dtlsSessionWrapper) ) +// grpcPendingRequest tracks a single HTTP request waiting for its response on a gRPC tunnel. (en) +type grpcPendingRequest struct { + streamID string + respCh chan *protocolpb.Envelope + doneCh chan struct{} +} + +// grpcTunnelSession represents a single long-lived gRPC tunnel (OpenTunnel stream) +// that can multiplex multiple HTTP requests by StreamID. (en) +type grpcTunnelSession struct { + stream protocolpb.HopGateTunnel_OpenTunnelServer + logger logging.Logger + + mu sync.Mutex + nextStreamID uint64 + pending map[string]*grpcPendingRequest + + readerDone chan struct{} + + sendMu sync.Mutex +} + +var ( + tunnelsMu sync.RWMutex + tunnelsByDomain = make(map[string]*grpcTunnelSession) +) + +func registerTunnelForDomain(domain string, sess *grpcTunnelSession, logger logging.Logger) string { + d := strings.ToLower(strings.TrimSpace(domain)) + if d == "" || sess == nil { + return "" + } + tunnelsMu.Lock() + tunnelsByDomain[d] = sess + tunnelsMu.Unlock() + + logger.Info("registered grpc tunnel for domain", logging.Fields{ + "domain": d, + }) + return d +} + +func unregisterTunnelForDomain(domain string, sess *grpcTunnelSession, logger logging.Logger) { + d := strings.ToLower(strings.TrimSpace(domain)) + if d == "" || sess == nil { + return + } + tunnelsMu.Lock() + cur := tunnelsByDomain[d] + if cur == sess { + delete(tunnelsByDomain, d) + } + tunnelsMu.Unlock() + + logger.Info("unregistered grpc tunnel for domain", logging.Fields{ + "domain": d, + }) +} + +func getTunnelForHost(host string) *grpcTunnelSession { + h := host + if i := strings.Index(h, ":"); i != -1 { + h = h[:i] + } + h = strings.ToLower(strings.TrimSpace(h)) + if h == "" { + return nil + } + tunnelsMu.RLock() + defer tunnelsMu.RUnlock() + return tunnelsByDomain[h] +} + // statusRecorder 는 HTTP 응답 상태 코드를 캡처하기 위한 래퍼입니다. // Prometheus 메트릭에서 status 라벨을 기록하는 데 사용합니다. // statusRecorder 는 HTTP 응답 상태 코드를 캡처하기 위한 래퍼입니다. @@ -817,23 +1247,30 @@ func (w *statusRecorder) WriteHeader(code int) { type grpcTunnelServer struct { protocolpb.UnimplementedHopGateTunnelServer - logger logging.Logger + logger logging.Logger + validator dtls.DomainValidator } // newGRPCTunnelServer 는 gRPC 터널 서버 구현체를 생성합니다. (ko) // newGRPCTunnelServer constructs a new gRPC tunnel server implementation. (en) -func newGRPCTunnelServer(logger logging.Logger) *grpcTunnelServer { +func newGRPCTunnelServer(logger logging.Logger, validator dtls.DomainValidator) *grpcTunnelServer { + baseLogger := logger + if baseLogger == nil { + baseLogger = logging.NewStdJSONLogger("grpc_tunnel") + } return &grpcTunnelServer{ - logger: logger.With(logging.Fields{ + logger: baseLogger.With(logging.Fields{ "component": "grpc_tunnel", }), + validator: validator, } } // OpenTunnel 은 클라이언트와 서버 간 장기 유지 bi-directional gRPC 스트림을 처리합니다. (ko) // OpenTunnel handles the long-lived bi-directional gRPC stream between the -// server and a HopGate client. At this stage, it only logs incoming envelopes -// and does not yet integrate with the HTTP proxy layer. (en) +// server and a HopGate client. It performs an initial control-stream +// handshake (domain/API key validation), registers the tunnel for the +// authenticated domain, and runs a central receive loop for HTTP streams. (en) func (s *grpcTunnelServer) OpenTunnel(stream protocolpb.HopGateTunnel_OpenTunnelServer) error { ctx := stream.Context() @@ -848,44 +1285,73 @@ func (s *grpcTunnelServer) OpenTunnel(stream protocolpb.HopGateTunnel_OpenTunnel log.Info("grpc tunnel opened", nil) defer log.Info("grpc tunnel closed", nil) - for { - env, err := stream.Recv() - if err != nil { - if err == io.EOF { - // 클라이언트가 정상적으로 스트림을 종료한 경우. (ko) - // Client closed the stream normally. (en) - return nil - } - log.Error("grpc tunnel receive error", logging.Fields{ - "error": err.Error(), - }) - return err + // 1) 초기 control StreamOpen(id="control-0") 을 수신하여 핸드셰이크를 수행합니다. (ko) + // 1) Receive initial control StreamOpen (id="control-0") and perform handshake. (en) + env, err := stream.Recv() + if err != nil { + if err == io.EOF { + log.Warn("grpc tunnel closed before sending control stream_open", nil) + return status.Error(codes.InvalidArgument, "missing initial control stream_open") } - - // 현재 단계에서는 수신된 Envelope 의 payload 타입만 로그에 남기고, - // 실제 HTTP 프록시 연동은 후속 3.3 작업에서 구현합니다. (ko) - // At this stage we only log the envelope payload type; HTTP proxy - // integration will be implemented in later 3.3 steps. (en) - payloadType := "unknown" - switch env.Payload.(type) { - case *protocolpb.Envelope_HttpRequest: - payloadType = "http_request" - case *protocolpb.Envelope_HttpResponse: - payloadType = "http_response" - case *protocolpb.Envelope_StreamOpen: - payloadType = "stream_open" - case *protocolpb.Envelope_StreamData: - payloadType = "stream_data" - case *protocolpb.Envelope_StreamClose: - payloadType = "stream_close" - case *protocolpb.Envelope_StreamAck: - payloadType = "stream_ack" - } - - log.Info("received envelope on grpc tunnel", logging.Fields{ - "payload_type": payloadType, + log.Error("failed to receive initial control stream_open", logging.Fields{ + "error": err.Error(), }) + return err } + + soPayload, ok := env.Payload.(*protocolpb.Envelope_StreamOpen) + if !ok || soPayload.StreamOpen == nil { + log.Error("first envelope on grpc tunnel is not stream_open", logging.Fields{ + "payload_type": fmt.Sprintf("%T", env.Payload), + }) + return status.Error(codes.InvalidArgument, "first envelope on tunnel must be control stream_open") + } + + control := soPayload.StreamOpen + controlID := strings.TrimSpace(control.Id) + + headers := control.Header + domain := firstHeaderValueFromPB(headers, "X-HopGate-Domain", "") + apiKey := firstHeaderValueFromPB(headers, "X-HopGate-API-Key", "") + localTarget := firstHeaderValueFromPB(headers, "X-HopGate-Local-Target", "") + + if domain == "" || apiKey == "" { + log.Warn("grpc tunnel control stream missing domain or api key", logging.Fields{ + "control_id": controlID, + }) + return status.Error(codes.Unauthenticated, "missing domain or api key on control stream_open") + } + + // Validate (domain, api_key) using the shared domain validator. + if s.validator != nil { + if err := s.validator.ValidateDomainAPIKey(ctx, domain, apiKey); err != nil { + log.Warn("grpc tunnel domain/api_key validation failed", logging.Fields{ + "domain": domain, + "error": err.Error(), + }) + return status.Error(codes.PermissionDenied, "invalid domain or api key") + } + } + + log.Info("grpc tunnel handshake succeeded", logging.Fields{ + "domain": domain, + "local_target": localTarget, + "control_id": controlID, + }) + + // Register this tunnel session for the authenticated domain. + sessionLogger := s.logger.With(logging.Fields{ + "domain": domain, + }) + tunnel := newGRPCTunnelSession(stream, sessionLogger) + normalizedDomain := registerTunnelForDomain(domain, tunnel, s.logger) + defer unregisterTunnelForDomain(normalizedDomain, tunnel, s.logger) + + // 2) 이후 수신되는 StreamOpen/StreamData/StreamClose 는 grpcTunnelSession.recvLoop 에서 + // HTTP 요청별로 demux 됩니다. (ko) + // 2) Subsequent StreamOpen/StreamData/StreamClose frames are demultiplexed per + // HTTP request by grpcTunnelSession.recvLoop. (en) + return tunnel.recvLoop() } // hopGateOwnedHeaders 는 HopGate 서버가 스스로 관리하는 응답 헤더 목록입니다. (ko) @@ -974,10 +1440,6 @@ func hostDomainHandler(allowedDomain string, logger logging.Logger, next http.Ha // on a single HTTPS port, based on Content-Type and protocol (HTTP/2). (en) func grpcOrHTTPHandler(grpcServer *grpc.Server, httpHandler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // 임시 디버그 로그: gRPC 라우팅 조건이 어떻게 보이는지 확인합니다. (ko) - // Temporary debug log to inspect gRPC routing conditions. (en) - fmt.Printf("grpcOrHTTPHandler debug: proto=%q protoMajor=%d contentType=%q host=%q path=%q\n", - r.Proto, r.ProtoMajor, r.Header.Get("Content-Type"), r.Host, r.URL.Path) // gRPC 요청은 HTTP/2 + Content-Type: application/grpc 조합으로 들어옵니다. (ko) // gRPC requests arrive as HTTP/2 with Content-Type: application/grpc. (en) @@ -1133,8 +1595,10 @@ func newHTTPHandler(logger logging.Logger, proxyTimeout time.Duration) http.Hand return } - // 2. 일반 HTTP 요청은 DTLS 를 통해 클라이언트로 포워딩 - // 간단한 서비스 이름 결정: 우선 "web" 고정, 추후 Router 도입 시 개선. + // 2. 일반 HTTP 요청은 활성 gRPC 터널을 통해 클라이언트로 포워딩합니다. (ko) + // 2. Regular HTTP requests are forwarded to clients over active gRPC tunnels. (en) + // 간단한 서비스 이름 결정: 우선 "web" 고정, 추후 Router 도입 시 개선. (ko) + // For now, use a fixed logical service name "web"; this can be improved with a Router later. (en) serviceName := "web" // Host 헤더에서 포트를 제거하고 소문자로 정규화합니다. @@ -1157,14 +1621,14 @@ func newHTTPHandler(logger logging.Logger, proxyTimeout time.Duration) http.Hand return } - sessWrapper := getSessionForHost(hostLower) - if sessWrapper == nil { - log.Warn("no dtls session for host", logging.Fields{ + tunnel := getTunnelForHost(hostLower) + if tunnel == nil { + log.Warn("no tunnel for host", logging.Fields{ "host": r.Host, }) - observability.ProxyErrorsTotal.WithLabelValues("no_dtls_session").Inc() - // 등록되지 않았거나 활성 세션이 없는 도메인으로의 요청은 404 로 응답합니다. (ko) - // Requests for hosts without an active DTLS session return 404. (en) + observability.ProxyErrorsTotal.WithLabelValues("no_tunnel_session").Inc() + // 등록되지 않았거나 활성 터널이 없는 도메인으로의 요청은 404 로 응답합니다. (ko) + // Requests for hosts without an active tunnel return 404. (en) writeErrorPage(sr, r, http.StatusNotFound) return } @@ -1192,14 +1656,15 @@ func newHTTPHandler(logger logging.Logger, proxyTimeout time.Duration) http.Hand } } - // r.Body 는 ForwardHTTP 내에서 읽고 닫지 않으므로 여기서 닫기 + // r.Body 는 ForwardHTTP 내에서 읽고 닫지 않으므로 여기서 닫기 (ko) + // r.Body is consumed inside ForwardHTTP; ensure it is closed here. (en) defer r.Body.Close() - // 서버 측에서 DTLS → 클라이언트 → 로컬 서비스까지의 전체 왕복 시간을 제한하기 위해 + // 서버 측에서 gRPC 터널 → 클라이언트 → 로컬 서비스까지의 전체 왕복 시간을 제한하기 위해 // 요청 컨텍스트에 타임아웃을 적용합니다. 기본값은 15초이며, // HOP_SERVER_PROXY_TIMEOUT_SECONDS 로 재정의할 수 있습니다. (ko) // Apply an overall timeout (default 15s, configurable via - // HOP_SERVER_PROXY_TIMEOUT_SECONDS) to the DTLS forward path so that + // HOP_SERVER_PROXY_TIMEOUT_SECONDS) to the tunnel forward path so that // excessively slow backends surface as gateway timeouts. (en) ctx := r.Context() if proxyTimeout > 0 { @@ -1220,7 +1685,7 @@ func newHTTPHandler(logger logging.Logger, proxyTimeout time.Duration) http.Hand // Context cancelled, do not proceed. return default: - resp, err := sessWrapper.ForwardHTTP(ctx, logger, r, serviceName) + resp, err := tunnel.ForwardHTTP(ctx, logger, r, serviceName) resultCh <- forwardResult{resp: resp, err: err} } }() @@ -1229,20 +1694,20 @@ func newHTTPHandler(logger logging.Logger, proxyTimeout time.Duration) http.Hand select { case <-ctx.Done(): - log.Error("forward over dtls timed out", logging.Fields{ + log.Error("forward over tunnel timed out", logging.Fields{ "timeout_seconds": int64(proxyTimeout.Seconds()), "error": ctx.Err().Error(), }) - observability.ProxyErrorsTotal.WithLabelValues("dtls_forward_timeout").Inc() + observability.ProxyErrorsTotal.WithLabelValues("tunnel_forward_timeout").Inc() writeErrorPage(sr, r, errorpages.StatusGatewayTimeout) return case res := <-resultCh: if res.err != nil { - log.Error("forward over dtls failed", logging.Fields{ + log.Error("forward over tunnel failed", logging.Fields{ "error": res.err.Error(), }) - observability.ProxyErrorsTotal.WithLabelValues("dtls_forward_failed").Inc() + observability.ProxyErrorsTotal.WithLabelValues("tunnel_forward_failed").Inc() writeErrorPage(sr, r, errorpages.StatusTLSHandshakeFailed) return } @@ -1359,6 +1824,10 @@ func main() { }) } + // gRPC 터널 핸드셰이크에서 사용할 도메인 검증기 구성. (ko) + // Construct domain validator to be used by the gRPC tunnel handshake. (en) + domainValidator := admin.NewEntDomainValidator(logger, dbClient) + // 3. TLS 설정: ACME(lego)로 인증서를 관리하고, Debug 모드에서는 DTLS에는 self-signed 를 사용하되 // ACME 는 항상 시도하되 Staging 모드로 동작하도록 합니다. // 3. TLS setup: manage certificates via ACME (lego); in debug mode DTLS uses self-signed @@ -1552,7 +2021,7 @@ func main() { // gRPC server for client tunnels (OpenTunnel). (en) // 클라이언트 터널(OpenTunnel)을 처리하는 gRPC 서버 인스턴스를 생성합니다. (ko) grpcSrv := grpc.NewServer() - protocolpb.RegisterHopGateTunnelServer(grpcSrv, newGRPCTunnelServer(logger)) + protocolpb.RegisterHopGateTunnelServer(grpcSrv, newGRPCTunnelServer(logger, domainValidator)) // HTTP: 평문 포트 httpSrv := &http.Server{ @@ -1593,10 +2062,6 @@ func main() { } }() - // 6. 도메인 검증기 준비 (향후 gRPC 터널 핸드셰이크에서 사용 예정). (ko) - // Prepare domain validator (to be used in future gRPC tunnel handshakes). (en) - _ = admin.NewEntDomainValidator(logger, dbClient) - // DTLS 레이어 제거 이후에는 gRPC 및 HTTP/HTTPS 서버 goroutine 만 유지합니다. (ko) // After removing the DTLS layer, only the gRPC and HTTP/HTTPS servers are kept running. (en) select {} diff --git a/internal/errorpages/assets/favicon.png b/internal/errorpages/assets/favicon.png new file mode 100644 index 0000000..e395e77 Binary files /dev/null and b/internal/errorpages/assets/favicon.png differ diff --git a/internal/errorpages/templates/400.html b/internal/errorpages/templates/400.html index d4d4fd7..e4e1b3a 100644 --- a/internal/errorpages/templates/400.html +++ b/internal/errorpages/templates/400.html @@ -6,6 +6,7 @@ +