Building Streaky: Distributed Queue System with Service Bindings (Part 3)




Part 3: Scaling to 1000+ Users

In Part 1, I shared the journey from sequential processing to a distributed architecture. In Part 2, I explained how I solved IP blocking with a Rust proxy.

Now, let’s dive deep into the distributed queue system that makes it all work.




The Scalability Problem

After solving the IP blocking issue, I still had the CPU time limit problem. Processing 10 users sequentially took 30+ seconds. Cloudflare Workers have a 30-second CPU time limit.

The constraint:

  • Each user takes ~3 seconds to process (GitHub API + notifications)
  • 10 users × 3 seconds = 30 seconds
  • Add any overhead = Time Limit Exceeded (TLE)

The realization:
I can’t process users sequentially in a single Worker. I need to distribute the work across multiple Workers.
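
The arithmetic behind that constraint can be sketched roughly as follows. The helper names and the `concurrency` parameter are illustrative assumptions, not part of the Streaky codebase, and the model ignores dispatch overhead and counts the ~3 seconds per user against the limit, as the figures above do.

```typescript
// Illustrative model only: estimates how long a run takes when each user
// needs `perUserSeconds` and up to `concurrency` users are handled at once.
function estimatedSeconds(
  userCount: number,
  perUserSeconds: number,
  concurrency: number
): number {
  if (concurrency < 1) throw new Error('concurrency must be at least 1');
  // Users are handled in "waves" of at most `concurrency` users each.
  const waves = Math.ceil(userCount / concurrency);
  return waves * perUserSeconds;
}

// True only when the estimate stays strictly under the limit.
function fitsInBudget(seconds: number, limitSeconds: number): boolean {
  return seconds < limitSeconds;
}

const sequential = estimatedSeconds(10, 3, 1); // 30: one user at a time
const fannedOut = estimatedSeconds(10, 3, 10); // 3: one user per Worker
```

With these numbers the sequential run sits exactly at the 30-second limit, so any overhead pushes it over, while the fanned-out run leaves plenty of room.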




The Solution: Service Bindings + Queue

Core idea: Instead of one Worker processing N users, spawn N Workers each processing 1 user.

Architecture:

Scheduler Worker
    |
    |-- Dispatch Worker 1 (User A)
    |-- Dispatch Worker 2 (User B)
    |-- Dispatch Worker 3 (User C)
    |-- ...
    |-- Dispatch Worker N (User N)

Each Worker:
- Fresh CPU budget (30 seconds)
- Isolated execution context
- Parallel processing

Result:

  • 10 users processed in ~10 seconds (parallel)
  • Each Worker uses <5 seconds CPU time
  • No TLE errors
  • Scales to 1000+ users



Key Components



1. Queue Table (D1 SQLite)

The queue table tracks which users need processing and their status.

CREATE TABLE cron_queue (
  id TEXT PRIMARY KEY,
  user_id TEXT NOT NULL,
  batch_id TEXT NOT NULL,
  status TEXT NOT NULL CHECK(status IN ('pending', 'processing', 'completed', 'failed')),
  created_at TEXT NOT NULL DEFAULT (datetime('now')),
  started_at TEXT,
  completed_at TEXT,
  error_message TEXT,
  retry_count INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX idx_cron_queue_status ON cron_queue(status);
CREATE INDEX idx_cron_queue_batch ON cron_queue(batch_id);
CREATE INDEX idx_cron_queue_user ON cron_queue(user_id);

Why D1?

  • Already part of the stack (no external dependencies)
  • Fast enough for job queues (< 10ms queries)
  • Supports atomic operations (CTE + UPDATE + RETURNING)
  • Free tier: 50,000 writes/day (plenty for this use case)
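
On the TypeScript side, a row of this table and its status lifecycle might be modeled along these lines. The type names and the transition map are assumptions inferred from the schema and from the flow described in this post (claim, complete or fail, requeue when stale); they are not taken from the Streaky source.

```typescript
// Mirrors the CHECK constraint on cron_queue.status.
type QueueStatus = 'pending' | 'processing' | 'completed' | 'failed';

// Row shape matching the cron_queue columns (timestamps are TEXT in SQLite).
interface QueueRow {
  id: string;
  user_id: string;
  batch_id: string;
  status: QueueStatus;
  created_at: string;
  started_at: string | null;
  completed_at: string | null;
  error_message: string | null;
  retry_count: number;
}

// Assumed lifecycle: pending -> processing -> completed | failed,
// plus processing -> pending when a stale item is requeued.
const allowedTransitions: Record<QueueStatus, QueueStatus[]> = {
  pending: ['processing'],
  processing: ['completed', 'failed', 'pending'],
  completed: [],
  failed: [],
};

function canTransition(from: QueueStatus, to: QueueStatus): boolean {
  return allowedTransitions[from].indexOf(to) !== -1;
}
```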



2. Service Bindings

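Service Bindings let one Worker call another directly, without going through a public URL. Here the Worker binds to itself, so each call is handled as a separate invocation with its own CPU budget.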

Configuration (wrangler.toml):

[[services]]
binding = "SELF"
service = "streaky"

Usage:

// Each fetch creates a NEW Worker instance
env.SELF.fetch('http://internal/api/cron/process-user', {
  method: 'POST',
  headers: {
    'X-Cron-Secret': env.SERVER_SECRET,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    queueId: queueItem.id,
    userId: queueItem.user_id,
  }),
})

Why Service Bindings?

  • Each env.SELF.fetch() = new Worker instance
  • Fresh CPU budget per instance
  • Automatic load balancing by Cloudflare
  • No external queue service needed (Redis, SQS, etc.)
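
One way to keep the dispatch call tidy is to build its arguments in a small pure function, which also makes the payload easy to test outside a Worker. `buildDispatchRequest` and its types are hypothetical helpers written for this sketch; only the URL, headers, and body fields come from the usage shown above.

```typescript
// Hypothetical helper types for this sketch.
interface DispatchPayload {
  queueId: string;
  userId: string;
}

interface DispatchRequest {
  url: string;
  init: { method: string; headers: Record<string, string>; body: string };
}

// Builds the arguments that would be passed to env.SELF.fetch(url, init).
function buildDispatchRequest(secret: string, payload: DispatchPayload): DispatchRequest {
  return {
    // Same internal URL as in the usage example above.
    url: 'http://internal/api/cron/process-user',
    init: {
      method: 'POST',
      headers: {
        'X-Cron-Secret': secret,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(payload),
    },
  };
}

const request = buildDispatchRequest('example-secret', { queueId: 'q-1', userId: 'u-1' });
// In a Worker: await env.SELF.fetch(request.url, request.init);
```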



Implementation



Step 1: Initialize Batch

When the cron trigger fires, create a batch of queue items.

// web/backend/src/services/queue.ts

export async function initializeBatch(
  env: Env,
  userIds: string[]
): Promise<string> {
  const batchId = crypto.randomUUID();

  // Insert one 'pending' queue row per user (one statement per row)
  for (const userId of userIds) {
    const queueId = crypto.randomUUID();
    await env.DB.prepare(
      `INSERT INTO cron_queue (id, user_id, batch_id, status)
       VALUES (?, ?, ?, 'pending')`
    )
      .bind(queueId, userId, batchId)
      .run();
  }

  return batchId;
}
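
If one statement per row ever becomes a bottleneck, the inserts could be grouped with D1's batch() call instead. The sketch below is a variant under stated assumptions, not the code Streaky ships: `insertBatchChunked`, `chunk`, the injected `newId` generator, and the chunk size of 50 are all hypothetical, and the two small interfaces describe only the prepare/bind/batch calls the sketch relies on.

```typescript
// Minimal structural types covering only the D1 calls used here;
// in a Worker, env.DB would be passed in as `db`.
interface Statement {
  bind(...values: unknown[]): Statement;
}
interface BatchDb {
  prepare(sql: string): Statement;
  batch(statements: Statement[]): Promise<unknown[]>;
}

// Splits a list into consecutive chunks of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (size < 1) throw new Error('size must be at least 1');
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Inserts queue rows in groups rather than one round trip per user.
// `newId` is injected so the sketch does not depend on a global crypto object.
async function insertBatchChunked(
  db: BatchDb,
  userIds: string[],
  batchId: string,
  newId: () => string,
  chunkSize = 50 // arbitrary value chosen for illustration
): Promise<number> {
  const insert = db.prepare(
    `INSERT INTO cron_queue (id, user_id, batch_id, status)
     VALUES (?, ?, ?, 'pending')`
  );
  let inserted = 0;
  for (const group of chunk(userIds, chunkSize)) {
    await db.batch(group.map((userId) => insert.bind(newId(), userId, batchId)));
    inserted += group.length;
  }
  return inserted;
}
```

Inside the Worker this would be called as `insertBatchChunked(env.DB, userIds, batchId, () => crypto.randomUUID())`.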



Step 2: Atomic Queue Claiming

The critical part: prevent race conditions when multiple Workers try to claim the same user.

// web/backend/src/services/queue.ts

export async function claimNextPendingUserAtomic(
  env: Env
): Promise<QueueItem | null> {
  const result = await env.DB.prepare(`
    WITH next AS (
      SELECT id FROM cron_queue
      WHERE status = 'pending'
      ORDER BY created_at ASC
      LIMIT 1
    )
    UPDATE cron_queue
    SET status = 'processing', started_at = datetime('now')
    WHERE id IN (SELECT id FROM next)
    RETURNING id, user_id, batch_id
  `).all<QueueItem>();

  return result.results[0] ?? null;
}

Why atomic?

  • CTE (WITH) + UPDATE + RETURNING run as a single statement
  • No gap between SELECT and UPDATE
  • Prevents duplicate processing
  • SQLite executes a single statement atomically, and D1 inherits that guarantee

Without atomic claiming:

Worker 1: SELECT id WHERE status='pending' → Gets user A
Worker 2: SELECT id WHERE status='pending' → Gets user A (race!)
Worker 1: UPDATE status='processing' WHERE id=A
Worker 2: UPDATE status='processing' WHERE id=A
Result: Both workers process user A (duplicate!)

With atomic claiming:

Worker 1: CTE + UPDATE + RETURNING → Gets user A, marks processing
Worker 2: CTE + UPDATE + RETURNING → Gets user B, marks processing
Result: No duplicates, each worker gets unique user
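
The two traces above can be reproduced with a small in-memory simulation. This is only an illustration of the interleaving, not how D1 works internally: the array stands in for the table, and `selectNextPending` and `claimAtomic` are hypothetical names.

```typescript
type Status = 'pending' | 'processing';
interface Item {
  id: string;
  status: Status;
}

// The "SELECT" half on its own: finds the first pending row without marking it.
function selectNextPending(queue: Item[]): Item | undefined {
  for (const item of queue) {
    if (item.status === 'pending') return item;
  }
  return undefined;
}

// Find and mark in one uninterruptible step, which is what the single
// UPDATE ... RETURNING statement provides.
function claimAtomic(queue: Item[]): Item | undefined {
  const item = selectNextPending(queue);
  if (item) item.status = 'processing';
  return item;
}

function freshQueue(): Item[] {
  return [
    { id: 'A', status: 'pending' },
    { id: 'B', status: 'pending' },
  ];
}

// Without atomic claiming: both selects run before either update.
const racy = freshQueue();
const worker1 = selectNextPending(racy);
const worker2 = selectNextPending(racy);
if (worker1) worker1.status = 'processing';
if (worker2) worker2.status = 'processing';
const racyIds = [worker1?.id, worker2?.id]; // both workers hold 'A'

// With atomic claiming: the second claim already sees the first one.
const safe = freshQueue();
const safeIds = [claimAtomic(safe)?.id, claimAtomic(safe)?.id]; // 'A', then 'B'
```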



Step 3: Scheduler (Main Worker)

The scheduler initializes the batch and dispatches Workers.

// web/backend/src/index.ts

export default {
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
    console.log('[Scheduled] Cron trigger fired:', event.cron);

    // Query active users
    const usersResult = await env.DB.prepare(
      `SELECT id FROM users WHERE is_active = 1 AND github_pat IS NOT NULL`
    ).all<{ id: string }>();

    const userIds = usersResult.results.map((row) => row.id);

    if (userIds.length === 0) {
      console.log('[Scheduled] No active users to process');
      return;
    }

    // Initialize batch
    const batchId = await initializeBatch(env, userIds);
    console.log(`[Scheduled] Batch ${batchId} initialized with ${userIds.length} users`);

    // Dispatch Workers via Service Bindings
    for (let i = 0; i < userIds.length; i++) {
      const queueItem = await claimNextPendingUserAtomic(env);

      if (!queueItem) {
        console.log('[Scheduled] No more pending users in queue');
        break;
      }

      ctx.waitUntil(
        env.SELF.fetch('http://internal/api/cron/process-user', {
          method: 'POST',
          headers: {
            'X-Cron-Secret': env.SERVER_SECRET,
            'Content-Type': 'application/json',
          },
          body: JSON.stringify({
            queueId: queueItem.id,
            userId: queueItem.user_id,
          }),
        })
          .then((res) => {
            console.log(`[Scheduled] User ${queueItem.user_id} dispatched: ${res.status}`);
          })
          .catch((error: Error) => {
            console.error(`[Scheduled] User ${queueItem.user_id} dispatch failed:`, error);
          })
      );
    }

    console.log(`[Scheduled] Dispatch loop finished for batch ${batchId} (${userIds.length} users queued)`);
  }
}

Key points:

  • ctx.waitUntil() keeps the scheduled invocation alive until each dispatched fetch settles
  • Each env.SELF.fetch() creates a new Worker instance
  • Errors in one Worker don’t affect others
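
The error-isolation point can be shown in miniature. In this sketch `dispatch` stands in for the env.SELF.fetch() call and is injected so the code runs outside a Worker; `dispatchAll` and `DispatchOutcome` are hypothetical names, and the approach (turning each rejection into a result object) is one possible pattern rather than what the scheduler above does verbatim.

```typescript
interface DispatchOutcome {
  userId: string;
  ok: boolean;
  error?: string;
}

// Starts every dispatch immediately and converts each rejection into an
// outcome object, so one failing user cannot reject the whole group.
async function dispatchAll(
  userIds: string[],
  dispatch: (userId: string) => Promise<void>
): Promise<DispatchOutcome[]> {
  const tasks = userIds.map((userId) =>
    dispatch(userId).then(
      (): DispatchOutcome => ({ userId, ok: true }),
      (err: unknown): DispatchOutcome => ({
        userId,
        ok: false,
        error: err instanceof Error ? err.message : String(err),
      })
    )
  );
  // Each task always resolves, so Promise.all does not reject here
  // (assuming `dispatch` reports failures by rejecting, not by throwing synchronously).
  return Promise.all(tasks);
}
```

In a scheduled handler, the promise returned by `dispatchAll` could be passed to ctx.waitUntil() as a single unit.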



Step 4: Worker Instance (Process Single User)

Each Worker instance processes one user.

// web/backend/src/routes/cron.ts

app.post('/process-user', async (c) => {
  // Auth check
  const secret = c.req.header('X-Cron-Secret');
  if (!c.env.SERVER_SECRET || secret !== c.env.SERVER_SECRET) {
    return c.json({ error: 'Unauthorized' }, 401);
  }

  try {
    const body = await c.req.json<{ queueId: string; userId: string }>();
    const { queueId, userId } = body;

    if (!queueId || !userId) {
      return c.json({ error: 'Missing queueId or userId' }, 400);
    }

    // Idempotency check
    const status = await getQueueItemStatus(c.env, queueId);

    if (status === 'completed') {
      return c.json({ 
        success: true, 
        queueId, 
        userId, 
        skipped: true, 
        reason: 'Already completed' 
      });
    }

    if (status === 'failed') {
      return c.json({ 
        success: false, 
        queueId, 
        userId, 
        skipped: true, 
        reason: 'Already failed' 
      });
    }

    // Process user
    try {
      await processSingleUser(c.env, userId);
      await markCompleted(c.env, queueId);

      return c.json({ success: true, queueId, userId });
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : 'Unknown error';
      await markFailed(c.env, queueId, errorMessage);

      console.error(`[ProcessUser] Failed for user ${userId}:`, error);

      // Return 200 (not 500) so scheduler continues with other users
      return c.json({ success: false, queueId, userId, error: errorMessage });
    }
  } catch (error) {
    console.error('[ProcessUser] Error:', error);
    return c.json({ 
      error: 'Process user failed', 
      message: error instanceof Error ? error.message : 'Unknown error' 
    }, 500);
  }
});

Key points:

  • Idempotency protection (check status before processing)
  • Return 200 even on failure (don’t block other Workers)
  • Mark completed/failed in queue
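
The idempotency check in the handler boils down to a small decision table, which can be pulled out as a pure function for testing. `decide`, `Decision`, and `ItemStatus` are hypothetical names for this sketch, and accepting `null` assumes that getQueueItemStatus may return nothing for an unknown queue ID, which the post does not show.

```typescript
type ItemStatus = 'pending' | 'processing' | 'completed' | 'failed';

type Decision =
  | { action: 'process' }
  | { action: 'skip'; success: boolean; reason: string };

// Mirrors the handler's checks: terminal states are skipped,
// anything else falls through to processing.
function decide(status: ItemStatus | null): Decision {
  if (status === 'completed') {
    return { action: 'skip', success: true, reason: 'Already completed' };
  }
  if (status === 'failed') {
    return { action: 'skip', success: false, reason: 'Already failed' };
  }
  return { action: 'process' };
}
```

A retried request for a completed item therefore returns early, before any notification is sent a second time.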



Step 5: Process Single User

The actual user processing logic.

// web/backend/src/cron/process-single-user.ts

export async function processSingleUser(env: Env, userId: string): Promise<void> {
  // Fetch user from D1
  const user = await env.DB.prepare(
    `SELECT id, github_username, github_pat, discord_webhook, telegram_token, telegram_chat_id
     FROM users
     WHERE id = ? AND is_active = 1`
  )
    .bind(userId)
    .first<User>();

  if (!user) {
    throw new Error(`User ${userId} not found or inactive`);
  }

  if (!user.github_pat) {
    throw new Error(`User ${user.github_username} has no GitHub PAT configured`);
  }

  // Initialize services
  const encryptionService = await createEncryptionService(env.ENCRYPTION_KEY);
  const notificationService = createNotificationService(env);

  // Decrypt GitHub PAT
  const decryptedPat = await encryptionService.decrypt(user.github_pat);

  // Create GitHub service
  const githubService = createCachedGitHubService(decryptedPat, 5);

  // Check contributions
  const contributionsToday = await githubService.getContributionsToday(user.github_username);
  const currentStreak = await githubService.getCurrentStreak(user.github_username);

  // Prepare notification message
  const notificationMessage = {
    username: user.github_username,
    currentStreak,
    contributionsToday,
    message: contributionsToday > 0
      ? `Great job! You made ${contributionsToday} contribution${contributionsToday > 1 ? 's' : ''} today! Your ${currentStreak}-day streak is safe. Keep it up!`
      : `You have not made any contributions today! Your ${currentStreak}-day streak is at risk. Make a commit to keep it alive!`,
  };

  // Send Discord notification if configured
  if (user.discord_webhook) {
    try {
      const discordResult = await notificationService.sendDiscordNotification(
        user.discord_webhook,
        notificationMessage
      );

      await logNotification(env, user.id, 'discord', discordResult.success ? 'sent' : 'failed', discordResult.error);

      if (discordResult.success) {
        console.log(`[Process] Discord notification sent to ${user.github_username}`);
      } else {
        console.error(`[Process] Discord notification failed for ${user.github_username}:`, discordResult.error);
      }
    } catch (error) {
      console.error(`[Process] Error sending Discord notification to ${user.github_username}:`, error);
    }
  }

  // Send Telegram notification if configured
  if (user.telegram_token && user.telegram_chat_id) {
    try {
      const telegramResult = await notificationService.sendTelegramNotification(
        user.telegram_token,
        user.telegram_chat_id,
        notificationMessage
      );

      await logNotification(env, user.id, 'telegram', telegramResult.success ? 'sent' : 'failed', telegramResult.error);

      if (telegramResult.success) {
        console.log(`[Process] Telegram notification sent to ${user.github_username}`);
      } else {
        console.error(`[Process] Telegram notification failed for ${user.github_username}:`, telegramResult.error);
      }
    } catch (error) {
      console.error(`[Process] Error sending Telegram notification to ${user.github_username}:`, error);
    }
  }

  console.log(`[Process] User ${user.github_username} processed successfully`);
}
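
The message-building part of this function has no I/O, so it can be separated out and checked on its own. `buildStreakMessage` and `StreakMessage` are hypothetical names introduced for this sketch; the wording and the pluralization rule are copied from the code above.

```typescript
interface StreakMessage {
  username: string;
  currentStreak: number;
  contributionsToday: number;
  message: string;
}

// Pure version of the "Prepare notification message" step.
function buildStreakMessage(
  username: string,
  currentStreak: number,
  contributionsToday: number
): StreakMessage {
  const plural = contributionsToday > 1 ? 's' : '';
  const message =
    contributionsToday > 0
      ? `Great job! You made ${contributionsToday} contribution${plural} today! Your ${currentStreak}-day streak is safe. Keep it up!`
      : `You have not made any contributions today! Your ${currentStreak}-day streak is at risk. Make a commit to keep it alive!`;
  return { username, currentStreak, contributionsToday, message };
}

const safeStreak = buildStreakMessage('octocat', 12, 3);
const atRisk = buildStreakMessage('octocat', 12, 0);
```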




Advanced Features



1. Stale Item Requeuing

What if a Worker crashes? Items stuck in “processing” need to be requeued.

// web/backend/src/services/queue.ts

export async function requeueStaleProcessing(
  env: Env,
  minutes: number = 10
): Promise<number> {
  const result = await env.DB.prepare(`
    UPDATE cron_queue
    SET status = 'pending', started_at = NULL
    WHERE status = 'processing'
      AND started_at < datetime('now', '-' || ? || ' minutes')
  `)
    .bind(minutes)
    .run();

  return result.meta.changes;
}

Usage in scheduler:

// Reaper for stale processing items (10+ minutes)
ctx.waitUntil(
  requeueStaleProcessing(env, 10)
    .then((requeued) => {
      if (requeued > 0) {
        console.log(`[Scheduled] Requeued ${requeued} stale processing items`);
      }
    })
    .catch((error: Error) => {
      console.error('[Scheduled] Error requeuing stale items:', error);
    })
);
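
The staleness rule in that UPDATE can be mirrored in plain TypeScript, which is handy for unit tests or dashboards. This is a sketch under the assumption that started_at holds SQLite's default datetime('now') text format ("YYYY-MM-DD HH:MM:SS", in UTC); `parseSqliteUtc` and `isStale` are hypothetical helpers.

```typescript
// Parses SQLite's "YYYY-MM-DD HH:MM:SS" (UTC) text into epoch milliseconds.
function parseSqliteUtc(text: string): number {
  return Date.parse(text.replace(' ', 'T') + 'Z');
}

// Same comparison as the SQL: started_at < now - N minutes (strictly older).
function isStale(startedAt: string, now: string, minutes: number): boolean {
  const cutoff = parseSqliteUtc(now) - minutes * 60 * 1000;
  return parseSqliteUtc(startedAt) < cutoff;
}

const nowText = '2024-01-01 12:00:00';
const stuck = isStale('2024-01-01 11:45:00', nowText, 10); // true: started 15 minutes ago
const recent = isStale('2024-01-01 11:55:00', nowText, 10); // false: started 5 minutes ago
```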



2. Batch Cleanup

Delete old batches to prevent database bloat.

// web/backend/src/services/queue.ts

export async function cleanupOldBatches(
  env: Env,
  daysOld: number = 7
): Promise<number> {
  const result = await env.DB.prepare(`
    DELETE FROM cron_queue
    WHERE created_at < datetime('now', '-' || ? || ' days')
  `)
    .bind(daysOld)
    .run();

  return result.meta.changes;
}

Usage in scheduler:

// Cleanup old batches (7+ days)
ctx.waitUntil(
  cleanupOldBatches(env, 7)
    .then((deleted) => {
      if (deleted > 0) {
        console.log(`[Scheduled] Cleaned up ${deleted} old queue items`);
      }
    })
    .catch((error: Error) => {
      console.error('[Scheduled] Error cleaning up old batches:', error);
    })
);



3. Batch Progress Tracking

Monitor batch progress in real-time.

// web/backend/src/services/queue.ts

export interface BatchProgress {
  pending: number;
  processing: number;
  completed: number;
  failed: number;
  total: number;
}

export async function getBatchProgress(
  env: Env,
  batchId: string
): Promise<BatchProgress> {
  const results = await env.DB.prepare(`
    SELECT status, COUNT(*) as count
    FROM cron_queue
    WHERE batch_id = ?
    GROUP BY status
  `)
    .bind(batchId)
    .all();

  const progress: BatchProgress = {
    pending: 0,
    processing: 0,
    completed: 0,
    failed: 0,
    total: 0,
  };

  for (const row of results.results as Array<{ status: string; count: number }>) {
    const status = row.status as keyof Omit<BatchProgress, 'total'>;
    progress[status] = row.count;
    progress.total += row.count;
  }

  return progress;
}

API endpoint:

// web/backend/src/routes/cron.ts

app.get('/batch/:batchId', async (c) => {
  const secret = c.req.header('X-Cron-Secret');
  if (!c.env.SERVER_SECRET || secret !== c.env.SERVER_SECRET) {
    return c.json({ error: 'Unauthorized' }, 401);
  }

  try {
    const batchId = c.req.param('batchId');
    const progress = await getBatchProgress(c.env, batchId);

    return c.json({
      batchId,
      progress,
      percentage: progress.total > 0 
        ? Math.round(((progress.completed + progress.failed) / progress.total) * 100) 
        : 0,
    });
  } catch (error) {
    console.error('[BatchProgress] Error:', error);
    return c.json({ error: 'Failed to get batch progress' }, 500);
  }
});
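
The tallying and percentage logic can be exercised without a database by feeding it rows shaped like the GROUP BY result. `tallyProgress`, `completionPercentage`, and the type guard are hypothetical helpers for this sketch; skipping unknown statuses is an added safety choice, not something the original function does.

```typescript
interface Progress {
  pending: number;
  processing: number;
  completed: number;
  failed: number;
  total: number;
}

type CountedStatus = 'pending' | 'processing' | 'completed' | 'failed';

function isCountedStatus(value: string): value is CountedStatus {
  return value === 'pending' || value === 'processing' || value === 'completed' || value === 'failed';
}

// Folds "status, count" rows into a Progress object.
function tallyProgress(rows: Array<{ status: string; count: number }>): Progress {
  const progress: Progress = { pending: 0, processing: 0, completed: 0, failed: 0, total: 0 };
  for (const row of rows) {
    // Skip any status outside the known set instead of writing an unexpected key.
    if (!isCountedStatus(row.status)) continue;
    progress[row.status] = row.count;
    progress.total += row.count;
  }
  return progress;
}

// Finished items (completed or failed) as a whole-number percentage of the batch.
function completionPercentage(progress: Progress): number {
  return progress.total > 0
    ? Math.round(((progress.completed + progress.failed) / progress.total) * 100)
    : 0;
}

const sample = tallyProgress([
  { status: 'completed', count: 7 },
  { status: 'failed', count: 1 },
  { status: 'pending', count: 2 },
]);
const percent = completionPercentage(sample); // 80
```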




Performance Analysis



Before (Sequential Processing)

10 users × 3 seconds = 30 seconds
CPU time: 30 seconds (at limit!)
Wall time: 30 seconds
Success rate: 0% (TLE errors)



After (Distributed Processing)

10 users / 10 Workers = 1 user per Worker
CPU time per Worker: 3 seconds
Wall time: ~10 seconds (parallel)
Success rate: 100%



Scalability

Current load:

  • 10 users/day
  • 10 Workers dispatched
  • ~10 seconds total processing time

Theoretical capacity:

  • Cloudflare Workers: 100,000 requests/day (free tier)
  • D1 writes: 50,000/day (free tier)
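  • Bottleneck: D1 writes. At 2 writes per user that is 25,000 users/day; the current usage of ~40 writes/day for 10 users (see Cost Analysis) implies about 4 writes per user, or roughly 12,500 users/day

Headroom: roughly 1,250x to 2,500x current load, depending on writes per user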
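
The capacity estimate is simple enough to re-run with different assumptions. In this sketch the limits are the free-tier figures quoted above, and the per-user costs (2 writes, and 2 requests taken from the "10 users × 2 endpoints" usage figure) are parameters rather than measured values; `maxUsersPerDay` is a hypothetical helper.

```typescript
interface DailyLimits {
  writes: number;
  requests: number;
}

interface PerUserCost {
  writes: number;
  requests: number;
}

// Returns how many users fit in a day and which limit is hit first.
function maxUsersPerDay(
  limits: DailyLimits,
  perUser: PerUserCost
): { users: number; bottleneck: 'writes' | 'requests' } {
  const byWrites = Math.floor(limits.writes / perUser.writes);
  const byRequests = Math.floor(limits.requests / perUser.requests);
  return byWrites <= byRequests
    ? { users: byWrites, bottleneck: 'writes' }
    : { users: byRequests, bottleneck: 'requests' };
}

const freeTier: DailyLimits = { writes: 50000, requests: 100000 };

// 2 writes per user, as in the estimate above: 25,000 users/day, limited by writes.
const estimate = maxUsersPerDay(freeTier, { writes: 2, requests: 2 });
const headroom = estimate.users / 10; // 2,500x the current 10 users

// A more conservative assumption of 4 writes per user halves the ceiling.
const conservative = maxUsersPerDay(freeTier, { writes: 4, requests: 2 });
```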



Cost Analysis

Free tier limits:

  • Cloudflare Workers: 100k req/day
  • D1 database: 50k writes/day
  • Koyeb VPS: 512MB RAM

Current usage:

  • Workers: ~20 req/day (10 users × 2 endpoints)
  • D1 writes: ~40 writes/day (queue + notifications)
  • VPS: ~20MB RAM

Cost: $0/month




Lessons Learned



1. Service Bindings Are Powerful

Each env.SELF.fetch() creates a new Worker instance with fresh CPU budget. This is the key to scaling beyond single-Worker limits.



2. D1 Is Fast Enough for Queues

No need for Redis or SQS. D1 SQLite handles job queues perfectly:

  • Atomic operations with CTE + UPDATE + RETURNING
  • Fast queries (< 10ms)
  • Built-in indexes
  • Free tier generous enough



3. Atomic Operations Prevent Races

Without atomic claiming, multiple Workers would process the same user. CTE + UPDATE + RETURNING in a single statement solves this.



4. Idempotency Is Critical

Check status before processing. Safe retries, no duplicate notifications.



5. Stale Item Requeuing Is Essential

Workers crash. Items get stuck. A reaper requeues them after 10 minutes.



6. Return 200 Even on Failure

If a Worker fails processing one user, return 200 (not 500). Don’t block other Workers.




What’s Next?

This completes the 3-part series on building Streaky:

  • Part 1: The journey from sequential to distributed processing
  • Part 2: Solving IP blocking with a Rust proxy on a VPS
  • Part 3: Distributed queue system with Service Bindings



Try It Out

Live App: streakyy.vercel.app

GitHub: github.com/0xReLogic/Streaky

Queue Code: web/backend/src/services/queue.ts




Let’s Connect

Building distributed systems on Cloudflare? Have questions about Service Bindings or D1? Drop a comment!

GitHub: @0xReLogic

Project: Streaky


