// Monitor: WebSocket connection to Go server for real-time message handling
// Supports: API Token auth, conversation tracking for multi-turn dialogues
import type { PluginLogger, ClawdbotConfig } from "openclaw/plugin-sdk";
import { getAdpOpenclawRuntime, setActiveWebSocket } from "./runtime.js";
import {
getChatHistory,
listSessions,
resolveSessionKey,
type ChatHistoryResponse,
type SessionsListResponse,
} from "./session-history.js";
import {
ADP_UPLOAD_TOOL_NAME,
ADP_UPLOAD_TOOL_SCHEMA,
executeAdpUploadTool,
uploadResultEmitter,
UPLOAD_RESULT_EVENT,
type AdpUploadToolResult,
} from "./adp-upload-tool.js";
import {
formatUploadResultForUser,
formatUploadResultAsMarkdown,
} from "./tool-result-message-blocks.js";
import crypto from "crypto";
// @ts-ignore - import JSON file
import packageJson from "../package.json" with { type: "json" };
// Plugin version, read from package.json; included in the log line written
// after a successful authentication.
const PLUGIN_VERSION = packageJson.version;
// Delay between the end of one WebSocket session and the next connection
// attempt. Fixed at 1 second; the reconnect loop applies no backoff.
const RECONNECT_DELAY_MS = 1000;
/**
 * Options accepted by `monitorAdpOpenclaw`.
 */
export type MonitorParams = {
wsUrl: string; // WebSocket URL (direct, no conversion needed)
// Sent as `token` in the auth message right after the socket opens.
clientToken: string;
// When omitted, the auth message carries no nonce, signature or timestamp.
signKey?: string; // HMAC key for signature generation
// Aborting stops the reconnect loop and closes the current socket.
abortSignal?: AbortSignal;
// Optional logger; every log call in this module is skipped when it is absent.
log?: PluginLogger;
cfg?: ClawdbotConfig; // OpenClaw config for model settings
};
// WebSocket message types: the values used in the `type` field of every
// WSMessage exchanged with the server. Direction notes below describe how
// this module uses each type.
const MsgType = {
// Sent by this client right after the socket opens.
Auth: "auth",
// Received from the server; carries an AuthResultPayload.
AuthResult: "auth_result",
// Sent by this client every 25 seconds once authenticated.
Ping: "ping",
// Received from the server; treated as a heartbeat response and ignored.
Pong: "pong",
// Received from the server; carries an InboundMessage to be answered.
Inbound: "inbound",
// Sent by this client: a complete reply when no streaming chunks were sent.
Outbound: "outbound",
// Sent by this client: one incremental piece of a streamed reply.
OutboundChunk: "outbound_chunk",
// Sent by this client: closes a streamed reply and carries the final text.
OutboundEnd: "outbound_end",
// Received from the server; only logged at debug level.
Ack: "ack",
// Received from the server; only logged at error level.
Error: "error",
// OpenClaw session history: GoServer triggers client to fetch from OpenClaw Gateway
ConvHistory: "conv_history", // Request OpenClaw chat history
ConvResponse: "conv_response", // OpenClaw chat history response
// OpenClaw sessions list
// Received from the server: request to list sessions.
FetchOpenClawSessions: "fetch_openclaw_sessions",
// Sent by this client: the sessions list (or an error payload).
OpenClawSessionsResponse: "openclaw_sessions_response",
} as const;
// Envelope for every message sent or received over the WebSocket.
type WSMessage = {
// One of the MsgType values; typed as string because incoming values are not validated.
type: string;
// Generated per outgoing message; responses to conv_history and
// fetch_openclaw_sessions requests echo the request's id instead.
requestId?: string;
// Shape depends on `type`; handlers cast it to the payload type they expect.
payload?: unknown;
// Outgoing messages set this from Date.now() (milliseconds since the epoch).
timestamp: number;
};
// UserInfo represents full user identity (matching Go server's UserInfo)
type UserInfo = {
// Preferred sender identifier; the inbound handler falls back to InboundMessage.from when it is empty.
userId: string;
// Used as the display name and sender name when present.
username?: string;
avatar?: string;
email?: string;
// When present, prefixed to the user id in the context's From field.
tenantId?: string;
// When present, used as the context's Surface instead of "adp-openclaw".
source?: string;
// Each entry is copied into the reply context under the key `extra_<key>`.
extra?: Record<string, string>;
};
// Payload of an "inbound" message: one user message the server wants answered.
type InboundMessage = {
// Passed to the reply context as MessageSid / MessageSidFull.
id: string;
// Used as the peer id when resolving the agent route, and echoed back in every reply message.
conversationId: string;
recordId?: string; // Record ID from server for tracking message pairs
clientId: string;
// Sender identifier; replies are addressed back to this value in their `to` field.
from: string;
to: string;
// The user's message text.
text: string;
// Falls back to Date.now() when falsy.
timestamp: number;
user?: UserInfo; // Full user identity information
};
// Payload of an "auth_result" message from the server.
type AuthResultPayload = {
// true: the session becomes authenticated and pinging starts; false: the socket is closed.
success: boolean;
// Only logged on success.
clientId?: string;
// Logged as the failure reason when success is false.
message?: string;
};
/**
 * Builds the hex-encoded HMAC-SHA256 signature sent in the auth message.
 *
 * The MAC is keyed with `signKey` and covers the string
 * `token:nonce:timestamp`; the timestamp is part of the signed text so the
 * server can use it for anti-replay checks.
 *
 * @param signKey   HMAC key.
 * @param token     Client token being authenticated.
 * @param nonce     Random per-handshake value.
 * @param timestamp Timestamp included in the signed text.
 * @returns Lowercase hex digest (64 characters).
 */
