Part 3: Scaling to 1000+ Users
In Part 1, I shared the journey from sequential processing to distributed architecture. In Part 2, I explained how I solved IP blocking with a Rust proxy.
Now, let’s dive deep into the distributed queue system that makes it all work.
The Scalability Problem
After solving the IP blocking issue, I still had the CPU time limit problem. Processing 10 users sequentially took 30+ seconds. Cloudflare Workers have a 30-second CPU time limit.
The constraint:
- Each user takes ~3 seconds to process (GitHub API + notifications)
- 10 users × 3 seconds = 30 seconds
- Add any overhead = Time Limit Exceeded (TLE)
The realization:
I can’t process users sequentially in a single Worker. I need to distribute the work across multiple Workers.
The Solution: Service Bindings + Queue
Core idea: Instead of one Worker processing N users, spawn N Workers each processing 1 user.
Architecture:
Scheduler Worker
|
|-- Dispatch Worker 1 (User A)
|-- Dispatch Worker 2 (User B)
|-- Dispatch Worker 3 (User C)
|-- ...
|-- Dispatch Worker N (User N)
Each Worker:
- Fresh CPU budget (30 seconds)
- Isolated execution context
- Parallel processing
Result:
- 10 users processed in ~10 seconds (parallel)
- Each Worker uses <5 seconds CPU time
- No TLE errors
- Scales to 1000+ users
Key Components
1. Queue Table (D1 SQLite)
The queue table tracks which users need processing and their status.
CREATE TABLE cron_queue (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
batch_id TEXT NOT NULL,
status TEXT NOT NULL CHECK(status IN ('pending', 'processing', 'completed', 'failed')),
created_at TEXT NOT NULL DEFAULT (datetime('now')),
started_at TEXT,
completed_at TEXT,
error_message TEXT,
retry_count INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX idx_cron_queue_status ON cron_queue(status);
CREATE INDEX idx_cron_queue_batch ON cron_queue(batch_id);
CREATE INDEX idx_cron_queue_user ON cron_queue(user_id);
Why D1?
- Already part of the stack (no external dependencies)
- Fast enough for job queues (< 10ms queries)
- Supports atomic operations (CTE + UPDATE + RETURNING)
- Free tier: 50,000 writes/day (plenty for this use case)
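For reference, here is what a row from this table looks like as a TypeScript type. This is a sketch mirroring the schema above; the actual QueueItem type lives in web/backend/src/services/queue.ts and may be narrower (the claim query below only returns id, user_id, and batch_id).
// Sketch: TypeScript mirror of the cron_queue schema
export interface QueueItem {
  id: string;
  user_id: string;
  batch_id: string;
  status: 'pending' | 'processing' | 'completed' | 'failed';
  created_at: string;
  started_at: string | null;
  completed_at: string | null;
  error_message: string | null;
  retry_count: number;
}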
2. Service Bindings
Service Bindings allow a Worker to call itself, creating new Worker instances.
Configuration (wrangler.toml):
[[services]]
binding = "SELF"
service = "streaky"
Usage:
// Each fetch creates a NEW Worker instance
env.SELF.fetch('http://internal/api/cron/process-user', {
method: 'POST',
headers: {
'X-Cron-Secret': env.SERVER_SECRET,
'Content-Type': 'application/json',
},
body: JSON.stringify({
queueId: queueItem.id,
userId: queueItem.user_id,
}),
})
Why Service Bindings?
- Each env.SELF.fetch() = new Worker instance
- Fresh CPU budget per instance
- Automatic load balancing by Cloudflare
- No external queue service needed (Redis, SQS, etc.)
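For completeness, here are the bindings the rest of this post assumes on the Worker's Env. This is a sketch; all four names appear in the code below, but the real type definition may include more.
// Sketch of the Env bindings used throughout this post
export interface Env {
  SELF: Fetcher;          // Service Binding back to this same Worker ("streaky")
  DB: D1Database;         // D1 database holding cron_queue and users
  SERVER_SECRET: string;  // shared secret for internal /api/cron/* calls
  ENCRYPTION_KEY: string; // key used to decrypt stored GitHub PATs
}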
Implementation
Step 1: Initialize Batch
When the cron trigger fires, create a batch of queue items.
// web/backend/src/services/queue.ts
export async function initializeBatch(
env: Env,
userIds: string[]
): Promise<string> {
const batchId = crypto.randomUUID();
// Insert one queue row per user (see the batched variant after this function)
for (const userId of userIds) {
const queueId = crypto.randomUUID();
await env.DB.prepare(
`INSERT INTO cron_queue (id, user_id, batch_id, status)
VALUES (?, ?, ?, 'pending')`
)
.bind(queueId, userId, batchId)
.run();
}
return batchId;
}
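The loop above issues one round-trip per user. D1 also has a batch() API that runs all statements in a single implicit transaction, which cuts round-trips for large batches. A sketch of the same initialization using it (same Env and table as above; initializeBatchBulk is a name I've made up for illustration):
// Sketch: batched variant of initializeBatch using D1's batch() API
export async function initializeBatchBulk(
  env: Env,
  userIds: string[]
): Promise<string> {
  const batchId = crypto.randomUUID();
  const statements = userIds.map((userId) =>
    env.DB.prepare(
      `INSERT INTO cron_queue (id, user_id, batch_id, status)
       VALUES (?, ?, ?, 'pending')`
    ).bind(crypto.randomUUID(), userId, batchId)
  );
  // All inserts execute in one implicit transaction
  await env.DB.batch(statements);
  return batchId;
}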
Step 2: Atomic Queue Claiming
The critical part: prevent race conditions when multiple Workers try to claim the same user.
// web/backend/src/services/queue.ts
export async function claimNextPendingUserAtomic(
env: Env
): Promise<QueueItem | null> {
const result = await env.DB.prepare(`
WITH next AS (
SELECT id FROM cron_queue
WHERE status = 'pending'
ORDER BY created_at ASC
LIMIT 1
)
UPDATE cron_queue
SET status = 'processing', started_at = datetime('now')
WHERE id IN (SELECT id FROM next)
RETURNING id, user_id, batch_id
`).all<QueueItem>();
return result.results[0] ?? null;
}
Why atomic?
- CTE (WITH) + UPDATE + RETURNING run as a single statement (implicitly one transaction)
- No gap between SELECT and UPDATE
- Prevents duplicate processing
- D1 SQLite guarantees atomicity
Without atomic claiming:
Worker 1: SELECT id WHERE status="pending" → Gets user A
Worker 2: SELECT id WHERE status="pending" → Gets user A (race!)
Worker 1: UPDATE status="processing" WHERE id=A
Worker 2: UPDATE status="processing" WHERE id=A
Result: Both workers process user A (duplicate!)
With atomic claiming:
Worker 1: CTE + UPDATE + RETURNING → Gets user A, marks processing
Worker 2: CTE + UPDATE + RETURNING → Gets user B, marks processing
Result: No duplicates, each worker gets unique user
Step 3: Scheduler (Main Worker)
The scheduler initializes the batch and dispatches Workers.
// web/backend/src/index.ts
import { initializeBatch, claimNextPendingUserAtomic } from './services/queue';
export default {
async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
console.log('[Scheduled] Cron trigger fired:', event.cron);
// Query active users
const usersResult = await env.DB.prepare(
`SELECT id FROM users WHERE is_active = 1 AND github_pat IS NOT NULL`
).all();
const userIds = usersResult.results.map((row: any) => row.id as string);
if (userIds.length === 0) {
console.log('[Scheduled] No active users to process');
return;
}
// Initialize batch
const batchId = await initializeBatch(env, userIds);
console.log(`[Scheduled] Batch ${batchId} initialized with ${userIds.length} users`);
// Dispatch Workers via Service Bindings
for (let i = 0; i < userIds.length; i++) {
const queueItem = await claimNextPendingUserAtomic(env);
if (!queueItem) {
console.log('[Scheduled] No more pending users in queue');
break;
}
ctx.waitUntil(
env.SELF.fetch('http://internal/api/cron/process-user', {
method: 'POST',
headers: {
'X-Cron-Secret': env.SERVER_SECRET,
'Content-Type': 'application/json',
},
body: JSON.stringify({
queueId: queueItem.id,
userId: queueItem.user_id,
}),
})
.then((res) => {
console.log(`[Scheduled] User ${queueItem.user_id} dispatched: ${res.status}`);
})
.catch((error: Error) => {
console.error(`[Scheduled] User ${queueItem.user_id} dispatch failed:`, error);
})
);
}
console.log(`[Scheduled] All ${userIds.length} users dispatched for batch ${batchId}`);
}
}
Key points:
- ctx.waitUntil() ensures async operations complete
- Each env.SELF.fetch() creates a new Worker instance
- Errors in one Worker don't affect others
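One caveat on the road to 1000+ users: Cloudflare caps subrequests per invocation, and each env.SELF.fetch() counts as one. A hypothetical variant that dispatches in bounded waves instead of all at once (not the production scheduler; dispatchInWaves and waveSize are names I've made up for illustration):
// Hypothetical: dispatch in waves of `waveSize` to bound in-flight subrequests
async function dispatchInWaves(env: Env, userCount: number, waveSize = 25): Promise<void> {
  for (let offset = 0; offset < userCount; offset += waveSize) {
    const wave: Promise<unknown>[] = [];
    for (let i = 0; i < waveSize && offset + i < userCount; i++) {
      const item = await claimNextPendingUserAtomic(env);
      if (!item) return; // queue drained early
      wave.push(
        env.SELF.fetch('http://internal/api/cron/process-user', {
          method: 'POST',
          headers: {
            'X-Cron-Secret': env.SERVER_SECRET,
            'Content-Type': 'application/json',
          },
          body: JSON.stringify({ queueId: item.id, userId: item.user_id }),
        }).catch((error) => console.error('[Scheduled] Dispatch failed:', error))
      );
    }
    await Promise.all(wave); // finish this wave before claiming the next
  }
}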
Step 4: Worker Instance (Process Single User)
Each Worker instance processes one user.
// web/backend/src/routes/cron.ts
// (assuming Hono, which matches the c.req / c.json API used below)
import { Hono } from 'hono';
import { getQueueItemStatus, markCompleted, markFailed } from '../services/queue';
import { processSingleUser } from '../cron/process-single-user';

const app = new Hono<{ Bindings: Env }>();
app.post('/process-user', async (c) => {
// Auth check
const secret = c.req.header('X-Cron-Secret');
if (!c.env.SERVER_SECRET || secret !== c.env.SERVER_SECRET) {
return c.json({ error: 'Unauthorized' }, 401);
}
try {
const body = await c.req.json<{ queueId: string; userId: string }>();
const { queueId, userId } = body;
if (!queueId || !userId) {
return c.json({ error: 'Missing queueId or userId' }, 400);
}
// Idempotency check
const status = await getQueueItemStatus(c.env, queueId);
if (status === 'completed') {
return c.json({
success: true,
queueId,
userId,
skipped: true,
reason: 'Already completed'
});
}
if (status === 'failed') {
return c.json({
success: false,
queueId,
userId,
skipped: true,
reason: 'Already failed'
});
}
// Process user
try {
await processSingleUser(c.env, userId);
await markCompleted(c.env, queueId);
return c.json({ success: true, queueId, userId });
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
await markFailed(c.env, queueId, errorMessage);
console.error(`[ProcessUser] Failed for user ${userId}:`, error);
// Return 200 (not 500) so scheduler continues with other users
return c.json({ success: false, queueId, userId, error: errorMessage });
}
} catch (error) {
console.error('[ProcessUser] Error:', error);
return c.json({
error: 'Process user failed',
message: error instanceof Error ? error.message : 'Unknown error'
}, 500);
}
});
Key points:
- Idempotency protection (check status before processing)
- Return 200 even on failure (don’t block other Workers)
- Mark completed/failed in queue
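The queue helpers referenced above (getQueueItemStatus, markCompleted, markFailed) are thin D1 updates. Minimal sketches assuming the cron_queue schema from earlier; the real implementations live in web/backend/src/services/queue.ts:
// Sketches of the queue helpers used by /process-user
export async function getQueueItemStatus(env: Env, queueId: string): Promise<string | null> {
  const row = await env.DB.prepare(`SELECT status FROM cron_queue WHERE id = ?`)
    .bind(queueId)
    .first<{ status: string }>();
  return row?.status ?? null;
}

export async function markCompleted(env: Env, queueId: string): Promise<void> {
  await env.DB.prepare(
    `UPDATE cron_queue SET status = 'completed', completed_at = datetime('now') WHERE id = ?`
  ).bind(queueId).run();
}

export async function markFailed(env: Env, queueId: string, errorMessage: string): Promise<void> {
  await env.DB.prepare(
    `UPDATE cron_queue
     SET status = 'failed', completed_at = datetime('now'),
         error_message = ?, retry_count = retry_count + 1
     WHERE id = ?`
  ).bind(errorMessage, queueId).run();
}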
Step 5: Process Single User
The actual user processing logic.
// web/backend/src/cron/process-single-user.ts
// (imports of the encryption, notification, and GitHub service factories omitted)
export async function processSingleUser(env: Env, userId: string): Promise<void> {
// Fetch user from D1
const user = await env.DB.prepare(
`SELECT id, github_username, github_pat, discord_webhook, telegram_token, telegram_chat_id
FROM users
WHERE id = ? AND is_active = 1`
)
.bind(userId)
.first<User>();
if (!user) {
throw new Error(`User ${userId} not found or inactive`);
}
if (!user.github_pat) {
throw new Error(`User ${user.github_username} has no GitHub PAT configured`);
}
// Initialize services
const encryptionService = await createEncryptionService(env.ENCRYPTION_KEY);
const notificationService = createNotificationService(env);
// Decrypt GitHub PAT
const decryptedPat = await encryptionService.decrypt(user.github_pat);
// Create GitHub service
const githubService = createCachedGitHubService(decryptedPat, 5);
// Check contributions
const contributionsToday = await githubService.getContributionsToday(user.github_username);
const currentStreak = await githubService.getCurrentStreak(user.github_username);
// Prepare notification message
const notificationMessage = {
username: user.github_username,
currentStreak,
contributionsToday,
message: contributionsToday > 0
? `Great job! You made ${contributionsToday} contribution${contributionsToday > 1 ? 's' : ''} today! Your ${currentStreak}-day streak is safe. Keep it up!`
: `You have not made any contributions today! Your ${currentStreak}-day streak is at risk. Make a commit to keep it alive!`,
};
// Send Discord notification if configured
if (user.discord_webhook) {
try {
const discordResult = await notificationService.sendDiscordNotification(
user.discord_webhook,
notificationMessage
);
await logNotification(env, user.id, 'discord', discordResult.success ? 'sent' : 'failed', discordResult.error);
if (discordResult.success) {
console.log(`[Process] Discord notification sent to ${user.github_username}`);
} else {
console.error(`[Process] Discord notification failed for ${user.github_username}:`, discordResult.error);
}
} catch (error) {
console.error(`[Process] Error sending Discord notification to ${user.github_username}:`, error);
}
}
// Send Telegram notification if configured
if (user.telegram_token && user.telegram_chat_id) {
try {
const telegramResult = await notificationService.sendTelegramNotification(
user.telegram_token,
user.telegram_chat_id,
notificationMessage
);
await logNotification(env, user.id, 'telegram', telegramResult.success ? 'sent' : 'failed', telegramResult.error);
if (telegramResult.success) {
console.log(`[Process] Telegram notification sent to ${user.github_username}`);
} else {
console.error(`[Process] Telegram notification failed for ${user.github_username}:`, telegramResult.error);
}
} catch (error) {
console.error(`[Process] Error sending Telegram notification to ${user.github_username}:`, error);
}
}
console.log(`[Process] User ${user.github_username} processed successfully`);
}
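processSingleUser also calls logNotification, which isn't shown in this post. A sketch of what it might look like, assuming a hypothetical notification_logs table; the table and column names here are my guess, not the actual schema:
// Sketch: log a notification attempt (notification_logs is a hypothetical table)
export async function logNotification(
  env: Env,
  userId: string,
  channel: 'discord' | 'telegram',
  status: 'sent' | 'failed',
  error?: string
): Promise<void> {
  await env.DB.prepare(
    `INSERT INTO notification_logs (id, user_id, channel, status, error_message, created_at)
     VALUES (?, ?, ?, ?, ?, datetime('now'))`
  )
    .bind(crypto.randomUUID(), userId, channel, status, error ?? null)
    .run();
}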
Advanced Features
1. Stale Item Requeuing
What if a Worker crashes? Items stuck in “processing” need to be requeued.
// web/backend/src/services/queue.ts
export async function requeueStaleProcessing(
env: Env,
minutes: number = 10
): Promise<number> {
const result = await env.DB.prepare(`
UPDATE cron_queue
SET status = 'pending', started_at = NULL
WHERE status = 'processing'
AND started_at < datetime('now', '-' || ? || ' minutes')
`)
.bind(minutes)
.run();
return result.meta.changes;
}
Usage in scheduler:
// Reaper for stale processing items (10+ minutes)
ctx.waitUntil(
requeueStaleProcessing(env, 10)
.then((requeued) => {
if (requeued > 0) {
console.log(`[Scheduled] Requeued ${requeued} stale processing items`);
}
})
.catch((error: Error) => {
console.error('[Scheduled] Error requeuing stale items:', error);
})
);
2. Batch Cleanup
Delete old batches to prevent database bloat.
// web/backend/src/services/queue.ts
export async function cleanupOldBatches(
env: Env,
daysOld: number = 7
): Promise<number> {
const result = await env.DB.prepare(`
DELETE FROM cron_queue
WHERE created_at < datetime('now', '-' || ? || ' days')
`)
.bind(daysOld)
.run();
return result.meta.changes;
}
Usage in scheduler:
// Cleanup old batches (7+ days)
ctx.waitUntil(
cleanupOldBatches(env, 7)
.then((deleted) => {
if (deleted > 0) {
console.log(`[Scheduled] Cleaned up ${deleted} old queue items`);
}
})
.catch((error: Error) => {
console.error('[Scheduled] Error cleaning up old batches:', error);
})
);
3. Batch Progress Tracking
Monitor batch progress in real-time.
// web/backend/src/services/queue.ts
export interface BatchProgress {
pending: number;
processing: number;
completed: number;
failed: number;
total: number;
}
export async function getBatchProgress(
env: Env,
batchId: string
): Promise<BatchProgress> {
const results = await env.DB.prepare(`
SELECT status, COUNT(*) as count
FROM cron_queue
WHERE batch_id = ?
GROUP BY status
`)
.bind(batchId)
.all();
const progress: BatchProgress = {
pending: 0,
processing: 0,
completed: 0,
failed: 0,
total: 0,
};
for (const row of results.results as Array<{ status: string; count: number }>) {
const status = row.status as keyof Omit<BatchProgress, 'total'>;
progress[status] = row.count;
progress.total += row.count;
}
return progress;
}
API endpoint:
// web/backend/src/routes/cron.ts
app.get('/batch/:batchId', async (c) => {
const secret = c.req.header('X-Cron-Secret');
if (!c.env.SERVER_SECRET || secret !== c.env.SERVER_SECRET) {
return c.json({ error: 'Unauthorized' }, 401);
}
try {
const batchId = c.req.param('batchId');
const progress = await getBatchProgress(c.env, batchId);
return c.json({
batchId,
progress,
percentage: progress.total > 0
? Math.round(((progress.completed + progress.failed) / progress.total) * 100)
: 0,
});
} catch (error) {
console.error('[BatchProgress] Error:', error);
return c.json({ error: 'Failed to get batch progress' }, 500);
}
});
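A quick way to exercise this endpoint from a script (hypothetical usage; WORKER_URL and pollBatch are placeholders, substitute your own Worker URL and secret):
// Hypothetical client: poll batch progress every 2 seconds until done
const WORKER_URL = 'https://your-worker.example.workers.dev'; // placeholder

async function pollBatch(batchId: string, secret: string): Promise<void> {
  while (true) {
    const res = await fetch(`${WORKER_URL}/api/cron/batch/${batchId}`, {
      headers: { 'X-Cron-Secret': secret },
    });
    const data = (await res.json()) as {
      progress: { completed: number; failed: number; total: number };
      percentage: number;
    };
    console.log(`Batch ${batchId}: ${data.percentage}% done`, data.progress);
    if (data.percentage >= 100) break;
    await new Promise((resolve) => setTimeout(resolve, 2000));
  }
}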
Performance Analysis
Before (Sequential Processing)
10 users × 3 seconds = 30 seconds
CPU time: 30 seconds (at limit!)
Wall time: 30 seconds
Success rate: 0% (TLE errors)
After (Distributed Processing)
10 users / 10 Workers = 1 user per Worker
CPU time per Worker: 3 seconds
Wall time: ~10 seconds (parallel)
Success rate: 100%
Scalability
Current load:
- 10 users/day
- 10 Workers dispatched
- ~10 seconds total processing time
Theoretical capacity:
- Cloudflare Workers: 100,000 requests/day (free tier)
- D1 writes: 50,000/day (free tier)
- Bottleneck: D1 writes (2 writes per user = 25,000 users/day)
Headroom: 2500x current load
Cost Analysis
Free tier limits:
- Cloudflare Workers: 100k req/day
- D1 database: 50k writes/day
- Koyeb VPS: 512MB RAM
Current usage:
- Workers: ~20 req/day (10 users × 2 endpoints)
- D1 writes: ~40 writes/day (queue + notifications)
- VPS: ~20MB RAM
Cost: $0/month
Lessons Learned
1. Service Bindings Are Powerful
Each env.SELF.fetch() creates a new Worker instance with fresh CPU budget. This is the key to scaling beyond single-Worker limits.
2. D1 Is Fast Enough for Queues
No need for Redis or SQS. D1 SQLite handles job queues perfectly:
- Atomic operations with CTE + UPDATE + RETURNING
- Fast queries (< 10ms)
- Built-in indexes
- Free tier generous enough
3. Atomic Operations Prevent Races
Without atomic claiming, multiple Workers would process the same user. CTE + UPDATE + RETURNING in single statement solves this.
4. Idempotency Is Critical
Check status before processing. Safe retries, no duplicate notifications.
5. Stale Item Requeuing Is Essential
Workers crash. Items get stuck. Reaper process requeues them after 10 minutes.
6. Return 200 Even on Failure
If a Worker fails processing one user, return 200 (not 500). Don’t block other Workers.
What’s Next?
This completes the 3-part series on building Streaky:
- Part 1: The journey from sequential to distributed processing
- Part 2: Solving IP blocking with Rust VPS
- Part 3: Distributed queue system with Service Bindings
Try It Out
Live App: streakyy.vercel.app
GitHub: github.com/0xReLogic/Streaky
Queue Code: web/backend/src/services/queue.ts
Let’s Connect
Building distributed systems on Cloudflare? Have questions about Service Bindings or D1? Drop a comment!
GitHub: @0xReLogic
Project: Streaky
