Skip to content

Technical Architecture

🏗️ System Design Overview

Chainly SDK is built on a modular, event-driven architecture that prioritizes type safety, performance, and extensibility. The system is designed to handle complex workflow orchestration while maintaining simplicity for developers and reliability for production environments.

🔧 Core Architecture Patterns

Event-Driven Architecture

The SDK builds on the Node.js `EventEmitter` to publish task and workflow lifecycle events through a shared event bus:

typescript
/**
 * Shape of an event published on the event bus: an event name (`type`)
 * plus the `payload` object handed to listeners.
 */
interface WorkflowEvent {
  /** Event name; one of the task/workflow lifecycle identifiers listed here. */
  type: 'task:started' | 'task:completed' | 'task:failed' | 'workflow:completed';
  /** Data delivered to listeners together with the event name. */
  payload: {
    /** Optional task identifier — presumably omitted for workflow-level events; confirm with emitters. */
    taskId?: string;
    /** Identifier of the workflow the event belongs to (the only required id). */
    workflowId: string;
    /** Timestamp attached to the event. */
    timestamp: Date;
    /** Optional, untyped context value. NOTE(review): `any` disables type checking for consumers. */
    context?: any;
    /** Optional error — presumably populated for `task:failed`; confirm with emitters. */
    error?: Error;
  };
}

/**
 * Shared event bus layered on Node's `EventEmitter`.
 *
 * Use {@link EventBus.getInstance} to obtain the lazily created shared
 * instance; the constructor itself is left public.
 */
class EventBus extends EventEmitter {
  private static instance: EventBus;

  /**
   * Returns the shared bus, creating it on the first call.
   */
  static getInstance(): EventBus {
    const existing = EventBus.instance;
    if (existing) {
      return existing;
    }

    const created = new EventBus();
    EventBus.instance = created;
    return created;
  }

  /**
   * Typed wrapper around `EventEmitter.emit` for workflow events.
   *
   * @param event - Event name taken from `WorkflowEvent['type']`.
   * @param payload - Payload forwarded to every listener.
   * @returns Whatever `EventEmitter.emit` returns for this event.
   */
  emit<T extends WorkflowEvent>(event: T['type'], payload: T['payload']): boolean {
    const delivered = super.emit(event, payload);
    return delivered;
  }
}

Dependency Injection Pattern

typescript
/**
 * Contract for components that run a single task against a workflow context.
 */
interface ITaskExecutor {
  /**
   * Runs `task` with the given `context` and resolves with a value of type `T`.
   *
   * NOTE(review): `T` is supplied as the first type argument of `Task`, which
   * `Task` declares as its input type, while the promised result is also `T` —
   * confirm whether the output type parameter was intended instead.
   */
  execute<T>(task: Task<T>, context: WorkflowContext): Promise<T>;
}

/**
 * Contract for asynchronous error handlers that receive an error together
 * with the workflow context it is associated with.
 */
interface IErrorHandler {
  /** Handles `error`; the returned promise settles once handling is finished. */
  handle(error: Error, context: WorkflowContext): Promise<void>;
}

/**
 * Minimal service registry: implementations are stored and looked up by a
 * string token.
 */
class WorkflowContainer {
  private readonly services = new Map<string, unknown>();

  /**
   * Stores `implementation` under `token`, replacing any earlier registration
   * for the same token.
   */
  register<T>(token: string, implementation: T): void {
    this.services.set(token, implementation);
  }

  /**
   * Looks up the implementation registered under `token`.
   *
   * @returns The stored value, typed as `T` on the caller's word (the
   *          container does not verify the type at runtime).
   * @throws Error when nothing has been registered under `token`.
   */
  resolve<T>(token: string): T {
    // Check presence, not truthiness: 0, '' and false are legitimate
    // registrations and must not be reported as missing.
    if (!this.services.has(token)) {
      throw new Error(`Service ${token} not found`);
    }
    return this.services.get(token) as T;
  }
}

🧩 Core Components Architecture

1. Workflow Engine

Workflow State Machine

typescript
/** Lifecycle states a workflow can be in. */
enum WorkflowState {
  PENDING = 'pending',
  RUNNING = 'running',
  COMPLETED = 'completed',
  FAILED = 'failed',
  CANCELLED = 'cancelled'
}

