Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add golang filter and mcp-server #1942

Merged
merged 1 commit into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile.core.mk
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ build-gateway: prebuild buildx-prepare
build-gateway-local: prebuild
TARGET_ARCH=${TARGET_ARCH} DOCKER_TARGETS="docker.proxyv2" ./tools/hack/build-istio-image.sh docker

build-golang-filter:
./tools/hack/build-golang-filters.sh

build-istio: prebuild buildx-prepare
DOCKER_TARGETS="docker.pilot" IMG_URL="${IMG_URL}" ./tools/hack/build-istio-image.sh docker.buildx

Expand Down
20 changes: 20 additions & 0 deletions plugins/golang-filter/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM golang:1.23 AS golang-base

ARG GOPROXY
ARG GO_FILTER_NAME

ENV GOFLAGS=-buildvcs=false
ENV GOPROXY=${GOPROXY}

WORKDIR /workspace

COPY . .

WORKDIR /workspace/$GO_FILTER_NAME

RUN go mod tidy
RUN go build -o /$GO_FILTER_NAME.so -buildmode=c-shared .

FROM scratch AS output
ARG GO_FILTER_NAME
COPY --from=golang-base /$GO_FILTER_NAME.so $GO_FILTER_NAME.so
10 changes: 10 additions & 0 deletions plugins/golang-filter/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
GO_FILTER_NAME ?= mcp-server
GOPROXY := $(shell go env GOPROXY)

.DEFAULT:
build:
DOCKER_BUILDKIT=1 docker build --build-arg GOPROXY=$(GOPROXY) \
--build-arg GO_FILTER_NAME=${GO_FILTER_NAME} \
-t ${GO_FILTER_NAME} \
--output ./${GO_FILTER_NAME} \
.
9 changes: 9 additions & 0 deletions plugins/golang-filter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
## 介绍

## 快速构建

使用以下命令可以快速构建 golang filter 插件:

```bash
GO_FILTER_NAME=mcp-server make build
```
122 changes: 122 additions & 0 deletions plugins/golang-filter/mcp-server/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main

import (
"errors"
"fmt"

xds "github.com/cncf/xds/go/xds/type/v3"
"github.com/mark3labs/mcp-go/mcp"
"google.golang.org/protobuf/types/known/anypb"

"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
envoyHttp "github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http"
"github.com/envoyproxy/envoy/examples/golang-http/simple/internal"
"github.com/envoyproxy/envoy/examples/golang-http/simple/servers/gorm"
)

const Name = "mcp-server"
const SCHEME_PATH = "scheme"

func init() {
envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{})
}

type config struct {
echoBody string
// other fields
dbClient *gorm.DBClient
redisClient *internal.RedisClient
stopChan chan struct{}
SSEServer *internal.SSEServer
}

type parser struct {
}

// Parse the filter configuration
func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) {
configStruct := &xds.TypedStruct{}
if err := any.UnmarshalTo(configStruct); err != nil {
return nil, err
}

v := configStruct.Value
conf := &config{}

dsn, ok := v.AsMap()["dsn"].(string)
if !ok {
return nil, errors.New("missing dsn")
}

dbType, ok := v.AsMap()["dbType"].(string)
if !ok {
return nil, errors.New("missing database type")
}

dbClient, err := gorm.NewDBClient(dsn, dbType)
if err != nil {
return nil, fmt.Errorf("failed to initialize DBClient: %w", err)
}
conf.dbClient = dbClient

conf.stopChan = make(chan struct{})
redisClient, err := internal.NewRedisClient("localhost:6379", conf.stopChan)
if err != nil {
return nil, fmt.Errorf("failed to initialize RedisClient: %w", err)
}
conf.redisClient = redisClient

conf.SSEServer = internal.NewSSEServer(NewServer(conf.dbClient), internal.WithRedisClient(redisClient))
return conf, nil
}

func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
parentConfig := parent.(*config)
childConfig := child.(*config)

newConfig := *parentConfig
if childConfig.echoBody != "" {
newConfig.echoBody = childConfig.echoBody
}
if childConfig.dbClient != nil {
newConfig.dbClient = childConfig.dbClient
}
if childConfig.redisClient != nil {
newConfig.redisClient = childConfig.redisClient
}
return &newConfig
}

func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
conf, ok := c.(*config)
if !ok {
panic("unexpected config type")
}
return &filter{
callbacks: callbacks,
config: conf,
}
}

func NewServer(dbClient *gorm.DBClient) *internal.MCPServer {
mcpServer := internal.NewMCPServer(
"mcp-server-envoy-poc",
"1.0.0",
)

// Add query tool
mcpServer.AddTool(
mcp.NewToolWithRawSchema("query", "Run a read-only SQL query in clickhouse database with repository git data", gorm.GetQueryToolSchema()),
gorm.HandleQueryTool(dbClient),
)
api.LogInfo("Added query tool successfully")

// Add favorite files tool
mcpServer.AddTool(
mcp.NewToolWithRawSchema("author_favorite_files", "Favorite files for an author", gorm.GetFavoriteToolSchema()),
gorm.HandleFavoriteTool(dbClient),
)
return mcpServer
}

