mirror of
https://github.com/dalbodeule/hop-gate.git
synced 2026-02-04 15:52:24 +09:00
[feat](client): add local HTTP proxying for gRPC-based tunnels
- Enhanced gRPC client with logic to forward incoming tunnel streams as HTTP requests to a local target. - Implemented per-stream state management for matching StreamOpen/StreamData/StreamClose to HTTP requests/responses. - Added mechanisms to assemble HTTP requests, send them locally, and respond via tunnel streams. - Introduced a configurable HTTP client with proper headers and connection settings for robust forwarding.
This commit is contained in:
@@ -1,20 +1,28 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"flag"
|
"flag"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
|
|
||||||
"github.com/dalbodeule/hop-gate/internal/config"
|
"github.com/dalbodeule/hop-gate/internal/config"
|
||||||
"github.com/dalbodeule/hop-gate/internal/logging"
|
"github.com/dalbodeule/hop-gate/internal/logging"
|
||||||
|
"github.com/dalbodeule/hop-gate/internal/protocol"
|
||||||
protocolpb "github.com/dalbodeule/hop-gate/internal/protocol/pb"
|
protocolpb "github.com/dalbodeule/hop-gate/internal/protocol/pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -146,8 +154,412 @@ func runGRPCTunnelClient(ctx context.Context, logger logging.Logger, finalCfg *c
|
|||||||
"api_key_mask": maskAPIKey(finalCfg.ClientAPIKey),
|
"api_key_mask": maskAPIKey(finalCfg.ClientAPIKey),
|
||||||
})
|
})
|
||||||
|
|
||||||
// 수신 루프: 현재는 수신된 Envelope 의 타입만 로그에 남기고 종료하지 않습니다. (ko)
|
// 로컬 HTTP 프록시용 HTTP 클라이언트 구성. (ko)
|
||||||
// Receive loop: currently only logs envelope payload types and keeps the tunnel open. (en)
|
// HTTP client used to forward requests to the local target. (en)
|
||||||
|
httpClient := &http.Client{
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
Transport: &http.Transport{
|
||||||
|
Proxy: http.ProxyFromEnvironment,
|
||||||
|
DialContext: (&net.Dialer{
|
||||||
|
Timeout: 10 * time.Second,
|
||||||
|
KeepAlive: 30 * time.Second,
|
||||||
|
}).DialContext,
|
||||||
|
ForceAttemptHTTP2: true,
|
||||||
|
MaxIdleConns: 100,
|
||||||
|
IdleConnTimeout: 90 * time.Second,
|
||||||
|
TLSHandshakeTimeout: 10 * time.Second,
|
||||||
|
ExpectContinueTimeout: 1 * time.Second,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 서버→클라이언트 방향 StreamOpen/StreamData/StreamClose 를
|
||||||
|
// HTTP 요청 단위로 모으기 위한 per-stream 상태 테이블입니다. (ko)
|
||||||
|
// Per-stream state table to assemble HTTP requests from StreamOpen/Data/Close. (en)
|
||||||
|
type inboundStream struct {
|
||||||
|
open *protocolpb.StreamOpen
|
||||||
|
body bytes.Buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
streams := make(map[string]*inboundStream)
|
||||||
|
var streamsMu sync.Mutex
|
||||||
|
|
||||||
|
// gRPC 스트림에 대한 Send 는 동시 호출이 안전하지 않으므로, sendMu 로 직렬화합니다. (ko)
|
||||||
|
// gRPC streaming Send is not safe for concurrent calls; protect with a mutex. (en)
|
||||||
|
var sendMu sync.Mutex
|
||||||
|
sendEnv := func(e *protocolpb.Envelope) error {
|
||||||
|
sendMu.Lock()
|
||||||
|
defer sendMu.Unlock()
|
||||||
|
return stream.Send(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 서버에서 전달된 StreamOpen/StreamData/StreamClose 를 로컬 HTTP 요청으로 변환하고,
|
||||||
|
// 응답을 StreamOpen/StreamData/StreamClose 로 다시 서버에 전송하는 헬퍼입니다. (ko)
|
||||||
|
// handleStream forwards a single logical HTTP request to the local target
|
||||||
|
// and sends the response back as StreamOpen/StreamData/StreamClose frames. (en)
|
||||||
|
handleStream := func(so *protocolpb.StreamOpen, body []byte) {
|
||||||
|
go func() {
|
||||||
|
streamID := strings.TrimSpace(so.Id)
|
||||||
|
if streamID == "" {
|
||||||
|
log.Error("inbound stream has empty id", logging.Fields{})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if finalCfg.LocalTarget == "" {
|
||||||
|
log.Error("local target is empty; cannot forward request", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pseudo-headers 에서 메서드/URL/Host 추출. (ko)
|
||||||
|
// Extract method/URL/host from pseudo-headers. (en)
|
||||||
|
method := http.MethodGet
|
||||||
|
if hv, ok := so.Header[protocol.HeaderKeyMethod]; ok && hv != nil && len(hv.Values) > 0 && strings.TrimSpace(hv.Values[0]) != "" {
|
||||||
|
method = hv.Values[0]
|
||||||
|
}
|
||||||
|
urlStr := "/"
|
||||||
|
if hv, ok := so.Header[protocol.HeaderKeyURL]; ok && hv != nil && len(hv.Values) > 0 && strings.TrimSpace(hv.Values[0]) != "" {
|
||||||
|
urlStr = hv.Values[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := url.Parse(urlStr)
|
||||||
|
if err != nil {
|
||||||
|
errMsg := fmt.Sprintf("parse url from stream_open: %v", err)
|
||||||
|
log.Error("failed to parse url from stream_open", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
"error": err.Error(),
|
||||||
|
})
|
||||||
|
|
||||||
|
respHeader := map[string]*protocolpb.HeaderValues{
|
||||||
|
"Content-Type": {
|
||||||
|
Values: []string{"text/plain; charset=utf-8"},
|
||||||
|
},
|
||||||
|
protocol.HeaderKeyStatus: {
|
||||||
|
Values: []string{strconv.Itoa(http.StatusBadGateway)},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
respOpen := &protocolpb.Envelope{
|
||||||
|
Payload: &protocolpb.Envelope_StreamOpen{
|
||||||
|
StreamOpen: &protocolpb.StreamOpen{
|
||||||
|
Id: streamID,
|
||||||
|
ServiceName: so.ServiceName,
|
||||||
|
TargetAddr: so.TargetAddr,
|
||||||
|
Header: respHeader,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err2 := sendEnv(respOpen); err2 != nil {
|
||||||
|
log.Error("failed to send error stream_open from client", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
"error": err2.Error(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
dataEnv := &protocolpb.Envelope{
|
||||||
|
Payload: &protocolpb.Envelope_StreamData{
|
||||||
|
StreamData: &protocolpb.StreamData{
|
||||||
|
Id: streamID,
|
||||||
|
Seq: 0,
|
||||||
|
Data: []byte("HopGate client: " + errMsg),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err2 := sendEnv(dataEnv); err2 != nil {
|
||||||
|
log.Error("failed to send error stream_data from client", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
"error": err2.Error(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
closeEnv := &protocolpb.Envelope{
|
||||||
|
Payload: &protocolpb.Envelope_StreamClose{
|
||||||
|
StreamClose: &protocolpb.StreamClose{
|
||||||
|
Id: streamID,
|
||||||
|
Error: errMsg,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err2 := sendEnv(closeEnv); err2 != nil {
|
||||||
|
log.Error("failed to send error stream_close from client", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
"error": err2.Error(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
u.Scheme = "http"
|
||||||
|
u.Host = finalCfg.LocalTarget
|
||||||
|
|
||||||
|
// 로컬 HTTP 요청용 헤더 구성 (pseudo-headers 제거). (ko)
|
||||||
|
// Build local HTTP headers, stripping pseudo-headers. (en)
|
||||||
|
httpHeader := make(http.Header, len(so.Header))
|
||||||
|
for k, hv := range so.Header {
|
||||||
|
if k == protocol.HeaderKeyMethod ||
|
||||||
|
k == protocol.HeaderKeyURL ||
|
||||||
|
k == protocol.HeaderKeyHost ||
|
||||||
|
k == protocol.HeaderKeyStatus {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if hv == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, v := range hv.Values {
|
||||||
|
httpHeader.Add(k, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var reqBody io.Reader
|
||||||
|
if len(body) > 0 {
|
||||||
|
reqBody = bytes.NewReader(body)
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequestWithContext(ctx, method, u.String(), reqBody)
|
||||||
|
if err != nil {
|
||||||
|
errMsg := fmt.Sprintf("create http request from stream: %v", err)
|
||||||
|
log.Error("failed to create local http request", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
"error": err.Error(),
|
||||||
|
})
|
||||||
|
|
||||||
|
respHeader := map[string]*protocolpb.HeaderValues{
|
||||||
|
"Content-Type": {
|
||||||
|
Values: []string{"text/plain; charset=utf-8"},
|
||||||
|
},
|
||||||
|
protocol.HeaderKeyStatus: {
|
||||||
|
Values: []string{strconv.Itoa(http.StatusBadGateway)},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
respOpen := &protocolpb.Envelope{
|
||||||
|
Payload: &protocolpb.Envelope_StreamOpen{
|
||||||
|
StreamOpen: &protocolpb.StreamOpen{
|
||||||
|
Id: streamID,
|
||||||
|
ServiceName: so.ServiceName,
|
||||||
|
TargetAddr: so.TargetAddr,
|
||||||
|
Header: respHeader,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err2 := sendEnv(respOpen); err2 != nil {
|
||||||
|
log.Error("failed to send error stream_open from client", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
"error": err2.Error(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
dataEnv := &protocolpb.Envelope{
|
||||||
|
Payload: &protocolpb.Envelope_StreamData{
|
||||||
|
StreamData: &protocolpb.StreamData{
|
||||||
|
Id: streamID,
|
||||||
|
Seq: 0,
|
||||||
|
Data: []byte("HopGate client: " + errMsg),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err2 := sendEnv(dataEnv); err2 != nil {
|
||||||
|
log.Error("failed to send error stream_data from client", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
"error": err2.Error(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
closeEnv := &protocolpb.Envelope{
|
||||||
|
Payload: &protocolpb.Envelope_StreamClose{
|
||||||
|
StreamClose: &protocolpb.StreamClose{
|
||||||
|
Id: streamID,
|
||||||
|
Error: errMsg,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err2 := sendEnv(closeEnv); err2 != nil {
|
||||||
|
log.Error("failed to send error stream_close from client", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
"error": err2.Error(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
req.Header = httpHeader
|
||||||
|
if len(body) > 0 {
|
||||||
|
req.ContentLength = int64(len(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
logReq := log.With(logging.Fields{
|
||||||
|
"component": "grpc_client_proxy",
|
||||||
|
"stream_id": streamID,
|
||||||
|
"service": so.ServiceName,
|
||||||
|
"method": method,
|
||||||
|
"url": urlStr,
|
||||||
|
"local_target": finalCfg.LocalTarget,
|
||||||
|
})
|
||||||
|
logReq.Info("forwarding stream http request to local target", nil)
|
||||||
|
|
||||||
|
res, err := httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
errMsg := fmt.Sprintf("perform local http request: %v", err)
|
||||||
|
logReq.Error("local http request failed", logging.Fields{
|
||||||
|
"error": err.Error(),
|
||||||
|
})
|
||||||
|
|
||||||
|
respHeader := map[string]*protocolpb.HeaderValues{
|
||||||
|
"Content-Type": {
|
||||||
|
Values: []string{"text/plain; charset=utf-8"},
|
||||||
|
},
|
||||||
|
protocol.HeaderKeyStatus: {
|
||||||
|
Values: []string{strconv.Itoa(http.StatusBadGateway)},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
respOpen := &protocolpb.Envelope{
|
||||||
|
Payload: &protocolpb.Envelope_StreamOpen{
|
||||||
|
StreamOpen: &protocolpb.StreamOpen{
|
||||||
|
Id: streamID,
|
||||||
|
ServiceName: so.ServiceName,
|
||||||
|
TargetAddr: so.TargetAddr,
|
||||||
|
Header: respHeader,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err2 := sendEnv(respOpen); err2 != nil {
|
||||||
|
logReq.Error("failed to send error stream_open from client", logging.Fields{
|
||||||
|
"error": err2.Error(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
dataEnv := &protocolpb.Envelope{
|
||||||
|
Payload: &protocolpb.Envelope_StreamData{
|
||||||
|
StreamData: &protocolpb.StreamData{
|
||||||
|
Id: streamID,
|
||||||
|
Seq: 0,
|
||||||
|
Data: []byte("HopGate client: " + errMsg),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err2 := sendEnv(dataEnv); err2 != nil {
|
||||||
|
logReq.Error("failed to send error stream_data from client", logging.Fields{
|
||||||
|
"error": err2.Error(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
closeEnv := &protocolpb.Envelope{
|
||||||
|
Payload: &protocolpb.Envelope_StreamClose{
|
||||||
|
StreamClose: &protocolpb.StreamClose{
|
||||||
|
Id: streamID,
|
||||||
|
Error: errMsg,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err2 := sendEnv(closeEnv); err2 != nil {
|
||||||
|
logReq.Error("failed to send error stream_close from client", logging.Fields{
|
||||||
|
"error": err2.Error(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
|
||||||
|
// 응답 헤더 맵을 복사하고 상태 코드를 pseudo-header 로 추가합니다. (ko)
|
||||||
|
// Copy response headers and attach status code as a pseudo-header. (en)
|
||||||
|
respHeader := make(map[string]*protocolpb.HeaderValues, len(res.Header)+1)
|
||||||
|
for k, vs := range res.Header {
|
||||||
|
hv := &protocolpb.HeaderValues{
|
||||||
|
Values: append([]string(nil), vs...),
|
||||||
|
}
|
||||||
|
respHeader[k] = hv
|
||||||
|
}
|
||||||
|
statusCode := res.StatusCode
|
||||||
|
if statusCode == 0 {
|
||||||
|
statusCode = http.StatusOK
|
||||||
|
}
|
||||||
|
respHeader[protocol.HeaderKeyStatus] = &protocolpb.HeaderValues{
|
||||||
|
Values: []string{strconv.Itoa(statusCode)},
|
||||||
|
}
|
||||||
|
|
||||||
|
respOpen := &protocolpb.Envelope{
|
||||||
|
Payload: &protocolpb.Envelope_StreamOpen{
|
||||||
|
StreamOpen: &protocolpb.StreamOpen{
|
||||||
|
Id: streamID,
|
||||||
|
ServiceName: so.ServiceName,
|
||||||
|
TargetAddr: so.TargetAddr,
|
||||||
|
Header: respHeader,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := sendEnv(respOpen); err != nil {
|
||||||
|
logReq.Error("failed to send stream response open envelope from client", logging.Fields{
|
||||||
|
"error": err.Error(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 응답 바디를 4KiB(StreamChunkSize) 단위로 잘라 StreamData 프레임으로 전송합니다. (ko)
|
||||||
|
// Chunk the response body into 4KiB (StreamChunkSize) StreamData frames. (en)
|
||||||
|
buf := make([]byte, protocol.StreamChunkSize)
|
||||||
|
var seq uint64
|
||||||
|
for {
|
||||||
|
n, err := res.Body.Read(buf)
|
||||||
|
if n > 0 {
|
||||||
|
dataCopy := append([]byte(nil), buf[:n]...)
|
||||||
|
dataEnv := &protocolpb.Envelope{
|
||||||
|
Payload: &protocolpb.Envelope_StreamData{
|
||||||
|
StreamData: &protocolpb.StreamData{
|
||||||
|
Id: streamID,
|
||||||
|
Seq: seq,
|
||||||
|
Data: dataCopy,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err2 := sendEnv(dataEnv); err2 != nil {
|
||||||
|
logReq.Error("failed to send stream response data envelope from client", logging.Fields{
|
||||||
|
"error": err2.Error(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
seq++
|
||||||
|
}
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
logReq.Error("failed to read local http response body", logging.Fields{
|
||||||
|
"error": err.Error(),
|
||||||
|
})
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
closeEnv := &protocolpb.Envelope{
|
||||||
|
Payload: &protocolpb.Envelope_StreamClose{
|
||||||
|
StreamClose: &protocolpb.StreamClose{
|
||||||
|
Id: streamID,
|
||||||
|
Error: "",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := sendEnv(closeEnv); err != nil {
|
||||||
|
logReq.Error("failed to send stream response close envelope from client", logging.Fields{
|
||||||
|
"error": err.Error(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
logReq.Info("stream http response sent from client", logging.Fields{
|
||||||
|
"status": statusCode,
|
||||||
|
"elapsed_ms": time.Since(start).Milliseconds(),
|
||||||
|
"error": "",
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// 수신 루프: 서버에서 들어오는 StreamOpen/StreamData/StreamClose 를
|
||||||
|
// 로컬 HTTP 요청으로 변환하고 응답을 다시 터널로 전송합니다. (ko)
|
||||||
|
// Receive loop: convert incoming StreamOpen/StreamData/StreamClose into local
|
||||||
|
// HTTP requests and send responses back over the tunnel. (en)
|
||||||
for {
|
for {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
log.Info("context cancelled, closing grpc tunnel client", logging.Fields{
|
log.Info("context cancelled, closing grpc tunnel client", logging.Fields{
|
||||||
@@ -169,27 +581,114 @@ func runGRPCTunnelClient(ctx context.Context, logger logging.Logger, finalCfg *c
|
|||||||
}
|
}
|
||||||
|
|
||||||
payloadType := "unknown"
|
payloadType := "unknown"
|
||||||
switch in.Payload.(type) {
|
switch payload := in.Payload.(type) {
|
||||||
case *protocolpb.Envelope_HttpRequest:
|
case *protocolpb.Envelope_HttpRequest:
|
||||||
payloadType = "http_request"
|
payloadType = "http_request"
|
||||||
case *protocolpb.Envelope_HttpResponse:
|
case *protocolpb.Envelope_HttpResponse:
|
||||||
payloadType = "http_response"
|
payloadType = "http_response"
|
||||||
case *protocolpb.Envelope_StreamOpen:
|
case *protocolpb.Envelope_StreamOpen:
|
||||||
payloadType = "stream_open"
|
payloadType = "stream_open"
|
||||||
|
|
||||||
|
so := payload.StreamOpen
|
||||||
|
if so == nil {
|
||||||
|
log.Error("received stream_open with nil payload on grpc tunnel client", logging.Fields{})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
streamID := strings.TrimSpace(so.Id)
|
||||||
|
if streamID == "" {
|
||||||
|
log.Error("received stream_open with empty stream id on grpc tunnel client", logging.Fields{})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
streamsMu.Lock()
|
||||||
|
if _, exists := streams[streamID]; exists {
|
||||||
|
log.Error("received duplicate stream_open for existing stream on grpc tunnel client", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
})
|
||||||
|
streamsMu.Unlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
streams[streamID] = &inboundStream{open: so}
|
||||||
|
streamsMu.Unlock()
|
||||||
|
|
||||||
case *protocolpb.Envelope_StreamData:
|
case *protocolpb.Envelope_StreamData:
|
||||||
payloadType = "stream_data"
|
payloadType = "stream_data"
|
||||||
|
|
||||||
|
sd := payload.StreamData
|
||||||
|
if sd == nil {
|
||||||
|
log.Error("received stream_data with nil payload on grpc tunnel client", logging.Fields{})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
streamID := strings.TrimSpace(sd.Id)
|
||||||
|
if streamID == "" {
|
||||||
|
log.Error("received stream_data with empty stream id on grpc tunnel client", logging.Fields{})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
streamsMu.Lock()
|
||||||
|
st := streams[streamID]
|
||||||
|
streamsMu.Unlock()
|
||||||
|
if st == nil {
|
||||||
|
log.Warn("received stream_data for unknown stream on grpc tunnel client", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(sd.Data) > 0 {
|
||||||
|
if _, err := st.body.Write(sd.Data); err != nil {
|
||||||
|
log.Error("failed to buffer stream_data body on grpc tunnel client", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
"error": err.Error(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
case *protocolpb.Envelope_StreamClose:
|
case *protocolpb.Envelope_StreamClose:
|
||||||
payloadType = "stream_close"
|
payloadType = "stream_close"
|
||||||
|
|
||||||
|
sc := payload.StreamClose
|
||||||
|
if sc == nil {
|
||||||
|
log.Error("received stream_close with nil payload on grpc tunnel client", logging.Fields{})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
streamID := strings.TrimSpace(sc.Id)
|
||||||
|
if streamID == "" {
|
||||||
|
log.Error("received stream_close with empty stream id on grpc tunnel client", logging.Fields{})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
streamsMu.Lock()
|
||||||
|
st := streams[streamID]
|
||||||
|
if st != nil {
|
||||||
|
delete(streams, streamID)
|
||||||
|
}
|
||||||
|
streamsMu.Unlock()
|
||||||
|
if st == nil {
|
||||||
|
log.Warn("received stream_close for unknown stream on grpc tunnel client", logging.Fields{
|
||||||
|
"stream_id": streamID,
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 현재까지 수신한 메타데이터/바디를 사용해 로컬 HTTP 요청을 수행하고,
|
||||||
|
// 응답을 다시 터널로 전송합니다. (ko)
|
||||||
|
// Use the accumulated metadata/body to perform the local HTTP request and
|
||||||
|
// send the response back over the tunnel. (en)
|
||||||
|
bodyCopy := append([]byte(nil), st.body.Bytes()...)
|
||||||
|
handleStream(st.open, bodyCopy)
|
||||||
|
|
||||||
case *protocolpb.Envelope_StreamAck:
|
case *protocolpb.Envelope_StreamAck:
|
||||||
payloadType = "stream_ack"
|
payloadType = "stream_ack"
|
||||||
|
// 현재 gRPC 터널에서는 StreamAck 를 사용하지 않습니다. (ko)
|
||||||
|
// StreamAck is currently unused for gRPC tunnels. (en)
|
||||||
|
|
||||||
|
default:
|
||||||
|
payloadType = fmt.Sprintf("unknown(%T)", in.Payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("received envelope on grpc tunnel client", logging.Fields{
|
log.Info("received envelope on grpc tunnel client", logging.Fields{
|
||||||
"payload_type": payloadType,
|
"payload_type": payloadType,
|
||||||
})
|
})
|
||||||
|
|
||||||
// 이후 단계에서 여기서 HTTP 프록시와의 연동(요청/응답 처리)을 구현할 예정입니다. (ko)
|
|
||||||
// Future 3.3 work will hook HTTP proxy logic here. (en)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user