// state.queue.test.ts

import { describe, expect, test, vi } from "vitest";

import type { WecomInboundMessage } from "../types.js";
import type { WecomWebhookTarget } from "./types.js";
import { StreamStore } from "./state.js";

describe("wecom StreamStore queue", () => {
  test("does not merge into active batch; flushes queued batch after active finishes", async () => {
    // Fake timers let the test step through the debounce windows deterministically.
    vi.useFakeTimers();
    try {
      const streamStore = new StreamStore();
      // Records the streamId of every batch handed to the flush handler, in order.
      const flushOrder: string[] = [];
      streamStore.setFlushHandler((batch) => flushOrder.push(batch.streamId));

      const target = {
        account: {} as any,
        config: {} as any,
        runtime: {},
        core: {} as any,
        path: "/wecom",
      } satisfies WecomWebhookTarget;
      const conversationKey = "wecom:default:U:C";

      // Every message in this scenario shares the conversation, target, nonce,
      // timestamp and a 10 ms debounce; only the message and its content vary.
      const enqueue = (msg: WecomInboundMessage, msgContent: string) =>
        streamStore.addPendingMessage({
          conversationKey,
          target,
          msg,
          msgContent,
          nonce: "n",
          timestamp: "t",
          debounceMs: 10,
        });

      const firstMsg = { msgid: "M1" } satisfies WecomInboundMessage;
      const secondMsg = { msgid: "M2" } satisfies WecomInboundMessage;

      const active = enqueue(firstMsg, "1");
      const queued = enqueue(secondMsg, "2");

      expect(active.status).toBe("active_new");
      // The initial batch does not accept merges: the second message opens a queued batch.
      expect(queued.status).toBe("queued_new");
      expect(queued.streamId).not.toBe(active.streamId);

      // A further follow-up is expected to merge into the existing queued batch.
      const followUp = enqueue({ msgid: "M3" } as any, "3");
      expect(followUp.status).toBe("queued_merged");
      expect(followUp.streamId).toBe(queued.streamId);

      const justPastDebounce = 11;

      // Once the debounce window elapses, only the active batch has been flushed.
      await vi.advanceTimersByTimeAsync(justPastDebounce);
      expect(flushOrder).toEqual([active.streamId]);

      // Letting more time pass must not flush the queued batch while the active one is unfinished.
      await vi.advanceTimersByTimeAsync(justPastDebounce);
      expect(flushOrder).toEqual([active.streamId]);

      // Finishing the active stream promotes the queued batch, which flushes right away.
      streamStore.onStreamFinished(active.streamId);
      expect(flushOrder).toEqual([active.streamId, queued.streamId]);
    } finally {
      // Always restore real timers so later tests are unaffected.
      vi.useRealTimers();
    }
  });

  test("merges into active batch when it has not started yet (even after promotion)", async () => {
    // Fake timers let the test control exactly when debounce windows elapse.
    vi.useFakeTimers();
    try {
      const streamStore = new StreamStore();
      // Records the streamId of every batch handed to the flush handler, in order.
      const flushOrder: string[] = [];
      streamStore.setFlushHandler((batch) => flushOrder.push(batch.streamId));

      const target = {
        account: {} as any,
        config: {} as any,
        runtime: {},
        core: {} as any,
        path: "/wecom",
      } satisfies WecomWebhookTarget;
      const conversationKey = "wecom:default:U:C2";

      // Shared enqueue helper: only the message id, content and debounce vary per call.
      const enqueue = (msgid: string, msgContent: string, debounceMs: number) =>
        streamStore.addPendingMessage({
          conversationKey,
          target,
          msg: { msgid } as any,
          msgContent,
          nonce: "n",
          timestamp: "t",
          debounceMs,
        });

      // Message 1 becomes the active batch; markStarted simulates "processing started",
      // then the debounce window is allowed to elapse so it flushes.
      const first = enqueue("M1", "1", 10);
      streamStore.markStarted(first.streamId);
      await vi.advanceTimersByTimeAsync(11);
      expect(flushOrder).toEqual([first.streamId]);

      // Message 2 is queued with a longer debounce, so it should not be ready to flush yet.
      const second = enqueue("M2", "2", 100);
      expect(flushOrder).toEqual([first.streamId]);

      // Finishing 1 promotes 2 to active, but nothing new is flushed because 2 is not ready.
      streamStore.onStreamFinished(first.streamId);
      expect(flushOrder).toEqual([first.streamId]);

      // 2 is now active but markStarted is deliberately NOT called, simulating a batch that is
      // still inside its debounce window. A follow-up must merge into it (same streamId).
      const followUp = enqueue("M3", "3", 10);
      expect(followUp.streamId).toBe(second.streamId);
      expect(followUp.status).toBe("active_merged");
    } finally {
      // Always restore real timers so later tests are unaffected.
      vi.useRealTimers();
    }
  });

  test("clears conversation state when idle so next message becomes active", async () => {
    // Use fake timers, like the other tests in this suite: addPendingMessage is called with
    // debounceMs: 10, and on the real clock those timers could fire after this test has
    // already completed. No timer needs to fire for the assertions below.
    vi.useFakeTimers();
    try {
      const store = new StreamStore();
      // The flush handler is irrelevant here; only the returned statuses are checked.
      store.setFlushHandler(() => {});

      const target = {
        account: {} as any,
        config: {} as any,
        runtime: {},
        core: {} as any,
        path: "/wecom",
      } satisfies WecomWebhookTarget;

      const conversationKey = "wecom:default:U:idle";

      // Run the first message through its whole lifecycle: started, finished, stream finished.
      const r1 = store.addPendingMessage({
        conversationKey,
        target,
        msg: { msgid: "M1" } as any,
        msgContent: "1",
        nonce: "n",
        timestamp: "t",
        debounceMs: 10,
      });
      store.markStarted(r1.streamId);
      store.markFinished(r1.streamId);
      store.onStreamFinished(r1.streamId);

      // With nothing active or queued, the next message must open a fresh active batch
      // rather than being queued behind (or merged into) the finished stream.
      const r2 = store.addPendingMessage({
        conversationKey,
        target,
        msg: { msgid: "M2" } as any,
        msgContent: "2",
        nonce: "n",
        timestamp: "t",
        debounceMs: 10,
      });
      expect(r2.status).toBe("active_new");
      expect(r2.streamId).not.toBe(r1.streamId);
    } finally {
      // Restore real timers; pending fake debounce timers are discarded with the fake clock.
      vi.useRealTimers();
    }
  });
});