/**
 * State machine guarding workflow lifecycle changes.
 *
 * Declared as a class (rather than a bare interface) so that
 * `new WorkflowStateMachine()` is valid. All members are public, so the class
 * can still be used structurally as a type or in an `implements` clause.
 */
class WorkflowStateMachine {
  /** State the machine is currently in. */
  currentState: WorkflowState;
  /** For each state, the states that may be entered directly from it. */
  transitions: Map<WorkflowState, WorkflowState[]>;

  /**
   * @param initialState - Starting state; defaults to `PENDING`.
   * @param transitions - Custom transition table. When omitted, a default
   *   table is used: PENDING → RUNNING | CANCELLED,
   *   RUNNING → COMPLETED | FAILED | CANCELLED, and the three remaining
   *   states are terminal. NOTE(review): the default table is an assumption —
   *   adjust if re-running failed workflows must be allowed.
   */
  constructor(
    initialState: WorkflowState = WorkflowState.PENDING,
    transitions?: Map<WorkflowState, WorkflowState[]>
  ) {
    this.currentState = initialState;
    this.transitions =
      transitions ??
      new Map<WorkflowState, WorkflowState[]>([
        [WorkflowState.PENDING, [WorkflowState.RUNNING, WorkflowState.CANCELLED]],
        [
          WorkflowState.RUNNING,
          [WorkflowState.COMPLETED, WorkflowState.FAILED, WorkflowState.CANCELLED]
        ],
        [WorkflowState.COMPLETED, []],
        [WorkflowState.FAILED, []],
        [WorkflowState.CANCELLED, []]
      ]);
  }

  /** Reports whether `to` is reachable directly from the current state. */
  canTransition(to: WorkflowState): boolean {
    const allowed = this.transitions.get(this.currentState) ?? [];
    return allowed.includes(to);
  }

  /**
   * Moves the machine to `to`.
   *
   * @throws Error when the transition table does not allow the move; the
   *         current state is left unchanged in that case.
   */
  transition(to: WorkflowState): void {
    if (!this.canTransition(to)) {
      throw new Error(`Invalid workflow state transition: ${this.currentState} -> ${to}`);
    }
    this.currentState = to;
  }
}

/**
 * A workflow: owns its lifecycle state machine, its tasks and their
 * dependency graph, and drives one execution from RUNNING to either
 * COMPLETED or FAILED.
 */
class Workflow<TContext = any, TResult = any> {
  private stateMachine: WorkflowStateMachine;
  private tasks: Map<string, Task>;
  private dependencyGraph: DependencyGraph;
  // Set by execute(); stays undefined until a plan has been generated.
  // (Previously declared as always-present but never assigned.)
  private executionPlan?: ExecutionPlan;

  constructor(private config: WorkflowConfig) {
    // NOTE(review): this requires `WorkflowStateMachine` to be a constructible
    // class, not only a type — confirm the declaration in scope provides one.
    this.stateMachine = new WorkflowStateMachine();
    this.tasks = new Map();
    this.dependencyGraph = new DependencyGraph();
  }

  /**
   * Runs the workflow once.
   *
   * Moves the state machine to RUNNING, generates and executes a plan, then
   * moves to COMPLETED. If anything inside the `try` throws, the state machine
   * is moved to FAILED and the original error is rethrown.
   *
   * @param context - Caller-supplied context passed through to plan execution.
   * @returns The value produced by executing the plan.
   */
  async execute(context: TContext): Promise<TResult> {
    this.stateMachine.transition(WorkflowState.RUNNING);

    try {
      // NOTE(review): `generateExecutionPlan` and `executeplan` are not defined
      // in this class as shown; `executeplan` also breaks the camelCase naming
      // used elsewhere — confirm the real method names.
      const plan = await this.generateExecutionPlan();
      // Keep the plan on the instance so the declared field reflects the last run.
      this.executionPlan = plan;
      const result = await this.executeplan(plan, context);

      this.stateMachine.transition(WorkflowState.COMPLETED);
      return result;
    } catch (error: unknown) {
      this.stateMachine.transition(WorkflowState.FAILED);
      throw error;
    }
  }
}

Dependency Resolution Algorithm

