import { DWClient, TOPIC_ROBOT, type DWClientDownStream } from "dingtalk-stream";
import type { OpenClawConfig, RuntimeEnv } from "openclaw/plugin-sdk";
import type { DingTalkMessageData, ResolvedDingTalkAccount, DingTalkGroupConfig, AudioContent, VideoContent, FileContent, PictureContent, RichTextContent, RichTextElement, RichTextPictureElement } from "./types.js";
import { replyViaWebhook, getFileDownloadUrl, downloadFromUrl, sendTextMessage } from "./client.js";
import { resolveDingTalkAccount } from "./accounts.js";
import { getDingTalkRuntime } from "./runtime.js";
import { logger } from "./logger.js";
import { PLUGIN_ID } from "./constants.js";
// ============================================================================
// 媒体信息类型定义
// ============================================================================
/** Kinds of media an inbound message can carry (named after the DingTalk message types). */
export type MediaKind = "picture" | "audio" | "video" | "file";
/** A single media item attached to an inbound message. */
export interface MediaItem {
/** Media kind. */
kind: MediaKind;
/** Local file path. */
path: string;
/** MIME type. */
contentType: string;
/** File name (optional). */
fileName?: string;
/** File size in bytes. */
fileSize?: number;
/** Duration in seconds (audio/video only). */
duration?: number;
}
/** Media context of an inbound message. */
export interface InboundMediaContext {
/** All media items of the message (supports several media mixed in one message). */
items: MediaItem[];
/** Primary media item (the first item; kept for compatibility with older single-media logic). */
primary?: MediaItem;
}
/**
 * Build the placeholder text that stands in for media in a message body.
 *
 * Emits one `<media:…>` tag per item, separated by single spaces. Audio and video
 * tags carry a `duration=…s` attribute when the duration is truthy; file tags carry
 * a `name="…"` attribute when a file name is present. An empty item list yields "".
 */
function generateMediaPlaceholder(media: InboundMediaContext): string {
  const tags: string[] = [];
  for (const item of media.items) {
    const durationAttr = item.duration ? ` duration=${item.duration}s` : "";
    if (item.kind === "picture") {
      tags.push("<media:picture>");
    } else if (item.kind === "audio") {
      tags.push(`<media:audio${durationAttr}>`);
    } else if (item.kind === "video") {
      tags.push(`<media:video${durationAttr}>`);
    } else if (item.kind === "file") {
      const nameAttr = item.fileName ? ` name="${item.fileName}"` : "";
      tags.push(`<media:file${nameAttr}>`);
    } else {
      // Unknown kinds still get a generic tag.
      tags.push(`<media:${item.kind}>`);
    }
  }
  return tags.join(" ");
}
/**
 * Derive the media-related fields of the inbound context from an InboundMediaContext.
 *
 * Returns an empty object when there is no media. Otherwise the singular fields
 * (`MediaPath`, `MediaType`, `MediaUrl`) describe the primary item (explicit `primary`,
 * else the first item) and the plural fields (`MediaPaths`, `MediaUrls`, `MediaTypes`)
 * list every item. `MediaTypes` omits falsy content types. Duration and file
 * name/size are added only for a primary item of the matching kind.
 */
function buildMediaContextFields(media?: InboundMediaContext): Record<string, unknown> {
  if (!media || media.items.length === 0) {
    return {};
  }

  const lead = media.primary ?? media.items[0];

  // Singular fields keep the legacy single-media contract; the plural ones are
  // always present as well, even when there is only one item.
  const fields: Record<string, unknown> = {
    MediaPath: lead.path,
    MediaType: lead.contentType,
    MediaUrl: lead.path,
    MediaPaths: media.items.map((entry) => entry.path),
    MediaUrls: media.items.map((entry) => entry.path),
    MediaTypes: media.items.map((entry) => entry.contentType).filter(Boolean),
  };

  const hasDuration = lead.kind === "audio" || lead.kind === "video";
  if (hasDuration && lead.duration !== undefined) {
    fields.MediaDuration = lead.duration;
  }

  if (lead.kind === "file") {
    if (lead.fileName) {
      fields.MediaFileName = lead.fileName;
    }
    if (lead.fileSize !== undefined) {
      fields.MediaFileSize = lead.fileSize;
    }
  }

  return fields;
}
// ============================================================================
// 消息处理器类型定义
// ============================================================================
/** Result returned by a message handler's handle() step. */
interface MessageHandleResult {
/** Whether the message was handled successfully. */
success: boolean;
/** Media context (supports several media mixed in one message). */
media?: InboundMediaContext;
/** Error message. */
errorMessage?: string;
/** Whether the remaining processing should be skipped. */
skipProcessing?: boolean;
}
/** Contract implemented by every per-message-type handler in this file. */
interface MessageHandler {
/** Whether this handler can handle the given message. */
canHandle(data: DingTalkMessageData): boolean;
/** Short preview of the message (used for logging). */
getPreview(data: DingTalkMessageData): string;
/** Validate the message; an invalid result may carry an error message. */
validate(data: DingTalkMessageData): { valid: boolean; errorMessage?: string };
/** Process the message for the given account. */
handle(data: DingTalkMessageData, account: ResolvedDingTalkAccount): Promise<MessageHandleResult>;
}
// ============================================================================
// 消息处理器实现
// ============================================================================
/**
 * Handler for `text` messages.
 *
 * The preview is the trimmed text cut to 50 characters (with "..." when truncated).
 * Blank text is reported as invalid without an error message, so it is ignored
 * silently instead of triggering an error reply. No preprocessing is needed.
 */
