Kimi Code 源码架构分析
项目概述
Kimi Code CLI 是一个基于 AI 的 CLI 代理工具,用于软件工程工作流。支持交互式 Shell UI、ACP 服务器模式(IDE 集成)和 MCP 工具加载。
技术栈
- 语言:Python 3.12+(工具配置为 3.14)
- CLI 框架:Typer
- 异步运行时:asyncio
- LLM 框架:kosong(统一消息结构、异步工具编排、可插拔聊天提供者)
- MCP 集成:fastmcp
- 日志:loguru
- 包管理/构建:uv + uv_build;PyInstaller 用于二进制构建
- 测试:pytest + pytest-asyncio
- 代码质量:ruff(lint/format)、pyright + ty(类型检查)
核心架构
1. 入口层(CLI)
python
# src/kimi_cli/cli/__init__.py:12-14
cli = typer.Typer(
epilog="""\b\
Documentation: https://moonshotai.github.io/kimi-cli/\n- 位置:
src/kimi_cli/cli/__init__.py - 功能:解析命令行参数,支持多种 UI 模式(shell/print/acp/wire)
- 子命令:
info、mcp、web、login、logout、acp、term
2. 应用层(App)
python
# src/kimi_cli/app.py:54-168
class KimiCLI:
@staticmethod
async def create(
session: Session,
*,
# Basic configuration
config: Config | Path | None = None,
model_name: str | None = None,
thinking: bool | None = None,
# Run mode
yolo: bool = False,
# Extensions
agent_file: Path | None = None,
mcp_configs: list[MCPConfig] | list[dict[str, Any]] | None = None,
skills_dir: KaosPath | None = None,
# Loop control
max_steps_per_turn: int | None = None,
max_retries_per_step: int | None = None,
max_ralph_iterations: int | None = None,
) -> KimiCLI:- 位置:
src/kimi_cli/app.py - 职责:
- 加载配置和模型
- 创建 Runtime 和 Agent
- 初始化 KimiSoul
- 提供多种运行模式(shell/print/acp/wire)
3. 核心代理层(Soul)
3.1 KimiSoul(主循环)
python
# src/kimi_cli/soul/kimisoul.py:89-208
class KimiSoul:
"""The soul of Kimi Code CLI."""
def __init__(
self,
agent: Agent,
*,
context: Context,
):
"""
Initialize the soul.
Args:
agent (Agent): The agent to run.
context (Context): The context of the agent.
"""
self._agent = agent
self._runtime = agent.runtime
self._denwa_renji = agent.runtime.denwa_renji
self._approval = agent.runtime.approval
self._context = context
self._loop_control = agent.runtime.config.loop_control
self._compaction = SimpleCompaction() # TODO: maybe configurable and composable
for tool in agent.toolset.tools:
if tool.name == SendDMail_NAME:
self._checkpoint_with_user_message = True
break
else:
self._checkpoint_with_user_message = False
self._slash_commands = self._build_slash_commands()
self._slash_command_map = self._index_slash_commands(self._slash_commands)
@property
def name(self) -> str:
return self._agent.name
@property
def model_name(self) -> str:
return self._runtime.llm.chat_provider.model_name if self._runtime.llm else ""
@property
def model_capabilities(self) -> set[ModelCapability] | None:
if self._runtime.llm is None:
return None
return self._runtime.llm.capabilities
@property
def thinking(self) -> bool | None:
"""Whether thinking mode is enabled."""
if self._runtime.llm is None:
return None
if thinking_effort := self._runtime.llm.chat_provider.thinking_effort:
return thinking_effort != "off"
return None
@property
def status(self) -> StatusSnapshot:
return StatusSnapshot(
context_usage=self._context_usage,
yolo_enabled=self._approval.is_yolo(),
)
@property
def agent(self) -> Agent:
return self._agent
@property
def runtime(self) -> Runtime:
return self._runtime
@property
def context(self) -> Context:
return self._context
@property
def _context_usage(self) -> float:
if self._runtime.llm is not None:
return self._context.token_count / self._runtime.llm.max_context_size
return 0.0
@property
def wire_file(self) -> WireFile:
return self._runtime.session.wire_file
async def _checkpoint(self):
await self._context.checkpoint(self._checkpoint_with_user_message)
@property
def available_slash_commands(self) -> list[SlashCommand[Any]]:
return self._slash_commands
async def run(self, user_input: str | list[ContentPart]):
# Refresh OAuth tokens on each turn to avoid idle-time expirations.
await self._runtime.oauth.ensure_fresh(self._runtime)
wire_send(TurnBegin(user_input=user_input))
user_message = Message(role="user", content=user_input)
text_input = user_message.extract_text(" ").strip()
if command_call := parse_slash_command_call(text_input):
command = self._find_slash_command(command_call.name)
if command is None:
# this should not happen actually, the shell should have filtered it out
wire_send(TextPart(text=f'Unknown slash command "/{command_call.name}".'))
else:
ret = command.func(self, command_call.args)
if isinstance(ret, Awaitable):
await ret
elif self._loop_control.max_ralph_iterations != 0:
runner = FlowRunner.ralph_loop(
user_message,
self._loop_control.max_ralph_iterations,
)
await runner.run(self, "")
else:
await self._turn(user_message)
wire_send(TurnEnd())- 位置:
src/kimi_cli/soul/kimisoul.py - 职责:
- 执行代理主循环(
_turn→_agent_loop→_step) - 处理工具调用
- 管理上下文压缩
- 处理斜杠命令和技能
- 执行代理主循环(
3.2 Runtime(运行时环境)
python
# src/kimi_cli/soul/agent.py:63-124
@dataclass(slots=True, kw_only=True)
class Runtime:
"""Agent runtime."""
config: Config
oauth: OAuthManager
llm: LLM | None # we do not freeze the `Runtime` dataclass because LLM can be changed
session: Session
builtin_args: BuiltinSystemPromptArgs
denwa_renji: DenwaRenji
approval: Approval
labor_market: LaborMarket
environment: Environment
skills: dict[str, Skill]
@staticmethod
async def create(
config: Config,
oauth: OAuthManager,
llm: LLM | None,
session: Session,
yolo: bool,
skills_dir: KaosPath | None = None,
) -> Runtime:
ls_output, agents_md, environment = await asyncio.gather(
list_directory(session.work_dir),
load_agents_md(session.work_dir),
Environment.detect(),
)
# Discover and format skills
skills_roots = await resolve_skills_roots(session.work_dir, skills_dir_override=skills_dir)
skills = await discover_skills_from_roots(skills_roots)
skills_by_name = index_skills(skills)
logger.info("Discovered {count} skill(s)", count=len(skills))
skills_formatted = "\n".join(
(
f"- {skill.name}\n"
f" - Path: {skill.skill_md_file}\n"
f" - Description: {skill.description}"
)
for skill in skills
)
return Runtime(
config=config,
oauth=oauth,
llm=llm,
session=session,
builtin_args=BuiltinSystemPromptArgs(
KIMI_NOW=datetime.now().astimezone().isoformat(),
KIMI_WORK_DIR=session.work_dir,
KIMI_WORK_DIR_LS=ls_output,
KIMI_AGENTS_MD=agents_md or "",
KIMI_SKILLS=skills_formatted or "No skills found.",
),
denwa_renji=DenwaRenji(),
approval=Approval(yolo=yolo),
labor_market=LaborMarket(),
environment=environment,
skills=skills_by_name,
)- 位置:
src/kimi_cli/soul/agent.py - 职责:
- 管理配置、OAuth、LLM、会话
- 提供内置系统提示参数
- 管理技能发现和索引
- 管理子代理(LaborMarket)
3.3 Context(上下文管理)
- 位置:
src/kimi_cli/soul/context.py - 职责:
- 管理对话历史
- 实现检查点机制
- 跟踪 token 使用
3.4 Toolset(工具系统)
- 位置:
src/kimi_cli/soul/toolset.py - 职责:
- 加载工具(内置 + MCP)
- 执行工具调用
- 注入依赖
4. 工具系统(Tools)
内置工具位于 src/kimi_cli/tools/:
- 文件操作:
file/(read, write, replace, grep, glob) - Shell 命令:
shell/(bash, powershell) - Web 操作:
web/(fetch, search) - 多代理:
multiagent/(task, create) - 其他:
dmail/、think/、todo/
5. 通信层(Wire)
python
# src/kimi_cli/wire/__init__.py:18-56
class Wire:
"""
A spmc channel for communication between the soul and the UI during a soul run.
"""
def __init__(self, *, file_backend: WireFile | None = None):
self._raw_queue = WireMessageQueue()
self._merged_queue = WireMessageQueue()
self._soul_side = WireSoulSide(self._raw_queue, self._merged_queue)
if file_backend is not None:
# record all complete Wire messages to the file backend
self._recorder = _WireRecorder(file_backend, self._merged_queue.subscribe())
else:
self._recorder = None
@property
def soul_side(self) -> WireSoulSide:
return self._soul_side
def ui_side(self, *, merge: bool) -> WireUISide:
"""
Create a UI side of the `Wire`.
Args:
merge: Whether to merge `Wire` messages as much as possible.
"""
if merge:
return WireUISide(self._merged_queue.subscribe())
else:
return WireUISide(self._raw_queue.subscribe())
def shutdown(self) -> None:
self.soul_side.flush()
logger.debug("Shutting down wire")
self._raw_queue.shutdown()
self._merged_queue.shutdown()- 位置:
src/kimi_cli/wire/ - 职责:
- 在 Soul 和 UI 之间传递消息
- 支持消息合并
- 支持文件后端记录
6. UI 层
- Shell UI:
src/kimi_cli/ui/shell/(交互式 TUI) - Print UI:
src/kimi_cli/ui/print/(非交互式输出) - ACP UI:
src/kimi_cli/ui/acp/(ACP 协议支持) - Web UI:
src/kimi_cli/web/(Web 界面)
7. 配置与扩展
7.1 Agent 规范
- 位置:
src/kimi_cli/agents/(YAML 文件) - 功能:
- 定义系统提示
- 选择工具
- 定义固定子代理
- 支持继承(extend)
7.2 Skills(技能)
- 位置:
src/kimi_cli/skills/和用户目录 - 类型:
- 标准技能:
/skill:<name> - 流程技能:
/flow:<name>
- 标准技能:
7.3 MCP 集成
- 位置:
src/kimi_cli/mcp.py、src/kimi_cli/cli/mcp.py - 功能:加载和管理 MCP 服务器工具
数据流
用户输入
↓
CLI 解析 (cli/__init__.py)
↓
KimiCLI.create() (app.py)
↓
Runtime.create() + Agent.load()
↓
KimiSoul.run() (soul/kimisoul.py)
↓
_turn() → _agent_loop() → _step()
↓
LLM 调用 (kosong) + 工具执行
↓
Wire 消息 → UI 层
↓
用户看到结果关键特性
- 多模式运行:Shell/Print/ACP/Wire
- 工具扩展:内置工具 + MCP 工具
- 技能系统:标准技能和流程技能
- 子代理:支持多代理协作
- 上下文管理:检查点、压缩、D-Mail
- 审批机制:工具调用需要用户批准(可配置 yolo 模式)
项目组织
- Monorepo:使用 uv workspace 管理多个包
- 工作区包:
packages/kosong/:LLM 抽象层packages/kaos/:操作系统抽象层packages/kimi-code/:Kimi Code 相关sdks/kimi-sdk/:SDK
总结
采用分层架构:
- 入口层:CLI 参数解析
- 应用层:初始化和协调
- 核心层:代理循环和运行时
- 工具层:可扩展工具系统
- 通信层:Soul-UI 消息传递
- UI 层:多种界面实现
设计强调模块化、可扩展性和异步处理,支持多种运行模式和扩展方式。