typescript
/**
 * Directed graph of task dependencies with a Kahn-style topological sort.
 *
 * An edge `dependency -> task` means `task` may only run after `dependency`.
 */
class DependencyGraph {
  /** dependency -> tasks that wait for it. */
  private adjacencyList: Map<string, Set<string>> = new Map();
  /** task -> number of distinct dependencies it waits for. */
  private inDegree: Map<string, number> = new Map();
  /** task -> the dependencies it waits for (reverse of `adjacencyList`). */
  private dependencies: Map<string, Set<string>> = new Map();

  /**
   * Registers `task` as a node with no edges. Does nothing if the task is
   * already known, so it is safe to call repeatedly.
   */
  addTask(task: string): void {
    if (!this.adjacencyList.has(task)) {
      this.adjacencyList.set(task, new Set());
    }
    if (!this.dependencies.has(task)) {
      this.dependencies.set(task, new Set());
    }
    if (!this.inDegree.has(task)) {
      this.inDegree.set(task, 0);
    }
  }

  /**
   * Declares that `task` depends on `dependency`.
   *
   * Both names are registered as nodes. Without that, a dependency that has no
   * dependencies of its own would never appear as a root and every sort would
   * be reported as circular. Declaring the same edge twice is a no-op.
   */
  addDependency(task: string, dependency: string): void {
    this.addTask(dependency);
    this.addTask(task);

    const dependents = this.adjacencyList.get(dependency);
    const prerequisites = this.dependencies.get(task);
    if (!dependents || !prerequisites || dependents.has(task)) {
      // Duplicate edge: counting it again would inflate the in-degree and
      // produce a false "circular dependency" error.
      return;
    }

    dependents.add(task);
    prerequisites.add(dependency);
    this.inDegree.set(task, (this.inDegree.get(task) ?? 0) + 1);
  }

  /**
   * Returns the direct dependencies of `task` in insertion order, or an empty
   * array when the task is unknown or has none.
   */
  getDependencies(task: string): string[] {
    return [...(this.dependencies.get(task) ?? [])];
  }

  /**
   * Produces an execution order in which every task follows all of its
   * dependencies. The graph itself is not modified, so the method can be
   * called repeatedly.
   *
   * @throws Error with message `Circular dependency detected` when the graph
   *         contains a cycle.
   */
  topologicalSort(): string[] {
    // Work on a copy so the stored in-degrees survive the sort.
    const remaining = new Map(this.inDegree);
    // `order` is both the result and the FIFO queue; `head` is the read cursor.
    const order: string[] = [];

    // Find all nodes with no incoming edges
    for (const [task, degree] of remaining) {
      if (degree === 0) {
        order.push(task);
      }
    }

    for (let head = 0; head < order.length; head++) {
      const current = order[head];
      for (const dependent of this.adjacencyList.get(current) ?? []) {
        const left = (remaining.get(dependent) ?? 0) - 1;
        remaining.set(dependent, left);
        if (left === 0) {
          order.push(dependent);
        }
      }
    }

    // Check for circular dependencies
    if (order.length !== remaining.size) {
      throw new Error('Circular dependency detected');
    }

    return order;
  }
}

2. Task Management System

Task Abstraction Layer

typescript
/**
 * Declarative description of a task, passed to the `Task` constructor.
 */
interface TaskDefinition<TInput = any, TOutput = any> {
  /** Task name; copied onto the task instance at construction. */
  name: string;
  /** Handler associated with the task (type declared elsewhere). */
  handler: TaskHandler<TInput, TOutput>;
  /** Optional list of dependency identifiers — presumably task names; confirm against the workflow builder. */
  dependencies?: string[];
  /** Optional retry settings; when absent, `Task.handleRetry` never retries. */
  retryPolicy?: RetryPolicy;
  /** Optional timeout in milliseconds; the task executor falls back to 30000 when unset. */
  timeout?: number;
  /** Optional schema whose `validate(input)` is awaited by `Task.validateInput`. */
  validation?: ValidationSchema;
}

/**
 * Base class for executable tasks.
 *
 * Holds the task's identity, its definition and its retry bookkeeping;
 * subclasses supply the actual work by implementing {@link Task.execute}.
 *
 * `id`, `name` and `definition` are public because the task executor and the
 * middlewares read them from outside the class hierarchy.
 */
abstract class Task<TInput = any, TOutput = any> {
  readonly id: string;
  readonly name: string;
  protected state: TaskState = TaskState.PENDING;
  protected retryCount: number = 0;

  /**
   * @param definition - Declarative description of the task; its `name`
   *   becomes the task's name.
   */
  constructor(public definition: TaskDefinition<TInput, TOutput>) {
    this.id = generateUUID();
    this.name = definition.name;
  }

  /** Performs the task's work for the given workflow context. */
  abstract execute(context: WorkflowContext): Promise<TOutput>;

  /**
   * Validates `input` against the definition's schema, if one is configured.
   * Any rejection from the schema's `validate` propagates to the caller.
   */
  protected async validateInput(input: TInput): Promise<void> {
    if (this.definition.validation) {
      await this.definition.validation.validate(input);
    }
  }

  /**
   * Decides whether a failed attempt should be retried and, if so, waits for
   * the backoff delay before returning.
   *
   * @param error - The error that caused the attempt to fail.
   * @returns `true` after the delay when the caller should retry; `false`
   *   when there is no retry policy, the retry budget is used up, or the
   *   policy's `retryCondition` rejects the error.
   */
  protected async handleRetry(error: Error): Promise<boolean> {
    if (!this.definition.retryPolicy) return false;

    const { maxRetries, backoffStrategy, retryCondition } = this.definition.retryPolicy;

    if (this.retryCount >= maxRetries) return false;
    if (retryCondition && !retryCondition(error)) return false;

    this.retryCount++;

    const delay = this.calculateBackoffDelay(backoffStrategy, this.retryCount);
    await this.sleep(delay);

    return true;
  }

  /**
   * Computes the wait before retry number `attempt` (1-based), in
   * milliseconds: `attempt * 1000` for linear, `2^attempt * 1000` for
   * exponential, and a flat 1000 for fixed or unrecognised strategies.
   */
  private calculateBackoffDelay(strategy: BackoffStrategy, attempt: number): number {
    switch (strategy) {
      case 'linear':
        return attempt * 1000;
      case 'exponential':
        return Math.pow(2, attempt) * 1000;
      case 'fixed':
      default:
        return 1000;
    }
  }

  /** Resolves after `ms` milliseconds; used to space out retries. */
  private sleep(ms: number): Promise<void> {
    return new Promise<void>((resolve) => {
      setTimeout(resolve, ms);
    });
  }
}

Task Execution Engine

typescript
/**
 * Runs tasks under a concurrency limit and a per-task timeout.
 */
class TaskExecutor {
  private readonly concurrencyLimit: number;
  /** In-flight executions keyed by task id. */
  private readonly activeExecutions: Map<string, Promise<any>>;
  private readonly executionPool: Semaphore;

  /**
   * @param concurrencyLimit - Size of the semaphore that gates executions;
   *   defaults to 10.
   */
  constructor(concurrencyLimit: number = 10) {
    this.concurrencyLimit = concurrencyLimit;
    this.activeExecutions = new Map();
    this.executionPool = new Semaphore(concurrencyLimit);
  }

  /**
   * Waits for a semaphore slot, runs `task` with a timeout, and always
   * releases the slot and the bookkeeping entry afterwards.
   *
   * @returns The value the task resolved with.
   * @throws Whatever the task rejects with, or a `TimeoutError` when the
   *   timeout elapses first.
   */
  async execute<T>(task: Task<T>, context: WorkflowContext): Promise<T> {
    await this.executionPool.acquire();

    try {
      const execution = this.executeWithTimeout(task, context);
      this.activeExecutions.set(task.id, execution);

      return await execution;
    } finally {
      this.activeExecutions.delete(task.id);
      this.executionPool.release();
    }
  }

  /**
   * Races the task against a timer. The timeout is the definition's `timeout`
   * or 30000 ms when that is unset (or 0, since `||` is used).
   *
   * The timer is cleared as soon as the race settles; otherwise every
   * completed task would leave a pending timer behind for the rest of the
   * timeout window. Note that a timed-out task is not cancelled — only its
   * result is abandoned.
   */
  private async executeWithTimeout<T>(
    task: Task<T>,
    context: WorkflowContext
  ): Promise<T> {
    const timeout = task.definition.timeout || 30000;

    let timer: ReturnType<typeof setTimeout> | undefined;
    const expiry = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new TimeoutError(`Task ${task.name} timed out`)), timeout);
    });

    try {
      return await Promise.race([task.execute(context), expiry]);
    } finally {
      clearTimeout(timer);
    }
  }
}

3. Context Management

Context Propagation System

typescript
/**
 * Per-execution context object handed to tasks and middleware.
 */
interface WorkflowContext {
  /** Context identifier; fixed after creation. */
  readonly id: string;
  /** Creation time of the context; fixed after creation. */
  readonly startTime: Date;
  /** Mutable key/value bag; `ContextManager.updateContext` merges updates into it. */
  data: Record<string, any>;
  /** Correlation and tracing identifiers attached to this context. */
  metadata: ContextMetadata;
}

/**
 * Identifiers carried alongside a workflow context.
 */
interface ContextMetadata {
  /** Optional user identifier; not populated by `ContextManager.createContext`. */
  userId?: string;
  /** Correlation identifier; `createContext` generates a fresh one per context. */
  correlationId: string;
  /** Trace identifier; `createContext` generates a fresh one per context. */
  traceId: string;
  /** Optional parent span identifier; `createContext` fills it from the manager's current span id. */
  parentSpanId?: string;
}

/**
 * Maintains a stack of workflow contexts; the most recently created one is
 * the "current" context.
 */
class ContextManager {
  private contextStack: WorkflowContext[] = [];

  /**
   * Creates a context, pushes it on the stack and returns it.
   *
   * @param initialData - Seed values; shallow-copied so later updates do not
   *   touch the caller's object.
   */
  createContext(initialData: Record<string, any> = {}): WorkflowContext {
    const context: WorkflowContext = {
      id: generateUUID(),
      startTime: new Date(),
      data: { ...initialData },
      metadata: {
        correlationId: generateUUID(),
        traceId: generateTraceId(),
        // Evaluated before the push below, so this refers to the enclosing context.
        parentSpanId: this.getCurrentSpanId()
      }
    };

    this.contextStack.push(context);
    return context;
  }

  /** Returns the context on top of the stack, or `null` when the stack is empty. */
  getCurrentContext(): WorkflowContext | null {
    return this.contextStack[this.contextStack.length - 1] ?? null;
  }

  /**
   * Shallow-merges `updates` into the current context's data. Silently does
   * nothing when there is no current context.
   */
  updateContext(updates: Partial<WorkflowContext['data']>): void {
    const current = this.getCurrentContext();
    if (current) {
      Object.assign(current.data, updates);
    }
  }

  /** Pops the current context off the stack (no-op on an empty stack). */
  destroyContext(): void {
    this.contextStack.pop();
  }

  /**
   * Span id to record as the parent of a newly created context.
   *
   * NOTE(review): this helper was called but never defined. Using the
   * enclosing context's `id` as the span id is an assumption — confirm
   * against the tracing design and replace if spans are tracked separately.
   *
   * @returns The current context's id, or `undefined` when no context exists.
   */
  private getCurrentSpanId(): string | undefined {
    return this.getCurrentContext()?.id;
  }
}

4. Middleware Pipeline

Middleware Architecture

typescript
/**
 * Everything a middleware hook receives about the task being processed.
 */
interface MiddlewareContext {
  /** The task the hook is being invoked for. */
  task: Task;
  /** The workflow that owns the task. */
  workflow: Workflow;
  /** The workflow context of the current execution. */
  context: WorkflowContext;
  /** Per-execution bookkeeping shared between middlewares. */
  metadata: MiddlewareMetadata;
}

/**
 * Bookkeeping shared by the middlewares of one task execution.
 */
interface MiddlewareMetadata {
  /** Start timestamp; `LoggingMiddleware.before` overwrites it and the `after` hooks compute durations from it. */
  startTime: Date;
  /** Identifier of this execution; used as the log prefix by `LoggingMiddleware`. */
  executionId: string;
  /** Earlier results — presumably keyed by task name or id; confirm with the code that populates it. */
  previousResults: Map<string, any>;
}

/**
 * Base class for middleware hooks run around task execution.
 *
 * `MiddlewarePipeline` awaits `before` hooks in registration order, `after`
 * hooks in reverse registration order, and `onError` hooks in registration
 * order.
 */