const textMessageHandler: MessageHandler = {
  canHandle(data) {
    return data.msgtype === "text";
  },
  getPreview(data) {
    const trimmed = (data.text?.content ?? "").trim();
    return trimmed.length > 50 ? `${trimmed.slice(0, 50)}...` : trimmed;
  },
  validate(data) {
    const trimmed = (data.text?.content ?? "").trim();
    if (trimmed) {
      return { valid: true };
    }
    // Empty message: ignored silently, no error reply needed.
    return { valid: false, errorMessage: undefined };
  },
  async handle() {
    return { success: true };
  },
};
/**
 * Handler for `picture` messages.
 *
 * Requires a download code (`downloadCode`, falling back to `pictureDownloadCode`).
 * handle() downloads and saves the image and returns it as the only (and primary)
 * media item. Download errors are logged and returned as a failed result whose
 * message can be shown to the user.
 */
const pictureMessageHandler: MessageHandler = {
  canHandle: (data) => data.msgtype === "picture",
  getPreview: () => "[图片]",
  validate: (data) => {
    const picture = data.content as PictureContent | undefined;
    const code = picture?.downloadCode ?? picture?.pictureDownloadCode;
    return code
      ? { valid: true }
      : { valid: false, errorMessage: "图片处理失败:缺少下载码" };
  },
  handle: async (data, account) => {
    const picture = data.content as PictureContent;
    // validate() has already ensured one of the two codes is present.
    const code = (picture?.downloadCode ?? picture?.pictureDownloadCode)!;
    try {
      const { path, contentType, fileSize } = await downloadAndSaveMedia({
        downloadCode: code,
        account,
        mediaKind: "picture",
        extension: picture?.extension,
      });
      const item: MediaItem = { kind: "picture", path, contentType, fileSize };
      return { success: true, media: { items: [item], primary: item } };
    } catch (failure) {
      logger.error("图片处理失败:", failure);
      return { success: false, errorMessage: `图片处理失败:${getErrorMessage(failure)}` };
    }
  },
};
/**
 * Handler for `audio` messages.
 *
 * Requires `content.downloadCode`. handle() downloads and saves the audio (passing
 * "amr" as the extension when none is given) and returns it as the only (and primary)
 * media item, with the duration converted to a number when present. Download errors
 * are logged and returned as a failed result.
 */
const audioMessageHandler: MessageHandler = {
  canHandle: (data) => data.msgtype === "audio",
  getPreview: (data) => {
    const rawDuration = (data.content as AudioContent | undefined)?.duration;
    if (!rawDuration) {
      return "[语音]";
    }
    return `[语音 ${Number(rawDuration).toFixed(1)}s]`;
  },
  validate: (data) => {
    const audio = data.content as AudioContent | undefined;
    return audio?.downloadCode
      ? { valid: true }
      : { valid: false, errorMessage: "语音处理失败:缺少下载码" };
  },
  handle: async (data, account) => {
    const audio = data.content as AudioContent;
    // validate() has already ensured the download code is present.
    const code = audio.downloadCode!;
    try {
      const { path, contentType, fileSize } = await downloadAndSaveMedia({
        downloadCode: code,
        account,
        mediaKind: "audio",
        extension: audio.extension ?? "amr",
      });
      const item: MediaItem = {
        kind: "audio",
        path,
        contentType,
        fileSize,
        duration: audio.duration == null ? undefined : Number(audio.duration),
      };
      return { success: true, media: { items: [item], primary: item } };
    } catch (failure) {
      logger.error("语音处理失败:", failure);
      return { success: false, errorMessage: `语音处理失败:${getErrorMessage(failure)}` };
    }
  },
};
/**
 * Handler for `video` messages.
 *
 * Requires `content.downloadCode`. handle() downloads and saves the video (passing
 * "mp4" as the extension when none is given) and returns it as the only (and primary)
 * media item, with the duration converted to a number when present. Download errors
 * are logged and returned as a failed result.
 */
