1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
| import json from openai import OpenAI
client = OpenAI()
tools = [ { "type": "function", "function": { "name": "execute_command", "description": "执行 shell 命令并返回输出", "parameters": { "type": "object", "properties": { "command": {"type": "string", "description": "要执行的 shell 命令"} }, "required": ["command"] } } }, { "type": "function", "function": { "name": "read_file", "description": "读取文件内容", "parameters": { "type": "object", "properties": { "path": {"type": "string", "description": "文件路径"} }, "required": ["path"] } } }, ]
def execute_command(command): import subprocess result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=30) return result.stdout or result.stderr
def read_file(path): with open(path, "r") as f: return f.read()
def run_agent(user_message, max_rounds=10): messages = [ {"role": "system", "content": "你是一个有用的助手,可以执行命令和读取文件来帮助用户。"}, {"role": "user", "content": user_message}, ]
for round_num in range(max_rounds): response = client.chat.completions.create( model="gpt-4o", messages=messages, tools=tools, )
msg = response.choices[0].message
if not msg.tool_calls: return msg.content
messages.append(msg) for tool_call in msg.tool_calls: func_name = tool_call.function.name args = json.loads(tool_call.function.arguments)
if func_name == "execute_command": result = execute_command(args["command"]) elif func_name == "read_file": result = read_file(args["path"]) else: result = f"Unknown tool: {func_name}"
print(f" [Tool] {func_name}({args}) → {result[:200]}")
messages.append({ "role": "tool", "tool_call_id": tool_call.id, "content": result, })
return "达到最大轮次,任务未完成"
answer = run_agent("查看当前目录有哪些 Python 文件,以及它们的总行数") print(answer)
|