Skip to main content

编写 Channel 插件

本指南以一个假想的 Webhook 平台为例,从零开始编写一个完整的 Channel 插件。

概述

一个 Channel 插件的最小结构:

my-channel-plugin/
├── elftia-channel.json # 清单文件(必须)
├── package.json # npm 包描述
├── tsconfig.json # TypeScript 配置
├── src/
│ └── index.ts # 入口文件(导出工厂函数)
└── dist/
└── index.cjs # 编译输出(CommonJS 格式)

第一步:初始化项目

mkdir elftia-channel-webhook
cd elftia-channel-webhook
npm init -y

安装开发依赖:

npm install -D typescript tsup @elftia/channel-sdk

第二步:创建清单文件

创建 elftia-channel.json

{
"name": "elftia-channel-webhook",
"type": "webhook",
"displayName": "Webhook",
"version": "1.0.0",
"description": "通过 HTTP Webhook 接收和发送消息",
"author": "Your Name",
"entry": "dist/index.cjs",
"credentials": [
{
"key": "incomingUrl",
"label": "Incoming Webhook URL",
"type": "text",
"required": true,
"placeholder": "https://example.com/webhook/incoming",
"helpText": "接收消息的 Webhook 端点"
},
{
"key": "outgoingSecret",
"label": "Outgoing Secret",
"type": "password",
"required": false,
"helpText": "用于验证出站请求的密钥"
}
],
"capabilities": {
"typing": false,
"reactions": false,
"attachments": false,
"threads": false,
"groupChat": false
},
"maxMessageLength": 10000
}

第三步:实现插件

创建 src/index.ts

import type {
ChannelPlugin,
ChannelPluginContext,
ChannelPluginFactory,
} from '@elftia/channel-sdk';

/**
* Webhook Channel 插件实现
*/
class WebhookPlugin implements ChannelPlugin {
readonly type = 'webhook';
private connected = false;
private credentials: Record<string, string> = {};
private pollTimer: ReturnType<typeof setInterval> | null = null;

constructor(private ctx: ChannelPluginContext) {}

async connect(
credentials: Record<string, string>,
_options?: Record<string, unknown>,
): Promise<void> {
this.credentials = credentials;

if (!credentials.incomingUrl) {
throw new Error('Incoming Webhook URL is required');
}

this.ctx.log.info('Connecting to webhook endpoint', {
url: credentials.incomingUrl,
});

// 验证端点可达性
try {
const response = await fetch(credentials.incomingUrl, {
method: 'HEAD',
signal: AbortSignal.timeout(5000),
});
if (!response.ok) {
throw new Error(`Endpoint returned ${response.status}`);
}
} catch (err) {
throw new Error(
`Cannot reach webhook endpoint: ${(err as Error).message}`,
);
}

// 开始轮询新消息(示例:每 5 秒)
this.pollTimer = setInterval(() => {
this.pollMessages().catch((err) => {
this.ctx.log.error('Poll error', err as Error);
});
}, 5000);

this.connected = true;
this.ctx.emitStatusChange('connected');
this.ctx.log.info('Connected successfully');
}

async disconnect(): Promise<void> {
if (this.pollTimer) {
clearInterval(this.pollTimer);
this.pollTimer = null;
}
this.connected = false;
this.ctx.emitStatusChange('disconnected');
this.ctx.log.info('Disconnected');
}

isConnected(): boolean {
return this.connected;
}

async sendMessage(chatId: string, text: string): Promise<void> {
const url = this.credentials.incomingUrl;
if (!url) throw new Error('Not connected');

const body = JSON.stringify({
chatId,
text,
secret: this.credentials.outgoingSecret,
});

const response = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body,
signal: AbortSignal.timeout(10000),
});

if (!response.ok) {
throw new Error(`Send failed: HTTP ${response.status}`);
}

this.ctx.log.debug('Message sent', { chatId, length: text.length });
}

async dispose(): Promise<void> {
await this.disconnect();
}

// ─── 内部方法 ──────────────────────────

private async pollMessages(): Promise<void> {
const lastPollTime = await this.ctx.storage.get<string>('lastPollTime');
const since = lastPollTime || new Date(0).toISOString();

const url = `${this.credentials.incomingUrl}?since=${encodeURIComponent(since)}`;

const response = await fetch(url, {
signal: AbortSignal.timeout(5000),
});

if (!response.ok) return;

const data = (await response.json()) as {
messages: Array<{
id: string;
chatId: string;
senderId: string;
senderName: string;
content: string;
timestamp: string;
}>;
};

for (const msg of data.messages) {
this.ctx.emitMessage({
id: msg.id,
chatId: msg.chatId,
senderId: msg.senderId,
senderName: msg.senderName,
content: msg.content,
timestamp: msg.timestamp,
isFromMe: false,
isGroup: false,
});
}

if (data.messages.length > 0) {
const latest = data.messages[data.messages.length - 1];
await this.ctx.storage.set('lastPollTime', latest.timestamp);
}
}
}

