AES Fabric ETL User Guide
Version 1.0
Copyright © 2024-2030 Assurance eServices Inc. All rights reserved.
This software is licensed under the Business Source License 1.1 (BSL):
- Free for community use (up to 1,000 Task Runs/month)
- Commercial License required for usage exceeding 1,000 Task Runs/month
Change Date: 2030-06-01
After Change Date: Converts to Apache License 2.0
Contact: support@assuranceeservices.com
Website: www.assuranceeservices.com
Table of Contents
- Overview
- Architecture
- Prerequisites
- Installation & Setup
- Core Components
- Configuration
- Usage Guide
- Command Types
- Advanced Features
- Monitoring & Logging
- Troubleshooting
- Best Practices
- API Reference
Overview
AES Fabric ETL (aesetl_v1) is a metadata-driven ETL orchestration framework designed for Microsoft Fabric. It provides enterprise-grade capabilities for managing complex data pipelines with features including:
- Metadata-driven execution: Configure jobs via SQL database tables
- Parallel processing: Execute multiple tasks concurrently using ThreadPoolExecutor
- Incremental loading: Track and manage data extraction with key value ranges
- Multiple command types: Support for MSSQL, Spark SQL, Python, data files, and pipelines
- Variable substitution: Dynamic parameter replacement with Azure Key Vault integration
- Comprehensive logging: Thread-safe lakehouse-based logging with job auditing
- Error handling: Robust error management with configurable failure policies
- Resume capability: Prevent duplicate execution of running jobs
Architecture
High-Level Design
┌─────────────────────────────────────────────────────────────┐
│ aesetl_v1 Notebook │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌─────────────────┐ │
│ │ Main Flow │→ │ Job Parts │→ │ Job Steps │ │
│ │ Controller │ │ Processor │ │ Executor │ │
│ └──────────────┘ └──────────────┘ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
↓ ↓ ↓
┌─────────────┐ ┌──────────────┐ ┌──────────────┐
│ Fabric │ │ Task Helper │ │ Pipeline │
│ Helper │ │ Module │ │ Helper │
└─────────────┘ └──────────────┘ └──────────────┘
↓ ↓ ↓
┌──────────────────────────────────────────────────────┐
│ Microsoft Fabric Platform Services │
│ (Lakehouses, SQL DB, Pipelines, Key Vault, etc.) │
└──────────────────────────────────────────────────────┘
Component Structure
aesetl_v1/
├── aesetl_v1.ipynb # Main orchestration notebook
├── builtin/
│ └── scripts/
│ ├── fabric_helper.py # Fabric platform utilities
│ ├── lakehouse_helper.py # Lakehouse operations
│ ├── pipeline_helper.py # Pipeline management
│ └── task_helper.py # Task execution functions
├── dependencies/
│ ├── Environment/
│ └── Lakehouses/
└── env/
Prerequisites
Platform Requirements
- Microsoft Fabric Workspace with appropriate capacity
- Fabric SQL Database (DWInstrumentation) for instrumentation/metadata
- Fabric Lakehouse for data storage and logging
- Fabric Environment with
pymssqllibrary (required for SQL Server connectivity)
Important: The pymssql Python library is not available by default in Microsoft Fabric Runtime. You must create a Fabric Environment and add the library from external repositories (PyPI.org) in Quick mode. See Step 2: Create and Configure Environment for details.
Required Permissions
- Read/Write access to target Lakehouse(s)
- Execute, Read and Write permissions on (DWInstrumentation) SQL Database Tables and stored procedures
- Access to create/run Fabric pipelines
- Azure Key Vault access (if using secret management)
Database Schema
The instrumentation database must contain the following tables and stored procedures:
Core Tables:
dbo.ETLJobs– Job definitions and statusdbo.ETLJobSteps– Individual task configurationsdbo.ETLJobVariables– Job and global variablesdbo.ETLCommandTypes– Command type templatesdbo.ETLDataFileLoadDetails– Data file load tracking (DATAFILELOAD & DATAFILEPOSTPROCESS)
Audit Tables:
dbo.ETLJobAudit– Job execution audit trail (historical records of job runs)dbo.ETLJobStepAudit– Job step execution audit trail (historical records of step runs)
Stored Procedures:
dbo.ETLInitiateJob– Initialize job executiondbo.ETLGetJobParts– Retrieve job parts (step groupings)dbo.ETLGetJobStepsProcessNo– Get process numbers for parallel executiondbo.ETLGetJobStepsToProcess– Retrieve steps to executedbo.ETLGetJobStepCommandToProcess– Retrieve command XML for a specific stepdbo.ETLUpdateJobStepStatus– Update step execution statusdbo.ETLUpdateJobStatus– Update job execution statusdbo.ETLUpdateJobStepNextKeyValue– Update incremental load key valuesdbo.ETLGetJobVariables– Retrieve job and global variablesdbo.ETLFinalizeJob– Finalize job execution and update statusdbo.ETLInitiateFileAudit– Create audit record for DATAFILELOAD operationsdbo.ETLGetDataFileLoadDetails– Retrieve file load details for a stepdbo.ETLGetDataFilePostProcessDetails– Retrieve file post-process detailsdbo.ETLUpdateDataFileLoadStatus– Update file load statusdbo.ETLUpdateDataFilePostProcessStatus– Update file post-process status
Note: All commands are stored in XML format within the ETLJobSteps table.
UI Stored Procedures (Fabric ETL Control Panel):
These stored procedures are used by the Fabric ETL Control Panel application for job configuration and management:
Job Management:
dbo.UIAddJob– Create a new jobdbo.UIUpdateJob– Update job propertiesdbo.UICopyJob– Copy an existing job configurationdbo.UIGetJobs– Retrieve list of jobsdbo.UIGetJobAuditDetails– Get job execution history
Job Step Management:
dbo.UIAddJobStep– Create a new job stepdbo.UIUpdateJobStep– Update job step propertiesdbo.UICopyJobStep– Copy an existing job stepdbo.UIGetJobSteps– Retrieve job steps for a jobdbo.UIGetJobPartName– Get distinct job part namesdbo.UIGetJobStepMessage– Get job step status message
Command Configuration:
dbo.UIGetCommand– Retrieve command XML for a stepdbo.UIUpdateCommand– Update command XML for a stepdbo.UIGetCommandTypes– Get list of available command typesdbo.UIGetCommandTemplate– Get command template XML
Next Key Command Configuration:
dbo.UIGetNextKeyCommand– Retrieve next key command XMLdbo.UIUpdateNextKeyCommand– Update next key command XMLdbo.UIGetNextKeyCommandTypes– Get list of next key command typesdbo.UIGetNextKeyCommandTemplate– Get next key command template XML
Variable Management:
dbo.UIAddJobVariables– Create a new job variabledbo.UIUpdateJobVariables– Update job variabledbo.UIGetJobVariables– Retrieve job variables
Data File Tracking:
dbo.UIGetDataFileLoadDetails– View file load audit detailsdbo.UIGetDataFilePostProcessDetails– View file post-process audit details
System:
dbo.UITestConn– Test database connectivity
Installation & Setup
Step 1: Create Instrumentation Database
- Create a Fabric SQL Database in your workspace. Recommended to name as DWInstrumentation
- Execute the database schema scripts to create required tables and stored procedures
- Configure job metadata (Jobs, Job Steps, Job Variables)
- Important: All command configurations use XML format with CDATA sections for queries and scripts
Key Tables to Create:
ETLJobs– Job definitionsETLJobSteps– Task configurations (stores XML commands)ETLJobVariables– Variables and secretsETLCommandTypes– Reusable command templates (optional)ETLJobAudit– Job execution historyETLJobStepAudit– Job step execution historyETLDataFileLoadDetails– Data file load tracking
Step 2: Create and Configure Environment
Required: The pymssql library is not available by default in Fabric Runtime and must be installed via a custom Environment.
- Create a new Fabric Environment:
- Navigate to your Fabric workspace
- Click + New → Environment
- Name it (e.g., “AESETLEnvironment”)
- Add pymssql library:
- Open the Environment settings
- Go to Public Libraries tab
- Click Add from PyPI
- Select Quick mode
- Search for and add:
pymssql - Click Save and wait for the environment to publish
- Attach Environment to Notebook:
- Open
aesetl_v1.ipynb - In the notebook toolbar, select the Environment you created
- The notebook will now have access to the pymssql library
- Open
Note: Any additional custom libraries required by your ETL jobs should also be added to this Environment.
Step 3: Deploy Notebook
- Import
aesetl_v1.ipynbinto your Fabric workspace - Copy the
builtin/scripts/folder to the notebook’s resource directory - Attach the notebook to the Environment created in Step 2
Step 4: Configure Lakehouse
- Create or identify the target Lakehouse for data storage
- Create a
logs/folder structure in the Lakehouse Files area - Add the Lakehouse as a dependency to the notebook
Step 5: Set Parameters
Configure the notebook parameters (Cell 2):
strjobname = "YourJobName" # Job name from instrumentation DB
instrumentationdbname = "DWInstrumentation" # SQL DB name
lakehousetolog = "lhdw" # Lakehouse for logging
strsessionid = "manualrun" # Session identifier
pipelineid = "manualnotebookrun" # Pipeline run ID
Core Components
1. Main Notebook (aesetl_v1.ipynb)
The orchestration engine that:
- Initializes job execution
- Retrieves job configuration from instrumentation database
- Manages parallel processing of job parts
- Handles error conditions and logging
- Finalizes job status
2. Helper Modules
- Fabric Helper (
fabric_helper.py): Provides core Fabric platform functions - Lakehouse Helper (
lakehouse_helper.py): Manages Lakehouse operations and SCD merges - Task Helper (
task_helper.py): Executes specific command types - Pipeline Helper (
pipeline_helper.py): Manages Fabric pipeline orchestration
Configuration
Job Configuration Structure
Jobs are configured in the instrumentation database with the following hierarchy:
ETLJobs (Job Definition)
↓
ETLJobSteps (Individual tasks with logical grouping via strJobStepPartName)
↓
ETLJobVariables (Dynamic parameters - Global or Job-specific)
Execution Flow:
- Job steps are grouped by
strJobStepPartNameAliasJob Task Group(e.g., “01 Copy Tables”, “02 Shadow Deletes”) - Parts execute sequentially based on their naming order
- Within each part, all ProcessNo values (0, 1, 2, etc.) run in parallel
- Within each ProcessNo, multiple tasks are executed in order based on
intProcessOrder - Each step can have incremental loading via
strCurrentKeyValuetracking
Job Metadata Tables
ETLJobs Table
- intJobID (PK)
- strJobName (Unique job identifier)
- strJobDescription
- strJobStatus (Current status: P=Processing, C=Complete, F=Failed, I=Inactive)
- strJobDefaultProcessType (P = Delta, F = Full)
- strJobCurrentProcessType (P = Delta, F = Full)
- dtmStartDate (Job start timestamp)
- dtmEndDate (Job end timestamp)
- strSessionID (Session/pipeline run ID)
- strLogFileName (Log file path)
- dtmUpdateDate (Last update timestamp)
- strUpdateUser (User who last updated)
ETLJobSteps Table
Note: Job Steps are organized by strJobStepPartName (executed sequentially) and intProcessNo (executed in parallel within each part).
Key Fields:
- intJobStepID (PK)
- strJobName (Job identifier)
- strJobStepName (Task name/description)
- strJobStepPartName (Logical grouping/Job Task Group - parts run sequentially)
- strJobStepStatus (Step status: P=Processing, C=Complete, F=Failed, I=Inactive)
- intProcessNo (Parallel execution group within a part - all ProcessNo values run in parallel)
- intProcessOrder (Execution order within the same ProcessNo - lower values execute first)
- strStartKeyValue (Initial key value for incremental loading)
- strEndKeyValue (End key value for incremental loading)
- strCommandType (Task type - see Command Types section)
- strCommand (Command XML to execute)
- strNextKeyCommand (Query to get next key value - XML format)
- strNextKeyCommandType (Query type: MSSQL, SPARKSQL, PYTHON, INSTRUMENTATIONDBSQL)
- strCurrentKeyValue (Last successfully processed value)
- strNextKeyValue (Next key value calculated by strNextKeyCommand)
- strFailPackageonFailure (Y/N - stop job on error)
- dtmStartDate (Step start timestamp)
- dtmEndDate (Step end timestamp)
- strMessage (Status or error message)
- dtmUpdateDate (Last update timestamp)
- strUpdateUser (User who last updated)
ETLJobVariables Table
- intJobVariableID (PK)
- strJobName (Job name or 'Global' for global variables)
- strJobVariableName (Variable name)
- strJobVariableValue (Variable value)
- bitDeleted (Soft delete flag)
- dtmUpdateDate (Last update timestamp)
- strUpdateUser (User who last updated)
Note: Variables with strJobName = 'Global' are available to all jobs.
ETLJobAudit Table
This table maintains a historical record of all job executions, providing an audit trail for job runs.
- intJobAuditID (PK) - Unique audit record identifier
- intJobID - Foreign key to ETLJobs table
- strJobName - Job name
- strJobStatus - Job completion status (P=Processing, C=Complete, F=Failed)
- strJobDefaultProcessType - Default process type (P=Delta, F=Full)
- strJobCurrentProcessType - Process type used for this run
- dtmStartDate - Job start timestamp
- dtmEndDate - Job completion timestamp
- strSessionID - Session/pipeline run identifier
- strLogFileName - Path to log file for this run
- strMessage - Status or error message
- dtmUpdateDate - Audit record creation timestamp
- strUpdateUser - User who initiated the job
Purpose: Provides historical tracking of job executions for compliance, troubleshooting, and performance analysis.
ETLJobStepAudit Table
This table maintains a historical record of all job step executions, providing detailed audit trail for each task.
- intJobStepAuditID (PK) - Unique audit record identifier
- intJobStepID - Foreign key to ETLJobSteps table
- intJobAuditID - Foreign key to ETLJobAudit table (links to parent job run)
- strJobName - Job name
- strJobStepName - Step name/description
- strJobStepPartName - Logical grouping/Job Task Group
- strJobStepStatus - Step completion status (P=Processing, C=Complete, F=Failed, I=Inactive)
- intProcessNo - Parallel execution group number
- intProcessOrder - Execution order within ProcessNo
- strStartKeyValue - Key value at start of execution
- strEndKeyValue - Key value at end of execution
- strCommandType - Command type executed
- strCommand - Full command XML executed
- strNextKeyCommand - Next key command XML (if applicable)
- strNextKeyCommandType - Next key command type
- strCurrentKeyValue - Last successfully processed value before this run
- strNextKeyValue - Next key value calculated during this run
- strFailPackageonFailure - Whether failure stops entire job (Y/N)
- dtmStartDate - Step start timestamp
- dtmEndDate - Step completion timestamp
- strMessage - Status or error message
- dtmUpdateDate - Audit record creation timestamp
- strUpdateUser - User who initiated the job
Purpose: Provides detailed execution history for troubleshooting, performance analysis, and identifying patterns in step failures or execution times.
ETLDataFileLoadDetails Table
This table tracks individual file loads for DATAFILELOAD command types, maintaining a record of all files processed.
- intFileLoadID (PK) - Unique file load record identifier
- intJobStepID - Foreign key to ETLJobSteps table
- intJobStepAuditID - Foreign key to ETLJobStepAudit table (links to specific execution)
- strFileFolder - Folder path where file was stored
- strFileName - Name of the file loaded
- dtmFileDate - Date the file data represents
- strSessionID - Session/pipeline run identifier
- strStatus - File load status (P=Processing, C=Complete, F=Failed)
- strMessage - Status or error message
- dtmLoadStartDate - File load start timestamp
- dtmLoadEndDate - File load completion timestamp
- dtmCreateDate - Record creation timestamp
- strCreateUser - User who initiated the load
Purpose: Provides file-level audit trail for DATAFILELOAD operations, enabling tracking of which files have been processed, when, and their status. Useful for identifying missing files, reprocessing scenarios, and data quality investigations.
Variable Substitution
Variables are replaced in commands using the syntax: $$VariableName$$
Global Variables: Variables with strJobName = 'Global' are available to all jobs.
Example:
-- Define Global Variable:
INSERT INTO dbo.ETLJobVariables
(strJobName, strJobVariableName, strJobVariableValue)
VALUES
('Global', 'CurrencyLayerAPIKey', '223978105a8e1e10d737984b69a1665e');
-- Use in command:
<datafileloadpythonscript><![CDATA[
API_KEY = "$$CurrencyLayerAPIKey$$"
BASE_URL = "http://api.currencylayer.com/historical"
]]></datafileloadpythonscript>
Azure Key Vault Integration
Store sensitive values in Azure Key Vault and reference them using:
$azkv$KeyVaultName.secret.SecretName$azkv$
Example:
-- Define variable with Key Vault reference:
INSERT INTO dbo.ETLJobVariables
(strJobName, strJobVariableName, strJobVariableValue)
VALUES
('Global', 'azkv_sourcedata_connstring',
'$azkv$kvaesetlsandbox.secret.sqlserver1-connectionstring$azkv$');
-- Use in command:
<copymssql2lakehouse>
<sqlconnectionstringtype>connectstring</sqlconnectionstringtype>
<sqlconnectstring><![CDATA[$$azkv_sourcedata_connstring$$]]></sqlconnectstring>
...
</copymssql2lakehouse>
Supported Secret Types:
secret– Key Vault Secret- Additional types may be configured as needed
Usage Guide
Running a Job Manually
- Open the notebook:
aesetl_v1.ipynb - Set parameters (Cell 2):
strjobname = "DWCurrencyRates"
instrumentationdbname = "DWInstrumentation"
lakehousetolog = "lhdw"
strsessionid = "manualrun"
pipelineid = "manualnotebookrun"
- Run all cells or use “Run All” from the notebook menu
- Monitor execution:
- View real-time output in notebook
- Check log files in Lakehouse:
Files/logs/{JobName}/{Timestamp}_{SessionID}.log - Query instrumentation database for job status
Running from a Pipeline
- Create a Fabric Pipeline
- Add Notebook Activity:
- Select
aesetl_v1notebook - Configure parameters:
{ "strjobname": "YourJobName", "instrumentationdbname": "DWInstrumentation", "lakehousetolog": "lhdw", "strsessionid": "@{pipeline().RunId}", "pipelineid": "@{pipeline().RunId}" }
- Select
- Run the pipeline
Command Types
The framework supports multiple command types, each using XML format for configuration. Commands are stored in ETLJobSteps with XML structure wrapped in CDATA sections for SQL queries and Python scripts.
Command Templates
Command templates can be stored in the ETLCommandTypes table for reusability:
-- ETLCommandTypes Table Structure
- strCommandType (PK) - Command type identifier
- strCommandTemplate - XML template for the command
- strNextKeyCommandTemplate - XML template for next key calculation
- bitActive - Enable/disable command type
- dtmUpdateDate - Last update timestamp
- strUpdateUser - User who last updated
XML Format Guidelines
- All commands use XML structure
- SQL queries and Python scripts are wrapped in
<![CDATA[...]]>sections - Variable substitution uses
$$VariableName$$syntax - Key value placeholders use
@strStartKeyValueand@strEndKeyValue - Connection strings can reference gateways or use direct connections
1. COPYMSSQL2LAKEHOUSE
Purpose: Copy data from SQL Server to Lakehouse with Type 1 or Type 2 SCD merge
Command Format (XML):
<copymssql2lakehouse>
<sqlconnectionstringtype>gateway</sqlconnectionstringtype>
<sqlconnectstring>sqlserver1</sqlconnectstring>
<sqlquery><![CDATA[
SELECT *, ModifiedDate AS LastUpdateDate
FROM [AdventureWorks].[SalesLT].[Product]
WHERE ModifiedDate >= '@strStartKeyValue'
AND ModifiedDate <= '@strEndKeyValue'
]]></sqlquery>
<stagingtablename>lhdw.dwstg.adventureworks_saleslt_product</stagingtablename>
<destinationtablename>lhdw.dw.adventureworks_saleslt_product</destinationtablename>
<destinationtablemergetype>type2</destinationtablemergetype>
<destinationtablemergetype2datepart>SECOND</destinationtablemergetype2datepart>
<sourcetableprimarykeycolumns>productid</sourcetableprimarykeycolumns>
<desttablealternatekeycolumns>productid</desttablealternatekeycolumns>
<desttablecolumnsignorechanges></desttablecolumnsignorechanges>
<convertschematolowercase>true</convertschematolowercase>
<batchsize>100000</batchsize>
<refreshendpoint>true</refreshendpoint>
</copymssql2lakehouse>
**Connection String Types (sqlconnectionstringtype) **:
- gateway: References a gateway name (e.g., “sqlserver1”)
- connectstring: Direct connection string (supports variable substitution)
Destination Table Merge Type (destinationtablemergetype):
- type1: Performs Type 1 SCD (overwrites existing records)
- type2: Performs Type 2 SCD (maintains full history)
- type2datepart: Performs Type 1 until the record update date passes the specified datepart threshold, then performs Type 2
- For
destinationtablemergetype = type2datepart, the parameter destinationtablemergetype2datepart must be configured - Possible values:
SECOND,MINUTE,HOUR,DAY,MONTH,YEAR
- For
Example with Azure Key Vault Connection String:
<copymssql2lakehouse>
<sqlconnectionstringtype>connectstring</sqlconnectionstringtype>
<sqlconnectstring><![CDATA[$$azkv_sourcedata_connstring$$]]></sqlconnectstring>
...
</copymssql2lakehouse>
NextKeyCommand Example (XML):
<mssql>
<sqlconnectionstringtype>gateway</sqlconnectionstringtype>
<sqlconnectstring>sqlserver1</sqlconnectstring>
<sqlquery><![CDATA[
SELECT MAX(CONVERT(VARCHAR(23), ModifiedDate, 121)) AS strNextKeyValue
FROM [AdventureWorks].[SalesLT].[Product]
]]></sqlquery>
<lakehouse>lhdw</lakehouse>
</mssql>
2. PYTHON
Purpose: Execute custom Python code
Command Format (XML):
<python>
<pythonscript><![CDATA[
from datetime import datetime, timedelta, timezone
try:
# Available values to use in the context (alogfile_abfs_path, spark)
# Your code goes below here
# Sample code to return yesterday's date as strNextKeyValue
yesterday_utc = datetime.now(timezone.utc) - timedelta(days=1)
formatted_utc = yesterday_utc.strftime("%Y-%m-%d")
# Your transformation logic here
df = spark.table("dwstg.SourceTable")
df_transformed = df.withColumn("NewColumn", current_timestamp())
df_transformed.write.mode("overwrite").saveAsTable("dw.TargetTable")
except Exception as e:
raise
]]></pythonscript>
</python>
NextKeyCommand Format (XML):
<python>
<pythonscript><![CDATA[
from datetime import datetime, timedelta, timezone
try:
yesterday_utc = datetime.now(timezone.utc) - timedelta(days=1)
formatted_utc = yesterday_utc.strftime("%Y-%m-%d")
scriptresult = {"status":"success", "data":{"strNextKeyValue":formatted_utc}}
except Exception as e:
scriptresult = {"status":"error", "error":str(e)}
]]></pythonscript>
</python>
3. SPARKSQL
Purpose: Execute Spark SQL statements (supports parallel execution)
Command Format (XML):
<sparksql>
<sqlcommand><![CDATA[
OPTIMIZE lhdw.dw.adventureworks_saleslt_product ZORDER BY (_iscurrent, productid);
OPTIMIZE lhdw.dw.adventureworks_saleslt_address ZORDER BY (_iscurrent, addressid);
OPTIMIZE lhdw.dw.adventureworks_saleslt_productcategory ZORDER BY (_iscurrent, productcategoryid);
OPTIMIZE lhdw.dw.adventureworks_saleslt_customer ZORDER BY (_iscurrent, customerid);
]]></sqlcommand>
<executeparallel>true</executeparallel>
<maxparallelism>4</maxparallelism>
</sparksql>
Single Statement Example:
<sparksql>
<sqlcommand><![CDATA[
CREATE TABLE IF NOT EXISTS dw.FactSales
USING DELTA
AS
SELECT
s.SalesID,
s.ProductID,
s.CustomerID,
s.SalesAmount,
s.SalesDate
FROM dwstg.StagingSales s
WHERE s.ModifiedDate BETWEEN '@strStartKeyValue' AND '@strEndKeyValue'
]]></sqlcommand>
</sparksql>
NextKeyCommand Example (XML):
<sparksql>
<sqlquery><![CDATA[
SELECT date_format(to_utc_timestamp(current_timestamp(), current_timezone()), 'yyyy-MM-dd HH:mm:ss') as strNextKeyValue;
]]></sqlquery>
</sparksql>
3a. MSSQL
Purpose: Query SQL Server database (primarily used in NextKeyCommand for incremental load key calculations)
NextKeyCommand Format (XML):
<mssql>
<sqlconnectionstringtype>gateway</sqlconnectionstringtype>
<sqlconnectstring>sqlserver1</sqlconnectstring>
<sqlquery><![CDATA[
SELECT MAX(CONVERT(VARCHAR(23), ModifiedDate, 121)) AS strNextKeyValue
FROM [AdventureWorks].[SalesLT].[Product]
]]></sqlquery>
<lakehouse>lhdw</lakehouse>
</mssql>
Alternative with Direct Connection String:
<mssql>
<sqlconnectionstringtype>connectstring</sqlconnectionstringtype>
<sqlconnectstring><![CDATA[$$azkv_sourcedata_connstring$$]]></sqlconnectstring>
<sqlquery><![CDATA[
SELECT MAX(CONVERT(VARCHAR(23), ModifiedDate, 121)) AS strNextKeyValue
FROM [AdventureWorks].[SalesLT].[Address]
]]></sqlquery>
<lakehouse>lhdw</lakehouse>
</mssql>
Note: This command type is typically used for calculating the next key value from source systems rather than as a main command for data movement.
4. DATAFILELOAD
Purpose: Load data files from external APIs or generate files with custom Python scripts
Command Format (XML):
<datafileload>
<datafileworkspacename>aesfabricetl</datafileworkspacename>
<datafilelakehousename>lhdw</datafilelakehousename>
<datafilefolderpattern>exchangerates/yyyy/mm/dd</datafilefolderpattern>
<datafilenameprefixpattern>yyyymmdd</datafilenameprefixpattern>
<datafilename>_currencyexchangerates</datafilename>
<datafiletype>json</datafiletype>
<datafrequencytype>DAY</datafrequencytype>
<datafrequency>1</datafrequency>
<datafileloadpythonscript><![CDATA[
# Available values to use in the context (datadate, datafile, datafrequencytype, datafrequency, spark)
# Your code goes below here
# Get currency rates from API
import requests
import time
time.sleep(1)
API_KEY = "$$CurrencyLayerAPIKey$$"
BASE_URL = "http://api.currencylayer.com/historical"
lparams = {
"access_key": API_KEY,
"date": datadate[:10],
"source": "USD",
"format": 1
}
try:
response = requests.get(BASE_URL, params=lparams)
ldata = response.json()
if ldata["success"]:
scriptresult = {"status": "success", "data": ldata}
else:
scriptresult = {"status": "error", "error": ldata["error"]["info"]}
except Exception as e:
scriptresult = {"status": "error", "error": str(e)}
]]></datafileloadpythonscript>
</datafileload>
Date Pattern Tokens:
yyyy– 4-digit yearmm– 2-digit monthdd– 2-digit day
5. DATAFILEPOSTPROCESS
Purpose: Post-process data files loaded by DATAFILELOAD tasks
Command Format (XML):
<datafilepostprocess>
<datafileloadjobtaskid>5</datafileloadjobtaskid>
<datafilepostprocessworkspacename>aesfabricetl</datafilepostprocessworkspacename>
<datafilepostprocesslakehousename>lhdw</datafilepostprocesslakehousename>
<datafilepostprocesspythonscript><![CDATA[
# Available values to use in the context:
# (datafilepostprocessworkspace_id, datafilepostprocesslakehouse_id, datadate, datafile, spark)
import json
from datetime import datetime, timezone
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from delta.tables import DeltaTable
from notebookutils import mssparkutils
# Construct full ABFS path to the table
lschema_name = "dw"
ltable_name = "currency_exchange_rates"
table_abfs_path = f"abfss://{datafilepostprocessworkspace_id}@onelake.dfs.fabric.microsoft.com/{datafilepostprocesslakehouse_id}/Tables/{lschema_name}/{ltable_name}"
scriptresult = {}
try:
lraw = mssparkutils.fs.head(datafile, 50 * 1024 * 1024)
lpayload = json.loads(lraw)
if not lpayload.get("success", False):
raise ValueError(lpayload.get("error", {}).get("info", "API payload indicates failure."))
lquotes = lpayload.get("quotes", {})
ldate = lpayload.get("date")
lingested_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
lrows = [
{
"rate_date": ldate,
"currencycode": k,
"currencyrate": float(v),
"_lastupdatedate": lingested_at
}
for k, v in lquotes.items()
]
schema = StructType([
StructField("rate_date", StringType(), True),
StructField("currencycode", StringType(), True),
StructField("currencyrate", DoubleType(), True),
StructField("_lastupdatedate", StringType(), True)
])
ldf_rates = spark.createDataFrame(lrows, schema=schema)
if DeltaTable.isDeltaTable(spark, table_abfs_path):
deltaTable = DeltaTable.forPath(spark, table_abfs_path)
deltaTable.alias("t").merge(
ldf_rates.alias("s"),
"t.rate_date = s.rate_date AND t.currencycode = s.currencycode"
).whenMatchedUpdate(set = {
"currencyrate": "s.currencyrate",
"_lastupdatedate": "s._lastupdatedate"
}).whenNotMatchedInsertAll().execute()
else:
ldf_rates.write.format("delta").save(table_abfs_path)
scriptresult = {
"status": "success",
"data": {
"table": table_abfs_path,
"rows_processed": len(lrows),
"rate_date": ldate
}
}
except Exception as e:
scriptresult = {"status": "error", "error": str(e)}
]]></datafilepostprocesspythonscript>
</datafilepostprocess>
6. RUNPIPELINE
Purpose: Trigger another Fabric pipeline
Command Format (XML):
<runpipeline>
<pipelinename>DMAdventureWorks</pipelinename>
<ignoreifrunning>true</ignoreifrunning>
<waitforcompletion>false</waitforcompletion>
<failtaskonfailure>false</failtaskonfailure>
</runpipeline>
Configuration Options:
- pipelinename: Name of the pipeline to execute
- ignoreifrunning: Skip if pipeline is already running
- waitforcompletion: Wait for pipeline to complete before proceeding
- failtaskonfailure: Fail this task if pipeline fails
7. SHADOWDELETEMSSQL2LH
Purpose: Detect and propagate deletions from source to lakehouse (soft delete for Current Record)
Command Format (XML):
<shadowdeletemssql2lakehouse>
<sqlgatewayname>sqlserver1</sqlgatewayname>
<sqlquery><![CDATA[
SELECT customerid
FROM [AdventureWorks].[SalesLT].[Customer]
ORDER BY customerid
]]></sqlquery>
<stagingtablename>lhdw.dwstg.pk__adventureworks_saleslt_customer</stagingtablename>
<destinationtablename>lhdw.dw.adventureworks_saleslt_customer</destinationtablename>
<sourcetableprimarykeycolumns>customerid</sourcetableprimarykeycolumns>
<desttablealternatekeycolumns>customerid</desttablealternatekeycolumns>
<convertschematolowercase>true</convertschematolowercase>
<batchsize>100000</batchsize>
<refreshendpoint>true</refreshendpoint>
</shadowdeletemssql2lakehouse>
How it Works:
- Retrieves current primary keys from source system
- Compares with lakehouse table to identify deleted records
- Marks deleted records by setting
_deleteddate= current timestamp
Note: This is different from Type 2 SCD which uses _enddate and _iscurrent columns. Shadow delete specifically tracks deletion timestamps.
Purpose: Query the instrumentation database itself
NextKeyCommand Example (XML):
8. INSTRUMENTATIONDBSQL
<instrumentationdbsql>
<sqlquery><![CDATA[
SELECT strCurrentKeyValue AS strNextKeyValue
FROM dbo.ETLJobSteps AS S
WHERE intJobStepId = 5
]]></sqlquery>
</instrumentationdbsql>
Note: This command type is typically used in NextKeyCommand to reference values from other job steps or configuration tables within the instrumentation database. one usage is trigger cascade of next task.
Advanced Features
Parallel Processing
All intProcessNo values within a strJobStepPartName execute in parallel using Python’s ThreadPoolExecutor. Within each ProcessNo, multiple tasks are executed sequentially based on intProcessOrder.
Job: "DWAdventureWorks"
├─ Part: "01 Copy Tables" (runs first)
│ ├─ Process No 0 (runs in parallel with Process No 1 and 2)
│ │ └─ Load Product Table
│ ├─ Process No 1 (runs in parallel with Process No 0 and 2)
│ │ ├─ Load Address Table (ProcessOrder 1 - runs first)
│ │ └─ Load Customer Table (ProcessOrder 2 - runs after Address)
│ └─ Process No 2 (runs in parallel with Process No 0 and 1)
│ └─ Load Category Table
└─ Part: "02 Shadow Deletes" (runs after "01 Copy Tables" completes)
└─ Process No 0
└─ Shadow Delete Customer
Key Points:
- Parts (strJobStepPartName) run sequentially based on their naming order
- All ProcessNo values (0, 1, 2, etc.) within a part run in parallel simultaneously
- Within each ProcessNo, multiple tasks execute sequentially based on
intProcessOrder - Lower
intProcessOrdervalues execute before higher values within the same ProcessNo
Incremental Loading
The framework supports incremental data extraction using key value tracking:
- Initial Load:
strCurrentKeyValueis empty or default - Get Next Key: Execute
strNextKeyCommandto determine load range - Load Data: Extract data between
@strStartKeyValueand@strEndKeyValue - Update Key: Store
strNextKeyValueas newstrCurrentKeyValue - Next Run: Start from last successful key value
Example:
-- First run: Load 2024-01-01 to 2024-01-31
-- strCurrentKeyValue: '2024-01-01'
-- strNextKeyValue: '2024-01-31'
-- Second run: Load 2024-02-01 to 2024-02-28
-- strCurrentKeyValue: '2024-01-31' (from previous run)
-- strNextKeyValue: '2024-02-28'
Slowly Changing Dimensions (SCD)
Type 1 SCD (Overwrite)
# Updates existing records with new values
lakehouse_etl_merge_data_type1(
source_table="dwstg.DimProduct",
target_table="dw.DimProduct",
merge_keys=["ProductID"]
)
Type 2 SCD (Historical Tracking)
# Maintains history with effective dates and current flag
lakehouse_etl_merge_data_type2(
source_table="dwstg.DimCustomer",
target_table="dw.DimCustomer",
merge_keys=["CustomerID"],
effective_date_column="EffectiveDate",
end_date_column="EndDate",
current_flag_column="IsCurrent"
)
Concurrent Execution Prevention
The framework prevents duplicate job execution:
- Job initiation checks current job status
- If job is already running, verifies pipeline status
- If pipeline is truly running, exits gracefully with status “U” (Unchanged)
- If pipeline is not running but status shows running, marks as failed for recovery
Thread-Safe Logging
All logging operations use thread locks to prevent conflicts during parallel execution:
_log_file_lock = threading.Lock()
with _log_file_lock:
mssparkutils.fs.append(log_file_path, log_text, True)
Log File Structure
Logs are stored in the Lakehouse with the following structure:
Files/
└── logs/
└── {JobName}/
└── {Timestamp}_{SessionID}.log
Example: Files/logs/DWCurrencyRates/20260427120000_pipeline_run_123.log
Log Entry Format
[2026-04-27 12:00:00] [Job Task Id: 00001] Job Step Process Initiated
[2026-04-27 12:00:05] [Job Task Id: 00001] 2024-01-31 Next Key Value Generated.
[2026-04-27 12:00:10] [Job Task Id: 00001] Executing Command Type for Job Step: COPYMSSQL2LAKEHOUSE
[2026-04-27 12:05:30] [Job Task Id: 00001] Job Step Completed Successfully
Copyright Notice in Logs
Each log file includes copyright and licensing information:
-------------------------------------------------------------------------
Copyright (c) 2024-2030 Assurance eServices Inc. All rights reserved.
This software is licensed under the Business Source License 1.1 (BSL).
- Free for community use (up to 1,000 Task Runs/month).
- Commercial License required for usage exceeding 1,000 Task Runs/month.
Change Date: 2030-06-01
After the Change Date, this software converts to the Apache License 2.0.
For commercial licensing, contact: support@assuranceeservices.com
Website: www.assuranceeservices.com
-------------------------------------------------------------------------
Job Status Values
- P – Processing: Job is currently running
- C – Complete: Job finished successfully
- F – Failed: Job encountered errors
- U – Unchanged: Job already running (duplicate execution prevented)
- I – Inactive: Job is disabled/inactive
Troubleshooting
Common Issues
1. Job Won’t Start – “Already Running”
Symptom: Job exits gracefully with message in log “Job is already running”
Cause: Another instance of the job is executing
Solution:
- Check if pipeline is truly running
- Wait for current execution to complete
- If stuck, manually update job status in database:
UPDATE dbo.ETLJobs SET strJobStatus = 'F', strMessage = 'Manually reset' WHERE strJobName = 'YourJobName'
2. Connection Errors to SQL Database
Symptom: “Error connecting to database” or timeout errors
Cause: Network issues, incorrect connection string, or permissions
Solution:
- Verify SQL Database is accessible from Fabric workspace
- Check connection string format
- Ensure notebook has permissions to SQL Database
- Test connection using
get_mssql_connection()in a test cell
3. Missing pymssql Library
Symptom: ModuleNotFoundError: No module named 'pymssql' or import errors
Cause: The pymssql library is not available in the default Fabric Runtime
Solution:
- Create a Fabric Environment in your workspace
- Add
pymssqlfrom PyPI in Quick mode:- Environment \u2192 Public Libraries \u2192 Add from PyPI
- Search for:
pymssql - Click Save and wait for environment to publish
- Attach the Environment to your notebook
- Restart the notebook kernel
Note: This is a required step for the framework to function correctly.
4. Lakehouse File Not Found
Symptom: “File not found” or “Path does not exist”
Cause: Incorrect file path
Solution:
- Verify Lakehouse path is correct.
- Check file path format:
Files/folder/file.csv(not./Files/...) - Use
mssparkutils.fs.ls("Files/")to verify file existence
5. KeyVault Access Denied
Symptom: “Access denied” when retrieving secrets
Cause: Missing permissions or incorrect KeyVault name
Solution:
- Grant “Key Vault Secrets User” role to Fabric workspace identity
- Verify KeyVault name in variable value
- Test access:
get_azure_keyvault_object_value()
6. Parallel Processing Deadlocks
Symptom: Job hangs or steps don’t complete
Cause: Resource contention or circular dependencies
Solution:
- Review
intProcessNoassignments – ensure no circular dependencies - Reduce parallel threads by consolidating steps
- Check for table locks in Spark SQL commands
7. Incremental Load Reprocessing Same Data
Symptom: Same data loaded multiple times
Cause: strCurrentKeyValue not updating correctly
Solution:
- Verify
strNextKeyCommandreturns valid value - Check
ETLUpdateJobStepNextKeyValuestored procedure - Manually inspect
strCurrentKeyValuein database
8. Python Import Errors
Symptom: “ModuleNotFoundError” for helper modules
Cause: Helper scripts not in correct location or path issue
Solution:
- Verify files exist in
builtin/scripts/folder - Check
sys.pathincludes scripts folder - Restart notebook kernel to clear module cache
Note: This is different from issue #3 (pymssql library). This issue relates to the framework’s helper modules (fabric_helper.py, lakehouse_helper.py, etc.).
Best Practices
1. Job Design
- Logical Grouping: Use
strJobStepPartName(Job Task Groups) to group related operations - Concurrency Management: Use
intProcessNoto organize parallel tasks – all ProcessNo values run simultaneously - Sequential Execution: Use
intProcessOrderto control execution sequence within the sameintProcessNo - Granular Steps: Keep individual steps focused on single tasks
- Idempotency: Design steps to be safely re-runnable
2. Error Handling
- Selective Failure: Use
strFailPackageonFailure = 'N'for non-critical steps - Error Messages: Include detailed context in error messages
- Retry Logic: Implement retry mechanisms for transient failures
- Validation Steps: Add validation steps after critical operations
3. Performance Optimization
- Parallel Execution: Maximize parallelism where dependencies allow
- Partitioning: Use partition columns for large datasets
- Incremental Loading: Always prefer incremental over full loads
- Resource Sizing: Ensure adequate Spark cluster resources
4. Security
- Key Vault: Store all credentials and sensitive data in Azure Key Vault
- Least Privilege: Grant minimal required permissions
- Audit Trail: Maintain comprehensive job execution history
- Data Masking: Avoid logging sensitive data values
5. Maintenance
- Regular Cleanup: Archive old log files and audit records
- Version Control: Track changes to job configurations
- Testing: Test job changes in non-production environment first
- Documentation: Document custom command implementations
6. Monitoring
- Fabric ETL Control Panel: Set up alerts for failed jobs
API Reference
Main Functions
writeto_lakehouselogfile(alogfile_abfs_path, astrJobStepID, atext)
Thread-safe logging function.
Parameters:
alogfile_abfs_path(str): Full ABFS path to log fileastrJobStepID(str): Job step identifier for log entryatext(str): Text to append to log
Returns: dict with status and message
run_job_step_thread(alogfile_abfs_path, aworkspaceid, aworkspacename, ainstrumentationdb_connectstring, astrjobname, astrjobpartname, aintprocessno, astrsessionid, astrlogfilename)
Execute job steps for a specific process number in parallel.
Parameters:
alogfile_abfs_path(str): Log file pathaworkspaceid(str): Fabric workspace IDaworkspacename(str): Fabric workspace nameainstrumentationdb_connectstring(str): SQL database connection stringastrjobname(str): Job nameastrjobpartname(str): Job part nameaintprocessno(int): Process number for parallel execution groupastrsessionid(str): Session identifierastrlogfilename(str): Log file name
Returns: tuple (process_number, error or None)
Helper Module Functions
Refer to individual helper modules for detailed API documentation:
- fabric_helper.py: Fabric platform interactions
- lakehouse_helper.py: Lakehouse data operations
- pipeline_helper.py: Pipeline management
- task_helper.py: Task execution functions
Appendix A: Sample Job Configuration
Example 1: DWCurrencyRates – Daily Currency Exchange Rates Load
This job demonstrates API data loading with file generation and post-processing.
ETLJobs Table
INSERT INTO dbo.ETLJobs
(strJobName, strJobStatus, strJobDefaultProcessType, strJobCurrentProcessType)
VALUES
('DWCurrencyRates', 'C', 'P', 'P');
ETLJobSteps Table
Step 1: Load JSON Files from API
INSERT INTO dbo.ETLJobSteps
(strJobName, strJobStepName, strJobStepPartName, strJobStepStatus,
intProcessNo, intProcessOrder, strStartKeyValue, strEndKeyValue,
strCommandType, strCommand, strNextKeyCommand, strNextKeyCommandType,
strFailPackageonFailure)
VALUES
('DWCurrencyRates', 'currencyrates_api', '01 Get JSON Files', 'I',
0, 1, '2026-04-15', '2049-12-31',
'DATAFILELOAD',
'<datafileload>
<datafileworkspacename>aesfabricetl</datafileworkspacename>
<datafilelakehousename>lhdw</datafilelakehousename>
<datafilefolderpattern>exchangerates/yyyy/mm/dd</datafilefolderpattern>
<datafilenameprefixpattern>yyyymmdd</datafilenameprefixpattern>
<datafilename>_currencyexchangerates</datafilename>
<datafiletype>json</datafiletype>
<datafrequencytype>DAY</datafrequencytype>
<datafrequency>1</datafrequency>
<datafileloadpythonscript><![CDATA[
import requests
import time
time.sleep(1)
API_KEY = "$$CurrencyLayerAPIKey$$"
BASE_URL = "http://api.currencylayer.com/historical"
lparams = {
"access_key": API_KEY,
"date": datadate[:10],
"source": "USD",
"format": 1
}
try:
response = requests.get(BASE_URL, params=lparams)
ldata = response.json()
if ldata["success"]:
scriptresult = {"status": "success", "data": ldata}
else:
scriptresult = {"status": "error", "error": ldata["error"]["info"]}
except Exception as e:
scriptresult = {"status": "error", "error": str(e)}
]]></datafileloadpythonscript>
</datafileload>',
'<instrumentationdbsql>
<sqlquery><![CDATA[
SELECT strCurrentKeyValue AS strNextKeyValue
FROM dbo.ETLJobSteps
WHERE intJobStepId = 5
]]></sqlquery>
</instrumentationdbsql>',
'INSTRUMENTATIONDBSQL', 'Y');
Step 2: Post-Process JSON Files to Delta Table
INSERT INTO dbo.ETLJobSteps
(strJobName, strJobStepName, strJobStepPartName, strJobStepStatus,
intProcessNo, intProcessOrder, strStartKeyValue, strEndKeyValue,
strCommandType, strCommand, strFailPackageonFailure)
VALUES
('DWCurrencyRates', 'currencyrates_postprocess', '02 Post Process', 'I',
0, 1, '2026-04-15', '2049-12-31',
'DATAFILEPOSTPROCESS',
'<datafilepostprocess>
<datafileloadjobtaskid>5</datafileloadjobtaskid>
<datafilepostprocessworkspacename>aesfabricetl</datafilepostprocessworkspacename>
<datafilepostprocesslakehousename>lhdw</datafilepostprocesslakehousename>
<datafilepostprocesspythonscript><![CDATA[
import json
from datetime import datetime, timezone
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from delta.tables import DeltaTable
from notebookutils import mssparkutils
lschema_name = "dw"
ltable_name = "currency_exchange_rates"
table_abfs_path = f"abfss://{datafilepostprocessworkspace_id}@onelake.dfs.fabric.microsoft.com/{datafilepostprocesslakehouse_id}/Tables/{lschema_name}/{ltable_name}"
try:
lraw = mssparkutils.fs.head(datafile, 50 * 1024 * 1024)
lpayload = json.loads(lraw)
lquotes = lpayload.get("quotes", {})
ldate = lpayload.get("date")
lingested_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
lrows = [{"rate_date": ldate, "currencycode": k, "currencyrate": float(v), "_lastupdatedate": lingested_at} for k, v in lquotes.items()]
schema = StructType([StructField("rate_date", StringType(), True), StructField("currencycode", StringType(), True),
StructField("currencyrate", DoubleType(), True), StructField("_lastupdatedate", StringType(), True)])
ldf_rates = spark.createDataFrame(lrows, schema=schema)
if DeltaTable.isDeltaTable(spark, table_abfs_path):
deltaTable = DeltaTable.forPath(spark, table_abfs_path)
deltaTable.alias("t").merge(ldf_rates.alias("s"), "t.rate_date = s.rate_date AND t.currencycode = s.currencycode"
).whenMatchedUpdate(set = {"currencyrate": "s.currencyrate", "_lastupdatedate": "s._lastupdatedate"
}).whenNotMatchedInsertAll().execute()
else:
ldf_rates.write.format("delta").save(table_abfs_path)
scriptresult = {"status": "success", "data": {"table": table_abfs_path, "rows_processed": len(lrows), "rate_date": ldate}}
except Exception as e:
scriptresult = {"status": "error", "error": str(e)}
]]></datafilepostprocesspythonscript>
</datafilepostprocess>', 'Y');
Example 2: DWAdventureWorks – Copy Tables from SQL Server
This job demonstrates parallel loading of multiple tables with Type 2 SCD.
ETLJobs Table
INSERT INTO dbo.ETLJobs
(strJobName, strJobStatus, strJobDefaultProcessType, strJobCurrentProcessType)
VALUES
('DWAdventureWorks', 'C', 'P', 'P');
ETLJobSteps Table
Step 1: Load Product Table (Process No 0)
INSERT INTO dbo.ETLJobSteps
(strJobName, strJobStepName, strJobStepPartName, intProcessNo, intProcessOrder,
strStartKeyValue, strEndKeyValue, strCommandType, strCommand,
strNextKeyCommand, strNextKeyCommandType, strFailPackageonFailure)
VALUES
('DWAdventureWorks', 'dw.adventureworks_saleslt_product', '01 Copy Tables',
0, 1, '1900-01-01', '2049-12-31',
'COPYMSSQL2LAKEHOUSE',
'<copymssql2lakehouse>
<sqlconnectionstringtype>gateway</sqlconnectionstringtype>
<sqlconnectstring>sqlserver1</sqlconnectstring>
<sqlquery><![CDATA[
SELECT *, ModifiedDate AS LastUpdateDate
FROM [AdventureWorks].[SalesLT].[Product]
WHERE ModifiedDate >= ''@strStartKeyValue''
AND ModifiedDate <= ''@strEndKeyValue''
]]></sqlquery>
<stagingtablename>lhdw.dwstg.adventureworks_saleslt_product</stagingtablename>
<destinationtablename>lhdw.dw.adventureworks_saleslt_product</destinationtablename>
<destinationtablemergetype>type2</destinationtablemergetype>
<destinationtablemergetype2datepart>SECOND</destinationtablemergetype2datepart>
<sourcetableprimarykeycolumns>productid</sourcetableprimarykeycolumns>
<desttablealternatekeycolumns>productid</desttablealternatekeycolumns>
<convertschematolowercase>true</convertschematolowercase>
<batchsize>100000</batchsize>
<refreshendpoint>true</refreshendpoint>
</copymssql2lakehouse>',
'<mssql>
<sqlconnectionstringtype>gateway</sqlconnectionstringtype>
<sqlconnectstring>sqlserver1</sqlconnectstring>
<sqlquery><![CDATA[
SELECT MAX(CONVERT(VARCHAR(23), ModifiedDate, 121)) AS strNextKeyValue
FROM [AdventureWorks].[SalesLT].[Product]
]]></sqlquery>
<lakehouse>lhdw</lakehouse>
</mssql>',
'MSSQL', 'N');
Step 2: Load Address Table (Process No 1 – runs in parallel with other Process No 1 steps)
INSERT INTO dbo.ETLJobSteps
(strJobName, strJobStepName, strJobStepPartName, intProcessNo, intProcessOrder,
strStartKeyValue, strEndKeyValue, strCommandType, strCommand,
strNextKeyCommand, strNextKeyCommandType, strFailPackageonFailure)
VALUES
('DWAdventureWorks', 'dw.adventureworks_saleslt_address', '01 Copy Tables',
1, 1, '1900-01-01', '2049-12-31',
'COPYMSSQL2LAKEHOUSE',
'<copymssql2lakehouse>
<sqlconnectionstringtype>connectstring</sqlconnectionstringtype>
<sqlconnectstring><![CDATA[$$azkv_sourcedata_connstring$$]]></sqlconnectstring>
<sqlquery><![CDATA[
SELECT *, ModifiedDate AS LastUpdateDate
FROM [AdventureWorks].[SalesLT].[Address]
WHERE ModifiedDate >= ''@strStartKeyValue''
AND ModifiedDate <= ''@strEndKeyValue''
]]></sqlquery>
<stagingtablename>lhdw.dwstg.adventureworks_saleslt_address</stagingtablename>
<destinationtablename>lhdw.dw.adventureworks_saleslt_address</destinationtablename>
<destinationtablemergetype>type2</destinationtablemergetype>
<destinationtablemergetype2datepart>SECOND</destinationtablemergetype2datepart>
<sourcetableprimarykeycolumns>addressid</sourcetableprimarykeycolumns>
<desttablealternatekeycolumns>addressid</desttablealternatekeycolumns>
<convertschematolowercase>true</convertschematolowercase>
<batchsize>100000</batchsize>
<refreshendpoint>true</refreshendpoint>
</copymssql2lakehouse>',
'<mssql>
<sqlconnectionstringtype>connectstring</sqlconnectionstringtype>
<sqlconnectstring><![CDATA[$$azkv_sourcedata_connstring$$]]></sqlconnectstring>
<sqlquery><![CDATA[
SELECT MAX(CONVERT(VARCHAR(23), ModifiedDate, 121)) AS strNextKeyValue
FROM [AdventureWorks].[SalesLT].[Address]
]]></sqlquery>
<lakehouse>lhdw</lakehouse>
</mssql>',
'MSSQL', 'N');
Step 3: Shadow Delete for Customer Table
INSERT INTO dbo.ETLJobSteps
(strJobName, strJobStepName, strJobStepPartName, intProcessNo, intProcessOrder,
strStartKeyValue, strEndKeyValue, strCommandType, strCommand, strFailPackageonFailure)
VALUES
('DWAdventureWorks', 'shadow_delete_customer', '02 Shadow Deletes',
0, 1, '1900-01-01', '2049-12-31',
'SHADOWDELETEMSSQL2LH',
'<shadowdeletemssql2lakehouse>
<sqlgatewayname>sqlserver1</sqlgatewayname>
<sqlquery><![CDATA[
SELECT customerid
FROM [AdventureWorks].[SalesLT].[Customer]
ORDER BY customerid
]]></sqlquery>
<stagingtablename>lhdw.dwstg.pk__adventureworks_saleslt_customer</stagingtablename>
<destinationtablename>lhdw.dw.adventureworks_saleslt_customer</destinationtablename>
<sourcetableprimarykeycolumns>customerid</sourcetableprimarykeycolumns>
<desttablealternatekeycolumns>customerid</desttablealternatekeycolumns>
<convertschematolowercase>true</convertschematolowercase>
<batchsize>100000</batchsize>
<refreshendpoint>true</refreshendpoint>
</shadowdeletemssql2lakehouse>', 'N');
Step 4: Optimize Tables with Spark SQL
INSERT INTO dbo.ETLJobSteps
(strJobName, strJobStepName, strJobStepPartName, intProcessNo, intProcessOrder,
strStartKeyValue, strEndKeyValue, strCommandType, strCommand, strFailPackageonFailure)
VALUES
('DWAdventureWorks', 'optimize_tables', '03 Optimize',
0, 1, '1900-01-01', '2049-12-31',
'SPARKSQL',
'<sparksql>
<sqlcommand><![CDATA[
OPTIMIZE lhdw.dw.adventureworks_saleslt_product ZORDER BY (_iscurrent, productid);
OPTIMIZE lhdw.dw.adventureworks_saleslt_address ZORDER BY (_iscurrent, addressid);
OPTIMIZE lhdw.dw.adventureworks_saleslt_customer ZORDER BY (_iscurrent, customerid);
]]></sqlcommand>
<executeparallel>true</executeparallel>
<maxparallelism>3</maxparallelism>
</sparksql>', 'N');
Global Variables Configuration
ETLJobVariables Table
-- API Key for currency layer service
INSERT INTO dbo.ETLJobVariables
(strJobName, strJobVariableName, strJobVariableValue)
VALUES
('Global', 'CurrencyLayerAPIKey', '223978105a8e1e10d737984b69a1665e');
-- Azure Key Vault reference for SQL Server connection string
INSERT INTO dbo.ETLJobVariables
(strJobName, strJobVariableName, strJobVariableValue)
VALUES
('Global', 'azkv_sourcedata_connstring',
'$azkv$kvaesetlsandbox.secret.sqlserver1-connectionstring$azkv$');
-- Workspace name for data operations
INSERT INTO dbo.ETLJobVariables
(strJobName, strJobVariableName, strJobVariableValue)
VALUES
('Global', 'DWWorkspaceName', 'AESFabricETL');
Appendix B: Database Schema Scripts
The following tables are defined in the instrumentation database (DWInstrumentation):
Core Metadata Tables
ETLCommandTypes
Stores command type templates (optional – used by UI applications for command generation).
CREATE TABLE [dbo].[ETLCommandTypes](
[strCommandType] [varchar](20) NOT NULL PRIMARY KEY,
[strCommandTemplate] [nvarchar](max) NULL,
[dtmUpdateDate] [datetime2](7) NULL DEFAULT (getdate()),
[strUpdateUser] [nvarchar](100) NULL DEFAULT (suser_sname())
)
ETLJobs
Main job definitions and current execution status.
CREATE TABLE [dbo].[ETLJobs](
[intJobID] [int] IDENTITY(1,1) NOT NULL PRIMARY KEY,
[strJobName] [nvarchar](300) NOT NULL UNIQUE,
[strJobDescription] [nvarchar](max) NULL,
[strJobStatus] [char](1) NULL,
[strJobDefaultProcessType] [char](1) NULL,
[strJobCurrentProcessType] [char](1) NULL,
[dtmStartDate] [datetime2](7) NULL,
[dtmEndDate] [datetime2](7) NULL,
[strSessionID] [varchar](100) NULL,
[strLogFileName] [varchar](4000) NULL,
[strMessage] [nvarchar](max) NULL,
[dtmUpdateDate] [datetime2](7) NULL DEFAULT (getdate()),
[strUpdateUser] [nvarchar](100) NULL DEFAULT (suser_sname())
)
ETLJobSteps
Individual task configurations with command XML and execution tracking.
CREATE TABLE [dbo].[ETLJobSteps](
[intJobStepID] [int] IDENTITY(1,1) NOT NULL PRIMARY KEY,
[strJobName] [nvarchar](300) NOT NULL,
[strJobStepName] [nvarchar](300) NOT NULL,
[strJobStepPartName] [nvarchar](300) NULL,
[strJobStepStatus] [char](1) NULL,
[intProcessNo] [int] NULL,
[intProcessOrder] [int] NULL,
[strStartKeyValue] [nvarchar](300) NULL,
[strEndKeyValue] [nvarchar](300) NULL,
[strCommandType] [varchar](20) NULL,
[strCommand] [nvarchar](max) NULL,
[strNextKeyCommand] [nvarchar](max) NULL,
[strNextKeyCommandType] [varchar](20) NULL,
[strCurrentKeyValue] [nvarchar](300) NULL,
[strNextKeyValue] [nvarchar](300) NULL,
[strFailPackageonFailure] [char](1) NULL,
[dtmStartDate] [datetime2](7) NULL,
[dtmEndDate] [datetime2](7) NULL,
[strMessage] [nvarchar](max) NULL,
[dtmUpdateDate] [datetime2](7) NULL DEFAULT (getdate()),
[strUpdateUser] [nvarchar](100) NULL DEFAULT (suser_sname())
)
ETLJobVariables
Global and job-specific variables for parameterization.
CREATE TABLE [dbo].[ETLJobVariables](
[intJobVariableID] [int] IDENTITY(1,1) NOT NULL PRIMARY KEY,
[strJobName] [nvarchar](300) NULL,
[strJobVariableName] [nvarchar](1000) NULL,
[strJobVariableValue] [nvarchar](max) NULL,
[bitDeleted] [bit] NULL DEFAULT ((0)),
[dtmUpdateDate] [datetime2](7) NULL DEFAULT (getdate()),
[strUpdateUser] [nvarchar](100) NULL DEFAULT (suser_sname())
)
Audit and Tracking Tables
ETLJobAudit
Historical record of all job executions.
CREATE TABLE [dbo].[ETLJobAudit](
[intJobAuditID] [int] IDENTITY(1,1) NOT NULL PRIMARY KEY,
[intJobID] [int] NOT NULL,
[strJobName] [nvarchar](300) NOT NULL,
[strJobStatus] [char](1) NULL,
[strJobDefaultProcessType] [char](1) NULL,
[strJobCurrentProcessType] [char](1) NULL,
[dtmStartDate] [datetime2](7) NULL,
[dtmEndDate] [datetime2](7) NULL,
[strSessionID] [varchar](100) NULL,
[strLogFileName] [varchar](4000) NULL,
[strMessage] [nvarchar](max) NULL,
[dtmUpdateDate] [datetime2](7) NULL DEFAULT (getdate()),
[strUpdateUser] [nvarchar](100) NULL DEFAULT (suser_sname())
)
ETLJobStepAudit
Historical record of all job step executions.
CREATE TABLE [dbo].[ETLJobStepAudit](
[intJobStepAuditID] [int] IDENTITY(1,1) NOT NULL PRIMARY KEY,
[intJobStepID] [int] NOT NULL,
[intJobAuditID] [int] NOT NULL,
[strJobName] [nvarchar](300) NOT NULL,
[strJobStepName] [nvarchar](300) NOT NULL,
[strJobStepPartName] [nvarchar](300) NULL,
[strJobStepStatus] [char](1) NULL,
[intProcessNo] [int] NULL,
[intProcessOrder] [int] NULL,
[strStartKeyValue] [nvarchar](300) NULL,
[strEndKeyValue] [nvarchar](300) NULL,
[strCommandType] [varchar](20) NULL,
[strCommand] [nvarchar](max) NULL,
[strNextKeyCommand] [nvarchar](max) NULL,
[strNextKeyCommandType] [varchar](20) NULL,
[strCurrentKeyValue] [nvarchar](300) NULL,
[strNextKeyValue] [nvarchar](300) NULL,
[strFailPackageonFailure] [char](1) NULL,
[dtmStartDate] [datetime2](7) NULL,
[dtmEndDate] [datetime2](7) NULL,
[strMessage] [nvarchar](max) NULL,
[dtmUpdateDate] [datetime2](7) NULL DEFAULT (getdate()),
[strUpdateUser] [nvarchar](100) NULL DEFAULT (suser_sname())
)
ETLDataFileLoadDetails
Tracks individual file loads for DATAFILELOAD operations.
CREATE TABLE [dbo].[ETLDataFileLoadDetails](
[intFileLoadID] [int] IDENTITY(1,1) NOT NULL PRIMARY KEY,
[intJobStepID] [int] NOT NULL,
[intJobStepAuditID] [int] NOT NULL,
[strFileFolder] [nvarchar](1000) NULL,
[strFileName] [nvarchar](1000) NULL,
[dtmFileDate] [datetime2](7) NULL,
[strSessionID] [varchar](38) NULL,
[strStatus] [char](1) NULL,
[strMessage] [nvarchar](max) NULL,
[dtmLoadStartDate] [datetime2](7) NULL,
[dtmLoadEndDate] [datetime2](7) NULL,
[dtmCreateDate] [datetime2](7) NULL DEFAULT (getdate()),
[strCreateUser] [nvarchar](100) NULL DEFAULT (suser_sname())
)
Note: For complete stored procedure definitions, refer to the database schema file: DWInstrumentation-{guid}.Database.sql
Appendix C: AESFabricControlPanel
A user-friendly interface is available in Microsoft Fabric Apps for configuring, maintaining, monitoring, and gaining insights into your ETL jobs.
Support & Contact
For technical support, licensing inquiries, or feature requests:
- Email: support@assuranceeservices.com
- Website: www.assuranceeservices.com
- Documentation: Check for updates and additional resources on our website
License
This software is licensed under the Business Source License 1.1 (BSL).
- Community Use: Free for up to 1,000 Task Runs per month
- Commercial Use: License required for usage exceeding 1,000 Task Runs per month
- Change Date: 2030-06-01
- Post-Change License: Apache License 2.0
For full license text, see LICENSE.md file.
Document Version: 1.0
Last Updated: April 27, 2026
Copyright: © 2024-2030 Assurance eServices Inc. All rights reserved.