function generateSignature(signKey: string, token: string, nonce: string, timestamp: number): string {
  const signedText = token + ":" + nonce + ":" + timestamp;
  const mac = crypto.createHmac("sha256", signKey);
  mac.update(signedText);
  return mac.digest("hex");
}
/**
 * Produces a fresh random nonce for the auth handshake.
 *
 * @returns 16 random bytes encoded as 32 lowercase hex characters.
 */
function generateNonce(): string {
  const nonceBytes = crypto.randomBytes(16);
  return nonceBytes.toString("hex");
}
/**
 * Builds a request id of the form `req-<epoch ms>-<random base-36 suffix>`.
 *
 * The suffix comes from Math.random(), so ids are unique for practical
 * purposes but are not suitable as secrets.
 */
function generateRequestId(): string {
  const issuedAt = Date.now();
  const suffix = Math.random().toString(36).slice(2, 8);
  return ["req", issuedAt, suffix].join("-");
}
/**
 * Keeps a WebSocket session to the server alive until `abortSignal` fires.
 *
 * Each iteration runs one session via `connectAndHandle`. When that session
 * ends (cleanly or with an error) the loop waits RECONNECT_DELAY_MS and
 * connects again. Errors are logged, never rethrown.
 *
 * @param params Connection settings, optional logger/config and abort signal.
 * @returns Resolves once the abort signal has been observed.
 */
