Complete Examples

Example 1: Data Migration Pipeline

This example shows a complete data migration from a legacy database to MongoDB, with validation and error handling.

Scenario

Migrate customer data from a legacy SQL database to MongoDB, transforming the data structure and validating data quality.

Requirements

  • Extract customers from legacy database
  • Transform to new schema
  • Validate email addresses and phone numbers
  • Handle invalid records separately
  • Load valid records to MongoDB
  • Generate migration report

ETL Chainset: “CustomerMigration”

Chain: “MigrateCustomers”

1. JDBC Reader
   Connection: legacy_db
   Query: SELECT * FROM customers WHERE migrated = 0
   
2. Transform to New Schema
   - Map old field names to new field names
   - Combine firstName + lastName → fullName
   - Parse address into structured format
   - Convert date formats

3. Validate Email
   - Check email format with regex
   - Filter invalid → Chain Call "LogInvalidRecord"
   
4. Validate Phone
   - Check phone format
   - Standardize format
   - Filter invalid → Chain Call "LogInvalidRecord"

5. Validate Required Fields
   - Check all required fields present
   - Filter invalid → Chain Call "LogInvalidRecord"

6. Enrich Data
   - Add migration timestamp
   - Add source system identifier
   - Generate new customer ID

7. MongoDB Writer
   Pool: your-pool-name
   Collection: customers
   
8. Update Legacy Database
   - Mark records as migrated
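   - JDBC Writer: UPDATE customers SET migrated = 1 WHERE <key column> = <current record's key>
     (without the WHERE clause, every row in the table would be marked as migrated)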

9. Count and Log
   - Count successful migrations
   - Chain Call "UpdateMigrationReport"
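The transform in step 2 can be sketched in Python as follows. This is a minimal illustration, not the ETL Designer's implementation. The legacy column names (`email_addr`, `phone_no`, `createdDate`), the single-string address layout, and the MM/DD/YYYY date format are all hypothetical, because the source describes the mappings only in outline.

```python
import re
from datetime import datetime

# Hypothetical legacy -> new field names; the real legacy columns are not given.
FIELD_MAP = {"email_addr": "email", "phone_no": "phone"}

# Assumes legacy addresses are stored as one string: "street, city, ST 12345".
ADDRESS_RE = re.compile(
    r"^(?P<street>[^,]+),\s*(?P<city>[^,]+),\s*(?P<state>[A-Z]{2})\s+(?P<zip>\d{5})$"
)


def transform_customer(legacy: dict) -> dict:
    """Reshape one legacy customer row into the new document schema (step 2)."""
    # Map old field names to new field names.
    doc = {new: legacy[old] for old, new in FIELD_MAP.items() if old in legacy}

    # Combine firstName + lastName into fullName.
    doc["fullName"] = f"{legacy.get('firstName', '')} {legacy.get('lastName', '')}".strip()

    # Parse the flat address string into a structured sub-document when it matches.
    match = ADDRESS_RE.match(legacy.get("address", ""))
    if match:
        doc["address"] = match.groupdict()

    # Convert an assumed MM/DD/YYYY legacy date into ISO 8601 (YYYY-MM-DD).
    if "createdDate" in legacy:
        parsed = datetime.strptime(legacy["createdDate"], "%m/%d/%Y")
        doc["createdDate"] = parsed.date().isoformat()
    return doc


row = {
    "firstName": "Jane",
    "lastName": "Doe",
    "email_addr": "jane.doe@example.com",
    "phone_no": "555-0123",
    "address": "123 Main St, New York, NY 10001",
    "createdDate": "06/15/2024",
}
print(transform_customer(row))
```

Addresses that do not match the assumed pattern are simply left out of the new document here. A real migration would more likely route them to the error path like the other validation failures.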

Chain: “LogInvalidRecord”

1. Add Error Information
   - structure.AddFields: { "errorType": ..., "errorMessage": ..., "errorTimestamp": ... }

2. MongoDB Writer
   Pool: your-pool-name
   Collection: migration_errors
   
3. Return (allows main chain to continue)
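The following sketch combines validation steps 3 to 5 of MigrateCustomers with the error document that LogInvalidRecord writes. Several parts are assumptions: the email regex, the phone rule (digits reshaped into the `+1-NNN-NNNN` form seen in the sample record), the required-field list, and the `invalid_phone` / `missing_fields` error types. Only `invalid_email` and the error-document shape come from the examples in this section. Checks run in the chain's order, and the first failure routes the record to the error path.

```python
import re
from datetime import datetime, timezone

# Simplified checks; these patterns and the required-field list are assumptions.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
REQUIRED_FIELDS = ("fullName", "email", "phone")


def standardize_phone(raw):
    """Return '+1-NNN-NNNN' (the sample's format) or None if the digits don't fit."""
    digits = re.sub(r"\D", "", raw or "")
    if len(digits) == 8 and digits.startswith("1"):
        digits = digits[1:]  # drop a leading country code
    if len(digits) != 7:
        return None
    return f"+1-{digits[:3]}-{digits[3:]}"


def validate_customer(doc: dict, now: datetime):
    """Run steps 3-5 in order; return (valid_doc, None) or (None, error_doc)."""

    def reject(error_type: str, message: str):
        # Shape mirrors the migration_errors sample document.
        return None, {
            "originalData": doc,
            "errorType": error_type,
            "errorMessage": message,
            "errorTimestamp": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
        }

    email = doc.get("email") or ""
    if not EMAIL_RE.match(email):
        return reject("invalid_email", f"Email format invalid: {email}")

    phone = standardize_phone(doc.get("phone"))
    if phone is None:
        return reject("invalid_phone", f"Phone format invalid: {doc.get('phone')}")

    missing = [field for field in REQUIRED_FIELDS if not doc.get(field)]
    if missing:
        return reject("missing_fields", "Missing required fields: " + ", ".join(missing))

    # Valid: pass the record on with the standardized phone number.
    return {**doc, "phone": phone}, None
```

Passing `now` in explicitly keeps every error document from one run on the same timestamp and makes the function easy to test.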

Chain: “UpdateMigrationReport”

1. date.Today
   Output Field: today

2. mongodb.AggregationDefinition
   Pool: your-pool-name
   Collection: migration_report
   Aggregation: [{"$match": {"migrationDate": "${today}"}}]

3. string.Substitute
   Field: aggregation

4. mongodb.Reader

5. Update Counters
   - Increment successful count
   - Update last processed timestamp

6. MongoDB Writer
   Pool: your-pool-name
   Collection: migration_report
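The sequence AggregationDefinition, then string.Substitute, then mongodb.Reader recurs throughout these examples. The sketch below assumes string.Substitute replaces each `${field}` placeholder with that field's value on the current record. The real component's escaping and missing-field behavior are not documented here, so both choices in the code (JSON-string escaping and raising `KeyError` on a missing field) are assumptions.

```python
import json
import re

# Matches ${name} placeholders only, so MongoDB operators such as "$match" are left alone.
PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def substitute(template: str, record: dict) -> str:
    """Replace each ${field} with the record's value, escaped for a JSON string context."""

    def replace(match):
        value = record[match.group(1)]  # a missing field raises KeyError (assumed behavior)
        # json.dumps adds surrounding quotes; strip them because the template supplies its own.
        return json.dumps(str(value))[1:-1]

    return PLACEHOLDER.sub(replace, template)


aggregation = '[{"$match": {"migrationDate": "${today}"}}]'
pipeline = json.loads(substitute(aggregation, {"today": "2026-03-03"}))
print(pipeline)
```

Matching only the braced `${name}` form is deliberate. Python's built-in `string.Template` also treats a bare `$match` as a placeholder, which would collide with MongoDB operators. Escaping the value keeps a stray quote in a field from breaking the JSON.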

Running the Migration

  1. Create test input with sample legacy data
  2. Run “MigrateCustomers” chain in ETL Designer
  3. Verify results in MongoDB
  4. Check migration_errors collection for issues
  5. Review migration_report for statistics
  6. Schedule for full migration when ready

Expected Results

Valid Record:

{
  "customerId": "CUST-2026-001",
  "fullName": "Jane Doe",
  "email": "jane.doe@example.com",
  "phone": "+1-555-0123",
  "address": {
    "street": "123 Main St",
    "city": "New York",
    "state": "NY",
    "zip": "10001"
  },
  "migrationTimestamp": "2026-03-03T14:30:00Z",
  "sourceSystem": "legacy_crm"
}

Invalid Record (in migration_errors collection):

{
  "originalData": { ... },
  "errorType": "invalid_email",
  "errorMessage": "Email format invalid: jane.doe@",
  "errorTimestamp": "2026-03-03T14:30:00Z"
}

Example 2: Scheduled Report Generation

This example shows automated daily report generation with email distribution.

Scenario

Generate a daily sales report every morning at 8 AM, showing sales by region and product category, and email it to the management team.

ETL Chainset: “ReportGeneration”

Chain: “DailySalesReport”

1. JSON Record (receive parameters from scheduler)
   Input: {
     "reportDate": "${yesterday}",
     "recipients": ["sales@example.com", "management@example.com"]
   }

2. mongodb.AggregationDefinition
   Pool: your-pool-name
   Collection: orders
   Aggregation: [{"$match": {"orderDate": "${reportDate}", "status": "completed"}}]

3. string.Substitute
   Field: aggregation

4. mongodb.Reader

5. Aggregate by Region
   Group by: region
   Sum: orderTotal
   Count: orderCount
   Average: orderTotal

6. Sort by Total Descending
   Sort field: orderTotal

7. Chain Call "CalculateGrowth"
   - Compare to previous day
   - Calculate percentage change

8. Aggregate by Product Category
   Group by: productCategory
   Sum: orderTotal
   Count: orderCount

9. Chain Call "GenerateExcelReport"
   - Create formatted spreadsheet
   - Include charts
   - Add summary page

10. Compose Email
    To: ${recipients}
    Subject: "Daily Sales Report - ${reportDate}"
    Body: "Please find attached the daily sales report."
    Attachment: generated Excel file

11. Send Mail
    Send the composed email

12. timestamp.Now
    To Field: now

13. Log Report Generation
    MongoDB Writer
    Pool: your-pool-name
    Collection: report_log
    Document: {
      "reportType": "daily_sales",
      "reportDate": "${reportDate}",
      "status": "sent",
      "timestamp": "${now}",
      "recipientCount": <count>
    }
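Steps 5, 6, and 8 of DailySalesReport apply the same group, sum, count, and average pattern to different keys. The following minimal in-memory sketch assumes each order record carries `region`, `productCategory`, and `orderTotal`. The `averageOrderTotal` output name is hypothetical. It is chosen so that the average does not overwrite the summed `orderTotal`, which the step as written leaves ambiguous.

```python
from collections import defaultdict


def aggregate_by(orders, key):
    """Group orders by `key` and sum/count/average orderTotal, largest total first."""
    groups = defaultdict(lambda: {"orderTotal": 0.0, "orderCount": 0})
    for order in orders:
        group = groups[order[key]]
        group["orderTotal"] += order["orderTotal"]
        group["orderCount"] += 1

    rows = [
        {key: name, **stats, "averageOrderTotal": stats["orderTotal"] / stats["orderCount"]}
        for name, stats in groups.items()
    ]
    # Step 6: sort by total, descending.
    rows.sort(key=lambda row: row["orderTotal"], reverse=True)
    return rows


orders = [
    {"region": "East", "productCategory": "Widgets", "orderTotal": 100.0},
    {"region": "West", "productCategory": "Gadgets", "orderTotal": 30.0},
    {"region": "East", "productCategory": "Gadgets", "orderTotal": 50.0},
]
by_region = aggregate_by(orders, "region")
by_category = aggregate_by(orders, "productCategory")
print(by_region)
print(by_category)
```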

Chain: “CalculateGrowth”

1. mongodb.AggregationDefinition
   Pool: your-pool-name
   Collection: orders
   Aggregation: [{"$match": {"orderDate": "${previousDay}"}}]
   (previousDay is the day before reportDate; derive it first, e.g. with date.Add-Subtract)

2. string.Substitute
   Field: aggregation

3. mongodb.Reader

4. Aggregate Previous Day
   Sum: orderTotal

5. Calculate Percentage Change
   Formula: ((current - previous) / previous) * 100

6. Add Growth Fields
   structure.AddFields: { "previousDayTotal": ..., "growthPercentage": ..., "growthDirection": "up"/"down" }

7. Return enriched data
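The percentage-change formula in step 5 of CalculateGrowth is undefined when the previous day had no sales. The following sketch guards against that case and rounds to one decimal place, like the `+5.2%` in the sample email. Returning `None` for a zero baseline and treating zero change as "up" are assumptions.

```python
def calculate_growth(current: float, previous: float) -> dict:
    """Growth fields per ((current - previous) / previous) * 100, rounded to 0.1."""
    if previous == 0:
        # The formula divides by zero when the previous day had no sales;
        # reporting no percentage is an assumption, not documented behavior.
        return {"previousDayTotal": previous, "growthPercentage": None, "growthDirection": None}
    pct = round((current - previous) / previous * 100, 1)
    return {
        "previousDayTotal": previous,
        "growthPercentage": pct,
        # Treating zero change as "up" is an assumption; the source lists only up/down.
        "growthDirection": "up" if pct >= 0 else "down",
    }


print(calculate_growth(105.2, 100.0))
```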

Chain: “GenerateExcelReport”

1. Create Workbook
   XLSX Create

2. Add Summary Sheet
   - Total sales
   - Order count
   - Average order value
   - Growth percentage

3. Add Region Sheet
   - Sales by region table
   - Bar chart

4. Add Product Category Sheet
   - Sales by category table
   - Pie chart

5. Format and Style
   - Apply formatting
   - Set column widths
   - Add headers

6. Return Excel file

Scheduler Configuration

Job Name: Daily Sales Report
Type: ETL
Chainset: ReportGeneration
Chain: DailySalesReport
Schedule: Every day at 8:00 AM
Enabled: Yes
Parameters:
{
  "reportDate": "${yesterday}",
  "recipients": ["sales@example.com", "management@example.com"]
}

Sample Report Output

Email Subject: Daily Sales Report - 2026-03-02

Email Body:

Daily Sales Report Summary

Report Date: March 2, 2026
Total Sales: $125,450
Total Orders: 342
Average Order Value: $366.81
Growth vs Previous Day: +5.2%

Please see the attached Excel file for detailed breakdowns by region and product category.

Excel File Contents:

  • Summary sheet with key metrics
  • Region breakdown with chart
  • Product category breakdown with chart

Example 3: Real-time Data Enrichment

This example shows real-time order processing with data enrichment from multiple sources.

Scenario

When a new order is created, enrich it with customer information, product details, and pricing calculations, then store the enriched order.

ETL Chainset: “OrderProcessing”

Chain: “ProcessNewOrder”

1. Receive Order Data
   Input: {
     "orderId": "ORD-2026-001",
     "customerId": "C12345",
     "items": [
       { "sku": "WIDGET-A", "quantity": 2 },
       { "sku": "GADGET-B", "quantity": 1 }
     ]
   }

2. Validate Order Structure
   - Check required fields
   - Validate data types
   - If invalid: Chain Call "HandleInvalidOrder"

3. Chain Call "EnrichWithCustomerData"
   - Get customer name, email, address
   - Get customer tier (for discounts)
   - Get payment method

4. For Each Order Item:
   Chain Call "EnrichOrderItem"
   - Get product details
   - Get current price
   - Calculate line total

5. Chain Call "CalculateOrderTotals"
   - Sum line totals
   - Calculate tax
   - Apply discounts based on customer tier
   - Calculate final total

6. Chain Call "CheckInventory"
   - Verify items in stock
   - Reserve inventory
   - If insufficient: Chain Call "HandleInsufficientInventory"

7. Add Processing Metadata
   - Processing timestamp
   - Processing status
   - Enrichment source versions

8. MongoDB Writer
   Pool: your-pool-name
   Collection: orders
   
9. Chain Call "TriggerFulfillment"
   - Send to fulfillment system
   - Create fulfillment workflow instance

10. Chain Call "SendOrderConfirmation"
    - Email customer
    - Include order details

Chain: “EnrichWithCustomerData”

1. mongodb.AggregationDefinition
   Pool: your-pool-name
   Collection: customers
   Aggregation: [{"$match": {"customerId": "${customerId}"}}]

2. string.Substitute
   Field: aggregation

3. mongodb.Reader
   (emits one record per matching customer; if no customer matches, it emits nothing and the order is dropped)

4. Extract Customer Data
   - name
   - email
   - shippingAddress
   - customerTier
   - paymentMethod

5. Merge into Order
   workflow.MergeToElxPublic

6. Return enriched order

Chain: “EnrichOrderItem”

1. mongodb.AggregationDefinition
   Pool: your-pool-name
   Collection: products
   Aggregation: [{"$match": {"sku": "${sku}"}}]

2. string.Substitute
   Field: aggregation

3. mongodb.Reader
   (emits one record per matching product; if no product matches, it emits nothing and the item is dropped)

4. Get Current Price
   - Base price
   - Active promotions
   - Volume discounts

5. Calculate Line Total
   quantity * price

6. Add Product Details
   - Product name
   - Description
   - Category
   - Price
   - Line total

7. Return enriched item

Chain: “CalculateOrderTotals”

1. Sum Line Totals
   Array Reduce: sum all item lineTotal values → subtotal

2. Calculate Tax
   subtotal * taxRate

3. Apply Customer Tier Discount
   If tier = "Gold": 10% discount
   If tier = "Platinum": 15% discount

4. Calculate Final Total
   subtotal - discount + tax

5. Add Totals to Order
   - subtotal
   - discount
   - tax
   - total

6. Return order with totals
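The arithmetic in EnrichOrderItem step 5 and CalculateOrderTotals can be checked against the sample enriched order with a short sketch. It uses `Decimal` rather than floats so that money rounds predictably. The 0.09 tax rate is an assumption: the document gives no rate, but 9% of the 125.00 subtotal reproduces the sample's 11.25. Giving tiers other than Gold and Platinum no discount is also assumed.

```python
from decimal import Decimal, ROUND_HALF_UP

# Discount rates from the Gold/Platinum rules above; other tiers get none (assumption).
TIER_DISCOUNTS = {"Gold": Decimal("0.10"), "Platinum": Decimal("0.15")}
CENT = Decimal("0.01")


def to_cents(amount: Decimal) -> Decimal:
    """Round a money amount to two decimal places, half up."""
    return amount.quantize(CENT, rounding=ROUND_HALF_UP)


def calculate_order_totals(items: list, tier: str, tax_rate: Decimal) -> dict:
    """Compute line totals, then subtotal, discount, tax, and total for one order."""
    for item in items:
        # EnrichOrderItem step 5: quantity * price.
        item["lineTotal"] = to_cents(item["quantity"] * item["unitPrice"])
    subtotal = sum((item["lineTotal"] for item in items), Decimal("0"))
    # Tax on the pre-discount subtotal, as in "subtotal * taxRate".
    tax = to_cents(subtotal * tax_rate)
    discount = to_cents(subtotal * TIER_DISCOUNTS.get(tier, Decimal("0")))
    # Step 4: subtotal - discount + tax.
    return {"subtotal": subtotal, "discount": discount, "tax": tax,
            "total": subtotal - discount + tax}


order_items = [
    {"sku": "WIDGET-A", "quantity": 2, "unitPrice": Decimal("25.00")},
    {"sku": "GADGET-B", "quantity": 1, "unitPrice": Decimal("75.00")},
]
totals = calculate_order_totals(order_items, "Gold", tax_rate=Decimal("0.09"))
print(totals)
```

With these inputs the result matches the Expected Result document: subtotal 125.00, discount 12.50, tax 11.25, total 123.75.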

Chain: “CheckInventory”

1. For Each Order Item (steps 1-5 run once per item):
   mongodb.AggregationDefinition
   Pool: your-pool-name
   Collection: inventory
   Aggregation: [{"$match": {"sku": "${sku}"}}]

2. string.Substitute
   Field: aggregation

3. mongodb.Reader

4. Check Available Quantity
   Compare available to ordered quantity

5. Reserve Inventory
   MongoDB Update
   Decrement available quantity
   Add reservation record

6. Return reservation confirmation
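As written, CheckInventory checks availability and reserves stock in separate steps. Another order could therefore claim the same units in between. In MongoDB, one common remedy is a single update per SKU whose filter also requires enough stock, for example a filter on a hypothetical `available` field with `{"$gte": qty}` combined with `{"$inc": {"available": -qty}}`. Single-document updates are atomic. Reserving several SKUs all-or-nothing still needs a transaction or compensating updates. The in-memory sketch below shows only the all-or-nothing logic. Its names are hypothetical.

```python
from collections import Counter


def reserve_inventory(available: dict, items: list) -> dict:
    """Reserve all order items or none. `available` maps sku -> units in stock."""
    # Total the requested quantity per SKU, in case a SKU appears on several lines.
    requested = Counter()
    for item in items:
        requested[item["sku"]] += item["quantity"]

    # Check every SKU before changing anything, so a shortage leaves stock untouched.
    short = sorted(sku for sku, qty in requested.items() if available.get(sku, 0) < qty)
    if short:
        # Caller would Chain Call "HandleInsufficientInventory" here.
        return {"reserved": False, "insufficientSkus": short}

    for sku, qty in requested.items():
        available[sku] -= qty
    return {"reserved": True, "insufficientSkus": []}


stock = {"WIDGET-A": 5, "GADGET-B": 1}
order = [{"sku": "WIDGET-A", "quantity": 2}, {"sku": "GADGET-B", "quantity": 1}]
print(reserve_inventory(stock, order), stock)
```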

Integration Points

Triggered by:

  • Workflow: “OrderWorkflow” OnEntry of “New” state
  • REST API: POST /api/orders
  • Message Queue: Kafka topic “new-orders”

Triggers:

  • Fulfillment workflow instance creation
  • Email notification
  • Inventory reservation

Expected Result

Enriched Order:

{
  "orderId": "ORD-2026-001",
  "customerId": "C12345",
  "customerName": "Jane Doe",
  "customerEmail": "jane.doe@example.com",
  "customerTier": "Gold",
  "shippingAddress": {
    "street": "123 Main St",
    "city": "New York",
    "state": "NY",
    "zip": "10001"
  },
  "items": [
    {
      "sku": "WIDGET-A",
      "productName": "Premium Widget",
      "quantity": 2,
      "unitPrice": 25.00,
      "lineTotal": 50.00
    },
    {
      "sku": "GADGET-B",
      "productName": "Super Gadget",
      "quantity": 1,
      "unitPrice": 75.00,
      "lineTotal": 75.00
    }
  ],
  "subtotal": 125.00,
  "discount": 12.50,
  "tax": 11.25,
  "total": 123.75,
  "processingTimestamp": "2026-03-03T14:30:00Z",
  "status": "processing"
}

Example 4: Multi-Source Data Integration

This example shows integrating data from multiple sources to create a unified view.

Scenario

Create a unified customer view by combining data from CRM, order history, support tickets, and marketing engagement.

ETL Chainset: “CustomerDataIntegration”

Chain: “BuildCustomer360View”

1. Receive Customer ID
   Input: { "customerId": "C12345" }

2. Get CRM Data
   JDBC Reader
   Connection: crm_db
   Query: SELECT * FROM customers WHERE id = '${customerId}'

3. mongodb.AggregationDefinition
   Pool: your-pool-name
   Collection: orders
   Aggregation: [{"$match": {"customerId": "${customerId}"}}]

4. string.Substitute
   Field: aggregation

5. mongodb.Reader (Get Order History)

6. Aggregate Order Metrics
   - Total orders
   - Total spent
   - Average order value
   - Last order date
   - Favorite products

7. Get Support Tickets
   REST API Call
   URL: https://support.example.com/api/tickets
   Query: customerId=${customerId}

8. Aggregate Support Metrics
   - Total tickets
   - Open tickets
   - Average resolution time
   - Satisfaction score

9. mongodb.AggregationDefinition
   Pool: your-pool-name
   Collection: engagement
   Aggregation: [{"$match": {"customerId": "${customerId}"}}]

10. string.Substitute
    Field: aggregation

11. mongodb.Reader (Get Marketing Engagement)

12. Aggregate Marketing Metrics
    - Email open rate
    - Click-through rate
    - Campaign responses
    - Preferences

13. Calculate Customer Score
    Chain Call "CalculateCustomerScore"
    - Based on purchase history
    - Support interactions
    - Engagement level

14. Determine Customer Segment
    Chain Call "DetermineSegment"
    - High value
    - At risk
    - Engaged
    - Dormant

15. Combine All Data
    Merge all sources into unified view

16. MongoDB Writer
    Pool: your-pool-name
    Collection: customer_360
    
17. Return unified customer view
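Step 6 of BuildCustomer360View (Aggregate Order Metrics) reduces the order history to a handful of numbers. The sketch below assumes each order has `orderTotal`, an ISO 8601 `orderDate`, and a list of `skus`. Reading "favorite products" as the most frequently ordered SKUs is also an assumption.

```python
from collections import Counter


def order_metrics(orders: list, top_n: int = 2) -> dict:
    """Summarize a customer's orders (step 6). Field names on `orders` are assumptions."""
    if not orders:
        return {"totalOrders": 0, "totalSpent": 0.0, "averageOrderValue": 0.0,
                "lastOrderDate": None, "favoriteProducts": []}
    total_spent = sum(o["orderTotal"] for o in orders)
    # "Favorite" is taken to mean most frequently ordered SKUs (an assumption).
    sku_counts = Counter(sku for o in orders for sku in o["skus"])
    return {
        "totalOrders": len(orders),
        "totalSpent": round(total_spent, 2),
        "averageOrderValue": round(total_spent / len(orders), 2),
        # ISO 8601 date strings sort chronologically, so max() finds the latest.
        "lastOrderDate": max(o["orderDate"] for o in orders),
        "favoriteProducts": [sku for sku, _ in sku_counts.most_common(top_n)],
    }


history = [
    {"orderTotal": 100.0, "orderDate": "2026-02-28", "skus": ["WIDGET-A", "GADGET-B"]},
    {"orderTotal": 50.0, "orderDate": "2026-01-10", "skus": ["WIDGET-A"]},
    {"orderTotal": 25.0, "orderDate": "2025-12-01", "skus": ["GADGET-B", "WIDGET-A"]},
]
print(order_metrics(history))
```

For the sample figures in the Result section, 3250.00 / 24 = 135.4166..., which rounds to the 135.42 shown as averageOrderValue.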

Chain: “CalculateCustomerScore”

1. Score Purchase Behavior (0-40 points)
   - Total spent
   - Order frequency
   - Recent activity

2. Score Support Interactions (0-30 points)
   - Ticket count (fewer is better)
   - Satisfaction ratings
   - Resolution speed

3. Score Engagement (0-30 points)
   - Email engagement
   - Campaign responses
   - Website activity

4. Calculate Total Score (0-100)
   Sum all component scores

5. Assign Grade
   90-100: A
   80-89: B
   70-79: C
   60-69: D
   <60: F

6. Return score and grade
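The scoring and grading steps reduce to clamping each component to its range, summing, and looking up a threshold. The sketch below uses grade floors rather than the literal integer bands, so a fractional total such as 89.5 still receives a grade (B). The component values in the usage line are hypothetical.

```python
# Component maxima from the CalculateCustomerScore steps above.
MAX_POINTS = {"purchase": 40, "support": 30, "engagement": 30}
# Lowest total for each grade, highest first; anything below 60 is F.
GRADE_FLOORS = [(90, "A"), (80, "B"), (70, "C"), (60, "D")]


def customer_score(components: dict) -> tuple:
    """Clamp each component to its range, sum to 0-100, and map to a letter grade."""
    total = sum(
        min(max(components.get(name, 0), 0), limit) for name, limit in MAX_POINTS.items()
    )
    grade = next((letter for floor, letter in GRADE_FLOORS if total >= floor), "F")
    return total, grade


# Hypothetical breakdown that sums to the sample's score of 87.
print(customer_score({"purchase": 36, "support": 27, "engagement": 24}))
```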

Result

Customer 360 View:

{
  "customerId": "C12345",
  "profile": {
    "name": "Jane Doe",
    "email": "jane.doe@example.com",
    "phone": "+1-555-0123",
    "joinDate": "2024-06-15"
  },
  "orderMetrics": {
    "totalOrders": 24,
    "totalSpent": 3250.00,
    "averageOrderValue": 135.42,
    "lastOrderDate": "2026-02-28",
    "favoriteProducts": ["WIDGET-A", "GADGET-B"]
  },
  "supportMetrics": {
    "totalTickets": 3,
    "openTickets": 0,
    "averageResolutionTime": "4.5 hours",
    "satisfactionScore": 4.8
  },
  "marketingMetrics": {
    "emailOpenRate": 0.65,
    "clickThroughRate": 0.12,
    "campaignResponses": 8,
    "preferences": ["electronics", "gadgets"]
  },
  "customerScore": 87,
  "customerGrade": "B",
  "segment": "High Value",
  "lastUpdated": "2026-03-03T14:30:00Z"
}

Example 5: Log Processing Pipeline

This example shows processing and analyzing log files.

Scenario

Process application log files hourly, extract errors and warnings, aggregate statistics, and alert on anomalies.

ETL Chainset: “LogProcessing”

Chain: “ProcessApplicationLogs”

1. date.Today
   Output Field: today
   (then use date.Add-Subtract to subtract 1 day and write the result to a yesterday field)

2. File Reader
   Path: /var/logs/application/*.log
   Pattern: app-${yesterday}-*.log
   (run string.Substitute on the pattern first to resolve ${yesterday})

3. Parse Log Lines
   - Extract timestamp
   - Extract log level
   - Extract message
   - Extract context (user, session, etc.)

4. Filter Errors and Warnings
   Filter: level IN ["ERROR", "WARN"]

5. Categorize Errors
   Chain Call "CategorizeError"
   - Database errors
   - API errors
   - Validation errors
   - System errors

6. Aggregate Statistics
   Group by: errorCategory, hour
   Count: occurrences
   Collect: sample messages

7. Detect Anomalies
   Chain Call "DetectAnomalies"
   - Compare to historical averages
   - Flag unusual spikes

8. MongoDB Writer
   Pool: your-pool-name
   Collection: log_statistics

9. Filter Anomalies
   Filter: isAnomaly = true

10. Send Alerts
    Chain Call "SendAlertEmail"
    - Notify operations team
    - Include anomaly details

11. Archive Processed Logs
    File Writer
    Path: /var/logs/archive/
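Steps 3 and 4 of ProcessApplicationLogs depend on the log line format, which the source does not specify. The sketch below assumes lines such as `2026-03-03T14:05:12Z ERROR [user=42 session=ab12] Connection timeout to MongoDB`. The format, the derived `hour` field, and skipping unparseable lines are all assumptions.

```python
import re

# Assumed line format (the source does not specify one):
# 2026-03-03T14:05:12Z ERROR [user=42 session=ab12] Connection timeout to MongoDB
LINE_RE = re.compile(
    r"^(?P<timestamp>\S+)\s+(?P<level>[A-Z]+)\s+\[(?P<context>[^\]]*)\]\s+(?P<message>.*)$"
)


def parse_line(line: str):
    """Step 3: split one log line into timestamp, level, context, and message."""
    match = LINE_RE.match(line.rstrip("\n"))
    if not match:
        return None  # unparseable lines are skipped (assumption)
    record = match.groupdict()
    # Turn "user=42 session=ab12" into {"user": "42", "session": "ab12"}.
    record["context"] = dict(
        pair.split("=", 1) for pair in record["context"].split() if "=" in pair
    )
    # Hour of day from the ISO timestamp, used later for the errorCategory/hour grouping.
    record["hour"] = int(record["timestamp"][11:13])
    return record


def errors_and_warnings(lines):
    """Step 4: keep only ERROR and WARN records."""
    parsed = (parse_line(line) for line in lines)
    return [r for r in parsed if r and r["level"] in ("ERROR", "WARN")]
```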

Chain: “CategorizeError”

1. Check Error Message Patterns
   - Match against known patterns
   - Regex matching

2. Assign Category
   - "database" if contains "SQL", "MongoDB", "connection"
   - "api" if contains "HTTP", "REST", "timeout"
   - "validation" if contains "invalid", "required", "format"
   - "system" otherwise

3. Extract Error Code
   - Parse error codes from message

4. Add Category Fields
   - errorCategory
   - errorCode
   - errorPattern

5. Return categorized error
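The keyword rules in step 2 of CategorizeError can be read as an ordered list in which the first match wins and "system" is the fallback. Case-insensitive matching and first-match-wins ordering are assumptions about how "contains" is meant.

```python
# Keyword rules from the "Assign Category" step, checked top to bottom.
# First match wins and matching ignores case; both are assumptions.
CATEGORY_RULES = [
    ("database", ("sql", "mongodb", "connection")),
    ("api", ("http", "rest", "timeout")),
    ("validation", ("invalid", "required", "format")),
]


def categorize_error(message: str) -> str:
    """Return the first category whose keywords appear in the message, else "system"."""
    lowered = message.lower()
    for category, keywords in CATEGORY_RULES:
        if any(keyword in lowered for keyword in keywords):
            return category
    return "system"


print(categorize_error("Connection timeout to MongoDB"))
```

Rule order matters: "Connection timeout to MongoDB" lands in database only because that rule is checked before api. Plain substring tests are the simplest reading of "contains", but they can misfire. For example, "rest" also matches "restart", so a word-boundary regex may be preferable.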

Scheduler Configuration

Job Name: Hourly Log Processing
Type: ETL
Chainset: LogProcessing
Chain: ProcessApplicationLogs
Schedule: Every hour
Enabled: Yes

Sample Output

Log Statistics:

{
  "date": "2026-03-03",
  "hour": 14,
  "errorCategory": "database",
  "occurrences": 12,
  "isAnomaly": false,
  "samples": [
    "Connection timeout to MongoDB",
    "Query execution timeout"
  ]
}

Alert (if anomaly detected):

Subject: Log Anomaly Detected

Anomaly Details:
- Category: API Errors
- Hour: 14:00
- Occurrences: 156 (average: 12)
- Spike: 1200% increase

Sample Errors:
- HTTP 503 Service Unavailable
- Connection refused to payment gateway
- Timeout calling external API

Action Required: Investigate API connectivity issues
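The spike figure in the alert is the relative increase over the historical average: (156 - 12) / 12 * 100 = 1200%. The sketch below shows the DetectAnomalies check under that reading. The 3x threshold and the handling of a zero baseline are assumptions, since the source says only to compare against historical averages and flag unusual spikes.

```python
def detect_anomaly(occurrences: int, historical_average: float, threshold: float = 3.0) -> dict:
    """Flag an hour whose count is at least `threshold` times the historical average."""
    if historical_average <= 0:
        # No baseline yet: flag any occurrences at all (an assumption).
        return {"isAnomaly": occurrences > 0, "spikePercent": None}
    # Relative increase over the average, as in the alert's "Spike" line.
    spike = (occurrences - historical_average) / historical_average * 100
    return {"isAnomaly": occurrences >= threshold * historical_average,
            "spikePercent": round(spike, 1)}


print(detect_anomaly(156, 12))
```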

Next Steps

These examples demonstrate common ETL patterns. A comprehensive ETL Cookbook with additional recipes and step-by-step guides for common tasks is planned as a separate reference.