-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathapp.py
149 lines (118 loc) · 5.35 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
"""
AWS Cost Explorer Assistant
This Chainlit application provides a conversational interface to help users
explore and analyze their AWS costs. It uses Claude via Bedrock and integrates
with an MCP server that provides tools for AWS cost analysis.
The application maintains a conversation history to provide context-aware
responses across multiple interactions.
To configure the MCP server, set environment variables before running:
export MCP_SERVER_URL=your-server-hostname
export MCP_SERVER_PORT=your-server-port
Example:
export MCP_SERVER_URL=localhost
export MCP_SERVER_PORT=8000
chainlit run app.py --port 8080
"""
import os
import chainlit as cl
from langchain_aws import ChatBedrock
from langgraph.prebuilt import create_react_agent
from langchain_mcp_adapters.client import MultiServerMCPClient
# Get configuration from environment variables with defaults
MCP_SERVER_URL = os.getenv("MCP_SERVER_URL", "ec2-44-192-72-20.compute-1.amazonaws.com")
MCP_SERVER_PORT = os.getenv("MCP_SERVER_PORT", "8000")
SECURE = 's' if MCP_SERVER_PORT == "443" else ''
FULL_MCP_URL = f"http{SECURE}://{MCP_SERVER_URL}:{MCP_SERVER_PORT}/sse"
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
AWS_ACCOUNT_ID = os.getenv("AWS_ACCOUNT_ID", "")
# Print configuration at module load time
print(f"MCP Server configured at: {FULL_MCP_URL}")
print("To change this configuration, set the MCP_SERVER_URL and MCP_SERVER_PORT environment variables")
# Initialize the model
model = ChatBedrock(model="us.anthropic.claude-3-5-haiku-20241022-v1:0", region_name=AWS_REGION)
@cl.on_chat_start
async def start():
welcome_message = f"""
# 👋 Welcome to your AWS cost explorer assistant.
I'm ready to help you with your questions related to your AWS spend. How can I help you save today?
_Connected to MCP server at: {MCP_SERVER_URL}:{MCP_SERVER_PORT}_
"""
await cl.Message(content=welcome_message).send()
# Initialize conversation history with a system message at the beginning
cl.user_session.set(
"message_history",
[] # Start with an empty history - we'll add the system message when formatting for the agent
)
@cl.on_message
async def main(message: cl.Message):
# Get the conversation history
message_history = cl.user_session.get("message_history")
# Add the current user message to history
message_history.append({"role": "user", "content": message.content})
# Show a thinking message
thinking_msg = cl.Message(content="Thinking...")
await thinking_msg.send()
try:
async with MultiServerMCPClient(
{
"aws_cost_explorer_mcp_server": {
"url": FULL_MCP_URL,
"transport": "sse",
}
}
) as client:
prompt = await client.get_prompt("aws_cost_explorer_mcp_server", "system_prompt_for_agent", dict(aws_account_id=AWS_ACCOUNT_ID))
print(f"Available prompt: {prompt}")
system_prompt = prompt[0].content
# Create the agent
agent = create_react_agent(
model,
client.get_tools()
)
# Format messages for the agent - ensure system message is first
formatted_messages = [
{"role": "system", "content": system_prompt}
]
# Add the rest of the conversation history
formatted_messages.extend(message_history)
# Invoke the agent with properly formatted message history
print(f"Sending request to MCP server at: {FULL_MCP_URL}")
print(f"formatted_messages={formatted_messages}")
response = await agent.ainvoke({"messages": formatted_messages})
# Remove the thinking message
await thinking_msg.remove()
# Extract the content from the response
if response and "messages" in response and response["messages"]:
last_message = response["messages"][-1]
if isinstance(last_message, dict) and "content" in last_message:
content = last_message["content"]
else:
content = str(last_message.content)
# Add the assistant's response to the conversation history
message_history.append({"role": "assistant", "content": content})
# Save the updated history (without system message)
cl.user_session.set("message_history", message_history)
# Send the message
await cl.Message(content=content).send()
else:
await cl.Message(content="No valid response received").send()
except Exception as e:
# Remove the thinking message
await thinking_msg.remove()
# Send error message
error_message = f"""
## ❌ Error Occurred
```
{str(e)}
```
Please try again or check your query.
"""
await cl.Message(content=error_message, author="System").send()
# Print error to console for debugging
print(f"Error: {str(e)}")
if __name__ == "__main__":
cl.run(
title="AWS Cost Explorer",
description="An intelligent assistant for analyzing your AWS costs"
)