Skip to content

Commit e84d2c3

Browse files
committedDec 25, 2024
Merge branch 'develop'
2 parents 887c14a + b8f9f31 commit e84d2c3

26 files changed

+2441
-1350
lines changed
 

‎.github/workflows/publish.yml

+19-16
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
name: Publish
2+
23
on:
34
push:
45
tags:
56
- "v*"
6-
strategy:
7-
matrix:
8-
python-version: ["3.11"]
97

108
jobs:
119
test:
1210
runs-on: ubuntu-latest
11+
strategy:
12+
matrix:
13+
python-version: ["3.11"]
14+
1315
steps:
1416
- uses: actions/checkout@v4
1517

@@ -23,14 +25,14 @@ jobs:
2325
python -m pip install --upgrade pip
2426
pip install uv
2527
26-
- name: Install dev/test dependencies
28+
- name: Install dependencies
2729
run: |
28-
pip install -e ".[dev]"
29-
pip install -e ".[test]"
30+
uv venv
31+
uv pip install -e ".[dev,test]"
3032
31-
- name: Run tests
33+
- name: Run tests and checks
3234
run: |
33-
make check
35+
uv run make all
3436
3537
publish:
3638
needs: test
@@ -44,14 +46,13 @@ jobs:
4446

4547
- name: Update version from tag
4648
run: |
47-
# Strip 'v' prefix from tag and update version.py
4849
VERSION=${GITHUB_REF#refs/tags/v}
4950
echo "__version__ = \"${VERSION}\"" > src/mcp_shell_server/version.py
5051
51-
- name: Set up Python ${{ matrix.python-version }}
52+
- name: Set up Python 3.11
5253
uses: actions/setup-python@v5
5354
with:
54-
python-version: ${{ matrix.python-version }}
55+
python-version: "3.11"
5556

5657
- name: Install uv
5758
run: |
@@ -60,10 +61,12 @@ jobs:
6061
6162
- name: Build package
6263
run: |
63-
uv build
64+
uv venv
65+
uv pip install build
66+
uv run python -m build
6467
6568
- name: Publish to PyPI
66-
env:
67-
PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }}
68-
run: |
69-
uv publish --token $PYPI_TOKEN
69+
uses: pypa/gh-action-pypi-publish@release/v1
70+
with:
71+
password: ${{ secrets.PYPI_TOKEN }}
72+
password: ${{ secrets.PYPI_TOKEN }}

‎.github/workflows/test.yml

+6-6
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,18 @@ jobs:
2626
python -m pip install --upgrade pip
2727
pip install uv
2828
29-
- name: Install dev/test dependencies
29+
- name: Install dependencies
3030
run: |
31-
pip install -e ".[dev]"
32-
pip install -e ".[test]"
31+
uv venv
32+
uv pip install -e ".[dev,test]"
3333
34-
- name: Run lint and typecheck
34+
- name: Run format checks and typecheck
3535
run: |
36-
make lint typecheck
36+
uv run make check
3737
3838
- name: Run tests with coverage
3939
run: |
40-
pytest --cov --cov-report=xml
40+
uv run make coverage
4141
4242
- name: Upload coverage to Codecov
4343
uses: codecov/codecov-action@v5

‎pyproject.toml

+5
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,15 @@ asyncio_mode = "strict"
4141
testpaths = "tests"
4242
# Set default event loop scope for async tests
4343
asyncio_default_fixture_loop_scope = "function"
44+
markers = [
45+
"macos: marks tests that should only run on macOS",
46+
"slow: marks tests as slow running",
47+
]
4448
filterwarnings = [
4549
"ignore::RuntimeWarning:selectors:",
4650
"ignore::pytest.PytestUnhandledCoroutineWarning:",
4751
"ignore::pytest.PytestUnraisableExceptionWarning:",
52+
"ignore::DeprecationWarning:pytest_asyncio.plugin:",
4853
]
4954

