
Integration Polling System - Implementation Plan

Overview

Add background polling to the existing integration plugins using a cron scheduler, a Bull queue (backed by Redis), and task-based processing.

Architecture

Cron (every 2 min)
└─> Find integrations due for sync
    └─> Create initial task for each → Bull Queue (Redis)
        └─> Worker picks up task
            └─> Load integration plugin (gmail, slack, etc.)
                └─> Run task → Returns { output, tasks }
                    ├─> Save output to database
                    └─> Enqueue follow-up tasks (recursive!)

Key insight: integrations already use a task-based pattern in which tasks spawn more tasks. That maps directly onto a queue: every task becomes a job, and every follow-up task becomes another job.

// Example: the Gmail getEmailList task spawns downloadEmail tasks
runTask({ type: 'getEmailList' })
// returns:
{
  output: [],
  tasks: [
    { type: 'downloadEmail', payload: { emailId: 'msg-1' } },
    { type: 'downloadEmail', payload: { emailId: 'msg-2' } },
    { type: 'getEmailList', payload: { pageToken: 'next' } } // Pagination!
  ]
}
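To make the recursion concrete, here is a minimal in-memory sketch of how the { output, tasks } contract drains. The Task and TaskResult shapes, the drain function, and the fake Gmail plugin are illustrative assumptions, not the plan's actual types; in the real system Bull plays the role of the array queue and each loop iteration is one job.

```typescript
// Hypothetical shapes mirroring the { output, tasks } contract above.
interface Task {
  type: string;
  payload: Record<string, string | undefined>;
}

interface TaskResult {
  output: unknown[];
  tasks?: Task[];
}

type RunTask = (task: Task) => Promise<TaskResult>;

// Drains tasks in FIFO order: every follow-up task returned by runTask is
// pushed back onto the queue, as the processor would enqueue it in Bull.
async function drain(initial: Task, runTask: RunTask): Promise<{ outputs: unknown[]; runs: number }> {
  const queue: Task[] = [initial];
  const outputs: unknown[] = [];
  let runs = 0;

  while (queue.length > 0) {
    const task = queue.shift()!;
    const result = await runTask(task);
    runs += 1;
    outputs.push(...result.output);
    queue.push(...(result.tasks ?? []));
  }

  return { outputs, runs };
}

// Fake Gmail-like plugin: two pages of message IDs; the second has no next page.
const pages: Record<string, { ids: string[]; next?: string }> = {
  first: { ids: ['msg-1', 'msg-2'], next: 'next' },
  next: { ids: ['msg-3'] },
};

const fakeGmail: RunTask = async (task) => {
  if (task.type === 'getEmailList') {
    const page = pages[task.payload.pageToken ?? 'first'];
    if (!page) throw new Error(`Unknown page token: ${task.payload.pageToken}`);
    const tasks: Task[] = page.ids.map((emailId) => ({ type: 'downloadEmail', payload: { emailId } }));
    if (page.next) {
      // Pagination is just another task
      tasks.push({ type: 'getEmailList', payload: { pageToken: page.next } });
    }
    return { output: [], tasks };
  }
  if (task.type === 'downloadEmail') {
    return { output: [{ id: task.payload.emailId }] };
  }
  throw new Error(`Unknown task type: ${task.type}`);
};
```

Starting from a single getEmailList task, the fake plugin runs two list pages and three downloads: drain({ type: 'getEmailList', payload: {} }, fakeGmail) resolves with three outputs after five task runs. No special pagination logic is needed because the next-page request is an ordinary follow-up task.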

Code Organization

backend/src/integration/
├── _base/
│   └── main.ts               # BaseIntegration, BaseTask (existing)
│
├── gmail/                    # Integration plugin (existing)
│   ├── src/main.ts
│   ├── manifest.json
│   ├── gmail.log.ts          # NEW: Logging
│   └── gmail.error.ts        # NEW: Error handling
│
├── hello-world/              # Integration plugin (existing)
│   ├── src/main.ts
│   ├── hello-world.log.ts    # NEW: Logging
│   └── hello-world.error.ts  # NEW: Error handling
│
├── polling/                  # NEW: Polling orchestration
│   ├── polling.module.ts
│   ├── polling.scheduler.ts  # Cron: find due integrations
│   ├── polling.producer.ts   # Create & enqueue tasks
│   ├── polling.processor.ts  # Run tasks, enqueue follow-ups
│   ├── polling.service.ts    # Save output, update state
│   ├── polling.loader.ts     # Load integration plugins
│   ├── polling.log.ts        # Logging templates
│   └── polling.error.ts      # Error classes
│
├── README.md                 # Integration system overview
└── plan-short.md             # This file

Logging Standards

Polling System Logs

File: polling/polling.log.ts

import { createLogs } from '../../common/logger/create-logs.helper';

export const pollingLogs = createLogs('pollingLogs', {
  // Scheduler
  cycleStarting: 'Polling cycle initiated',
  cycleComplete: 'Polling cycle completed with $COUNT integrations processed',
  integrationsDueForSync: 'Found $COUNT integrations due for synchronization',

  // Producer
  taskCreationStarting: 'Creating initial task for user-integration $INTEGRATION_ID ($DOMAIN)',
  taskCreationComplete: 'Initial task created and enqueued for user-integration $INTEGRATION_ID',
  followUpTasksEnqueuing: 'Enqueuing $COUNT follow-up tasks for user-integration $INTEGRATION_ID',
  followUpTasksEnqueued: 'Follow-up tasks enqueued successfully',

  // Processor
  taskProcessingStarting: 'Processing task $TASK_TYPE for user-integration $INTEGRATION_ID',
  taskProcessingComplete: 'Task $TASK_TYPE completed: $OUTPUT_COUNT outputs, $TASK_COUNT follow-up tasks',
  integrationLoadStarting: 'Loading integration plugin for domain $DOMAIN',
  integrationLoadComplete: 'Integration plugin loaded successfully for domain $DOMAIN',
  integrationInitializeStarting: 'Initializing integration instance for $INTEGRATION_ID',
  integrationInitializeComplete: 'Integration instance initialized successfully',

  // Service
  outputSavingStarting: 'Saving $COUNT outputs to database for user-integration $INTEGRATION_ID',
  outputSavingComplete: 'Outputs saved successfully to database',
  syncCompleteMarking: 'Marking sync complete for user-integration $INTEGRATION_ID',
  syncCompleteMarked: 'Sync marked complete, next sync scheduled for $NEXT_SYNC_AT',

  // Loader
  integrationsLoadingStarting: 'Loading integration plugins from filesystem',
  integrationsLoadingComplete: 'Loaded $COUNT integration plugins: $DOMAINS',
  integrationNotFound: 'Integration plugin not found for domain $DOMAIN',

  // Errors
  taskProcessingFailed: 'Task $TASK_TYPE failed for user-integration $INTEGRATION_ID: $ERROR',
  integrationLoadFailed: 'Failed to load integration plugin for domain $DOMAIN: $ERROR',
  outputSavingFailed: 'Failed to save outputs for user-integration $INTEGRATION_ID: $ERROR',
  unexpectedError: 'Unexpected error during polling operation: $ERROR',
});
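The templates rely on a naming convention: placeholders are written in $UPPER_SNAKE_CASE, while .replace() is called with camelCase keys (so { integrationId } fills $INTEGRATION_ID). The real create-logs.helper is not shown in this plan, so the following is a hypothetical sketch of a helper that honors that convention; LogMessage, makeMessage and toPlaceholderName are illustrative names.

```typescript
// Hypothetical sketch only; the project's create-logs.helper may differ.
interface LogMessage {
  key: string; // e.g. "pollingLogs.cycleStarting"
  message: string; // template text, or filled-in text after replace()
  replace(values: Record<string, string | number | null | undefined>): LogMessage;
}

// camelCase key -> UPPER_SNAKE placeholder name, e.g. integrationId -> INTEGRATION_ID
function toPlaceholderName(key: string): string {
  return key.replace(/([a-z0-9])([A-Z])/g, '$1_$2').toUpperCase();
}

function makeMessage(key: string, message: string): LogMessage {
  return {
    key,
    message,
    replace(values) {
      const byPlaceholder = new Map<string, string>();
      for (const [name, value] of Object.entries(values)) {
        byPlaceholder.set(toPlaceholderName(name), String(value));
      }
      // One pass over every $PLACEHOLDER; unknown placeholders are left as-is.
      const filled = message.replace(/\$([A-Z][A-Z0-9_]*)/g, (match: string, name: string) =>
        byPlaceholder.get(name) ?? match,
      );
      return makeMessage(key, filled);
    },
  };
}

function createLogs<T extends Record<string, string>>(
  namespace: string,
  templates: T,
): { [K in keyof T]: LogMessage } {
  const logs = {} as { [K in keyof T]: LogMessage };
  for (const name of Object.keys(templates) as Array<keyof T & string>) {
    logs[name] = makeMessage(`${namespace}.${name}`, templates[name]);
  }
  return logs;
}
```

A single regex pass, rather than one string replacement per key, avoids prefix collisions such as $DOMAIN inside $DOMAINS, and a placeholder that was not supplied stays visible in the log line instead of silently disappearing.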

Integration Plugin Logs

File: gmail/gmail.log.ts

import { createLogs } from '../../common/logger/create-logs.helper';

export const gmailLogs = createLogs('gmailLogs', {
  // Initialization
  initializeStarting: 'Gmail integration initialization started',
  initializeComplete: 'Gmail OAuth2 client configured successfully',
  credentialsSet: 'Gmail credentials applied to OAuth2 client',

  // Authentication
  validateAuthStarting: 'Validating Gmail authentication credentials',
  validateAuthComplete: 'Gmail authentication validated successfully',
  validateAuthFailed: 'Gmail authentication validation failed: $ERROR',

  // getEmailList task
  emailListFetchStarting: 'Fetching email list with pageToken: $PAGE_TOKEN',
  emailListFetchComplete: 'Email list fetched: $MESSAGE_COUNT messages, nextPageToken: $NEXT_PAGE_TOKEN',
  emailListFetchFailed: 'Email list fetch failed: $ERROR',

  // downloadEmail task
  emailDownloadStarting: 'Downloading email with ID: $EMAIL_ID',
  emailDownloadComplete: 'Email downloaded successfully: $EMAIL_ID (size: $SIZE bytes)',
  emailDownloadFailed: 'Email download failed for ID $EMAIL_ID: $ERROR',

  // Task processing
  taskStarting: 'Running task: $TASK_TYPE',
  taskComplete: 'Task completed: $TASK_TYPE with $OUTPUT_COUNT outputs, $TASK_COUNT follow-up tasks',
  unknownTaskType: 'Unknown task type encountered: $TASK_TYPE',
});
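The Gmail logs describe two task types with different payload fields: pageToken for getEmailList and emailId for downloadEmail. Modeling GmailTask as a discriminated union on type is one way to let TypeScript check those fields, which would remove the non-null assertions on task.payload.emailId in runTask. The type below is an assumption, since the plan does not show GmailTask; describeTask is a hypothetical helper.

```typescript
// Hypothetical GmailTask type; the plan does not show the real definition.
type GmailTask =
  | { type: 'getEmailList'; payload: { pageToken?: string | null } }
  | { type: 'downloadEmail'; payload: { emailId: string } };

// Builds a short description of a task; each branch sees a narrowed payload type.
function describeTask(task: GmailTask): string {
  switch (task.type) {
    case 'getEmailList':
      return `getEmailList(pageToken=${task.payload.pageToken ?? 'null'})`;
    case 'downloadEmail':
      // task.payload.emailId is a string here, so no `!` is needed.
      return `downloadEmail(${task.payload.emailId})`;
    default: {
      // Compile error if a new task type is added to the union but not handled.
      const unknownTask: never = task;
      throw new Error(`Unknown task type: ${(unknownTask as { type: string }).type}`);
    }
  }
}
```

Jobs arrive from Redis as plain JSON, so the runtime throw in the default branch still matters: that is the point where gmailLogs.unknownTaskType would be logged.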

Usage Pattern

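import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { pollingLogs } from './polling.log';
// CustomLoggerService, ContextService, PollingProducer and UserIntegrationService
// are imported from their project modules (paths omitted here).

@Injectable()
export class PollingScheduler {
  constructor(
    private readonly logger: CustomLoggerService,
    private readonly contextService: ContextService,
    private readonly producer: PollingProducer,
    private readonly userIntegrationService: UserIntegrationService,
  ) {}

  // Every 2 minutes (the POLLING_CRON_SCHEDULE default)
  @Cron('*/2 * * * *')
  async checkForDuePolls() {
    const context = {
      module: 'PollingScheduler',
      method: 'checkForDuePolls',
      ...this.contextService.getLoggingContext(),
    };

    this.logger.logWithContext(pollingLogs.cycleStarting, context);

    try {
      const { results } = await this.userIntegrationService.readMany({
        syncStatus: 'active',
        nextSyncAtLte: new Date(),
        limit: 100, // POLLING_BATCH_SIZE default
      });

      this.logger.logWithContext(
        pollingLogs.integrationsDueForSync.replace({ count: results.length }),
        context,
      );

      // Only the initial task is enqueued here; the sync itself runs in queue workers.
      for (const integration of results) {
        await this.producer.createInitialPollTask(integration);
      }

      this.logger.logWithContext(
        pollingLogs.cycleComplete.replace({ count: results.length }),
        context,
      );
    } catch (error) {
      // `error` is `unknown` in strict TypeScript, so narrow before reading .message
      const message = error instanceof Error ? error.message : String(error);
      this.logger.errorWithContext(
        pollingLogs.unexpectedError.replace({ error: message }),
        error instanceof Error ? error.stack ?? '' : String(error),
        context,
      );
      throw error;
    }
  }
}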
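Two scheduling details are left open by the scheduler above. First, in the example log flow the Gmail sync runs from 10:00:00 to 10:02:30, longer than the 2-minute cron interval, so an integration whose nextSyncAt is only updated on completion would be picked up again at the next tick. Second, a newly connected integration whose nextSyncAt is still null never matches nextSyncAtLte. The sketch below assumes both are handled by treating null as due and by advancing nextSyncAt when the initial task is enqueued; SyncSchedule, isDueForSync and claimForSync are hypothetical names.

```typescript
// Hypothetical subset of UserIntegrationEntity fields used for scheduling.
interface SyncSchedule {
  syncStatus: 'active' | 'paused' | 'error';
  nextSyncAt: Date | null;
  pollIntervalMinutes: number;
}

const MINUTE_MS = 60 * 1000;

// Mirrors the scheduler's filter (syncStatus 'active', nextSyncAt <= now),
// with one assumption: a null nextSyncAt (never synced) counts as due.
function isDueForSync(s: SyncSchedule, now: Date): boolean {
  if (s.syncStatus !== 'active') return false;
  return s.nextSyncAt === null || s.nextSyncAt.getTime() <= now.getTime();
}

// Advances nextSyncAt at enqueue time, so the next cron tick skips an
// integration whose sync is still running.
function claimForSync<T extends SyncSchedule>(s: T, cycleStart: Date): T {
  return {
    ...s,
    nextSyncAt: new Date(cycleStart.getTime() + s.pollIntervalMinutes * MINUTE_MS),
  };
}
```

Computing nextSyncAt from the cycle start also reproduces the example's 10:15:00 value (10:00:00 plus the default 15-minute interval). With several app instances each running the cron, the claim would need to be a single conditional UPDATE so two schedulers cannot claim the same row.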

Error Handling Standards

Polling Errors

File: polling/polling.error.ts

export const ERROR_CODES = {
  INTEGRATION_NOT_FOUND: 'polling.integrationNotFound',
  INTEGRATION_LOAD_FAILED: 'polling.integrationLoadFailed',
  TASK_EXECUTION_FAILED: 'polling.taskExecutionFailed',
  OUTPUT_SAVE_FAILED: 'polling.outputSaveFailed',
  INVALID_TASK_TYPE: 'polling.invalidTaskType',
} as const;

export class IntegrationNotFoundError extends Error {
  constructor(public domain: string) {
    super(`Integration plugin not found for domain: ${domain}`);
    this.name = ERROR_CODES.INTEGRATION_NOT_FOUND;
  }
}

export class IntegrationLoadFailedError extends Error {
  constructor(
    public domain: string,
    public originalError: Error,
  ) {
    super(`Failed to load integration plugin ${domain}: ${originalError.message}`);
    this.name = ERROR_CODES.INTEGRATION_LOAD_FAILED;
  }
}

export class TaskExecutionFailedError extends Error {
  constructor(
    public taskType: string,
    public integrationId: string,
    public originalError: Error,
  ) {
    super(`Task ${taskType} failed for user-integration ${integrationId}: ${originalError.message}`);
    this.name = ERROR_CODES.TASK_EXECUTION_FAILED;
  }
}

export class OutputSaveFailedError extends Error {
  constructor(
    public integrationId: string,
    public originalError: Error,
  ) {
    super(`Failed to save outputs for user-integration ${integrationId}: ${originalError.message}`);
    this.name = ERROR_CODES.OUTPUT_SAVE_FAILED;
  }
}

Integration Plugin Errors

File: gmail/gmail.error.ts

export const ERROR_CODES = {
  AUTH_FAILED: 'gmail.authFailed',
  EMAIL_LIST_FETCH_FAILED: 'gmail.emailListFetchFailed',
  EMAIL_DOWNLOAD_FAILED: 'gmail.emailDownloadFailed',
  INVALID_CREDENTIALS: 'gmail.invalidCredentials',
} as const;

export class GmailAuthFailedError extends Error {
  constructor(public originalError: Error) {
    super(`Gmail authentication failed: ${originalError.message}`);
    this.name = ERROR_CODES.AUTH_FAILED;
  }
}

export class GmailEmailListFetchFailedError extends Error {
  constructor(public originalError: Error) {
    super(`Failed to fetch Gmail email list: ${originalError.message}`);
    this.name = ERROR_CODES.EMAIL_LIST_FETCH_FAILED;
  }
}

export class GmailEmailDownloadFailedError extends Error {
  constructor(
    public emailId: string,
    public originalError: Error,
  ) {
    super(`Failed to download Gmail email ${emailId}: ${originalError.message}`);
    this.name = ERROR_CODES.EMAIL_DOWNLOAD_FAILED;
  }
}

Error Usage Pattern

// In integration plugin (gmail/src/main.ts)
public async runTask(task: GmailTask): Promise<GmailTaskResult> {
  const context = {
    module: 'GmailIntegration',
    method: 'runTask',
    taskType: task.type,
  };

  this.logger.logWithContext(
    gmailLogs.taskStarting.replace({ taskType: task.type }),
    context,
  );

  if (task.type === 'downloadEmail') {
    const emailId = task.payload.emailId!;

    this.logger.logWithContext(
      gmailLogs.emailDownloadStarting.replace({ emailId }),
      context,
    );

    // The try/catch wraps only the download, so failures in other task
    // types are not misreported as email download failures.
    try {
      const response = await this.gmailClient.users.messages.get({
        userId: 'me',
        id: emailId,
        format: 'full',
      });

      this.logger.logWithContext(
        gmailLogs.emailDownloadComplete.replace({
          emailId,
          size: response.data.sizeEstimate ?? 0,
        }),
        context,
      );

      // ... return result
    } catch (error) {
      const err = error instanceof Error ? error : new Error(String(error));
      this.logger.errorWithContext(
        gmailLogs.emailDownloadFailed.replace({ emailId, error: err.message }),
        err.stack ?? '',
        context,
      );
      throw new GmailEmailDownloadFailedError(emailId, err);
    }
  }

  // ... handle getEmailList; log gmailLogs.unknownTaskType for anything else
}

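// In processor (polling/polling.processor.ts)
import { Process } from '@nestjs/bull';
import { Job } from 'bull';
import { GmailEmailDownloadFailedError } from '../gmail/gmail.error';
import { TaskExecutionFailedError } from './polling.error';
import { pollingLogs } from './polling.log';

@Process('run-integration-task')
async handleTask(job: Job) {
  const context = {
    module: 'PollingProcessor',
    method: 'handleTask',
    ...this.contextService.getLoggingContext(),
    integrationId: job.data.userIntegrationId,
    taskType: job.data.task.type,
  };

  this.logger.logWithContext(
    pollingLogs.taskProcessingStarting.replace({
      taskType: job.data.task.type,
      integrationId: job.data.userIntegrationId,
    }),
    context,
  );

  try {
    // Load the integration plugin via polling.loader.ts (`load` is a placeholder name)
    const integration = await this.loader.load(job.data.userIntegrationId);
    const result = await integration.runTask(job.data.task);

    this.logger.logWithContext(
      pollingLogs.taskProcessingComplete.replace({
        taskType: job.data.task.type,
        outputCount: result.output.length,
        taskCount: result.tasks?.length ?? 0,
      }),
      context,
    );

    // Next (per Architecture): save result.output via polling.service.ts
    // and enqueue result.tasks via polling.producer.ts
    return result;
  } catch (error) {
    const err = error instanceof Error ? error : new Error(String(error));
    // Known integration errors are already logged by the plugin
    if (!(err instanceof GmailEmailDownloadFailedError)) {
      this.logger.errorWithContext(
        pollingLogs.taskProcessingFailed.replace({
          taskType: job.data.task.type,
          integrationId: job.data.userIntegrationId,
          error: err.message,
        }),
        err.stack ?? '',
        context,
      );
    }
    // Throwing marks the Bull job as failed (retried if the job has attempts > 1)
    throw new TaskExecutionFailedError(
      job.data.task.type,
      job.data.userIntegrationId,
      err,
    );
  }
}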
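The processor's catch block skips logging only for GmailEmailDownloadFailedError, so GmailAuthFailedError, GmailEmailListFetchFailedError and any hello-world errors would be logged twice, once by the plugin and once by the processor. One option, an assumption rather than part of the plan, is a shared base class that every plugin error extends, so the processor tests a single instanceof. IntegrationError and shouldProcessorLog are hypothetical names.

```typescript
// Hypothetical base class for all plugin errors; the plan's classes extend Error directly.
abstract class IntegrationError extends Error {
  readonly originalError?: Error;

  constructor(code: string, message: string, originalError?: Error) {
    super(message);
    this.name = code;
    this.originalError = originalError;
    // Keeps `instanceof` working if TypeScript compiles to ES5.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// The plan's Gmail download error, rebased onto the shared class.
class GmailEmailDownloadFailedError extends IntegrationError {
  readonly emailId: string;

  constructor(emailId: string, originalError: Error) {
    super(
      'gmail.emailDownloadFailed',
      `Failed to download Gmail email ${emailId}: ${originalError.message}`,
      originalError,
    );
    this.emailId = emailId;
  }
}

// What the processor could check instead of naming each plugin error class.
function shouldProcessorLog(error: unknown): boolean {
  return !(error instanceof IntegrationError);
}
```

The processor would then only need to import IntegrationError, and adding a new plugin would not require touching polling.processor.ts.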

Database Changes

Add to UserIntegrationEntity:

@Column({ type: 'timestamp', nullable: true })
public lastSyncAt: Date | null;

@Column({ type: 'timestamp', nullable: true })
public nextSyncAt: Date | null;

@Column({ type: 'jsonb', default: {} })
public syncState: Record<string, any>; // Integration-specific state (cursors, tokens, etc.)

@Column({ type: 'varchar', default: 'active' })
public syncStatus: 'active' | 'paused' | 'error';

@Column({ type: 'int', default: 15 })
public pollIntervalMinutes: number;
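Because tasks fan out recursively, the plan needs a way to know when the last task of a sync has finished, which is the point where syncCompleteMarking is logged and nextSyncAt is written. One possible approach, assumed here rather than taken from the plan, is a per-integration count of outstanding tasks: start at 1 for the initial task, then on each completion add the number of follow-up tasks and subtract 1. SyncTracker is a hypothetical in-memory version.

```typescript
// Hypothetical in-memory tracker of outstanding tasks per user-integration.
class SyncTracker {
  private readonly pending = new Map<string, number>();

  // Called when the initial task is enqueued.
  start(userIntegrationId: string): void {
    this.pending.set(userIntegrationId, 1);
  }

  // Called once a task has finished and its follow-ups are enqueued.
  // Returns true when nothing remains, i.e. the sync can be marked complete.
  taskFinished(userIntegrationId: string, followUpCount: number): boolean {
    const current = this.pending.get(userIntegrationId);
    if (current === undefined) {
      throw new Error(`No sync in progress for ${userIntegrationId}`);
    }
    // Add follow-ups and remove the finished task in one step, so the
    // count never reaches zero while follow-ups are still outstanding.
    const remaining = current - 1 + followUpCount;
    if (remaining === 0) {
      this.pending.delete(userIntegrationId);
      return true;
    }
    this.pending.set(userIntegrationId, remaining);
    return false;
  }
}
```

Across several workers the counter must live in shared storage and be updated atomically; with Redis, a single INCRBY by (followUpCount - 1) per finished task does this in one command. A task that fails permanently should also count as finished with zero follow-ups, otherwise the sync never completes.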

Example Log Flow: Gmail Sync

[10:00:00] pollingLogs.cycleStarting: Polling cycle initiated
{ requestId: req_abc, module: PollingScheduler, method: checkForDuePolls }

[10:00:01] pollingLogs.integrationsDueForSync: Found 1 integrations due for synchronization
{ requestId: req_abc, count: 1 }

[10:00:01] pollingLogs.taskCreationStarting: Creating initial task for user-integration int-123 (gmail)
{ requestId: req_abc, integrationId: int-123, domain: gmail }

[10:00:02] pollingLogs.taskCreationComplete: Initial task created and enqueued for user-integration int-123
{ requestId: req_abc, integrationId: int-123 }

[10:00:02] pollingLogs.cycleComplete: Polling cycle completed with 1 integrations processed
{ requestId: req_abc, count: 1 }

[10:00:02] pollingLogs.taskProcessingStarting: Processing task getEmailList for user-integration int-123
{ requestId: req_def, integrationId: int-123, taskType: getEmailList }

[10:00:02] pollingLogs.integrationLoadStarting: Loading integration plugin for domain gmail
{ requestId: req_def, domain: gmail }

[10:00:02] gmailLogs.taskStarting: Running task: getEmailList
{ module: GmailIntegration, method: runTask, taskType: getEmailList }

[10:00:03] gmailLogs.emailListFetchStarting: Fetching email list with pageToken: null
{ pageToken: null }

[10:00:04] gmailLogs.emailListFetchComplete: Email list fetched: 50 messages, nextPageToken: abc123
{ messageCount: 50, nextPageToken: abc123 }

[10:00:04] pollingLogs.taskProcessingComplete: Task getEmailList completed: 0 outputs, 51 follow-up tasks
{ outputCount: 0, taskCount: 51 }

[10:00:04] pollingLogs.followUpTasksEnqueuing: Enqueuing 51 follow-up tasks for user-integration int-123
{ count: 51, integrationId: int-123 }

[10:00:05] pollingLogs.taskProcessingStarting: Processing task downloadEmail for user-integration int-123
{ taskType: downloadEmail, integrationId: int-123 }

[10:00:05] gmailLogs.emailDownloadStarting: Downloading email with ID: msg-1
{ emailId: msg-1 }

[10:00:06] gmailLogs.emailDownloadComplete: Email downloaded successfully: msg-1 (size: 15234 bytes)
{ emailId: msg-1, size: 15234 }

[10:00:06] pollingLogs.outputSavingStarting: Saving 1 outputs to database for user-integration int-123
{ count: 1, integrationId: int-123 }

[10:00:07] pollingLogs.outputSavingComplete: Outputs saved successfully to database

... (49 more downloadEmail tasks) ...

[10:02:30] pollingLogs.syncCompleteMarking: Marking sync complete for user-integration int-123
{ integrationId: int-123 }

[10:02:30] pollingLogs.syncCompleteMarked: Sync marked complete, next sync scheduled for 2025-11-08T10:15:00Z
{ nextSyncAt: 2025-11-08T10:15:00Z }

Implementation Phases

Phase 1: Database (1 day)

  • Add fields to UserIntegrationEntity
  • Create migration
  • Update service interfaces

Phase 2: Polling Infrastructure (2-3 days)

  • Create integration/polling/ module
  • Set up Bull queue + Redis
  • Create polling.log.ts and polling.error.ts
  • Implement scheduler, producer, processor with logging
  • Create integration loader
  • Add error handling throughout

Phase 3: Update Integrations (1 day)

  • Add hello-world.log.ts and hello-world.error.ts
  • Add logging to hello-world integration
  • Add gmail.log.ts and gmail.error.ts
  • Add logging to gmail integration
  • Update error handling

Phase 4: Test with Hello World (1 day)

  • Create initial task
  • Verify task execution
  • Verify logging output
  • Test error handling

Phase 5: Test with Gmail (2 days)

  • Test full flow
  • Verify pagination works
  • Monitor logs
  • Test error scenarios

Phase 6: Monitoring (1 day)

  • Add Bull Board dashboard
  • Review log patterns in Loki
  • Set up alerts for error codes
  • Document common issues

Dependencies

{
"@nestjs/bull": "^10.0.0",
"@nestjs/schedule": "^4.0.0",
"bull": "^4.12.0",
"redis": "^4.7.0"
}

Configuration

REDIS_URL=redis://localhost:6379
POLLING_CRON_SCHEDULE="*/2 * * * *"
POLLING_BATCH_SIZE=100
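The scheduler hard-codes '*/2 * * * *' and a limit of 100 rather than reading POLLING_CRON_SCHEDULE and POLLING_BATCH_SIZE. A minimal, hypothetical loader that applies these defaults and rejects an invalid batch size might look like this; loadPollingConfig and PollingConfig are illustrative names, and the caller would pass process.env.

```typescript
// Hypothetical config loader; defaults match the values listed above.
interface PollingConfig {
  redisUrl: string;
  cronSchedule: string;
  batchSize: number;
}

function loadPollingConfig(env: Record<string, string | undefined>): PollingConfig {
  // An empty or non-numeric value fails the integer check below.
  const batchSize = Number(env.POLLING_BATCH_SIZE ?? '100');
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new Error(`POLLING_BATCH_SIZE must be a positive integer, got "${env.POLLING_BATCH_SIZE}"`);
  }

  return {
    redisUrl: env.REDIS_URL ?? 'redis://localhost:6379',
    // Trim stray whitespace; the cron expression itself is validated by the scheduler.
    cronSchedule: (env.POLLING_CRON_SCHEDULE ?? '*/2 * * * *').trim(),
    batchSize,
  };
}
```

Because the @Cron() argument is evaluated when the class is loaded, a schedule read from configuration is usually registered at runtime instead, for example through @nestjs/schedule's SchedulerRegistry.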

Next Steps

  1. ✅ Review and approve plan
  2. ⬜ Create database migration
  3. ⬜ Set up integration/polling/ module
  4. ⬜ Create polling.log.ts and polling.error.ts
  5. ⬜ Implement scheduler, producer, processor with logging
  6. ⬜ Add logging to hello-world integration
  7. ⬜ Add logging to gmail integration
  8. ⬜ Test with hello-world
  9. ⬜ Test with Gmail
  10. ⬜ Deploy and monitor logs