Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tools for description updates #20

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "hatchling.build"
[project]
name = "keboola-mcp-server"

version = "0.2.1"
version = "0.2.2"
description = "MCP server for interacting with Keboola Connection"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
38 changes: 38 additions & 0 deletions src/keboola_mcp_server/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,44 @@ async def post(self, endpoint: str, data: Optional[Dict[str, Any]] = None) -> Di
response.raise_for_status()
return cast(Dict[str, Any], response.json())

async def put(self, endpoint: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Make a PUT request to Keboola Storage API.

Args:
endpoint: API endpoint to call
data: Request payload

Returns:
API response as dictionary
"""
async with httpx.AsyncClient() as client:
response = await client.put(
f"{self.base_url}/v2/storage/{endpoint}",
headers=self.headers,
data=data if data is not None else {},
)
response.raise_for_status()
return cast(Dict[str, Any], response.json())

async def delete(self, endpoint: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Make a DELETE request to Keboola Storage API.

Args:
endpoint: API endpoint to call
data: Request payload

Returns:
API response as dictionary
"""
async with httpx.AsyncClient() as client:
response = await client.delete(
f"{self.base_url}/v2/storage/{endpoint}",
headers=self.headers,
)
response.raise_for_status()

return cast(Dict[str, Any], response.json())

async def download_table_data_async(self, table_id: str) -> str:
"""Download table data using the export endpoint.

Expand Down
10 changes: 10 additions & 0 deletions src/keboola_mcp_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dataclasses
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Mapping, Optional

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -55,3 +56,12 @@ def __repr__(self):
else:
params.append(f"{f.name}=None")
return f'Config({", ".join(params)})'


class MetadataField(str, Enum):
"""
Enum to hold predefined names of Keboola metadata fields
Add others as needed
"""

DESCRIPTION = "KBC.description"
6 changes: 4 additions & 2 deletions src/keboola_mcp_server/server.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""MCP server implementation for Keboola Connection."""

import logging
from typing import Any, Dict, List, Optional, cast
from typing import Annotated, Any, Dict, List, Optional, cast

from mcp.server.fastmcp import Context, FastMCP
from pydantic import Field

from keboola_mcp_server.client import KeboolaClient
from keboola_mcp_server.config import Config
Expand All @@ -13,7 +14,7 @@
SessionState,
SessionStateFactory,
)
from keboola_mcp_server.sql_tools import WorkspaceManager
from keboola_mcp_server.sql_tools import WorkspaceManager, query_table
from keboola_mcp_server.storage_tools import add_storage_tools

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -74,6 +75,7 @@ def create_server(config: Optional[Config] = None) -> FastMCP:
],
)

mcp.add_tool(query_table)
add_storage_tools(mcp)

@mcp.tool()
Expand Down
97 changes: 94 additions & 3 deletions src/keboola_mcp_server/storage_tools.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""Storage-related tools for the MCP server (buckets, tables, etc.)."""

import logging
from typing import Annotated, Any, Dict, List, Optional, cast
from typing import Annotated, Any, Dict, List, Optional, Union, cast

from mcp.server.fastmcp import Context, FastMCP
from pydantic import AliasChoices, BaseModel, Field, model_validator

from keboola_mcp_server.client import KeboolaClient
from keboola_mcp_server.config import MetadataField
from keboola_mcp_server.sql_tools import WorkspaceManager

logger = logging.getLogger(__name__)
Expand All @@ -18,6 +19,8 @@ def add_storage_tools(mcp: FastMCP) -> None:
mcp.add_tool(get_bucket_metadata)
mcp.add_tool(list_bucket_tables)
mcp.add_tool(get_table_metadata)
mcp.add_tool(update_bucket_description)
mcp.add_tool(update_table_description)

logger.info("Component tools added to the MCP server.")

Expand All @@ -32,7 +35,10 @@ def extract_description(values: Dict[str, Any]) -> Optional[str]:
(
value
for item in metadata
if (item.get("key") == "KBC.description" and (value := item.get("value")))
if (
item.get("key") == MetadataField.DESCRIPTION.value
and (value := item.get("value"))
)
),
None,
)
Expand Down Expand Up @@ -131,6 +137,47 @@ def set_description(cls, values):
return values


class UpdateBucketDescriptionResponse(BaseModel):
success: bool = True
description: str = Field(description="The updated description value")
timestamp: str = Field(description="When the description was updated")

@model_validator(mode="before")
def extract_from_response(cls, values):
if isinstance(values, list) and values:
data = values[0] # the response returns a list - elements for each update
return {
"success": True,
"description": data.get("value"),
"timestamp": data.get("timestamp"),
}
else:
raise ValueError(
"Expected input data in UpdateBucketDescriptionResponse to be non-empty list."
)


class UpdateTableDescriptionResponse(BaseModel):
success: bool = True
description: str = Field(description="The updated table description value")
timestamp: str = Field(description="When the description was updated")

@model_validator(mode="before")
def extract_metadata(cls, values):
metadata = values.get("metadata", [])
if isinstance(metadata, list) and metadata:
entry = metadata[0]
return {
"success": True,
"description": entry.get("value"),
"timestamp": entry.get("timestamp"),
}
else:
raise ValueError(
"Expected input data in UpdateTableDescriptionResponse to have non-empty list in 'metadata' field."
)


async def get_bucket_metadata(
bucket_id: Annotated[str, Field(description="Unique ID of the bucket.")], ctx: Context
) -> BucketInfo:
Expand Down Expand Up @@ -165,10 +212,12 @@ async def get_table_metadata(
workspace_manager = WorkspaceManager.from_state(ctx.session.state)
assert isinstance(workspace_manager, WorkspaceManager)

table_fqn = await workspace_manager.get_table_fqn(raw_table)

return TableDetail(
**raw_table,
column_identifiers=column_info,
db_identifier=workspace_manager.get_table_fqn(raw_table).snowflake_fqn,
db_identifier=table_fqn.snowflake_fqn,
)


Expand All @@ -181,3 +230,45 @@ async def list_bucket_tables(
raw_tables = cast(List[Dict[str, Any]], client.storage_client.buckets.list_tables(bucket_id))

return [TableDetail(**raw_table) for raw_table in raw_tables]


async def update_bucket_description(
bucket_id: Annotated[str, Field(description="The ID of the bucket to update.")],
description: Annotated[str, Field(description="The new description for the bucket.")],
ctx: Context,
) -> Annotated[
UpdateBucketDescriptionResponse,
Field(description="The response object of the Bucket description update."),
]:
"""Update the description for a given Keboola bucket."""
client = KeboolaClient.from_state(ctx.session.state)
metadata_endpoint = f"buckets/{bucket_id}/metadata"

data = {
"provider": "user",
"metadata": [{"key": MetadataField.DESCRIPTION.value, "value": description}],
}
response = await client.post(endpoint=metadata_endpoint, data=data)

return UpdateBucketDescriptionResponse.model_validate(response)


async def update_table_description(
table_id: Annotated[str, Field(description="The ID of the table to update.")],
description: Annotated[str, Field(description="The new description for the table.")],
ctx: Context,
) -> Annotated[
UpdateTableDescriptionResponse,
Field(description="The response object of the Table description update."),
]:
"""Update the description for a given Keboola table."""
client = KeboolaClient.from_state(ctx.session.state)
metadata_endpoint = f"tables/{table_id}/metadata"

data = {
"provider": "user",
"metadata": [{"key": MetadataField.DESCRIPTION.value, "value": description}],
}
response = await client.post(endpoint=metadata_endpoint, data=data)

return UpdateTableDescriptionResponse.model_validate(response)
Loading