5055
[tool.ruff]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import shlex
2+
from typing import Dict, List, Tuple, Union
3+
4+
5+
class CommandPreProcessor:
6+
"""
7+
Pre-processes and validates shell commands before execution
8+
"""
9+
10+
def preprocess_command(self, command: List[str]) -> List[str]:
11+
"""
12+
Preprocess the command to handle cases where '|' is attached to a command.
13+
"""
14+
preprocessed_command = []
15+
for token in command:
16+
if token in ["||", "&&", ";"]: # Special shell operators
17+
preprocessed_command.append(token)
18+
elif "|" in token and token != "|":
19+
parts = token.split("|")
20+
preprocessed_command.extend(
21+
[part.strip() for part in parts if part.strip()]
22+
)
23+
preprocessed_command.append("|")
24+
else:
25+
preprocessed_command.append(token)
26+
return preprocessed_command
27+
28+
def clean_command(self, command: List[str]) -> List[str]:
29+
"""
30+
Clean command by trimming whitespace from each part.
31+
Removes empty strings but preserves arguments that are meant to be spaces.
32+
33+
Args:
34+
command (List[str]): Original command and its arguments
35+
36+
Returns:
37+
List[str]: Cleaned command
38+
"""
39+
return [arg for arg in command if arg] # Remove empty strings
40+
41+
def create_shell_command(self, command: List[str]) -> str:
42+
"""
43+
Create a shell command string from a list of arguments.
44+
Handles wildcards and arguments properly.
45+
"""
46+
if not command:
47+
return ""
48+
49+
escaped_args = []
50+
for arg in command:
51+
if arg.isspace():
52+
# Wrap space-only arguments in single quotes
53+
escaped_args.append(f"'{arg}'")
54+
else:
55+
# Properly escape all arguments including those with wildcards
56+
escaped_args.append(shlex.quote(arg.strip()))
57+
58+
return " ".join(escaped_args)
59+
60+
def split_pipe_commands(self, command: List[str]) -> List[List[str]]:
61+
"""
62+
Split commands by pipe operator into separate commands.
63+
64+
Args:
65+
command (List[str]): Command and its arguments with pipe operators
66+
67+
Returns:
68+
List[List[str]]: List of commands split by pipe operator
69+
"""
70+
commands: List[List[str]] = []
71+
current_command: List[str] = []
72+
73+
for arg in command:
74+
if arg.strip() == "|":
75+
if current_command:
76+
commands.append(current_command)
77+
current_command = []
78+
else:
79+
current_command.append(arg)
80+
81+
if current_command:
82+
commands.append(current_command)
83+
84+
return commands
85+
86+
def parse_command(
87+
self, command: List[str]
88+
) -> Tuple[List[str], Dict[str, Union[None, str, bool]]]:
89+
"""
90+
Parse command and extract redirections.
91+
"""
92+
cmd = []
93+
redirects: Dict[str, Union[None, str, bool]] = {
94+
"stdin": None,
95+
"stdout": None,
96+
"stdout_append": False,
97+
}
98+
99+
i = 0
100+
while i < len(command):
101+
token = command[i]
102+
103+
# Shell operators check
104+
if token in ["|", ";", "&&", "||"]:
105+
raise ValueError(f"Unexpected shell operator: {token}")
106+
107+
# Output redirection
108+
if token in [">", ">>"]:
109+
if i + 1 >= len(command):
110+
raise ValueError("Missing path for output redirection")
111+
if i + 1 < len(command) and command[i + 1] in [">", ">>", "<"]:
112+
raise ValueError("Invalid redirection target: operator found")
113+
path = command[i + 1]
114+
redirects["stdout"] = path
115+
redirects["stdout_append"] = token == ">>"
116+
i += 2
117+
continue
118+
119+
# Input redirection
120+
if token == "<":
121+
if i + 1 >= len(command):
122+
raise ValueError("Missing path for input redirection")
123+
path = command[i + 1]
124+
redirects["stdin"] = path
125+
i += 2
126+
continue
127+
128+
cmd.append(token)
129+
i += 1
130+
131+
return cmd, redirects
+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
"""
2+
Provides validation for shell commands and ensures they are allowed to be executed.
3+
"""
4+
5+
import os
6+
from typing import Dict, List
7+
8+
9+
class CommandValidator:
10+
"""
11+
Validates shell commands against a whitelist and checks for unsafe operators.
12+
"""
13+
14+
def __init__(self):
15+
"""
16+
Initialize the validator.
17+
"""
18+
pass
19+
20+
def _get_allowed_commands(self) -> set[str]:
21+
"""Get the set of allowed commands from environment variables"""
22+
allow_commands = os.environ.get("ALLOW_COMMANDS", "")
23+
allowed_commands = os.environ.get("ALLOWED_COMMANDS", "")
24+
commands = allow_commands + "," + allowed_commands
25+
return {cmd.strip() for cmd in commands.split(",") if cmd.strip()}
26+
27+
def get_allowed_commands(self) -> list[str]:
28+
"""Get the list of allowed commands from environment variables"""
29+
return list(self._get_allowed_commands())
30+
31+
def is_command_allowed(self, command: str) -> bool:
32+
"""Check if a command is in the allowed list"""
33+
cmd = command.strip()
34+
return cmd in self._get_allowed_commands()
35+
36+
def validate_no_shell_operators(self, cmd: str) -> None:
37+
"""
38+
Validate that the command does not contain shell operators.
39+
40+
Args:
41+
cmd (str): Command to validate
42+
43+
Raises:
44+
ValueError: If the command contains shell operators
45+
"""
46+
if cmd in [";", "&&", "||", "|"]:
47+
raise ValueError(f"Unexpected shell operator: {cmd}")
48+
49+
def validate_pipeline(self, commands: List[str]) -> Dict[str, str]:
50+
"""
51+
Validate pipeline command and ensure all parts are allowed.
52+
53+
Args:
54+
commands (List[str]): List of commands to validate
55+
56+
Returns:
57+
Dict[str, str]: Error message if validation fails, empty dict if success
58+
59+
Raises:
60+
ValueError: If validation fails
61+
"""
62+
current_cmd: List[str] = []
63+
64+
for token in commands:
65+
if token == "|":
66+
if not current_cmd:
67+
raise ValueError("Empty command before pipe operator")
68+
if not self.is_command_allowed(current_cmd[0]):
69+
raise ValueError(f"Command not allowed: {current_cmd[0]}")
70+
current_cmd = []
71+
elif token in [";", "&&", "||"]:
72+
raise ValueError(f"Unexpected shell operator in pipeline: {token}")
73+
else:
74+
current_cmd.append(token)
75+
76+
if current_cmd:
77+
if not self.is_command_allowed(current_cmd[0]):
78+
raise ValueError(f"Command not allowed: {current_cmd[0]}")
79+
80+
return {}
81+
82+
def validate_command(self, command: List[str]) -> None:
83+
"""
84+
Validate if the command is allowed to be executed.
85+
86+
Args:
87+
command (List[str]): Command and its arguments
88+
89+
Raises:
90+
ValueError: If the command is empty, not allowed, or contains invalid shell operators
91+
"""
92+
if not command:
93+
raise ValueError("Empty command")
94+
95+
allowed_commands = self._get_allowed_commands()
96+
if not allowed_commands:
97+
raise ValueError(
98+
"No commands are allowed. Please set ALLOW_COMMANDS environment variable."
99+
)
100+
101+
# Clean and check the first command
102+
cleaned_cmd = command[0].strip()
103+
if cleaned_cmd not in allowed_commands:
104+
raise ValueError(f"Command not allowed: {cleaned_cmd}")
+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import os
2+
from typing import Optional
3+
4+
5+
class DirectoryManager:
6+
"""
7+
Manages directory validation and path operations for shell command execution.
8+
"""
9+
10+
def validate_directory(self, directory: Optional[str]) -> None:
11+
"""
12+
Validate if the directory exists and is accessible.
13+
14+
Args:
15+
directory (Optional[str]): Directory path to validate
16+
17+
Raises:
18+
ValueError: If the directory doesn't exist, not absolute or is not accessible
19+
"""
20+
# make directory required
21+
if directory is None:
22+
raise ValueError("Directory is required")
23+
24+
# verify directory is absolute path
25+
if not os.path.isabs(directory):
26+
raise ValueError(f"Directory must be an absolute path: {directory}")
27+
28+
if not os.path.exists(directory):
29+
raise ValueError(f"Directory does not exist: {directory}")
30+
31+
if not os.path.isdir(directory):
32+
raise ValueError(f"Not a directory: {directory}")
33+
34+
if not os.access(directory, os.R_OK | os.X_OK):
35+
raise ValueError(f"Directory is not accessible: {directory}")
36+
37+
def get_absolute_path(self, path: str, base_directory: Optional[str] = None) -> str:
38+
"""
39+
Get absolute path by joining base directory with path if path is relative.
40+
41+
Args:
42+
path (str): The path to make absolute
43+
base_directory (Optional[str]): Base directory to join with relative paths
44+
45+
Returns:
46+
str: Absolute path
47+
"""
48+
if os.path.isabs(path):
49+
return path
50+
if not base_directory:
51+
return os.path.abspath(path)
52+
return os.path.join(base_directory, path)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
"""IO redirection handling module for MCP Shell Server."""
2+
3+
import asyncio
4+
import os
5+
from typing import IO, Any, Dict, List, Optional, Tuple, Union
6+
7+
8+
class IORedirectionHandler:
9+
"""Handles input/output redirection for shell commands."""
10+
11+
def validate_redirection_syntax(self, command: List[str]) -> None:
12+
"""
13+
Validate the syntax of redirection operators in the command.
14+
15+
Args:
16+
command (List[str]): Command and its arguments including redirections
17+
18+
Raises:
19+
ValueError: If the redirection syntax is invalid
20+
"""
21+
prev_token = None
22+
for token in command:
23+
if token in [">", ">>", "<"]:
24+
if prev_token and prev_token in [">", ">>", "<"]:
25+
raise ValueError(
26+
"Invalid redirection syntax: consecutive operators"
27+
)
28+
prev_token = token
29+
30+
def process_redirections(
31+
self, command: List[str]
32+
) -> Tuple[List[str], Dict[str, Union[None, str, bool]]]:
33+
"""
34+
Process input/output redirections in the command.
35+
36+
Args:
37+
command (List[str]): Command and its arguments including redirections
38+
39+
Returns:
40+
Tuple[List[str], Dict[str, Any]]: Processed command without redirections and
41+
redirection configuration
42+
43+
Raises:
44+
ValueError: If the redirection syntax is invalid
45+
"""
46+
self.validate_redirection_syntax(command)
47+
48+
cmd = []
49+
redirects: Dict[str, Union[None, str, bool]] = {
50+
"stdin": None,
51+
"stdout": None,
52+
"stdout_append": False,
53+
}
54+
55+
i = 0
56+
while i < len(command):
57+
token = command[i]
58+
59+
# Output redirection
60+
if token in [">", ">>"]:
61+
if i + 1 >= len(command):
62+
raise ValueError("Missing path for output redirection")
63+
if i + 1 < len(command) and command[i + 1] in [">", ">>", "<"]:
64+
raise ValueError("Invalid redirection target: operator found")
65+
path = command[i + 1]
66+
redirects["stdout"] = path
67+
redirects["stdout_append"] = token == ">>"
68+
i += 2
69+
continue
70+
71+
# Input redirection
72+
if token == "<":
73+
if i + 1 >= len(command):
74+
raise ValueError("Missing path for input redirection")
75+
path = command[i + 1]
76+
if path in [">", ">>", "<"]:
77+
raise ValueError("Invalid redirection target: operator found")
78+
redirects["stdin"] = path
79+
i += 2
80+
continue
81+
82+
cmd.append(token)
83+
i += 1
84+
85+
return cmd, redirects
86+
87+
async def setup_redirects(
88+
self,
89+
redirects: Dict[str, Union[None, str, bool]],
90+
directory: Optional[str] = None,
91+
) -> Dict[str, Union[IO[Any], int, str, None]]:
92+
"""
93+
Set up file handles for redirections.
94+
95+
Args:
96+
redirects (Dict[str, Union[None, str, bool]]): Redirection configuration
97+
directory (Optional[str]): Working directory for file paths
98+
99+
Returns:
100+
Dict[str, Union[IO[Any], int, str, None]]: File handles for subprocess
101+
"""
102+
handles: Dict[str, Union[IO[Any], int, str, None]] = {}
103+
104+
# Handle input redirection
105+
if redirects["stdin"]:
106+
path = (
107+
os.path.join(directory or "", str(redirects["stdin"]))
108+
if directory and redirects["stdin"]
109+
else str(redirects["stdin"])
110+
)
111+
try:
112+
file = open(path, "r")
113+
handles["stdin"] = asyncio.subprocess.PIPE
114+
handles["stdin_data"] = file.read()
115+
file.close()
116+
except (FileNotFoundError, IOError) as e:
117+
raise ValueError("Failed to open input file") from e
118+
119+
# Handle output redirection
120+
if redirects["stdout"]:
121+
path = (
122+
os.path.join(directory or "", str(redirects["stdout"]))
123+
if directory and redirects["stdout"]
124+
else str(redirects["stdout"])
125+
)
126+
mode = "a" if redirects.get("stdout_append") else "w"
127+
try:
128+
handles["stdout"] = open(path, mode)
129+
except (IOError, PermissionError) as e:
130+
raise ValueError(f"Failed to open output file: {e}") from e
131+
else:
132+
handles["stdout"] = asyncio.subprocess.PIPE
133+
134+
handles["stderr"] = asyncio.subprocess.PIPE
135+
136+
return handles
137+
138+
async def cleanup_handles(
139+
self, handles: Dict[str, Union[IO[Any], int, None]]
140+
) -> None:
141+
"""
142+
Clean up file handles after command execution.
143+
144+
Args:
145+
handles (Dict[str, Union[IO[Any], int, None]]): File handles to clean up
146+
"""
147+
for key in ["stdout", "stderr"]:
148+
handle = handles.get(key)
149+
if handle and hasattr(handle, "close") and not isinstance(handle, int):
150+
try:
151+
handle.close()
152+
except (IOError, ValueError):
153+
pass
+314
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,314 @@
1+
"""Process management for shell command execution."""
2+
3+
import asyncio
4+
import logging
5+
import os
6+
import signal
7+
from typing import IO, Any, Dict, List, Optional, Set, Tuple, Union
8+
from weakref import WeakSet
9+
10+
11+
class ProcessManager:
12+
"""Manages process creation, execution, and cleanup for shell commands."""
13+
14+
def __init__(self):
15+
"""Initialize ProcessManager with signal handling setup."""
16+
self._processes: Set[asyncio.subprocess.Process] = WeakSet()
17+
self._original_sigint_handler = None
18+
self._original_sigterm_handler = None
19+
self._setup_signal_handlers()
20+
21+
def _setup_signal_handlers(self) -> None:
22+
"""Set up signal handlers for graceful process management."""
23+
if os.name != "posix":
24+
return
25+
26+
def handle_termination(signum: int, _: Any) -> None:
27+
"""Handle termination signals by cleaning up processes."""
28+
if self._processes:
29+
for process in self._processes:
30+
try:
31+
if process.returncode is None:
32+
process.terminate()
33+
except Exception as e:
34+
logging.warning(
35+
f"Error terminating process on signal {signum}: {e}"
36+
)
37+
38+
# Restore original handler and re-raise signal
39+
if signum == signal.SIGINT and self._original_sigint_handler:
40+
signal.signal(signal.SIGINT, self._original_sigint_handler)
41+
elif signum == signal.SIGTERM and self._original_sigterm_handler:
42+
signal.signal(signal.SIGTERM, self._original_sigterm_handler)
43+
44+
# Re-raise signal
45+
os.kill(os.getpid(), signum)
46+
47+
# Store original handlers
48+
self._original_sigint_handler = signal.signal(signal.SIGINT, handle_termination)
49+
self._original_sigterm_handler = signal.signal(
50+
signal.SIGTERM, handle_termination
51+
)
52+
53+
async def start_process_async(
54+
self, cmd: List[str], timeout: Optional[int] = None
55+
) -> asyncio.subprocess.Process:
56+
"""Start a new process asynchronously.
57+
58+
Args:
59+
cmd: Command to execute as list of strings
60+
timeout: Optional timeout in seconds
61+
62+
Returns:
63+
Process object
64+
"""
65+
process = await self.create_process(
66+
" ".join(cmd), directory=None, timeout=timeout
67+
)
68+
process.is_running = lambda self=process: self.returncode is None # type: ignore
69+
return process
70+
71+
async def start_process(
72+
self, cmd: List[str], timeout: Optional[int] = None
73+
) -> asyncio.subprocess.Process:
74+
"""Start a new process asynchronously.
75+
76+
Args:
77+
cmd: Command to execute as list of strings
78+
timeout: Optional timeout in seconds
79+
80+
Returns:
81+
Process object
82+
"""
83+
process = await self.create_process(
84+
" ".join(cmd), directory=None, timeout=timeout
85+
)
86+
process.is_running = lambda self=process: self.returncode is None # type: ignore
87+
return process
88+
89+
async def cleanup_processes(
90+
self, processes: List[asyncio.subprocess.Process]
91+
) -> None:
92+
"""Clean up processes by killing them if they're still running.
93+
94+
Args:
95+
processes: List of processes to clean up
96+
"""
97+
cleanup_tasks = []
98+
for process in processes:
99+
if process.returncode is None:
100+
try:
101+
# Force kill immediately as required by tests
102+
process.kill()
103+
cleanup_tasks.append(asyncio.create_task(process.wait()))
104+
except Exception as e:
105+
logging.warning(f"Error killing process: {e}")
106+
107+
if cleanup_tasks:
108+
try:
109+
# Wait for all processes to be killed
110+
await asyncio.wait(cleanup_tasks, timeout=5)
111+
except asyncio.TimeoutError:
112+
logging.error("Process cleanup timed out")
113+
except Exception as e:
114+
logging.error(f"Error during process cleanup: {e}")
115+
116+
async def cleanup_all(self) -> None:
117+
"""Clean up all tracked processes."""
118+
if self._processes:
119+
processes = list(self._processes)
120+
await self.cleanup_processes(processes)
121+
self._processes.clear()
122+
123+
async def create_process(
124+
self,
125+
shell_cmd: str,
126+
directory: Optional[str],
127+
stdin: Optional[str] = None,
128+
stdout_handle: Any = asyncio.subprocess.PIPE,
129+
envs: Optional[Dict[str, str]] = None,
130+
timeout: Optional[int] = None,
131+
) -> asyncio.subprocess.Process:
132+
"""Create a new subprocess with the given parameters.
133+
134+
Args:
135+
shell_cmd (str): Shell command to execute
136+
directory (Optional[str]): Working directory
137+
stdin (Optional[str]): Input to be passed to the process
138+
stdout_handle: File handle or PIPE for stdout
139+
envs (Optional[Dict[str, str]]): Additional environment variables
140+
timeout (Optional[int]): Timeout in seconds
141+
142+
Returns:
143+
asyncio.subprocess.Process: Created process
144+
145+
Raises:
146+
ValueError: If process creation fails
147+
"""
148+
try:
149+
process = await asyncio.create_subprocess_shell(
150+
shell_cmd,
151+
stdin=asyncio.subprocess.PIPE,
152+
stdout=stdout_handle,
153+
stderr=asyncio.subprocess.PIPE,
154+
env={**os.environ, **(envs or {})},
155+
cwd=directory,
156+
)
157+
158+
# Add process to tracked set
159+
self._processes.add(process)
160+
return process
161+
162+
except OSError as e:
163+
raise ValueError(f"Failed to create process: {str(e)}") from e
164+
except Exception as e:
165+
raise ValueError(
166+
f"Unexpected error during process creation: {str(e)}"
167+
) from e
168+
169+
async def execute_with_timeout(
170+
self,
171+
process: asyncio.subprocess.Process,
172+
stdin: Optional[str] = None,
173+
timeout: Optional[int] = None,
174+
) -> Tuple[bytes, bytes]:
175+
"""Execute the process with timeout handling.
176+
177+
Args:
178+
process: Process to execute
179+
stdin (Optional[str]): Input to pass to the process
180+
timeout (Optional[int]): Timeout in seconds
181+
182+
Returns:
183+
Tuple[bytes, bytes]: Tuple of (stdout, stderr)
184+
185+
Raises:
186+
asyncio.TimeoutError: If execution times out
187+
"""
188+
stdin_bytes = stdin.encode() if stdin else None
189+
190+
async def _kill_process():
191+
if process.returncode is not None:
192+
return
193+
194+
try:
195+
# Try graceful termination first
196+
process.terminate()
197+
for _ in range(5): # Wait up to 0.5 seconds
198+
if process.returncode is not None:
199+
return
200+
await asyncio.sleep(0.1)
201+
202+
# Force kill if still running
203+
if process.returncode is None:
204+
process.kill()
205+
await asyncio.wait_for(process.wait(), timeout=1.0)
206+
except Exception as e:
207+
logging.warning(f"Error killing process: {e}")
208+
209+
try:
210+
if timeout:
211+
try:
212+
return await asyncio.wait_for(
213+
process.communicate(input=stdin_bytes), timeout=timeout
214+
)
215+
except asyncio.TimeoutError:
216+
await _kill_process()
217+
raise
218+
return await process.communicate(input=stdin_bytes)
219+
except Exception as e:
220+
await _kill_process()
221+
raise e
222+
223+
async def execute_pipeline(
224+
self,
225+
commands: List[str],
226+
first_stdin: Optional[bytes] = None,
227+
last_stdout: Union[IO[Any], int, None] = None,
228+
directory: Optional[str] = None,
229+
timeout: Optional[int] = None,
230+
envs: Optional[Dict[str, str]] = None,
231+
) -> Tuple[bytes, bytes, int]:
232+
"""Execute a pipeline of commands.
233+
234+
Args:
235+
commands: List of shell commands to execute in pipeline
236+
first_stdin: Input to pass to the first command
237+
last_stdout: Output handle for the last command
238+
directory: Working directory
239+
timeout: Timeout in seconds
240+
envs: Additional environment variables
241+
242+
Returns:
243+
Tuple[bytes, bytes, int]: Tuple of (stdout, stderr, return_code)
244+
245+
Raises:
246+
ValueError: If no commands provided or command execution fails
247+
"""
248+
if not commands:
249+
raise ValueError("No commands provided")
250+
251+
processes: List[asyncio.subprocess.Process] = []
252+
try:
253+
prev_stdout: Optional[bytes] = first_stdin
254+
final_stderr: bytes = b""
255+
final_stdout: bytes = b""
256+
257+
for i, cmd in enumerate(commands):
258+
process = await self.create_process(
259+
cmd,
260+
directory,
261+
stdout_handle=(
262+
asyncio.subprocess.PIPE
263+
if i < len(commands) - 1 or not last_stdout
264+
else last_stdout
265+
),
266+
envs=envs,
267+
)
268+
if not hasattr(process, "is_running"):
269+
process.is_running = lambda self=process: self.returncode is None # type: ignore
270+
processes.append(process)
271+
272+
try:
273+
stdout, stderr = await self.execute_with_timeout(
274+
process,
275+
stdin=prev_stdout.decode() if prev_stdout else None,
276+
timeout=timeout,
277+
)
278+
279+
final_stderr += stderr if stderr else b""
280+
if process.returncode != 0:
281+
error_msg = stderr.decode("utf-8", errors="replace").strip()
282+
if not error_msg:
283+
error_msg = (
284+
f"Command failed with exit code {process.returncode}"
285+
)
286+
raise ValueError(error_msg)
287+
288+
if i == len(commands) - 1:
289+
if last_stdout and isinstance(last_stdout, IO):
290+
last_stdout.write(stdout.decode("utf-8", errors="replace"))
291+
else:
292+
final_stdout = stdout if stdout else b""
293+
else:
294+
prev_stdout = stdout if stdout else b""
295+
296+
except asyncio.TimeoutError:
297+
process.kill()
298+
raise
299+
except Exception:
300+
process.kill()
301+
raise
302+
303+
return (
304+
final_stdout,
305+
final_stderr,
306+
(
307+
processes[-1].returncode
308+
if processes and processes[-1].returncode is not None
309+
else 1
310+
),
311+
)
312+
313+
finally:
314+
await self.cleanup_processes(processes)

