|
|
@@ -0,0 +1,492 @@
|
|
|
+import httpx
|
|
|
+import json
|
|
|
+import argparse
|
|
|
+import os
|
|
|
+from fastmcp import FastMCP
|
|
|
+from typing import Dict, List, Optional, Union, Any, Tuple
|
|
|
+from fastmcp.server.dependencies import get_http_headers
|
|
|
+from mcp.server.session import ServerSession
|
|
|
+
|
|
|
+
|
|
|
+old__received_request = ServerSession._received_request
|
|
|
+
|
|
|
+async def _received_request(self, *args, **kwargs):
|
|
|
+ try:
|
|
|
+ return await old__received_request(self, *args, **kwargs)
|
|
|
+ except RuntimeError:
|
|
|
+ pass
|
|
|
+
|
|
|
+# pylint: disable-next=protected-access
|
|
|
+ServerSession._received_request = _received_request
|
|
|
+
|
|
|
+# 初始化FastMCP服务器
|
|
|
+app = FastMCP('rap2-api-server')
|
|
|
+
|
|
|
+# 设置服务器配置
|
|
|
+HOST = '0.0.0.0' # 绑定到所有接口,允许局域网访问
|
|
|
+PORT = 8903 # 使用8903端口
|
|
|
+TRANSPORT = 'stdio' # 使用SSE传输
|
|
|
+
|
|
|
+# 解析命令行参数
|
|
|
+parser = argparse.ArgumentParser(description='RAP2 MCP服务器')
|
|
|
+parser.add_argument('--rap2-url', help='RAP2服务器地址')
|
|
|
+parser.add_argument('--rap2-sid', help='RAP2 koa.sid Cookie值')
|
|
|
+parser.add_argument('--rap2-sid-sig', help='RAP2 koa.sid.sig Cookie值')
|
|
|
+
|
|
|
+# 全局存储命令行参数
|
|
|
+args = parser.parse_args()
|
|
|
+
|
|
|
+# 错误消息常量
|
|
|
+RAP2_URL_MISSING_ERROR = "未提供RAP2服务器地址,请通过命令行参数--rap2-url、环境变量RAP2_URL或请求头X-RAP2-BASE-URL提供"
|
|
|
+EMPTY_RESPONSE_ERROR = "服务器返回了空响应"
|
|
|
+
|
|
|
+# 通用的JSON解析函数
|
|
|
+def parse_rap2_response(response: httpx.Response) -> Tuple[bool, Dict]:
|
|
|
+ """
|
|
|
+ 解析RAP2 API响应
|
|
|
+
|
|
|
+ Args:
|
|
|
+ response: HTTP响应对象
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Tuple[bool, Dict]: (是否成功, 数据或错误信息)
|
|
|
+ """
|
|
|
+ # 检查响应内容是否为空
|
|
|
+ if not response.text.strip():
|
|
|
+ return False, {"error": EMPTY_RESPONSE_ERROR}
|
|
|
+
|
|
|
+ # 尝试解析JSON响应
|
|
|
+ try:
|
|
|
+ data = response.json()
|
|
|
+ except json.JSONDecodeError as e:
|
|
|
+ return False, {
|
|
|
+ "error": f"无法解析返回的JSON: {str(e)}",
|
|
|
+ "content": response.text[:500]
|
|
|
+ }
|
|
|
+
|
|
|
+ # 检查是否有错误消息
|
|
|
+ if data.get("errMsg"):
|
|
|
+ return False, {"error": data.get("errMsg")}
|
|
|
+
|
|
|
+ return True, data
|
|
|
+
|
|
|
+# 创建带Cookie的请求头,同时支持从命令行参数和请求头获取
|
|
|
+def get_requestInfo():
|
|
|
+ """
|
|
|
+ 获取RAP2请求信息,支持从环境变量、命令行参数和请求头获取
|
|
|
+ 优先级: 请求头 > 命令行参数 > 环境变量
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Dict: 包含url和header的字典
|
|
|
+ """
|
|
|
+ info = {
|
|
|
+ "url": None,
|
|
|
+ "header": {
|
|
|
+ "cookies": {}
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ # 按优先级获取URL
|
|
|
+ info["url"] = (get_http_headers().get("X-RAP2-BASE-URL") if get_http_headers() else None) or \
|
|
|
+ args.rap2_url or \
|
|
|
+ os.environ.get("RAP2_URL")
|
|
|
+
|
|
|
+ # 按优先级获取Cookie
|
|
|
+ headers = get_http_headers()
|
|
|
+
|
|
|
+ # koa.sid
|
|
|
+ sid = (headers.get("X-RAP2-SID") if headers else None) or \
|
|
|
+ args.rap2_sid or \
|
|
|
+ os.environ.get("RAP2_SID")
|
|
|
+ if sid:
|
|
|
+ info["header"]["cookies"]["koa.sid"] = sid
|
|
|
+
|
|
|
+ # koa.sid.sig
|
|
|
+ sid_sig = (headers.get("X-RAP2-SID-SIG") if headers else None) or \
|
|
|
+ args.rap2_sid_sig or \
|
|
|
+ os.environ.get("RAP2_SID_SIG")
|
|
|
+ if sid_sig:
|
|
|
+ info["header"]["cookies"]["koa.sid.sig"] = sid_sig
|
|
|
+
|
|
|
+ return info
|
|
|
+
|
|
|
+@app.tool()
|
|
|
+async def get_interface_by_id(interface_id: str) -> Dict:
|
|
|
+ """
|
|
|
+ 通过接口ID获取RAP2接口的详细信息
|
|
|
+
|
|
|
+ Args:
|
|
|
+ interface_id: RAP2中的接口ID
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 接口的详细信息,包括URL、方法、参数和响应数据结构
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 从请求中获取RAP2信息
|
|
|
+ rap2_info = get_requestInfo()
|
|
|
+ rap2_base_url = rap2_info["url"]
|
|
|
+ rap2_header = rap2_info["header"]
|
|
|
+
|
|
|
+ if not rap2_base_url:
|
|
|
+ return {"error": RAP2_URL_MISSING_ERROR}
|
|
|
+
|
|
|
+ # 处理请求头,将cookies字典转换为字符串格式
|
|
|
+ headers = {}
|
|
|
+ if "cookies" in rap2_header:
|
|
|
+ cookies_dict = rap2_header["cookies"]
|
|
|
+ cookie_str = "; ".join([f"{k}={v}" for k, v in cookies_dict.items()])
|
|
|
+ headers["Cookie"] = cookie_str
|
|
|
+
|
|
|
+ # 调用RAP2 API获取接口详情
|
|
|
+ async with httpx.AsyncClient() as client:
|
|
|
+ url = f"{rap2_base_url}/interface/get?id={interface_id}"
|
|
|
+
|
|
|
+ response = await client.get(url, headers=headers)
|
|
|
+ response.raise_for_status()
|
|
|
+
|
|
|
+ # 解析响应
|
|
|
+ success, result = parse_rap2_response(response)
|
|
|
+ if not success:
|
|
|
+ return result
|
|
|
+
|
|
|
+ interface = result.get("data", {})
|
|
|
+ return interface
|
|
|
+ except httpx.HTTPError as e:
|
|
|
+ return {"error": f"HTTP请求失败: {str(e)}"}
|
|
|
+ except Exception as e:
|
|
|
+ return {"error": f"获取接口信息失败: {str(e)}"}
|
|
|
+
|
|
|
+@app.tool()
|
|
|
+async def get_repository_interfaces(repository_id: str) -> List[Dict]:
|
|
|
+ """
|
|
|
+ 获取指定仓库中的所有接口列表
|
|
|
+
|
|
|
+ Args:
|
|
|
+ repository_id: RAP2中的仓库ID
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 仓库中的所有接口列表
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 从请求中获取RAP2信息
|
|
|
+ rap2_info = get_requestInfo()
|
|
|
+ rap2_base_url = rap2_info["url"]
|
|
|
+ rap2_header = rap2_info["header"]
|
|
|
+
|
|
|
+ if not rap2_base_url:
|
|
|
+ return {"error": RAP2_URL_MISSING_ERROR}
|
|
|
+
|
|
|
+ async with httpx.AsyncClient() as client:
|
|
|
+ url = f"{rap2_base_url}/repository/get?id={repository_id}"
|
|
|
+
|
|
|
+ response = await client.get(url, headers=rap2_header)
|
|
|
+ response.raise_for_status()
|
|
|
+
|
|
|
+ # 解析响应
|
|
|
+ success, result = parse_rap2_response(response)
|
|
|
+ if not success:
|
|
|
+ return result
|
|
|
+
|
|
|
+ # 返回仓库中的所有模块和接口
|
|
|
+ modules = result.get("data", {}).get("modules", [])
|
|
|
+ interfaces = []
|
|
|
+
|
|
|
+ for module in modules:
|
|
|
+ for interface in module.get("interfaces", []):
|
|
|
+ interface["moduleName"] = module.get("name", "")
|
|
|
+ interfaces.append(interface)
|
|
|
+
|
|
|
+ return interfaces
|
|
|
+ except httpx.HTTPError as e:
|
|
|
+ return {"error": f"HTTP请求失败: {str(e)}"}
|
|
|
+ except Exception as e:
|
|
|
+ return {"error": f"获取仓库接口列表失败: {str(e)}"}
|
|
|
+
|
|
|
+@app.tool()
|
|
|
+async def search_interfaces_by_keyword(keyword: str, repository_id: Optional[str] = None) -> List[Dict]:
|
|
|
+ """
|
|
|
+ 通过关键词搜索接口
|
|
|
+
|
|
|
+ Args:
|
|
|
+ keyword: 搜索关键词,可以是接口名称或URL的一部分
|
|
|
+ repository_id: 可选,限制在指定的仓库中搜索
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 匹配的接口列表
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 从请求中获取RAP2信息
|
|
|
+ rap2_info = get_requestInfo()
|
|
|
+ rap2_base_url = rap2_info["url"]
|
|
|
+ rap2_header = rap2_info["header"]
|
|
|
+
|
|
|
+ if not rap2_base_url:
|
|
|
+ return {"error": RAP2_URL_MISSING_ERROR}
|
|
|
+
|
|
|
+ # 如果提供了repository_id,则在指定仓库中搜索
|
|
|
+ if repository_id:
|
|
|
+ interfaces = await get_repository_interfaces(repository_id)
|
|
|
+ if isinstance(interfaces, dict) and interfaces.get("error"):
|
|
|
+ return interfaces
|
|
|
+ else:
|
|
|
+ # 否则使用RAP2的搜索API
|
|
|
+ async with httpx.AsyncClient() as client:
|
|
|
+ url = f"{rap2_base_url}/interface/list?keyword={keyword}"
|
|
|
+
|
|
|
+ response = await client.get(url, headers=rap2_header)
|
|
|
+ response.raise_for_status()
|
|
|
+
|
|
|
+ # 解析响应
|
|
|
+ success, result = parse_rap2_response(response)
|
|
|
+ if not success:
|
|
|
+ return result
|
|
|
+
|
|
|
+ interfaces = result.get("data", [])
|
|
|
+
|
|
|
+ # 如果是在指定仓库中搜索,需要过滤结果
|
|
|
+ if repository_id and not isinstance(interfaces, dict):
|
|
|
+ filtered_interfaces = []
|
|
|
+ for interface in interfaces:
|
|
|
+ if (keyword.lower() in interface.get("name", "").lower() or
|
|
|
+ keyword.lower() in interface.get("url", "").lower()):
|
|
|
+ filtered_interfaces.append(interface)
|
|
|
+ return filtered_interfaces
|
|
|
+
|
|
|
+ return interfaces
|
|
|
+ except httpx.HTTPError as e:
|
|
|
+ return {"error": f"HTTP请求失败: {str(e)}"}
|
|
|
+ except Exception as e:
|
|
|
+ return {"error": f"搜索接口失败: {str(e)}"}
|
|
|
+
|
|
|
+@app.tool()
|
|
|
+async def generate_api_implementation(interface_id: str, framework: str = "fetch") -> Dict:
|
|
|
+ """
|
|
|
+ 根据接口ID生成API调用代码
|
|
|
+
|
|
|
+ Args:
|
|
|
+ interface_id: RAP2中的接口ID
|
|
|
+ framework: 代码框架,可选值: fetch, axios, react-query
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 生成的API调用代码和类型定义
|
|
|
+ """
|
|
|
+ interface = await get_interface_by_id(interface_id)
|
|
|
+
|
|
|
+ if isinstance(interface, dict) and interface.get("error"):
|
|
|
+ return interface
|
|
|
+
|
|
|
+ # 构建基本信息
|
|
|
+ url = interface.get("url", "")
|
|
|
+ method = interface.get("method", "GET")
|
|
|
+ name = interface.get("name", "")
|
|
|
+
|
|
|
+ # 获取请求参数和响应数据结构
|
|
|
+ properties = interface.get("properties", [])
|
|
|
+ request_params = [p for p in properties if p.get("scope") == "request"]
|
|
|
+ response_params = [p for p in properties if p.get("scope") == "response"]
|
|
|
+
|
|
|
+ # 构建TypeScript类型定义
|
|
|
+ req_type_def = _build_type_definition(request_params, "Request")
|
|
|
+ res_type_def = _build_type_definition(response_params, "Response")
|
|
|
+
|
|
|
+ # 根据框架生成不同的实现代码
|
|
|
+ impl_code = _generate_implementation_code(url, method, name, framework)
|
|
|
+
|
|
|
+ return {
|
|
|
+ "name": name,
|
|
|
+ "url": url,
|
|
|
+ "method": method,
|
|
|
+ "requestType": req_type_def,
|
|
|
+ "responseType": res_type_def,
|
|
|
+ "implementationCode": impl_code,
|
|
|
+ "fullImplementation": f"{req_type_def}\n\n{res_type_def}\n\n{impl_code}"
|
|
|
+ }
|
|
|
+
|
|
|
+def _build_type_definition(params: List[Dict], suffix: str) -> str:
|
|
|
+ """构建TypeScript类型定义"""
|
|
|
+ # TypeScript类型映射
|
|
|
+ ts_type_map = {
|
|
|
+ "String": "string",
|
|
|
+ "Number": "number",
|
|
|
+ "Boolean": "boolean",
|
|
|
+ "Object": "object",
|
|
|
+ "Array": "any[]"
|
|
|
+ }
|
|
|
+
|
|
|
+ result = [f"interface {suffix} {{"]
|
|
|
+
|
|
|
+ for param in params:
|
|
|
+ prop_name = param.get("name", "")
|
|
|
+ prop_type = param.get("type", "String")
|
|
|
+ required = not param.get("required", False)
|
|
|
+ description = param.get("description", "")
|
|
|
+
|
|
|
+ ts_type = ts_type_map.get(prop_type, "any")
|
|
|
+ comment = f" // {description}" if description else ""
|
|
|
+ optional_mark = "?" if not required else ""
|
|
|
+
|
|
|
+ result.append(f" {prop_name}{optional_mark}: {ts_type};{comment}")
|
|
|
+
|
|
|
+ result.append("}")
|
|
|
+ return "\n".join(result)
|
|
|
+
|
|
|
+def _generate_implementation_code(url: str, method: str, name: str, framework: str) -> str:
|
|
|
+ """根据不同框架生成实现代码"""
|
|
|
+ fn_name = "".join([word.lower() if i == 0 else word.capitalize() for i, word in enumerate(name.split())])
|
|
|
+ method_upper = method.upper()
|
|
|
+ is_body_method = method_upper in ['POST', 'PUT', 'PATCH']
|
|
|
+
|
|
|
+ if framework == "fetch":
|
|
|
+ return f"""
|
|
|
+// 使用fetch实现{name}接口
|
|
|
+export async function {fn_name}(params: Request): Promise<Response> {{
|
|
|
+ const response = await fetch('{url}', {{
|
|
|
+ method: '{method}',
|
|
|
+ headers: {{
|
|
|
+ 'Content-Type': 'application/json',
|
|
|
+ }},
|
|
|
+ body: {is_body_method and 'JSON.stringify(params)' or 'undefined'},
|
|
|
+ }});
|
|
|
+
|
|
|
+ if (!response.ok) {{
|
|
|
+ throw new Error(`API请求失败: ${{response.status}}`);
|
|
|
+ }}
|
|
|
+
|
|
|
+ return response.json();
|
|
|
+}}
|
|
|
+""".strip()
|
|
|
+ elif framework == "axios":
|
|
|
+ return f"""
|
|
|
+// 使用axios实现{name}接口
|
|
|
+import axios from 'axios';
|
|
|
+
|
|
|
+export async function {fn_name}(params: Request): Promise<Response> {{
|
|
|
+ const response = await axios({{
|
|
|
+ url: '{url}',
|
|
|
+ method: '{method}',
|
|
|
+ data: {is_body_method and 'params' or 'undefined'},
|
|
|
+ params: {method_upper == 'GET' and 'params' or 'undefined'},
|
|
|
+ }});
|
|
|
+
|
|
|
+ return response.data;
|
|
|
+}}
|
|
|
+""".strip()
|
|
|
+ elif framework == "react-query":
|
|
|
+ is_get = method_upper == 'GET'
|
|
|
+ hook_name = f"use{name.replace(' ', '')}"
|
|
|
+
|
|
|
+ if is_get:
|
|
|
+ return f"""
|
|
|
+// 使用react-query实现{name}接口
|
|
|
+import {{ useQuery }} from '@tanstack/react-query';
|
|
|
+import axios from 'axios';
|
|
|
+
|
|
|
+// GET请求实现
|
|
|
+export function {hook_name}(params: Request) {{
|
|
|
+ return useQuery(['{name}', params], async () => {{
|
|
|
+ const response = await axios.get('{url}', {{ params }});
|
|
|
+ return response.data;
|
|
|
+ }});
|
|
|
+}}
|
|
|
+""".strip()
|
|
|
+ else:
|
|
|
+ return f"""
|
|
|
+// 使用react-query实现{name}接口
|
|
|
+import {{ useMutation }} from '@tanstack/react-query';
|
|
|
+import axios from 'axios';
|
|
|
+
|
|
|
+// 变更请求实现
|
|
|
+export function {hook_name}() {{
|
|
|
+ return useMutation(async (params: Request) => {{
|
|
|
+ const response = await axios.{method.lower()}('{url}', params);
|
|
|
+ return response.data;
|
|
|
+ }});
|
|
|
+}}
|
|
|
+""".strip()
|
|
|
+ else:
|
|
|
+ return f"// 未支持的框架: {framework}"
|
|
|
+
|
|
|
+@app.tool()
|
|
|
+async def test_rap2_connection() -> Dict:
|
|
|
+ """
|
|
|
+ 测试与RAP2服务器的连接
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 测试结果,包含状态和详细信息
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 从请求中获取RAP2信息
|
|
|
+ rap2_info = get_requestInfo()
|
|
|
+ rap2_base_url = rap2_info["url"]
|
|
|
+ rap2_header = rap2_info["header"]
|
|
|
+
|
|
|
+ if not rap2_base_url:
|
|
|
+ return {
|
|
|
+ "status": "连接失败",
|
|
|
+ "message": RAP2_URL_MISSING_ERROR,
|
|
|
+ "url": "未知"
|
|
|
+ }
|
|
|
+
|
|
|
+ # 测试1: 不带Cookie的请求
|
|
|
+ async with httpx.AsyncClient() as client:
|
|
|
+ response1 = await client.get(f"{rap2_base_url}/test/test", timeout=10.0)
|
|
|
+
|
|
|
+ # 测试2: 带Cookie的请求
|
|
|
+ async with httpx.AsyncClient() as client:
|
|
|
+ response2 = await client.get(f"{rap2_base_url}/repository/joined", headers=rap2_header, timeout=10.0)
|
|
|
+
|
|
|
+ # 解析响应
|
|
|
+ success, result = parse_rap2_response(response2)
|
|
|
+ if not success:
|
|
|
+ return {
|
|
|
+ "status": "解析失败",
|
|
|
+ "message": result.get("error", "未知错误"),
|
|
|
+ "statusCode": response2.status_code
|
|
|
+ }
|
|
|
+
|
|
|
+ # 检查身份验证状态
|
|
|
+ if isinstance(result, dict):
|
|
|
+ if result.get("isOk") is False and "没有访问权限" in str(result.get("errMsg", "")):
|
|
|
+ return {
|
|
|
+ "status": "未授权",
|
|
|
+ "message": "Cookie无效或已过期,请更新Cookie",
|
|
|
+ "detail": str(result.get("errMsg", ""))
|
|
|
+ }
|
|
|
+
|
|
|
+ return {
|
|
|
+ "status": "连接成功",
|
|
|
+ "message": f"成功连接到RAP2服务器: {rap2_base_url}",
|
|
|
+ "statusCode1": response1.status_code,
|
|
|
+ "statusCode2": response2.status_code
|
|
|
+ }
|
|
|
+ except Exception as e:
|
|
|
+ return {
|
|
|
+ "status": "连接失败",
|
|
|
+ "message": f"无法连接到RAP2服务器: {str(e)}",
|
|
|
+ "url": rap2_base_url if 'rap2_base_url' in locals() else "未知"
|
|
|
+ }
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ # 输出启动配置信息
|
|
|
+ print("\n=== RAP2 MCP服务器启动 ===")
|
|
|
+ print(f"服务器地址: {HOST}:{PORT}")
|
|
|
+ print(f"传输方式: {TRANSPORT}")
|
|
|
+
|
|
|
+ # 检查RAP2配置
|
|
|
+ rap2_url = args.rap2_url or os.environ.get("RAP2_URL")
|
|
|
+ rap2_sid = args.rap2_sid or os.environ.get("RAP2_SID")
|
|
|
+ rap2_sid_sig = args.rap2_sid_sig or os.environ.get("RAP2_SID_SIG")
|
|
|
+
|
|
|
+ if rap2_url:
|
|
|
+ print(f"RAP2服务器: {rap2_url}")
|
|
|
+ if rap2_sid and rap2_sid_sig:
|
|
|
+ print("RAP2认证: 已配置")
|
|
|
+ else:
|
|
|
+ print("RAP2认证: 未配置 (将使用请求头获取)")
|
|
|
+ else:
|
|
|
+ print("RAP2服务器: 未配置 (将使用请求头获取)")
|
|
|
+
|
|
|
+ print("=========================\n")
|
|
|
+
|
|
|
+ # 直接使用 SSE 传输协议运行 MCP 服务器
|
|
|
+ app.run(transport=TRANSPORT)
|