继AI MCP协议之后,Google又提出了一个新协议,A2A,全称为Agent-to-Agent,他允许一个Agent调用另一个Agent。

简单使用

在官方A2A文档中,提供的Python和NodeJS的代码,本篇文章中使用NodeJS作为演示。

  1. 拉取官方示例代码,主要需要image-20250417104519297这两个文件夹的代码
  2. 编写服务端代码,其实就是包装一个Agent
    • 我们需要new A2AServer创建一个A2A的服务端,他有两个入参TaskHandleroptions
    • 对于TaskHandler
      • 这里不多说关于编程语言,在这个方法定义里你需要
        • 确认当前Agent状态并且告诉taskStore来传递给客户端
        • 实现Agent
  3. 编写客户端代码
    • 我们需要new A2AClient创建一个A2A的客户端,他需要一个serverUrl,也就是服务端的地址
    • 通过/.well-known/agent.json获取到服务端Agent的能力,这里返回的信息就是在服务端的card中定义的信息
    • 通过client.sendTaskSubscribe调用服务端Agent sendTaskSubscribe是流式sendTask是阻塞式
  4. 运行服务端代码,会通过express开启一个端口号默认为41241的应用
  5. 运行客户端代码,我们现在就可以通过客户端调用服务端Agent了

剖析

服务端

A2AServer都干了什么

最主要的就是创建一个express应用,将自己暴露出去

TaskHandler

下面是taskhandler的定义源码

/**
* Represents the possible types of updates a TaskHandler can yield.
* It's either a partial TaskStatus (without the server-managed timestamp)
* or a complete Artifact object.
*/
export type TaskYieldUpdate =
| Omit<schema.TaskStatus, "timestamp">
| schema.Artifact;

/**
* Defines the signature for a task handler function.
*
* Handlers are implemented as async generators. They receive context about the
* task and the triggering message. They can perform work and `yield` status
* or artifact updates (`TaskYieldUpdate`). The server consumes these yields,
* updates the task state in the store, and streams events if applicable.
*
* @param context - The TaskContext object containing task details, cancellation status, and store access.
* @yields {TaskYieldUpdate} - Updates to the task's status or artifacts.
* @returns {Promise<schema.Task | void>} - Optionally returns the final complete Task object
* (needed for non-streaming 'tasks/send'). If void is returned, the server uses the
* last known state from the store after processing all yields.
*/
export type TaskHandler = (
context: TaskContext
) => AsyncGenerator<TaskYieldUpdate, schema.Task | void, unknown>;

我们可以看到入参是context,可以持续返回TaskYieldUpdate,最终返回一个Task或者空

context
/**
* Context object provided to the TaskHandler.
*/
export interface TaskContext {
/**
* The current state of the task when the handler is invoked or resumed.
* Note: This is a snapshot. For the absolute latest state during async operations,
* the handler might need to reload the task via the store.
*/
task: schema.Task;

/**
* The specific user message that triggered this handler invocation or resumption.
*/
userMessage: schema.Message;

/**
* Function to check if cancellation has been requested for this task.
* Handlers should ideally check this periodically during long-running operations.
* @returns {boolean} True if cancellation has been requested, false otherwise.
*/
isCancelled(): boolean;

/**
* The message history associated with the task up to the point the handler is invoked.
* Optional, as history might not always be available or relevant.
*/
history?: schema.Message[];

// taskStore is removed as the server now handles loading/saving directly.
// If a handler specifically needs history, it would need to be passed differently
// or the handler pattern might need adjustment based on use case.

// Potential future additions:
// - logger instance
// - AbortSignal linked to cancellation
}
  • Task:指的就是当前这个任务
    • 任务中的TaskStatus包含working,input-required,completed,canceled,failed。看名字大家就知道这几个分别代表着什么哈
    • 任务中还有sessionId,artifacts,metadata这里不细说
  • userMessage:这个就是用户发送的消息
  • history:这个就是用户发送的消息的集合
TaskYieldUpdate

TaskStatusUpdateEvent主要包含的也是任务中的TaskStatus,持续返回告诉客户端目前任务是什么状态

Task

在上文已经讲过了

A2AServerOptions

options包含了

  • taskStore 上下文存储
  • corsOptions 是否接受跨域
  • basePath 路由前缀
  • card 这个是最重要的,可以把这个card理解为MCP的描述和参数描述
card

基础信息name,description,url等
还有skills,相当于大模型的FunctionCalling
实例中的card是

{
name: "Movie Agent",
description:
"An agent that can answer questions about movies and actors using TMDB.",
url: "http://localhost:41241", // Default port used in the script
provider: {
organization: "A2A Samples",
},
version: "0.0.1",
capabilities: {
// Although it yields multiple updates, it doesn't seem to implement full A2A streaming via TaskYieldUpdate artifacts
// It uses Genkit streaming internally, but the A2A interface yields start/end messages.
// State history seems reasonable as it processes history.
streaming: false,
pushNotifications: false,
stateTransitionHistory: true,
},
authentication: null,
defaultInputModes: ["text"],
defaultOutputModes: ["text"],
skills: [
{
id: "general_movie_chat",
name: "General Movie Chat",
description:
"Answer general questions or chat about movies, actors, directors.",
tags: ["movies", "actors", "directors"],
examples: [
"Tell me about the plot of Inception.",
"Recommend a good sci-fi movie.",
"Who directed The Matrix?",
"What other movies has Scarlett Johansson been in?",
"Find action movies starring Keanu Reeves",
"Which came out first, Jurassic Park or Terminator 2?",
],
},
// The specific tools are used internally by the Genkit agent,
// but from the A2A perspective, it exposes one general chat skill.
],
}

根据这个示例来修改

客户端

A2AClient都干了什么

获取服务端的card

上文提到过,这个功能标准的A2AClient并没有帮我们实现,我们需要手动去做

sendTask&sendTaskSubscribe

发送阻塞请求和流式请求,分别调用的是服务端的tasks/sendtasks/sendSubscribe两个接口,其实也就是Http请求

getTask

通过taskid获取任务,调用tasks/get接口

cancelTask

不多解释,就是调用tasks/cancel接口

setTaskPushNotification&getTaskPushNotification

调用tasks/pushNotification/settasks/pushNotification/get
NodeJS的示例还没有实现

// Add other methods like tasks/pushNotification/*, tasks/resubscribe later if needed

resubscribeTask

调用tasks/resubscribe接口,对于中断的任务做重新连接的方法,也没有实现,同上

A2AClient是如何与A2AServer交互的呢

image-20250418153043320
image-20250418154256633
事例中没有是Agent调用Agent的路径,只是给大家介绍了之间的协议

延展阅读

A2A(Agent2Agent)系列专题 (五) A2A 协议架构:客户端-服务器模型解析