📄 state.ts

import crypto from "node:crypto";
import type { StreamState, PendingInbound, ActiveReplyState, WecomWebhookTarget } from "./types.js";
import type { WecomInboundMessage } from "../types.js";

// Constants
export const LIMITS = {
    STREAM_TTL_MS: 10 * 60 * 1000,
    ACTIVE_REPLY_TTL_MS: 60 * 60 * 1000,
    DEFAULT_DEBOUNCE_MS: 500,
    STREAM_MAX_BYTES: 20_480,
    REQUEST_TIMEOUT_MS: 15_000
};

/**
 * **StreamStore (流状态会话存储)**
 * 
 * 管理企业微信回调的流式会话状态、消息去重和防抖聚合逻辑。
 * 负责维护 msgid 到 streamId 的映射,以及临时缓存待处理的 Pending 消息。
 */
export class StreamStore {
    private streams = new Map<string, StreamState>();
    private msgidToStreamId = new Map<string, string>();
    private pendingInbounds = new Map<string, PendingInbound>();
    private conversationState = new Map<string, { activeBatchKey: string; queue: string[]; nextSeq: number }>();
    private streamIdToBatchKey = new Map<string, string>();
    private batchStreamIdToAckStreamIds = new Map<string, string[]>();
    private onFlush?: (pending: PendingInbound) => void;

    /**
     * **setFlushHandler (设置防抖刷新回调)**
     * 
     * 当防抖计时器结束时调用的处理函数。通常用于触发 Agent 进行消息处理。
     * @param handler 回调函数,接收聚合后的 PendingInbound 对象
     */
    public setFlushHandler(handler: (pending: PendingInbound) => void) {
        this.onFlush = handler;
    }

    /**
     * **createStream (创建流会话)**
     * 
     * 初始化一个新的流式会话状态。
     * @param params.msgid (可选) 企业微信消息 ID,用于后续去重映射
     * @returns 生成的 streamId (Hex 字符串)
     */
    createStream(params: { msgid?: string; conversationKey?: string; batchKey?: string }): string {
        const streamId = crypto.randomBytes(16).toString("hex");

        if (params.msgid) {
            this.msgidToStreamId.set(String(params.msgid), streamId);
        }

        this.streams.set(streamId, {
            streamId,
            msgid: params.msgid,
            conversationKey: params.conversationKey,
            batchKey: params.batchKey,
            createdAt: Date.now(),
            updatedAt: Date.now(),
            started: false,
            finished: false,
            content: ""
        });

        if (params.batchKey) {
            this.streamIdToBatchKey.set(streamId, params.batchKey);
        }

        return streamId;
    }

    /**
     * **getStream (获取流状态)**
     * 
     * 根据 streamId 获取当前的会话状态。
     * @param streamId 流会话 ID
     */
    getStream(streamId: string): StreamState | undefined {
        return this.streams.get(streamId);
    }

    /**
     * **getStreamByMsgId (通过 msgid 查找流 ID)**
     * 
     * 用于消息去重:检查该 msgid 是否已经关联由正在进行或已完成的流会话。
     * @param msgid 企业微信消息 ID
     */
    getStreamByMsgId(msgid: string): string | undefined {
        return this.msgidToStreamId.get(String(msgid));
    }

    setStreamIdForMsgId(msgid: string, streamId: string): void {
        const key = String(msgid).trim();
        const value = String(streamId).trim();
        if (!key || !value) return;
        this.msgidToStreamId.set(key, value);
    }

    /**
     * 将“回执流”(ack stream) 关联到某个“批次流”(batch stream)。
     * 用于:当用户连发多条消息被合并排队时,让后续消息的 stream 最终也能更新为可理解的提示,而不是永久停留在“已合并排队…”。
     */
    addAckStreamForBatch(params: { batchStreamId: string; ackStreamId: string }): void {
        const batchStreamId = params.batchStreamId.trim();
        const ackStreamId = params.ackStreamId.trim();
        if (!batchStreamId || !ackStreamId) return;
        const list = this.batchStreamIdToAckStreamIds.get(batchStreamId) ?? [];
        list.push(ackStreamId);
        this.batchStreamIdToAckStreamIds.set(batchStreamId, list);
    }

    /**
     * 取出并清空某个批次流关联的所有回执流。
     */
    drainAckStreamsForBatch(batchStreamId: string): string[] {
        const key = batchStreamId.trim();
        if (!key) return [];
        const list = this.batchStreamIdToAckStreamIds.get(key) ?? [];
        this.batchStreamIdToAckStreamIds.delete(key);
        return list;
    }

    /**
     * **updateStream (更新流状态)**
     * 
     * 原子更新流状态,并自动刷新 updatedAt 时间戳。
     * @param streamId 流会话 ID
     * @param mutator 状态修改函数
     */
    updateStream(streamId: string, mutator: (state: StreamState) => void): void {
        const state = this.streams.get(streamId);
        if (state) {
            mutator(state);
            state.updatedAt = Date.now();
        }
    }

    /**
     * **markStarted (标记流开始)**
     * 
     * 标记该流会话已经开始处理(通常在 Agent 启动后调用)。
     */
    markStarted(streamId: string): void {
        this.updateStream(streamId, (s) => { s.started = true; });
    }

    /**
     * **markFinished (标记流结束)**
     * 
     * 标记该流会话已完成,不再接收内容更新。
     */
    markFinished(streamId: string): void {
        this.updateStream(streamId, (s) => { s.finished = true; });
    }

    /**
     * **addPendingMessage (添加待处理消息 / 防抖聚合)**
     * 
     * 将收到的消息加入待处理队列。如果相同 pendingKey 已存在,则是防抖聚合;否则创建新条目。
     * 会自动设置或重置防抖定时器。
     * 
     * @param params 消息参数
     * @returns { streamId, isNew } isNew=true 表示这是新的一组消息,需初始化 ActiveReply
     */
    addPendingMessage(params: {
        conversationKey: string;
        target: WecomWebhookTarget;
        msg: WecomInboundMessage;
        msgContent: string;
        nonce: string;
        timestamp: string;
        debounceMs?: number;
    }): { streamId: string; status: "active_new" | "active_merged" | "queued_new" | "queued_merged" } {
        const { conversationKey, target, msg, msgContent, nonce, timestamp, debounceMs } = params;
        const effectiveDebounceMs = debounceMs ?? LIMITS.DEFAULT_DEBOUNCE_MS;

        const state = this.conversationState.get(conversationKey);
        if (!state) {
            // 第一批次(active)
            const batchKey = conversationKey;
            const streamId = this.createStream({ msgid: msg.msgid, conversationKey, batchKey });
            const pending: PendingInbound = {
                streamId,
                conversationKey,
                batchKey,
                target,
                msg,
                contents: [msgContent],
                msgids: msg.msgid ? [msg.msgid] : [],
                nonce,
                timestamp,
                createdAt: Date.now(),
                timeout: setTimeout(() => {
                    this.requestFlush(batchKey);
                }, effectiveDebounceMs)
            };
            this.pendingInbounds.set(batchKey, pending);
            this.conversationState.set(conversationKey, { activeBatchKey: batchKey, queue: [], nextSeq: 1 });
            return { streamId, status: "active_new" };
        }

        // 合并规则(排队语义):
        // - 初始批次(batchKey===conversationKey)不接收合并:避免 1/2 都刷出同一份最终答案。
        // - 如果 active 批次是“排队批次”(batchKey!=conversationKey)且还没开始处理(started=false),
        //   则允许把后续消息合并进该 active 批次(典型:1 很快结束,2 变 active 但还没开始跑,3 合并到 2)。
        const activeBatchKey = state.activeBatchKey;
        const activeIsInitial = activeBatchKey === conversationKey;
        const activePending = this.pendingInbounds.get(activeBatchKey);
        if (activePending && !activeIsInitial) {
            const activeStream = this.streams.get(activePending.streamId);
            const activeStarted = Boolean(activeStream?.started);
            if (!activeStarted) {
                activePending.contents.push(msgContent);
                if (msg.msgid) {
                    activePending.msgids.push(msg.msgid);
                    // 注意:不把该 msgid 映射到 active streamId(避免该消息最终也刷出同一份完整答案)
                }
                if (activePending.timeout) clearTimeout(activePending.timeout);
                activePending.timeout = setTimeout(() => {
                    this.requestFlush(activeBatchKey);
                }, effectiveDebounceMs);
                return { streamId: activePending.streamId, status: "active_merged" };
            }
        }

        // active 批次已经开始处理;后续消息进入队列批次(queued),并允许在队列批次内做防抖聚合。
        const queuedBatchKey = state.queue[0];
        if (queuedBatchKey) {
            const existingQueued = this.pendingInbounds.get(queuedBatchKey);
            if (existingQueued) {
                existingQueued.contents.push(msgContent);
                if (msg.msgid) {
                    existingQueued.msgids.push(msg.msgid);
                    // 注意:不把该 msgid 映射到 queued streamId(避免该消息最终也刷出同一份完整答案)
                }
                if (existingQueued.timeout) clearTimeout(existingQueued.timeout);

                existingQueued.timeout = setTimeout(() => {
                    this.requestFlush(queuedBatchKey);
                }, effectiveDebounceMs);
                return { streamId: existingQueued.streamId, status: "queued_merged" };
            }
        }

        // 创建新的 queued 批次(会话只保留 1 个“下一批次”,后续消息继续合并到该批次)
        const seq = state.nextSeq++;
        const batchKey = `${conversationKey}#q${seq}`;
        state.queue = [batchKey];
        const streamId = this.createStream({ msgid: msg.msgid, conversationKey, batchKey });
        const pending: PendingInbound = {
            streamId,
            conversationKey,
            batchKey,
            target,
            msg,
            contents: [msgContent],
            msgids: msg.msgid ? [msg.msgid] : [],
            nonce,
            timestamp,
            createdAt: Date.now(),
            timeout: setTimeout(() => {
                this.requestFlush(batchKey);
            }, effectiveDebounceMs)
        };
        this.pendingInbounds.set(batchKey, pending);
        this.conversationState.set(conversationKey, state);
        return { streamId, status: "queued_new" };
    }

    /**
     * 请求刷新:如果该批次当前为 active,则立即 flush;否则标记 ready,等待前序批次完成后再 flush。
     */
    private requestFlush(batchKey: string): void {
        const pending = this.pendingInbounds.get(batchKey);
        if (!pending) return;

        const state = this.conversationState.get(pending.conversationKey);
        const isActive = state?.activeBatchKey === batchKey;
        if (!isActive) {
            if (pending.timeout) {
                clearTimeout(pending.timeout);
                pending.timeout = null;
            }
            pending.readyToFlush = true;
            return;
        }
        this.flushPending(batchKey);
    }

    /**
     * **flushPending (触发消息处理)**
     * 
     * 内部方法:防抖时间结束后,将聚合的消息一次性推送给 flushHandler。
     */
    private flushPending(pendingKey: string): void {
        const pending = this.pendingInbounds.get(pendingKey);
        if (!pending) return;

        this.pendingInbounds.delete(pendingKey);
        if (pending.timeout) {
            clearTimeout(pending.timeout);
            pending.timeout = null;
        }
        pending.readyToFlush = false;

        if (this.onFlush) {
            this.onFlush(pending);
        }
    }

    /**
     * 在一个 stream 完成后推进会话队列:将 queued 批次提升为 active,并在需要时触发 flush。
     */
    onStreamFinished(streamId: string): void {
        const batchKey = this.streamIdToBatchKey.get(streamId);
        const state = batchKey ? this.streams.get(streamId) : undefined;
        const conversationKey = state?.conversationKey;
        if (!batchKey || !conversationKey) return;

        const conv = this.conversationState.get(conversationKey);
        if (!conv) return;
        if (conv.activeBatchKey !== batchKey) return;

        const next = conv.queue.shift();
        if (!next) {
            // 队列为空:会话已空闲。删除状态,避免后续消息被误判为“排队但永远不触发”。
            this.conversationState.delete(conversationKey);
            return;
        }
        conv.activeBatchKey = next;
        this.conversationState.set(conversationKey, conv);

        const pending = this.pendingInbounds.get(next);
        if (!pending) return;
        if (pending.readyToFlush) {
            this.flushPending(next);
        }
        // 否则等待该批次自己的 debounce timer 到期后 requestFlush(next) 执行
    }

    /**
     * **prune (清理过期状态)**
     * 
     * 清理过期的流会话、msgid 映射以及残留的 Pending 消息。
     * @param now 当前时间戳 (毫秒)
     */
    prune(now: number = Date.now()): void {
        const streamCutoff = now - LIMITS.STREAM_TTL_MS;

        // 清理过期的流会话
        for (const [id, state] of this.streams.entries()) {
            if (state.updatedAt < streamCutoff) {
                this.streams.delete(id);
                if (state.msgid) {
                    // 如果 msgid 映射仍指向该 stream,则一并移除
                    if (this.msgidToStreamId.get(state.msgid) === id) {
                        this.msgidToStreamId.delete(state.msgid);
                    }
                }
            }
        }

        // 清理悬空的 msgid 映射 (Double check)
        for (const [msgid, id] of this.msgidToStreamId.entries()) {
            if (!this.streams.has(id)) {
                this.msgidToStreamId.delete(msgid);
            }
        }

        // 清理超时的 Pending 消息 (通常由 timeout 清理,此处作为兜底)
        for (const [key, pending] of this.pendingInbounds.entries()) {
            if (now - pending.createdAt > LIMITS.STREAM_TTL_MS) {
                if (pending.timeout) clearTimeout(pending.timeout);
                this.pendingInbounds.delete(key);
            }
        }

        // 清理 conversationState:active 已不存在且队列为空的会话
        for (const [convKey, conv] of this.conversationState.entries()) {
            const activeExists = this.pendingInbounds.has(conv.activeBatchKey) || Array.from(this.streamIdToBatchKey.values()).includes(conv.activeBatchKey);
            const hasQueue = conv.queue.length > 0;
            if (!activeExists && !hasQueue) {
                this.conversationState.delete(convKey);
            }
        }
    }
}

/**
 * **ActiveReplyStore (主动回复地址存储)**
 * 
 * 管理企业微信回调中的 `response_url` (用于被动回复转主动推送) 和 `proxyUrl`。
 * 支持 'once' (一次性) 或 'multi' (多次) 使用策略。
 */
export class ActiveReplyStore {
    private activeReplies = new Map<string, ActiveReplyState>();

    /**
     * @param policy 使用策略: "once" (默认,销毁式) 或 "multi"
     */
    constructor(private policy: "once" | "multi" = "once") { }

    /**
     * **store (存储回复地址)**
     * 
     * 关联 streamId 与 response_url。
     */
    store(streamId: string, responseUrl?: string, proxyUrl?: string): void {
        const url = responseUrl?.trim();
        if (!url) return;
        this.activeReplies.set(streamId, { response_url: url, proxyUrl, createdAt: Date.now() });
    }

    /**
     * **getUrl (获取回复地址)**
     * 
     * 获取指定 streamId 关联的 response_url。
     */
    getUrl(streamId: string): string | undefined {
        return this.activeReplies.get(streamId)?.response_url;
    }

    /**
     * **use (消耗回复地址)**
     * 
     * 使用存储的 response_url 执行操作。
     * - 如果策略是 "once",第二次调用会抛错。
     * - 自动更新使用时间 (usedAt)。
     * 
     * @param streamId 流会话 ID
     * @param fn 执行函数,接收 { responseUrl, proxyUrl }
     */
    async use(streamId: string, fn: (params: { responseUrl: string; proxyUrl?: string }) => Promise<void>): Promise<void> {
        const state = this.activeReplies.get(streamId);
        if (!state?.response_url) {
            return; // 无 URL 可用,安全跳过
        }

        if (this.policy === "once" && state.usedAt) {
            throw new Error(`response_url already used for stream ${streamId} (Policy: once)`);
        }

        try {
            await fn({ responseUrl: state.response_url, proxyUrl: state.proxyUrl });
            state.usedAt = Date.now();
        } catch (err: unknown) {
            state.lastError = err instanceof Error ? err.message : String(err);
            throw err;
        }
    }

    /**
     * **prune (清理过期地址)**
     * 
     * 清理超过 TTL 的 active reply 记录。
     */
    prune(now: number = Date.now()): void {
        const cutoff = now - LIMITS.ACTIVE_REPLY_TTL_MS;
        for (const [id, state] of this.activeReplies.entries()) {
            if (state.createdAt < cutoff) {
                this.activeReplies.delete(id);
            }
        }
    }
}

/**
 * **MonitorState (全局监控状态容器)**
 * 
 * 模块单例,统一管理 StreamStore 和 ActiveReplyStore 实例。
 * 提供生命周期方法 (startPruning / stopPruning) 以自动清理过期数据。
 */
class MonitorState {
    /** 主要的流状态存储 */
    public readonly streamStore = new StreamStore();
    /** 主动回复地址存储 */
    public readonly activeReplyStore = new ActiveReplyStore("multi");

    private pruneInterval?: NodeJS.Timeout;

    /**
     * **startPruning (启动自动清理)**
     * 
     * 启动定时器,定期清理过期的流和回复地址。应在插件有活跃 Target 时调用。
     * @param intervalMs 清理间隔 (默认 60s)
     */
    public startPruning(intervalMs: number = 60_000): void {
        if (this.pruneInterval) return;
        this.pruneInterval = setInterval(() => {
            const now = Date.now();
            this.streamStore.prune(now);
            this.activeReplyStore.prune(now);
        }, intervalMs);
    }

    /**
     * **stopPruning (停止自动清理)**
     * 
     * 停止定时器。应在插件无活跃 Target 时调用以释放资源。
     */
    public stopPruning(): void {
        if (this.pruneInterval) {
            clearInterval(this.pruneInterval);
            this.pruneInterval = undefined;
        }
    }
}

/**
 * **monitorState (全局单例)**
 * 
 * 导出全局唯一的 MonitorState 实例,供整个应用共享状态。
 */
export const monitorState = new MonitorState();