
Data Generation V2: Smarter Workers, Tougher Lessons

After Version 1 got me to 160K posts/sec with worker threads, I knew I wasn’t done. BrandPulse needed more—closer to that 700K posts/sec dream—so I rolled up my sleeves and dug into Version 2. This wasn’t about throwing more code at it; it was about fixing the cracks, tuning the engine, and making sure it wouldn’t just run but fly. Here’s how I took v1’s foundation and pushed it harder, smarter, and a bit more polished.

What I Was Up Against

Version 1 was a win—20K posts/sec solo, 160K with 8 workers—but it had rough edges. Workers were stepping on each other’s toes with the same client ID, errors could crash the whole show, and the event loop was gasping under constant batch sends. Plus, the Kafka producer was running on default settings, which felt like driving a sports car in first gear. I needed to smooth it out and squeeze out more throughput—aiming for 200K+ posts/sec—without breaking my machine or my spirit.

The Game Plan

Here’s what I tackled to level up:

  1. Unique Client IDs: Gave each worker its own identity to avoid Kafka conflicts and make debugging less of a nightmare.
  2. Error Handling: Wrapped the send loop in a try-catch so a hiccup wouldn’t kill the worker—resilience matters.
  3. Throttling: Added a tiny delay between batches to keep the event loop breathing, not choking.
  4. Producer Tuning: Tweaked producer settings (compression, acks, and batch sizing) to push more data faster; kafkajs doesn’t expose the Java client’s linger.ms, so batching happens at the application level (see the sketch after this list).
  5. Graceful Shutdown: Set up signal handling so I could stop it cleanly—no more Ctrl+C chaos.
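
To make item 4 concrete, here’s the shape of the producer tuning: a minimal sketch, not the full v2 file. The broker list and topic name mirror the main code below; the specific values are illustrative, not benchmarked recommendations.

javascript
const { Kafka, CompressionTypes, Partitioners } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'dataStorm-tuning-demo', // illustrative ID
  brokers: ['localhost:9092', 'localhost:9093', 'localhost:9094'],
});

const producer = kafka.producer({
  createPartitioner: Partitioners.LegacyPartitioner,
  allowAutoTopicCreation: false,
  maxInFlightRequests: 5, // keep several unacked requests per connection
  idempotent: false,      // exactly-once semantics cost throughput; off for raw speed
});

// kafkajs has no linger.ms knob: a "batch" is whatever you pack into one send().
// The remaining levers are compression and acks.
// Usage sketch: after producer.connect(), call sendBatch(...) once per batch.
const sendBatch = (messages) =>
  producer.send({
    topic: 'dataStorm-topic',
    messages,
    acks: 1, // leader-only ack; acks: 0 is faster still, but fire-and-forget
    compression: CompressionTypes.GZIP, // fewer bytes per batch on the wire
  });

Compression trades CPU for network bytes, so against loopback brokers like these the win is smaller than it would be over a real network.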

The Code: V2 in Action

Here’s what I built—cleaner, tougher, and ready to scale:

javascript
const { Kafka, Partitioners } = require('kafkajs');
const { Worker, isMainThread, parentPort, threadId } = require('worker_threads');
const dataSchema = require('../../schema/avroSchema');

// Shared Kafka config
const kafkaConfig = {
  clientId: `dataStorm-producer-${process.pid}-${threadId}`, // Unique IDs
  brokers: ['localhost:9092', 'localhost:9093', 'localhost:9094'],
  retry: { retries: 3 } // Basic retry logic
};

// Batch settings
const BATCH_SIZE = 20000;
const BATCH_INTERVAL_MS = 100; // 100ms breather

// Fixed-size template array; map() below still allocates each batch's output,
// but we avoid rebuilding the template every cycle
const recordCache = new Array(BATCH_SIZE).fill(null);

// Batch generation—optimized
const generateBatch = () => {
  const now = new Date().toISOString();
  return recordCache.map(() => ({
    value: dataSchema.toBuffer({
      id: Math.floor(Math.random() * 100000),
      timestamp: now, // Same timestamp per batch
      value: Math.random() * 100
    })
  }));
};

// Worker Logic
if (!isMainThread) {
  const producer = new Kafka(kafkaConfig).producer({
    createPartitioner: Partitioners.LegacyPartitioner,
    allowAutoTopicCreation: false,
  });

  const runProducer = async () => {
    await producer.connect();
    while (true) {
      try {
        const batch = generateBatch();
        await producer.send({
          topic: 'dataStorm-topic',
          messages: batch,
        });
        parentPort.postMessage(`Sent ${BATCH_SIZE} records`);
        await new Promise(resolve => setTimeout(resolve, BATCH_INTERVAL_MS));
      } catch (err) {
        console.error(`Worker error: ${err.message}`);
        // Keep going—don’t die on me, but back off so a persistent failure
        // doesn’t spin the loop at full speed
        await new Promise(resolve => setTimeout(resolve, BATCH_INTERVAL_MS));
      }
    }
  };

  runProducer().catch(err => {
    console.error(`Fatal worker error: ${err.message}`);
    process.exit(1);
  });
}

// Main Thread
if (isMainThread) {
  const WORKER_COUNT = require('os').cpus().length; // Match CPU cores
  const workers = new Set();

  console.log(`Main process started. Spawning ${WORKER_COUNT} workers`);

  const spawnWorker = (id) => {
    const worker = new Worker(__filename);
    worker
      .on('message', (msg) => console.log(`[Worker ${id}] ${msg}`))
      .on('error', (err) => console.error(`[Worker ${id}] Error: ${err.message}`))
      .on('exit', (code) => {
        console.log(`[Worker ${id}] Exited with code ${code}`);
        workers.delete(worker);
        if (code !== 0) spawnWorker(id); // Restart on crash
      });
    workers.add(worker);
  };

  for (let i = 0; i < WORKER_COUNT; i++) {
    spawnWorker(i + 1);
  }

  process.on('SIGINT', async () => {
    console.log('\nGracefully shutting down...');
    for (const worker of workers) {
      await worker.terminate();
    }
    process.exit();
  });
}
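
One caveat on step 5: worker.terminate() is abrupt, so a batch that is mid-send can be dropped. A gentler variant, sketched below rather than what v2 actually ships, has each worker disconnect its producer on request; the 'shutdown' message name is my own placeholder.

javascript
// Worker side: exit on request instead of being terminated mid-send.
parentPort.on('message', async (msg) => {
  if (msg === 'shutdown') {
    await producer.disconnect(); // flush in-flight requests, close sockets
    process.exit(0);
  }
});

// Main thread: ask nicely first, force-terminate stragglers after a grace period.
process.on('SIGINT', () => {
  console.log('\nGracefully shutting down...');
  for (const worker of workers) worker.postMessage('shutdown');
  setTimeout(() => {
    for (const worker of workers) worker.terminate();
    process.exit(0);
  }, 5000).unref(); // don't keep the process alive once all workers are gone
});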

What I Got Out of It

  • Throughput: Hit around 200K posts/sec with 8 workers (25K/worker roughly). The tweaks paid off—smoother and faster than v1.
  • Stability: Workers didn’t crash on random Kafka blips anymore, thanks to error handling. Huge relief.
  • Efficiency: Pre-allocating the batch array and reusing timestamps cut generation overhead. Small wins add up (quick sanity check below).
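
That efficiency claim is easy to sanity-check. A quick micro-benchmark along these lines (my own sketch; absolute numbers vary by machine) shows why formatting one ISO string per batch beats formatting 20K of them:

javascript
// Compare one ISO timestamp per record vs. one shared per batch.
const N = 20000;
let sink = 0; // accumulate so the engine can't dead-code-eliminate the loops

console.time('per-record timestamps');
for (let i = 0; i < N; i++) {
  sink += new Date().toISOString().length; // fresh Date + formatting every record
}
console.timeEnd('per-record timestamps');

console.time('per-batch timestamp');
const shared = new Date().toISOString(); // formatted once
for (let i = 0; i < N; i++) {
  sink += shared.length; // just a property read
}
console.timeEnd('per-batch timestamp');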

The Hiccups

  • Worker Sync: Unique IDs fixed conflicts, but I still saw occasional lag spikes—Kafka brokers weren’t fully balanced yet.
  • Throttling Trade-off: That 100ms delay helped the event loop, but it capped throughput: at 25K posts/sec a worker spends about 0.8s per 20K-record batch, so the pause alone eats roughly an eighth of every cycle. Needed finer tuning.
  • Memory: 20K-record batches per worker still pushed memory usage up—had to watch that for bigger leaps (a cheap watchdog below).
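
On the memory point, the cheapest watchdog is Node’s own counters. Something like this, dropped into the worker’s setup (the 5-second interval is arbitrary), keeps batch-size experiments honest:

javascript
// Log heap and RSS every 5 seconds while the worker runs.
setInterval(() => {
  const { heapUsed, rss } = process.memoryUsage();
  const mb = (n) => (n / 1024 / 1024).toFixed(1);
  console.log(`[Worker] heap: ${mb(heapUsed)} MB, rss: ${mb(rss)} MB`);
}, 5000);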

Lessons Learned

  • Details Matter: A unique clientId isn’t sexy, but it saves hours of head-scratching when logs go wild.
  • Resilience Beats Speed: Crashing workers taught me stability’s worth more than a few extra posts/sec early on.
  • Tune, Don’t Assume: Default Kafka settings were lazy—the real gains hid in producer tuning and batching. In kafkajs that means the size of your own send() batches plus compression and acks; the Java client’s linger.ms doesn’t exist here.

Why This Counts

This wasn’t just tinkering—it was about building something reliable that could scale. I’ve seen how you don’t always get the fanciest tools, but you make it work anyway. V2 showed I could take a raw idea, iron out the kinks, and push it closer to BrandPulse’s 700K/sec goal. It’s the kind of hustle recruiters notice—and engineers respect.

Next Steps

200K posts/sec was progress, but I wasn’t there yet. Version 3 brought in sentiment randomization and more optimization—check it out to see how I kept the momentum going.