/**
* 工厂函数 — 插件的默认导出
*/
const factory: ChannelPluginFactory = (ctx) => new WebhookPlugin(ctx);
export default factory;

第四步:配置构建

创建 tsconfig.json

{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "bundler",
"strict": true,
"outDir": "dist",
"declaration": false,
"skipLibCheck": true
},
"include": ["src"]
}

package.json 中添加构建脚本:

{
"name": "elftia-channel-webhook",
"version": "1.0.0",
"main": "dist/index.cjs",
"scripts": {
"build": "tsup src/index.ts --format cjs --outDir dist --clean",
"dev": "tsup src/index.ts --format cjs --outDir dist --watch"
},
"devDependencies": {
"@elftia/channel-sdk": "^1.0.0",
"tsup": "^8.0.0",
"typescript": "^5.6.0"
}
}

构建插件:

npm run build

确认 dist/index.cjs 已生成。

第五步:本地测试

将插件目录链接到 Elftia 的插件目录:

# Windows
mklink /J "%APPDATA%\elftia\channel-plugins\webhook" "C:\path\to\elftia-channel-webhook"

# macOS / Linux
ln -s /path/to/elftia-channel-webhook ~/.config/elftia/channel-plugins/webhook

或者使用 Elftia 的本地安装功能:

ChannelPluginLoader.installFromLocal('/path/to/elftia-channel-webhook')

验证加载

  1. 启动 Elftia
  2. 打开 设置 → Channel → 添加渠道
  3. 确认列表中出现 Webhook 选项
  4. 创建实例,填写凭证,测试连接

开发模式

在开发过程中使用 npm run dev(watch 模式),修改代码后 tsup 会自动重新编译。重启 Elftia 即可加载最新代码。

第六步:发布到 Marketplace

打包

将插件打包为 .zip 文件(根目录必须包含 elftia-channel.json):

cd elftia-channel-webhook
zip -r elftia-channel-webhook-1.0.0.zip \
elftia-channel.json \
package.json \
dist/

发布

提交到 Elftia Channel Plugin Marketplace:

  1. 确保 elftia-channel.json 中的版本号正确
  2. 计算 zip 文件的 SHA-256 校验和
  3. 提交 zip 文件和校验和到 Marketplace 仓库
  4. 等待审核通过后,插件会出现在 CDN 的 channel-manifest.json

开发注意事项

入口文件格式

插件入口必须是 CommonJS 格式(.cjs 或标准 .js),因为 Elftia 使用 require() 加载插件:

const mod = require(entryPath);
const factory = mod.default || mod;

错误处理

  • connect() 中的错误会被 Registry 捕获并将实例状态设为 error
  • sendMessage() 中的错误会被 Router 捕获并记录日志
  • 建议使用 ctx.log.error() 记录内部错误
  • 网络请求务必设置超时

状态上报

使用 ctx.emitStatusChange() 及时上报状态变化:

状态何时上报
connecting开始建立连接
connected连接成功
disconnected主动断开连接
error连接失败或运行时错误
reconnecting自动重连中

Registry 会对状态进行标准化处理:不会将 connected 降级为 connecting(防止自动重连导致状态闪烁)。

存储最佳实践

  • 使用 ctx.storage 存储需要跨会话保留的状态(如轮询偏移量)
  • 值会被 JSON 序列化,确保存储的数据可序列化
  • 实例删除时,框架会自动清除该实例的所有存储数据

数据目录

ctx.dataDir 提供了一个持久化的文件系统目录,适合存储:

  • 下载的文件(如接收到的图片)
  • 缓存数据
  • 临时文件

目录由框架自动创建,路径格式:{userData}/channel-data/{channelId}/

System Prompt 注入

如果你的平台有特殊的消息格式或能力标签,可以实现 getSystemPrompt() 方法:

getSystemPrompt(context: SystemPromptContext): string | undefined {
if (context.isGroup) {
return `你正在一个 Webhook 群组聊天中。使用纯文本格式回复。`;
}
return `你正在与 ${context.senderName} 进行 Webhook 私聊。`;
}

返回的文本会被追加到 Agent 的基础 system prompt 中,让 AI 了解当前平台的上下文。

下一步