
Integration Polling System Proposal (Revised)

Overview

This proposal integrates the existing plugin-based integration system (src/integration/*) with a Bull queue + Cron scheduler architecture for background polling and task processing.

Key Insight: Task-Based Architecture

The existing integration system uses a task-based pattern where integrations can spawn recursive tasks:

// Gmail example:
runTask({ type: 'getEmailList' })

// Returns:
{
  output: [], // No output yet
  tasks: [
    { type: 'downloadEmail', payload: { emailId: 'msg-1' } },
    { type: 'downloadEmail', payload: { emailId: 'msg-2' } },
    { type: 'getEmailList', payload: { pageToken: 'next-page' } }
  ]
}

This maps naturally onto queue-based processing. Every task can return follow-up tasks, so the work queue expands itself as pages and items are discovered, and no single job has to fetch everything at once.
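The recursion can be modeled without Redis. The sketch below is a hypothetical, in-memory stand-in for the queue. drainTasks pops one task at a time, keeps the successful outputs, and pushes any returned follow-up tasks back onto the queue. fakeGmail imitates the getEmailList/downloadEmail pattern with two pages. The Task and TaskResult shapes are assumptions; the real types live in src/integration/_base/main.ts.

```typescript
// Hypothetical task shapes for illustration; the real BaseTask and
// BaseTaskOutput types live in src/integration/_base/main.ts.
interface Task {
  type: string;
  payload: Record<string, unknown>;
}

interface TaskResult {
  output: { code: number; content: string }[];
  tasks: Task[];
}

type RunTask = (task: Task) => Promise<TaskResult>;

// Drain a FIFO work queue: run each task, keep successful outputs, and push
// any follow-up tasks it returns back onto the queue until nothing is left.
async function drainTasks(runTask: RunTask, initial: Task): Promise<string[]> {
  const queue: Task[] = [initial];
  const collected: string[] = [];
  while (queue.length > 0) {
    const task = queue.shift()!;
    const result = await runTask(task);
    for (const out of result.output) {
      if (out.code === 200) collected.push(out.content);
    }
    queue.push(...result.tasks);
  }
  return collected;
}

// Fake Gmail-like integration: two pages with two message ids each.
const pages: Record<string, { ids: string[]; next: string | null }> = {
  start: { ids: ['msg-1', 'msg-2'], next: 'page-2' },
  'page-2': { ids: ['msg-3', 'msg-4'], next: null },
};

const fakeGmail: RunTask = async (task) => {
  if (task.type === 'getEmailList') {
    const token = (task.payload.pageToken as string | null) ?? 'start';
    const page = pages[token];
    if (!page) throw new Error(`Unknown page token: ${token}`);
    // One downloadEmail task per message, plus one task for the next page
    const tasks: Task[] = page.ids.map((emailId) => ({
      type: 'downloadEmail',
      payload: { emailId },
    }));
    if (page.next) {
      tasks.push({ type: 'getEmailList', payload: { pageToken: page.next } });
    }
    return { output: [], tasks };
  }
  // downloadEmail: one output, no follow-up tasks
  return {
    output: [{ code: 200, content: `body of ${String(task.payload.emailId)}` }],
    tasks: [],
  };
};
```

Calling drainTasks(fakeGmail, { type: 'getEmailList', payload: { pageToken: null } }) resolves to the four message bodies in page order. In the proposed design, Bull takes the place of the local array. This is what lets several workers drain the queue in parallel.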

Architecture Overview

┌──────────────────────────────────────────────────────────────────┐
│                          NestJS Backend                          │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────────────┐         ┌──────────────────┐               │
│  │  Cron Scheduler  │────────>│  Queue Producer  │               │
│  │  (every 2 min)   │         │ (create initial  │               │
│  │                  │         │   poll tasks)    │               │
│  └──────────────────┘         └────────┬─────────┘               │
│                                        │                         │
│                                        v                         │
│                               ┌─────────────────┐                │
│                               │   Bull Queue    │                │
│                               │     (Redis)     │                │
│                               │  "task-runner"  │                │
│                               └────────┬────────┘                │
│                                        │                         │
│                                        v                         │
│  ┌──────────────────┐         ┌─────────────────┐                │
│  │  Task Processor  │<────────│  Bull Workers   │                │
│  │  (run one task)  │         │   (pull jobs)   │                │
│  └────────┬─────────┘         └─────────────────┘                │
│           │                                                      │
│           │ Load Integration                                     │
│           v                                                      │
│  ┌──────────────────────────────────┐                            │
│  │  Integration Loader              │                            │
│  │  - Loads src/integration/*       │                            │
│  │  - Instantiates Integration      │                            │
│  │  - Calls integration.runTask()   │                            │
│  └────────┬─────────────────────────┘                            │
│           │                                                      │
│           v                                                      │
│  ┌──────────────────────────────────┐                            │
│  │  Integration Plugins             │                            │
│  │  - hello-world/                  │                            │
│  │  - gmail/                        │                            │
│  │  - slack/ (future)               │                            │
│  └────────┬─────────────────────────┘                            │
│           │                                                      │
│           │ Returns: { output, tasks }                           │
│           ├────────────────────────────┐                         │
│           v                            v                         │
│  ┌──────────────────┐         ┌─────────────────┐                │
│  │  Save Output to  │         │   Enqueue New   │                │
│  │     Database     │         │ Tasks to Queue  │                │
│  │ (content table)  │         │  (recursive!)   │                │
│  └──────────────────┘         └─────────────────┘                │
│                                                                  │
│  ┌──────────────────────────────────────────────┐                │
│  │  PostgreSQL Database                         │                │
│  │  - user_integration (credentials, state)     │                │
│  │  - user_integration_content (fetched data)   │                │
│  └──────────────────────────────────────────────┘                │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

Code Organization

backend/src/
├── integration/                          # EXISTING: Plugin system
│   ├── _base/
│   │   ├── main.ts                       # BaseIntegration, BaseTask, etc.
│   │   └── manifest.json
│   ├── gmail/
│   │   ├── src/main.ts                   # GmailIntegration extends BaseIntegration
│   │   ├── manifest.json                 # { domain: "gmail", credentials: {...} }
│   │   └── package.json
│   ├── hello-world/
│   │   └── ...
│   └── README.md
│
├── integration-polling/                  # NEW: Polling orchestration
│   ├── integration-polling.module.ts
│   ├── integration-polling.scheduler.ts  # Cron: find integrations due for poll
│   ├── integration-polling.producer.ts   # Create initial "poll" tasks
│   ├── integration-polling.processor.ts  # Process tasks, enqueue follow-ups
│   ├── integration-polling.service.ts    # Save output, update state
│   └── integration-polling.loader.ts     # Load integrations from src/integration/*
│
└── v1/
    ├── user-integration/                 # API for managing integrations
    └── user-integration-content/         # API for querying content

Key Separation:

  • integration/ = Plugins (self-contained, task-based integrations)
  • integration-polling/ = Orchestration (cron, queue, loading plugins, saving results)
  • v1/ = APIs (user-facing REST endpoints)

How It Works

1. Cron Scheduler (Kickoff)

File: integration-polling/integration-polling.scheduler.ts

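import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { IntegrationPollingProducer } from './integration-polling.producer';
// Import path assumed; adjust to where UserIntegrationService lives under v1/user-integration/
import { UserIntegrationService } from '../v1/user-integration/user-integration.service';

@Injectable()
export class IntegrationPollingScheduler {
  constructor(
    private readonly producer: IntegrationPollingProducer,
    private readonly userIntegrationService: UserIntegrationService,
  ) {}

  @Cron('*/2 * * * *') // Every 2 minutes
  async checkForDuePolls() {
    // Find integrations ready for polling.
    // Note: a row whose nextSyncAt is still NULL (never synced) does not match
    // "nextSyncAt <= now" in SQL, so either set nextSyncAt when the integration
    // is created or make readMany also match NULL.
    const { results } = await this.userIntegrationService.readMany({
      syncStatus: 'active',
      nextSyncAtLte: new Date(),
      limit: 100,
    });

    for (const integration of results) {
      // Create the initial "poll" task for this integration
      await this.producer.createInitialPollTask(integration);

      // Push nextSyncAt forward right away so the next cron tick does not start
      // a second sync while this one's tasks are still running.
      // markSyncComplete() sets it again when the sync finishes.
      await this.userIntegrationService.updateOne(integration.id, {
        nextSyncAt: new Date(Date.now() + integration.pollIntervalMinutes * 60 * 1000),
      });
    }
  }
}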
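The nextSyncAt column is nullable. A newly connected integration whose nextSyncAt was never set would therefore not match a plain "nextSyncAt <= now" filter, because comparisons with NULL are never true in SQL. The integration would never be polled. The predicate below spells out the intended rule as a minimal in-memory sketch. It assumes never-synced integrations should be polled on the next tick. SchedulableIntegration, isDueForPoll, and selectDuePolls are hypothetical names. The real readMany query would need the equivalent "nextSyncAt IS NULL OR nextSyncAt <= now" condition.

```typescript
// Hypothetical minimal slice of a UserIntegration row used for scheduling.
interface SchedulableIntegration {
  id: string;
  syncStatus: 'active' | 'paused' | 'error';
  nextSyncAt: Date | null;
}

// Due = active, and either never scheduled (null) or scheduled at/before now.
function isDueForPoll(integration: SchedulableIntegration, now: Date): boolean {
  if (integration.syncStatus !== 'active') return false;
  return integration.nextSyncAt === null || integration.nextSyncAt.getTime() <= now.getTime();
}

// Pick at most "limit" due integrations: never-synced ones first (sort key 0),
// then the most overdue.
function selectDuePolls(
  integrations: SchedulableIntegration[],
  now: Date,
  limit: number,
): SchedulableIntegration[] {
  const sortKey = (i: SchedulableIntegration) => i.nextSyncAt?.getTime() ?? 0;
  return integrations
    .filter((i) => isDueForPoll(i, now))
    .sort((a, b) => sortKey(a) - sortKey(b))
    .slice(0, limit);
}
```

Ordering by nextSyncAt means that when more than 100 integrations are due, the backlog drains oldest-first rather than in arbitrary database order.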

2. Queue Producer (Create Initial Task)

File: integration-polling/integration-polling.producer.ts

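import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { JobOptions, Queue } from 'bull';
import { BaseTask } from '../integration/_base/main';
// Entity import path assumed
import { UserIntegration } from '../v1/user-integration/user-integration.entity';

// Shared retry policy, so follow-up tasks retry the same way as the initial task
const TASK_JOB_OPTIONS: JobOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 2000 },
};

@Injectable()
export class IntegrationPollingProducer {
  constructor(
    @InjectQueue('task-runner') private readonly queue: Queue,
  ) {}

  async createInitialPollTask(integration: UserIntegration) {
    // Determine the initial task based on integration type
    const initialTask = this.getInitialTask(integration);

    await this.queue.add(
      'run-integration-task',
      {
        userIntegrationId: integration.id,
        userId: integration.userId,
        integrationDomain: integration.integrationDomain,
        // Note: credentials are copied into the job payload and stored in Redis
        credentials: integration.credentials,
        syncState: integration.syncState,
        task: initialTask, // e.g., { type: 'getEmailList', payload: { pageToken: null } }
      },
      TASK_JOB_OPTIONS,
    );
  }

  private getInitialTask(integration: UserIntegration): BaseTask {
    // Each integration type has a different starting task.
    // hello-world (used in Phase 3) also needs a case here.
    switch (integration.integrationDomain) {
      case 'gmail':
        return { type: 'getEmailList', payload: { pageToken: null } };
      case 'slack':
        return { type: 'getChannelList', payload: {} };
      case 'ftp':
        return { type: 'listDirectory', payload: { path: '/' } };
      default:
        throw new Error(`Unknown integration: ${integration.integrationDomain}`);
    }
  }

  // Enqueue follow-up tasks returned by an integration, reusing the parent job's
  // data (ids, credentials, syncState) and the same retry options
  async enqueueFollowUpTasks(jobData: any, followUpTasks: BaseTask[]) {
    for (const task of followUpTasks) {
      await this.queue.add(
        'run-integration-task',
        { ...jobData, task }, // New task to run
        TASK_JOB_OPTIONS,
      );
    }
  }
}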
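Bull's jobId job option is one possible answer to Open Question 1 (task deduplication). Bull does not add a job whose id already exists in the queue. The minimal sketch below assumes task payloads are plain JSON. It builds the id deterministically from the integration, the task type, and a key-order-independent serialization of the payload. syncRunId is a hypothetical per-sync identifier, for example the time the scheduler started the sync. It matters because completed jobs kept in Redis still hold their ids. Without it, the next sync's initial getEmailList task could be silently skipped.

```typescript
// Hypothetical task shape (the real BaseTask lives in src/integration/_base/main.ts).
interface Task {
  type: string;
  payload: unknown;
}

// Serialize a value with object keys sorted, so { a: 1, b: 2 } and
// { b: 2, a: 1 } produce the same string.
function stableStringify(value: unknown): string {
  if (value === null || typeof value !== 'object') {
    return JSON.stringify(value) ?? 'undefined';
  }
  if (Array.isArray(value)) {
    return `[${value.map((v) => stableStringify(v)).join(',')}]`;
  }
  const record = value as Record<string, unknown>;
  const entries = Object.keys(record)
    .sort()
    .map((k) => `${JSON.stringify(k)}:${stableStringify(record[k])}`);
  return `{${entries.join(',')}}`;
}

// Deterministic job id: same integration + same sync run + same task => same id.
function buildTaskJobId(userIntegrationId: string, syncRunId: string, task: Task): string {
  return `${userIntegrationId}|${syncRunId}|${task.type}|${stableStringify(task.payload)}`;
}
```

The id would be merged into the options passed to queue.add alongside attempts and backoff. Very large payloads produce long ids, so hashing the serialized payload is a reasonable variation.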

3. Task Processor (Run Tasks)

File: integration-polling/integration-polling.processor.ts

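import { InjectQueue, Process, Processor } from '@nestjs/bull';
import { Job, Queue } from 'bull';
import { IntegrationPollingLoader } from './integration-polling.loader';
import { IntegrationPollingProducer } from './integration-polling.producer';
import { IntegrationPollingService } from './integration-polling.service';

@Processor('task-runner')
export class IntegrationPollingProcessor {
  constructor(
    @InjectQueue('task-runner') private readonly queue: Queue,
    private readonly loader: IntegrationPollingLoader,
    private readonly producer: IntegrationPollingProducer,
    private readonly service: IntegrationPollingService,
  ) {}

  @Process('run-integration-task')
  async handleTask(job: Job) {
    const { userIntegrationId, integrationDomain, credentials, task } = job.data;

    // Load the integration plugin and instantiate it with the user's credentials
    const IntegrationClass = this.loader.loadIntegration(integrationDomain);
    const integration = new IntegrationClass(credentials);

    // Initialize if needed
    await integration.initialize();

    // Run the task; if it throws, Bull retries the job according to its attempts/backoff
    const result = await integration.runTask(task);
    const output = result.output ?? [];
    const followUps = result.tasks ?? [];

    // Save output to database
    if (output.length > 0) {
      await this.service.saveOutput(userIntegrationId, integrationDomain, output);
    }

    // Enqueue follow-up tasks first, so they are counted as pending below
    if (followUps.length > 0) {
      await this.producer.enqueueFollowUpTasks(job.data, followUps);
    }

    // If no other tasks remain for this integration, mark the sync complete
    const pending = await this.countPendingTasks(userIntegrationId, job.id);
    if (pending === 0) {
      await this.service.markSyncComplete(userIntegrationId);
    }

    return {
      outputCount: output.length,
      followUpTaskCount: followUps.length,
    };
  }

  // Counts waiting/active/delayed jobs for the same integration, excluding the
  // current job. Caveats: it scans the whole queue, and it is racy (two final
  // tasks finishing together can each see the other as still active).
  private async countPendingTasks(userIntegrationId: string, currentJobId: Job['id']) {
    const jobs = await this.queue.getJobs(['waiting', 'active', 'delayed']);
    return jobs.filter(
      (j) => j && j.id !== currentJobId && j.data.userIntegrationId === userIntegrationId,
    ).length;
  }
}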
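The processor decides a sync is complete by checking whether any tasks remain queued for the integration. That check is prone to races when several workers finish the last tasks at the same moment. One alternative, related to Open Question 3, is an outstanding-task counter per sync. The counter starts at 1 for the initial task. When a task finishes with k follow-ups, the worker adds k - 1 in a single step *before* enqueuing those follow-ups. The count can then only reach zero on a task with no follow-ups, and that task is the last one. The sketch below uses an in-memory Map as a hypothetical stand-in. Across workers, the same update could be a Redis INCRBY, which is atomic and returns the new value. The class name and method names are illustrative.

```typescript
// Hypothetical in-memory stand-in for a per-sync counter. With several workers
// this would be a Redis key updated via INCRBY, which is atomic.
class OutstandingTaskCounter {
  private counts = new Map<string, number>();

  // Called once, when the initial poll task is enqueued.
  start(syncKey: string): void {
    this.counts.set(syncKey, 1);
  }

  // Called when a task finishes, BEFORE its follow-ups are enqueued:
  // the finished task leaves (-1) and its follow-ups join (+followUps).
  // Returns true if that was the last outstanding task of the sync.
  finish(syncKey: string, followUps: number): boolean {
    const current = this.counts.get(syncKey);
    if (current === undefined) throw new Error(`Unknown sync: ${syncKey}`);
    const next = current + followUps - 1;
    if (next === 0) {
      this.counts.delete(syncKey);
      return true;
    }
    this.counts.set(syncKey, next);
    return false;
  }
}
```

For the Gmail example with two pages of two emails, the finish calls are (3, 0, 0, 2, 0, 0) and only the last returns true. A retried task that already applied its update would be counted twice. The update would therefore need to be idempotent (for example, keyed by job id) before this replaces the queue scan.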

4. Integration Loader (Load Plugins)

File: integration-polling/integration-polling.loader.ts

import { Injectable } from '@nestjs/common';
import { BaseIntegration } from '../integration/_base/main';

@Injectable()
export class IntegrationPollingLoader {
  private integrations = new Map<string, typeof BaseIntegration>();

  constructor() {
    this.loadAllIntegrations();
  }

  private loadAllIntegrations() {
    // Each plugin is registered explicitly; a new integration needs a line here
    const gmailIntegration = require('../integration/gmail/src/main');
    const helloWorldIntegration = require('../integration/hello-world/src/main');
    // ... more integrations

    this.integrations.set('gmail', gmailIntegration.integration().Integration);
    this.integrations.set('hello-world', helloWorldIntegration.integration().Integration);
  }

  loadIntegration(domain: string): typeof BaseIntegration {
    const Integration = this.integrations.get(domain);
    if (!Integration) {
      throw new Error(`Integration not found: ${domain}`);
    }
    return Integration;
  }

  getManifest(domain: string): any {
    // Only resolve manifests for registered domains, so an unexpected domain
    // value cannot be turned into an arbitrary require() path
    if (!this.integrations.has(domain)) {
      throw new Error(`Integration not found: ${domain}`);
    }
    return require(`../integration/${domain}/manifest.json`);
  }
}
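As written, adding an integration touches two places: the require() list in the loader and the switch in the producer's getInitialTask(). One hypothetical alternative is a single registry that stores each plugin's class together with its starting task, so both lookups come from the same entry. The sketch assumes each plugin can supply an initialTask factory. IntegrationRegistry, PluginEntry, and the simplified Task type are illustrative names, not part of the existing _base API.

```typescript
// Hypothetical shapes; the real BaseIntegration/BaseTask come from src/integration/_base/main.ts.
interface Task {
  type: string;
  payload: Record<string, unknown>;
}

type IntegrationConstructor = new (credentials: Record<string, unknown>) => {
  runTask(task: Task): Promise<unknown>;
};

interface PluginEntry {
  Integration: IntegrationConstructor;
  initialTask: () => Task; // starting task for a fresh sync
}

// One registry serving both the loader (class lookup) and the producer (initial task).
class IntegrationRegistry {
  private entries = new Map<string, PluginEntry>();

  register(domain: string, entry: PluginEntry): void {
    if (this.entries.has(domain)) {
      throw new Error(`Integration already registered: ${domain}`);
    }
    this.entries.set(domain, entry);
  }

  get(domain: string): PluginEntry {
    const entry = this.entries.get(domain);
    if (!entry) throw new Error(`Integration not found: ${domain}`);
    return entry;
  }

  domains(): string[] {
    return Array.from(this.entries.keys()).sort();
  }
}
```

With this shape, the producer's switch becomes registry.get(domain).initialTask(). An unknown domain fails in one place with one error message.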

5. Polling Service (Save Results)

File: integration-polling/integration-polling.service.ts

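import { Injectable } from '@nestjs/common';
import { BaseTaskOutput } from '../integration/_base/main';
// Import paths below are assumed; adjust to the v1/ module layout
import { UserIntegrationService } from '../v1/user-integration/user-integration.service';
import { UserIntegrationContentService } from '../v1/user-integration-content/user-integration-content.service';

@Injectable()
export class IntegrationPollingService {
  constructor(
    private readonly contentService: UserIntegrationContentService,
    private readonly userIntegrationService: UserIntegrationService,
  ) {}

  async saveOutput(
    userIntegrationId: string,
    integrationDomain: string,
    outputs: BaseTaskOutput[],
  ) {
    for (const output of outputs) {
      // Only successful outputs are stored; non-200 outputs are skipped.
      // Note: createOne inserts a new row each time, so an item fetched again
      // on a later sync is stored twice unless deduplicated by externalId.
      if (output.code !== 200) continue;

      await this.contentService.createOne({
        userIntegrationId,
        integrationDomain,
        content: output.content,
        contentMetadata: {
          name: output.name,
          // ... other metadata
        },
        externalId: (output as any).id || null,
        externalTimestamp: (output as any).timestamp || null,
      });
    }
  }

  // pollIntervalMinutes defaults to 15 minutes; callers can pass the
  // integration's pollIntervalMinutes column instead.
  async markSyncComplete(userIntegrationId: string, pollIntervalMinutes = 15) {
    await this.userIntegrationService.updateOne(userIntegrationId, {
      lastSyncAt: new Date(),
      nextSyncAt: new Date(Date.now() + pollIntervalMinutes * 60 * 1000),
      // Note: this also overwrites a 'paused' status set while the sync was running
      syncStatus: 'active',
    });
  }
}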
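saveOutput calls createOne for every successful output, so an item fetched again on a later sync would be stored a second time. The minimal sketch below filters before inserting. It assumes the output's id is a stable external identifier and that the externalIds already stored for the integration can be loaded beforehand (that lookup is not shown). toNewContentRows and the TaskOutput/ContentRow shapes are hypothetical.

```typescript
// Hypothetical output/row shapes mirroring the fields used in saveOutput.
interface TaskOutput {
  code: number;
  content: string;
  name?: string;
  id?: string;        // assumed stable external id (read via (output as any).id above)
  timestamp?: string; // assumed external timestamp
}

interface ContentRow {
  userIntegrationId: string;
  integrationDomain: string;
  content: string;
  contentMetadata: { name?: string };
  externalId: string | null;
  externalTimestamp: string | null;
}

// Turn successful outputs into rows, skipping items whose externalId is already
// stored or already appeared earlier in this batch. Outputs without an id
// cannot be deduplicated and are always kept.
function toNewContentRows(
  userIntegrationId: string,
  integrationDomain: string,
  outputs: TaskOutput[],
  storedExternalIds: Set<string>,
): ContentRow[] {
  const seen = new Set<string>(storedExternalIds);
  const rows: ContentRow[] = [];
  for (const output of outputs) {
    if (output.code !== 200) continue;
    const externalId = output.id ?? null;
    if (externalId !== null) {
      if (seen.has(externalId)) continue;
      seen.add(externalId);
    }
    rows.push({
      userIntegrationId,
      integrationDomain,
      content: output.content,
      contentMetadata: { name: output.name },
      externalId,
      externalTimestamp: output.timestamp ?? null,
    });
  }
  return rows;
}
```

Two workers can still race on the same item. A unique index on (userIntegrationId, externalId) in user_integration_content would be the stronger guarantee; this filter mainly avoids redundant inserts.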

Database Changes

Add to UserIntegrationEntity:

@Column({ type: 'timestamp', nullable: true })
public lastSyncAt: Date | null;

@Column({ type: 'timestamp', nullable: true })
public nextSyncAt: Date | null;

@Column({ type: 'jsonb', default: {} })
public syncState: Record<string, any>; // Integration-specific state

@Column({ type: 'varchar', default: 'active' })
public syncStatus: 'active' | 'paused' | 'error';

@Column({ type: 'int', default: 15 })
public pollIntervalMinutes: number;

Flow Diagram: Gmail Polling Example

1. Cron runs at 10:00 AM
   └─> Finds integration { domain: 'gmail', nextSyncAt: 10:00 AM }

2. Producer creates initial task
   └─> Enqueues: {
         task: { type: 'getEmailList', payload: { pageToken: null } },
         credentials: { ... },
         userIntegrationId: 'int-123'
       }

3. Worker picks up job, loads Gmail integration
   └─> Calls: gmailIntegration.runTask({ type: 'getEmailList', ... })

4. Gmail integration returns:
   {
     output: [], // No direct output yet
     tasks: [
       { type: 'downloadEmail', payload: { emailId: 'msg-1' } },
       { type: 'downloadEmail', payload: { emailId: 'msg-2' } },
       { type: 'downloadEmail', payload: { emailId: 'msg-3' } },
       { type: 'getEmailList', payload: { pageToken: 'next-page' } }
     ]
   }

5. Producer enqueues 4 new tasks
   └─> Queue now has 4 jobs waiting

6. Workers process downloadEmail tasks
   └─> Each returns:
       {
         output: [{ code: 200, content: '...email content...' }],
         tasks: []
       }

7. Service saves each email to user_integration_content table

8. Worker processes getEmailList with pageToken
   └─> Returns more downloadEmail tasks...

9. Process repeats until no pageToken
   └─> All tasks complete

10. Service updates integration:
    └─> lastSyncAt = now
    └─> nextSyncAt = now + 15 minutes

Why This Architecture Works

✅ Leverages Existing System

  • Uses the existing plugin-based integration system
  • No need to rewrite integrations
  • Follows the established task-based pattern

✅ Self-Expanding Work Queue

  • Tasks spawn more tasks
  • Well suited to pagination and recursive traversal (e.g., directory trees)
  • Queue handles distribution automatically

✅ Scalable

  • Multiple workers can process tasks in parallel
  • Each task runs as an independent job
  • No coordination between tasks beyond detecting sync completion and enforcing rate limits (see Open Questions)

✅ Resilient

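  • Queued tasks survive worker restarts (and Redis restarts, if Redis persistence is enabled)
  • Failed tasks retry automatically
  • Integrations can be paused/resumed via syncStatus (tasks already in the queue still run)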

✅ Extensible

  • Add a new integration by dropping a plugin into src/integration/*
  • The polling system only needs the plugin registered in the loader and a starting task in the producer's getInitialTask()
  • Each integration defines its own task types

✅ Similar to n8n

  • n8n: Nodes trigger nodes → Create executions → Workers process
  • This: Integrations spawn tasks → Enqueue jobs → Workers process

Comparison: Old vs New Proposal

| Aspect                | Original Proposal                     | Revised Proposal                                       |
|-----------------------|---------------------------------------|--------------------------------------------------------|
| Integration pattern   | Simple adapter with fetchNewContent() | Task-based with recursive task spawning                |
| How it works          | Adapter fetches all content at once   | Initial task spawns follow-up tasks                    |
| Pagination            | Manual in adapter                     | Each page returns a follow-up task for the next page   |
| Location              | integration-polling/adapters/         | integration/ (existing!)                               |
| Flexibility           | One-shot fetch                        | Recursive, composable tasks                            |
| Matches existing code | ❌ New pattern                        | ✅ Uses existing pattern                               |

Implementation Phases

Phase 1: Database Setup

  • Add sync fields to UserIntegrationEntity
  • Create migration
  • Test with existing data

Phase 2: Polling Infrastructure

  • Create integration-polling/ module
  • Set up Bull queue with Redis
  • Implement scheduler, producer, processor
  • Create integration loader

Phase 3: Test with Existing Integration

  • Use existing hello-world integration as test
  • Create initial task
  • Verify task execution
  • Verify output saving

Phase 4: Production Test with Gmail

  • Use existing gmail integration
  • Test full flow: getEmailList → downloadEmail
  • Verify pagination works
  • Verify incremental sync

Phase 5: Monitoring & Refinement

  • Add Bull Board dashboard
  • Add error tracking
  • Add rate limiting
  • Optimize queue settings

Phase 6: Additional Integrations

  • Add Slack integration
  • Add FTP integration
  • Document pattern for new integrations

Dependencies

{
  "dependencies": {
    "@nestjs/bull": "^10.0.0",
    "@nestjs/schedule": "^4.0.0",
    "bull": "^4.12.0",
    "redis": "^4.7.0"
  }
}

Configuration

REDIS_URL=redis://localhost:6379
# Cron expression for the scheduler: every 2 minutes
POLLING_CRON_SCHEDULE="*/2 * * * *"
# Max integrations picked up per cron run
POLLING_BATCH_SIZE=100
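The scheduler hard-codes '*/2 * * * *' and limit: 100, so these variables are not read anywhere yet. The sketch below is a small, hypothetical parser that applies the documented defaults and fails fast on bad values. parsePollingConfig and PollingConfig are illustrative names.

```typescript
// Hypothetical typed view of the polling settings.
interface PollingConfig {
  redisUrl: string;
  cronSchedule: string;
  batchSize: number;
}

// Read the settings from an env-like map, falling back to the documented
// defaults and rejecting obviously bad values at startup.
function parsePollingConfig(env: Record<string, string | undefined>): PollingConfig {
  const redisUrl = env.REDIS_URL ?? 'redis://localhost:6379';
  const cronSchedule = (env.POLLING_CRON_SCHEDULE ?? '*/2 * * * *').trim();
  const batchSize = Number(env.POLLING_BATCH_SIZE ?? '100');

  if (!/^rediss?:\/\//.test(redisUrl)) {
    throw new Error(`REDIS_URL must start with redis:// or rediss://, got "${redisUrl}"`);
  }
  // Standard cron has 5 fields; @nestjs/schedule also accepts a leading seconds field
  const fieldCount = cronSchedule.split(/\s+/).length;
  if (fieldCount !== 5 && fieldCount !== 6) {
    throw new Error(`POLLING_CRON_SCHEDULE must have 5 or 6 fields, got ${fieldCount}`);
  }
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new Error(`POLLING_BATCH_SIZE must be a positive integer, got "${env.POLLING_BATCH_SIZE}"`);
  }
  return { redisUrl, cronSchedule, batchSize };
}
```

In the app, this would be called once with process.env, and batchSize would replace the literal limit passed to readMany. @Cron() receives its expression when the class is defined. A configurable schedule therefore needs the environment loaded before the module is imported, or the job registered at runtime through @nestjs/schedule's SchedulerRegistry.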

Open Questions

  1. Task deduplication: Should we prevent duplicate tasks in queue?
  2. Queue naming: One queue for all integrations, or separate queues per integration type?
  3. State management: Should we track task completion state in database?
  4. Rate limiting: How do we enforce per-integration rate limits across workers?
  5. Large content: Do we need S3 for content > 1MB?
  6. Incremental sync: How do integrations know what's "new" since last sync?

Testing Strategy

Unit Tests

  • Test task processor with mock integrations
  • Test integration loader
  • Test producer enqueue logic

Integration Tests

  • Test full flow with hello-world integration
  • Test with real Redis queue
  • Verify database persistence

End-to-End Test

  • Deploy with Gmail integration
  • Add user integration via API
  • Wait for cron to trigger
  • Verify emails appear in content table
  • Check queue metrics in Bull Board

Next Steps

  1. Review and approve architecture
  2. Create database migration
  3. Set up Bull queue module
  4. Implement integration loader
  5. Build scheduler/producer/processor
  6. Test with hello-world
  7. Test with Gmail
  8. Deploy to staging

References