-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Question: How to authorise a client with Bearer header with SSE? #431
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
I was also hoping to do something like this too, it seems like this is possible in the typescript SDK currently. It looks like there's a couple of open PR's for adding OAuth to this project but I've not seen anything on bearer tokens. |
Have you tried mounting the server directly to a Starlette app rather than FastAPI? As per the docs: https://github.com/modelcontextprotocol/python-sdk?tab=readme-ov-file#mounting-to-an-existing-asgi-server. You'd then have to write some auth middleware? https://www.starlette.io/authentication/ |
I have the same question. To work around this, I created a class to save lastUserSession and I got that on tool run, but I know it's not the best solution. But I'm waiting for someone with a better idea from starlette.applications import Starlette
from starlette.routing import Mount
from mcp.server.fastmcp import FastMCP, Context
from mcp.server import Server
from urllib.parse import urlparse, parse_qs
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from starlette.applications import Starlette
from starlette.authentication import (
AuthCredentials, AuthenticationBackend, AuthenticationError, SimpleUser
)
from starlette.middleware import Middleware
from starlette.middleware.authentication import AuthenticationMiddleware
import base64
import binascii
class LastUserSession:
def __init__(self):
self._current_session = None
self._current_username = None
def set_session(self, session_id, username):
"""Store the username of the last request"""
self._current_session = session_id
self._current_username = username
#print(f"Latest session updated: {session_id} for user {username}")
def get_username(self, session_id=None):
"""Get the username from the last request"""
# session_id parameter is kept for compatibility but ignored
return self._current_username
def get_current_session(self):
"""Get the session_id from the last request"""
return self._current_session
# Instância da classe para armazenar apenas a última sessão
user_sessions = LastUserSession()
class BasicAuthBackend(AuthenticationBackend):
async def authenticate(self, conn):
# Extrair o session_id da URL
parsed_url = urlparse(str(conn.url))
query_params = parse_qs(parsed_url.query)
session_id = query_params.get('session_id', [None])[0]
if "Authorization" not in conn.headers:
return
auth = conn.headers["Authorization"]
try:
scheme, credentials = auth.split()
if scheme.lower() != 'basic':
return
decoded = base64.b64decode(credentials).decode("ascii")
except (ValueError, UnicodeDecodeError, binascii.Error) as exc:
raise AuthenticationError('Invalid basic auth credentials')
username, _, password = decoded.partition(":")
# Armazena a relação entre session_id e username usando a classe
user_sessions.set_session(session_id, username)
return AuthCredentials(["authenticated"]), SimpleUser(username)
@asynccontextmanager
async def server_lifespan(server: Server) -> AsyncIterator[dict]:
"""Manage server startup and shutdown lifecycle."""
# Initialize resources on startup
yield {"user": user_sessions.get_username()}
# mcp = FastMCP("Example APP", lifespan=server_lifespan, log_level='DEBUG')
mcp = FastMCP("Example APP", lifespan=server_lifespan)
# Add an addition tool
@mcp.tool()
async def who_kill_odete_roitman(ctx: Context) -> str:
"""Answer the question Who Kill Odete Roitman"""
return f"{ctx.request_context.lifespan_context['user']} killed Odete Roitman"
routes=[
Mount('/', app=mcp.sse_app()),
]
middleware = [
Middleware(AuthenticationMiddleware, backend=BasicAuthBackend()),
]
# Mount the SSE server to the existing ASGI server
app = Starlette(routes=routes, middleware=middleware) |
I have the same question -- it looks like this PR will fix it? |
There is a way to use starlette permission with decoration and override handle_sse func. for route in app.routes:
if route.path == "/sse":
original_endpoint = route.endpoint
route.endpoint = requires("authenticated")(original_endpoint) But I dont know how to add this decorator to the Mounted app. |
Finally i make it work, here anexample. import jwt
import datetime
from starlette.applications import Starlette
from starlette.routing import Mount, Route
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.responses import JSONResponse, PlainTextResponse
from starlette.authentication import (
AuthCredentials, AuthenticationBackend, AuthenticationError, SimpleUser
)
from starlette.authentication import requires
from starlette.middleware import Middleware
from server import mcp
from pprint import pprint
from mcp.server.sse import SseServerTransport
JWT_SECRET = "your-secret-key"
async def welcome_message(request):
return PlainTextResponse("Demo Mcp with jwt auth")
class JWTAuthentication(AuthenticationBackend):
async def authenticate(self, request):
auth_header = request.headers.get("Authorization")
if not auth_header:
return None
token = auth_header.split(" ")[1]
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
return AuthCredentials(["authenticated"]), SimpleUser(payload)
except jwt.ExpiredSignatureError:
raise AuthenticationError("Expired signature")
except jwt.InvalidTokenError:
raise AuthenticationError("Invalid token")
async def get_token(request):
username = request.query_params.get("username", "demo_user")
scope = request.query_params.get("scope", "mcp:access")
token = jwt.encode({
"username": username,
"scope": scope,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}, JWT_SECRET, algorithm="HS256")
return JSONResponse({"token": token})
# Create SSE transport
sse = SseServerTransport("/messages/")
# MCP SSE handler function
async def handle_sse(request):
async with sse.connect_sse(request.scope, request.receive, request._send) as (
read_stream,
write_stream,
):
await mcp._mcp_server.run(
read_stream, write_stream, mcp._mcp_server.create_initialization_options()
)
async def handle_messages(request):
await sse.handle_post_message(request.scope, request.receive, request._send)
routes = [
Route("/", endpoint=welcome_message),
Route("/get_token", get_token),
# MCP related routes
Route("/sse", endpoint=requires("authenticated")(handle_sse)),
Route("/messages/", endpoint=requires("authenticated")(handle_messages), methods=["POST"]),
]
app = Starlette(
debug=True,
routes=routes
)
app.add_middleware(AuthenticationMiddleware, backend=JWTAuthentication())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0") it is working but in every call i got a exception:
|
This is how i did this for now.
|
I am building the MCP server application to connect some services to LLM .
One of things i want to implement is authorisation of a user with the token.
I see it must be possible somehow. Because MCP inspector has the Authentification and Bearer Token field
Most of tutorials about MCP are related to STDIO kind of a server run. My will be SSE.
There is my code:
How can i read Authorization header in case if it is sent by the client?
I tried to use approaches of FastAPI - setting dependency, adding request:Request to arguments but this doesn't work.
Is there a way?
The text was updated successfully, but these errors were encountered: