Integration Polling System - Implementation Plan
Overview
Add background polling to existing integration plugins using Cron + Bull Queue + Task-based processing.
Architecture
Cron (every 2 min)
  └─> Find integrations due for sync
      └─> Create initial task for each → Bull Queue (Redis)
          └─> Worker picks up task
              └─> Load integration plugin (gmail, slack, etc.)
                  └─> Run task → Returns { output, tasks }
                      ├─> Save output to database
                      └─> Enqueue follow-up tasks (recursive!)
Key Insight: Integrations already use a task-based pattern in which tasks spawn more tasks, so they map naturally onto a queue.
// Example: Gmail getEmailList task spawns downloadEmail tasks
runTask({ type: 'getEmailList' }) →
{
output: [],
tasks: [
{ type: 'downloadEmail', payload: { emailId: 'msg-1' } },
{ type: 'downloadEmail', payload: { emailId: 'msg-2' } },
{ type: 'getEmailList', payload: { pageToken: 'next' } } // Pagination!
]
}
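The fan-out in the example above can be sketched without Bull as a plain in-memory loop. The `Task`, `TaskResult`, `drainQueue`, and `demoRunTask` names are illustrative assumptions, not the plugin API; in the planned system each follow-up task would go to the Bull queue rather than an array.

```typescript
// Minimal in-memory sketch of the "tasks spawn tasks" pattern.
// All names are hypothetical; in the plan, Bull replaces the array below.
interface Task {
  type: string;
  payload?: Record<string, string>;
}

interface TaskResult {
  output: unknown[];
  tasks?: Task[];
}

type TaskRunner = (task: Task) => Promise<TaskResult>;

// Runs the initial task, then keeps running follow-ups until none remain.
async function drainQueue(initial: Task, runTask: TaskRunner): Promise<unknown[]> {
  const queue: Task[] = [initial];
  const outputs: unknown[] = [];

  while (queue.length > 0) {
    const task = queue.shift() as Task;
    const result = await runTask(task);
    outputs.push(...result.output); // stands in for "Save output to database"
    queue.push(...(result.tasks ?? [])); // stands in for "Enqueue follow-up tasks"
  }
  return outputs;
}

// Toy handler: two list pages with one message each, followed by downloads.
const demoRunTask: TaskRunner = async (task) => {
  if (task.type === 'getEmailList') {
    const page = task.payload?.pageToken ?? 'first';
    const tasks: Task[] = [{ type: 'downloadEmail', payload: { emailId: `msg-${page}` } }];
    if (page === 'first') {
      tasks.push({ type: 'getEmailList', payload: { pageToken: 'next' } });
    }
    return { output: [], tasks };
  }
  return { output: [`email:${task.payload?.emailId}`] };
};
```

Calling `drainQueue({ type: 'getEmailList' }, demoRunTask)` walks both pages and resolves with one output per downloaded message. The loop ends only because the toy handler stops emitting pagination tasks, which is the same termination condition the real plugins would need to guarantee.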
Code Organization
backend/src/integration/
├── _base/
│ └── main.ts # BaseIntegration, BaseTask (existing)
│
├── gmail/ # Integration plugin (existing)
│ ├── src/main.ts
│ ├── manifest.json
│ ├── gmail.log.ts # NEW: Logging
│ └── gmail.error.ts # NEW: Error handling
│
├── hello-world/ # Integration plugin (existing)
│ ├── src/main.ts
│ ├── hello-world.log.ts # NEW: Logging
│ └── hello-world.error.ts # NEW: Error handling
│
├── polling/ # NEW: Polling orchestration
│ ├── polling.module.ts
│ ├── polling.scheduler.ts # Cron: find due integrations
│ ├── polling.producer.ts # Create & enqueue tasks
│ ├── polling.processor.ts # Run tasks, enqueue follow-ups
│ ├── polling.service.ts # Save output, update state
│ ├── polling.loader.ts # Load integration plugins
│ ├── polling.log.ts # Logging templates
│ └── polling.error.ts # Error classes
│
├── README.md # Integration system overview
└── plan-short.md # This file
Logging Standards
Polling System Logs
File: polling/polling.log.ts
import { createLogs } from '../../common/logger/create-logs.helper';
export const pollingLogs = createLogs('pollingLogs', {
// Scheduler
cycleStarting: 'Polling cycle initiated',
cycleComplete: 'Polling cycle completed with $COUNT integrations processed',
integrationsDueForSync: 'Found $COUNT integrations due for synchronization',
// Producer
taskCreationStarting: 'Creating initial task for user-integration $INTEGRATION_ID ($DOMAIN)',
taskCreationComplete: 'Initial task created and enqueued for user-integration $INTEGRATION_ID',
followUpTasksEnqueuing: 'Enqueuing $COUNT follow-up tasks for user-integration $INTEGRATION_ID',
followUpTasksEnqueued: 'Follow-up tasks enqueued successfully',
// Processor
taskProcessingStarting: 'Processing task $TASK_TYPE for user-integration $INTEGRATION_ID',
taskProcessingComplete: 'Task $TASK_TYPE completed: $OUTPUT_COUNT outputs, $TASK_COUNT follow-up tasks',
integrationLoadStarting: 'Loading integration plugin for domain $DOMAIN',
integrationLoadComplete: 'Integration plugin loaded successfully for domain $DOMAIN',
integrationInitializeStarting: 'Initializing integration instance for $INTEGRATION_ID',
integrationInitializeComplete: 'Integration instance initialized successfully',
// Service
outputSavingStarting: 'Saving $COUNT outputs to database for user-integration $INTEGRATION_ID',
outputSavingComplete: 'Outputs saved successfully to database',
syncCompleteMarking: 'Marking sync complete for user-integration $INTEGRATION_ID',
syncCompleteMarked: 'Sync marked complete, next sync scheduled for $NEXT_SYNC_AT',
// Loader
integrationsLoadingStarting: 'Loading integration plugins from filesystem',
integrationsLoadingComplete: 'Loaded $COUNT integration plugins: $DOMAINS',
integrationNotFound: 'Integration plugin not found for domain $DOMAIN',
// Errors
taskProcessingFailed: 'Task $TASK_TYPE failed for user-integration $INTEGRATION_ID: $ERROR',
integrationLoadFailed: 'Failed to load integration plugin for domain $DOMAIN: $ERROR',
outputSavingFailed: 'Failed to save outputs for user-integration $INTEGRATION_ID: $ERROR',
unexpectedError: 'Unexpected error during polling operation: $ERROR',
});
Integration Plugin Logs
File: gmail/gmail.log.ts
import { createLogs } from '../../common/logger/create-logs.helper';
export const gmailLogs = createLogs('gmailLogs', {
// Initialization
initializeStarting: 'Gmail integration initialization started',
initializeComplete: 'Gmail OAuth2 client configured successfully',
credentialsSet: 'Gmail credentials applied to OAuth2 client',
// Authentication
validateAuthStarting: 'Validating Gmail authentication credentials',
validateAuthComplete: 'Gmail authentication validated successfully',
validateAuthFailed: 'Gmail authentication validation failed: $ERROR',
// getEmailList task
emailListFetchStarting: 'Fetching email list with pageToken: $PAGE_TOKEN',
emailListFetchComplete: 'Email list fetched: $MESSAGE_COUNT messages, nextPageToken: $NEXT_PAGE_TOKEN',
emailListFetchFailed: 'Email list fetch failed: $ERROR',
// downloadEmail task
emailDownloadStarting: 'Downloading email with ID: $EMAIL_ID',
emailDownloadComplete: 'Email downloaded successfully: $EMAIL_ID (size: $SIZE bytes)',
emailDownloadFailed: 'Email download failed for ID $EMAIL_ID: $ERROR',
// Task processing
taskStarting: 'Running task: $TASK_TYPE',
taskComplete: 'Task completed: $TASK_TYPE with $OUTPUT_COUNT outputs, $TASK_COUNT follow-up tasks',
unknownTaskType: 'Unknown task type encountered: $TASK_TYPE',
});
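The templates above rely on `createLogs` turning `$TOKEN` placeholders into values through `.replace({ ... })`. The real helper lives in common/logger and is not shown in this plan, so the following is only a guess at its behavior: it assumes camelCase parameter names map to `$UPPER_SNAKE` tokens (for example `integrationId` to `$INTEGRATION_ID`). `LogTemplate`, `LogParams`, and `demoLogs` are hypothetical names.

```typescript
// Hypothetical sketch of a createLogs-style helper; the project's helper may differ.
type LogParams = Record<string, string | number | null | undefined>;

class LogTemplate {
  constructor(
    public readonly key: string,
    private readonly template: string,
  ) {}

  // Substitutes each parameter into its $TOKEN, e.g. { taskType } fills $TASK_TYPE.
  replace(params: LogParams): string {
    const substitutions = Object.keys(params)
      .map((name) => ({
        token: '$' + name.replace(/([a-z0-9])([A-Z])/g, '$1_$2').toUpperCase(),
        value: String(params[name]),
      }))
      // Longest tokens first, so a short token never clobbers a longer one sharing its prefix.
      .sort((a, b) => b.token.length - a.token.length);

    let message = this.template;
    for (const { token, value } of substitutions) {
      message = message.split(token).join(value);
    }
    return message;
  }

  // Lets a template without placeholders be used directly as a message.
  toString(): string {
    return this.template;
  }
}

function createLogs<T extends Record<string, string>>(
  namespace: string,
  templates: T,
): { [K in keyof T]: LogTemplate } {
  const logs = {} as { [K in keyof T]: LogTemplate };
  for (const key in templates) {
    logs[key] = new LogTemplate(`${namespace}.${key}`, templates[key]);
  }
  return logs;
}

const demoLogs = createLogs('demoLogs', {
  taskProcessingComplete:
    'Task $TASK_TYPE completed: $OUTPUT_COUNT outputs, $TASK_COUNT follow-up tasks',
});

const demoLine = demoLogs.taskProcessingComplete.replace({
  taskType: 'getEmailList',
  outputCount: 0,
  taskCount: 51,
});
// demoLine === 'Task getEmailList completed: 0 outputs, 51 follow-up tasks'
```

Under this assumption a misspelled parameter name leaves its `$TOKEN` in the message untouched, which makes template and call-site mismatches visible in the logs.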
Usage Pattern
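import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { pollingLogs } from './polling.log';
// CustomLoggerService, ContextService, PollingProducer and UserIntegrationService are project imports (paths omitted here).
@Injectable()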
export class PollingScheduler {
constructor(
private readonly logger: CustomLoggerService,
private readonly contextService: ContextService,
private readonly producer: PollingProducer,
private readonly userIntegrationService: UserIntegrationService,
) {}
@Cron('*/2 * * * *')
async checkForDuePolls() {
const context = {
module: 'PollingScheduler',
method: 'checkForDuePolls',
...this.contextService.getLoggingContext(),
};
this.logger.logWithContext(pollingLogs.cycleStarting, context);
try {
const { results } = await this.userIntegrationService.readMany({
syncStatus: 'active',
nextSyncAtLte: new Date(),
limit: 100,
});
this.logger.logWithContext(
pollingLogs.integrationsDueForSync.replace({ count: results.length }),
context,
);
for (const integration of results) {
await this.producer.createInitialPollTask(integration);
}
this.logger.logWithContext(
pollingLogs.cycleComplete.replace({ count: results.length }),
context,
);
} catch (error) {
this.logger.errorWithContext(
pollingLogs.unexpectedError.replace({ error: error instanceof Error ? error.message : String(error) }),
error instanceof Error ? error.stack || '' : String(error),
context,
);
throw error;
}
}
}
Error Handling Standards
Polling Errors
File: polling/polling.error.ts
export const ERROR_CODES = {
INTEGRATION_NOT_FOUND: 'polling.integrationNotFound',
INTEGRATION_LOAD_FAILED: 'polling.integrationLoadFailed',
TASK_EXECUTION_FAILED: 'polling.taskExecutionFailed',
OUTPUT_SAVE_FAILED: 'polling.outputSaveFailed',
INVALID_TASK_TYPE: 'polling.invalidTaskType',
} as const;
export class IntegrationNotFoundError extends Error {
constructor(public domain: string) {
super(`Integration plugin not found for domain: ${domain}`);
this.name = ERROR_CODES.INTEGRATION_NOT_FOUND;
}
}
export class IntegrationLoadFailedError extends Error {
constructor(
public domain: string,
public originalError: Error,
) {
super(`Failed to load integration plugin ${domain}: ${originalError.message}`);
this.name = ERROR_CODES.INTEGRATION_LOAD_FAILED;
}
}
export class TaskExecutionFailedError extends Error {
constructor(
public taskType: string,
public integrationId: string,
public originalError: Error,
) {
super(`Task ${taskType} failed for user-integration ${integrationId}: ${originalError.message}`);
this.name = ERROR_CODES.TASK_EXECUTION_FAILED;
}
}
export class OutputSaveFailedError extends Error {
constructor(
public integrationId: string,
public originalError: Error,
) {
super(`Failed to save outputs for user-integration ${integrationId}: ${originalError.message}`);
this.name = ERROR_CODES.OUTPUT_SAVE_FAILED;
}
}
Integration Plugin Errors
File: gmail/gmail.error.ts
export const ERROR_CODES = {
AUTH_FAILED: 'gmail.authFailed',
EMAIL_LIST_FETCH_FAILED: 'gmail.emailListFetchFailed',
EMAIL_DOWNLOAD_FAILED: 'gmail.emailDownloadFailed',
INVALID_CREDENTIALS: 'gmail.invalidCredentials',
} as const;
export class GmailAuthFailedError extends Error {
constructor(public originalError: Error) {
super(`Gmail authentication failed: ${originalError.message}`);
this.name = ERROR_CODES.AUTH_FAILED;
}
}
export class GmailEmailListFetchFailedError extends Error {
constructor(public originalError: Error) {
super(`Failed to fetch Gmail email list: ${originalError.message}`);
this.name = ERROR_CODES.EMAIL_LIST_FETCH_FAILED;
}
}
export class GmailEmailDownloadFailedError extends Error {
constructor(
public emailId: string,
public originalError: Error,
) {
super(`Failed to download Gmail email ${emailId}: ${originalError.message}`);
this.name = ERROR_CODES.EMAIL_DOWNLOAD_FAILED;
}
}
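Because every error class above wraps its cause in `originalError`, a failure that reaches the processor can be nested two levels deep (polling error, then Gmail error, then the API error). The helpers below are a sketch of how that chain could be unwrapped for logging; `toErrorMessage`, `rootCause`, and `DemoWrappedError` are hypothetical names, and only the `originalError` convention comes from the classes above.

```typescript
// Hypothetical helpers that rely only on the `originalError` convention.

// Safe message extraction for values caught as `unknown`.
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

// Follows `originalError` links until it reaches the innermost cause.
function rootCause(error: unknown): unknown {
  let current = error;
  const seen = new Set<unknown>(); // guards against accidental cycles
  while (current instanceof Error && !seen.has(current)) {
    const next = (current as Error & { originalError?: unknown }).originalError;
    if (next === undefined) break;
    seen.add(current);
    current = next;
  }
  return current;
}

// Stand-in for the wrapper classes defined above, used only for the demo.
class DemoWrappedError extends Error {
  constructor(
    code: string,
    public originalError: unknown,
  ) {
    super(`${code}: ${toErrorMessage(originalError)}`);
    this.name = code;
  }
}

const demoInner = new Error('quota exceeded');
const demoOuter = new DemoWrappedError(
  'polling.taskExecutionFailed',
  new DemoWrappedError('gmail.emailDownloadFailed', demoInner),
);
// rootCause(demoOuter) returns demoInner
```

Logging the root cause next to the outermost error code keeps alerts keyed on the stable code while still surfacing the underlying API message.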
Error Usage Pattern
// In integration plugin
public async runTask(task: GmailTask): Promise<GmailTaskResult> {
const context = {
module: 'GmailIntegration',
method: 'runTask',
taskType: task.type,
};
this.logger.logWithContext(
gmailLogs.taskStarting.replace({ taskType: task.type }),
context,
);
try {
if (task.type === 'downloadEmail') {
this.logger.logWithContext(
gmailLogs.emailDownloadStarting.replace({ emailId: task.payload.emailId }),
context,
);
const response = await this.gmailClient.users.messages.get({
userId: 'me',
id: task.payload.emailId!,
format: 'full',
});
this.logger.logWithContext(
gmailLogs.emailDownloadComplete.replace({
emailId: task.payload.emailId,
size: response.data.sizeEstimate || 0,
}),
context,
);
// ... return result
}
} catch (error) {
this.logger.errorWithContext(
gmailLogs.emailDownloadFailed.replace({
emailId: task.payload.emailId,
error: error instanceof Error ? error.message : String(error),
}),
error instanceof Error ? error.stack || '' : String(error),
context,
);
throw new GmailEmailDownloadFailedError(task.payload.emailId!, error as Error);
}
}
// In processor
@Process('run-integration-task')
async handleTask(job: Job) {
const context = {
module: 'PollingProcessor',
method: 'handleTask',
...this.contextService.getLoggingContext(),
integrationId: job.data.userIntegrationId,
taskType: job.data.task.type,
};
this.logger.logWithContext(
pollingLogs.taskProcessingStarting.replace({
taskType: job.data.task.type,
integrationId: job.data.userIntegrationId,
}),
context,
);
try {
// Load and run integration
const result = await integration.runTask(job.data.task);
this.logger.logWithContext(
pollingLogs.taskProcessingComplete.replace({
taskType: job.data.task.type,
outputCount: result.output.length,
taskCount: result.tasks?.length || 0,
}),
context,
);
return result;
} catch (error) {
// Known integration errors are already logged
if (!(error instanceof GmailEmailDownloadFailedError)) {
this.logger.errorWithContext(
pollingLogs.taskProcessingFailed.replace({
taskType: job.data.task.type,
integrationId: job.data.userIntegrationId,
error: error instanceof Error ? error.message : String(error),
}),
error instanceof Error ? error.stack || '' : String(error),
context,
);
}
throw new TaskExecutionFailedError(
job.data.task.type,
job.data.userIntegrationId,
error as Error,
);
}
}
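The processor above decides whether to log by checking for `GmailEmailDownloadFailedError` specifically, which would require a new check for every plugin error class. One possible alternative, offered here only as a sketch, is a shared base class that all plugin errors extend. `IntegrationError`, `DemoDownloadFailedError`, and `shouldLogInProcessor` are hypothetical names and are not part of the plan.

```typescript
// Hypothetical base class: plugin errors extend it to signal
// "this failure was already logged inside the plugin".
class IntegrationError extends Error {
  constructor(
    public readonly code: string,
    message: string,
    public readonly originalError?: unknown,
  ) {
    super(message);
    this.name = code;
    // Keeps instanceof checks working when compiling to ES5 targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Stand-in for a plugin error such as the Gmail download failure.
class DemoDownloadFailedError extends IntegrationError {
  constructor(
    public readonly emailId: string,
    originalError: unknown,
  ) {
    super('gmail.emailDownloadFailed', `Failed to download Gmail email ${emailId}`, originalError);
  }
}

// Returns true when the processor still needs to log the failure itself.
function shouldLogInProcessor(error: unknown): boolean {
  return !(error instanceof IntegrationError);
}
```

With a check like this the processor would not need to import anything from individual plugins, at the cost of requiring every plugin to log before throwing an `IntegrationError`.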
Database Changes
Add to UserIntegrationEntity:
@Column({ type: 'timestamp', nullable: true })
public lastSyncAt: Date | null;
@Column({ type: 'timestamp', nullable: true })
public nextSyncAt: Date | null;
@Column({ type: 'jsonb', default: {} })
public syncState: Record<string, any>; // Integration-specific state (cursors, tokens, etc.)
@Column({ type: 'varchar', default: 'active' })
public syncStatus: 'active' | 'paused' | 'error';
@Column({ type: 'int', default: 15 })
public pollIntervalMinutes: number;
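How these columns interact can be sketched with two pure functions. The field names mirror the columns above; `SyncFields`, `isDueForSync`, and `markSyncComplete` are hypothetical helpers. The sketch assumes that a null `nextSyncAt` means "never synced, due immediately"; note that a SQL `<=` filter on its own would skip rows where `nextSyncAt` is NULL, so the query would need to handle that case explicitly if this assumption holds.

```typescript
// Sketch only: field names follow the columns above, helper names are hypothetical.
type SyncStatus = 'active' | 'paused' | 'error';

interface SyncFields {
  id: string;
  lastSyncAt: Date | null;
  nextSyncAt: Date | null;
  syncStatus: SyncStatus;
  pollIntervalMinutes: number;
}

// Assumption: a null nextSyncAt means the integration has never synced and is due now.
function isDueForSync(integration: SyncFields, now: Date): boolean {
  if (integration.syncStatus !== 'active') return false;
  return integration.nextSyncAt === null || integration.nextSyncAt.getTime() <= now.getTime();
}

// Returns the values to persist once a sync finishes.
function markSyncComplete(integration: SyncFields, finishedAt: Date): SyncFields {
  const intervalMs = integration.pollIntervalMinutes * 60 * 1000;
  return {
    ...integration,
    lastSyncAt: finishedAt,
    nextSyncAt: new Date(finishedAt.getTime() + intervalMs),
  };
}
```

This version schedules the next sync relative to when the previous one finished; scheduling relative to the start time instead is an equally valid choice that the plan leaves open.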
Example Log Flow: Gmail Sync
[10:00:00] pollingLogs.cycleStarting: Polling cycle initiated
{ requestId: req_abc, module: PollingScheduler, method: checkForDuePolls }
[10:00:01] pollingLogs.integrationsDueForSync: Found 1 integrations due for synchronization
{ requestId: req_abc, count: 1 }
[10:00:01] pollingLogs.taskCreationStarting: Creating initial task for user-integration int-123 (gmail)
{ requestId: req_abc, integrationId: int-123, domain: gmail }
[10:00:02] pollingLogs.taskCreationComplete: Initial task created and enqueued for user-integration int-123
{ requestId: req_abc, integrationId: int-123 }
[10:00:02] pollingLogs.taskProcessingStarting: Processing task getEmailList for user-integration int-123
{ requestId: req_def, integrationId: int-123, taskType: getEmailList }
[10:00:02] pollingLogs.integrationLoadStarting: Loading integration plugin for domain gmail
{ requestId: req_def, domain: gmail }
[10:00:02] gmailLogs.taskStarting: Running task: getEmailList
{ module: GmailIntegration, method: runTask, taskType: getEmailList }
[10:00:03] gmailLogs.emailListFetchStarting: Fetching email list with pageToken: null
{ pageToken: null }
[10:00:04] gmailLogs.emailListFetchComplete: Email list fetched: 50 messages, nextPageToken: abc123
{ messageCount: 50, nextPageToken: abc123 }
[10:00:04] pollingLogs.taskProcessingComplete: Task getEmailList completed: 0 outputs, 51 follow-up tasks
{ outputCount: 0, taskCount: 51 }
[10:00:04] pollingLogs.followUpTasksEnqueuing: Enqueuing 51 follow-up tasks for user-integration int-123
{ count: 51, integrationId: int-123 }
[10:00:05] pollingLogs.taskProcessingStarting: Processing task downloadEmail for user-integration int-123
{ taskType: downloadEmail, integrationId: int-123 }
[10:00:05] gmailLogs.emailDownloadStarting: Downloading email with ID: msg-1
{ emailId: msg-1 }
[10:00:06] gmailLogs.emailDownloadComplete: Email downloaded successfully: msg-1 (size: 15234 bytes)
{ emailId: msg-1, size: 15234 }
[10:00:06] pollingLogs.outputSavingStarting: Saving 1 outputs to database for user-integration int-123
{ count: 1, integrationId: int-123 }
[10:00:07] pollingLogs.outputSavingComplete: Outputs saved successfully to database
... (49 more downloadEmail tasks) ...
[10:02:30] pollingLogs.syncCompleteMarking: Marking sync complete for user-integration int-123
{ integrationId: int-123 }
[10:02:30] pollingLogs.syncCompleteMarked: Sync marked complete, next sync scheduled for 2025-11-08T10:15:00Z
{ nextSyncAt: 2025-11-08T10:15:00Z }
[10:02:31] pollingLogs.cycleComplete: Polling cycle completed with 1 integrations processed
{ count: 1 }
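The flow above marks the sync complete only after the last follow-up task has finished, but the plan does not say how that moment is detected. One possible approach, sketched here purely as an assumption, is a per-sync counter of outstanding tasks. `SyncProgressTracker` is a hypothetical name, and this in-memory version would have to be replaced by an atomic counter in a shared store once several workers run in parallel.

```typescript
// Hypothetical in-memory tracker of outstanding tasks per user-integration.
class SyncProgressTracker {
  private readonly pending = new Map<string, number>();

  // Call when the initial task for a sync is enqueued.
  start(integrationId: string): void {
    this.pending.set(integrationId, 1);
  }

  // Call after a task finishes. Returns true when no tasks remain for this sync.
  taskFinished(integrationId: string, followUpCount: number): boolean {
    const current = this.pending.get(integrationId);
    if (current === undefined) {
      throw new Error(`No sync in progress for ${integrationId}`);
    }
    // One task finished, followUpCount new ones were enqueued.
    const remaining = current - 1 + followUpCount;
    if (remaining === 0) {
      this.pending.delete(integrationId);
      return true;
    }
    this.pending.set(integrationId, remaining);
    return false;
  }
}
```

In the Gmail flow above, the getEmailList task would move the counter from 1 to 51, and the sync would be marked complete when the last remaining task brings it back to 0. Tasks that fail permanently would also need to decrement the counter, otherwise the sync never completes.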
Implementation Phases
Phase 1: Database (1 day)
- Add fields to UserIntegrationEntity
- Create migration
- Update service interfaces
Phase 2: Polling Infrastructure (2-3 days)
- Create integration/polling/ module
- Set up Bull queue + Redis
- Create polling.log.ts and polling.error.ts
- Implement scheduler, producer, processor with logging
- Create integration loader
- Add error handling throughout
Phase 3: Update Integrations (1 day)
- Add hello-world.log.ts and hello-world.error.ts
- Add logging to hello-world integration
- Add gmail.log.ts and gmail.error.ts
- Add logging to gmail integration
- Update error handling
Phase 4: Test with Hello World (1 day)
- Create initial task
- Verify task execution
- Verify logging output
- Test error handling
Phase 5: Test with Gmail (2 days)
- Test full flow
- Verify pagination works
- Monitor logs
- Test error scenarios
Phase 6: Monitoring (1 day)
- Add Bull Board dashboard
- Review log patterns in Loki
- Set up alerts for error codes
- Document common issues
Dependencies
{
"@nestjs/bull": "^10.0.0",
"@nestjs/schedule": "^4.0.0",
"bull": "^4.12.0",
"redis": "^4.7.0"
}
Configuration
REDIS_URL=redis://localhost:6379
POLLING_CRON_SCHEDULE=*/2 * * * *
POLLING_BATCH_SIZE=100
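A small loader can validate these settings at startup and fall back to the values listed above. This is a sketch: `PollingConfig` and `loadPollingConfig` are hypothetical names, and the environment is passed in as a plain object so the function stays easy to test.

```typescript
// Sketch of reading the settings above with defaults; names are hypothetical.
interface PollingConfig {
  redisUrl: string;
  cronSchedule: string;
  batchSize: number;
}

function loadPollingConfig(env: Record<string, string | undefined>): PollingConfig {
  const batchSize = Number.parseInt(env.POLLING_BATCH_SIZE ?? '100', 10);
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new Error(
      `POLLING_BATCH_SIZE must be a positive integer, got "${env.POLLING_BATCH_SIZE}"`,
    );
  }
  return {
    redisUrl: env.REDIS_URL ?? 'redis://localhost:6379',
    cronSchedule: env.POLLING_CRON_SCHEDULE ?? '*/2 * * * *',
    batchSize,
  };
}
```

In a Node process this would be called as `loadPollingConfig(process.env)`. The scheduler example earlier hardcodes both the cron expression and the limit of 100, so wiring these values through is still an open step.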
Next Steps
- ✅ Review and approve plan
- ⬜ Create database migration
- ⬜ Set up integration/polling/ module
- ⬜ Create polling.log.ts and polling.error.ts
- ⬜ Implement scheduler, producer, processor with logging
- ⬜ Add logging to hello-world integration
- ⬜ Add logging to gmail integration
- ⬜ Test with hello-world
- ⬜ Test with Gmail
- ⬜ Deploy and monitor logs