Skip to main content
Version: Next

Advanced Patterns

Once you're comfortable with the basics, you can implement more sophisticated task management patterns. This guide covers advanced techniques for complex use cases.

Task Workflows

Organize related steps inside a single task to create complex workflows:

// src/app/tasks/data-processing.ts
import { task } from '@commandkit/tasks';

/**
 * Scheduled workflow task (cron `0 2 * * *`, i.e. daily at 2 AM) that runs
 * three steps in order: collect data, process it, then post a completion
 * notice to the `log-channel` channel when that channel is text based.
 */
export const dataProcessing = task({
  name: 'data-processing',
  schedule: '0 2 * * *', // Daily at 2 AM
  async execute(ctx) {
    // Step 1: gather the input and stash it on the task context store.
    const collected = await collectData();
    ctx.store.set('collectedData', collected);

    // Step 2: process right away; the result is not used further here.
    await processData(collected);

    // Step 3: announce completion, but only if the log channel can take text.
    const logChannel = ctx.commandkit.client.channels.cache.get('log-channel');
    if (!logChannel?.isTextBased()) {
      return;
    }

    await logChannel.send(`Data processing completed for ID: ${collected.id}`);
  },
});

Task Retry Logic

Implement custom retry logic for failed tasks:

// src/app/tasks/retry-example.ts
import { task } from '@commandkit/tasks';

/**
 * Example task that tracks retry attempts through `ctx.data`.
 *
 * Reads `attempt` (default 1) and `maxAttempts` (default 3) from the task
 * data. A failed attempt is logged; once the attempt count is no longer
 * below the maximum, a failure notice is sent to `error-channel`.
 *
 * Note: this example only logs the intent to retry; it does not reschedule
 * the task itself.
 */
export const retryTask = task({
  name: 'retry-task',
  async execute(ctx) {
    const { attempt = 1, maxAttempts = 3 } = ctx.data;

    try {
      // Attempt the operation; reaching the log line means it succeeded.
      await performRiskyOperation();
      console.log('Operation completed successfully');
    } catch (err) {
      console.error(`Attempt ${attempt} failed:`, err);

      if (attempt < maxAttempts) {
        // Attempts remain: record that a retry is expected and stop here.
        // In a real implementation, you might use a different approach
        // such as storing retry state in a database
        console.log(`Will retry attempt ${attempt + 1} of ${maxAttempts}`);
        return;
      }

      // No attempts left: log and notify about the failure.
      console.error('Max retry attempts reached');

      const errorChannel =
        ctx.commandkit.client.channels.cache.get('error-channel');
      if (errorChannel?.isTextBased()) {
        await errorChannel.send(`Task failed after ${maxAttempts} attempts`);
      }
    }
  },
});

Task Dependencies

Manage tasks that depend on other conditions:

// src/app/tasks/dependency-example.ts
import { task } from '@commandkit/tasks';

/**
 * Example task that only does its work when a precondition check passes.
 *
 * Reads `requiredData` from `ctx.data`, awaits `checkRequiredConditions()`,
 * and either processes the data or logs that execution was skipped.
 */
export const dependencyTask = task({
  name: 'dependency-task',
  async execute(ctx) {
    // Read the payload first, before the precondition check runs.
    const requiredData = ctx.data.requiredData;

    const ready = await checkRequiredConditions();

    if (ready) {
      // Conditions met, proceed with the task
      await processWithDependentData(requiredData);
    } else {
      console.log('Required conditions not met, skipping task execution');
    }
  },
});

Task Batching

Process multiple items in batches:

// src/app/tasks/batch-processing.ts
import { task } from '@commandkit/tasks';

/**
 * Scheduled task (cron `0 *​/6 * * *`, i.e. every 6 hours) that fetches the
 * pending items and processes them sequentially in batches of 10, logging
 * progress per batch and the total processed count at the end.
 */
export const batchProcessor = task({
  name: 'batch-processor',
  schedule: '0 */6 * * *', // Every 6 hours
  async execute(ctx) {
    const items = await getItemsToProcess();

    // Nothing queued: log and bail out early.
    if (items.length === 0) {
      console.log('No items to process');
      return;
    }

    const batchSize = 10;
    let processedCount = 0;
    let i = 0;

    // Walk the list one batch-sized window at a time.
    while (i < items.length) {
      const chunk = items.slice(i, i + batchSize);
      console.log(
        `Processing batch ${Math.floor(i / batchSize) + 1}/${Math.ceil(items.length / batchSize)}`,
      );

      // Items are awaited one by one, so processing stays sequential.
      for (const entry of chunk) {
        await processItem(entry);
        processedCount += 1;
      }

      i += batchSize;
    }

    console.log(`Completed processing ${processedCount} items`);
  },
});

Task Monitoring and Metrics

Track task execution metrics:

// src/app/tasks/metrics.ts
import { task } from '@commandkit/tasks';

/**
 * Scheduled task (cron `0 * * * *`, i.e. every hour) that times
 * `performTask()` and records a metrics entry for both outcomes.
 *
 * On success it records status `success` with the elapsed duration.
 * On failure it records status `error` with the duration and an error
 * message, then re-throws so the task system still sees the failure.
 */
export const metricsTask = task({
  name: 'metrics-task',
  schedule: '0 * * * *', // Every hour
  async execute(ctx) {
    const startTime = Date.now();

    try {
      // Perform the task
      await performTask();

      // Record success metrics
      await recordMetrics({
        taskName: 'metrics-task',
        status: 'success',
        duration: Date.now() - startTime,
        timestamp: new Date(),
      });
    } catch (error) {
      // A caught value is `unknown` under strict mode and may not be an
      // Error at all, so narrow before reading `.message`.
      const errorMessage =
        error instanceof Error ? error.message : String(error);

      // Record failure metrics
      await recordMetrics({
        taskName: 'metrics-task',
        status: 'error',
        duration: Date.now() - startTime,
        error: errorMessage,
        timestamp: new Date(),
      });

      throw error; // Re-throw to let the task system handle it
    }
  },
});

Task State Management

Use the CommandKit store (`ctx.commandkit.store`) to manage state across task executions:

// src/app/tasks/state-management.ts
import { task } from '@commandkit/tasks';

/**
 * Scheduled task (cron `0 *​/2 * * *`, i.e. every 2 hours) that keeps state
 * in `ctx.commandkit.store`.
 *
 * `prepare` returns `false` to skip the run while the last recorded run is
 * less than 30 minutes old. `execute` records the run time, bumps the
 * `execution-counter` entry, and then performs the work.
 */
export const statefulTask = task({
  name: 'stateful-task',
  schedule: '0 */2 * * *', // Every 2 hours
  async prepare(ctx) {
    const cooldown = 30 * 60 * 1000; // 30 minutes
    const lastRun = ctx.commandkit.store.get('last-run');

    // Run unless a previous run exists and is still inside the cooldown.
    return !(lastRun && Date.now() - lastRun < cooldown);
  },
  async execute(ctx) {
    const store = ctx.commandkit.store;

    // Stamp this run before doing anything else.
    store.set('last-run', Date.now());

    // A missing (or falsy) counter starts from zero.
    const counter = store.get('execution-counter') || 0;
    store.set('execution-counter', counter + 1);

    console.log(`Task executed ${counter + 1} times`);

    // Perform the actual work
    await performWork();
  },
});

Task Cleanup

Implement cleanup tasks for resource management:

// src/app/tasks/cleanup.ts
import { task } from '@commandkit/tasks';

/**
 * Scheduled task (cron `0 3 * * *`, i.e. daily at 3 AM) that runs three
 * cleanup steps in order: records older than 7 days, expired tasks, and
 * temporary files. Logs a success line once all three have completed.
 */
