
Implementation Guide

🚀 Getting Started

This guide walks through implementing the Chainly SDK in TypeScript applications, from basic setup to advanced enterprise patterns.

📦 Installation & Setup

Package Installation

bash
# Install the SDK
npm install chainly-sdk

# Install peer dependencies for TypeScript projects
npm install --save-dev typescript @types/node

# Optional: Install additional integrations
npm install @chainly/express-middleware @chainly/metrics-collector

Basic Configuration

typescript
// chainly.config.ts
import { ChainlyConfig } from 'chainly-sdk';

export const config: ChainlyConfig = {
  // Global retry policy
  defaultRetryPolicy: {
    maxRetries: 3,
    backoffStrategy: 'exponential',
    initialDelay: 1000
  },
  
  // Concurrency limits
  maxConcurrentTasks: 10,
  maxConcurrentWorkflows: 5,
  
  // Memory management
  memoryLimit: 512, // MB
  contextCleanupThreshold: 100, // Number of completed tasks
  
  // Monitoring
  enableMetrics: true,
  enableTracing: true,
  
  // Error handling
  defaultErrorHandler: 'log-and-continue',
  enableCircuitBreaker: true
};
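
The retry policy above names a backoff strategy and an initial delay but does not show the resulting wait times. The following is a minimal sketch, assuming that 'exponential' doubles the delay on each retry and 'linear' adds one `initialDelay` per retry; the SDK's actual formula (and any jitter it applies) may differ. `retryDelays` is a hypothetical helper, not part of chainly-sdk.

```typescript
type BackoffStrategy = 'exponential' | 'linear';

// Hypothetical helper, not part of chainly-sdk.
// Returns the wait in milliseconds before each retry attempt.
function retryDelays(
  maxRetries: number,
  initialDelay: number,
  strategy: BackoffStrategy
): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    delays.push(
      strategy === 'exponential'
        ? initialDelay * 2 ** attempt   // 1x, 2x, 4x, ...
        : initialDelay * (attempt + 1)  // 1x, 2x, 3x, ...
    );
  }
  return delays;
}

// Using the defaultRetryPolicy values from the config above
const defaultDelays = retryDelays(3, 1000, 'exponential'); // [1000, 2000, 4000]
```

Under these assumptions, a task that exhausts all three retries waits about seven seconds in total, which is worth comparing against any per-task `timeout`.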

🔧 Core Implementation Patterns

1. Basic Workflow Creation

Simple Sequential Workflow

typescript
import { Workflow, Task } from 'chainly-sdk';

interface UserRegistrationContext {
  email: string;
  password: string;
  profile: UserProfile;
}

interface UserRegistrationResult {
  userId: string;
  verified: boolean;
  welcomeEmailSent: boolean;
}

// Define individual tasks
const validateUserTask = new Task<UserRegistrationContext, boolean>(
  'validate-user',
  async (context) => {
    // Validation logic
    if (!context.email || !context.password) {
      throw new Error('Missing required fields');
    }
    
    const isValidEmail = /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(context.email);
    if (!isValidEmail) {
      throw new Error('Invalid email format');
    }
    
    return true;
  },
  {
    timeout: 5000,
    retryPolicy: {
      maxRetries: 2,
      backoffStrategy: 'linear'
    }
  }
);

const createUserTask = new Task<UserRegistrationContext, string>(
  'create-user',
  async (context) => {
    // Database operation
    const hashedPassword = await hashPassword(context.password);
    const user = await userRepository.create({
      email: context.email,
      password: hashedPassword,
      profile: context.profile
    });
    
    return user.id;
  },
  {
    timeout: 10000,
    retryPolicy: {
      maxRetries: 3,
      backoffStrategy: 'exponential',
      retryCondition: (error) => error.code === 'DATABASE_ERROR'
    }
  }
);

const sendWelcomeEmailTask = new Task<{ userId: string; email: string }, boolean>(
  'send-welcome-email',
  async (context) => {
    await emailService.sendWelcomeEmail(context.email, context.userId);
    return true;
  },
  {
    timeout: 15000,
    retryPolicy: {
      maxRetries: 5,
      backoffStrategy: 'exponential'
    }
  }
);

// Create and configure workflow
const userRegistrationWorkflow = new Workflow<UserRegistrationContext, UserRegistrationResult>({
  name: 'user-registration',
  description: 'Complete user registration process'
});

// Add tasks with dependencies
userRegistrationWorkflow.addTask(validateUserTask);
userRegistrationWorkflow.addTask(createUserTask, { dependsOn: ['validate-user'] });
userRegistrationWorkflow.addTask(sendWelcomeEmailTask, { dependsOn: ['create-user'] });

// Execute workflow
async function registerUser(userData: UserRegistrationContext): Promise<UserRegistrationResult> {
  try {
    const result = await userRegistrationWorkflow.execute(userData);
    
    return {
      userId: result.get('create-user'),
      verified: result.get('validate-user'),
      welcomeEmailSent: result.get('send-welcome-email')
    };
  } catch (error) {
    console.error('User registration failed:', error);
    throw error;
  }
}
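
The `dependsOn` options above imply an execution order. One common way to derive such an order is a depth-first topological sort, sketched below. This is an illustration of the idea only, not the SDK's scheduler; `DependencyGraph` and `executionOrder` are hypothetical names.

```typescript
// Hypothetical sketch, not the SDK's scheduler.
// Maps each task name to the names of the tasks it depends on.
type DependencyGraph = Record<string, string[]>;

function executionOrder(graph: DependencyGraph): string[] {
  const order: string[] = [];
  const state: Record<string, 'visiting' | 'done'> = {};

  const visit = (name: string): void => {
    if (state[name] === 'done') return;
    if (state[name] === 'visiting') {
      throw new Error(`Circular dependency at "${name}"`);
    }
    if (!(name in graph)) {
      throw new Error(`Unknown task "${name}"`);
    }
    state[name] = 'visiting';
    for (const dep of graph[name]) visit(dep); // dependencies run first
    state[name] = 'done';
    order.push(name);
  };

  for (const name of Object.keys(graph)) visit(name);
  return order;
}

// The registration workflow, deliberately declared out of order
const registrationOrder = executionOrder({
  'send-welcome-email': ['create-user'],
  'create-user': ['validate-user'],
  'validate-user': []
});
// registrationOrder: ['validate-user', 'create-user', 'send-welcome-email']
```

The sketch also rejects circular and unknown dependencies, two mistakes that are easy to make when task names are plain strings.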

2. Parallel Task Execution

Concurrent Data Processing

typescript
import { Workflow, Task, ParallelGroup } from 'chainly-sdk';

interface DataProcessingContext {
  dataSet: DataRecord[];
  processingOptions: ProcessingOptions;
}

// Create parallel processing tasks
const validateDataTask = new Task('validate-data', async (context) => {
  return await dataValidator.validate(context.dataSet);
});

const enrichDataTask = new Task('enrich-data', async (context) => {
  return await dataEnricher.enrich(context.dataSet);
});

const analyzeDataTask = new Task('analyze-data', async (context) => {
  return await dataAnalyzer.analyze(context.dataSet);
});

// Create workflow with parallel execution
const dataProcessingWorkflow = new Workflow<DataProcessingContext>({
  name: 'data-processing-pipeline'
});

// Group tasks for parallel execution
const parallelProcessingGroup = new ParallelGroup([
  validateDataTask,
  enrichDataTask,
  analyzeDataTask
], {
  maxConcurrency: 3,
  failFast: false, // Continue even if one task fails
  aggregateResults: true
});

dataProcessingWorkflow.addParallelGroup(parallelProcessingGroup);

// Add final aggregation task
const aggregateResultsTask = new Task('aggregate-results', async (context) => {
  const validationResult = context.getResult('validate-data');
  const enrichmentResult = context.getResult('enrich-data');
  const analysisResult = context.getResult('analyze-data');
  
  return {
    isValid: validationResult?.isValid || false,
    enrichedRecords: enrichmentResult?.records || [],
    insights: analysisResult?.insights || []
  };
});

dataProcessingWorkflow.addTask(aggregateResultsTask, { 
  dependsOn: ['validate-data', 'enrich-data', 'analyze-data'] 
});
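
The `maxConcurrency` and `failFast: false` options above describe a group that caps how many tasks run at once and keeps going when one of them fails. A minimal sketch of that behavior, assuming failures are collected rather than thrown, could look like the following. `runGroup`, `NamedTask`, and `Settled` are hypothetical names and not part of chainly-sdk.

```typescript
type Settled<T> =
  | { name: string; status: 'fulfilled'; value: T }
  | { name: string; status: 'rejected'; reason: unknown };

interface NamedTask<T> {
  name: string;
  run: () => Promise<T>;
}

// Hypothetical sketch of failFast: false with a concurrency cap.
async function runGroup<T>(
  tasks: NamedTask<T>[],
  maxConcurrency: number
): Promise<Settled<T>[]> {
  const results: Settled<T>[] = new Array(tasks.length);
  let next = 0; // index of the next task that has not been claimed yet

  // Each worker claims tasks one at a time until none are left.
  const worker = async (): Promise<void> => {
    while (next < tasks.length) {
      const index = next++;
      const task = tasks[index];
      try {
        results[index] = { name: task.name, status: 'fulfilled', value: await task.run() };
      } catch (reason) {
        // Record the failure and keep going instead of aborting the group
        results[index] = { name: task.name, status: 'rejected', reason };
      }
    }
  };

  const workerCount = Math.min(maxConcurrency, tasks.length);
  const workers: Promise<void>[] = [];
  for (let i = 0; i < workerCount; i++) workers.push(worker());
  await Promise.all(workers);
  return results;
}
```

Because every task ends up with an entry in the result list, a downstream aggregation step can tell a failed task from an empty result, which the `?.` and `||` fallbacks in `aggregate-results` cannot do on their own.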

3. Conditional Workflow Branching

Dynamic Workflow Execution

typescript
import { Workflow, Task, ConditionalBranch } from 'chainly-sdk';

interface OrderProcessingContext {
  order: Order;
  customer: Customer;
  paymentMethod: PaymentMethod;
}

// Define conditional tasks
const standardShippingTask = new Task('standard-shipping', async (context) => {
  return await shippingService.scheduleStandardDelivery(context.order);
});

const expressShippingTask = new Task('express-shipping', async (context) => {
  return await shippingService.scheduleExpressDelivery(context.order);
});

const premiumProcessingTask = new Task('premium-processing', async (context) => {
  return await premiumService.processOrder(context.order);
});

// Create conditional branches
const shippingBranch = new ConditionalBranch({
  name: 'shipping-selection',
  conditions: [
    {
      condition: (context) => context.customer.isPremium && context.order.total > 100,
      tasks: [expressShippingTask]
    },
    {
      condition: (context) => context.customer.isPremium,
      tasks: [premiumProcessingTask, expressShippingTask]
    }
  ],
  fallback: [standardShippingTask]
});

const orderProcessingWorkflow = new Workflow<OrderProcessingContext>({
  name: 'order-processing'
});

orderProcessingWorkflow.addConditionalBranch(shippingBranch);
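
The two conditions in `shippingBranch` overlap: a premium customer with an order over 100 satisfies both. The sketch below assumes conditions are evaluated in declaration order and the first match wins, which may not be how the SDK resolves overlaps. `selectTasks`, `Branch`, and `BranchContext` are hypothetical names, and task names stand in for `Task` instances.

```typescript
interface BranchContext {
  customer: { isPremium: boolean };
  order: { total: number };
}

interface Branch {
  condition: (context: BranchContext) => boolean;
  tasks: string[]; // task names stand in for Task instances
}

// Hypothetical sketch: assumes first-match-wins evaluation.
function selectTasks(
  branches: Branch[],
  fallback: string[],
  context: BranchContext
): string[] {
  for (const branch of branches) {
    if (branch.condition(context)) return branch.tasks;
  }
  return fallback; // no condition matched
}

const shippingBranches: Branch[] = [
  { condition: (c) => c.customer.isPremium && c.order.total > 100, tasks: ['express-shipping'] },
  { condition: (c) => c.customer.isPremium, tasks: ['premium-processing', 'express-shipping'] }
];
const shippingFallback = ['standard-shipping'];

const bigPremiumOrder = selectTasks(shippingBranches, shippingFallback, {
  customer: { isPremium: true },
  order: { total: 250 }
}); // ['express-shipping']
```

Under this assumption, the more specific condition has to come first; if the two entries were swapped, the `total > 100` branch would never be reached.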

4. Event-Driven Workflows

Real-time Event Processing

typescript
import { EventDrivenWorkflow, EventFilter, Task } from 'chainly-sdk';

interface PaymentEventContext {
  paymentId: string;
  amount: number;
  currency: string;
  status: 'pending' | 'completed' | 'failed';
}

// Define event filters
const paymentCompletedFilter = new EventFilter<PaymentEventContext>({
  type: 'payment.completed',
  condition: (event) => event.status === 'completed' && event.amount > 0
});

const paymentFailedFilter = new EventFilter<PaymentEventContext>({
  type: 'payment.failed',
  condition: (event) => event.status === 'failed'
});

// Define event-triggered tasks
const updateOrderStatusTask = new Task('update-order-status', async (context) => {
  await orderService.updateStatus(context.paymentId, 'paid');
  return { updated: true };
});

const sendReceiptTask = new Task('send-receipt', async (context) => {
  await emailService.sendReceipt(context.paymentId);
  return { sent: true };
});

const handlePaymentFailureTask = new Task('handle-payment-failure', async (context) => {
  await notificationService.notifyPaymentFailure(context.paymentId);
  await orderService.updateStatus(context.paymentId, 'payment-failed');
  return { handled: true };
});

// Create event-driven workflow
const paymentEventWorkflow = new EventDrivenWorkflow({
  name: 'payment-event-handler'
});

// Register event handlers
paymentEventWorkflow.on(paymentCompletedFilter, [
  updateOrderStatusTask,
  sendReceiptTask
]);

paymentEventWorkflow.on(paymentFailedFilter, [
  handlePaymentFailureTask
]);

// Start listening for events
await paymentEventWorkflow.start();

// Emit events (typically from external systems)
paymentEventWorkflow.emit('payment.completed', {
  paymentId: 'pay_123',
  amount: 99.99,
  currency: 'USD',
  status: 'completed'
});
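
Each filter above pairs an event type with a condition, so an event has to pass both checks before its tasks run. The sketch below illustrates that matching rule in isolation, assuming an emitted event is compared against every registered filter. `TinyDispatcher` and `SketchFilter` are hypothetical names, and handler names stand in for `Task` instances.

```typescript
interface PaymentEvent {
  paymentId: string;
  amount: number;
  currency: string;
  status: 'pending' | 'completed' | 'failed';
}

interface SketchFilter<T> {
  type: string;
  condition: (event: T) => boolean;
}

// Hypothetical sketch of filter matching, not the SDK's dispatcher.
class TinyDispatcher<T> {
  private routes: Array<{ filter: SketchFilter<T>; handlers: string[] }> = [];

  on(filter: SketchFilter<T>, handlers: string[]): void {
    this.routes.push({ filter, handlers });
  }

  // Returns the names of the handlers that would run for this event.
  match(type: string, event: T): string[] {
    const matched: string[] = [];
    for (const route of this.routes) {
      if (route.filter.type === type && route.filter.condition(event)) {
        matched.push(...route.handlers);
      }
    }
    return matched;
  }
}

const dispatcher = new TinyDispatcher<PaymentEvent>();
dispatcher.on(
  { type: 'payment.completed', condition: (e) => e.status === 'completed' && e.amount > 0 },
  ['update-order-status', 'send-receipt']
);
dispatcher.on(
  { type: 'payment.failed', condition: (e) => e.status === 'failed' },
  ['handle-payment-failure']
);
```

With this rule, a `payment.completed` event whose amount is 0 matches no handler at all, so it may be worth logging events that pass the type check but fail the condition.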

🛠️ Advanced Implementation Patterns

1. Custom Middleware Development

Comprehensive Logging Middleware

typescript
import { Middleware, MiddlewareContext, Logger } from 'chainly-sdk';

class AdvancedLoggingMiddleware extends Middleware {
  private logger: Logger;
  
  constructor(logger: Logger) {
    super();
    this.logger = logger;
  }
  
  async before(context: MiddlewareContext): Promise<void> {
    const { task, workflow, context: workflowContext } = context;
    
    this.logger.info('Task execution started', {
      taskId: task.id,
      taskName: task.name,
      workflowId: workflow.id,
      workflowName: workflow.name,
      correlationId: workflowContext.metadata.correlationId,
      traceId: workflowContext.metadata.traceId,
      timestamp: new Date().toISOString(),
      inputSize: JSON.stringify(workflowContext.data).length
    });
    
    // Add performance tracking
    context.metadata.performanceStart = process.hrtime.bigint();
  }
  
  async after(context: MiddlewareContext, result: any): Promise<void> {
    const performanceEnd = process.hrtime.bigint();
    const executionTime = Number(performanceEnd - context.metadata.performanceStart) / 1000000; // Convert to ms
    
    this.logger.info('Task execution completed', {
      taskId: context.task.id,
      taskName: context.task.name,
      workflowId: context.workflow.id,
      executionTimeMs: executionTime,
      // JSON.stringify returns undefined for an undefined result, so guard the length lookup
      resultSize: JSON.stringify(result)?.length ?? 0,
      success: true,
      timestamp: new Date().toISOString()
    });
  }
  
  async onError(error: Error, context: MiddlewareContext): Promise<void> {
    const performanceEnd = process.hrtime.bigint();
    const executionTime = Number(performanceEnd - context.metadata.performanceStart) / 1000000;
    
    this.logger.error('Task execution failed', {
      taskId: context.task.id,
      taskName: context.task.name,
      workflowId: context.workflow.id,
      executionTimeMs: executionTime,
      error: {
        name: error.name,
        message: error.message,
        stack: error.stack
      },
      success: false,
      timestamp: new Date().toISOString()
    });
  }
}

Security Validation Middleware

typescript
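// SecurityError is assumed to be exported by the SDK; define your own Error subclass if it is not
import { Middleware, MiddlewareContext, SecurityValidator, SecurityError } from 'chainly-sdk';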

class SecurityMiddleware extends Middleware {
  private validator: SecurityValidator;
  
  constructor(validator: SecurityValidator) {
    super();
    this.validator = validator;
  }
  
  async before(context: MiddlewareContext): Promise<void> {
    // Validate user permissions
    const userId = context.context.metadata.userId;
    const taskPermissions = context.task.getRequiredPermissions();
    
    if (taskPermissions.length > 0) {
      const hasPermission = await this.validator.validatePermissions(userId, taskPermissions);
      if (!hasPermission) {
        throw new SecurityError(`User ${userId} lacks required permissions for task ${context.task.name}`);
      }
    }
    
    // Validate input data
    await this.validator.validateInput(context.context.data);
    
    // Rate limiting
    await this.validator.checkRateLimit(userId, context.task.name);
  }
  
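  // Returns the sanitized result. This assumes the SDK uses the value returned
  // from after() in place of the task result; otherwise the sanitized copy is discarded.
  async after(context: MiddlewareContext, result: any): Promise<any> {
    // Sanitize output data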
    const sanitizedResult = await this.validator.sanitizeOutput(result);
    
    // Update rate limiting counters
    const userId = context.context.metadata.userId;
    await this.validator.updateRateLimit(userId, context.task.name);
    
    return sanitizedResult;
  }
  
  async onError(error: Error, context: MiddlewareContext): Promise<void> {
    // Log security events
    if (error instanceof SecurityError) {
      await this.validator.logSecurityEvent({
        userId: context.context.metadata.userId,
        taskName: context.task.name,
        error: error.message,
        timestamp: new Date(),
        severity: 'high'
      });
    }
  }
}
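
Both middleware classes above rely on `before`, `after`, and `onError` hooks wrapping each task. How the SDK sequences those hooks is not shown here, so the following is only a sketch under stated assumptions: hooks run in registration order, and an error thrown by a `before` hook prevents the task from running. `runWithMiddleware` and `TaskHooks` are hypothetical names.

```typescript
interface TaskHooks {
  before?: () => Promise<void>;
  after?: (result: unknown) => Promise<void>;
  onError?: (error: Error) => Promise<void>;
}

// Hypothetical sketch of hook sequencing, not the SDK's implementation.
async function runWithMiddleware<T>(
  middleware: TaskHooks[],
  task: () => Promise<T>
): Promise<T> {
  try {
    for (const m of middleware) await m.before?.();
    const result = await task();
    for (const m of middleware) await m.after?.(result);
    return result;
  } catch (err) {
    // Errors from before(), the task, or after() all reach onError
    const error = err instanceof Error ? err : new Error(String(err));
    for (const m of middleware) await m.onError?.(error);
    throw error;
  }
}
```

In this arrangement a permission check that throws in `before` stops the task body from executing at all, which is the behavior `SecurityMiddleware` depends on.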

2. Enterprise Integration Patterns

Database Transaction Management

typescript
// WorkflowConfig is assumed to be exported alongside Workflow
import { Workflow, Task, TransactionManager, WorkflowConfig } from 'chainly-sdk';

class DatabaseTransactionWorkflow extends Workflow {
  private transactionManager: TransactionManager;
  
  constructor(config: WorkflowConfig, transactionManager: TransactionManager) {
    super(config);
    this.transactionManager = transactionManager;
  }
  
  async execute<TContext, TResult>(context: TContext): Promise<TResult> {
    const transaction = await this.transactionManager.begin();
    
    try {
      // Add transaction context to all tasks
      const transactionalContext = {
        ...context,
        transaction: transaction
      };
      
      const result = await super.execute(transactionalContext);
      
      await transaction.commit();
      return result;
    } catch (error) {
      await transaction.rollback();
      throw error;
    }
  }
}

// Usage example
const orderProcessingWorkflow = new DatabaseTransactionWorkflow(
  { name: 'order-processing' },
  new PostgresTransactionManager(dbConnection)
);

const createOrderTask = new Task('create-order', async (context) => {
  return await context.transaction.query(
    'INSERT INTO orders (customer_id, total) VALUES ($1, $2) RETURNING id',
    [context.customerId, context.total]
  );
});

const updateInventoryTask = new Task('update-inventory', async (context) => {
  for (const item of context.items) {
    await context.transaction.query(
      'UPDATE inventory SET quantity = quantity - $1 WHERE product_id = $2',
      [item.quantity, item.productId]
    );
  }
});

orderProcessingWorkflow.addTask(createOrderTask);
orderProcessingWorkflow.addTask(updateInventoryTask, { dependsOn: ['create-order'] });
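
The commit-or-rollback logic in `DatabaseTransactionWorkflow.execute` can be exercised without a database. The sketch below isolates that pattern behind a small interface and uses an in-memory stand-in that records which calls were made. `withTransaction`, `TransactionLike`, and `fakeTransaction` are hypothetical names and not part of chainly-sdk.

```typescript
interface TransactionLike {
  commit(): Promise<void>;
  rollback(): Promise<void>;
}

// Hypothetical sketch of the commit-or-rollback pattern used above.
async function withTransaction<T>(
  begin: () => Promise<TransactionLike>,
  work: (tx: TransactionLike) => Promise<T>
): Promise<T> {
  const tx = await begin();
  try {
    const result = await work(tx);
    await tx.commit();
    return result;
  } catch (error) {
    // Reached when the work fails and also when commit() itself throws
    await tx.rollback();
    throw error;
  }
}

// In-memory stand-in that records which calls were made.
function fakeTransaction(log: string[]): TransactionLike {
  return {
    commit: async () => { log.push('commit'); },
    rollback: async () => { log.push('rollback'); }
  };
}
```

A test can pass `fakeTransaction(log)` as the `begin` result and then assert that `log` contains only 'commit' on success and only 'rollback' when the work throws.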

Message Queue Integration

typescript
// WorkflowConfig is assumed to be exported alongside EventDrivenWorkflow
import { EventDrivenWorkflow, MessageQueueAdapter, WorkflowConfig } from 'chainly-sdk';

class MessageQueueWorkflow extends EventDrivenWorkflow {
  private messageQueue: MessageQueueAdapter;
  
  constructor(config: WorkflowConfig, messageQueue: MessageQueueAdapter) {
    super(config);
    this.messageQueue = messageQueue;
  }
  
  async start(): Promise<void> {
    await super.start();
    
    // Subscribe to message queue events
    await this.messageQueue.subscribe('workflow.events', (message) => {
      this.emit(message.type, message.payload);
    });
  }
  
  async publishResult(taskName: string, result: any): Promise<void> {
    await this.messageQueue.publish('workflow.results', {
      workflowId: this.id,
      taskName,
      result,
      timestamp: new Date()
    });
  }
}

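// Redis implementation (assumes the ioredis client, whose 'message' event is used below)
import Redis from 'ioredis';

class RedisMessageQueueAdapter implements MessageQueueAdapter {
  // A Redis connection in subscriber mode cannot issue PUBLISH,
  // so subscribing and publishing use separate connections.
  private subscriber: Redis;
  private publisher: Redis;
  
  constructor(subscriber: Redis, publisher: Redis) {
    this.subscriber = subscriber;
    this.publisher = publisher;
  }
  
  async subscribe(channel: string, handler: (message: any) => void): Promise<void> {
    await this.subscriber.subscribe(channel);
    this.subscriber.on('message', (receivedChannel, message) => {
      if (receivedChannel === channel) {
        handler(JSON.parse(message));
      }
    });
  }
  
  async publish(channel: string, message: any): Promise<void> {
    await this.publisher.publish(channel, JSON.stringify(message));
  }
}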

🧪 Testing Strategies

Unit Testing Workflows

typescript
import { WorkflowTestRunner, MockTask, TestContext } from 'chainly-sdk/testing';

describe('User Registration Workflow', () => {
  let testRunner: WorkflowTestRunner;
  let mockEmailService: jest.MockedObject<EmailService>;
  let mockUserRepository: jest.MockedObject<UserRepository>;
  
  beforeEach(() => {
    mockEmailService = createMockEmailService();
    mockUserRepository = createMockUserRepository();
    
    testRunner = new WorkflowTestRunner(userRegistrationWorkflow, {
      mocks: {
        emailService: mockEmailService,
        userRepository: mockUserRepository
      }
    });
  });
  
  it('should complete user registration successfully', async () => {
    // Arrange
    const testContext: UserRegistrationContext = {
      email: 'test@example.com',
      password: 'securePassword123',
      profile: { firstName: 'John', lastName: 'Doe' }
    };
    
    mockUserRepository.create.mockResolvedValue({ id: 'user-123' });
    mockEmailService.sendWelcomeEmail.mockResolvedValue(true);
    
    // Act
    const result = await testRunner.execute(testContext);
    
    // Assert
    expect(result.userId).toBe('user-123');
    expect(result.verified).toBe(true);
    expect(result.welcomeEmailSent).toBe(true);
    
    expect(mockUserRepository.create).toHaveBeenCalledWith({
      email: 'test@example.com',
      password: expect.any(String), // hashed password
      profile: testContext.profile
    });
    
    expect(mockEmailService.sendWelcomeEmail).toHaveBeenCalledWith(
      'test@example.com',
      'user-123'
    );
  });
  
  it('should handle validation errors gracefully', async () => {
    // Arrange
    const invalidContext: UserRegistrationContext = {
      email: 'invalid-email',
      password: 'securePassword123', // non-empty, so the email-format check is the one that fails
      profile: { firstName: 'John', lastName: 'Doe' }
    };
    
    // Act & Assert
    await expect(testRunner.execute(invalidContext))
      .rejects
      .toThrow('Invalid email format');
    
    expect(mockUserRepository.create).not.toHaveBeenCalled();
    expect(mockEmailService.sendWelcomeEmail).not.toHaveBeenCalled();
  });
});

Integration Testing

typescript
import { WorkflowIntegrationTester } from 'chainly-sdk/testing';

describe('Order Processing Integration', () => {
  let integrationTester: WorkflowIntegrationTester;
  let testDatabase: TestDatabase;
  let testMessageQueue: TestMessageQueue;
  
  beforeAll(async () => {
    testDatabase = await createTestDatabase();
    testMessageQueue = await createTestMessageQueue();
    
    integrationTester = new WorkflowIntegrationTester({
      database: testDatabase,
      messageQueue: testMessageQueue,
      timeout: 30000
    });
  });
  
  afterAll(async () => {
    await testDatabase.cleanup();
    await testMessageQueue.cleanup();
  });
  
  it('should process order end-to-end', async () => {
    // Arrange
    const orderData = await testDatabase.createTestOrder();
    
    // Act
    const result = await integrationTester.executeWorkflow(
      orderProcessingWorkflow,
      { orderId: orderData.id }
    );
    
    // Assert
    expect(result.success).toBe(true);
    
    const updatedOrder = await testDatabase.getOrder(orderData.id);
    expect(updatedOrder.status).toBe('processed');
    
    const queueMessages = await testMessageQueue.getMessages('order.events');
    expect(queueMessages).toHaveLength(3); // start, processing, completed
  });
});

This guide covered workflow construction (sequential, parallel, conditional, and event-driven), custom middleware, enterprise integrations, and testing strategies for the Chainly SDK.