‎src/mcp_shell_server/server.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def __init__(self):
2828

2929
def get_allowed_commands(self) -> list[str]:
3030
"""Get the allowed commands"""
31-
return self.executor.get_allowed_commands()
31+
return self.executor.validator.get_allowed_commands()
3232

3333
def get_tool_description(self) -> Tool:
3434
"""Get the tool description for the execute command"""

‎src/mcp_shell_server/shell_executor.py

+215-516
Large diffs are not rendered by default.

‎tests/conftest.py

+98-12
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,98 @@
1-
import os
2-
3-
4-
# Configure pytest-asyncio
5-
def pytest_configure(config):
6-
"""Configure pytest-asyncio defaults"""
7-
# Enable command execution for tests
8-
os.environ["ALLOW_COMMANDS"] = "1"
9-
# Add allowed commands for tests
10-
os.environ["ALLOWED_COMMANDS"] = (
11-
"echo,sleep,cat,ls,pwd,touch,mkdir,rm,mv,cp,grep,awk,sed"
12-
)
1+
"""
2+
Test configuration and fixtures.
3+
"""
4+
5+
import asyncio
6+
from typing import IO
7+
from unittest.mock import AsyncMock, MagicMock
8+
9+
import pytest
10+
import pytest_asyncio
11+
12+
from mcp_shell_server.shell_executor import ShellExecutor
13+
14+
15+
@pytest.fixture
16+
def mock_file(mocker):
17+
"""Provide a mock file object."""
18+
mock = mocker.MagicMock(spec=IO)
19+
mock.close = mocker.MagicMock()
20+
return mock
21+
22+
23+
@pytest_asyncio.fixture
24+
async def mock_process_manager():
25+
"""Provide a mock process manager."""
26+
manager = MagicMock()
27+
28+
# Mock process object
29+
process = AsyncMock()
30+
process.returncode = 0
31+
32+
# Mock manager methods
33+
manager.create_process = AsyncMock()
34+
35+
async def create_process_side_effect(*args, **kwargs):
36+
process = AsyncMock()
37+
process.returncode = 0
38+
process.communicate = AsyncMock(return_value=(b"", b""))
39+
process.kill = AsyncMock()
40+
process.wait = AsyncMock()
41+
return process
42+
43+
manager.create_process.side_effect = create_process_side_effect
44+
45+
manager.execute_with_timeout = AsyncMock()
46+
manager.execute_pipeline = AsyncMock()
47+
manager.cleanup_processes = AsyncMock()
48+
49+
# Set empty default return values - tests should override these as needed
50+
manager.execute_with_timeout.return_value = (b"", b"")
51+
manager.execute_pipeline.return_value = (b"", b"", 0)
52+
53+
return manager
54+
55+
56+
@pytest_asyncio.fixture
57+
async def shell_executor_with_mock(mock_process_manager):
58+
"""Provide a shell executor with mock process manager."""
59+
executor = ShellExecutor(process_manager=mock_process_manager)
60+
return executor
61+
62+
63+
@pytest.fixture
64+
def temp_test_dir(tmpdir):
65+
"""Provide a temporary test directory."""
66+
return str(tmpdir)
67+
68+
69+
@pytest_asyncio.fixture(scope="function")
70+
async def event_loop():
71+
"""Create and provide a new event loop for each module."""
72+
loop = asyncio.new_event_loop()
73+
asyncio.set_event_loop(loop)
74+
yield loop
75+
76+
# Clean up the event loop
77+
try:
78+
# Close all tasks
79+
tasks = asyncio.all_tasks(loop)
80+
if tasks:
81+
# Cancel all tasks and wait for their completion
82+
for task in tasks:
83+
task.cancel()
84+
loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
85+
86+
# Clean up all transports
87+
if hasattr(loop, "_transports"):
88+
for transport in list(loop._transports.values()):
89+
if hasattr(transport, "close"):
90+
transport.close()
91+
92+
# Cleanup
93+
loop.stop()
94+
asyncio.set_event_loop(None)
95+
await loop.shutdown_asyncgens()
96+
loop.close()
97+
except Exception as e:
98+
print(f"Error during loop cleanup: {e}")

