Writing Integrations
Introduction
What is an Integration?
An integration is a plugin that connects to an external service (Gmail, Slack, Google Drive, etc.) and fetches content into your system. Each integration is a self-contained module that knows how to authenticate with the service and retrieve data.
How Integrations Work
Integrations use a task-based architecture where work is broken into small, independent tasks:
- Initial Task - The polling system creates an initial task (e.g., "get email list")
- Task Execution - The integration runs the task and returns two things:
  - output - Data to save to the database
  - tasks - New tasks to run (for pagination, downloading files, etc.)
- Recursive Processing - New tasks are queued and processed, spawning more tasks until done
Example: Gmail Flow
1. Task: start (priority 1)
→ Delegates to: getEmailList
2. Task: getEmailList
→ Returns: 50 email IDs + "next page" task
→ Creates: 50 x downloadEmail + 1 x getEmailList (next page)
3. Tasks: 50 x downloadEmail (priority 2) + 1 x getEmailList (priority 2)
→ Each downloadEmail returns: email content
→ getEmailList returns: 50 more email IDs + "next page" task
4. Process continues until no more pages
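The flow above can be reduced to a small, runnable simulation. This is a sketch with hypothetical task shapes and a fake three-page mailbox — not the real queue, priorities, or Gmail API — but it shows how list tasks spawn download tasks and next-page tasks until the queue drains:

```typescript
// Minimal sketch of the recursive task-spawning loop.
type Task =
  | { type: 'getEmailList'; page: number }
  | { type: 'downloadEmail'; emailId: string };

// Fake mailbox: 3 pages of 2 email IDs each.
const pages: string[][] = [['a', 'b'], ['c', 'd'], ['e', 'f']];

export function runSync(): string[] {
  const downloaded: string[] = [];
  const queue: Task[] = [{ type: 'getEmailList', page: 0 }]; // initial task

  while (queue.length > 0) {
    const task = queue.shift()!;
    if (task.type === 'getEmailList') {
      // List task: spawn one download task per ID on this page...
      for (const id of pages[task.page]) {
        queue.push({ type: 'downloadEmail', emailId: id });
      }
      // ...plus a "next page" task while pages remain.
      if (task.page + 1 < pages.length) {
        queue.push({ type: 'getEmailList', page: task.page + 1 });
      }
    } else {
      // Download task: "fetch" the email content.
      downloaded.push(task.emailId);
    }
  }
  return downloaded;
}

console.log(runSync()); // logs all six IDs in discovery order
```

Note that downloads for page 1 run before page 2 is even listed — the same property the priority system later exploits to keep fresh content ahead of backfill.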
Getting Started
Integration File Structure
backend/src/integration/gmail/
├── src/
│ └── main.ts # Integration code
├── manifest.json # Integration metadata
└── package.json # Dependencies (tracked but not used yet)
Step 1: Define Your Types
Every integration needs these types:
IMPORTANT: All integrations must support a 'start' task type. This is used by the polling system to initiate fresh syncs every minute with high priority tasks.
// Task types - what operations can your integration perform?
// IMPORTANT: Always include 'start' as the first task type
export type GmailTaskType = 'start' | 'getEmailList' | 'downloadEmail';
// Task payload - data needed to run a task
export class GmailTaskPayload {
emailId?: string | null; // For downloadEmail task
pageToken?: string | null; // For pagination
constructor(config: GmailTaskPayload) {
this.emailId = config.emailId;
this.pageToken = config.pageToken;
}
}
// Task - combines type + payload
export class GmailTask extends BaseTask {
type: GmailTaskType;
payload: GmailTaskPayload;
constructor(type: GmailTaskType, payload: GmailTaskPayload) {
super({ type, payload });
this.type = type;
this.payload = payload;
}
}
// Task output - data to save to database
export class GmailTaskOutput extends BaseTaskOutput {}
// Task result - what runTask() returns
export class GmailTaskResult extends BaseTaskResult {
output: GmailTaskOutput[];
tasks?: GmailTask[];
constructor(config?: GmailTaskResult) {
super();
this.output = config?.output || [];
this.tasks = config?.tasks || [];
}
}
// Credentials - authentication data
export class GmailAuthCredentials extends BaseAuthCredentials {
clientId: string;
clientSecret: string;
accessToken: string;
refreshToken: string;
constructor(credentials: GmailAuthCredentials) {
super();
const { clientId, clientSecret, accessToken, refreshToken } = credentials;
this.clientId = clientId;
this.clientSecret = clientSecret;
this.accessToken = accessToken;
this.refreshToken = refreshToken;
}
}
Step 2: Create Your Integration Class
import { google, gmail_v1 } from 'googleapis';
import {
BaseIntegration,
BaseTask,
BaseTaskResult,
} from '../../_base/main';
export class GmailIntegration extends BaseIntegration {
private oAuth2Client: any;
private gmailClient: gmail_v1.Gmail;
constructor(credentials: GmailAuthCredentials) {
super(credentials);
// Set up your API client
this.oAuth2Client = new google.auth.OAuth2(
this.getCredential('clientId') as string,
this.getCredential('clientSecret') as string,
);
this.gmailClient = google.gmail({
version: 'v1',
auth: this.oAuth2Client,
});
}
// Initialize connection (called once per task)
public async initialize(): Promise<void> {
const credentials: any = {
refresh_token: this.getCredential('refreshToken') as string,
};
this.oAuth2Client.setCredentials(credentials);
}
// Validate credentials
public async validateAuthentication(): Promise<boolean> {
try {
await this.initialize();
await this.gmailClient.users.labels.list({ userId: 'me' });
return true;
} catch (err) {
return false;
}
}
// Main task runner - routes to appropriate handler
public async runTask(task: GmailTask): Promise<GmailTaskResult> {
if (task.type === 'start') {
// Handle the "start" task - this is called every minute by the polling system
// For Gmail, we delegate to getEmailList with no pageToken to start from most recent
return await this.handleGetEmailList(
new GmailTask('getEmailList', new GmailTaskPayload({ pageToken: null }))
);
} else if (task.type === 'getEmailList') {
return await this.handleGetEmailList(task);
} else if (task.type === 'downloadEmail') {
return await this.handleDownloadEmail(task);
}
// Unknown task type
return new GmailTaskResult({ output: [], tasks: [] });
}
// Clean up (called after task completes)
public async shutdown(): Promise<void> {
if (this.oAuth2Client) {
this.oAuth2Client.setCredentials({});
}
}
}
Step 3: Implement Task Handlers
Task Handler Pattern:
- Each task type gets its own handler method
- Handlers return output (data to save) and tasks (work to do)
// Handler for listing emails
private async handleGetEmailList(task: GmailTask): Promise<GmailTaskResult> {
const output: GmailTaskOutput[] = [];
const tasks: GmailTask[] = [];
const { pageToken } = task.payload;
try {
// Call the API
const response = await this.gmailClient.users.messages.list({
userId: 'me',
maxResults: 50,
pageToken: pageToken || undefined,
});
const messages = response.data.messages || [];
// Create a download task for each email
for (const message of messages) {
tasks.push(
new GmailTask('downloadEmail', new GmailTaskPayload({
emailId: message.id,
pageToken: null,
}))
);
}
// If there are more pages, create a task to fetch the next page
if (response.data.nextPageToken) {
tasks.push(
new GmailTask('getEmailList', new GmailTaskPayload({
emailId: null,
pageToken: response.data.nextPageToken,
}))
);
}
} catch (error) {
console.error('Error fetching email list:', error);
}
return new GmailTaskResult({ output, tasks });
}
// Handler for downloading a single email
private async handleDownloadEmail(task: GmailTask): Promise<GmailTaskResult> {
const output: GmailTaskOutput[] = [];
const tasks: GmailTask[] = [];
const { emailId } = task.payload;
try {
// Fetch the full email
const response = await this.gmailClient.users.messages.get({
userId: 'me',
id: emailId!,
format: 'full',
});
// Extract relevant data
const subject = this.extractSubject(response.data);
const from = this.extractFrom(response.data);
// ... extract to, cc, attachments, etc. the same way ...
// Create output in the required format
output.push({
code: 200,
id: emailId!,
timestamp: new Date(Number(response.data.internalDate)), // internalDate is epoch ms as a string
content: {
subject: subject || '(No Subject)',
from,
to,
cc,
body: this.extractBody(response.data),
threadId: response.data.threadId || '',
labelIds: response.data.labelIds || [],
hasAttachments: attachments.length > 0,
attachmentCount: attachments.length,
},
});
} catch (error) {
console.error(`Error downloading email ${emailId}:`, error);
// Return error output
output.push({
code: 500,
id: emailId!,
timestamp: null,
content: { subject: 'Error', body: 'Failed to download email' },
});
}
return new GmailTaskResult({ output, tasks });
}
Step 4: Register Your Integration
At the end of your main.ts file:
export const integration = () => {
return {
manifest: require('../manifest.json'),
Integration: GmailIntegration,
};
};
Step 5: Create manifest.json
{
"domain": "gmail",
"name": "Gmail",
"version": "1.0.0",
"description": "Gmail email integration",
"author": "Your Name",
"controlFlow": {
// TBD: this is for a future UI layer to setup your integration
}
}
Core Concepts
Output Format
When your integration returns output, it must follow this structure:
{
code: 200, // HTTP-style status (200 = success, 500 = error)
id: 'msg-123', // External ID - used for idempotency (prevents duplicates)
content: { ... }, // Integration-specific JSON structure
timestamp: Date, // External timestamp - when this item was created in the source system
}
Important Fields:
- code - HTTP-style status code (200 = success, 500 = error)
- id - Must be the unique ID from the external system. This prevents the same item from being downloaded twice.
- timestamp - Should be when the item was created in the external system (not when you downloaded it).
- content - Your integration defines the structure. This is where all your integration-specific data goes.
Example for Gmail:
{
code: 200,
id: 'msg-abc123',
timestamp: new Date('2025-11-09T10:30:00Z'),
content: {
subject: 'Meeting Tomorrow',
from: 'alice@example.com',
to: 'bob@example.com',
cc: 'charlie@example.com',
body: 'Let\'s meet at 2pm...',
threadId: 'thread-xyz',
labelIds: ['INBOX', 'IMPORTANT'],
hasAttachments: true,
attachmentCount: 2,
}
}
Database Storage
The polling system automatically saves your output to the user_integration_content table:
{
id: 'uuid', // Auto-generated
userId: 'user-123', // Which user owns this data
userIntegrationId: 'int-456', // Which user-integration fetched this
integrationDomain: 'gmail', // Your integration domain
externalId: 'msg-123', // The 'id' from your output (usually the id of the data on the external server)
externalTimestamp: Date, // The 'timestamp' from your output (usually the date the item was created on the external server)
content: { ... }, // The 'content' from your output (full JSON object)
contentSize: 1234, // Auto-calculated size in bytes
createdAt: Date, // When saved to our database
updatedAt: Date // When last updated (this should rarely change but might be useful in some situations)
}
Integration Lifecycle
Here's what happens when your integration runs:
1. Polling system creates job in queue
└─> Job contains: credentials, userIntegrationId, task
2. Worker picks up job
└─> Creates instance: new GmailIntegration(credentials, contentExists)
3. Worker calls initialize()
└─> Your code: Set up API client, authenticate
4. Worker calls runTask(task)
└─> Your code: Execute task, return { output, tasks }
└─> Your code: Return follow-up tasks as part of the result
5. Worker saves output to database
└─> Automatic based on your output format
6. Worker enqueues follow-up tasks
└─> Your tasks array becomes new jobs in the queue
7. Worker calls shutdown()
└─> Your code: Clean up connections
8. Repeat steps 2-7 for each task until queue is empty
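The lifecycle above can be sketched as a worker loop. This is a hedged illustration — `FakeIntegration`, the string-typed tasks, and `processQueue` are hypothetical stand-ins for the real worker, queue, and persistence layer — but the call order (initialize → runTask → save → enqueue → shutdown) matches the steps described:

```typescript
// Hypothetical sketch of the worker side of the lifecycle (steps 2-7).
interface FakeTaskResult { output: string[]; tasks: string[] }

class FakeIntegration {
  constructor(private log: string[]) {}
  async initialize() { this.log.push('initialize'); }
  async runTask(task: string): Promise<FakeTaskResult> {
    this.log.push(`runTask:${task}`);
    // A 'start' task spawns one follow-up; other tasks produce output.
    return task === 'start'
      ? { output: [], tasks: ['downloadEmail'] }
      : { output: [`content:${task}`], tasks: [] };
  }
  async shutdown() { this.log.push('shutdown'); }
}

export async function processQueue(): Promise<{ saved: string[]; log: string[] }> {
  const saved: string[] = [];
  const log: string[] = [];
  const queue: string[] = ['start'];                // 1. polling system enqueues 'start'
  while (queue.length > 0) {                        // 8. repeat until the queue is empty
    const task = queue.shift()!;
    const integration = new FakeIntegration(log);   // 2. worker creates an instance
    await integration.initialize();                 // 3. set up API client / auth
    const result = await integration.runTask(task); // 4. execute the task
    saved.push(...result.output);                   // 5. save output to the database
    queue.push(...result.tasks);                    // 6. enqueue follow-up tasks
    await integration.shutdown();                   // 7. clean up
  }
  return { saved, log };
}
```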
Preventing Duplicates (Idempotency)
The system provides a contentExists callback function to check which items already exist in the database:
// In your integration constructor
constructor(credentials: any, contentExists?: ContentExistenceChecker) {
super(credentials, contentExists);
}
// In your task handler
private async handleGetEmailList(task: GmailTask): Promise<GmailTaskResult> {
// ... fetch messages from API ...
// Get all message IDs
const messageIds = messages.map(m => m.id);
// Check which ones we already have
const existingIds = await this.contentExists(messageIds);
// Only create tasks for messages we don't have yet
const newMessages = messages.filter(m => !existingIds.has(m.id));
for (const message of newMessages) {
tasks.push(
new GmailTask('downloadEmail', new GmailTaskPayload({
emailId: message.id,
pageToken: null,
}))
);
}
return new GmailTaskResult({ output, tasks });
}
This prevents re-downloading content that already exists in your database.
Always use this pattern to avoid wasting API quota, database storage, and to avoid duplicate data.
Common Patterns
Pattern 1: List → Download
Most integrations follow this pattern:
- List task - Fetch IDs/metadata of items
- Download tasks - Fetch full content for each item
// Gmail: getEmailList → downloadEmail
// Google Drive: listFiles → downloadFile
// Slack: getChannels → getMessages
Pattern 2: Pagination
Handle pagination by creating a new list task with a page token:
if (response.data.nextPageToken) {
tasks.push(
new GmailTask('getEmailList', new GmailTaskPayload({
emailId: null,
pageToken: response.data.nextPageToken,
}))
);
}
Pattern 3: Recursive Directory Traversal
For file systems, spawn tasks for subdirectories:
// FTP example
if (item.type === 'directory') {
tasks.push(
new FtpTask('listDirectory', new FtpTaskPayload({
path: item.path,
}))
);
}
Advanced Features
Priority Queue System
The polling system uses job priorities to ensure new emails/content are fetched quickly, even during long-running initial syncs.
Priority Levels (lower number = higher priority):
The system uses gradual priority degradation - each generation of tasks gets slightly lower priority:
- Priority 1 - 'start' tasks from cron (highest)
- Priority 2 - First generation (email list page 1, fresh downloads)
- Priority 3 - Second generation (page 2, or downloads from page 1)
- Priority 4 - Third generation
- Priority 5+ - Deeper generations
- Priority 10 - Maximum (deep backfill tasks, capped)
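The degradation rule itself is a one-liner. A sketch — the +1 step and the clamp at 10 reflect the levels listed above; the function name is hypothetical, not the polling system's actual API:

```typescript
// Each generation of follow-up tasks runs one priority level lower than its
// parent, clamped at the maximum of 10 (lower number = higher priority).
const MAX_PRIORITY = 10;

export function childPriority(parentPriority: number): number {
  return Math.min(parentPriority + 1, MAX_PRIORITY);
}

// A 'start' task at priority 1 spawns priority-2 children, their children
// run at priority 3, and deep backfill generations bottom out at 10.
```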
Why this matters:
Imagine your initial Gmail sync is downloading 10,000 old emails:
- Without priorities: New emails that arrive won't be fetched until all 10,000 old emails are processed (could take hours)
- With priorities:
  - Every minute, a fresh 'start' task (priority 1) jumps to the front
  - It discovers 5 new emails and creates download tasks for them (priority 2)
  - Those 5 new downloads jump ahead of the 9,995 remaining old downloads (priority 5+)
  - New emails are downloaded within ~1-2 minutes
Your integration's responsibility:
When handling the 'start' task, your integration should:
- Start from the most recent content (e.g., most recent emails, newest files)
- Use the contentExists callback to skip content that's already downloaded
- Stop early when you hit content that already exists (see "Early Stop Optimization" below)
Early Stop Optimization
To prevent subsequent syncs from paginating through thousands of old items, implement an early-stop mechanism:
// Add to your task payload
export class GmailTaskPayload {
emailId?: string | null;
pageToken?: string | null;
consecutiveEmptyPages?: number; // NEW: Track empty pages
constructor(config: GmailTaskPayload) {
this.emailId = config.emailId;
this.pageToken = config.pageToken;
this.consecutiveEmptyPages = config.consecutiveEmptyPages; // NEW: must be copied, or it's silently dropped
}
}
// In your list handler
private async handleGetEmailList(task: GmailTask): Promise<GmailTaskResult> {
const { pageToken, consecutiveEmptyPages = 0 } = task.payload;
// Stop early if we've seen 3 consecutive pages with no new content
const MAX_EMPTY_PAGES = 3;
if (consecutiveEmptyPages >= MAX_EMPTY_PAGES) {
console.log('Stopping pagination - no new content');
return new GmailTaskResult({ output: [], tasks: [] });
}
// ... fetch messages ...
// Check what's new
const existingIds = await this.contentExists(messageIds);
const newMessages = messages.filter(m => !existingIds.has(m.id));
// Update counter
const nextEmptyPageCount = newMessages.length > 0 ? 0 : consecutiveEmptyPages + 1;
// Continue to next page with updated counter
if (response.data.nextPageToken) {
tasks.push(
new GmailTask('getEmailList', new GmailTaskPayload({
pageToken: response.data.nextPageToken,
consecutiveEmptyPages: nextEmptyPageCount,
}))
);
}
return new GmailTaskResult({ output, tasks });
}
This way, subsequent syncs stop after checking 3 pages with no new content instead of paginating through the entire history.
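The counter logic is easiest to verify in isolation. A sketch of the same rule as pure helpers, plus a small simulation over hypothetical per-page "new item" counts (the function names are illustrative, not part of the base classes):

```typescript
// Early-stop rule: reset the counter whenever a page yields new items,
// otherwise increment it; stop once it reaches the threshold.
const MAX_EMPTY_PAGES = 3;

export function nextEmptyPageCount(prev: number, newItemsOnPage: number): number {
  return newItemsOnPage > 0 ? 0 : prev + 1;
}

export function shouldStop(consecutiveEmptyPages: number): boolean {
  return consecutiveEmptyPages >= MAX_EMPTY_PAGES;
}

// Simulation: how many pages does a sync check, given the number of NEW
// items found on each page?
export function pagesChecked(newCounts: number[]): number {
  let counter = 0;
  let checked = 0;
  for (const n of newCounts) {
    if (shouldStop(counter)) break; // carried in via the task payload
    checked++;
    counter = nextEmptyPageCount(counter, n);
  }
  return checked;
}
```

With counts `[2, 0, 0, 0, 5]` the sync stops after the third consecutive empty page and never reaches the last page — exactly the trade-off the optimization makes: bounded wasted API calls in exchange for possibly missing deep, isolated changes.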
Handling Deletions and Updates
Many integrations need to detect when content has been deleted or modified in the external system (Gmail, Slack, etc.).
The Challenge
When a user deletes an email in Gmail, archives a Slack message, or removes content in any external system, our database still contains a copy of that content. We need a way to detect these changes and update our records accordingly.
Problems to solve:
- Content can be deleted while our system is offline
- Full mailbox scans are inefficient for large datasets
- We need to detect changes without excessive API calls
- Race conditions can occur during concurrent syncs
Solution: History/Delta API Pattern
Most modern APIs provide a "history" or "delta" endpoint that returns only changes since a specific point in time.
How it works:
- Initial Sync: When first syncing content, the API returns a historyId or deltaToken
- Store Token: Save this token in user_integration.syncState.lastHistoryId
- Periodic Checks: Regularly call the history endpoint with your stored token
- Process Changes: The API returns only items that were added, deleted, or modified since that token
- Update Token: Store the new historyId for the next check
Implementation Steps
1. Add History Task Type
export type GmailTaskType =
| 'start' // Initial sync
| 'getEmailList' // Paginate through emails
| 'downloadEmail' // Download individual email
| 'checkHistory'; // NEW: Check for changes/deletions
export class GmailTaskPayload {
emailId?: string | null;
pageToken?: string | null;
startHistoryId?: string | null; // NEW: For history checks
}
2. Implement History Handler
private async handleCheckHistory(task: GmailTask): Promise<GmailTaskResult> {
const { startHistoryId } = task.payload;
// Call the external API's history endpoint
const response = await this.gmailClient.users.history.list({
userId: 'me',
startHistoryId: startHistoryId,
historyTypes: ['messageAdded', 'messageDeleted', 'labelAdded', 'labelRemoved']
});
const output: GmailTaskOutput[] = [];
const tasks: GmailTask[] = [];
// Process deletions
for (const history of response.data.history || []) {
if (history.messagesDeleted) {
for (const deleted of history.messagesDeleted) {
// Mark as deleted with special action
output.push(new GmailTaskOutput({
code: 200,
id: deleted.message.id,
content: { action: 'delete' }, // Special marker for deletions
timestamp: new Date(),
}));
}
}
// Process additions (new emails since last check)
if (history.messagesAdded) {
for (const added of history.messagesAdded) {
// Create download task for new message
tasks.push(new GmailTask('downloadEmail',
new GmailTaskPayload({ emailId: added.message.id })
));
}
}
}
// Return updated history token
return new GmailTaskResult({
output,
tasks,
syncState: {
lastHistoryId: response.data.historyId // Store for next check
}
});
}
3. Update Task Result Type
export class GmailTaskResult extends BaseTaskResult {
output: GmailTaskOutput[];
tasks?: GmailTask[];
syncState?: {
lastHistoryId?: string; // NEW: Updated history token
};
}
4. Polling System Integration
The polling processor should detect syncState and update the database:
// In polling.processor.ts (example - not implemented yet)
if (result.syncState?.lastHistoryId) {
await this.userIntegrationService.updateOne({
id: userIntegrationId,
syncState: {
...currentSyncState,
lastHistoryId: result.syncState.lastHistoryId
}
});
}
The polling service should handle deletion markers:
// In polling.service.ts - saveOutput() (example - not implemented yet)
for (const output of successfulOutputs) {
if (output.content?.action === 'delete') {
await this.contentService.markAsDeleted(userIntegrationId, output.id);
} else {
await this.contentService.createOne({...});
}
}
Database Schema for Deletions
Add deletion tracking fields to user_integration_content:
ALTER TABLE user_integration_content
ADD COLUMN deleted_at TIMESTAMP NULL,
ADD COLUMN is_deleted BOOLEAN DEFAULT FALSE;
CREATE INDEX idx_user_integration_content_is_deleted
ON user_integration_content(is_deleted, user_integration_id);
Scheduling Strategy
History checks should run more frequently than full syncs:
Full Sync (Initial/Backfill):
- Run every 15-60 minutes
- Fetches new content only
- Low priority for deep backfill tasks
History Check (Changes/Deletes):
- Run every 1-5 minutes
- Detects deletions and recent additions
- High priority for real-time updates
API-Specific Examples
Gmail History API
- Endpoint: GET /gmail/v1/users/me/history?startHistoryId={id}
- Token: syncState.lastHistoryId
- Changes: messagesAdded, messagesDeleted, labelsAdded, labelsRemoved
- Expiration: ~1 week - must handle 404 errors and fall back to a full sync
Slack Conversations API
- Endpoint: Cursor-based pagination with timestamps
- Token: syncState.lastMessageTs
- Changes: Compare timestamps, check for the is_deleted: true flag
Microsoft Graph API
- Endpoint: GET /me/messages/delta?$deltatoken={token}
- Token: syncState.deltaToken
- Changes: Returns an @removed annotation for deleted items
Fallback Strategy
If history is unavailable (token expired, API doesn't support it):
- Weekly Full Comparison: Fetch all external IDs from the API
- Compare with Database: Find content in DB that's not in API response
- Mark as Deleted: Soft-delete missing items
- Update History Token: Get new token for future incremental syncs
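Step 2 of the fallback reduces to a set difference. A sketch with hypothetical inputs (the ID lists would come from the external API and your database, respectively):

```typescript
// Full-comparison fallback: anything stored in the database that the
// external API no longer returns should be soft-deleted.
export function findDeletedIds(apiIds: string[], dbIds: string[]): string[] {
  const live = new Set(apiIds); // Set gives O(1) membership checks
  return dbIds.filter((id) => !live.has(id));
}

// Example: the API no longer returns 'msg-c', so it is flagged for
// soft deletion; 'msg-a' and 'msg-b' are untouched.
const toDelete = findDeletedIds(['msg-a', 'msg-b'], ['msg-a', 'msg-b', 'msg-c']);
```

Note the asymmetry: IDs present in the API but missing from the database are not handled here — they are new content, and the normal list → download flow picks them up.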
Best Practices
✅ DO:
- Use soft deletes (deleted_at, is_deleted) rather than hard deletes
- Store history tokens in syncState for persistence
- Handle token expiration gracefully (fall back to full sync)
- Run history checks frequently (1-5 minutes)
- Use high priority for history check tasks
- Filter deleted content from user queries by default
❌ DON'T:
- Hard delete content (you lose the audit trail)
- Ignore token expiration (can miss deletions)
- Check history too infrequently (defeats the purpose)
- Use the same frequency as full sync (inefficient)
- Forget to handle race conditions (multiple workers)
Reference
Complete Integration Checklist
- Define task types for your integration (must include 'start' as the first type)
- Handle the 'start' task in the runTask() method
- Create a credential class extending BaseAuthCredentials
- Implement initialize() for authentication
- Implement validateAuthentication()
- Implement runTask() with routing to handlers
- Create a handler for each task type
- Return output in the correct format (code, id, content, timestamp)
- Define your content structure (what fields go in the content object)
- Use contentExists to prevent duplicates
- Handle pagination by creating new tasks
- Implement early stop optimization (track consecutive empty pages)
- Create manifest.json with all task types listed
- Register the integration with the integration() export
- (Optional) Implement history/delta checking for deletions
Complete Gmail Example
See src/integration/gmail/src/main.ts for a complete, working example that demonstrates:
- OAuth2 authentication
- Pagination (email list with pageToken)
- Idempotency (checking for existing emails)
- Early stop optimization (consecutive empty pages)
- Structured output format
- Task spawning pattern
- Error handling
- Priority queue integration
Next Steps
After creating your integration:
- Add to Integration Loader - The system auto-discovers integrations in src/integration/*/src/main.ts
- Monitor - Watch logs and database to verify data is being fetched
- Optimize - Add rate limiting, better error handling, retry logic as needed
Troubleshooting
Q: My integration isn't being picked up
- Check the manifest.json exists
- Verify the integration() function is exported
- Restart the backend to reload integrations
Q: Tasks aren't spawning
- Ensure you're returning tasks in the result
- Check task types match your defined types
- Verify payload structure is correct
Q: Content isn't saving
- Check output has code: 200
- Verify the id field exists (must be the external system's unique ID)
- Make sure content is a valid object
- Ensure timestamp is a valid Date object
Q: Duplicates are being created
- Use the contentExists callback to check existing items
- Ensure the id field is the external system's unique ID (not a randomly generated one)
- Verify you're filtering out existing IDs before creating download tasks
Q: New content takes too long to appear
- Verify your 'start' task starts from the most recent content
- Check that you're using contentExists to skip already-downloaded items
- Implement early stop optimization
- Ensure tasks are being created with appropriate priorities