const videoMessageHandler: MessageHandler = {
  canHandle: (data) => data.msgtype === "video",
  getPreview: (data) => {
    const rawDuration = (data.content as VideoContent | undefined)?.duration;
    if (!rawDuration) {
      return "[视频]";
    }
    return `[视频 ${Number(rawDuration).toFixed(1)}s]`;
  },
  validate: (data) => {
    const video = data.content as VideoContent | undefined;
    return video?.downloadCode
      ? { valid: true }
      : { valid: false, errorMessage: "视频处理失败:缺少下载码" };
  },
  handle: async (data, account) => {
    const video = data.content as VideoContent;
    // validate() has already ensured the download code is present.
    const code = video.downloadCode!;
    try {
      const { path, contentType, fileSize } = await downloadAndSaveMedia({
        downloadCode: code,
        account,
        mediaKind: "video",
        extension: video.extension ?? "mp4",
      });
      const item: MediaItem = {
        kind: "video",
        path,
        contentType,
        fileSize,
        duration: video.duration == null ? undefined : Number(video.duration),
      };
      return { success: true, media: { items: [item], primary: item } };
    } catch (failure) {
      logger.error("视频处理失败:", failure);
      return { success: false, errorMessage: `视频处理失败:${getErrorMessage(failure)}` };
    }
  },
};
/**
 * Handler for `file` messages.
 *
 * Requires `content.downloadCode`. handle() downloads and saves the file, forwarding
 * the original file name, and returns it as the only (and primary) media item.
 * Download errors are logged and returned as a failed result.
 */
const fileMessageHandler: MessageHandler = {
  canHandle: (data) => data.msgtype === "file",
  getPreview: (data) => {
    const name = (data.content as FileContent | undefined)?.fileName;
    if (!name) {
      return "[文件]";
    }
    return `[文件] ${name}`;
  },
  validate: (data) => {
    const file = data.content as FileContent | undefined;
    return file?.downloadCode
      ? { valid: true }
      : { valid: false, errorMessage: "文件处理失败:缺少下载码" };
  },
  handle: async (data, account) => {
    const file = data.content as FileContent;
    // validate() has already ensured the download code is present.
    const code = file.downloadCode!;
    try {
      const { path, contentType, fileSize } = await downloadAndSaveMedia({
        downloadCode: code,
        account,
        mediaKind: "file",
        extension: file.extension,
        fileName: file.fileName,
      });
      const item: MediaItem = {
        kind: "file",
        path,
        contentType,
        fileSize,
        fileName: file.fileName,
      };
      return { success: true, media: { items: [item], primary: item } };
    } catch (failure) {
      logger.error("文件处理失败:", failure);
      return { success: false, errorMessage: `文件处理失败:${getErrorMessage(failure)}` };
    }
  },
};
// ============================================================================
// 富文本消息处理辅助函数
// ============================================================================
/** Type guard: true when the rich-text element's `type` is "picture". */
function isRichTextPicture(element: RichTextElement): element is RichTextPictureElement {
  const { type } = element;
  return type === "picture";
}
/**
 * Read the download code of a rich-text picture element: `downloadCode` when it is
 * not null/undefined, otherwise `pictureDownloadCode`.
 */
function getRichTextPictureDownloadCode(element: RichTextPictureElement): string | undefined {
  const { downloadCode, pictureDownloadCode } = element;
  return downloadCode ?? pictureDownloadCode;
}
/**
 * Split rich-text content into its text fragments and its downloadable pictures.
 *
 * Picture elements without a download code are dropped. Every non-picture element
 * (type undefined or "text") contributes its `text` when that is truthy. Both
 * lists keep the order of `content.richText`.
 */
function parseRichTextContent(content: RichTextContent): {
  textParts: string[];
  imageInfos: Array<{
    downloadCode: string;
    width?: number;
    height?: number;
    extension?: string;
  }>;
} {
  const textParts: string[] = [];
  const imageInfos: ReturnType<typeof parseRichTextContent>["imageInfos"] = [];

  for (const node of content.richText) {
    if (!isRichTextPicture(node)) {
      if (node.text) {
        textParts.push(node.text);
      }
      continue;
    }

    const code = getRichTextPictureDownloadCode(node);
    if (!code) {
      continue;
    }
    const { width, height, extension } = node;
    imageInfos.push({ downloadCode: code, width, height, extension });
  }

  return { textParts, imageInfos };
}
/**
 * Handler for `richText` messages (text mixed with pictures).
 *
 * - getPreview: summarises the text (first 30 characters) and/or the picture count.
 *   It is called for logging before validate(), so it must tolerate malformed
 *   content; anything whose `richText` is not an array yields the generic "[富文本]".
 * - validate: rejects content whose `richText` is not an array with an error message;
 *   content with neither text nor downloadable pictures is rejected silently.
 * - handle: downloads the pictures one after another, then writes the combined text
 *   into `data.text` (mutating the message) so later steps treat it like a text body.
 */