abstract class Middleware {
  /** Hook awaited by the pipeline's `executeBefore`. */
  abstract before(context: MiddlewareContext): Promise<void>;
  /** Hook awaited by the pipeline's `executeAfter`; receives the `result` passed to it. */
  abstract after(context: MiddlewareContext, result: any): Promise<void>;
  /** Hook awaited by the pipeline's `executeOnError`; receives the error passed to it. */
  abstract onError(error: Error, context: MiddlewareContext): Promise<void>;
}

/**
 * Ordered collection of middlewares with helpers to run each hook phase.
 * Hooks run strictly one after another; a rejection from any hook stops the
 * phase and propagates to the caller.
 */
class MiddlewarePipeline {
  private middlewares: Middleware[] = [];

  /** Appends `middleware` to the end of the pipeline. */
  use(middleware: Middleware): void {
    this.middlewares.push(middleware);
  }

  /** Awaits every `before` hook, first registered first. */
  async executeBefore(context: MiddlewareContext): Promise<void> {
    for (let index = 0; index < this.middlewares.length; index++) {
      await this.middlewares[index].before(context);
    }
  }

  /** Awaits every `after` hook, last registered first. */
  async executeAfter(context: MiddlewareContext, result: any): Promise<void> {
    // Walk backwards so the hooks unwind in the opposite order of `before`.
    let index = this.middlewares.length;
    while (index-- > 0) {
      await this.middlewares[index].after(context, result);
    }
  }

  /** Awaits every `onError` hook, first registered first. */
  async executeOnError(error: Error, context: MiddlewareContext): Promise<void> {
    for (let index = 0; index < this.middlewares.length; index++) {
      await this.middlewares[index].onError(error, context);
    }
  }
}

Built-in Middleware Examples

typescript
/**
 * Middleware that writes start, completion and failure lines to the console,
 * each prefixed with the execution id.
 */
class LoggingMiddleware extends Middleware {
  /** Logs the start line and stamps `metadata.startTime` with the current time. */
  async before(context: MiddlewareContext): Promise<void> {
    const { metadata, task } = context;
    console.log(`[${metadata.executionId}] Starting task: ${task.name}`);
    metadata.startTime = new Date();
  }

  /** Logs the completion line with the time elapsed since `metadata.startTime`. */
  async after(context: MiddlewareContext, result: any): Promise<void> {
    const { metadata, task } = context;
    const elapsedMs = Date.now() - metadata.startTime.getTime();
    console.log(`[${metadata.executionId}] Task ${task.name} completed in ${elapsedMs}ms`);
  }

  /** Logs the failure line together with the error object. */
  async onError(error: Error, context: MiddlewareContext): Promise<void> {
    const { metadata, task } = context;
    console.error(`[${metadata.executionId}] Task ${task.name} failed:`, error);
  }
}

/**
 * Middleware that reports task counters and a duration histogram to a
 * `MetricsCollector`, tagging each metric with the task name and workflow id.
 */
class MetricsMiddleware extends Middleware {
  /** @param metrics - Collector that receives every metric emitted here. */
  constructor(private metrics: MetricsCollector) {
    super();
  }

  /** Increments `task.started`. */
  async before(context: MiddlewareContext): Promise<void> {
    this.metrics.increment('task.started', this.baseTags(context));
  }

  /**
   * Records `task.duration` (milliseconds since `metadata.startTime`) and
   * then increments `task.completed`.
   */
  async after(context: MiddlewareContext, result: any): Promise<void> {
    const elapsedMs = Date.now() - context.metadata.startTime.getTime();

    this.metrics.histogram('task.duration', elapsedMs, this.baseTags(context));
    this.metrics.increment('task.completed', this.baseTags(context));
  }

  /** Increments `task.failed`, adding the error's constructor name as `errorType`. */
  async onError(error: Error, context: MiddlewareContext): Promise<void> {
    this.metrics.increment('task.failed', {
      ...this.baseTags(context),
      errorType: error.constructor.name
    });
  }

  /** Builds a fresh tag object shared by all metrics of one task. */
  private baseTags(context: MiddlewareContext) {
    return {
      taskName: context.task.name,
      workflowId: context.workflow.id
    };
  }
}

🔄 Advanced Patterns