‎tests/conftest_new.py

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""
2+
Test configuration and fixtures.
3+
"""
4+
5+
import asyncio
6+
import os
7+
8+
import pytest_asyncio
9+
10+
11+
@pytest_asyncio.fixture(autouse=True)
12+
async def cleanup_loop():
13+
"""Run after each test to ensure proper event loop cleanup."""
14+
yield
15+
if hasattr(asyncio, "current_task") and asyncio.current_task() is not None:
16+
try:
17+
await asyncio.sleep(0) # Allow pending callbacks to complete
18+
except Exception:
19+
pass
20+
21+
22+
def pytest_configure(config):
23+
"""Configure pytest-asyncio defaults"""
24+
# Enable command execution for tests
25+
os.environ["ALLOW_COMMANDS"] = "1"
26+
# Add allowed commands for tests
27+
os.environ["ALLOWED_COMMANDS"] = (
28+
"echo,sleep,cat,ls,pwd,touch,mkdir,rm,mv,cp,grep,awk,sed"
29+
)

‎tests/test_command_validator.py

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""Test cases for the CommandValidator class."""
2+
3+
import pytest
4+
5+
from mcp_shell_server.command_validator import CommandValidator
6+
7+
8+
def clear_env(monkeypatch):
9+
monkeypatch.delenv("ALLOW_COMMANDS", raising=False)
10+
monkeypatch.delenv("ALLOWED_COMMANDS", raising=False)
11+
12+
13+
@pytest.fixture
14+
def validator():
15+
return CommandValidator()
16+
17+
18+
def test_get_allowed_commands(validator, monkeypatch):
19+
clear_env(monkeypatch)
20+
monkeypatch.setenv("ALLOW_COMMANDS", "cmd1,cmd2")
21+
monkeypatch.setenv("ALLOWED_COMMANDS", "cmd3,cmd4")
22+
assert set(validator.get_allowed_commands()) == {"cmd1", "cmd2", "cmd3", "cmd4"}
23+
24+
25+
def test_is_command_allowed(validator, monkeypatch):
26+
clear_env(monkeypatch)
27+
monkeypatch.setenv("ALLOW_COMMANDS", "allowed_cmd")
28+
assert validator.is_command_allowed("allowed_cmd")
29+
assert not validator.is_command_allowed("disallowed_cmd")
30+
31+
32+
def test_validate_no_shell_operators(validator):
33+
validator.validate_no_shell_operators("echo") # Should not raise
34+
with pytest.raises(ValueError, match="Unexpected shell operator"):
35+
validator.validate_no_shell_operators(";")
36+
with pytest.raises(ValueError, match="Unexpected shell operator"):
37+
validator.validate_no_shell_operators("&&")
38+
39+
40+
def test_validate_pipeline(validator, monkeypatch):
41+
clear_env(monkeypatch)
42+
monkeypatch.setenv("ALLOW_COMMANDS", "ls,grep")
43+
44+
# Valid pipeline
45+
validator.validate_pipeline(["ls", "|", "grep", "test"])
46+
47+
# Empty command before pipe
48+
with pytest.raises(ValueError, match="Empty command before pipe operator"):
49+
validator.validate_pipeline(["|", "grep", "test"])
50+
51+
# Command not allowed
52+
with pytest.raises(ValueError, match="Command not allowed"):
53+
validator.validate_pipeline(["invalid_cmd", "|", "grep", "test"])
54+
55+
56+
def test_validate_command(validator, monkeypatch):
57+
clear_env(monkeypatch)
58+
59+
# No allowed commands
60+
with pytest.raises(ValueError, match="No commands are allowed"):
61+
validator.validate_command(["cmd"])
62+
63+
monkeypatch.setenv("ALLOW_COMMANDS", "allowed_cmd")
64+
65+
# Empty command
66+
with pytest.raises(ValueError, match="Empty command"):
67+
validator.validate_command([])
68+
69+
# Command not allowed
70+
with pytest.raises(ValueError, match="Command not allowed"):
71+
validator.validate_command(["disallowed_cmd"])
72+
73+
# Command allowed
74+
validator.validate_command(["allowed_cmd", "-arg"]) # Should not raise

‎tests/test_directory_manager.py

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"""Tests for directory_manager module."""
2+
3+
import os
4+
5+
import pytest
6+
7+
from mcp_shell_server.directory_manager import DirectoryManager
8+
9+
10+
def test_validate_directory(tmp_path):
11+
"""Test directory validation."""
12+
manager = DirectoryManager()
13+
test_dir = str(tmp_path)
14+
15+
# Valid directory
16+
manager.validate_directory(test_dir)
17+
18+
# None directory
19+
with pytest.raises(ValueError, match="Directory is required"):
20+
manager.validate_directory(None)
21+
22+
# Relative path
23+
with pytest.raises(ValueError, match="Directory must be an absolute path"):
24+
manager.validate_directory("relative/path")
25+
26+
# Non-existent directory
27+
nonexistent = os.path.join(test_dir, "nonexistent")
28+
with pytest.raises(ValueError, match="Directory does not exist"):
29+
manager.validate_directory(nonexistent)
30+
31+
# Not a directory (create a file)
32+
test_file = os.path.join(test_dir, "test.txt")
33+
with open(test_file, "w") as f:
34+
f.write("test")
35+
with pytest.raises(ValueError, match="Not a directory"):
36+
manager.validate_directory(test_file)
37+
38+
39+
def test_get_absolute_path(tmp_path):
40+
"""Test absolute path resolution."""
41+
manager = DirectoryManager()
42+
test_dir = str(tmp_path)
43+
44+
# Already absolute path
45+
abs_path = os.path.join(test_dir, "test")
46+
assert manager.get_absolute_path(abs_path) == abs_path
47+
48+
# Relative path without base directory
49+
rel_path = "test/path"
50+
expected = os.path.abspath(rel_path)
51+
assert manager.get_absolute_path(rel_path) == expected
52+
53+
# Relative path with base directory
54+
rel_path = "test/path"
55+
expected = os.path.join(test_dir, rel_path)
56+
assert manager.get_absolute_path(rel_path, test_dir) == expected