const richTextMessageHandler: MessageHandler = {
  canHandle: (data) => data.msgtype === "richText",
  getPreview: (data) => {
    const content = data.content as RichTextContent | undefined;
    // Guard against non-array payloads: parseRichTextContent iterates richText.
    if (!content || !Array.isArray(content.richText)) return "[富文本]";
    const { textParts, imageInfos } = parseRichTextContent(content);
    const joinedText = textParts.join(" ");
    const textPreview = joinedText.slice(0, 30);
    const ellipsis = joinedText.length > 30 ? "..." : "";
    const imageCount = imageInfos.length;
    if (textPreview && imageCount > 0) {
      return `[图文] ${textPreview}${ellipsis} +${imageCount}图`;
    }
    if (textPreview) {
      return `[富文本] ${textPreview}${ellipsis}`;
    }
    if (imageCount > 0) {
      return `[图文] ${imageCount}张图片`;
    }
    return "[富文本]";
  },
  validate: (data) => {
    const content = data.content as RichTextContent | undefined;
    if (!content || !Array.isArray(content.richText)) {
      return { valid: false, errorMessage: "富文本消息格式错误" };
    }
    // There must be at least some text or one picture.
    const { textParts, imageInfos } = parseRichTextContent(content);
    if (textParts.length === 0 && imageInfos.length === 0) {
      return { valid: false, errorMessage: undefined }; // empty message: ignore silently
    }
    return { valid: true };
  },
  handle: async (data, account) => {
    const content = data.content as RichTextContent;
    const { textParts, imageInfos } = parseRichTextContent(content);
    try {
      const mediaItems: MediaItem[] = [];
      // Download and save every picture, in message order.
      for (let i = 0; i < imageInfos.length; i++) {
        const imgInfo = imageInfos[i];
        logger.log(`处理富文本图片 ${i + 1}/${imageInfos.length}...`);
        const saved = await downloadAndSaveMedia({
          downloadCode: imgInfo.downloadCode,
          account,
          mediaKind: "picture",
          extension: imgInfo.extension,
        });
        mediaItems.push({
          kind: "picture",
          path: saved.path,
          contentType: saved.contentType,
          fileSize: saved.fileSize,
        });
      }
      // The text of a mixed message is handed on by writing it into data.text,
      // i.e. the incoming message object is mutated on purpose.
      const combinedText = textParts.join("\n").trim();
      if (combinedText) {
        data.text = { content: combinedText };
      }
      const media: InboundMediaContext | undefined = mediaItems.length > 0
        ? { items: mediaItems, primary: mediaItems[0] }
        : undefined;
      return {
        success: true,
        media,
      };
    } catch (err) {
      logger.error("富文本消息处理失败:", err);
      return { success: false, errorMessage: `富文本消息处理失败:${getErrorMessage(err)}` };
    }
  },
};
/**
 * Catch-all handler for message types no other handler accepts.
 *
 * It accepts every message, always fails validation with a user-facing hint listing
 * the supported message types, and its handle() reports failure with
 * `skipProcessing` set.
 */
const unsupportedMessageHandler: MessageHandler = {
  // Accepts everything so it can act as the fallback handler.
  canHandle() {
    return true;
  },
  getPreview(data) {
    return `[${data.msgtype}]`;
  },
  validate() {
    return {
      valid: false,
      errorMessage: "暂不支持该类型消息,请发送文本、图片、语音、视频、文件或图文混排消息。",
    };
  },
  async handle() {
    return { success: false, skipProcessing: true };
  },
};
/**
 * Message handler registry, ordered by priority: getMessageHandler returns the first
 * entry whose canHandle() accepts the message.
 */
const messageHandlers: MessageHandler[] = [
textMessageHandler,
pictureMessageHandler,
audioMessageHandler,
videoMessageHandler,
fileMessageHandler,
richTextMessageHandler,
unsupportedMessageHandler, // catch-all handler; must stay last
];
/**
 * Pick the handler for a message: the first registered handler whose canHandle()
 * accepts it.
 *
 * Falls back to the unsupported-message handler explicitly instead of asserting
 * non-null, so the function never returns `undefined` even if the registry is
 * edited and loses its catch-all entry.
 */
function getMessageHandler(data: DingTalkMessageData): MessageHandler {
  return messageHandlers.find((h) => h.canHandle(data)) ?? unsupportedMessageHandler;
}
/**
 * Send an error hint back through the session webhook, fire-and-forget.
 *
 * Does nothing when either the webhook or the message is missing/empty. A failed
 * send is only logged; it is never propagated to the caller.
 */
function replyError(webhook: string | undefined, message: string | undefined): void {
  if (webhook && message) {
    replyViaWebhook(webhook, message).catch((sendErr) => {
      logger.error("回复错误提示失败:", sendErr);
    });
  }
}
/** Options accepted by monitorDingTalkProvider. */
export interface MonitorOptions {
/** Passed as `clientId` to the DWClient constructor. */
clientId: string;
/** Passed as `clientSecret` to the DWClient constructor. */
clientSecret: string;
/** Account id; used to resolve the account config and to key the runtime state. */
accountId: string;
/** OpenClaw configuration, forwarded to account resolution, routing and reply dispatch. */
config: OpenClawConfig;
/** NOTE(review): not read by monitorDingTalkProvider in this file. */
runtime?: RuntimeEnv;
/** When this signal aborts, the client is disconnected and the returned promise resolves. */
abortSignal?: AbortSignal;
}
/** Return type of monitorDingTalkProvider. */
export type MonitorResult = Promise<void>;
// In-memory runtime state, one entry per channel/account pair. Entries are keyed by
// `${channel}:${accountId}` (written by recordChannelRuntimeState, read by
// getDingTalkRuntimeState).
const runtimeState = new Map<
string,
{
running: boolean;
lastStartAt: number | null;
lastStopAt: number | null;
lastError: string | null;
lastInboundAt?: number | null;
lastOutboundAt?: number | null;
}
>();
/**
 * Merge a partial state update into the stored runtime state of one channel/account.
 *
 * The entry is keyed by `${channel}:${accountId}`. When no entry exists yet, the
 * update is applied on top of a "not running, nothing recorded" baseline. Fields
 * absent from `state` keep their previous value.
 */