export async function monitorAdpOpenclaw(params: MonitorParams): Promise<void> {
  const { wsUrl, clientToken, signKey, abortSignal, log, cfg } = params;
  const runtime = getAdpOpenclawRuntime();
  const stopRequested = (): boolean => abortSignal?.aborted === true;

  log?.info(`[adp-openclaw] WebSocket monitor started, connecting to ${wsUrl}`);

  for (;;) {
    if (stopRequested()) break;

    try {
      await connectAndHandle({ wsUrl, clientToken, signKey, abortSignal, log, runtime, cfg });
    } catch (err) {
      // An error raised while shutting down is expected; leave quietly.
      if (stopRequested()) break;
      log?.error(`[adp-openclaw] WebSocket error: ${err}`);
    }

    // Pause before the next attempt unless we were asked to stop meanwhile.
    if (stopRequested()) break;
    log?.info(`[adp-openclaw] Reconnecting in ${RECONNECT_DELAY_MS}ms...`);
    await sleep(RECONNECT_DELAY_MS, abortSignal);
  }

  log?.info(`[adp-openclaw] WebSocket monitor stopped`);
}
// Arguments for a single connection attempt (connectAndHandle): the
// MonitorParams fields plus the plugin runtime resolved once by the caller.
type ConnectParams = {
wsUrl: string;
clientToken: string;
signKey?: string;
abortSignal?: AbortSignal;
log?: PluginLogger;
// Runtime returned by getAdpOpenclawRuntime(); used for routing and reply dispatch.
runtime: ReturnType<typeof getAdpOpenclawRuntime>;
cfg?: ClawdbotConfig;
};
/**
 * Runs a single WebSocket session: connect, authenticate, then service server
 * messages until the socket closes, errors, or `abortSignal` fires.
 *
 * Handling by message type:
 * - `auth_result`: marks the session authenticated and starts a 25 s ping
 *   timer, or closes the socket when authentication failed.
 * - `inbound`: routes the user message through the OpenClaw reply dispatcher
 *   and streams the answer back as `outbound_chunk` messages followed by
 *   `outbound_end` (or a single `outbound` when nothing was streamed).
 * - `conv_history` / `fetch_openclaw_sessions`: fetches the requested data and
 *   answers with the matching response type, echoing the request id.
 * - `pong`, `ack`, `error`: heartbeat / logging only.
 * Messages that require authentication are ignored until `auth_result`
 * succeeded.
 *
 * @param params Connection settings, runtime, optional logger/config/signal.
 * @returns Resolves when the socket closes or the signal aborts; rejects with
 *          the error from the socket's "error" event.
 */
