Initial mctelnet MCP server
Telnet client capabilities for LLMs with expect-style automation: - Multi-connection management with connection IDs - Basic send/read operations with auto-response - expect() for pattern matching across multiple patterns - expect_send() for classic expect-style interactions - run_script() for full automation sequences - Password hiding for sensitive data
This commit is contained in:
commit
a8fbd71e0c
48
.gitignore
vendored
Normal file
48
.gitignore
vendored
Normal file
@ -0,0 +1,48 @@
|
||||
# Python
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
*.so
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
|
||||
# Virtual environments
|
||||
.venv/
|
||||
venv/
|
||||
ENV/
|
||||
|
||||
# IDE
|
||||
.idea/
|
||||
.vscode/
|
||||
*.swp
|
||||
*.swo
|
||||
*~
|
||||
|
||||
# Testing
|
||||
.pytest_cache/
|
||||
.coverage
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
|
||||
# uv
|
||||
.python-version
|
||||
uv.lock
|
||||
|
||||
# OS
|
||||
.DS_Store
|
||||
Thumbs.db
|
||||
88
README.md
Normal file
88
README.md
Normal file
@ -0,0 +1,88 @@
|
||||
# mctelnet
|
||||
|
||||
MCP server providing telnet client capabilities for LLMs. Connect to BBSes, MUDs, network devices, and any telnet-accessible system.
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
uvx mctelnet
|
||||
```
|
||||
|
||||
Or add to Claude Code:
|
||||
|
||||
```bash
|
||||
claude mcp add mctelnet "uvx mctelnet"
|
||||
```
|
||||
|
||||
## Features
|
||||
|
||||
- **Multi-connection management** - Handle multiple simultaneous telnet sessions
|
||||
- **Expect-style automation** - Pattern matching for interactive prompts
|
||||
- **Script execution** - Run complete login/automation sequences
|
||||
- **Password hiding** - Redact sensitive data from outputs
|
||||
|
||||
## Tools
|
||||
|
||||
| Tool | Description |
|
||||
|------|-------------|
|
||||
| `connect` | Establish telnet connection, returns connection ID |
|
||||
| `send` | Send text to connection (auto-reads response) |
|
||||
| `read` | Read available data from connection |
|
||||
| `expect` | Wait for one of multiple patterns |
|
||||
| `expect_send` | Wait for pattern, then send text |
|
||||
| `run_script` | Execute expect-style automation script |
|
||||
| `list_connections` | Show all active connections |
|
||||
| `disconnect` | Close a connection |
|
||||
| `disconnect_all` | Close all connections |
|
||||
|
||||
## Usage Examples
|
||||
|
||||
### Basic Interaction
|
||||
|
||||
```python
|
||||
# Connect to a server
|
||||
conn = await connect("bbs.example.com", 23)
|
||||
# conn returns {"id": "abc123", ...}
|
||||
|
||||
# Send a command
|
||||
await send("abc123", "help")
|
||||
|
||||
# Disconnect
|
||||
await disconnect("abc123")
|
||||
```
|
||||
|
||||
### Expect-Style Login
|
||||
|
||||
```python
|
||||
# Connect and wait for login prompt
|
||||
conn = await connect("server.example.com", 23)
|
||||
|
||||
# Handle login sequence
|
||||
await expect_send(conn_id, "login:", "myuser")
|
||||
await expect_send(conn_id, "Password:", "mypass", hide_send=True)
|
||||
await expect_send(conn_id, "$ ", "ls -la")
|
||||
```
|
||||
|
||||
### Scripted Automation
|
||||
|
||||
```python
|
||||
await run_script(conn_id, [
|
||||
{"expect": "login:", "send": "admin"},
|
||||
{"expect": "Password:", "send": "secret", "hide": True},
|
||||
{"expect": "$ ", "send": "show version"},
|
||||
{"expect": "$ ", "send": "show interfaces"},
|
||||
{"expect": "$ ", "send": "exit"}
|
||||
])
|
||||
```
|
||||
|
||||
## Use Cases
|
||||
|
||||
- Connecting to retro BBSes and MUDs
|
||||
- Network device configuration (routers, switches)
|
||||
- Legacy system automation
|
||||
- Interactive service testing
|
||||
- Terminal-based game playing
|
||||
|
||||
## License
|
||||
|
||||
MIT
|
||||
58
pyproject.toml
Normal file
58
pyproject.toml
Normal file
@ -0,0 +1,58 @@
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "mctelnet"
|
||||
version = "2025.02.06"
|
||||
description = "MCP server providing telnet client capabilities for LLMs"
|
||||
readme = "README.md"
|
||||
license = "MIT"
|
||||
requires-python = ">=3.11"
|
||||
authors = [
|
||||
{name = "Ryan Malloy", email = "ryan@supported.systems"}
|
||||
]
|
||||
keywords = ["mcp", "telnet", "llm", "fastmcp", "automation"]
|
||||
classifiers = [
|
||||
"Development Status :: 4 - Beta",
|
||||
"Intended Audience :: Developers",
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.11",
|
||||
"Programming Language :: Python :: 3.12",
|
||||
"Topic :: System :: Networking",
|
||||
]
|
||||
|
||||
dependencies = [
|
||||
"fastmcp>=2.0.0",
|
||||
"telnetlib3>=2.0.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
"ruff>=0.9.0",
|
||||
"pytest>=8.0.0",
|
||||
"pytest-asyncio>=0.24.0",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
mctelnet = "mctelnet:main"
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://git.supported.systems/rpm/mctelnet"
|
||||
Repository = "https://git.supported.systems/rpm/mctelnet"
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/mctelnet"]
|
||||
|
||||
[tool.ruff]
|
||||
target-version = "py311"
|
||||
line-length = 100
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["E", "F", "I", "UP", "B", "SIM"]
|
||||
ignore = ["E501"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
asyncio_mode = "auto"
|
||||
testpaths = ["tests"]
|
||||
5
src/mctelnet/__init__.py
Normal file
5
src/mctelnet/__init__.py
Normal file
@ -0,0 +1,5 @@
|
||||
"""mctelnet - MCP server providing telnet client capabilities for LLMs."""
|
||||
|
||||
from mctelnet.server import main, mcp
|
||||
|
||||
__all__ = ["main", "mcp"]
|
||||
584
src/mctelnet/server.py
Normal file
584
src/mctelnet/server.py
Normal file
@ -0,0 +1,584 @@
|
||||
"""mctelnet MCP server - telnet client capabilities for LLMs."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from typing import Annotated
|
||||
from uuid import uuid4
|
||||
|
||||
import telnetlib3
|
||||
from fastmcp import FastMCP
|
||||
|
||||
|
||||
# Connection storage
|
||||
@dataclass
|
||||
class TelnetConnection:
|
||||
"""Represents an active telnet connection."""
|
||||
|
||||
id: str
|
||||
host: str
|
||||
port: int
|
||||
reader: telnetlib3.TelnetReader
|
||||
writer: telnetlib3.TelnetWriter
|
||||
buffer: str = ""
|
||||
connected: bool = True
|
||||
|
||||
|
||||
# Global connection registry
|
||||
_connections: dict[str, TelnetConnection] = {}
|
||||
|
||||
|
||||
def get_version() -> str:
|
||||
"""Get package version."""
|
||||
try:
|
||||
from importlib.metadata import version
|
||||
|
||||
return version("mctelnet")
|
||||
except Exception:
|
||||
return "2025.02.06"
|
||||
|
||||
|
||||
# Create MCP server
|
||||
mcp = FastMCP(
|
||||
name="mctelnet",
|
||||
instructions="""Telnet client for LLMs. Connect to telnet servers, BBSes, MUDs,
|
||||
network devices, and other telnet-accessible systems.
|
||||
|
||||
BASIC WORKFLOW:
|
||||
1. connect(host, port) - get connection ID
|
||||
2. send(id, "command") - send commands (auto-reads response)
|
||||
3. disconnect(id) - close when done
|
||||
|
||||
EXPECT-STYLE AUTOMATION (for login sequences, scripted interactions):
|
||||
- expect(id, ["login:", "Password:"]) - wait for patterns
|
||||
- expect_send(id, "login:", "myuser") - wait then send
|
||||
- run_script(id, [...steps...]) - run full automation script
|
||||
|
||||
EXAMPLE LOGIN SCRIPT:
|
||||
run_script(conn_id, [
|
||||
{"expect": "login:", "send": "admin"},
|
||||
{"expect": "Password:", "send": "secret", "hide": true},
|
||||
{"expect": "$ ", "send": "show version"}
|
||||
])
|
||||
|
||||
TIPS:
|
||||
- Use expect_send() for interactive prompts (login, password, menus)
|
||||
- Use send() for simple command/response interactions
|
||||
- Set hide_send=True for passwords to redact from output
|
||||
- Common prompts: "login:", "Password:", "$ ", "> ", "#"
|
||||
""",
|
||||
)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def connect(
|
||||
host: Annotated[str, "Hostname or IP address to connect to"],
|
||||
port: Annotated[int, "Port number (default: 23)"] = 23,
|
||||
timeout: Annotated[float, "Connection timeout in seconds"] = 10.0,
|
||||
) -> str:
|
||||
"""
|
||||
Establish a new telnet connection to a remote host.
|
||||
|
||||
Returns a connection ID to use with other tools.
|
||||
Common ports: 23 (standard telnet), 2323 (alt telnet),
|
||||
4000-4100 (MUDs), various for BBSes.
|
||||
"""
|
||||
conn_id = str(uuid4())[:8]
|
||||
|
||||
try:
|
||||
reader, writer = await asyncio.wait_for(
|
||||
telnetlib3.open_connection(host, port, connect_minwait=0.5),
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
conn = TelnetConnection(
|
||||
id=conn_id,
|
||||
host=host,
|
||||
port=port,
|
||||
reader=reader,
|
||||
writer=writer,
|
||||
)
|
||||
_connections[conn_id] = conn
|
||||
|
||||
# Try to read initial banner/prompt
|
||||
initial_data = await _read_available(conn, timeout=2.0)
|
||||
|
||||
result = f"Connected! ID: {conn_id}\nHost: {host}:{port}"
|
||||
if initial_data:
|
||||
result += f"\n\n--- Initial Output ---\n{initial_data}"
|
||||
|
||||
return result
|
||||
|
||||
except TimeoutError:
|
||||
return f"Connection timed out after {timeout}s connecting to {host}:{port}"
|
||||
except OSError as e:
|
||||
return f"Connection failed to {host}:{port}: {e}"
|
||||
except Exception as e:
|
||||
return f"Unexpected error connecting to {host}:{port}: {e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def send(
|
||||
connection_id: Annotated[str, "Connection ID from connect()"],
|
||||
text: Annotated[str, "Text to send to the remote host"],
|
||||
newline: Annotated[bool, "Append CRLF newline after text"] = True,
|
||||
read_response: Annotated[bool, "Read and return response after sending"] = True,
|
||||
read_timeout: Annotated[float, "Timeout for reading response"] = 3.0,
|
||||
) -> str:
|
||||
"""
|
||||
Send text to an active telnet connection.
|
||||
|
||||
By default appends CRLF and reads the response.
|
||||
For interactive sessions, you may want to send character-by-character
|
||||
with newline=False.
|
||||
"""
|
||||
conn = _connections.get(connection_id)
|
||||
if not conn:
|
||||
return f"Connection {connection_id} not found. Use list_connections() to see active connections."
|
||||
|
||||
if not conn.connected:
|
||||
return f"Connection {connection_id} is closed."
|
||||
|
||||
try:
|
||||
# Send the text
|
||||
data_to_send = text
|
||||
if newline:
|
||||
data_to_send += "\r\n"
|
||||
|
||||
conn.writer.write(data_to_send)
|
||||
await conn.writer.drain()
|
||||
|
||||
result = f"Sent: {repr(text)}"
|
||||
|
||||
if read_response:
|
||||
# Small delay to let response arrive
|
||||
await asyncio.sleep(0.1)
|
||||
response = await _read_available(conn, timeout=read_timeout)
|
||||
if response:
|
||||
result += f"\n\n--- Response ---\n{response}"
|
||||
else:
|
||||
result += "\n\n(No response received)"
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
conn.connected = False
|
||||
return f"Error sending to {connection_id}: {e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def read(
|
||||
connection_id: Annotated[str, "Connection ID from connect()"],
|
||||
timeout: Annotated[float, "Read timeout in seconds"] = 5.0,
|
||||
wait_for: Annotated[str | None, "Wait until this string appears in output"] = None,
|
||||
) -> str:
|
||||
"""
|
||||
Read available data from a telnet connection.
|
||||
|
||||
Use wait_for to block until a specific string appears (like a prompt).
|
||||
Otherwise returns whatever data is currently available.
|
||||
"""
|
||||
conn = _connections.get(connection_id)
|
||||
if not conn:
|
||||
return f"Connection {connection_id} not found."
|
||||
|
||||
if not conn.connected:
|
||||
return f"Connection {connection_id} is closed."
|
||||
|
||||
try:
|
||||
if wait_for:
|
||||
data = await _read_until(conn, wait_for, timeout)
|
||||
else:
|
||||
data = await _read_available(conn, timeout)
|
||||
|
||||
if data:
|
||||
return data
|
||||
return "(No data available)"
|
||||
|
||||
except Exception as e:
|
||||
return f"Error reading from {connection_id}: {e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def disconnect(
|
||||
connection_id: Annotated[str, "Connection ID to disconnect"],
|
||||
) -> str:
|
||||
"""
|
||||
Close a telnet connection and clean up resources.
|
||||
"""
|
||||
conn = _connections.pop(connection_id, None)
|
||||
if not conn:
|
||||
return f"Connection {connection_id} not found."
|
||||
|
||||
try:
|
||||
conn.writer.close()
|
||||
conn.connected = False
|
||||
return f"Disconnected from {conn.host}:{conn.port} (ID: {connection_id})"
|
||||
except Exception as e:
|
||||
return f"Error during disconnect: {e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def list_connections() -> str:
|
||||
"""
|
||||
List all active telnet connections.
|
||||
|
||||
Shows connection IDs, hosts, and connection status.
|
||||
"""
|
||||
if not _connections:
|
||||
return "No active connections."
|
||||
|
||||
lines = ["Active Telnet Connections:", ""]
|
||||
for conn_id, conn in _connections.items():
|
||||
status = "connected" if conn.connected else "disconnected"
|
||||
lines.append(f" {conn_id}: {conn.host}:{conn.port} [{status}]")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def expect(
|
||||
connection_id: Annotated[str, "Connection ID from connect()"],
|
||||
patterns: Annotated[list[str], "List of patterns to watch for"],
|
||||
timeout: Annotated[float, "Timeout in seconds"] = 30.0,
|
||||
) -> dict:
|
||||
"""
|
||||
Wait for one of multiple patterns to appear in the output.
|
||||
|
||||
Returns which pattern matched and all output received.
|
||||
Similar to the Unix 'expect' utility.
|
||||
|
||||
Example patterns: ["login:", "Password:", "$ ", "> "]
|
||||
"""
|
||||
conn = _connections.get(connection_id)
|
||||
if not conn:
|
||||
return {"error": f"Connection {connection_id} not found.", "matched": None}
|
||||
|
||||
if not conn.connected:
|
||||
return {"error": f"Connection {connection_id} is closed.", "matched": None}
|
||||
|
||||
collected = []
|
||||
deadline = asyncio.get_event_loop().time() + timeout
|
||||
|
||||
try:
|
||||
while asyncio.get_event_loop().time() < deadline:
|
||||
remaining = deadline - asyncio.get_event_loop().time()
|
||||
if remaining <= 0:
|
||||
break
|
||||
|
||||
try:
|
||||
data = await asyncio.wait_for(
|
||||
conn.reader.read(4096),
|
||||
timeout=min(remaining, 0.5),
|
||||
)
|
||||
if not data:
|
||||
break
|
||||
collected.append(data)
|
||||
|
||||
# Check all patterns
|
||||
full_text = "".join(collected)
|
||||
for i, pattern in enumerate(patterns):
|
||||
if pattern in full_text:
|
||||
return {
|
||||
"matched": pattern,
|
||||
"index": i,
|
||||
"output": full_text,
|
||||
}
|
||||
|
||||
except TimeoutError:
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
return {"error": str(e), "matched": None, "output": "".join(collected)}
|
||||
|
||||
return {
|
||||
"matched": None,
|
||||
"timeout": True,
|
||||
"output": "".join(collected),
|
||||
"patterns_checked": patterns,
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def expect_send(
|
||||
connection_id: Annotated[str, "Connection ID from connect()"],
|
||||
expect_pattern: Annotated[str, "Pattern to wait for before sending"],
|
||||
send_text: Annotated[str, "Text to send after pattern is matched"],
|
||||
timeout: Annotated[float, "Timeout waiting for pattern"] = 30.0,
|
||||
newline: Annotated[bool, "Append CRLF after send_text"] = True,
|
||||
hide_send: Annotated[bool, "Hide sent text in output (for passwords)"] = False,
|
||||
) -> dict:
|
||||
"""
|
||||
Wait for a pattern, then send text. Classic expect-style interaction.
|
||||
|
||||
Perfect for login sequences:
|
||||
expect_send(conn, "login:", "myuser")
|
||||
expect_send(conn, "Password:", "mypass", hide_send=True)
|
||||
expect_send(conn, "$ ", "ls -la")
|
||||
"""
|
||||
conn = _connections.get(connection_id)
|
||||
if not conn:
|
||||
return {"error": f"Connection {connection_id} not found."}
|
||||
|
||||
if not conn.connected:
|
||||
return {"error": f"Connection {connection_id} is closed."}
|
||||
|
||||
# First, wait for the pattern
|
||||
collected = []
|
||||
deadline = asyncio.get_event_loop().time() + timeout
|
||||
pattern_found = False
|
||||
|
||||
try:
|
||||
while asyncio.get_event_loop().time() < deadline:
|
||||
remaining = deadline - asyncio.get_event_loop().time()
|
||||
if remaining <= 0:
|
||||
break
|
||||
|
||||
try:
|
||||
data = await asyncio.wait_for(
|
||||
conn.reader.read(4096),
|
||||
timeout=min(remaining, 0.5),
|
||||
)
|
||||
if not data:
|
||||
break
|
||||
collected.append(data)
|
||||
|
||||
if expect_pattern in "".join(collected):
|
||||
pattern_found = True
|
||||
break
|
||||
|
||||
except TimeoutError:
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
return {"error": str(e), "output": "".join(collected)}
|
||||
|
||||
output_before = "".join(collected)
|
||||
|
||||
if not pattern_found:
|
||||
return {
|
||||
"error": f"Timeout waiting for pattern: {repr(expect_pattern)}",
|
||||
"output": output_before,
|
||||
}
|
||||
|
||||
# Pattern found, now send
|
||||
try:
|
||||
data_to_send = send_text
|
||||
if newline:
|
||||
data_to_send += "\r\n"
|
||||
|
||||
conn.writer.write(data_to_send)
|
||||
await conn.writer.drain()
|
||||
|
||||
# Brief wait then read response
|
||||
await asyncio.sleep(0.1)
|
||||
response = await _read_available(conn, timeout=2.0)
|
||||
|
||||
displayed_send = "********" if hide_send else send_text
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"pattern_matched": expect_pattern,
|
||||
"sent": displayed_send,
|
||||
"output_before_send": output_before,
|
||||
"output_after_send": response,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {"error": f"Error sending: {e}", "output": output_before}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def run_script(
|
||||
connection_id: Annotated[str, "Connection ID from connect()"],
|
||||
script: Annotated[
|
||||
list[dict],
|
||||
"List of {expect: pattern, send: text} or {expect: pattern} or {send: text} steps",
|
||||
],
|
||||
timeout_per_step: Annotated[float, "Timeout for each expect step"] = 30.0,
|
||||
) -> dict:
|
||||
"""
|
||||
Run an expect-style script: a sequence of expect/send pairs.
|
||||
|
||||
Script format - list of steps, each step is a dict:
|
||||
{"expect": "login:", "send": "myuser"}
|
||||
{"expect": "Password:", "send": "secret", "hide": true}
|
||||
{"expect": "$ "} # just wait, don't send
|
||||
{"send": "exit"} # just send, don't wait first
|
||||
|
||||
Returns transcript of the entire interaction.
|
||||
"""
|
||||
conn = _connections.get(connection_id)
|
||||
if not conn:
|
||||
return {"error": f"Connection {connection_id} not found."}
|
||||
|
||||
transcript = []
|
||||
step_num = 0
|
||||
|
||||
for step in script:
|
||||
step_num += 1
|
||||
expect_pattern = step.get("expect")
|
||||
send_text = step.get("send")
|
||||
hide = step.get("hide", False)
|
||||
|
||||
step_result = {"step": step_num}
|
||||
|
||||
# Handle expect
|
||||
if expect_pattern:
|
||||
collected = []
|
||||
deadline = asyncio.get_event_loop().time() + timeout_per_step
|
||||
matched = False
|
||||
|
||||
try:
|
||||
while asyncio.get_event_loop().time() < deadline:
|
||||
remaining = deadline - asyncio.get_event_loop().time()
|
||||
if remaining <= 0:
|
||||
break
|
||||
try:
|
||||
data = await asyncio.wait_for(
|
||||
conn.reader.read(4096),
|
||||
timeout=min(remaining, 0.5),
|
||||
)
|
||||
if not data:
|
||||
break
|
||||
collected.append(data)
|
||||
if expect_pattern in "".join(collected):
|
||||
matched = True
|
||||
break
|
||||
except TimeoutError:
|
||||
continue
|
||||
except Exception as e:
|
||||
step_result["error"] = str(e)
|
||||
transcript.append(step_result)
|
||||
return {"error": f"Step {step_num} failed", "transcript": transcript}
|
||||
|
||||
step_result["expected"] = expect_pattern
|
||||
step_result["matched"] = matched
|
||||
step_result["output"] = "".join(collected)
|
||||
|
||||
if not matched:
|
||||
step_result["error"] = "Pattern not found (timeout)"
|
||||
transcript.append(step_result)
|
||||
return {"error": f"Step {step_num}: pattern not found", "transcript": transcript}
|
||||
|
||||
# Handle send
|
||||
if send_text is not None:
|
||||
try:
|
||||
conn.writer.write(send_text + "\r\n")
|
||||
await conn.writer.drain()
|
||||
step_result["sent"] = "********" if hide else send_text
|
||||
await asyncio.sleep(0.1)
|
||||
except Exception as e:
|
||||
step_result["error"] = f"Send failed: {e}"
|
||||
transcript.append(step_result)
|
||||
return {"error": f"Step {step_num}: send failed", "transcript": transcript}
|
||||
|
||||
transcript.append(step_result)
|
||||
|
||||
# Read any final output
|
||||
final_output = await _read_available(conn, timeout=1.0)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"steps_completed": step_num,
|
||||
"transcript": transcript,
|
||||
"final_output": final_output,
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def disconnect_all() -> str:
|
||||
"""
|
||||
Close all active telnet connections.
|
||||
|
||||
Useful for cleanup at the end of a session.
|
||||
"""
|
||||
if not _connections:
|
||||
return "No connections to close."
|
||||
|
||||
closed = []
|
||||
for conn_id in list(_connections.keys()):
|
||||
conn = _connections.pop(conn_id)
|
||||
try:
|
||||
conn.writer.close()
|
||||
conn.connected = False
|
||||
closed.append(f"{conn_id} ({conn.host}:{conn.port})")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return f"Closed {len(closed)} connection(s): {', '.join(closed)}"
|
||||
|
||||
|
||||
# Helper functions
|
||||
async def _read_available(conn: TelnetConnection, timeout: float = 1.0) -> str:
|
||||
"""Read whatever data is currently available, with timeout."""
|
||||
collected = []
|
||||
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
data = await asyncio.wait_for(
|
||||
conn.reader.read(4096),
|
||||
timeout=min(timeout, 0.5),
|
||||
)
|
||||
if not data:
|
||||
break
|
||||
collected.append(data)
|
||||
# Quick check for more data
|
||||
timeout = 0.1
|
||||
except TimeoutError:
|
||||
break
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return "".join(collected)
|
||||
|
||||
|
||||
async def _read_until(conn: TelnetConnection, pattern: str, timeout: float) -> str:
|
||||
"""Read until pattern appears or timeout."""
|
||||
collected = []
|
||||
deadline = asyncio.get_event_loop().time() + timeout
|
||||
|
||||
try:
|
||||
while asyncio.get_event_loop().time() < deadline:
|
||||
remaining = deadline - asyncio.get_event_loop().time()
|
||||
if remaining <= 0:
|
||||
break
|
||||
|
||||
try:
|
||||
data = await asyncio.wait_for(
|
||||
conn.reader.read(4096),
|
||||
timeout=min(remaining, 0.5),
|
||||
)
|
||||
if not data:
|
||||
break
|
||||
collected.append(data)
|
||||
|
||||
# Check if pattern found
|
||||
full_text = "".join(collected)
|
||||
if pattern in full_text:
|
||||
return full_text
|
||||
|
||||
except TimeoutError:
|
||||
continue
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
result = "".join(collected)
|
||||
if result:
|
||||
return result + f"\n\n(Timeout waiting for: {repr(pattern)})"
|
||||
return f"(Timeout waiting for: {repr(pattern)})"
|
||||
|
||||
|
||||
def main():
|
||||
"""Entry point for mctelnet MCP server."""
|
||||
version = get_version()
|
||||
print(f"🌐 mctelnet v{version}")
|
||||
print(" Telnet client MCP server for LLMs")
|
||||
print()
|
||||
mcp.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
38
tests/test_server.py
Normal file
38
tests/test_server.py
Normal file
@ -0,0 +1,38 @@
|
||||
"""Tests for mctelnet server."""
|
||||
|
||||
import pytest
|
||||
|
||||
from mctelnet.server import _connections, mcp
|
||||
|
||||
|
||||
def test_mcp_server_exists():
|
||||
"""Verify MCP server is configured."""
|
||||
assert mcp.name == "mctelnet"
|
||||
|
||||
|
||||
def test_tools_registered():
|
||||
"""Verify all expected tools are registered."""
|
||||
# Access tools through the tool manager's internal storage
|
||||
tool_manager = mcp._tool_manager
|
||||
tool_names = set(tool_manager._tools.keys())
|
||||
|
||||
expected_tools = {
|
||||
"connect",
|
||||
"send",
|
||||
"read",
|
||||
"expect",
|
||||
"expect_send",
|
||||
"run_script",
|
||||
"list_connections",
|
||||
"disconnect",
|
||||
"disconnect_all",
|
||||
}
|
||||
|
||||
assert expected_tools.issubset(tool_names), f"Missing tools: {expected_tools - tool_names}"
|
||||
|
||||
|
||||
def test_connection_storage_initially_empty():
|
||||
"""Verify no connections exist at startup."""
|
||||
# Clear any lingering connections
|
||||
_connections.clear()
|
||||
assert len(_connections) == 0
|
||||
Loading…
x
Reference in New Issue
Block a user