编写 Channel 插件
本指南以一个假想的 Webhook 平台为例,从零开始编写一个完整的 Channel 插件。
概述
一个 Channel 插件的最小结构:
my-channel-plugin/
├── elftia-channel.json # 清单文件(必须)
├── package.json # npm 包描述
├── tsconfig.json # TypeScript 配置
├── src/
│ └── index.ts # 入口文件(导出工厂函数)
└── dist/
└── index.cjs # 编译输出(CommonJS 格式)
第一步:初始化项目
mkdir elftia-channel-webhook
cd elftia-channel-webhook
npm init -y
安装开发依赖:
npm install -D typescript tsup @elftia/channel-sdk
第二步:创建清单文件
创建 elftia-channel.json:
{
"name": "elftia-channel-webhook",
"type": "webhook",
"displayName": "Webhook",
"version": "1.0.0",
"description": "通过 HTTP Webhook 接收和发送消息",
"author": "Your Name",
"entry": "dist/index.cjs",
"credentials": [
{
"key": "incomingUrl",
"label": "Incoming Webhook URL",
"type": "text",
"required": true,
"placeholder": "https://example.com/webhook/incoming",
"helpText": "接收消息的 Webhook 端点"
},
{
"key": "outgoingSecret",
"label": "Outgoing Secret",
"type": "password",
"required": false,
"helpText": "用于验证出站请求的密钥"
}
],
"capabilities": {
"typing": false,
"reactions": false,
"attachments": false,
"threads": false,
"groupChat": false
},
"maxMessageLength": 10000
}
第三步:实现插件
创建 src/index.ts:
import type {
ChannelPlugin,
ChannelPluginContext,
ChannelPluginFactory,
} from '@elftia/channel-sdk';
/**
* Webhook Channel 插件实现
*/
class WebhookPlugin implements ChannelPlugin {
readonly type = 'webhook';
private connected = false;
private credentials: Record<string, string> = {};
private pollTimer: ReturnType<typeof setInterval> | null = null;
constructor(private ctx: ChannelPluginContext) {}
async connect(
credentials: Record<string, string>,
_options?: Record<string, unknown>,
): Promise<void> {
this.credentials = credentials;
if (!credentials.incomingUrl) {
throw new Error('Incoming Webhook URL is required');
}
this.ctx.log.info('Connecting to webhook endpoint', {
url: credentials.incomingUrl,
});
// 验证端点可达性
try {
const response = await fetch(credentials.incomingUrl, {
method: 'HEAD',
signal: AbortSignal.timeout(5000),
});
if (!response.ok) {
throw new Error(`Endpoint returned ${response.status}`);
}
} catch (err) {
throw new Error(
`Cannot reach webhook endpoint: ${(err as Error).message}`,
);
}
// 开始轮询新消息(示例:每 5 秒)
this.pollTimer = setInterval(() => {
this.pollMessages().catch((err) => {
this.ctx.log.error('Poll error', err as Error);
});
}, 5000);
this.connected = true;
this.ctx.emitStatusChange('connected');
this.ctx.log.info('Connected successfully');
}
async disconnect(): Promise<void> {
if (this.pollTimer) {
clearInterval(this.pollTimer);
this.pollTimer = null;
}
this.connected = false;
this.ctx.emitStatusChange('disconnected');
this.ctx.log.info('Disconnected');
}
isConnected(): boolean {
return this.connected;
}
async sendMessage(chatId: string, text: string): Promise<void> {
const url = this.credentials.incomingUrl;
if (!url) throw new Error('Not connected');
const body = JSON.stringify({
chatId,
text,
secret: this.credentials.outgoingSecret,
});
const response = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body,
signal: AbortSignal.timeout(10000),
});
if (!response.ok) {
throw new Error(`Send failed: HTTP ${response.status}`);
}
this.ctx.log.debug('Message sent', { chatId, length: text.length });
}
async dispose(): Promise<void> {
await this.disconnect();
}
// ─── 内部方法 ──────────────────────────
private async pollMessages(): Promise<void> {
const lastPollTime = await this.ctx.storage.get<string>('lastPollTime');
const since = lastPollTime || new Date(0).toISOString();
const url = `${this.credentials.incomingUrl}?since=${encodeURIComponent(since)}`;
const response = await fetch(url, {
signal: AbortSignal.timeout(5000),
});
if (!response.ok) return;
const data = (await response.json()) as {
messages: Array<{
id: string;
chatId: string;
senderId: string;
senderName: string;
content: string;
timestamp: string;
}>;
};
for (const msg of data.messages) {
this.ctx.emitMessage({
id: msg.id,
chatId: msg.chatId,
senderId: msg.senderId,
senderName: msg.senderName,
content: msg.content,
timestamp: msg.timestamp,
isFromMe: false,
isGroup: false,
});
}
if (data.messages.length > 0) {
const latest = data.messages[data.messages.length - 1];
await this.ctx.storage.set('lastPollTime', latest.timestamp);
}
}
}
/**
* 工厂函数 — 插件的默认导出
*/
const factory: ChannelPluginFactory = (ctx) => new WebhookPlugin(ctx);
export default factory;
第四步:配置构建
创建 tsconfig.json:
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "bundler",
"strict": true,
"outDir": "dist",
"declaration": false,
"skipLibCheck": true
},
"include": ["src"]
}
在 package.json 中添加构建脚本:
{
"name": "elftia-channel-webhook",
"version": "1.0.0",
"main": "dist/index.cjs",
"scripts": {
"build": "tsup src/index.ts --format cjs --outDir dist --clean",
"dev": "tsup src/index.ts --format cjs --outDir dist --watch"
},
"devDependencies": {
"@elftia/channel-sdk": "^1.0.0",
"tsup": "^8.0.0",
"typescript": "^5.6.0"
}
}
构建插件:
npm run build
确认 dist/index.cjs 已生成。
第五步:本地测试
将插件目录链接到 Elftia 的插件目录:
# Windows
mklink /J "%APPDATA%\elftia\channel-plugins\webhook" "C:\path\to\elftia-channel-webhook"
# macOS / Linux
ln -s /path/to/elftia-channel-webhook ~/.config/elftia/channel-plugins/webhook
或者使用 Elftia 的本地安装功能:
ChannelPluginLoader.installFromLocal('/path/to/elftia-channel-webhook')
验证加载
- 启动 Elftia
- 打开 设置 → Channel → 添加渠道
- 确认列表中出现 Webhook 选项
- 创建实例,填写凭证,测试连接
开发模式
在开发过程中使用 npm run dev(watch 模式),修改代码后 tsup 会自动重新编译。重启 Elftia 即可加载最新代码。
第六步:发布到 Marketplace
打包
将插件打包为 .zip 文件(根目录必须包含 elftia-channel.json):
cd elftia-channel-webhook
zip -r elftia-channel-webhook-1.0.0.zip \
elftia-channel.json \
package.json \
dist/
发布
提交到 Elftia Channel Plugin Marketplace:
- 确保
elftia-channel.json中的版本号正确 - 计算 zip 文件的 SHA-256 校验和
- 提交 zip 文件和校验和到 Marketplace 仓库
- 等待审核通过后,插件会出现在 CDN 的
channel-manifest.json中
开发注意事项
入口文件格式
插件入口必须是 CommonJS 格式(.cjs 或标准 .js),因为 Elftia 使用 require() 加载插件:
const mod = require(entryPath);
const factory = mod.default || mod;
错误处理
connect()中的错误会被 Registry 捕获并将实例状态设为errorsendMessage()中的错误会被 Router 捕获并记录日志- 建议使用
ctx.log.error()记录内部错误 - 网络请求务必设置超时
状态上报
使用 ctx.emitStatusChange() 及时上报状态变化:
| 状态 | 何时上报 |
|---|---|
connecting | 开始建立连接 |
connected | 连接成功 |
disconnected | 主动断开连接 |
error | 连接失败或运行时错误 |
reconnecting | 自动重连中 |
Registry 会对状态进行标准化处理:不会将 connected 降级为 connecting(防止自动重连导致状态闪烁)。
存储最佳实践
- 使用
ctx.storage存储需要跨会话保留的状态(如轮询偏移量) - 值会被 JSON 序列化,确保存储的数据可序列化
- 实例删除时,框架会自动清除该实例的所有存储数据
数据目录
ctx.dataDir 提供了一个持久化的文件系统目录,适合存储:
- 下载的文件(如接收到的图片)
- 缓存数据
- 临时文件
目录由框架自动创建,路径格式:{userData}/channel-data/{channelId}/
System Prompt 注入
如果你的平台有特殊的消息格式或能力标签,可以实现 getSystemPrompt() 方法:
getSystemPrompt(context: SystemPromptContext): string | undefined {
if (context.isGroup) {
return `你正在一个 Webhook 群组聊天中。使用纯文本格式回复。`;
}
return `你正在与 ${context.senderName} 进行 Webhook 私聊。`;
}
返回的文本会被追加到 Agent 的基础 system prompt 中,让 AI 了解当前平台的上下文。
下一步
- Channel 插件 SDK — 完整的接口和类型参考
- 消息路由与安全管线 — 消息到达 Agent 的完整处理流程
- Channel 系统概览 — 系统整体架构