‎tests/test_io_redirection_handler.py

Whitespace-only changes.

‎tests/test_process_manager.py

+193
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
"""Tests for the ProcessManager class."""
2+
3+
import asyncio
4+
from unittest.mock import AsyncMock, MagicMock, patch
5+
6+
import pytest
7+
8+
from mcp_shell_server.process_manager import ProcessManager
9+
10+
11+
def create_mock_process():
12+
"""Create a mock process with all required attributes."""
13+
process = MagicMock()
14+
process.returncode = 0
15+
process.communicate = AsyncMock(return_value=(b"output", b"error"))
16+
process.wait = AsyncMock(return_value=0)
17+
process.terminate = MagicMock()
18+
process.kill = MagicMock()
19+
return process
20+
21+
22+
@pytest.fixture
23+
def process_manager():
24+
"""Fixture for ProcessManager instance."""
25+
return ProcessManager()
26+
27+
28+
@pytest.mark.asyncio
29+
async def test_create_process(process_manager):
30+
"""Test creating a process with basic parameters."""
31+
mock_proc = create_mock_process()
32+
with patch(
33+
"mcp_shell_server.process_manager.asyncio.create_subprocess_shell",
34+
new_callable=AsyncMock,
35+
return_value=mock_proc,
36+
) as mock_create:
37+
process = await process_manager.create_process(
38+
"echo 'test'",
39+
directory="/tmp",
40+
stdin="input",
41+
)
42+
43+
assert process == mock_proc
44+
assert process == mock_proc
45+
mock_create.assert_called_once()
46+
47+
48+
@pytest.mark.asyncio
49+
async def test_execute_with_timeout_success(process_manager):
50+
"""Test executing a process with successful completion."""
51+
mock_proc = create_mock_process()
52+
mock_proc.returncode = 0
53+
mock_proc.communicate.return_value = (b"output", b"error")
54+
55+
stdout, stderr = await process_manager.execute_with_timeout(
56+
mock_proc,
57+
stdin="input",
58+
timeout=10,
59+
)
60+
61+
assert stdout == b"output"
62+
assert stderr == b"error"
63+
mock_proc.communicate.assert_called_once()
64+
65+
66+
@pytest.mark.asyncio
67+
async def test_execute_with_timeout_timeout(process_manager):
68+
"""Test executing a process that times out."""
69+
mock_proc = create_mock_process()
70+
exc = asyncio.TimeoutError("Process timed out")
71+
mock_proc.communicate.side_effect = exc
72+
mock_proc.returncode = None # プロセスがまだ実行中の状態をシミュレート
73+
74+
# プロセスの終了状態をシミュレート
75+
async def set_returncode():
76+
mock_proc.returncode = -15 # SIGTERM
77+
78+
mock_proc.wait.side_effect = set_returncode
79+
80+
with pytest.raises(TimeoutError):
81+
await process_manager.execute_with_timeout(
82+
mock_proc,
83+
timeout=1,
84+
)
85+
86+
mock_proc.terminate.assert_called_once()
87+
88+
89+
@pytest.mark.asyncio
90+
async def test_execute_pipeline_success(process_manager):
91+
"""Test executing a pipeline of commands successfully."""
92+
mock_proc1 = create_mock_process()
93+
mock_proc1.communicate.return_value = (b"output1", b"")
94+
mock_proc1.returncode = 0
95+
96+
mock_proc2 = create_mock_process()
97+
mock_proc2.communicate.return_value = (b"final output", b"")
98+
mock_proc2.returncode = 0
99+
100+
with patch(
101+
"mcp_shell_server.process_manager.asyncio.create_subprocess_shell",
102+
new_callable=AsyncMock,
103+
side_effect=[mock_proc1, mock_proc2],
104+
) as mock_create:
105+
stdout, stderr, return_code = await process_manager.execute_pipeline(
106+
["echo 'test'", "grep test"],
107+
directory="/tmp",
108+
timeout=10,
109+
)
110+
111+
assert stdout == b"final output"
112+
assert stderr == b""
113+
assert return_code == 0
114+
assert mock_create.call_count == 2 # Verify subprocess creation calls
115+
116+
# Verify the command arguments for each subprocess call
117+
calls = mock_create.call_args_list
118+
assert "echo" in calls[0].args[0]
119+
assert "grep" in calls[1].args[0]
120+
121+
122+
@pytest.mark.asyncio
123+
async def test_execute_pipeline_with_error(process_manager):
124+
"""Test executing a pipeline where a command fails."""
125+
mock_proc = create_mock_process()
126+
mock_proc.communicate.return_value = (b"", b"error message")
127+
mock_proc.returncode = 1
128+
129+
create_process_mock = AsyncMock(return_value=mock_proc)
130+
131+
with patch.object(process_manager, "create_process", create_process_mock):
132+
with pytest.raises(ValueError, match="error message"):
133+
await process_manager.execute_pipeline(
134+
["invalid_command"],
135+
directory="/tmp",
136+
)
137+
138+
139+
@pytest.mark.asyncio
140+
async def test_cleanup_processes(process_manager):
141+
"""Test cleaning up processes."""
142+
# Create mock processes with different states
143+
running_proc = create_mock_process()
144+
running_proc.returncode = None
145+
146+
completed_proc = create_mock_process()
147+
completed_proc.returncode = 0
148+
149+
# Execute cleanup
150+
await process_manager.cleanup_processes([running_proc, completed_proc])
151+
152+
# Verify running process was killed and waited for
153+
running_proc.kill.assert_called_once()
154+
running_proc.wait.assert_awaited_once()
155+
156+
# Verify completed process was not killed or waited for
157+
completed_proc.kill.assert_not_called()
158+
completed_proc.wait.assert_not_called()
159+
160+
161+
@pytest.mark.asyncio
162+
async def test_create_process_with_error(process_manager):
163+
"""Test creating a process that fails to start."""
164+
with patch(
165+
"asyncio.create_subprocess_shell",
166+
new_callable=AsyncMock,
167+
side_effect=OSError("Failed to create process"),
168+
):
169+
with pytest.raises(ValueError, match="Failed to create process"):
170+
await process_manager.create_process("invalid command", directory="/tmp")
171+
172+
173+
@pytest.mark.asyncio
174+
async def test_execute_pipeline_empty_commands(process_manager):
175+
"""Test executing a pipeline with no commands."""
176+
with pytest.raises(ValueError, match="No commands provided"):
177+
await process_manager.execute_pipeline([], directory="/tmp")
178+
179+
180+
@pytest.mark.asyncio
181+
async def test_execute_pipeline_timeout(process_manager):
182+
"""Test executing a pipeline that times out."""
183+
mock_proc = create_mock_process()
184+
mock_proc.communicate.side_effect = TimeoutError("Process timed out")
185+
186+
with patch.object(process_manager, "create_process", return_value=mock_proc):
187+
with pytest.raises(TimeoutError, match="Process timed out"):
188+
await process_manager.execute_pipeline(
189+
["sleep 10"],
190+
directory="/tmp",
191+
timeout=1,
192+
)
193+
mock_proc.kill.assert_called_once()

‎tests/test_process_manager_macos.py

