Complete Examples
Example 1: Data Migration Pipeline
This example shows a complete data migration from a legacy database to MongoDB, with validation and error handling.
Scenario
Migrate customer data from a legacy SQL database to MongoDB, transforming the data structure and validating data quality.
Requirements
- Extract customers from legacy database
- Transform to new schema
- Validate email addresses and phone numbers
- Handle invalid records separately
- Load valid records to MongoDB
- Generate migration report
ETL Chainset: “CustomerMigration”
Chain: “MigrateCustomers”
1. JDBC Reader
Connection: legacy_db
Query: SELECT * FROM customers WHERE migrated = 0
2. Transform to New Schema
- Map old field names to new field names
- Combine firstName + lastName → fullName
- Parse address into structured format
- Convert date formats
3. Validate Email
- Check email format with regex
- Filter invalid → Chain Call "LogInvalidRecord"
4. Validate Phone
- Check phone format
- Standardize format
- Filter invalid → Chain Call "LogInvalidRecord"
5. Validate Required Fields
- Check all required fields present
- Filter invalid → Chain Call "LogInvalidRecord"
6. Enrich Data
- Add migration timestamp
- Add source system identifier
- Generate new customer ID
7. MongoDB Writer
Pool: your-pool-name
Collection: customers
8. Update Legacy Database
- Mark records as migrated
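- JDBC Writer: UPDATE customers SET migrated = 1 WHERE id = '${id}' (limit the update to the current record; id stands in for your table's primary key column)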
9. Count and Log
- Count successful migrations
- Chain Call "UpdateMigrationReport"
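To make steps 2 through 5 concrete, here is a minimal Python sketch of the per-record transform and validation logic. It is not how the ETL Designer implements these steps. The legacy field names (firstName, lastName, email, phone), the simple email regex, the phone rule (a toy rule that only reproduces the +1-555-0123 format used in the expected results), the REQUIRED_FIELDS list, and the error types other than invalid_email are all assumptions; address parsing and date conversion are omitted.

```python
import re
from typing import Optional, Tuple

# Assumed email rule: local@domain.tld with no whitespace (deliberately simple, not RFC 5322).
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

# Hypothetical list of fields the new schema requires.
REQUIRED_FIELDS = ("fullName", "email", "phone")


def transform(legacy: dict) -> dict:
    """Step 2 (partial): map legacy fields to the new schema and build fullName."""
    first = legacy.get("firstName", "").strip()
    last = legacy.get("lastName", "").strip()
    return {
        "fullName": f"{first} {last}".strip(),
        "email": legacy.get("email", "").strip(),
        "phone": legacy.get("phone", ""),
    }


def standardize_phone(raw: str) -> Optional[str]:
    """Step 4: toy rule. Keep digits only, accept 7 digits optionally prefixed
    by country code 1, and format as +1-XXX-XXXX. Returns None if invalid."""
    digits = re.sub(r"\D", "", raw)
    if len(digits) == 7:
        digits = "1" + digits
    if len(digits) != 8 or not digits.startswith("1"):
        return None
    return f"+1-{digits[1:4]}-{digits[4:]}"


def validate(record: dict) -> Tuple[Optional[dict], Optional[dict]]:
    """Steps 3-5: return (valid_record, None) or (None, error_record)."""
    original = dict(record)

    def error(error_type: str, message: str) -> Tuple[None, dict]:
        # Mirrors the migration_errors sample; errorTimestamp is added by LogInvalidRecord.
        return None, {"originalData": original, "errorType": error_type, "errorMessage": message}

    email = record.get("email", "")
    if not EMAIL_RE.match(email):
        return error("invalid_email", f"Email format invalid: {email}")
    phone = standardize_phone(record.get("phone", ""))
    if phone is None:
        return error("invalid_phone", f"Phone format invalid: {record.get('phone', '')}")
    cleaned = {**record, "phone": phone}
    missing = [f for f in REQUIRED_FIELDS if not cleaned.get(f)]
    if missing:
        return error("missing_fields", "Missing required fields: " + ", ".join(missing))
    return cleaned, None
```

Checks run in the same order as the chain (email, then phone, then required fields), so each rejected record carries the first error found, which fits the single errorType field in the migration_errors sample.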
Chain: “LogInvalidRecord”
1. Add Error Information
- structure.AddFields: { "errorType": ..., "errorMessage": ..., "errorTimestamp": ... }
2. MongoDB Writer
Pool: your-pool-name
Collection: migration_errors
3. Return (allows main chain to continue)
Chain: “UpdateMigrationReport”
1. date.Today
Output Field: today
2. mongodb.AggregationDefinition
Pool: your-pool-name
Collection: migration_report
Aggregation: [{"$match": {"migrationDate": "${today}"}}]
3. string.Substitute
Field: aggregation
4. mongodb.Reader
5. Update Counters
- Increment successful count
- Update last processed timestamp
6. MongoDB Writer
Pool: your-pool-name
Collection: migration_report
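Steps 4 through 6 form a read-modify-write cycle on the day's report document. A minimal Python sketch of step 5, assuming hypothetical field names successCount and lastProcessedAt (only migrationDate appears in the chain above):

```python
from datetime import datetime, timezone
from typing import Optional


def update_report(existing: Optional[dict], today: str, migrated: int, now: datetime) -> dict:
    """Step 5: add this run's successful migrations to the day's counter and stamp the time.

    `existing` is the report document read in step 4, or None if today's report
    does not exist yet. The input is not modified; step 6 writes the returned dict.
    """
    report = dict(existing) if existing else {"migrationDate": today, "successCount": 0}
    report["successCount"] += migrated
    report["lastProcessedAt"] = now.astimezone(timezone.utc).isoformat()
    return report
```

Because the read and the write are separate steps, two overlapping runs could each read the same count and one increment would be lost. If that matters, MongoDB's $inc update operator performs the increment atomically on the server.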
Running the Migration
- Create test input with sample legacy data
- Run “MigrateCustomers” chain in ETL Designer
- Verify results in MongoDB
- Check migration_errors collection for issues
- Review migration_report for statistics
- Schedule for full migration when ready
Expected Results
Valid Record:
{
"customerId": "CUST-2026-001",
"fullName": "Jane Doe",
"email": "jane.doe@example.com",
"phone": "+1-555-0123",
"address": {
"street": "123 Main St",
"city": "New York",
"state": "NY",
"zip": "10001"
},
"migrationTimestamp": "2026-03-03T14:30:00Z",
"sourceSystem": "legacy_crm"
}
Invalid Record (in errors collection):
{
"originalData": { ... },
"errorType": "invalid_email",
"errorMessage": "Email format invalid: jane.doe@",
"errorTimestamp": "2026-03-03T14:30:00Z"
}
Example 2: Scheduled Report Generation
This example shows automated daily report generation with email distribution.
Scenario
Generate a daily sales report every morning at 8 AM, showing sales by region and product category, and email it to the management team.
ETL Chainset: “ReportGeneration”
Chain: “DailySalesReport”
1. JSON Record (receive parameters from scheduler)
Input: {
"reportDate": "${yesterday}",
"recipients": ["sales@example.com", "management@example.com"]
}
2. mongodb.AggregationDefinition
Pool: your-pool-name
Collection: orders
Aggregation: [{"$match": {"orderDate": "${reportDate}", "status": "completed"}}]
3. string.Substitute
Field: aggregation
4. mongodb.Reader
5. Aggregate by Region
Group by: region
Sum: orderTotal
Count: orderCount
Average: orderTotal
6. Sort by Total Descending
Sort field: orderTotal
7. Chain Call "CalculateGrowth"
- Compare to previous day
- Calculate percentage change
8. Aggregate by Product Category
Group by: productCategory
Sum: orderTotal
Count: orderCount
9. Chain Call "GenerateExcelReport"
- Create formatted spreadsheet
- Include charts
- Add summary page
10. Compose Email
To: ${recipients}
Subject: "Daily Sales Report - ${reportDate}"
Body: "Please find attached the daily sales report."
Attachment: generated Excel file
11. Send Mail
Send the composed email
12. timestamp.Now
To Field: now
13. Log Report Generation
MongoDB Writer
Pool: your-pool-name
Collection: report_log
Document: {
reportType: "daily_sales",
reportDate: "${reportDate}",
status: "sent",
timestamp: "${now}",
recipientCount: <count>
}
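Steps 5, 6, and 8 are a group-and-summarize followed by a sort. The Python sketch below shows the same computation on in-memory order dicts; the averageOrderTotal output name is hypothetical, since the chain lists orderTotal for both the sum and the average.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def aggregate_by(orders: Iterable[dict], key: str) -> List[dict]:
    """Group orders by `key`, then sum, count, and average orderTotal per group,
    sorted by summed orderTotal, highest first (steps 5-6; step 8 uses key="productCategory")."""
    groups: Dict[str, List[float]] = defaultdict(list)
    for order in orders:
        groups[order[key]].append(order["orderTotal"])
    rows = [
        {
            key: group,
            "orderTotal": sum(totals),
            "orderCount": len(totals),
            "averageOrderTotal": sum(totals) / len(totals),
        }
        for group, totals in groups.items()
    ]
    return sorted(rows, key=lambda row: row["orderTotal"], reverse=True)
```

The same grouping could also be pushed into the MongoDB aggregation itself with a $group stage such as {"_id": "$region", "orderTotal": {"$sum": "$orderTotal"}, "orderCount": {"$sum": 1}, "averageOrderTotal": {"$avg": "$orderTotal"}} followed by {"$sort": {"orderTotal": -1}}.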
Chain: “CalculateGrowth”
1. mongodb.AggregationDefinition
Pool: your-pool-name
Collection: orders
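Aggregation: [{"$match": {"orderDate": "${previousDay}", "status": "completed"}}]
(previousDay is the day before reportDate; derive it with date.Add-Subtract before the string.Substitute step, and keep the same status filter as the main chain so the comparison is like-for-like)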
2. string.Substitute
Field: aggregation
3. mongodb.Reader
4. Aggregate Previous Day
Sum: orderTotal
5. Calculate Percentage Change
Formula: ((current - previous) / previous) * 100
6. Add Growth Fields
structure.AddFields: { "previousDayTotal": ..., "growthPercentage": ..., "growthDirection": "up"/"down" }
7. Return enriched data
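The percentage formula in step 5 divides by the previous day's total, so it is undefined when that total is 0 (for example, on the first day of data). A minimal Python sketch with a guard; returning None for an undefined change and adding a "flat" direction for equal totals are assumptions, not part of the chain above.

```python
from typing import Optional


def growth(current: float, previous: float) -> dict:
    """Steps 5-6: percentage change versus the previous day, plus a direction label."""
    change: Optional[float]
    if previous == 0:
        change = None  # no baseline: the formula would divide by zero
    else:
        change = round((current - previous) * 100 / previous, 2)
    if current > previous:
        direction = "up"
    elif current < previous:
        direction = "down"
    else:
        direction = "flat"  # assumption: the chain only names "up" and "down"
    return {"previousDayTotal": previous, "growthPercentage": change, "growthDirection": direction}
```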
Chain: “GenerateExcelReport”
1. Create Workbook
XLSX Create
2. Add Summary Sheet
- Total sales
- Order count
- Average order value
- Growth percentage
3. Add Region Sheet
- Sales by region table
- Bar chart
4. Add Product Category Sheet
- Sales by category table
- Pie chart
5. Format and Style
- Apply formatting
- Set column widths
- Add headers
6. Return Excel file
Scheduler Configuration
Job Name: Daily Sales Report
Type: ETL
Chainset: ReportGeneration
Chain: DailySalesReport
Schedule: Every day at 8:00 AM
Enabled: Yes
Parameters:
{
"reportDate": "${yesterday}",
"recipients": ["sales@example.com", "management@example.com"]
}
Sample Report Output
Email Subject: Daily Sales Report - 2026-03-02
Email Body:
Daily Sales Report Summary
Report Date: March 2, 2026
Total Sales: $125,450
Total Orders: 342
Average Order Value: $366.81
Growth vs Previous Day: +5.2%
Please see the attached Excel file for detailed breakdowns by region and product category.
Excel File Contents:
- Summary sheet with key metrics
- Region breakdown with chart
- Product category breakdown with chart
Example 3: Real-time Data Enrichment
This example shows real-time order processing with data enrichment from multiple sources.
Scenario
When a new order is created, enrich it with customer information, product details, and pricing calculations, then store the enriched order.
ETL Chainset: “OrderProcessing”
Chain: “ProcessNewOrder”
1. Receive Order Data
Input: {
"orderId": "ORD-2026-001",
"customerId": "C12345",
"items": [
{ "sku": "WIDGET-A", "quantity": 2 },
{ "sku": "GADGET-B", "quantity": 1 }
]
}
2. Validate Order Structure
- Check required fields
- Validate data types
- If invalid: Chain Call "HandleInvalidOrder"
3. Chain Call "EnrichWithCustomerData"
- Get customer name, email, address
- Get customer tier (for discounts)
- Get payment method
4. For Each Order Item:
Chain Call "EnrichOrderItem"
- Get product details
- Get current price
- Calculate line total
5. Chain Call "CalculateOrderTotals"
- Sum line totals
- Calculate tax
- Apply discounts based on customer tier
- Calculate final total
6. Chain Call "CheckInventory"
- Verify items in stock
- Reserve inventory
- If insufficient: Chain Call "HandleInsufficientInventory"
7. Add Processing Metadata
- Processing timestamp
- Processing status
- Enrichment source versions
8. MongoDB Writer
Pool: your-pool-name
Collection: orders
9. Chain Call "TriggerFulfillment"
- Send to fulfillment system
- Create fulfillment workflow instance
10. Chain Call "SendOrderConfirmation"
- Email customer
- Include order details
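Step 2 leaves the exact checks open. A minimal Python sketch, assuming the field names and types of the sample input, plus two extra rules (a non-empty items list and a positive integer quantity) that are illustrative rather than part of the original chain:

```python
from typing import List

# Required top-level fields and their expected types, inferred from the sample input.
REQUIRED = (("orderId", str), ("customerId", str), ("items", list))


def validate_order(order: dict) -> List[str]:
    """Step 2: return a list of problems; an empty list means the order is valid."""
    errors: List[str] = []
    for field, expected in REQUIRED:
        if field not in order:
            errors.append(f"missing field: {field}")
        elif not isinstance(order[field], expected):
            errors.append(f"{field} must be of type {expected.__name__}")
    items = order.get("items")
    if isinstance(items, list):
        if not items:
            errors.append("items must not be empty")
        for i, item in enumerate(items):
            if not isinstance(item, dict) or not isinstance(item.get("sku"), str):
                errors.append(f"items[{i}].sku must be a string")
                continue
            qty = item.get("quantity")
            # bool is a subclass of int in Python, so exclude it explicitly.
            if isinstance(qty, bool) or not isinstance(qty, int) or qty <= 0:
                errors.append(f"items[{i}].quantity must be a positive integer")
    return errors
```

Collecting every problem instead of stopping at the first one gives "HandleInvalidOrder" a complete list to report back to the caller.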
Chain: “EnrichWithCustomerData”
1. mongodb.AggregationDefinition
Pool: your-pool-name
Collection: customers
Aggregation: [{"$match": {"customerId": "${customerId}"}}]
2. string.Substitute
Field: aggregation
3. mongodb.Reader
(emits one record per matching customer; if no customer matches, it emits nothing and the order record is dropped)
4. Extract Customer Data
- name
- email
- shippingAddress
- customerTier
- paymentMethod
5. Merge into Order
workflow.MergeToElxPublic
6. Return enriched order
Chain: “EnrichOrderItem”
1. mongodb.AggregationDefinition
Pool: your-pool-name
Collection: products
Aggregation: [{"$match": {"sku": "${sku}"}}]
2. string.Substitute
Field: aggregation
3. mongodb.Reader
(emits one record per matching product; if no product matches, it emits nothing and the item record is dropped)
4. Get Current Price
- Base price
- Active promotions
- Volume discounts
5. Calculate Line Total
quantity * price
6. Add Product Details
- Product name
- Description
- Category
- Price
- Line total
7. Return enriched item
Chain: “CalculateOrderTotals”
1. Sum Line Totals
Array Reduce: sum all item totals
2. Calculate Tax
subtotal * taxRate
3. Apply Customer Tier Discount
If tier = "Gold": 10% discount
If tier = "Platinum": 15% discount
4. Calculate Final Total
subtotal - discount + tax
5. Add Totals to Order
- subtotal
- discount
- tax
- total
6. Return order with totals
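The totals logic can be checked against the Enriched Order shown in the expected result. A minimal Python sketch using Decimal to avoid floating-point rounding in currency values. The tax rate is not stated anywhere in this example; 9% is an assumption that reproduces the sample tax of 11.25 on a subtotal of 125.00. Tiers other than Gold and Platinum are assumed to receive no discount.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Dict, Iterable

CENT = Decimal("0.01")
# Step 3 discounts; any other tier is assumed to get no discount.
TIER_DISCOUNT = {"Gold": Decimal("0.10"), "Platinum": Decimal("0.15")}


def money(amount: Decimal) -> Decimal:
    """Round to cents, half up."""
    return amount.quantize(CENT, rounding=ROUND_HALF_UP)


def calculate_totals(items: Iterable[dict], tier: str, tax_rate: Decimal) -> Dict[str, Decimal]:
    # EnrichOrderItem step 5: lineTotal = quantity * price
    line_totals = [money(Decimal(str(item["unitPrice"])) * item["quantity"]) for item in items]
    subtotal = sum(line_totals, Decimal("0"))                           # step 1
    tax = money(subtotal * tax_rate)                                    # step 2
    discount = money(subtotal * TIER_DISCOUNT.get(tier, Decimal("0")))  # step 3
    total = subtotal - discount + tax                                   # step 4
    return {"subtotal": subtotal, "discount": discount, "tax": tax, "total": total}
```

With the sample items (2 at 25.00 and 1 at 75.00) and tier Gold, this yields subtotal 125.00, discount 12.50, tax 11.25, and total 123.75, matching the expected result. As written in step 2, tax is computed on the pre-discount subtotal.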
Chain: “CheckInventory”
1. For Each Order Item:
mongodb.AggregationDefinition
Pool: your-pool-name
Collection: inventory
Aggregation: [{"$match": {"sku": "${sku}"}}]
2. string.Substitute
Field: aggregation
3. mongodb.Reader
4. Check Available Quantity
Compare available to ordered quantity
5. Reserve Inventory
MongoDB Update
Decrement available quantity
Add reservation record
6. Return reservation confirmation
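Running step 4 (check) and step 5 (decrement) as separate operations can oversell stock when two orders for the same SKU are processed at the same time: both see enough stock, and both decrement. One common remedy is to fold the availability check into the update filter so MongoDB applies check and decrement as one atomic single-document update. A minimal Python sketch that builds the filter and update documents; the available field name is an assumption.

```python
from typing import Dict, Tuple


def reservation_update(sku: str, quantity: int) -> Tuple[Dict, Dict]:
    """Return (filter_doc, update_doc) for a conditional, atomic stock decrement.

    The filter only matches when at least `quantity` units are available, so the
    check (step 4) and the decrement (step 5) happen in one single-document update.
    """
    if quantity <= 0:
        raise ValueError("quantity must be positive")
    filter_doc = {"sku": sku, "available": {"$gte": quantity}}
    update_doc = {"$inc": {"available": -quantity}}
    return filter_doc, update_doc
```

With PyMongo, these documents can be passed to Collection.update_one(filter_doc, update_doc); a modified_count of 0 on the returned UpdateResult means there was not enough stock and the insufficient-inventory path should run.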
Integration Points
Triggered by:
- Workflow: “OrderWorkflow” OnEntry of “New” state
- REST API: POST /api/orders
- Message Queue: Kafka topic “new-orders”
Triggers:
- Fulfillment workflow instance creation
- Email notification
- Inventory reservation
Expected Result
Enriched Order:
{
"orderId": "ORD-2026-001",
"customerId": "C12345",
"customerName": "Jane Doe",
"customerEmail": "jane.doe@example.com",
"customerTier": "Gold",
"shippingAddress": {
"street": "123 Main St",
"city": "New York",
"state": "NY",
"zip": "10001"
},
"items": [
{
"sku": "WIDGET-A",
"productName": "Premium Widget",
"quantity": 2,
"unitPrice": 25.00,
"lineTotal": 50.00
},
{
"sku": "GADGET-B",
"productName": "Super Gadget",
"quantity": 1,
"unitPrice": 75.00,
"lineTotal": 75.00
}
],
"subtotal": 125.00,
"discount": 12.50,
"tax": 11.25,
"total": 123.75,
"processingTimestamp": "2026-03-03T14:30:00Z",
"status": "processing"
}
Example 4: Multi-Source Data Integration
This example shows integrating data from multiple sources to create a unified view.
Scenario
Create a unified customer view by combining data from CRM, order history, support tickets, and marketing engagement.
ETL Chainset: “CustomerDataIntegration”
Chain: “BuildCustomer360View”
1. Receive Customer ID
Input: { "customerId": "C12345" }
2. Get CRM Data
JDBC Reader
Connection: crm_db
Query: SELECT * FROM customers WHERE id = '${customerId}'
3. mongodb.AggregationDefinition
Pool: your-pool-name
Collection: orders
Aggregation: [{"$match": {"customerId": "${customerId}"}}]
4. string.Substitute
Field: aggregation
5. mongodb.Reader (Get Order History)
6. Aggregate Order Metrics
- Total orders
- Total spent
- Average order value
- Last order date
- Favorite products
7. Get Support Tickets
REST API Call
URL: https://support.example.com/api/tickets
Query: customerId=${customerId}
8. Aggregate Support Metrics
- Total tickets
- Open tickets
- Average resolution time
- Satisfaction score
9. mongodb.AggregationDefinition
Pool: your-pool-name
Collection: engagement
Aggregation: [{"$match": {"customerId": "${customerId}"}}]
10. string.Substitute
Field: aggregation
11. mongodb.Reader (Get Marketing Engagement)
12. Aggregate Marketing Metrics
- Email open rate
- Click-through rate
- Campaign responses
- Preferences
13. Calculate Customer Score
Chain Call "CalculateCustomerScore"
- Based on purchase history
- Support interactions
- Engagement level
14. Determine Customer Segment
Chain Call "DetermineSegment"
- High Value
- At Risk
- Engaged
- Dormant
15. Combine All Data
Merge all sources into unified view
16. MongoDB Writer
Pool: your-pool-name
Collection: customer_360
17. Return unified customer view
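Step 6 can be sketched in Python as a single pass over the order history. Defining favorite products as the SKUs with the highest total ordered quantity, and the orderDate, orderTotal, and items field names, are assumptions. For the sample profile, 3250.00 / 24 rounds to the averageOrderValue of 135.42 shown in the result.

```python
from collections import Counter
from typing import List, Optional


def order_metrics(orders: List[dict], top_n: int = 2) -> dict:
    """Step 6: summarize a customer's order history in one pass."""
    totals = [order["orderTotal"] for order in orders]
    quantities: Counter = Counter()
    for order in orders:
        for item in order.get("items", []):
            quantities[item["sku"]] += item.get("quantity", 1)
    # ISO-8601 date strings sort chronologically, so max() finds the latest one.
    last_order: Optional[str] = max((o["orderDate"] for o in orders), default=None)
    return {
        "totalOrders": len(orders),
        "totalSpent": round(sum(totals), 2),
        "averageOrderValue": round(sum(totals) / len(orders), 2) if orders else 0.0,
        "lastOrderDate": last_order,
        "favoriteProducts": [sku for sku, _ in quantities.most_common(top_n)],
    }
```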
Chain: “CalculateCustomerScore”
1. Score Purchase Behavior (0-40 points)
- Total spent
- Order frequency
- Recent activity
2. Score Support Interactions (0-30 points)
- Ticket count (fewer is better)
- Satisfaction ratings
- Resolution speed
3. Score Engagement (0-30 points)
- Email engagement
- Campaign responses
- Website activity
4. Calculate Total Score (0-100)
Sum all component scores
5. Assign Grade
90-100: A
80-89: B
70-79: C
60-69: D
<60: F
6. Return score and grade
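The score arithmetic and grade bands translate directly into code. A minimal Python sketch; how each component score is derived from the underlying metrics is not specified above, so the sketch takes the three component scores as inputs and only clamps them to their stated ranges.

```python
def clamp(value: float, low: float, high: float) -> float:
    return max(low, min(high, value))


def total_score(purchase: float, support: float, engagement: float) -> float:
    """Step 4: sum the three components after clamping each to its stated range."""
    return clamp(purchase, 0, 40) + clamp(support, 0, 30) + clamp(engagement, 0, 30)


def grade(score: float) -> str:
    """Step 5: map a 0-100 score to a letter grade."""
    for threshold, letter in ((90, "A"), (80, "B"), (70, "C"), (60, "D")):
        if score >= threshold:
            return letter
    return "F"
```

A total of 87 falls in the 80-89 band, which is why the sample result pairs customerScore 87 with customerGrade "B".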
Result
Customer 360 View:
{
"customerId": "C12345",
"profile": {
"name": "Jane Doe",
"email": "jane.doe@example.com",
"phone": "+1-555-0123",
"joinDate": "2024-06-15"
},
"orderMetrics": {
"totalOrders": 24,
"totalSpent": 3250.00,
"averageOrderValue": 135.42,
"lastOrderDate": "2026-02-28",
"favoriteProducts": ["WIDGET-A", "GADGET-B"]
},
"supportMetrics": {
"totalTickets": 3,
"openTickets": 0,
"averageResolutionTime": "4.5 hours",
"satisfactionScore": 4.8
},
"marketingMetrics": {
"emailOpenRate": 0.65,
"clickThroughRate": 0.12,
"campaignResponses": 8,
"preferences": ["electronics", "gadgets"]
},
"customerScore": 87,
"customerGrade": "B",
"segment": "High Value",
"lastUpdated": "2026-03-03T14:30:00Z"
}
Example 5: Log Processing Pipeline
This example shows processing and analyzing log files.
Scenario
Process application log files hourly, extract errors and warnings, aggregate statistics, and alert on anomalies.
ETL Chainset: “LogProcessing”
Chain: “ProcessApplicationLogs”
1. date.Today
Output Field: today
(then use date.Add-Subtract to subtract 1 day → yesterday field)
2. File Reader
Path: /var/logs/application/
Pattern: app-${yesterday}-*.log
(run string.Substitute on the pattern first to resolve ${yesterday})
3. Parse Log Lines
- Extract timestamp
- Extract log level
- Extract message
- Extract context (user, session, etc.)
4. Filter Errors and Warnings
Filter: level IN ["ERROR", "WARN"]
5. Categorize Errors
Chain Call "CategorizeError"
- Database errors
- API errors
- Validation errors
- System errors
6. Aggregate Statistics
Group by: errorCategory, hour
Count: occurrences
Collect: sample messages
7. Detect Anomalies
Chain Call "DetectAnomalies"
- Compare to historical averages
- Flag unusual spikes
8. MongoDB Writer
Pool: your-pool-name
Collection: log_statistics
9. Filter Anomalies
Filter: isAnomaly = true
10. Send Alerts
Chain Call "SendAlertEmail"
- Notify operations team
- Include anomaly details
11. Archive Processed Logs
File Writer
Path: /var/logs/archive/
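Steps 3, 4, and 6 can be sketched in Python as parse, filter, then group by category and hour. The log line format below (ISO timestamp, level, optional [key=value] context, message) is hypothetical, since the actual application log format is not specified; adjust LINE_RE to match yours. The categorize argument stands in for the "CategorizeError" chain.

```python
import re
from collections import defaultdict
from typing import Callable, Iterable, List, Optional

# Hypothetical format: "<ISO timestamp> <LEVEL> [key=value ...] <message>"
LINE_RE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2}T(?P<hour>\d{2}):\d{2}:\d{2}Z)\s+"
    r"(?P<level>[A-Z]+)\s+"
    r"(?:\[(?P<ctx>[^\]]*)\]\s+)?"
    r"(?P<msg>.*)$"
)


def parse_line(line: str) -> Optional[dict]:
    """Step 3: split a line into timestamp, hour, level, context, and message."""
    m = LINE_RE.match(line.strip())
    if m is None:
        return None  # unparseable lines are skipped
    context = dict(p.split("=", 1) for p in (m["ctx"] or "").split() if "=" in p)
    return {"timestamp": m["ts"], "hour": int(m["hour"]), "level": m["level"],
            "message": m["msg"], "context": context}


def hourly_stats(lines: Iterable[str], categorize: Callable[[str], str],
                 max_samples: int = 2) -> List[dict]:
    """Steps 4 and 6: keep ERROR/WARN lines, then count them per (category, hour)."""
    groups: dict = defaultdict(lambda: {"occurrences": 0, "samples": []})
    for line in lines:
        record = parse_line(line)
        if record is None or record["level"] not in ("ERROR", "WARN"):
            continue
        group = groups[(categorize(record["message"]), record["hour"])]
        group["occurrences"] += 1
        if len(group["samples"]) < max_samples:
            group["samples"].append(record["message"])
    return [{"errorCategory": cat, "hour": hour, **stats}
            for (cat, hour), stats in sorted(groups.items())]
```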
Chain: “CategorizeError”
1. Check Error Message Patterns
- Match against known patterns
- Regex matching
2. Assign Category (rules are checked in the order listed; the first match wins)
- "database" if contains "SQL", "MongoDB", "connection"
- "api" if contains "HTTP", "REST", "timeout"
- "validation" if contains "invalid", "required", "format"
- "system" otherwise
3. Extract Error Code
- Parse error codes from message
4. Add Category Fields
- errorCategory
- errorCode
- errorPattern
5. Return categorized error
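The keyword rules in step 2 can be sketched as an ordered list where the first match wins, which is consistent with the sample output placing "Connection timeout to MongoDB" under database even though it also contains "timeout". Case-insensitive matching, the error-code regex (any standalone run of three or more digits), and storing the matched keyword as errorPattern are assumptions.

```python
import re

# Step 2 rules, checked in order; the first matching keyword wins.
RULES = (
    ("database", ("SQL", "MongoDB", "connection")),
    ("api", ("HTTP", "REST", "timeout")),
    ("validation", ("invalid", "required", "format")),
)
# Hypothetical error-code shape: a standalone run of three or more digits.
CODE_RE = re.compile(r"\b\d{3,}\b")


def categorize(message: str) -> dict:
    """Steps 2-4: return errorCategory, errorCode, and errorPattern for a message."""
    code = CODE_RE.search(message)
    result = {"errorCategory": "system",
              "errorCode": code.group(0) if code else None,
              "errorPattern": None}
    lowered = message.lower()  # assumption: matching is case-insensitive
    for category, keywords in RULES:
        for keyword in keywords:
            if keyword.lower() in lowered:
                return {**result, "errorCategory": category, "errorPattern": keyword}
    return result
```

Plain substring matching is coarse: "rest" also matches inside "interest". Matching whole words with a regex such as \brest\b would avoid false positives like that.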
Scheduler Configuration
Job Name: Hourly Log Processing
Type: ETL
Chainset: LogProcessing
Chain: ProcessApplicationLogs
Schedule: Every hour
Enabled: Yes
Sample Output
Log Statistics:
{
"date": "2026-03-03",
"hour": 14,
"errorCategory": "database",
"occurrences": 12,
"isAnomaly": false,
"samples": [
"Connection timeout to MongoDB",
"Query execution timeout"
]
}
Alert (if anomaly detected):
Subject: Log Anomaly Detected
Anomaly Details:
- Category: API Errors
- Hour: 14:00
- Occurrences: 156 (average: 12)
- Spike: 1200% increase
Sample Errors:
- HTTP 503 Service Unavailable
- Connection refused to payment gateway
- Timeout calling external API
Action Required: Investigate API connectivity issues
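The spike figure in the alert is a plain percentage increase over the historical average: (156 - 12) / 12 * 100 = 1200. A minimal Python sketch of that calculation with an anomaly flag; the 200% threshold and the handling of a zero average are assumptions, since the "DetectAnomalies" chain does not define what counts as an unusual spike.

```python
def detect_spike(occurrences: int, average: float, threshold_pct: float = 200.0) -> dict:
    """Percentage increase over the historical average, flagged when it reaches the threshold."""
    if average <= 0:
        # No history to compare against: treat any occurrence as anomalous (an assumption).
        return {"spikePct": None, "isAnomaly": occurrences > 0}
    spike_pct = (occurrences - average) * 100 / average
    return {"spikePct": spike_pct, "isAnomaly": spike_pct >= threshold_pct}
```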
Next Steps
These examples demonstrate common ETL patterns. For more information:
- Best Practices - Design guidelines
- Integration - Use with other modules
- Core Concepts - Understand ETL fundamentals
- ETL Designer - Build your own chains
A comprehensive ETL Cookbook with additional recipes and step-by-step guides for common tasks is planned as a separate reference.