Conditional Execution Engine

typescript
/**
 * One candidate branch for conditional execution.
 */
interface ConditionalBranch<T = any> {
  /** Predicate over the workflow context; may answer synchronously or via a promise. */
  condition: (context: WorkflowContext) => boolean | Promise<boolean>;
  /** Tasks selected when `condition` evaluates to a truthy value. */
  tasks: Task<T>[];
  /**
   * Optional fallback tasks. `ConditionalExecutor` uses the fallback of the
   * first branch that defines one when no branch's condition matches.
   */
  fallback?: Task<T>[];
}

/**
 * Picks which tasks to run from a list of conditional branches.
 */
class ConditionalExecutor {
  /**
   * Evaluates branch conditions one at a time, in order, and returns the
   * tasks of the first branch whose condition is truthy. Later conditions are
   * not evaluated once a match is found.
   *
   * When nothing matches, the `fallback` of the first branch that has one is
   * returned; if no branch has a fallback the result is an empty array.
   */
  async evaluateBranches<T>(
    branches: ConditionalBranch<T>[],
    context: WorkflowContext
  ): Promise<Task<T>[]> {
    for (const candidate of branches) {
      if (await candidate.condition(context)) {
        return candidate.tasks;
      }
    }

    // No condition matched: look for the first branch carrying a fallback.
    for (const candidate of branches) {
      if (candidate.fallback) {
        return candidate.fallback;
      }
    }

    return [];
  }
}

Parallel Execution Coordinator

typescript
/**
 * Runs a batch of tasks concurrently, optionally capped by a semaphore.
 */
class ParallelExecutor {
  /**
   * Executes all `tasks` against the same `context`.
   *
   * @param options.maxConcurrency - Upper bound on simultaneously running
   *   tasks; defaults to `Infinity` (no cap).
   * @param options.failFast - When `true` (default) the returned promise
   *   rejects with the first task failure and results keep task order. When
   *   `false`, failures are dropped and only the values of successful tasks
   *   are returned, in task order.
   * @returns The task results as described above.
   */
  async executeParallel<T>(
    tasks: Task<T>[],
    context: WorkflowContext,
    options: ParallelExecutionOptions = {}
  ): Promise<T[]> {
    const { maxConcurrency = Infinity, failFast = true } = options;

    const semaphore = new Semaphore(maxConcurrency);
    const promises = tasks.map(async (task) => {
      await semaphore.acquire();
      try {
        return await task.execute(context);
      } finally {
        semaphore.release();
      }
    });

    if (failFast) {
      return Promise.all(promises);
    }

    const settled = await Promise.allSettled(promises);
    const values: T[] = [];
    for (const outcome of settled) {
      // Select by status, not by truthiness: a task may legitimately resolve
      // with 0, '', false or null, and those results must be kept.
      if (outcome.status === 'fulfilled') {
        values.push(outcome.value);
      }
    }
    return values;
  }
}

📊 Performance Optimizations

Memory Management

typescript
/**
 * Heap-usage guard plus a helper that evicts oversized values from a
 * workflow context.
 */
class MemoryManager {
  private readonly maxMemoryUsage: number;
  private readonly gcThreshold: number;
  private readonly largeObjectThreshold: number;

  /**
   * @param maxMemoryMB - Hard heap limit in megabytes (default 512). A GC is
   *   requested once usage passes 80% of this limit.
   * @param largeObjectThresholdBytes - Serialized length above which a context
   *   value counts as "large" (default 1 MB, measured in JSON characters).
   */
  constructor(maxMemoryMB: number = 512, largeObjectThresholdBytes: number = 1024 * 1024) {
    this.maxMemoryUsage = maxMemoryMB * 1024 * 1024;
    this.gcThreshold = this.maxMemoryUsage * 0.8;
    this.largeObjectThreshold = largeObjectThresholdBytes;
  }

  /**
   * Checks current heap usage against the configured limits.
   *
   * Above the GC threshold a collection is requested, which is only possible
   * when Node was started with `--expose-gc`. Usage is then measured again so
   * the limit check reflects what the collection freed.
   *
   * @throws Error with message `Memory limit exceeded` when heap usage is
   *   above the hard limit.
   */
  checkMemoryUsage(): void {
    let heapUsed = process.memoryUsage().heapUsed;

    if (heapUsed > this.gcThreshold && typeof global.gc === 'function') {
      global.gc();
      heapUsed = process.memoryUsage().heapUsed;
    }

    if (heapUsed > this.maxMemoryUsage) {
      throw new Error('Memory limit exceeded');
    }
  }