export const cleanupTask = task({
  name: 'cleanup',
  schedule: '0 3 * * *', // Daily at 3 AM
  async execute(ctx) {
    // Retention window of 7 days, expressed in milliseconds.
    const retentionMs = 7 * 24 * 60 * 60 * 1000;
    const cutoff = new Date(Date.now() - retentionMs);

    // Each step is awaited before the next one starts.
    await cleanupOldRecords(cutoff);
    await cleanupExpiredTasks();
    await cleanupTempFiles();

    console.log('Cleanup completed successfully');
  },
});

Task Error Recovery

Implement robust error recovery mechanisms:

// src/app/tasks/error-recovery.ts
import { task } from '@commandkit/tasks';

/**
 * Example task that tries a primary operation and falls back to a second
 * one when the first fails.
 *
 * Reads `operation` and `fallbackOperation` from `ctx.data`. If both fail,
 * both errors are logged and a combined failure message is sent to
 * `error-channel` (when that channel is text based). The errors are not
 * re-thrown.
 */
export const resilientTask = task({
  name: 'resilient-task',
  async execute(ctx) {
    const { operation, fallbackOperation } = ctx.data;

    // Caught values are `unknown` under strict mode and are not guaranteed
    // to be Error instances, so derive a printable message defensively.
    const describeError = (value: unknown): string =>
      value instanceof Error ? value.message : String(value);

    try {
      // Try the primary operation
      await performOperation(operation);
    } catch (error) {
      console.error('Primary operation failed:', error);

      // Try fallback operation
      try {
        await performOperation(fallbackOperation);
        console.log('Fallback operation succeeded');
      } catch (fallbackError) {
        console.error('Fallback operation also failed:', fallbackError);

        // Log the failure for manual intervention
        const channel =
          ctx.commandkit.client.channels.cache.get('error-channel');
        if (channel?.isTextBased()) {
          await channel.send(
            `Task failed: ${describeError(error)}. Fallback also failed: ${describeError(fallbackError)}`,
          );
        }
      }
    }
  },
});

Task Scheduling Patterns

Rolling Windows

// src/app/tasks/rolling-window.ts
import { task } from '@commandkit/tasks';

/**
 * Scheduled task (cron `*​/15 * * * *`, i.e. every 15 minutes) that fetches
 * the data for the trailing one-hour window ending now and hands it to
 * `processRollingWindow`.
 */
export const rollingWindowTask = task({
  name: 'rolling-window',
  schedule: '*/15 * * * *', // Every 15 minutes
  async execute(ctx) {
    const windowMs = 60 * 60 * 1000; // 1 hour
    const windowEnd = Date.now();
    const windowStart = windowEnd - windowMs;

    // Fetch the last hour of data, then process it.
    const windowData = await getDataInWindow(windowStart, windowEnd);
    await processRollingWindow(windowData);
  },
});

Adaptive Processing

// src/app/tasks/adaptive-processing.ts
import { task } from '@commandkit/tasks';

/**
 * Scheduled task (cron `*​/5 * * * *`, i.e. every 5 minutes) that sizes its
 * workload from the current load reading:
 * load above 0.8 -> 10 items, above 0.5 -> 50 items, otherwise 100 items.
 */
export const adaptiveTask = task({
  name: 'adaptive-task',
  schedule: '*/5 * * * *', // Every 5 minutes
  async execute(ctx) {
    const load = await getCurrentLoad();

    // Pick the item budget first: high load -> fewer, low load -> more.
    let maxItems: number;
    if (load > 0.8) {
      maxItems = 10;
    } else if (load > 0.5) {
      maxItems = 50;
    } else {
      maxItems = 100;
    }

    await performTask({ maxItems });
  },
});

Best Practices Summary

  1. Error Handling: Always handle errors gracefully and implement fallback mechanisms
  2. Resource Management: Clean up resources and implement proper cleanup tasks
  3. Monitoring: Track task execution metrics and performance
  4. State Management: Use the context store for sharing data between task phases
  5. Conditional Execution: Check required conditions before proceeding with task work
  6. Batching: Process large datasets in manageable batches
  7. Adaptive Processing: Adjust processing based on system load
  8. Logging: Implement comprehensive logging for debugging and monitoring

Next Steps