async function connectAndHandle(params: ConnectParams): Promise<void> {
  const { wsUrl, clientToken, signKey, abortSignal, log, runtime, cfg } = params;
  // Load the `ws` package lazily, only when a connection is actually attempted.
  const WebSocket = (await import("ws")).default;
  return new Promise((resolve, reject) => {
    // The signal may already be aborted (e.g. it fired while `ws` was being
    // imported). An "abort" listener added now would never run, so do not
    // open a connection that nothing would close.
    if (abortSignal?.aborted) {
      resolve();
      return;
    }

    const ws = new WebSocket(wsUrl);
    let authenticated = false;
    let pingInterval: NodeJS.Timeout | null = null;

    // Stops the heartbeat timer if one is running.
    const stopPing = () => {
      if (pingInterval) {
        clearInterval(pingInterval);
        pingInterval = null;
      }
    };

    // Handle abort signal: close the socket and settle immediately.
    const abortHandler = () => {
      ws.close();
      resolve();
    };
    abortSignal?.addEventListener("abort", abortHandler);

    ws.on("open", () => {
      log?.info(`[adp-openclaw] WebSocket connected, authenticating...`);
      // Save active WebSocket for outbound messaging
      setActiveWebSocket(ws);
      // Send authentication message with signature (includes timestamp for anti-replay)
      const nonce = generateNonce();
      const timestamp = Date.now();
      // Generate signature only if signKey is provided
      const signature = signKey ? generateSignature(signKey, clientToken, nonce, timestamp) : "";
      const authMsg: WSMessage = {
        type: MsgType.Auth,
        requestId: generateRequestId(),
        payload: {
          token: clientToken,
          // Without a signKey these three are undefined and therefore dropped by JSON.stringify.
          nonce: signKey ? nonce : undefined,
          signature: signKey ? signature : undefined,
          timestamp: signKey ? timestamp : undefined, // Include timestamp in payload for server verification
        },
        timestamp: Date.now(),
      };
      ws.send(JSON.stringify(authMsg));
    });

    ws.on("message", async (data: Buffer) => {
      try {
        const msg: WSMessage = JSON.parse(data.toString());
        switch (msg.type) {
          case MsgType.AuthResult: {
            const result = msg.payload as AuthResultPayload;
            if (result.success) {
              authenticated = true;
              log?.info(`[adp-openclaw] Plugin v${PLUGIN_VERSION} authenticated as client ${result.clientId}`);
              // A repeated auth_result must not leave an earlier timer running.
              stopPing();
              // Start ping interval
              pingInterval = setInterval(() => {
                if (ws.readyState === WebSocket.OPEN) {
                  ws.send(JSON.stringify({
                    type: MsgType.Ping,
                    requestId: generateRequestId(),
                    timestamp: Date.now(),
                  }));
                }
              }, 25000);
            } else {
              log?.error(`[adp-openclaw] Authentication failed: ${result.message}`);
              ws.close();
            }
            break;
          }
          case MsgType.Pong:
            // Heartbeat response, connection is alive
            break;
          case MsgType.Inbound: {
            if (!authenticated) break;
            // Debug: log raw payload to verify recordId is received
            // NOTE(review): this logs full user message content at info level — confirm that is acceptable.
            log?.info(`[adp-openclaw] Raw payload: ${JSON.stringify(msg.payload)}`);
            const inMsg = msg.payload as InboundMessage;
            log?.info(`[adp-openclaw] Received: ${inMsg.from}: ${inMsg.text} (conv=${inMsg.conversationId}, rec=${inMsg.recordId || 'none'}, user=${JSON.stringify(inMsg.user || {})})`);
            // Process the message with full user identity
            try {
              // Build user identity string for From field (like Feishu: "feishu:user_id")
              const userIdentifier = inMsg.user?.userId || inMsg.from;
              const tenantPrefix = inMsg.user?.tenantId ? `${inMsg.user.tenantId}:` : "";
              // Build metadata for user context (passed through to openclaw)
              const userMetadata: Record<string, string> = {};
              if (inMsg.user) {
                if (inMsg.user.username) userMetadata.username = inMsg.user.username;
                if (inMsg.user.email) userMetadata.email = inMsg.user.email;
                if (inMsg.user.avatar) userMetadata.avatar = inMsg.user.avatar;
                if (inMsg.user.tenantId) userMetadata.tenantId = inMsg.user.tenantId;
                if (inMsg.user.source) userMetadata.source = inMsg.user.source;
                if (inMsg.user.extra) {
                  for (const [k, v] of Object.entries(inMsg.user.extra)) {
                    userMetadata[`extra_${k}`] = v;
                  }
                }
              }
              // Use resolveAgentRoute to get proper sessionKey (like QQBot does)
              // This ensures session history is correctly associated
              const peerId = inMsg.conversationId;
              const route = runtime.channel.routing.resolveAgentRoute({
                cfg: {
                  ...(cfg ?? {}),
                  session: {
                    ...(cfg?.session ?? {}),
                    // Override dmScope to "per-peer" so each user gets their own session
                    // This prevents all DM users from sharing the same "main" session
                    dmScope: "per-peer",
                  },
                },
                channel: "adp-openclaw",
                accountId: "default",
                peer: {
                  kind: "dm", // direct message
                  id: peerId,
                },
              });
              // Get envelope format options and messages config (like QQBot does)
              const envelopeOptions = runtime.channel.reply.resolveEnvelopeFormatOptions(cfg ?? {});
              const messagesConfig = runtime.channel.reply.resolveEffectiveMessagesConfig(cfg ?? {}, route.agentId);
              // Use formatInboundEnvelope to format the message body (like QQBot does)
              const formattedBody = runtime.channel.reply.formatInboundEnvelope({
                channel: "ADP-OpenClaw",
                from: inMsg.user?.username ?? inMsg.from,
                timestamp: inMsg.timestamp || Date.now(),
                body: inMsg.text,
                chatType: "direct",
                sender: {
                  id: userIdentifier,
                  name: inMsg.user?.username,
                },
                envelope: envelopeOptions,
              });
              const ctx = runtime.channel.reply.finalizeInboundContext({
                Body: formattedBody,
                RawBody: inMsg.text,
                CommandBody: inMsg.text,
                // User identity: format as "adp-openclaw:{tenantId}:{userId}" for multi-tenant support
                From: `adp-openclaw:${tenantPrefix}${userIdentifier}`,
                To: `adp-openclaw:bot`,
                // SessionKey from resolveAgentRoute for proper session history tracking
                SessionKey: route.sessionKey,
                AccountId: route.accountId,
                ChatType: "direct",
                // SenderId carries the raw user ID for identification
                SenderId: userIdentifier,
                SenderName: inMsg.user?.username,
                Provider: "adp-openclaw",
                Surface: inMsg.user?.source || "adp-openclaw",
                MessageSid: inMsg.id,
                MessageSidFull: inMsg.id,
                Timestamp: inMsg.timestamp || Date.now(),
                OriginatingChannel: "adp-openclaw",
                OriginatingTo: "adp-openclaw:bot",
                // Pass user metadata through context (like Feishu does)
                ...userMetadata,
              });
              // Generate unique stream ID for this response
              const streamId = `stream-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
              let chunkIndex = 0;
              let lastPartialText = ""; // Track last sent text for delta calculation
              let finalSent = false; // Track if outbound_end has been sent
              const displayName = inMsg.user?.username || inMsg.from;
              // Collect upload results so the complete download links can be
              // appended when the final reply is sent.
              const pendingUploadResults: AdpUploadToolResult[] = [];
              // Listener for upload-result events.
              // NOTE(review): the emitter is imported at module level, so events
              // are not filtered by conversation here — confirm that concurrent
              // conversations cannot pick up each other's uploads.
              const uploadResultHandler = (event: { toolCallId: string; result: AdpUploadToolResult }) => {
                log?.info(`[adp-openclaw] Received upload result event for toolCallId=${event.toolCallId}`);
                if (event.result.ok && event.result.files && event.result.files.length > 0) {
                  pendingUploadResults.push(event.result);
                  // Log the complete download link of every uploaded file.
                  for (const file of event.result.files) {
                    log?.info(`[adp-openclaw] Upload result - file.downloadUrl: ${file.downloadUrl}`);
                  }
                }
              };
              // Helper function to send outbound_end message
              const sendOutboundEnd = (text: string) => {
                if (finalSent) return; // Prevent duplicate sends
                finalSent = true;
                // Stop collecting upload results once the final reply goes out.
                uploadResultEmitter.off(UPLOAD_RESULT_EVENT, uploadResultHandler);
                // If uploads were collected, append their complete download links to the reply.
                let finalText = text;
                if (pendingUploadResults.length > 0) {
                  const uploadLinks: string[] = [];
                  for (const result of pendingUploadResults) {
                    for (const file of (result.files || [])) {
                      if (file.downloadUrl) {
                        // Use the complete download link, including its signature parameters.
                        uploadLinks.push(`📎 [${file.name}](${file.downloadUrl})`);
                        log?.info(`[adp-openclaw] Appending download link: ${file.downloadUrl.substring(0, 100)}...`);
                      }
                    }
                  }
                  if (uploadLinks.length > 0) {
                    finalText = `${text}\n\n**文件下载链接(24小时有效):**\n${uploadLinks.join("\n")}`;
                    log?.info(`[adp-openclaw] Added ${uploadLinks.length} download links to final response`);
                  }
                }
                if (chunkIndex > 0) {
                  log?.info(`[adp-openclaw] Sending outbound_end to ${displayName}: ${finalText.slice(0, 50)}... (chunks=${chunkIndex})`);
                  const endMsg: WSMessage = {
                    type: MsgType.OutboundEnd,
                    requestId: generateRequestId(),
                    payload: {
                      to: inMsg.from,
                      text: finalText,
                      conversationId: inMsg.conversationId,
                      recordId: inMsg.recordId, // Pass recordId back to server
                      streamId: streamId,
                      totalChunks: chunkIndex,
                      user: inMsg.user,
                    },
                    timestamp: Date.now(),
                  };
                  ws.send(JSON.stringify(endMsg));
                } else {
                  // No streaming chunks were sent, send as regular outbound message
                  log?.info(`[adp-openclaw] Sending outbound to ${displayName}: ${finalText.slice(0, 50)}...`);
                  const outMsg: WSMessage = {
                    type: MsgType.Outbound,
                    requestId: generateRequestId(),
                    payload: {
                      to: inMsg.from,
                      text: finalText,
                      conversationId: inMsg.conversationId,
                      recordId: inMsg.recordId, // Pass recordId back to server
                      user: inMsg.user,
                    },
                    timestamp: Date.now(),
                  };
                  ws.send(JSON.stringify(outMsg));
                }
              };
              uploadResultEmitter.on(UPLOAD_RESULT_EVENT, uploadResultHandler);
              try {
                log?.info(`[adp-openclaw] Starting dispatchReplyWithBufferedBlockDispatcher for ${displayName}`);
                await runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
                  ctx,
                  cfg: cfg ?? {},
                  // Enable block streaming for SSE support
                  replyOptions: {
                    disableBlockStreaming: false, // Force enable block streaming
                    // Use onPartialReply for real-time streaming (character-level)
                    // onPartialReply receives cumulative text, so we need to calculate delta
                    onPartialReply: async (payload: { text?: string }) => {
                      const fullText = payload.text || "";
                      if (!fullText) return;
                      // Calculate delta (new text since last send)
                      let delta = fullText;
                      if (fullText.startsWith(lastPartialText)) {
                        delta = fullText.slice(lastPartialText.length);
                      } else {
                        // Text was reset or non-monotonic, send full text
                        log?.debug?.(`[adp-openclaw] Partial text reset, sending full text`);
                      }
                      // Skip if no new content
                      if (!delta) return;
                      lastPartialText = fullText;
                      // Send delta as streaming chunk
                      log?.debug?.(`[adp-openclaw] Partial delta[${chunkIndex}] to ${displayName}: ${delta.slice(0, 30)}...`);
                      const chunkMsg: WSMessage = {
                        type: MsgType.OutboundChunk,
                        requestId: generateRequestId(),
                        payload: {
                          to: inMsg.from,
                          chunk: delta, // Send only the new delta, not cumulative
                          conversationId: inMsg.conversationId,
                          recordId: inMsg.recordId, // Pass recordId back to server
                          streamId: streamId,
                          index: chunkIndex,
                          isPartial: true, // Mark as incremental delta
                          user: inMsg.user,
                        },
                        timestamp: Date.now(),
                      };
                      ws.send(JSON.stringify(chunkMsg));
                      chunkIndex++;
                    },
                  },
                  dispatcherOptions: {
                    // Add responsePrefix from messagesConfig (like QQBot does)
                    // This tells the AI what tools are available
                    responsePrefix: messagesConfig.responsePrefix,
                    // Unified deliver callback - handles both streaming blocks and final reply
                    // SDK calls this with info.kind = "block" for streaming chunks, "final" for complete response
                    deliver: async (payload: { text?: string; toolName?: string; toolArgs?: unknown; toolCallId?: string; toolResult?: unknown }, info?: { kind?: string }) => {
                      const text = payload.text || "";
                      const kind = info?.kind;
                      // Debug log for all deliver calls - log the actual info object
                      log?.info(`[adp-openclaw] deliver called: kind=${kind}, text.length=${text.length}, toolName=${payload.toolName || 'none'}, info=${JSON.stringify(info)}`);
                      // Handle streaming block - IGNORE because onPartialReply already sent deltas
                      // The "block" callback contains cumulative text (same as final), not incremental delta
                      // Sending it would cause duplicate data on the server side
                      if (kind === "block") {
                        log?.debug?.(`[adp-openclaw] Ignoring block callback (handlePartial already sent deltas), text.length=${text.length}`);
                        return;
                      }
                      // Handle tool result - check if it's adp_upload_file tool and send file links to user
                      if (kind === "tool") {
                        const toolName = payload.toolName;
                        const toolResult = payload.toolResult;
                        log?.info(`[adp-openclaw] Tool result received: toolName=${toolName}, result=${JSON.stringify(toolResult)?.slice(0, 200)}`);
                        // If it's our upload tool and it succeeded, send file links to user
                        if (toolName === ADP_UPLOAD_TOOL_NAME && toolResult && typeof toolResult === "object") {
                          const result = toolResult as AdpUploadToolResult;
                          if (result.ok && result.files && result.files.length > 0) {
                            // Debug: print full downloadUrl before formatting
                            for (const file of result.files) {
                              log?.info(`[adp-openclaw] File downloadUrl (full): ${file.downloadUrl}`);
                            }
                            // Format upload result as user-readable message
                            const uploadMessage = formatUploadResultAsMarkdown(result);
                            log?.info(`[adp-openclaw] Sending upload result to user: ${uploadMessage.slice(0, 100)}...`);
                            // Send the file links as a message chunk
                            const chunkMsg: WSMessage = {
                              type: MsgType.OutboundChunk,
                              requestId: generateRequestId(),
                              payload: {
                                to: inMsg.from,
                                chunk: `\n\n${uploadMessage}\n\n`,
                                conversationId: inMsg.conversationId,
                                recordId: inMsg.recordId,
                                streamId: streamId,
                                index: chunkIndex,
                                isPartial: true,
                                user: inMsg.user,
                                isFileUpload: true, // Mark as file upload result
                              },
                              timestamp: Date.now(),
                            };
                            ws.send(JSON.stringify(chunkMsg));
                            chunkIndex++;
                            // Update lastPartialText to include the upload message
                            lastPartialText += `\n\n${uploadMessage}\n\n`;
                          }
                        }
                        return;
                      }
                      // Handle final reply or undefined kind - send outbound_end
                      // SDK may call deliver without kind when streaming ends
                      if (kind === "final" || kind === undefined) {
                        log?.info(`[adp-openclaw] deliver triggering sendOutboundEnd (kind=${kind})`);
                        log?.info(`[adp-openclaw] Final text content: ${text}`);
                        sendOutboundEnd(text || lastPartialText);
                      }
                    },
                    onError: (err: Error) => {
                      log?.error(`[adp-openclaw] Reply error: ${err.message}`);
                    },
                  },
                });
                log?.info(`[adp-openclaw] dispatchReplyWithBufferedBlockDispatcher returned (finalSent=${finalSent}, chunkIndex=${chunkIndex})`);
                // IMPORTANT: After dispatchReplyWithBufferedBlockDispatcher completes,
                // ensure outbound_end is sent even if "final" deliver was not called.
                // This handles cases where the SDK only sends blocks without a final callback.
                if (!finalSent && chunkIndex > 0) {
                  // Use the last accumulated partial text as the final text
                  const finalText = lastPartialText || "";
                  log?.info(`[adp-openclaw] dispatchReply completed without final, sending outbound_end (chunks=${chunkIndex})`);
                  sendOutboundEnd(finalText);
                }
              } finally {
                // Always detach the listener: if dispatch throws, or finishes
                // without any reply being sent, sendOutboundEnd never runs and
                // the handler would otherwise stay registered on the emitter.
                uploadResultEmitter.off(UPLOAD_RESULT_EVENT, uploadResultHandler);
              }
            } catch (err) {
              log?.error(`[adp-openclaw] Failed to process message: ${err}`);
            }
            break;
          }
          case MsgType.Ack:
            // Message acknowledgment
            log?.debug?.(`[adp-openclaw] Message acknowledged: ${msg.requestId}`);
            break;
          case MsgType.Error: {
            const error = msg.payload as { error: string; message: string };
            log?.error(`[adp-openclaw] Server error: ${error.error} - ${error.message}`);
            break;
          }
          // Handle fetch OpenClaw chat history request from GoServer
          case MsgType.ConvHistory: {
            if (!authenticated) break;
            const historyPayload = msg.payload as {
              sessionKey?: string;
              conversationId?: string;
              limit?: number;
            };
            log?.info(`[adp-openclaw] Received conv_history request: sessionKey=${historyPayload.sessionKey}, conversationId=${historyPayload.conversationId}`);
            try {
              const limit = historyPayload.limit ?? 200;
              // Resolve session key: explicit sessionKey wins, then one derived
              // from conversationId, otherwise "main".
              let sessionKey = historyPayload.sessionKey || "main";
              if (!historyPayload.sessionKey && historyPayload.conversationId) {
                sessionKey = resolveSessionKey(historyPayload.conversationId);
              }
              // Use CLI backend only
              const result: ChatHistoryResponse = await getChatHistory(sessionKey, {
                limit,
                log,
              });
              log?.info(`[adp-openclaw] Sending conv_response: ${result.messages.length} messages (backend=cli)`);
              // Send response back to GoServer, echoing the request id
              const responseMsg: WSMessage = {
                type: MsgType.ConvResponse,
                requestId: msg.requestId,
                payload: result as unknown as Record<string, unknown>,
                timestamp: Date.now(),
              };
              ws.send(JSON.stringify(responseMsg));
            } catch (err) {
              log?.error(`[adp-openclaw] Failed to fetch OpenClaw history: ${err}`);
              // Still answer the request so the server is told about the failure.
              const errorMsg: WSMessage = {
                type: MsgType.ConvResponse,
                requestId: msg.requestId,
                payload: {
                  error: true,
                  message: String(err),
                  sessionKey: historyPayload.sessionKey || historyPayload.conversationId || "main",
                  messages: [],
                } as unknown as Record<string, unknown>,
                timestamp: Date.now(),
              };
              ws.send(JSON.stringify(errorMsg));
            }
            break;
          }
          // Handle fetch OpenClaw sessions list request from GoServer
          case MsgType.FetchOpenClawSessions: {
            if (!authenticated) break;
            const sessionsPayload = msg.payload as {
              limit?: number;
              includeLastMessage?: boolean;
              backend?: "file" | "cli" | "auto";
            };
            const backend = sessionsPayload.backend || "auto";
            log?.info(`[adp-openclaw] Received fetch_openclaw_sessions request: limit=${sessionsPayload.limit}, backend=${backend}`);
            try {
              // Use unified listSessions with backend selection
              const result: SessionsListResponse = await listSessions({
                limit: sessionsPayload.limit ?? 100,
                includeLastMessage: sessionsPayload.includeLastMessage ?? true,
                log,
                backend,
              });
              log?.info(`[adp-openclaw] Sending openclaw_sessions_response: ${result.sessions.length} sessions (backend=${backend})`);
              // Send response back to GoServer, echoing the request id
              const responseMsg: WSMessage = {
                type: MsgType.OpenClawSessionsResponse,
                requestId: msg.requestId,
                payload: {
                  ...result as unknown as Record<string, unknown>,
                  backend,
                },
                timestamp: Date.now(),
              };
              ws.send(JSON.stringify(responseMsg));
            } catch (err) {
              log?.error(`[adp-openclaw] Failed to list OpenClaw sessions: ${err}`);
              // Still answer the request so the server is told about the failure.
              const errorMsg: WSMessage = {
                type: MsgType.OpenClawSessionsResponse,
                requestId: msg.requestId,
                payload: {
                  error: true,
                  message: String(err),
                  sessions: [],
                } as unknown as Record<string, unknown>,
                timestamp: Date.now(),
              };
              ws.send(JSON.stringify(errorMsg));
            }
            break;
          }
          default:
            log?.warn(`[adp-openclaw] Unknown message type: ${msg.type}`);
        }
      } catch (err) {
        // Reached for invalid JSON and for any handler error not caught above
        // (e.g. a message whose payload is missing).
        log?.error(`[adp-openclaw] Failed to parse message: ${err}`);
      }
    });

    ws.on("close", (code, reason) => {
      stopPing();
      abortSignal?.removeEventListener("abort", abortHandler);
      // Clear active WebSocket when connection closes
      setActiveWebSocket(null);
      log?.info(`[adp-openclaw] WebSocket closed: ${code} ${reason.toString()}`);
      resolve();
    });

    ws.on("error", (err) => {
      stopPing();
      abortSignal?.removeEventListener("abort", abortHandler);
      reject(err);
    });
  });
}
/**
 * Waits for `ms` milliseconds, returning early if `signal` aborts.
 *
 * The promise always resolves (never rejects), whether the delay elapsed or
 * the wait was cut short. The "abort" listener is detached again when the
 * timer fires, so calling this repeatedly with the same long-lived signal
 * (as the reconnect loop does) does not pile up listeners on it.
 *
 * @param ms     Delay in milliseconds.
 * @param signal Optional signal that ends the wait immediately when aborted.
 */
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal?.aborted) {
      resolve();
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);
      resolve();
    };
    const timer = setTimeout(() => {
      // Normal completion: drop the listener so it does not outlive this call.
      signal?.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal?.addEventListener("abort", onAbort, { once: true });
  });
}