  /**
   * Deletes every entry of `context.data` whose value is considered large.
   * Mutates `context.data` in place.
   */
  optimizeContext(context: WorkflowContext): void {
    // Remove large objects from context after task completion
    // (Object.entries is a snapshot, so deleting while looping is safe.)
    for (const [key, value] of Object.entries(context.data)) {
      if (this.isLargeObject(value)) {
        delete context.data[key];
      }
    }
  }

  /**
   * Estimates size via the length of the value's JSON serialization.
   *
   * Values that cannot be measured this way are treated as not large and are
   * therefore kept: `JSON.stringify` yields `undefined` for `undefined`,
   * functions and symbols, and throws for circular structures and BigInt.
   */
  private isLargeObject(obj: any): boolean {
    let serialized: string | undefined;
    try {
      serialized = JSON.stringify(obj);
    } catch {
      return false;
    }
    return serialized !== undefined && serialized.length > this.largeObjectThreshold;
  }
}

Execution Plan Optimization

typescript
/**
 * Turns a dependency graph into an execution plan made of groups of tasks
 * that can run side by side.
 */
class ExecutionPlanOptimizer {
  /**
   * Sorts the graph topologically and partitions the result into parallel
   * groups.
   *
   * @param tasks - Not consulted by the current implementation; ordering comes
   *   entirely from `dependencies`.
   * @param dependencies - Graph providing `topologicalSort()` and
   *   `getDependencies(task)`.
   * @throws Whatever `dependencies.topologicalSort()` throws (e.g. on cycles).
   */
  optimize(tasks: Task[], dependencies: DependencyGraph): ExecutionPlan {
    const sortedTasks = dependencies.topologicalSort();
    const parallelGroups = this.identifyParallelGroups(sortedTasks, dependencies);

    return new ExecutionPlan(parallelGroups);
  }

  /**
   * Walks the sorted tasks and, for each task not yet grouped, starts a new
   * group seeded with it. Groups are returned in execution order.
   */
  private identifyParallelGroups(
    sortedTasks: string[],
    dependencies: DependencyGraph
  ): string[][] {
    const groups: string[][] = [];
    const processed = new Set<string>();

    for (const task of sortedTasks) {
      if (processed.has(task)) continue;

      const group = this.findParallelGroup(task, sortedTasks, dependencies, processed);
      groups.push(group);

      for (const member of group) {
        processed.add(member);
      }
    }

    return groups;
  }

  /**
   * Collects `startTask` plus every ungrouped task whose dependencies are all
   * among `startTask`'s dependencies.
   *
   * Because `startTask` is the first ungrouped task in topological order, its
   * dependencies are already in earlier groups, so every task admitted here
   * also has all of its dependencies satisfied.
   */
  private findParallelGroup(
    startTask: string,
    allTasks: string[],
    dependencies: DependencyGraph,
    processed: Set<string>
  ): string[] {
    const group = [startTask];
    // Built once per group instead of once per candidate.
    const startDeps = new Set<string>(dependencies.getDependencies(startTask) ?? []);

    for (const task of allTasks) {
      if (processed.has(task) || task === startTask) continue;

      const taskDeps = dependencies.getDependencies(task) ?? [];

      // Can run in parallel if dependencies are the same or subset
      if (this.canRunInParallel(startDeps, taskDeps)) {
        group.push(task);
      }
    }

    return group;
  }

  /**
   * Returns `true` when every entry of `taskDeps` is also in `startDeps`
   * (equal sets and the empty set included).
   */
  private canRunInParallel(
    startDeps: ReadonlySet<string>,
    taskDeps: Iterable<string>
  ): boolean {
    for (const dependency of taskDeps) {
      if (!startDeps.has(dependency)) {
        return false;
      }
    }
    return true;
  }
}

This architecture allows the Chainly SDK to provide enterprise-grade workflow orchestration while keeping its APIs developer-friendly and its execution paths fast.

Built with precision engineering and innovative solutions.