Originally from https://docs.google.com/document/d/1cF98sQM6D2HqvfizBDTZf0yOYFsSjWJ-aLhsFdnjs8E/edit?tab=t.0.
Design Doc: Agentic Tasks and Workflows #
Feb 11, 2025
Issue: https://github.com/funnelstory/product/issues/2721
Original workflows design for reference: Workflows design doc
Design Doc: Agentic Tasks and Workflows #
Introduction #
This document outlines the design and implementation of Agentic Workflows, a system that leverages Large Language Models (LLMs) to drive dynamic, automated processing. The system is designed to handle various workflow types, including Event-Based and Account-Bound workflows, and provides a flexible and scalable architecture for managing workflow runs.
Background #
Agentic workflows leverage LLMs to drive dynamic, automated processing that:
- Determines whether an inbound trigger should start a workflow.
- Plans the specific steps in that workflow run. This plan would include all the steps.
- Decides the subsequent step based on already occurred events and the original plan.
System Components #
Our agentic workflows system is built around four main components that work together to provide streamlined, dynamic automation:
- Large Language Model (LLM)
- Acts as the decision-making “brain” of the system.
- Consumes current workflow context—including trigger details, account information, and historical actions—to plan the workflow run and determine the next best action.
- Leverages techniques like fine-tuning and Retrieval-Augmented Generation (RAG) to inject best practices into its recommendations.
- Workflow Engine
- Orchestrates and manages individual workflow runs, maintaining both their state and context over time.
- Ensures that each workflow run follows a defined plan, prevents duplicate enrollment (only one active run per event/account trigger), and records all actions taken in the workflow.
- Coordinates with the LLM to adapt the flow based on evolving conditions and task outcomes.
- Scheduler
- Handles the timing aspect of workflow execution, especially when the LLM directs a delay via its wait mechanism.
- Records the required pause period and reactivates workflow runs when the specified duration elapses.
- Ensures that workflows are resumed seamlessly after any intentional delays.
- Agent Workflow Messages
- Functions as a centralized event source designed specifically for workflow orchestration.
- Aggregates events and changes from various other parts of the system—such as signal updates, activities, task modifications, and new note additions—into a single repository.
- Eliminates the need for running complex, multi-table queries by providing a unified log of recent changes (e.g., within the last two hours).
- Enables the workflow engine to quickly detect relevant updates, triggering re-evaluation or continuation of workflow runs as needed.
Together, these components create a flexible, responsive, and maintainable architecture for agentic workflows, allowing us to efficiently manage both immediate triggers and longer-running, account-centric processes.
Configuration and Workflow Types #
Agentic workflows are configured declaratively using YAML or a UI, with two primary types:
Event-Based Workflows #
- Triggered by a discrete event (e.g., a new conversation).
- Keyed by a unique event identifier combined with the account ID.
- E.g.
{"account_id": "0015e00000dRNSoAAO", "ticket_id": "432f1a7e0b77"}
- E.g.
Account-Based Workflows #
- Designed for longer-term, account-centric workflows (e.g., an “account renewal” workflow).
- Keyed by the account ID.
Configuration Details #
Admins specify the following in the configuration:
- Prompt: Describes the trigger and objective of the workflow.
- Examples:
Start a get-well plan for customers predicted to churnStart an expansion plan when customers are hitting their license limitSend me and the AE assigned to the account when they have concerns about pricing
- Examples:
- Workflow Properties:
- Whether the workflow is a draft (a DB flag).
- Enabled sources and actions.
- Sources: An array of objects that represent the types of events or inputs that can trigger the workflow.
- Example:
[{"type": "email"}, {"type": "ticket"}, {"type": "note"}, {"type": "meeting"}]
- Example:
- Actions: A map of objects—for example:
- Sources: An array of objects that represent the types of events or inputs that can trigger the workflow.
{
"create_tasks": {
"enabled": true,
"expiry": "7d"
},
"send_notifications": {
"enabled": true
},
"follow_ups": {
"enabled": true
}
}
Workflow Versioning and Configuration Stability #
To avoid interruptions during in-flight runs when workflow configurations are updated, the system uses workflow versioning:
- Each workflow configuration is stored as a snapshot in the database (in the
agent_workflow_versions.configJSONB field) along with a version identifier. - The
agent_workflowstable maintains alatest_versioncolumn, but every workflow run holds a reference (workflow_version) to the specific version in effect at the time of its start. - When a workflow config is updated and a new version is published, new workflow runs are initiated using the new version while all running workflow runs continue processing according to the version they started with.
- This approach ensures configuration updates do not interrupt or alter the logic of running workflow runs.
Workflow Enrollment #
To prevent duplicate processing, strict enrollment policies are enforced:
- Unique Active Enrollment: An account (or an event-based trigger for an account) can only be enrolled in a workflow once until the active workflow run completes.
- Metadata Linkage: When a workflow run creates a task, the task metadata includes an identifier linking back to the originating workflow run. This metadata ensures that any subsequent task updates re-trigger the correct workflow run.
Planning and Determining the Next Best Action #
One of the key capabilities of our agent is determining what to do in each scenario. This can be achieved through fine-tuning with examples or by providing knowledge through Retrieval-Augmented Generation (RAG).
We will design for injecting best practices as part of the agent context using a new tool called generate_plan(scenario). The scenario can be phrases like “expansion,” “get well plan,” or “adoption.” These are determined by the LLM. Using these phrases, we can determine (via embeddings) the relevant resources to extract best practices from and come up with a plan.
For example, we can leverage the GitLab Customer Success handbook, which is open source: GitLab Customer Success Handbook. By leveraging such best practices, we can create a smart agent that is aware of industry standards. We will start with a basic version and continue to build on it, creating our own knowledge base that can become our “secret sauce” or intellectual property.
Workflow Execution #
When an inbound trigger occurs, the agentic workflows engine processes the event as follows:
- Trigger Evaluation: External events (activities, signals, account updates) are checked to see if the conditions defined in the workflow config are met.
- Events falling within a defined time window (e.g., the last 2 hours) are considered. This is a fixed time window.
- For event-based workflow runs, the key is built from the event ID and account ID; for account-bound workflow runs, the key is based solely on the account ID plus workflow identifier.
- Workflow Run Initiation or Update:
- If no workflow run is active for a given key, a new workflow run is started using the current workflow version.
- If a workflow run is already active, the incoming event updates that workflow run’s context.
- This ensures unique enrollment—preventing duplicate active workflow runs.
- Step-by-Step Execution: The LLM is responsible for:
- Defining the plan and determining actions for each step.
- Deciding whether to proceed immediately or delay further execution.
- Wait Mechanism via LLM: If the LLM determines that the next action should be delayed, it invokes a new tool—
wait(duration). When this occurs:- The workflow run is paused and the desired duration or target timestamp is recorded.
- A scheduler (in our Go backend) reactivates the workflow run once the wait period expires.
- Paused workflow runs are unaffected by changes to the original workflow configuration.
- Completion: If the LLM determines that the workflow run should be marked as complete, it invokes a
complete()tool. When this occurs:- The workflow run is marked as complete.
Workflow Run Context #
The workflow run context is crucial for the LLM to make informed decisions throughout the workflow execution. It includes the following elements:
- Prompt: The initial trigger or objective of the workflow, which provides the LLM with the context for the workflow’s purpose.
- Information about the Account: Details about the account involved in the workflow, such as account status, history, and any relevant metadata. This information helps the LLM understand the specific context of the account. This information is present regardless of the workflow type.
- What the Agent Has Done in This Workflow Run So Far: A record of the actions taken by the agent during the current workflow run. This includes:
- Creating Tasks: Details about any tasks that have been created as part of the workflow.
- Full Information About the Task: Comprehensive details about each task, including task description, assigned user, due date, and any other relevant metadata. This ensures the LLM has a complete understanding of the task’s context and can make appropriate decisions based on the task’s status and content.
- Creating Tasks: Details about any tasks that have been created as part of the workflow.
Possible Actions #
The agent can perform the following actions within the system:
- Creating a Task: The agent can create tasks that are clearly designated as system-generated. These tasks are added to the workflow context and can be assigned to users for further action.
- Sending Email Notifications: The agent can send email notifications to relevant stakeholders, ensuring that all parties are informed about the workflow’s progress and any required actions.
Agent Awareness of Task State #
Agents are fully aware of the tasks they initiate. Any changes in a task’s state trigger a re-evaluation of the originating workflow run. The agent should be aware of the full task content in its context to make informed decisions. This ensures that the workflow remains dynamic and responsive to changes in task status, allowing for timely adjustments and actions.
API Endpoints #
The following API endpoints will support the creation, retrieval, and inspection of agentic workflows:
- GET /api/ai/workflows: Returns a list of configured workflows.
- POST /api/ai/workflows: Creates a new workflow configuration.
- GET /api/ai/workflows/{workflow_id}: Retrieves detailed information about a specific workflow, including its current configuration and status.
- GET /api/ai/workflows/{workflow_id}/runs: Returns a list of workflow runs (active and historical) associated with a particular workflow.
- GET /api/ai/workflows/{workflow_id}/runs/{run_id}: Retrieves details for a specific workflow run instance.
- POST /api/ai/workflows/{workflow_id}/runs/{run_id}/cancel: Cancels a workflow run
- GET /api/ai/workflows/{workflow_id}/versions/{version}: Retrieves the configuration snapshot for a particular version of a workflow.
Database Schema #
CREATE TABLE agent_workflows (
workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
id UUID NOT NULL,
name TEXT NOT NULL,
latest_version TEXT NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL,
created_by UUID, -- Referencing users.id; on delete set to null.
updated_by UUID, -- Referencing users.id; on delete set to null.
CONSTRAINT agent_workflows_pkey PRIMARY KEY (workspace_id, id),
CONSTRAINT agent_workflows_created_by_fk FOREIGN KEY (created_by)
REFERENCES users(id) ON DELETE SET NULL,
CONSTRAINT agent_workflows_updated_by_fk FOREIGN KEY (updated_by)
REFERENCES users(id) ON DELETE SET NULL
);
CREATE TABLE agent_workflow_versions (
workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
workflow_id UUID NOT NULL,
version TEXT NOT NULL,
config JSONB NOT NULL DEFAULT '{}'::jsonb,
created_by UUID, -- Referencing users.id; on delete set to null.
updated_by UUID, -- Referencing users.id; on delete set to null.
CONSTRAINT agent_workflow_versions_pkey PRIMARY KEY (workspace_id, workflow_id, version),
CONSTRAINT agent_workflow_versions_workflow_fk FOREIGN KEY (workspace_id, workflow_id)
REFERENCES agent_workflows (workspace_id, id) ON DELETE CASCADE,
CONSTRAINT agent_workflow_versions_created_by_fk FOREIGN KEY (created_by)
REFERENCES users(id) ON DELETE SET NULL,
CONSTRAINT agent_workflow_versions_updated_by_fk FOREIGN KEY (updated_by)
REFERENCES users(id) ON DELETE SET NULL
);
CREATE TABLE agent_workflow_runs (
workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
workflow_id UUID NOT NULL,
workflow_version TEXT NOT NULL,
id UUID NOT NULL,
key JSONB NOT NULL DEFAULT '{}'::jsonb,
active BOOLEAN NOT NULL DEFAULT FALSE,
status JSONB,
error JSONB,
started_at TIMESTAMP WITH TIME ZONE,
completed_at TIMESTAMP WITH TIME ZONE,
scheduled_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
canceled_at TIMESTAMP WITH TIME ZONE,
CONSTRAINT agent_workflow_runs_pkey PRIMARY KEY (workspace_id, id),
CONSTRAINT agent_workflow_runs_workflow_fk FOREIGN KEY (workspace_id, workflow_id)
REFERENCES agent_workflows (workspace_id, id) ON DELETE CASCADE,
CONSTRAINT agent_workflow_runs_workflow_version_fk FOREIGN KEY (workspace_id, workflow_id, workflow_version)
REFERENCES agent_workflow_versions (workspace_id, workflow_id, version) ON DELETE CASCADE
);
Use cases #
- Meeting happened (say, Zoom)
- Action item identified from transcript
- Task created with description, due date, assignee, priority, account
- Notification when the task is assigned
- Notification when the task is incomplete and due within 24 hours
- Notification when the task is incomplete and overdue by 24 hours
- Question:
- What has this agent done for me?
- Meetings analyzed
- Action items identified across X accounts
- Y tasks created
- X tasks completed on time
- What amount of human work did it replace
- What has this agent done for me?
Tasks/Components to work on #
- Scheduler
- Subscribes to specific events
- Resumes workflow runs on new inputs or when time is due
- Planner
- Knowledge
- Event pubsub
- workspace_id
- Topic
Pubsub #
This design implements a deterministic, high-performance pubsub system in PostgreSQL. It supports hundreds of messages per second with at-least-once delivery semantics. The system avoids database-generated timestamps and autoincrement, instead relying on the application to supply both message IDs (or time-based IDs) and published timestamps. This approach minimizes duplicate reads and contention.
Components #
-
Messages Table (
pubsub_messages)
Stores published messages with:- Topic: A text identifier.
- Message ID: A manually generated identifier (e.g., sequential or time-based) ensuring ordering per topic.
- Data: Message payload stored as JSONB.
- Published At: A timestamp provided by the application for deterministic ordering.
-
Offsets Table (
pubsub_offsets)
Tracks consumer offsets for schedulers:- Consumer ID: A unique key (e.g., scheduler instance ID).
- Offset: The last processed message ID for a given topic.
Schema Definition #
-- Table for published messages
CREATE TABLE pubsub_messages (
topic TEXT NOT NULL,
message_id BIGINT NOT NULL, -- Provided externally (or use TIMESTAMP for time-based IDs)
data JSONB NOT NULL,
published_at TIMESTAMP NOT NULL, -- Provided by the application
PRIMARY KEY (topic, message_id)
);
-- Table for consumer offsets
CREATE TABLE pubsub_offsets (
consumer_id TEXT PRIMARY KEY, -- Unique consumer/scheduler identifier
offset BIGINT NOT NULL -- Last processed message_id
);
Key Design Decisions #
-
Manual Message IDs & Timestamps:
- The application generates message IDs and supplies the timestamp, removing the need for autoincrement and
now()calls. - This ensures deterministic behavior and reduces database contention.
- The application generates message IDs and supplies the timestamp, removing the need for autoincrement and
-
Minimizing Duplicate Reads:
- Consumers introduce a processing delay (e.g., 30 seconds) to ensure that all messages, even with minor clock skews (typically a couple of seconds on ECS Fargate), are fully written and available.
- Query example:
SELECT *
FROM pubsub_messages
WHERE topic = 'task_topic'
AND published_at <= (current_timestamp - interval '30 seconds')
AND message_id > (
SELECT offset FROM pubsub_offsets WHERE consumer_id = 'scheduler_1'
)
ORDER BY message_id;
- At-Least-Once Delivery & Idempotency:
- Consumers are designed to be idempotent, allowing safe reprocessing of messages in the event of duplicate reads.
- Offset updates use an upsert strategy:
INSERT INTO pubsub_offsets (consumer_id, offset)
VALUES ('scheduler_1', 1001) -- Latest processed message_id
ON CONFLICT (consumer_id) DO UPDATE
SET offset = EXCLUDED.offset;
- Performance & Contention:
- The design minimizes read-before-write operations by leveraging application-controlled IDs and delayed consumption.
- Optionally, table partitioning (by topic or time range) can be introduced if throughput demands increase.
Conclusion #
This PostgreSQL-based pubsub design meets the requirements for deterministic message ordering, high throughput, and minimal duplicate reads. The combination of delayed consumption, idempotent processing, and manual control over message IDs and timestamps ensures that the system operates reliably even in environments with slight clock skews.