Integration Polling System Proposal (Revised)
Overview
This proposal integrates the existing plugin-based integration system (src/integration/*) with a Bull queue + Cron scheduler architecture for background polling and task processing.
Key Insight: Task-Based Architecture
The existing integration system uses a task-based pattern where integrations can spawn recursive tasks:
// Gmail example:
runTask({ type: 'getEmailList' }) →
Returns: {
  output: [], // No output yet
  tasks: [
    { type: 'downloadEmail', payload: { emailId: 'msg-1' } },
    { type: 'downloadEmail', payload: { emailId: 'msg-2' } },
    { type: 'getEmailList', payload: { pageToken: 'next-page' } }
  ]
}
This maps well onto queue-based processing: each task can return follow-up tasks, so a single initial job expands into as much work as the integration needs and the queue drains once no task returns more.
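The expansion can be illustrated without Bull or Redis. The following is a minimal in-memory sketch, not the real plugin API: the `Task` and `TaskResult` shapes, the fake `runTask`, and the two-page mailbox are all assumptions made for illustration.

```typescript
// Hypothetical shapes; the real BaseTask and result types live in src/integration/_base.
interface Task {
  type: string;
  payload: Record<string, unknown>;
}

interface TaskResult {
  output: string[]; // data ready to be saved
  tasks: Task[]; // follow-up work to enqueue
}

// Stand-in for an integration's runTask(): a mailbox with two pages of two emails each.
function runTask(task: Task): TaskResult {
  if (task.type === 'getEmailList') {
    const page = (task.payload.pageToken as string | null) ?? 'page-1';
    const ids = page === 'page-1' ? ['msg-1', 'msg-2'] : ['msg-3', 'msg-4'];
    const tasks: Task[] = ids.map((emailId) => ({
      type: 'downloadEmail',
      payload: { emailId },
    }));
    if (page === 'page-1') {
      // Only the first page points at another page.
      tasks.push({ type: 'getEmailList', payload: { pageToken: 'page-2' } });
    }
    return { output: [], tasks };
  }
  // Any other task type is treated as a download that yields one output.
  return { output: [`body of ${String(task.payload.emailId)}`], tasks: [] };
}

// Drain the queue: every result may append more work until nothing is left.
function drain(initial: Task): { saved: string[]; processed: number } {
  const queue: Task[] = [initial];
  const saved: string[] = [];
  let processed = 0;
  while (queue.length > 0) {
    const task = queue.shift() as Task;
    const result = runTask(task);
    saved.push(...result.output);
    queue.push(...result.tasks);
    processed += 1;
  }
  return { saved, processed };
}

const summary = drain({ type: 'getEmailList', payload: { pageToken: null } });
console.log(summary.processed, summary.saved.length); // 6 tasks processed, 4 emails saved
```

One initial task turns into six processed tasks here (two list calls and four downloads), which is the behaviour the queue-based design relies on.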
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│                         NestJS Backend                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────┐         ┌──────────────────┐              │
│  │  Cron Scheduler  │────────>│  Queue Producer  │              │
│  │  (every 2 min)   │         │ (create initial  │              │
│  │                  │         │  poll tasks)     │              │
│  └──────────────────┘         └────────┬─────────┘              │
│                                        │                        │
│                                        v                        │
│                               ┌─────────────────┐               │
│                               │   Bull Queue    │               │
│                               │     (Redis)     │               │
│                               │  "task-runner"  │               │
│                               └────────┬────────┘               │
│                                        │                        │
│                                        v                        │
│  ┌──────────────────┐         ┌─────────────────┐               │
│  │  Task Processor  │<────────│  Bull Workers   │               │
│  │  (run one task)  │         │  (pull jobs)    │               │
│  └────────┬─────────┘         └─────────────────┘               │
│           │                                                     │
│           │  Load Integration                                   │
│           v                                                     │
│  ┌──────────────────────────────────┐                           │
│  │  Integration Loader              │                           │
│  │  - Loads src/integration/*       │                           │
│  │  - Instantiates Integration      │                           │
│  │  - Calls integration.runTask()   │                           │
│  └────────┬─────────────────────────┘                           │
│           │                                                     │
│           v                                                     │
│  ┌──────────────────────────────────┐                           │
│  │  Integration Plugins             │                           │
│  │  - hello-world/                  │                           │
│  │  - gmail/                        │                           │
│  │  - slack/ (future)               │                           │
│  └────────┬─────────────────────────┘                           │
│           │                                                     │
│           │  Returns: { output, tasks }                         │
│           v                                                     │
│  ┌──────────────────┐         ┌─────────────────┐               │
│  │  Save Output to  │         │  Enqueue New    │               │
│  │  Database        │         │  Tasks to Queue │               │
│  │  (content table) │         │  (recursive!)   │               │
│  └──────────────────┘         └─────────────────┘               │
│                                                                 │
│  ┌──────────────────────────────────────────────┐               │
│  │  PostgreSQL Database                         │               │
│  │  - user_integration (credentials, state)     │               │
│  │  - user_integration_content (fetched data)   │               │
│  └──────────────────────────────────────────────┘               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Code Organization
backend/src/
├── integration/                          # EXISTING: Plugin system
│   ├── _base/
│   │   ├── main.ts                       # BaseIntegration, BaseTask, etc.
│   │   └── manifest.json
│   ├── gmail/
│   │   ├── src/main.ts                   # GmailIntegration extends BaseIntegration
│   │   ├── manifest.json                 # { domain: "gmail", credentials: {...} }
│   │   └── package.json
│   ├── hello-world/
│   │   └── ...
│   └── README.md
│
├── integration-polling/                  # NEW: Polling orchestration
│   ├── integration-polling.module.ts
│   ├── integration-polling.scheduler.ts  # Cron: find integrations due for poll
│   ├── integration-polling.producer.ts   # Create initial "poll" tasks
│   ├── integration-polling.processor.ts  # Process tasks, enqueue follow-ups
│   ├── integration-polling.service.ts    # Save output, update state
│   └── integration-polling.loader.ts     # Load integrations from src/integration/*
│
└── v1/
    ├── user-integration/                 # API for managing integrations
    └── user-integration-content/         # API for querying content
Key Separation:
- integration/ = Plugins (self-contained, task-based integrations)
- integration-polling/ = Orchestration (cron, queue, loading plugins, saving results)
- v1/ = APIs (user-facing REST endpoints)
How It Works
1. Cron Scheduler (Kickoff)
File: integration-polling/integration-polling.scheduler.ts
import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
// Project-local imports (IntegrationPollingProducer, UserIntegrationService) omitted

@Injectable()
export class IntegrationPollingScheduler {
  constructor(
    private readonly producer: IntegrationPollingProducer,
    private readonly userIntegrationService: UserIntegrationService,
  ) {}

  @Cron('*/2 * * * *') // Every 2 minutes
  async checkForDuePolls(): Promise<void> {
    // Find integrations ready for polling
    const { results } = await this.userIntegrationService.readMany({
      syncStatus: 'active',
      nextSyncAtLte: new Date(),
      limit: 100,
    });

    // Create initial "poll" task for each integration
    for (const integration of results) {
      await this.producer.createInitialPollTask(integration);
    }
  }
}
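The selection rule behind the `readMany` call can be stated as a pure function, which also makes one edge case visible: `nextSyncAt` is nullable, and a plain "less than or equal" comparison in SQL would skip rows where it is null. The sketch below is an assumption-laden illustration, not the service's actual query; `PollCandidate` and `selectDue` are hypothetical names, and treating a null `nextSyncAt` as "due" is a choice the proposal does not state.

```typescript
// Hypothetical subset of the UserIntegration fields used by the scheduler.
interface PollCandidate {
  id: string;
  syncStatus: 'active' | 'paused' | 'error';
  nextSyncAt: Date | null;
}

// Returns the integrations due at `now`, oldest nextSyncAt first, capped at `limit`.
// Assumption: a null nextSyncAt means "never synced" and is therefore treated as due.
function selectDue(all: PollCandidate[], now: Date, limit: number): PollCandidate[] {
  const dueTime = (c: PollCandidate): number => (c.nextSyncAt ? c.nextSyncAt.getTime() : 0);
  return all
    .filter((c) => c.syncStatus === 'active' && dueTime(c) <= now.getTime())
    .sort((a, b) => dueTime(a) - dueTime(b))
    .slice(0, limit);
}
```

Sorting by due time before applying the limit means that, when more integrations are due than one cron run can take, the ones that have waited longest go first.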
2. Queue Producer (Create Initial Task)
File: integration-polling/integration-polling.producer.ts
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { JobOptions, Queue } from 'bull';
// Project-local imports (UserIntegration, BaseTask) omitted

// Shared by initial and follow-up jobs so that every task is retried on failure
const TASK_JOB_OPTIONS: JobOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 2000 },
};

@Injectable()
export class IntegrationPollingProducer {
  constructor(
    @InjectQueue('task-runner') private readonly queue: Queue,
  ) {}

  async createInitialPollTask(integration: UserIntegration): Promise<void> {
    // Determine the initial task based on integration type
    const initialTask = this.getInitialTask(integration);
    await this.queue.add(
      'run-integration-task',
      {
        userIntegrationId: integration.id,
        userId: integration.userId,
        integrationDomain: integration.integrationDomain,
        credentials: integration.credentials,
        syncState: integration.syncState,
        task: initialTask, // e.g., { type: 'getEmailList', payload: {} }
      },
      TASK_JOB_OPTIONS,
    );
  }

  private getInitialTask(integration: UserIntegration): BaseTask {
    // Each integration type has a different starting task
    switch (integration.integrationDomain) {
      case 'gmail':
        return { type: 'getEmailList', payload: { pageToken: null } };
      case 'slack':
        return { type: 'getChannelList', payload: {} };
      case 'ftp':
        return { type: 'listDirectory', payload: { path: '/' } };
      default:
        throw new Error(`Unknown integration: ${integration.integrationDomain}`);
    }
  }

  // Enqueue follow-up tasks returned by integration
  async enqueueFollowUpTasks(
    jobData: Record<string, unknown>,
    followUpTasks: BaseTask[],
  ): Promise<void> {
    for (const task of followUpTasks) {
      await this.queue.add(
        'run-integration-task',
        { ...jobData, task }, // Same job context, new task to run
        TASK_JOB_OPTIONS,
      );
    }
  }
}
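The `switch` in `getInitialTask` has to be edited whenever an integration is added, which sits awkwardly with the goal of adding integrations without touching the polling system. One possible alternative is a small registry in which each integration declares its own starting task. This is only a sketch: `registerInitialTask`, `getInitialTaskFor` and the `Task` shape are hypothetical names, and where registration would happen (for example from each plugin's manifest) is left open.

```typescript
// Hypothetical task shape; the real BaseTask lives in src/integration/_base.
interface Task {
  type: string;
  payload: Record<string, unknown>;
}

// Registry keyed by integration domain. Factories return a fresh task per poll,
// so callers never share a mutable payload object.
const initialTasks = new Map<string, () => Task>();

function registerInitialTask(domain: string, factory: () => Task): void {
  initialTasks.set(domain, factory);
}

function getInitialTaskFor(domain: string): Task {
  const factory = initialTasks.get(domain);
  if (!factory) {
    throw new Error(`Unknown integration: ${domain}`);
  }
  return factory();
}

// Each integration registers its own starting point.
registerInitialTask('gmail', () => ({ type: 'getEmailList', payload: { pageToken: null } }));
registerInitialTask('slack', () => ({ type: 'getChannelList', payload: {} }));
```

With this shape the producer would only call `getInitialTaskFor(integration.integrationDomain)` and would not need a new `case` per integration.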
3. Task Processor (Run Tasks)
File: integration-polling/integration-polling.processor.ts
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
// Project-local imports (loader, producer, service) omitted

@Processor('task-runner')
export class IntegrationPollingProcessor {
  constructor(
    private readonly loader: IntegrationPollingLoader,
    private readonly producer: IntegrationPollingProducer,
    private readonly service: IntegrationPollingService,
  ) {}

  @Process('run-integration-task')
  async handleTask(job: Job) {
    const { userIntegrationId, integrationDomain, credentials, task } = job.data;

    // Load the integration plugin
    const IntegrationClass = this.loader.loadIntegration(integrationDomain);
    const integration = new IntegrationClass(credentials);

    // Initialize if needed
    await integration.initialize();

    // Run the task; tolerate results that omit output or tasks
    const result = await integration.runTask(task);
    const output = result.output ?? [];
    const followUpTasks = result.tasks ?? [];

    // Save output to database
    if (output.length > 0) {
      await this.service.saveOutput(userIntegrationId, integrationDomain, output);
    }

    // Enqueue follow-up tasks
    if (followUpTasks.length > 0) {
      await this.producer.enqueueFollowUpTasks(job.data, followUpTasks);
    }

    // If no more tasks, mark integration sync as complete.
    // NOTE: checkQueueForIntegration() is not defined in this proposal yet. It must
    // not count the current (still active) job and must be safe with concurrent workers.
    const queueStats = await this.checkQueueForIntegration(userIntegrationId);
    if (queueStats.pending === 0) {
      await this.service.markSyncComplete(userIntegrationId);
    }

    return {
      outputCount: output.length,
      followUpTaskCount: followUpTasks.length,
    };
  }
}
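The completion check depends on knowing how many tasks are still outstanding for an integration, which the processor above does not yet implement. One way to think about it is a per-integration counter that goes up when a job is enqueued and down when a job finishes. The class below is an in-memory sketch under that assumption; `PendingTaskTracker` is a hypothetical name, and with several worker processes the counter would have to live in a shared store (for example an atomic counter in Redis), which is not part of the proposal.

```typescript
// In-memory stand-in for a shared counter of outstanding tasks per integration.
class PendingTaskTracker {
  private readonly pending = new Map<string, number>();

  // Call once for every job added to the queue (initial or follow-up).
  taskEnqueued(userIntegrationId: string, count = 1): void {
    const current = this.pending.get(userIntegrationId) ?? 0;
    this.pending.set(userIntegrationId, current + count);
  }

  // Call when a job finishes, after its follow-ups were registered.
  // Returns true when this was the last outstanding task of the sync.
  taskFinished(userIntegrationId: string): boolean {
    const remaining = (this.pending.get(userIntegrationId) ?? 0) - 1;
    if (remaining <= 0) {
      this.pending.delete(userIntegrationId);
      return true;
    }
    this.pending.set(userIntegrationId, remaining);
    return false;
  }
}

// Usage: one initial task that spawns two downloads.
const tracker = new PendingTaskTracker();
tracker.taskEnqueued('int-123'); // initial getEmailList
tracker.taskEnqueued('int-123', 2); // two downloadEmail follow-ups
const afterList = tracker.taskFinished('int-123'); // false: two downloads remain
const afterFirst = tracker.taskFinished('int-123'); // false: one download remains
const afterSecond = tracker.taskFinished('int-123'); // true: sync can be marked complete
```

The ordering matters: follow-ups are registered before the parent task is counted as finished, otherwise the counter could touch zero while work is still about to be enqueued.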
4. Integration Loader (Load Plugins)
File: integration-polling/integration-polling.loader.ts
import { Injectable } from '@nestjs/common';
// Project-local import (BaseIntegration) omitted

@Injectable()
export class IntegrationPollingLoader {
  private readonly integrations = new Map<string, any>();

  constructor() {
    this.loadAllIntegrations();
  }

  private loadAllIntegrations(): void {
    // Load all integrations from src/integration/*
    const gmailIntegration = require('../integration/gmail/src/main');
    const helloWorldIntegration = require('../integration/hello-world/src/main');
    // ... more integrations

    this.integrations.set('gmail', gmailIntegration.integration().Integration);
    this.integrations.set('hello-world', helloWorldIntegration.integration().Integration);
  }

  loadIntegration(domain: string): typeof BaseIntegration {
    const Integration = this.integrations.get(domain);
    if (!Integration) {
      throw new Error(`Integration not found: ${domain}`);
    }
    return Integration;
  }

  getManifest(domain: string): any {
    // Resolved relative to this file; domain must match a directory name
    const manifest = require(`../integration/${domain}/manifest.json`);
    return manifest;
  }
}
5. Polling Service (Save Results)
File: integration-polling/integration-polling.service.ts
import { Injectable } from '@nestjs/common';
// Project-local imports (content service, user integration service, BaseTaskOutput) omitted

@Injectable()
export class IntegrationPollingService {
  constructor(
    private readonly contentService: UserIntegrationContentService,
    private readonly userIntegrationService: UserIntegrationService,
  ) {}

  async saveOutput(
    userIntegrationId: string,
    integrationDomain: string,
    outputs: BaseTaskOutput[],
  ): Promise<void> {
    for (const output of outputs) {
      // Only successful outputs are stored; anything else is skipped
      if (output.code === 200) {
        await this.contentService.createOne({
          userIntegrationId,
          integrationDomain,
          content: output.content,
          contentMetadata: {
            name: output.name,
            // ... other metadata
          },
          externalId: (output as any).id ?? null,
          externalTimestamp: (output as any).timestamp ?? null,
        });
      }
    }
  }

  async markSyncComplete(userIntegrationId: string): Promise<void> {
    await this.userIntegrationService.updateOne(userIntegrationId, {
      lastSyncAt: new Date(),
      nextSyncAt: new Date(Date.now() + 15 * 60 * 1000), // Fixed 15 min later
      syncStatus: 'active',
    });
  }
}
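`saveOutput` stores outputs with `code === 200` and silently skips the rest. If failed outputs should at least be counted or logged, splitting the list first keeps that decision in one place. The helper below is a hypothetical sketch; `TaskOutput` is a simplified stand-in for `BaseTaskOutput`, whose real fields may differ.

```typescript
// Simplified stand-in for BaseTaskOutput (assumed fields only).
interface TaskOutput {
  code: number;
  name?: string;
  content: string;
}

// Splits outputs into those that should be persisted and those that failed.
function partitionOutputs(outputs: TaskOutput[]): { ok: TaskOutput[]; failed: TaskOutput[] } {
  const ok: TaskOutput[] = [];
  const failed: TaskOutput[] = [];
  for (const output of outputs) {
    (output.code === 200 ? ok : failed).push(output);
  }
  return { ok, failed };
}
```

The caller could then persist `ok` as before and decide separately what to do with `failed`, for example logging the count.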
Database Changes
Add to UserIntegrationEntity:
@Column({ type: 'timestamp', nullable: true })
public lastSyncAt: Date | null;
@Column({ type: 'timestamp', nullable: true })
public nextSyncAt: Date | null;
@Column({ type: 'jsonb', default: {} })
public syncState: Record<string, any>; // Integration-specific state
@Column({ type: 'varchar', default: 'active' })
public syncStatus: 'active' | 'paused' | 'error';
@Column({ type: 'int', default: 15 })
public pollIntervalMinutes: number;
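`markSyncComplete` schedules the next poll a fixed 15 minutes ahead, while the entity gains a per-integration `pollIntervalMinutes`. A small helper could connect the two. This is a sketch, not the proposal's stated behaviour: `computeNextSyncAt` is a hypothetical name, and falling back to 15 minutes for invalid values is an assumption.

```typescript
// Computes the next poll time from the per-integration interval.
// Assumption: non-positive or non-finite intervals fall back to the 15-minute default.
function computeNextSyncAt(now: Date, pollIntervalMinutes: number): Date {
  const minutes =
    Number.isFinite(pollIntervalMinutes) && pollIntervalMinutes > 0
      ? pollIntervalMinutes
      : 15;
  return new Date(now.getTime() + minutes * 60_000);
}
```

For example, with `now` at 10:00 and an interval of 30, the result is 10:30 on the same day.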
Flow Diagram: Gmail Polling Example
1. Cron runs at 10:00 AM
   └─> Finds integration { domain: 'gmail', nextSyncAt: 10:00 AM }
2. Producer creates initial task
   └─> Enqueues: {
         task: { type: 'getEmailList', payload: { pageToken: null } },
         credentials: { ... },
         userIntegrationId: 'int-123'
       }
3. Worker picks up job, loads Gmail integration
   └─> Calls: gmailIntegration.runTask({ type: 'getEmailList', ... })
4. Gmail integration returns:
   {
     output: [], // No direct output yet
     tasks: [
       { type: 'downloadEmail', payload: { emailId: 'msg-1' } },
       { type: 'downloadEmail', payload: { emailId: 'msg-2' } },
       { type: 'downloadEmail', payload: { emailId: 'msg-3' } },
       { type: 'getEmailList', payload: { pageToken: 'next-page' } }
     ]
   }
5. Producer enqueues 4 new tasks
   └─> Queue now has 4 jobs waiting
6. Workers process downloadEmail tasks
   └─> Each returns:
       {
         output: [{ code: 200, content: '...email content...' }],
         tasks: []
       }
7. Service saves each email to user_integration_content table
8. Worker processes getEmailList with pageToken
   └─> Returns more downloadEmail tasks...
9. Process repeats until no pageToken
   └─> All tasks complete
10. Service updates integration:
    └─> lastSyncAt = now
    └─> nextSyncAt = now + 15 minutes
Why This Architecture Works
✅ Leverages Existing System
- Uses the existing plugin-based integration system
- No need to rewrite integrations
- Follows the established task-based pattern
✅ Self-Expanding Work Queue
- Tasks spawn more tasks
- Perfect for pagination, recursion, directories
- Queue handles distribution automatically
✅ Scalable
- Multiple workers can process tasks in parallel
- Each task is independent
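- Workers do not block each other while running tasks (detecting sync completion and rate limiting still need shared state; see Open Questions)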
✅ Resilient
- Tasks persist in Redis
- Failed tasks retry automatically
- Can pause/resume integrations
✅ Extensible
- Add new integrations by dropping them in src/integration/*
- No changes to polling system needed
- Each integration defines its own task types
✅ Similar to N8N
- N8N: Nodes trigger nodes → Create executions → Workers process
- This: Integrations spawn tasks → Enqueue jobs → Workers process
Comparison: Old vs New Proposal
| Aspect | Original Proposal | Revised Proposal |
|---|---|---|
| Integration Pattern | Simple adapter with fetchNewContent() | Task-based with recursive task spawning |
| How it works | Adapter fetches all content at once | Initial task spawns follow-up tasks |
| Pagination | Manual in adapter | Automatic via task spawning |
| Location | integration-polling/adapters/ | integration/ (existing!) |
| Flexibility | One-shot fetch | Recursive, composable tasks |
| Matches existing code | ❌ New pattern | ✅ Uses existing pattern |
Implementation Phases
Phase 1: Database Setup
- Add sync fields to UserIntegrationEntity
- Create migration
- Test with existing data
Phase 2: Polling Infrastructure
- Create integration-polling/ module
- Set up Bull queue with Redis
- Implement scheduler, producer, processor
- Create integration loader
Phase 3: Test with Existing Integration
- Use existing hello-world integration as test
- Create initial task
- Verify task execution
- Verify output saving
Phase 4: Production Test with Gmail
- Use existing gmail integration
- Test full flow: getEmailList → downloadEmail
- Verify pagination works
- Verify incremental sync
Phase 5: Monitoring & Refinement
- Add Bull Board dashboard
- Add error tracking
- Add rate limiting
- Optimize queue settings
Phase 6: Additional Integrations
- Add Slack integration
- Add FTP integration
- Document pattern for new integrations
Dependencies
{
"dependencies": {
"@nestjs/bull": "^10.0.0",
"@nestjs/schedule": "^4.0.0",
"bull": "^4.12.0",
"redis": "^4.7.0"
}
}
Configuration
REDIS_URL=redis://localhost:6379
# Every 2 minutes (quoted because the value contains spaces)
POLLING_CRON_SCHEDULE="*/2 * * * *"
# Max integrations per cron run
POLLING_BATCH_SIZE=100
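These variables still need to be read and validated somewhere; the scheduler shown earlier hardcodes both the cron expression and the limit. A minimal sketch of a loader follows, assuming the defaults match the values listed here. `PollingConfig` and `loadPollingConfig` are hypothetical names, and how the result would be wired into NestJS is left open.

```typescript
interface PollingConfig {
  redisUrl: string;
  cronSchedule: string;
  batchSize: number;
}

// Reads polling settings from an env-like record, falling back to the documented defaults.
function loadPollingConfig(env: Record<string, string | undefined>): PollingConfig {
  const rawBatchSize = env['POLLING_BATCH_SIZE'] ?? '100';
  const batchSize = Number.parseInt(rawBatchSize, 10);
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new Error(`POLLING_BATCH_SIZE must be a positive integer, got: ${rawBatchSize}`);
  }
  return {
    redisUrl: env['REDIS_URL'] ?? 'redis://localhost:6379',
    cronSchedule: env['POLLING_CRON_SCHEDULE'] ?? '*/2 * * * *',
    batchSize,
  };
}
```

In the application this would typically be called once at startup with `process.env`, so a malformed batch size fails fast instead of surfacing during a cron run.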
Open Questions
- Task deduplication: Should we prevent duplicate tasks in queue?
- Queue naming: One queue for all integrations, or separate queues per integration type?
- State management: Should we track task completion state in database?
- Rate limiting: How do we enforce per-integration rate limits across workers?
- Large content: Do we need S3 for content > 1MB?
- Incremental sync: How do integrations know what's "new" since last sync?
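For the task deduplication question, one option is to derive a deterministic key from the integration and the task, so that two identical tasks map to the same identifier. The sketch below only builds such a key; `taskJobId` and `stableStringify` are hypothetical helpers. Bull's documented `jobId` job option does not add a job whose id already exists, so a key like this (possibly hashed first) could be passed there, but retained completed jobs and any character restrictions on ids would need to be checked before relying on it.

```typescript
// Hypothetical task shape; the real BaseTask lives in src/integration/_base.
interface Task {
  type: string;
  payload: Record<string, unknown>;
}

// Serializes a value with object keys sorted, so equal payloads give equal strings
// regardless of property order.
function stableStringify(value: unknown): string {
  if (Array.isArray(value)) {
    return `[${value.map(stableStringify).join(',')}]`;
  }
  if (value !== null && typeof value === 'object') {
    const record = value as Record<string, unknown>;
    const entries = Object.keys(record)
      .sort()
      .map((key) => `${JSON.stringify(key)}:${stableStringify(record[key])}`);
    return `{${entries.join(',')}}`;
  }
  // JSON.stringify returns undefined for undefined input; normalize that to "null".
  return JSON.stringify(value) ?? 'null';
}

// Builds a deterministic identifier for "this task for this integration".
function taskJobId(userIntegrationId: string, task: Task): string {
  return `${userIntegrationId}:${task.type}:${stableStringify(task.payload)}`;
}
```

Two `downloadEmail` tasks for the same `emailId` then share one key, while the same task for a different integration does not.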
Testing Strategy
Unit Tests
- Test task processor with mock integrations
- Test integration loader
- Test producer enqueue logic
Integration Tests
- Test full flow with hello-world integration
- Test with real Redis queue
- Verify database persistence
End-to-End Test
- Deploy with Gmail integration
- Add user integration via API
- Wait for cron to trigger
- Verify emails appear in content table
- Check queue metrics in Bull Board
Next Steps
- Review and approve architecture
- Create database migration
- Set up Bull queue module
- Implement integration loader
- Build scheduler/producer/processor
- Test with hello-world
- Test with Gmail
- Deploy to staging