function recordChannelRuntimeState(params: {
  channel: string;
  accountId: string;
  state: Partial<{
    running: boolean;
    lastStartAt: number | null;
    lastStopAt: number | null;
    lastError: string | null;
    lastInboundAt: number | null;
    lastOutboundAt: number | null;
  }>;
}): void {
  const { channel, accountId, state } = params;
  const stateKey = `${channel}:${accountId}`;
  const previous = runtimeState.get(stateKey) ?? {
    running: false,
    lastStartAt: null,
    lastStopAt: null,
    lastError: null,
  };
  runtimeState.set(stateKey, { ...previous, ...state });
}
/**
 * Look up the recorded runtime state of a DingTalk account.
 *
 * @param accountId Account whose state is requested.
 * @returns The stored state, or `undefined` when nothing has been recorded for it.
 */
export function getDingTalkRuntimeState(accountId: string) {
  const stateKey = `${PLUGIN_ID}:${accountId}`;
  return runtimeState.get(stateKey);
}
// ============================================================================
// 媒体下载与保存
// ============================================================================
/** Options for downloading a DingTalk media file and saving it locally. */
interface DownloadMediaOptions {
/** Download code identifying the media. */
downloadCode: string;
/** Account configuration. */
account: ResolvedDingTalkAccount;
/** Media kind (used for log messages). */
mediaKind: MediaKind;
/**
 * File extension (optional; originally described as determining MIME type and suffix).
 * NOTE(review): downloadAndSaveMedia does not read this field.
 */
extension?: string;
/** Original file name (optional; passed on when saving so the suffix can be kept). */
fileName?: string;
/** Explicitly forced content type. */
contentType?: string;
}
/** Result of downloading and saving a media file. */
interface DownloadMediaResult {
/** Path of the saved file. */
path: string;
/** Content type of the saved file. */
contentType: string;
/** Size of the downloaded data in bytes. */
fileSize: number;
}
/**
 * Download a DingTalk media file and save it locally (shared by all media handlers).
 *
 * Steps: resolve the download URL from the download code, fetch the bytes, then save
 * them through the OpenClaw media helper under the "inbound" bucket.
 *
 * `options.contentType`, when given, is forwarded to the media helper and used as the
 * fallback content type of the result; when omitted the helper is left to detect the
 * type itself, exactly as before. `options.extension` is not consumed here.
 *
 * @param options Download code, account, media kind and optional file name / content type.
 * @returns Saved path, content type ("application/octet-stream" when unknown) and byte size.
 * @throws Any error from the download or save steps is propagated unchanged; callers
 *   show its message to the user.
 */
async function downloadAndSaveMedia(
  options: DownloadMediaOptions
): Promise<DownloadMediaResult> {
  const { downloadCode, account, mediaKind, fileName, contentType } = options;
  const pluginRuntime = getDingTalkRuntime();
  const kindLabel = {
    picture: "图片",
    audio: "语音",
    video: "视频",
    file: "文件",
  }[mediaKind];

  // 1. Resolve the download URL.
  const downloadUrl = await getFileDownloadUrl(downloadCode, account);
  logger.log(`获取${kindLabel}下载链接成功`);

  // 2. Download the file.
  const buffer = await downloadFromUrl(downloadUrl);
  const sizeStr = buffer.length > 1024 * 1024
    ? `${(buffer.length / 1024 / 1024).toFixed(2)} MB`
    : `${(buffer.length / 1024).toFixed(2)} KB`;
  logger.log(`下载${kindLabel}成功,大小: ${sizeStr}`);

  // 3. Save through OpenClaw's media helper, which picks the file name and suffix.
  const saved = await pluginRuntime.channel.media.saveMediaBuffer(
    buffer,
    contentType, // undefined lets OpenClaw detect the content type itself
    "inbound",
    buffer.length, // maxBytes: the actual size, to avoid the default 5MB limit
    fileName // originalFilename: the original file name, passed through as-is
  );
  logger.log(`${kindLabel}已保存到: ${saved.path}`);

  return {
    path: saved.path,
    contentType: saved.contentType ?? contentType ?? "application/octet-stream",
    fileSize: buffer.length,
  };
}
/**
 * Extract a printable message from a caught value, without the stack trace:
 * `message` for Error instances, `String(value)` for everything else.
 */
function getErrorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}
/**
 * Start the DingTalk Stream listener for one account.
 *
 * Creates a `DWClient`, registers a `TOPIC_ROBOT` callback that applies the group
 * policy, acknowledges the message to DingTalk, runs the matching message handler and
 * dispatches the result to OpenClaw, and records channel runtime state along the way.
 *
 * @param options Credentials, account id, OpenClaw config and an optional abort signal.
 * @returns A promise that resolves once `abortSignal` aborts. If the signal is already
 *   aborted on entry, no connection is attempted and the promise resolves immediately.
 *   Without a signal the promise stays pending.
 */
export function monitorDingTalkProvider(options: MonitorOptions): MonitorResult {
  const { clientId, clientSecret, accountId, config, abortSignal } = options;
  const pluginRuntime = getDingTalkRuntime();
  const account = resolveDingTalkAccount({ cfg: config, accountId });

  // Optional "<plugin id>:" / "<plugin id>:user:" prefix of allowFrom entries.
  // PLUGIN_ID is escaped so regex metacharacters in it cannot change the pattern.
  const escapedPluginId = PLUGIN_ID.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
  const allowFromPrefixPattern = new RegExp(`^${escapedPluginId}:(?:user:)?`, "i");

  /** Whether the sender is on the allowFrom list (an empty list or "*" allows everyone). */
  const isSenderAllowed = (senderId: string): boolean => {
    const allowList = account.allowFrom.map((entry) => String(entry).trim()).filter(Boolean);
    if (allowList.length === 0 || allowList.includes("*")) {
      return true;
    }
    return allowList.some((entry) => entry.replace(allowFromPrefixPattern, "") === senderId);
  };

  // Record the starting state.
  recordChannelRuntimeState({
    channel: PLUGIN_ID,
    accountId,
    state: {
      running: true,
      lastStartAt: Date.now(),
    },
  });

  // Create the DingTalk Stream client.
  const client = new DWClient({
    clientId,
    clientSecret,
    debug: false,
  });

  // ==========================================================================
  // Group policy and mention detection
  // ==========================================================================

  /** Find the per-group config by conversation id (exact or case-insensitive match). */
  const resolveGroupConfig = (groupId: string): DingTalkGroupConfig | undefined => {
    const groups = account.groups;
    if (!groups) return undefined;
    const key = Object.keys(groups).find(
      (k) => k === groupId || k.toLowerCase() === groupId.toLowerCase()
    );
    return key ? groups[key] : undefined;
  };

  /**
   * Whether messages from this group are accepted. "disabled" rejects, "open"
   * accepts; any other policy consults groupAllowFrom, where an empty list or "*"
   * accepts every group.
   */
  const isGroupAllowed = (groupId: string): boolean => {
    const policy = account.groupPolicy;
    if (policy === "disabled") return false;
    if (policy === "open") return true;
    // allowlist
    const allowList = account.groupAllowFrom.map((e) => String(e).trim()).filter(Boolean);
    if (allowList.length === 0 || allowList.includes("*")) return true;
    return allowList.some((entry) => entry === groupId || entry.toLowerCase() === groupId.toLowerCase());
  };

  /** Whether the bot was @-mentioned in the message. */
  const checkBotMentioned = (data: DingTalkMessageData): boolean => {
    // isInAtList flags that this bot is in the message's @ list.
    if (data.isInAtList) return true;
    // Fallback: look for the bot's own id among the mentioned users.
    if (data.atUsers?.some((u) => u.dingtalkId === data.chatbotUserId)) return true;
    return false;
  };

  // ==========================================================================
  // Core message processing
  // ==========================================================================

  /** Derive sender / chat addressing information from a message. */
  const buildSenderInfo = (data: DingTalkMessageData) => {
    const senderId = data.senderStaffId;
    const senderName = data.senderNick;
    const isGroup = data.conversationType === "2";
    const groupId = data.openConversationId ?? data.conversationId;
    // From always identifies the sender, so different users are never merged into
    // one identity; To distinguishes the group target from the direct-chat target.
    const chatId = isGroup ? groupId : senderId;
    const fromAddress = `${PLUGIN_ID}:${senderId}`;
    const toAddress = isGroup ? `${PLUGIN_ID}:chat:${groupId}` : `${PLUGIN_ID}:user:${senderId}`;
    const label = isGroup
      ? (data.conversationTitle ?? groupId)
      : (senderName || senderId);
    return {
      senderId,
      senderName,
      chatId,
      fromAddress,
      toAddress,
      label,
      isGroup,
      groupId: isGroup ? groupId : undefined,
      conversationTitle: data.conversationTitle,
    };
  };

  /** Build the message body: the text when present, otherwise the media placeholder. */
  const buildMessageBody = (data: DingTalkMessageData, media?: InboundMediaContext) => {
    const textContent = data.text?.content?.trim() ?? "";
    const mediaPlaceholder = media ? generateMediaPlaceholder(media) : "";
    const rawBody = textContent || mediaPlaceholder;
    return { textContent, rawBody };
  };

  /** Build the inbound context handed to OpenClaw. */
  const buildInboundContext = (
    data: DingTalkMessageData,
    sender: ReturnType<typeof buildSenderInfo>,
    rawBody: string,
    media?: InboundMediaContext
  ) => {
    const isGroup = sender.isGroup;
    const chatType = isGroup ? "group" : "direct";
    // Parsed once, with an explicit radix, and reused for the envelope and the context.
    const timestamp = parseInt(data.createAt, 10);

    // Routing: the peer is the group id for group chats, the user id for direct chats.
    const route = pluginRuntime.channel.routing.resolveAgentRoute({
      cfg: config,
      channel: PLUGIN_ID,
      accountId,
      peer: {
        kind: isGroup ? "group" : "dm",
        id: sender.chatId,
      },
    });

    // Format the inbound envelope.
    const envelopeOptions = pluginRuntime.channel.reply.resolveEnvelopeFormatOptions(config);
    const body = pluginRuntime.channel.reply.formatInboundEnvelope({
      channel: "DingTalk",
      from: isGroup ? `${sender.senderName} in ${sender.conversationTitle ?? sender.groupId}` : sender.label,
      timestamp,
      body: rawBody,
      chatType,
      sender: {
        id: sender.senderId,
        name: sender.senderName,
      },
      envelope: envelopeOptions,
    });

    const baseContext: Record<string, unknown> = {
      Body: body,
      RawBody: rawBody,
      CommandBody: rawBody,
      From: sender.fromAddress,
      To: sender.toAddress,
      SessionKey: route.sessionKey,
      AccountId: accountId,
      ChatType: chatType,
      ConversationLabel: sender.label,
      SenderId: sender.senderId,
      SenderName: sender.senderName,
      Provider: PLUGIN_ID,
      Surface: PLUGIN_ID,
      MessageSid: data.msgId,
      Timestamp: timestamp,
      WasMentioned: checkBotMentioned(data),
      OriginatingChannel: PLUGIN_ID,
      OriginatingTo: sender.toAddress,
      CommandAuthorized: isSenderAllowed(sender.senderId),
    };

    // Group-only field.
    if (isGroup) {
      baseContext.GroupSubject = sender.conversationTitle ?? sender.groupId;
    }

    // Merge in the media fields.
    const mediaFields = buildMediaContextFields(media);
    return pluginRuntime.channel.reply.finalizeInboundContext({
      ...baseContext,
      ...mediaFields,
    });
  };

  /** Create the dispatcher that delivers OpenClaw's replies back to DingTalk. */
  const createReplyDispatcher = (data: DingTalkMessageData) => ({
    deliver: async (payload: { text?: string }) => {
      const replyText = payload.text ?? "";
      if (!replyText) return;
      const isGroup = data.conversationType === "2";
      const groupId = data.openConversationId ?? data.conversationId;

      // Prefer the session webhook (works for both group and direct chats).
      if (data.sessionWebhook) {
        try {
          const result = await replyViaWebhook(data.sessionWebhook, replyText);
          if (result.errcode === 0) {
            recordChannelRuntimeState({
              channel: PLUGIN_ID,
              accountId,
              state: { lastOutboundAt: Date.now() },
            });
            return;
          }
          // The webhook rejected the reply (it may have expired): fall back below.
          logger.warn(`Webhook 回复失败 (errcode: ${result.errcode}), 尝试主动发送 API 降级`);
        } catch (err) {
          // A thrown webhook error takes the same fallback path as a rejected reply.
          logger.warn(`Webhook 回复异常 (${getErrorMessage(err)}), 尝试主动发送 API 降级`);
        }
      }

      // Fallback: the proactive send API.
      const to = isGroup ? `chat:${groupId}` : data.senderStaffId;
      await sendTextMessage(to, replyText, { account });
      recordChannelRuntimeState({
        channel: PLUGIN_ID,
        accountId,
        state: { lastOutboundAt: Date.now() },
      });
    },
    onError: (err: unknown, info: { kind: string }) => {
      logger.error(`${info.kind} reply failed:`, err);
    },
  });

  /** Dispatch a preprocessed message to OpenClaw; errors are logged and recorded, never thrown. */
  const processMessageAsync = async (
    data: DingTalkMessageData,
    media?: InboundMediaContext
  ) => {
    try {
      const sender = buildSenderInfo(data);
      const { rawBody } = buildMessageBody(data, media);
      const ctxPayload = buildInboundContext(data, sender, rawBody, media);
      const { queuedFinal } = await pluginRuntime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
        ctx: ctxPayload,
        cfg: config,
        dispatcherOptions: createReplyDispatcher(data),
        replyOptions: {},
      });
      if (!queuedFinal) {
        logger.log(`no response generated for message from ${sender.label}`);
      }
    } catch (error) {
      logger.error("处理消息出错:", error);
      recordChannelRuntimeState({
        channel: PLUGIN_ID,
        accountId,
        state: {
          lastError: error instanceof Error ? error.message : String(error),
        },
      });
    }
  };

  /**
   * Stream callback: acknowledges the message to DingTalk right away and processes it
   * asynchronously so the acknowledgement is never delayed by downloads or replies.
   */
  const handleMessage = async (message: DWClientDownStream) => {
    // Each message is acknowledged at most once: an error raised after the SUCCESS
    // response must not send a second, contradictory FAILURE response.
    let acknowledged = false;
    const acknowledge = (status: "SUCCESS" | "FAILURE") => {
      if (acknowledged) return;
      acknowledged = true;
      client.socketCallBackResponse(message.headers.messageId, { status });
    };

    try {
      const data = JSON.parse(message.data) as DingTalkMessageData;
      const isGroup = data.conversationType === "2";
      const groupId = data.openConversationId ?? data.conversationId;

      // Group policy checks.
      if (isGroup) {
        if (!isGroupAllowed(groupId)) {
          logger.log(`群聊消息被策略拒绝 | groupPolicy: ${account.groupPolicy} | groupId: ${groupId}`);
          acknowledge("SUCCESS");
          return;
        }
        // Per-group `enabled` switch.
        const groupConfig = resolveGroupConfig(groupId);
        if (groupConfig?.enabled === false) {
          logger.log(`群聊消息被群组配置禁用 | groupId: ${groupId}`);
          acknowledge("SUCCESS");
          return;
        }
      }

      const handler = getMessageHandler(data);

      // Log what was received.
      const preview = handler.getPreview(data);
      const chatLabel = isGroup
        ? `群聊(${data.conversationTitle ?? groupId})`
        : "单聊";
      logger.log(`收到消息 | ${chatLabel} | ${data.senderNick}(${data.senderStaffId}) | ${preview}`);

      // Record inbound activity.
      recordChannelRuntimeState({
        channel: PLUGIN_ID,
        accountId,
        state: { lastInboundAt: Date.now() },
      });

      // Acknowledge immediately so DingTalk does not time out.
      acknowledge("SUCCESS");

      const validation = handler.validate(data);
      if (!validation.valid) {
        replyError(data.sessionWebhook, validation.errorMessage);
        return;
      }

      // Process asynchronously; the callback returns without waiting for this chain.
      handler.handle(data, account)
        .then((result) => {
          if (!result.success) {
            replyError(data.sessionWebhook, result.errorMessage);
            return;
          }
          if (result.skipProcessing) {
            return;
          }
          return processMessageAsync(data, result.media);
        })
        .catch((err) => {
          const errMsg = getErrorMessage(err);
          logger.error(`处理 ${data.msgtype} 消息失败:`, err);
          replyError(data.sessionWebhook, `消息处理失败:${errMsg}`);
        });
    } catch (error) {
      const errMsg = getErrorMessage(error);
      logger.error("解析消息出错:", error);
      recordChannelRuntimeState({
        channel: PLUGIN_ID,
        accountId,
        state: {
          lastError: errMsg,
        },
      });
      // No-op when the message has already been acknowledged.
      acknowledge("FAILURE");
    }
  };

  // Register the message listener.
  client.registerCallbackListener(TOPIC_ROBOT, handleMessage);

  // Connection lifecycle events.
  client.on("open", () => {
    logger.log(`[${accountId}] Stream 连接已建立`);
  });
  client.on("close", () => {
    logger.log(`[${accountId}] Stream 连接已关闭`);
    recordChannelRuntimeState({
      channel: PLUGIN_ID,
      accountId,
      state: {
        running: false,
        lastStopAt: Date.now(),
      },
    });
  });
  client.on("error", (error: Error) => {
    logger.error(`[${accountId}] Stream 连接错误:`, error);
    recordChannelRuntimeState({
      channel: PLUGIN_ID,
      accountId,
      state: {
        lastError: error.message,
      },
    });
  });

  // Wrap connect() so that every call, including the client's own reconnect
  // attempts, has its rejection handled instead of becoming an unhandled rejection.
  const originalConnect = client.connect.bind(client);
  client.connect = () =>
    originalConnect().catch((err: unknown) => {
      const errMsg = err instanceof Error ? err.message : String(err);
      logger.error(`[${accountId}] DingTalk Stream 连接失败: ${errMsg}`);
      recordChannelRuntimeState({
        channel: PLUGIN_ID,
        accountId,
        state: {
          running: false,
          lastStopAt: Date.now(),
          lastError: errMsg,
        },
      });
    });

  const recordStopped = () => {
    recordChannelRuntimeState({
      channel: PLUGIN_ID,
      accountId,
      state: {
        running: false,
        lastStopAt: Date.now(),
      },
    });
  };

  // An already-aborted signal never fires "abort" again, so it is handled up front:
  // do not connect at all and report the provider as stopped.
  if (abortSignal?.aborted) {
    logger.log(`[${accountId}] 停止 provider`);
    recordStopped();
    return Promise.resolve();
  }

  client.connect();

  // The returned promise stays pending until abort: the OpenClaw framework treats its
  // resolution as "channel exited" and would trigger an auto-restart.
  return new Promise<void>((resolve) => {
    const stopHandler = () => {
      logger.log(`[${accountId}] 停止 provider`);
      client.disconnect();
      recordStopped();
      resolve();
    };
    abortSignal?.addEventListener("abort", stopHandler, { once: true });
  });
}