func main() {}
128 changes: 128 additions & 0 deletions plugins/golang-filter/mcp-server/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package main

import (
"net/http"
"net/http/httptest"
"net/url"

"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
)

// The callbacks in the filter, like `DecodeHeaders`, can be implemented on demand.
// Because api.PassThroughStreamFilter provides a default implementation.
type filter struct {
api.PassThroughStreamFilter

callbacks api.FilterCallbackHandler
path string
config *config

req *http.Request
sse bool
message bool
}

// Callbacks which are called in request path
// The endStream is true if the request doesn't have body
func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api.StatusType {
fullPath, _ := header.Get(":path")
parsedURL, _ := url.Parse(fullPath)
f.path = parsedURL.Path
method, _ := header.Get(":method")
api.LogInfo(f.path)
if f.path == f.config.SSEServer.SSEEndpoint {
if method != http.MethodGet {
f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "")
} else {
f.sse = true
body := "SSE connection create"
f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusOK, body, nil, 0, "")
}
api.LogInfo("SSE connection started")
return api.LocalReply
} else if f.path == f.config.SSEServer.MessageEndpoint {
if method != http.MethodPost {
f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "")
}
// Create a new http.Request object
f.req = &http.Request{
Method: method,
URL: parsedURL,
Header: make(http.Header),
}
api.LogInfof("Message request: %v", parsedURL)
// Copy headers from api.RequestHeaderMap to http.Header
header.Range(func(key, value string) bool {
f.req.Header.Add(key, value)
return true
})
f.message = true
if endStream {
return api.Continue
} else {
return api.StopAndBuffer
}
}
if endStream {
return api.Continue
} else {
return api.StopAndBuffer
}
}

// DecodeData might be called multiple times during handling the request body.
// The endStream is true when handling the last piece of the body.
func (f *filter) DecodeData(buffer api.BufferInstance, endStream bool) api.StatusType {
api.LogInfo("Message DecodeData")
// support suspending & resuming the filter in a background goroutine
api.LogInfof("DecodeData: {%v}", buffer)
if f.message {
// Create a response recorder to capture the response
recorder := httptest.NewRecorder()
// Call the handleMessage method of SSEServer
f.config.SSEServer.HandleMessage(recorder, f.req, buffer.Bytes())
f.message = false
api.LogInfof("Message DecodeData SendLocalReply %v", recorder)
f.callbacks.DecoderFilterCallbacks().SendLocalReply(recorder.Code, recorder.Body.String(), recorder.Header(), 0, "")
return api.LocalReply
}
return api.Continue
}

// Callbacks which are called in response path
// The endStream is true if the response doesn't have body
func (f *filter) EncodeHeaders(header api.ResponseHeaderMap, endStream bool) api.StatusType {
if f.sse {
header.Set("Content-Type", "text/event-stream")
header.Set("Cache-Control", "no-cache")
header.Set("Connection", "keep-alive")
header.Set("Access-Control-Allow-Origin", "*")
header.Del("Content-Length")
api.LogInfo("SSE connection header set")
return api.Continue
}
return api.Continue
}

// TODO: 连接多种数据库
// TODO: 多种存储类型
// TODO: 数据库多个实例
// EncodeData might be called multiple times during handling the response body.
// The endStream is true when handling the last piece of the body.
func (f *filter) EncodeData(buffer api.BufferInstance, endStream bool) api.StatusType {
if f.sse {
//TODO: buffer cleanup
f.config.SSEServer.HandleSSE(f.callbacks)
f.sse = false
return api.Running
}
return api.Continue
}

// OnDestroy 或 OnStreamComplete 中停止 goroutine
func (f *filter) OnDestroy(reason api.DestroyReason) {
if f.sse && f.config.stopChan != nil {
api.LogInfo("Stopping SSE connection")
close(f.config.stopChan)
}
}
49 changes: 49 additions & 0 deletions plugins/golang-filter/mcp-server/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
module github.com/envoyproxy/envoy/examples/golang-http/simple

go 1.23

require (
github.com/envoyproxy/envoy v1.33.1-0.20250224062430-6c11eac01993
google.golang.org/protobuf v1.36.5
github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42
github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.6.0
github.com/mark3labs/mcp-go v0.12.0
gorm.io/driver/clickhouse v0.6.1
gorm.io/driver/postgres v1.5.11
gorm.io/gorm v1.25.12
)

require (
cel.dev/expr v0.15.0 // indirect
github.com/ClickHouse/ch-go v0.61.5 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.23.2 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.5.5 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading
Loading