+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
# tests/test_process_manager_macos.py
2+
import asyncio
3+
import platform
4+
import subprocess
5+
6+
import pytest
7+
8+
pytestmark = [
9+
pytest.mark.skipif(
10+
platform.system() != "Darwin", reason="These tests only run on macOS"
11+
),
12+
pytest.mark.macos,
13+
pytest.mark.slow,
14+
]
15+
16+
17+
@pytest.fixture
18+
def process_manager():
19+
from mcp_shell_server.process_manager import ProcessManager
20+
21+
pm = ProcessManager()
22+
try:
23+
yield pm
24+
finally:
25+
asyncio.run(pm.cleanup_all())
26+
27+
28+
def get_process_status(pid: int) -> str:
29+
"""Get process status using ps command."""
30+
try:
31+
ps = subprocess.run(
32+
["ps", "-o", "stat=", "-p", str(pid)], capture_output=True, text=True
33+
)
34+
return ps.stdout.strip()
35+
except subprocess.CalledProcessError:
36+
return ""
37+
38+
39+
@pytest.mark.asyncio
40+
async def test_zombie_process_cleanup(process_manager):
41+
"""Test that background processes don't become zombies."""
42+
cmd = ["sh", "-c", "sleep 0.5 & wait"]
43+
process = await process_manager.start_process(cmd)
44+
45+
# Wait for the background process to finish
46+
await asyncio.sleep(1)
47+
48+
# Get process status
49+
status = get_process_status(process.pid)
50+
51+
# Verify process is either gone or not zombie (Z state)
52+
assert "Z" not in status, f"Process {process.pid} is zombie (status: {status})"
53+
54+
55+
@pytest.mark.asyncio
56+
async def test_process_timeout(process_manager):
57+
"""Test process timeout functionality."""
58+
# Start a process that should timeout
59+
cmd = ["sleep", "10"]
60+
process = await process_manager.start_process(cmd)
61+
62+
try:
63+
# Communicate with timeout
64+
with pytest.raises(TimeoutError):
65+
_, _ = await process_manager.execute_with_timeout(process, timeout=1)
66+
67+
# プロセスが終了するまで待つ
68+
try:
69+
await asyncio.wait_for(process.wait(), timeout=1.0)
70+
except asyncio.TimeoutError:
71+
process.kill() # Force kill
72+
73+
# Wait for termination
74+
await asyncio.wait_for(process.wait(), timeout=0.5)
75+
76+
# Verify process was terminated
77+
assert process.returncode is not None
78+
assert not process.is_running()
79+
finally:
80+
if process.returncode is None:
81+
try:
82+
process.kill()
83+
await asyncio.wait_for(process.wait(), timeout=0.5)
84+
except (ProcessLookupError, asyncio.TimeoutError):
85+
pass
86+
87+
88+
@pytest.mark.asyncio
89+
async def test_multiple_process_cleanup(process_manager):
90+
"""Test cleanup of multiple processes."""
91+
# Start multiple background processes
92+
# Start multiple processes in parallel
93+
processes = await asyncio.gather(
94+
*[process_manager.start_process(["sleep", "2"]) for _ in range(3)]
95+
)
96+
97+
# Give them a moment to start
98+
await asyncio.sleep(0.1)
99+
100+
try:
101+
# Verify they're all running
102+
assert all(p.is_running() for p in processes)
103+
104+
# Cleanup
105+
await process_manager.cleanup_all()
106+
107+
# Give cleanup a moment to complete
108+
await asyncio.sleep(0.1)
109+
110+
# Verify all processes are gone
111+
for p in processes:
112+
status = get_process_status(p.pid)
113+
assert status == "", f"Process {p.pid} still exists with status: {status}"
114+
finally:
115+
# Ensure cleanup in case of test failure
116+
for p in processes:
117+
if p.returncode is None:
118+
try:
119+
p.kill()
120+
except ProcessLookupError:
121+
pass
122+
123+
124+
@pytest.mark.asyncio
125+
async def test_process_group_termination(process_manager):
126+
"""Test that entire process group is terminated."""
127+
# Create a process that spawns children
128+
cmd = ["sh", "-c", "sleep 10 & sleep 10 & sleep 10 & wait"]
129+
process = await process_manager.start_process(cmd)
130+
131+
try:
132+
# Give processes time to start
133+
await asyncio.sleep(0.5)
134+
135+
# Kill the main process
136+
process.kill()
137+
138+
# Wait a moment for cleanup
139+
await asyncio.sleep(0.5)
140+
141+
# Check if any processes from the group remain
142+
ps = subprocess.run(["pgrep", "-g", str(process.pid)], capture_output=True)
143+
assert ps.returncode != 0, "Process group still exists"
144+
finally:
145+
if process.returncode is None:
146+
try:
147+
process.kill()
148+
except ProcessLookupError:
149+
pass

‎tests/test_server.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,14 @@ def setup_mock_subprocess(monkeypatch):
4444
"""Set up mock subprocess to avoid interactive shell warnings"""
4545

4646
async def mock_create_subprocess_shell(
47-
cmd, stdin=None, stdout=None, stderr=None, env=None, cwd=None
47+
cmd,
48+
stdin=None,
49+
stdout=None,
50+
stderr=None,
51+
env=None,
52+
cwd=None,
53+
preexec_fn=None,
54+
start_new_session=None,
4855
):
4956
# Return appropriate output based on command
5057
if "echo" in cmd:
@@ -83,8 +90,9 @@ async def test_list_tools():
8390

8491

8592
@pytest.mark.asyncio
86-
async def test_tool_execution_timeout():
93+
async def test_tool_execution_timeout(monkeypatch):
8794
"""Test tool execution with timeout"""
95+
monkeypatch.setenv("ALLOW_COMMANDS", "sleep")
8896
with pytest.raises(RuntimeError, match="Command execution timed out"):
8997
await call_tool(
9098
"shell_execute",

‎tests/test_shell_executor.py

+517-149
Large diffs are not rendered by default.

‎tests/test_shell_executor.py.bak

-503
This file was deleted.

‎tests/test_shell_executor_error_cases.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,15 @@ async def test_no_allowed_commands_validation(monkeypatch):
4242
ValueError,
4343
match="No commands are allowed. Please set ALLOW_COMMANDS environment variable.",
4444
):
45-
executor._validate_command(["any_command"])
45+
executor.validator.validate_command(["any_command"])
4646

4747

4848
@pytest.mark.asyncio
4949
async def test_shell_operator_validation():
5050
"""Test validation of shell operators"""
5151
executor = ShellExecutor()
5252

53-
operators = [";" "&&", "||", "|"]
53+
operators = [";", "&&", "||", "|"]
5454
for op in operators:
5555
# Test shell operator validation
5656
with pytest.raises(ValueError, match=f"Unexpected shell operator: {op}"):

‎tests/test_shell_executor_new_tests.py

+19-13
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,21 @@ async def test_redirection_validation():
2020

2121
# Missing path for output redirection
2222
with pytest.raises(ValueError, match="Missing path for output redirection"):
23-
executor._process_redirections(["echo", "test", ">"])
23+
executor.io_handler.process_redirections(["echo", "test", ">"])
2424

2525
# Invalid redirection target (operator)
2626
with pytest.raises(
2727
ValueError, match="Invalid redirection syntax: consecutive operators"
2828
):
29-
executor._process_redirections(["echo", "test", ">", ">"])
29+
executor.io_handler.process_redirections(["echo", "test", ">", ">"])
3030

3131
# Missing path for input redirection
3232
with pytest.raises(ValueError, match="Missing path for input redirection"):
33-
executor._process_redirections(["cat", "<"])
33+
executor.io_handler.process_redirections(["cat", "<"])
3434

3535
# Missing path for output redirection after input redirection
3636
with pytest.raises(ValueError, match="Missing path for output redirection"):
37-
executor._process_redirections(["cat", "<", "input.txt", ">"])
37+
executor.io_handler.process_redirections(["cat", "<", "input.txt", ">"])
3838

3939

4040
@pytest.mark.asyncio
@@ -45,25 +45,31 @@ async def test_directory_validation(monkeypatch):
4545

4646
# Directory validation is performed in the _validate_directory method
4747
with pytest.raises(ValueError, match="Directory is required"):
48-
executor._validate_directory(None)
48+
executor.directory_manager.validate_directory(None)
4949

5050
# Directory is not absolute path
5151
with pytest.raises(ValueError, match="Directory must be an absolute path"):
52-
executor._validate_directory("relative/path")
52+
executor.directory_manager.validate_directory("relative/path")
5353

5454
# Directory does not exist
5555
with pytest.raises(ValueError, match="Directory does not exist"):
56-
executor._validate_directory("/path/does/not/exist")
56+
executor.directory_manager.validate_directory("/path/does/not/exist")
5757

5858

5959
@pytest.mark.asyncio
60-
async def test_process_timeout(monkeypatch, temp_test_dir):
60+
async def test_process_timeout(
61+
shell_executor_with_mock, temp_test_dir, mock_process_manager, monkeypatch
62+
):
6163
"""Test process timeout handling"""
6264
monkeypatch.setenv("ALLOW_COMMANDS", "sleep")
63-
executor = ShellExecutor()
64-
65-
# Process timeout with sleep command
66-
command = ["sleep", "5"]
67-
result = await executor.execute(command=command, directory=temp_test_dir, timeout=1)
65+
# Mock timeout behavior
66+
mock_process_manager.execute_with_timeout.side_effect = TimeoutError(
67+
"Command timed out after 1 seconds"
68+
)
69+
70+
# Process timeout test
71+
result = await shell_executor_with_mock.execute(
72+
command=["sleep", "5"], directory=temp_test_dir, timeout=1
73+
)
6874
assert result["error"] == "Command timed out after 1 seconds"
6975
assert result["status"] == -1

‎tests/test_shell_executor_pipe.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
import tempfile
3+
from unittest.mock import AsyncMock
34

45
import pytest
56

@@ -23,11 +24,14 @@ def executor():
2324
async def test_basic_pipe_command(executor, temp_test_dir, monkeypatch):
2425
"""Test basic pipe functionality with allowed commands"""
2526
monkeypatch.setenv("ALLOW_COMMANDS", "echo,grep")
27+
mock_process_manager = AsyncMock()
28+
mock_process_manager.execute_pipeline.return_value = (b"world\n", b"", 0)
29+
executor.process_manager = mock_process_manager
2630
result = await executor.execute(
2731
["echo", "hello world", "|", "grep", "world"], temp_test_dir
2832
)
2933
assert result["status"] == 0
30-
assert result["stdout"].strip() == "hello world"
34+
assert result["stdout"].strip() == "world"
3135

3236

3337
@pytest.mark.asyncio

‎tests/test_shell_executor_pipeline.py

+30-25
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,20 @@ def temp_test_dir():
3030
async def test_pipeline_split(executor):
3131
"""Test pipeline command splitting functionality"""
3232
# Test basic pipe command
33-
commands = executor._split_pipe_commands(["echo", "hello", "|", "grep", "h"])
33+
commands = executor.preprocessor.split_pipe_commands(
34+
["echo", "hello", "|", "grep", "h"]
35+
)
3436
assert len(commands) == 2
3537
assert commands[0] == ["echo", "hello"]
3638
assert commands[1] == ["grep", "h"]
3739

3840
# Test empty pipe sections
39-
commands = executor._split_pipe_commands(["|", "grep", "pattern"])
41+
commands = executor.preprocessor.split_pipe_commands(["|", "grep", "pattern"])
4042
assert len(commands) == 1
4143
assert commands[0] == ["grep", "pattern"]
4244

4345
# Test multiple pipes
44-
commands = executor._split_pipe_commands(
46+
commands = executor.preprocessor.split_pipe_commands(
4547
["cat", "file.txt", "|", "grep", "pattern", "|", "wc", "-l"]
4648
)
4749
assert len(commands) == 3
@@ -50,49 +52,52 @@ async def test_pipeline_split(executor):
5052
assert commands[2] == ["wc", "-l"]
5153

5254
# Test trailing pipe
53-
commands = executor._split_pipe_commands(["echo", "hello", "|"])
55+
commands = executor.preprocessor.split_pipe_commands(["echo", "hello", "|"])
5456
assert len(commands) == 1
5557
assert commands[0] == ["echo", "hello"]
5658

5759

5860
@pytest.mark.asyncio
59-
async def test_pipeline_execution_success(executor, temp_test_dir, monkeypatch):
61+
async def test_pipeline_execution_success(
62+
shell_executor_with_mock, temp_test_dir, mock_process_manager, monkeypatch
63+
):
6064
"""Test successful pipeline execution with proper return value"""
61-
clear_env(monkeypatch)
62-
monkeypatch.setenv("ALLOWED_COMMANDS", "echo,grep")
65+
monkeypatch.setenv("ALLOW_COMMANDS", "echo,grep")
66+
# Set up mock for pipeline execution
67+
expected_output = b"mocked pipeline output\n"
68+
mock_process_manager.execute_pipeline.return_value = (expected_output, b"", 0)
6369

64-
result = await executor.execute(
70+
result = await shell_executor_with_mock.execute(
6571
["echo", "hello world", "|", "grep", "world"],
6672
directory=temp_test_dir,
6773
timeout=5,
6874
)
6975

7076
assert result["error"] is None
7177
assert result["status"] == 0
72-
assert "world" in result["stdout"]
78+
assert result["stdout"].rstrip() == "mocked pipeline output"
7379
assert "execution_time" in result
7480

7581

7682
@pytest.mark.asyncio
77-
async def test_pipeline_cleanup_and_timeouts(executor, temp_test_dir, monkeypatch):
83+
async def test_pipeline_cleanup_and_timeouts(
84+
shell_executor_with_mock, temp_test_dir, mock_process_manager, monkeypatch
85+
):
7886
"""Test cleanup of processes in pipelines and timeout handling"""
79-
clear_env(monkeypatch)
80-
monkeypatch.setenv("ALLOW_COMMANDS", "cat,tr,head,sleep")
81-
82-
# Test pipeline with early termination
83-
test_file = os.path.join(temp_test_dir, "test.txt")
84-
with open(test_file, "w") as f:
85-
f.write("test\n" * 1000)
87+
monkeypatch.setenv("ALLOW_COMMANDS", "echo,grep")
88+
# Mock timeout behavior for pipeline
89+
mock_process_manager.execute_pipeline.side_effect = TimeoutError(
90+
"Command timed out after 1 seconds"
91+
)
8692

87-
result = await executor.execute(
88-
["cat", test_file, "|", "tr", "[:lower:]", "[:upper:]", "|", "head", "-n", "1"],
93+
result = await shell_executor_with_mock.execute(
94+
["echo", "test", "|", "grep", "test"], # Use a pipeline command
8995
temp_test_dir,
90-
timeout=2,
96+
timeout=1,
9197
)
92-
assert result["status"] == 0
93-
assert result["stdout"].strip() == "TEST"
9498

95-
# Test timeout handling in pipeline
96-
result = await executor.execute(["sleep", "5"], temp_test_dir, timeout=1)
9799
assert result["status"] == -1
98-
assert "timed out" in result["error"].lower() # タイムアウトエラーの確認
100+
assert "timed out" in result["error"].lower()
101+
102+
# Verify cleanup was called
103+
mock_process_manager.cleanup_processes.assert_called_once()
+127-104
Original file line numberDiff line numberDiff line change
@@ -1,132 +1,155 @@
1-
import os
2-
import tempfile
3-
from io import TextIOWrapper
1+
"""Tests for IO redirection in shell executor with mocked file operations."""
2+
3+
from unittest.mock import MagicMock, patch
44

55
import pytest
66

7-
from mcp_shell_server.shell_executor import ShellExecutor
7+
from mcp_shell_server.io_redirection_handler import IORedirectionHandler
88

99

1010
@pytest.fixture
11-
def temp_test_dir():
12-
"""Create a temporary directory for testing"""
13-
with tempfile.TemporaryDirectory() as tmpdirname:
14-
yield os.path.realpath(tmpdirname)
11+
def mock_file():
12+
"""Create a mock file object."""
13+
file_mock = MagicMock()
14+
file_mock.closed = False
15+
file_mock.close = MagicMock()
16+
file_mock.write = MagicMock()
17+
file_mock.read = MagicMock(return_value="test content")
18+
return file_mock
1519

1620

17-
@pytest.mark.asyncio
18-
async def test_redirection_setup(temp_test_dir):
19-
"""Test setup of redirections with files"""
20-
executor = ShellExecutor()
21-
22-
# Create a test input file
23-
with open(os.path.join(temp_test_dir, "input.txt"), "w") as f:
24-
f.write("test content")
25-
26-
# Test input redirection setup
27-
redirects = {
28-
"stdin": "input.txt",
29-
"stdout": None,
30-
"stdout_append": False,
31-
}
32-
handles = await executor._setup_redirects(redirects, temp_test_dir)
33-
assert "stdin" in handles
34-
assert "stdin_data" in handles
35-
assert handles["stdin_data"] == "test content"
36-
assert isinstance(handles["stdout"], int)
37-
assert isinstance(handles["stderr"], int)
38-
39-
# Test output redirection setup
40-
output_file = os.path.join(temp_test_dir, "output.txt")
41-
redirects = {
42-
"stdin": None,
43-
"stdout": output_file,
44-
"stdout_append": False,
45-
}
46-
handles = await executor._setup_redirects(redirects, temp_test_dir)
47-
assert isinstance(handles["stdout"], TextIOWrapper)
48-
assert not handles["stdout"].closed
49-
await executor._cleanup_handles(handles)
50-
try:
51-
assert handles["stdout"].closed
52-
except ValueError:
53-
# Ignore errors from already closed file
54-
pass
21+
@pytest.fixture
22+
def handler():
23+
"""Create a new IORedirectionHandler instance for each test."""
24+
return IORedirectionHandler()
5525

5626

5727
@pytest.mark.asyncio
58-
async def test_redirection_append_mode(temp_test_dir):
59-
"""Test output redirection in append mode"""
60-
executor = ShellExecutor()
61-
62-
output_file = os.path.join(temp_test_dir, "output.txt")
28+
async def test_file_input_redirection(handler, mock_file):
29+
"""Test input redirection from a file using mocks."""
30+
with (
31+
patch("builtins.open", return_value=mock_file),
32+
patch("os.path.exists", return_value=True),
33+
):
6334

64-
# Test append mode
65-
redirects = {
66-
"stdin": None,
67-
"stdout": output_file,
68-
"stdout_append": True,
69-
}
70-
handles = await executor._setup_redirects(redirects, temp_test_dir)
71-
assert handles["stdout"].mode == "a"
72-
await executor._cleanup_handles(handles)
35+
redirects = {
36+
"stdin": "input.txt",
37+
"stdout": None,
38+
"stdout_append": False,
39+
}
40+
handles = await handler.setup_redirects(redirects, "/mock/dir")
7341

74-
# Test write mode
75-
redirects["stdout_append"] = False
76-
handles = await executor._setup_redirects(redirects, temp_test_dir)
77-
assert handles["stdout"].mode == "w"
78-
await executor._cleanup_handles(handles)
42+
assert "stdin" in handles
43+
assert "stdin_data" in handles
44+
assert handles["stdin_data"] == "test content"
45+
assert isinstance(handles["stdout"], int)
46+
assert isinstance(handles["stderr"], int)
7947

8048

8149
@pytest.mark.asyncio
82-
async def test_redirection_setup_errors(temp_test_dir):
83-
"""Test error cases in redirection setup"""
84-
executor = ShellExecutor()
85-
86-
# Test non-existent input file
87-
redirects = {
88-
"stdin": "nonexistent.txt",
89-
"stdout": None,
90-
"stdout_append": False,
91-
}
92-
with pytest.raises(ValueError, match="Failed to open input file"):
93-
await executor._setup_redirects(redirects, temp_test_dir)
94-
95-
# Test error in output file creation
96-
os.chmod(temp_test_dir, 0o444) # Make directory read-only
97-
try:
50+
async def test_file_output_redirection(handler, mock_file):
51+
"""Test output redirection to a file using mocks."""
52+
with patch("builtins.open", return_value=mock_file):
9853
redirects = {
9954
"stdin": None,
10055
"stdout": "output.txt",
10156
"stdout_append": False,
10257
}
103-
with pytest.raises(ValueError, match="Failed to open output file"):
104-
await executor._setup_redirects(redirects, temp_test_dir)
105-
finally:
106-
os.chmod(temp_test_dir, 0o755) # Reset permissions
58+
handles = await handler.setup_redirects(redirects, "/mock/dir")
10759

60+
assert "stdout" in handles
61+
assert not handles["stdout"].closed
62+
await handler.cleanup_handles(handles)
63+
mock_file.close.assert_called_once()
10864

109-
@pytest.mark.asyncio
110-
async def test_invalid_redirection_paths():
111-
"""Test invalid redirection path scenarios"""
112-
executor = ShellExecutor()
11365

114-
# Test missing path for output redirection
115-
with pytest.raises(ValueError, match="Missing path for output redirection"):
116-
executor._parse_command(["echo", "test", ">"])
66+
@pytest.mark.asyncio
67+
async def test_append_mode(handler, mock_file):
68+
"""Test output redirection in append mode using mocks."""
69+
with patch("builtins.open", return_value=mock_file):
70+
# Test append mode
71+
redirects = {
72+
"stdin": None,
73+
"stdout": "output.txt",
74+
"stdout_append": True,
75+
}
76+
mock_file.mode = "a" # Set the expected mode
77+
handles = await handler.setup_redirects(redirects, "/mock/dir")
78+
assert handles["stdout"].mode == "a"
79+
await handler.cleanup_handles(handles)
80+
mock_file.close.assert_called_once()
81+
82+
# Reset mock and test write mode
83+
mock_file.reset_mock()
84+
mock_file.mode = "w" # Set the expected mode for write mode
85+
redirects["stdout_append"] = False
86+
handles = await handler.setup_redirects(redirects, "/mock/dir")
87+
assert handles["stdout"].mode == "w"
88+
await handler.cleanup_handles(handles)
89+
mock_file.close.assert_called_once()
90+
91+
92+
def test_validate_redirection_syntax(handler):
93+
"""Test validation of redirection syntax."""
94+
# Valid cases
95+
handler.validate_redirection_syntax(["echo", "hello", ">", "output.txt"])
96+
handler.validate_redirection_syntax(["cat", "<", "input.txt", ">", "output.txt"])
97+
handler.validate_redirection_syntax(["echo", "hello", ">>", "output.txt"])
98+
99+
# Invalid cases
100+
with pytest.raises(ValueError, match="consecutive operators"):
101+
handler.validate_redirection_syntax(["echo", ">", ">", "output.txt"])
102+
103+
with pytest.raises(ValueError, match="consecutive operators"):
104+
handler.validate_redirection_syntax(["cat", "<", ">", "output.txt"])
105+
106+
107+
def test_process_redirections(handler):
108+
"""Test processing of redirection operators."""
109+
# Input redirection
110+
cmd, redirects = handler.process_redirections(["cat", "<", "input.txt"])
111+
assert cmd == ["cat"]
112+
assert redirects["stdin"] == "input.txt"
113+
assert redirects["stdout"] is None
114+
115+
# Output redirection
116+
cmd, redirects = handler.process_redirections(["echo", "test", ">", "output.txt"])
117+
assert cmd == ["echo", "test"]
118+
assert redirects["stdout"] == "output.txt"
119+
assert not redirects["stdout_append"]
120+
121+
# Combined redirections
122+
cmd, redirects = handler.process_redirections(
123+
["cat", "<", "in.txt", ">", "out.txt"]
124+
)
125+
assert cmd == ["cat"]
126+
assert redirects["stdin"] == "in.txt"
127+
assert redirects["stdout"] == "out.txt"
117128

118-
# Test invalid redirection target (operator found)
119-
with pytest.raises(ValueError, match="Invalid redirection target: operator found"):
120-
executor._parse_command(["echo", "test", ">", ">"])
121129

122-
# Test missing path for input redirection
123-
with pytest.raises(ValueError, match="Missing path for input redirection"):
124-
executor._parse_command(["cat", "<"])
130+
@pytest.mark.asyncio
131+
async def test_setup_errors(handler, mock_file):
132+
"""Test error cases in redirection setup using mocks."""
133+
# Test non-existent input file
134+
with patch("os.path.exists", return_value=False):
135+
redirects = {
136+
"stdin": "nonexistent.txt",
137+
"stdout": None,
138+
"stdout_append": False,
139+
}
140+
with pytest.raises(ValueError, match="Failed to open input file"):
141+
await handler.setup_redirects(redirects, "/mock/dir")
125142

126-
# Test missing path for output redirection
127-
with pytest.raises(ValueError, match="Missing path for output redirection"):
128-
executor._parse_command(["echo", "test", ">"])
143+
# Test error in output file creation
144+
# Mock builtins.open to raise PermissionError
145+
mock_open = MagicMock(side_effect=PermissionError("Permission denied"))
146+
with patch("builtins.open", mock_open):
147+
redirects = {
148+
"stdin": None,
149+
"stdout": "output.txt",
150+
"stdout_append": False,
151+
}
152+
with pytest.raises(ValueError, match="Failed to open output file"):
153+
await handler.setup_redirects(redirects, "/mock/dir")
129154

130-
# Test invalid redirection target: operator found for output
131-
with pytest.raises(ValueError, match="Invalid redirection target: operator found"):
132-
executor._parse_command(["echo", "test", ">", ">"])
155+
mock_file.close.assert_not_called()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import os
2+
import tempfile
3+
from io import TextIOWrapper
4+
5+
import pytest
6+
7+
from mcp_shell_server.shell_executor import ShellExecutor
8+
9+
10+
@pytest.fixture
11+
def temp_test_dir():
12+
"""Create a temporary directory for testing"""
13+
with tempfile.TemporaryDirectory() as tmpdirname:
14+
yield os.path.realpath(tmpdirname)
15+
16+
17+
@pytest.mark.asyncio
18+
async def test_redirection_setup(temp_test_dir):
19+
"""Test setup of redirections with files"""
20+
executor = ShellExecutor()
21+
22+
# Create a test input file
23+
with open(os.path.join(temp_test_dir, "input.txt"), "w") as f:
24+
f.write("test content")
25+
26+
# Test input redirection setup
27+
redirects = {
28+
"stdin": "input.txt",
29+
"stdout": None,
30+
"stdout_append": False,
31+
}
32+
handles = await executor.io_handler.setup_redirects(redirects, temp_test_dir)
33+
assert "stdin" in handles
34+
assert "stdin_data" in handles
35+
assert handles["stdin_data"] == "test content"
36+
assert isinstance(handles["stdout"], int)
37+
assert isinstance(handles["stderr"], int)
38+
39+
# Test output redirection setup
40+
output_file = os.path.join(temp_test_dir, "output.txt")
41+
redirects = {
42+
"stdin": None,
43+
"stdout": output_file,
44+
"stdout_append": False,
45+
}
46+
handles = await executor.io_handler.setup_redirects(redirects, temp_test_dir)
47+
assert isinstance(handles["stdout"], TextIOWrapper)
48+
assert not handles["stdout"].closed
49+
await executor.io_handler.cleanup_handles(handles)
50+
try:
51+
assert handles["stdout"].closed
52+
except ValueError:
53+
# Ignore errors from already closed file
54+
pass
55+
56+
57+
@pytest.mark.asyncio
58+
async def test_redirection_append_mode(temp_test_dir):
59+
"""Test output redirection in append mode"""
60+
executor = ShellExecutor()
61+
62+
output_file = os.path.join(temp_test_dir, "output.txt")
63+
64+
# Test append mode
65+
redirects = {
66+
"stdin": None,
67+
"stdout": output_file,
68+
"stdout_append": True,
69+
}
70+
handles = await executor.io_handler.setup_redirects(redirects, temp_test_dir)
71+
assert handles["stdout"].mode == "a"
72+
await executor.io_handler.cleanup_handles(handles)
73+
74+
# Test write mode
75+
redirects["stdout_append"] = False
76+
handles = await executor.io_handler.setup_redirects(redirects, temp_test_dir)
77+
assert handles["stdout"].mode == "w"
78+
await executor.io_handler.cleanup_handles(handles)
79+
80+
81+
@pytest.mark.asyncio
82+
async def test_redirection_setup_errors(temp_test_dir):
83+
"""Test error cases in redirection setup"""
84+
executor = ShellExecutor()
85+
86+
# Test non-existent input file
87+
redirects = {
88+
"stdin": "nonexistent.txt",
89+
"stdout": None,
90+
"stdout_append": False,
91+
}
92+
with pytest.raises(ValueError, match="Failed to open input file"):
93+
await executor.io_handler.setup_redirects(redirects, temp_test_dir)
94+
95+
# Test error in output file creation
96+
os.chmod(temp_test_dir, 0o444) # Make directory read-only
97+
try:
98+
redirects = {
99+
"stdin": None,
100+
"stdout": "output.txt",
101+
"stdout_append": False,
102+
}
103+
with pytest.raises(ValueError, match="Failed to open output file"):
104+
await executor.io_handler.setup_redirects(redirects, temp_test_dir)
105+
finally:
106+
os.chmod(temp_test_dir, 0o755) # Reset permissions
107+
108+
109+
@pytest.mark.asyncio
110+
async def test_invalid_redirection_paths():
111+
"""Test invalid redirection path scenarios"""
112+
executor = ShellExecutor()
113+
114+
# Test missing path for output redirection
115+
with pytest.raises(ValueError, match="Missing path for output redirection"):
116+
executor.io_handler.process_redirections(["echo", "test", ">"])
117+
118+
# Test invalid redirection target (operator found)
119+
with pytest.raises(ValueError, match="Invalid redirection target: operator found"):
120+
executor.io_handler.process_redirections(["echo", "test", ">", ">"])
121+
122+
# Test missing path for input redirection
123+
with pytest.raises(ValueError, match="Missing path for input redirection"):
124+
executor.io_handler.process_redirections(["cat", "<"])
125+
126+
# Test missing path for output redirection
127+
with pytest.raises(ValueError, match="Missing path for output redirection"):
128+
executor.io_handler.process_redirections(["echo", "test", ">"])
129+
130+
# Test invalid redirection target: operator found for output
131+
with pytest.raises(ValueError, match="Invalid redirection target: operator found"):
132+
executor.io_handler.process_redirections(["echo", "test", ">", ">"])

0 commit comments

Comments
 (0)
Please sign in to comment.