Problem: We often need to load large amounts of data into data warehouses in batches, and it is important to be able to restart the process from the point of failure in case of any errors. How can we implement batch processing in an SSIS package?
Solution: SSIS provides excellent support for batch processing using its built-in components. One approach to implementing batch processing in SSIS is to group the rows to be processed into batches, process each batch, and update the groups as they are processed. Let’s walk through an example scenario and implement an SSIS package to perform this task.
Scenario
Let’s assume that we have a reporting application that requires data aggregation by month. We also want to be able to make adjustments to the aggregated data and recalculate only the affected months. Our SSIS package will have the following steps:
- Get Batch List: This step groups the source data into batches using an Execute SQL task that executes a stored procedure.
- Process Batch Loop: This Foreach Loop container iterates over the result set rows obtained from the previous step.
- Transaction Container: This Sequence container contains the tasks to be executed for each iteration of the loop and controls the transaction used to commit or roll back the changes.
- Append Batch to Sales History: This Execute SQL task extracts a batch of rows from the source table and inserts them into a history table.
- Compute Aggregation: This Execute SQL task performs the necessary aggregations on the batch and updates an aggregation table.
- Mark Batch as Processed: This Execute SQL task updates the rows in the source table to indicate that they have been processed.
Implementation
For simplicity, let’s use the AdventureWorks sample database that comes with SQL Server 2005 as our data source. We’ll copy the SalesOrderHeader and SalesOrderDetail tables into a database called mssqltips. Here’s the script to do that:
USE mssqltips
GO
SELECT *
INTO dbo.imp_SalesOrderHeader
FROM AdventureWorks.Sales.SalesOrderHeader
SELECT *
INTO dbo.imp_SalesOrderDetail
FROM AdventureWorks.Sales.SalesOrderDetail
ALTER TABLE dbo.imp_SalesOrderHeader
ADD Processed bit not null default 0
GO
In our SSIS package, we’ll use the following variables:
- User::v_BatchList: This variable will store the result set obtained from the Get Batch List step.
- Other variables: We’ll describe the usage of other variables in the following sections.
Get Batch List
The Get Batch List step executes a stored procedure that groups the source data into batches. In our case, we’ll group the data by year and month using the stored procedure stp_CreateOrderBatchList. Here’s the SQL code for the stored procedure:
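CREATE PROCEDURE dbo.stp_CreateOrderBatchList
AS
BEGIN
SET NOCOUNT ON;
-- One row per year/month that still has unprocessed orders
SELECT
DATEPART(YYYY,OrderDate) OrderYear,
DATEPART(MONTH,OrderDate) OrderMonth
FROM dbo.imp_SalesOrderHeader
WHERE Processed = 0
GROUP BY
DATEPART(YYYY,OrderDate),
DATEPART(MONTH,OrderDate)
ORDER BY
DATEPART(YYYY,OrderDate),
DATEPART(MONTH,OrderDate)
END
GO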
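The stored procedure only selects rows where the Processed column is equal to zero, so months that have already been fully processed do not appear in the batch list. The Execute SQL task in our SSIS package executes this stored procedure with its ResultSet property set to Full result set and stores the result in the package variable User::v_BatchList, which must be of type Object.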
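Because an SSIS control flow is configured in the designer rather than written as code, the batch list logic can be easier to follow in a small script. The following is only a sketch in Python, using the standard-library sqlite3 module as a stand-in for SQL Server. The sample rows are made up, and SQLite's strftime replaces DATEPART, so the query is an approximation of the procedure above rather than a copy of it.

```python
import sqlite3

# In-memory SQLite database as a stand-in for the mssqltips database (assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE imp_SalesOrderHeader ("
    " SalesOrderID INTEGER PRIMARY KEY,"
    " OrderDate TEXT NOT NULL,"
    " Processed INTEGER NOT NULL DEFAULT 0)"
)
# Made-up sample rows: (SalesOrderID, OrderDate, Processed).
conn.executemany(
    "INSERT INTO imp_SalesOrderHeader VALUES (?, ?, ?)",
    [
        (1, "2001-07-01", 1),  # already processed, so July 2001 is skipped
        (2, "2001-08-03", 0),
        (3, "2001-08-17", 0),
        (4, "2001-09-05", 0),
    ],
)
conn.commit()


def get_batch_list(connection):
    """Return one (OrderYear, OrderMonth) row per month with unprocessed orders."""
    return connection.execute(
        """
        SELECT CAST(strftime('%Y', OrderDate) AS INTEGER) AS OrderYear,
               CAST(strftime('%m', OrderDate) AS INTEGER) AS OrderMonth
        FROM imp_SalesOrderHeader
        WHERE Processed = 0
        GROUP BY OrderYear, OrderMonth
        ORDER BY OrderYear, OrderMonth
        """
    ).fetchall()


batch_list = get_batch_list(conn)
print(batch_list)  # prints [(2001, 8), (2001, 9)]
```

The two August orders collapse into a single batch row, and the month whose only order is already flagged never reaches the loop.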
Process Batch Loop
The Process Batch Loop is a Foreach Loop container that iterates over the result set obtained from the Get Batch List step. It is configured with the following settings:
- Collection: Enumerator is set to Foreach ADO Enumerator, and the ADO object source variable is set to User::v_BatchList.
- Variable Mappings: The columns in each row of the result set are mapped to package variables based on their ordinal position.
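To make the ordinal mapping concrete, here is a minimal Python sketch of what the loop does on each iteration. The variable names v_OrderYear and v_OrderMonth are illustrative assumptions (the mapped variables are not named above), and the list of tuples merely stands in for the contents of User::v_BatchList.

```python
# Hypothetical stand-in for the rows held in User::v_BatchList.
batch_list = [(2001, 8), (2001, 9), (2001, 10)]

# Zero-based column index -> variable name. The names are assumptions
# made for this sketch only.
variable_mappings = {0: "v_OrderYear", 1: "v_OrderMonth"}

processed = []
for row in batch_list:
    # At the start of every iteration, each column is copied into its
    # mapped variable by position, not by column name.
    variables = {name: row[index] for index, name in variable_mappings.items()}
    processed.append((variables["v_OrderYear"], variables["v_OrderMonth"]))

print(processed)  # prints [(2001, 8), (2001, 9), (2001, 10)]
```

Since the mapping is positional, changing the column order in the batch list query would silently swap year and month unless the mappings are updated to match.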
Transaction Container
The Transaction Container is a Sequence container that contains all the tasks to be executed for each iteration of the loop. It is configured with the TransactionOption property set to Required, which ensures that all tasks inside the container are executed within a transaction. A new transaction is created for each iteration of the loop.
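The effect of one transaction per iteration can be sketched as follows. This is a Python approximation using sqlite3, not SSIS itself: the BatchLog table, the process_batch helper, and the simulated failure are all hypothetical and exist only to show that a failing batch leaves nothing behind while earlier batches stay committed.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE SalesHistory (OrderYear INTEGER, OrderMonth INTEGER)")
conn.execute("CREATE TABLE BatchLog (OrderYear INTEGER, OrderMonth INTEGER)")
conn.commit()


def process_batch(connection, year, month, fail=False):
    """Run every step for one batch inside a single transaction."""
    # "with connection" commits if the body succeeds and rolls back if it raises.
    with connection:
        connection.execute("INSERT INTO SalesHistory VALUES (?, ?)", (year, month))
        if fail:
            raise RuntimeError("simulated failure in a later task")
        connection.execute("INSERT INTO BatchLog VALUES (?, ?)", (year, month))


batches = [(2001, 8), (2001, 9), (2001, 10)]
for year, month in batches:
    try:
        # Force the second batch to fail after its first insert.
        process_batch(conn, year, month, fail=(month == 9))
    except RuntimeError:
        break  # the loop stops at the failed batch

history = conn.execute(
    "SELECT OrderYear, OrderMonth FROM SalesHistory ORDER BY OrderMonth"
).fetchall()
print(history)  # prints [(2001, 8)]
```

The September insert into SalesHistory is undone by the rollback, so only the August batch remains, and October is never attempted.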
Append Batch to Sales History
The Append Batch to Sales History step is an Execute SQL task that calls a stored procedure to extract a single batch of data from the source table and append it to the sales history table. If any transformations are required, a Data Flow task can be used. Here’s the SQL code for the stored procedure stp_AppendSalesHistory:
CREATE TABLE dbo.SalesHistory (
OrderYear int not null,
OrderMonth int not null,
ProductID int not null,
OrderQty smallint not null,
LineTotal money not null
)
GO
CREATE PROCEDURE dbo.stp_AppendSalesHistory
@OrderYear int,
@OrderMonth int
AS
BEGIN
SET NOCOUNT ON;
INSERT INTO dbo.SalesHistory (
OrderYear,
OrderMonth,
ProductID,
OrderQty,
LineTotal
)
SELECT
DATEPART(YYYY,m.OrderDate),
DATEPART(MONTH,m.OrderDate),
d.ProductID,
d.OrderQty,
d.LineTotal
FROM dbo.imp_SalesOrderHeader m
JOIN dbo.imp_SalesOrderDetail d ON d.SalesOrderID = m.SalesOrderID
WHERE m.Processed = 0
AND DATEPART(YYYY,m.OrderDate) = @OrderYear
AND DATEPART(MONTH,m.OrderDate) = @OrderMonth
END
GO
The stored procedure only selects rows where the Processed column is equal to zero. The Execute SQL task is configured to execute this stored procedure, with the necessary parameters mapped to SSIS package variables.
Compute Aggregation
The Compute Aggregation step is an Execute SQL task that recalculates the summary data in the sales history summary table for the order year and order month batch being processed. Here’s the SQL code for the stored procedure stp_CalcSalesHistorySummary:
CREATE TABLE dbo.SalesHistorySummary (
OrderYear int not null,
OrderMonth int not null,
ProductID int not null,
OrderQty int not null, -- int, because SUM() over a smallint column returns int
LineTotal money not null
)
GO
CREATE PROCEDURE dbo.stp_CalcSalesHistorySummary
@OrderYear int,
@OrderMonth int
AS
BEGIN
SET NOCOUNT ON;
DELETE FROM dbo.SalesHistorySummary
WHERE OrderYear = @OrderYear
AND OrderMonth = @OrderMonth;
INSERT INTO dbo.SalesHistorySummary (
OrderYear,
OrderMonth,
ProductID,
OrderQty,
LineTotal
)
SELECT
OrderYear,
OrderMonth,
ProductID,
SUM(OrderQty),
SUM(LineTotal)
FROM dbo.SalesHistory
WHERE OrderYear = @OrderYear
AND OrderMonth = @OrderMonth
GROUP BY
OrderYear,
OrderMonth,
ProductID
END
GO
The stored procedure first deletes any existing rows in the summary table for the order year and month being processed, then performs the necessary aggregation and insert. The Execute SQL task is configured to execute this stored procedure, with the necessary parameters mapped to SSIS package variables.
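The delete-then-insert pattern is what makes the recalculation safe to repeat. The sketch below illustrates this in Python with sqlite3 as a stand-in for SQL Server. The sample rows and the calc_summary helper are assumptions made for the example, and the column types are simplified to SQLite's INTEGER and REAL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE SalesHistory (
        OrderYear INTEGER, OrderMonth INTEGER, ProductID INTEGER,
        OrderQty INTEGER, LineTotal REAL);
    CREATE TABLE SalesHistorySummary (
        OrderYear INTEGER, OrderMonth INTEGER, ProductID INTEGER,
        OrderQty INTEGER, LineTotal REAL);
    -- Made-up detail rows for a single month.
    INSERT INTO SalesHistory VALUES
        (2001, 8, 707, 2, 40.0),
        (2001, 8, 707, 3, 60.0),
        (2001, 8, 708, 1, 20.0);
    """
)


def calc_summary(connection, year, month):
    """Rebuild the summary rows for one year/month from the history table."""
    with connection:  # delete and insert succeed or fail together
        connection.execute(
            "DELETE FROM SalesHistorySummary WHERE OrderYear = ? AND OrderMonth = ?",
            (year, month),
        )
        connection.execute(
            """
            INSERT INTO SalesHistorySummary
            SELECT OrderYear, OrderMonth, ProductID, SUM(OrderQty), SUM(LineTotal)
            FROM SalesHistory
            WHERE OrderYear = ? AND OrderMonth = ?
            GROUP BY OrderYear, OrderMonth, ProductID
            """,
            (year, month),
        )


calc_summary(conn, 2001, 8)
calc_summary(conn, 2001, 8)  # a second run replaces the rows instead of duplicating them

summary = conn.execute(
    "SELECT ProductID, OrderQty, LineTotal FROM SalesHistorySummary ORDER BY ProductID"
).fetchall()
print(summary)  # prints [(707, 5, 100.0), (708, 1, 20.0)]
```

Running the calculation twice leaves exactly one summary row per product, which is why the same month can be recalculated after an adjustment without special handling.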
Mark Batch as Processed
The Mark Batch as Processed step is an Execute SQL task that updates the Processed column in the source table for the rows that have been processed in the current batch. It invokes the following stored procedure:
CREATE PROCEDURE dbo.stp_MarkOrdersProcessed
@OrderYear int,
@OrderMonth int
AS
BEGIN
SET NOCOUNT ON;
UPDATE dbo.imp_SalesOrderHeader
SET Processed = 1
WHERE DATEPART(YYYY,OrderDate) = @OrderYear
AND DATEPART(MONTH,OrderDate) = @OrderMonth;
END
GO
The Execute SQL task is configured to execute this stored procedure, with the necessary parameters mapped to SSIS package variables.
Summary
In this example, we have demonstrated how to implement batch processing in an SSIS package. The key points to remember are:
- Group the source data into batches using an Execute SQL task and store the result set in a package variable.
- Iterate over the result set using a Foreach Loop Container.
- Use a Sequence Container to define a transaction and add the appropriate tasks inside it.
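- The package design supports restarting the package from the beginning after an error. The Sequence Container ensures that each batch's work is committed if it succeeds or rolled back if any task fails, and batches that were committed are skipped on the next run, so processing effectively resumes at the failed batch.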
- Flag the source data as processed to avoid reprocessing if the package is restarted.
By following these steps, you can efficiently process large amounts of data in batches using SSIS, ensuring data integrity and the ability to restart the process from the point of failure.
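The whole flow, including a restart and a later adjustment, can be simulated end to end. The following Python sketch uses sqlite3 and deliberately simplified, hypothetical tables (Orders, History, Summary, with the year and month stored as separate columns) in place of the tables above. The run_package and process_batch helpers and the injected failure are assumptions made for illustration, not part of SSIS.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE Orders (OrderID INTEGER PRIMARY KEY, OrderYear INTEGER,
                         OrderMonth INTEGER, Qty INTEGER,
                         Processed INTEGER NOT NULL DEFAULT 0);
    CREATE TABLE History (OrderYear INTEGER, OrderMonth INTEGER, Qty INTEGER);
    CREATE TABLE Summary (OrderYear INTEGER, OrderMonth INTEGER, Qty INTEGER);
    INSERT INTO Orders (OrderID, OrderYear, OrderMonth, Qty) VALUES
        (1, 2001, 8, 5), (2, 2001, 8, 7), (3, 2001, 9, 4), (4, 2001, 10, 1);
    """
)


def get_batch_list(db):
    """One (year, month) row per month that still has unprocessed orders."""
    return db.execute(
        "SELECT OrderYear, OrderMonth FROM Orders WHERE Processed = 0 "
        "GROUP BY OrderYear, OrderMonth ORDER BY OrderYear, OrderMonth"
    ).fetchall()


def process_batch(db, year, month, fail_on=None):
    """Append, aggregate, and flag one batch inside a single transaction."""
    key = (year, month)
    with db:  # commits on success, rolls back if an exception is raised
        db.execute(
            "INSERT INTO History SELECT OrderYear, OrderMonth, Qty FROM Orders "
            "WHERE Processed = 0 AND OrderYear = ? AND OrderMonth = ?", key)
        if key == fail_on:
            raise RuntimeError("simulated failure")
        db.execute("DELETE FROM Summary WHERE OrderYear = ? AND OrderMonth = ?", key)
        db.execute(
            "INSERT INTO Summary SELECT OrderYear, OrderMonth, SUM(Qty) FROM History "
            "WHERE OrderYear = ? AND OrderMonth = ? GROUP BY OrderYear, OrderMonth", key)
        db.execute(
            "UPDATE Orders SET Processed = 1 WHERE OrderYear = ? AND OrderMonth = ?", key)


def run_package(db, fail_on=None):
    """Process pending batches in order; return the batches completed in this run."""
    done = []
    for year, month in get_batch_list(db):
        try:
            process_batch(db, year, month, fail_on)
        except RuntimeError:
            break  # stop at the failed batch, as a failed package would
        done.append((year, month))
    return done


first_run = run_package(conn, fail_on=(2001, 9))  # August commits, September fails
second_run = run_package(conn)                    # restart: August is skipped

# A later adjustment to August arrives as a new, unprocessed order row.
conn.execute(
    "INSERT INTO Orders (OrderID, OrderYear, OrderMonth, Qty) VALUES (5, 2001, 8, -2)")
conn.commit()
third_run = run_package(conn)                     # only August is recalculated

summary = conn.execute(
    "SELECT OrderYear, OrderMonth, Qty FROM Summary ORDER BY OrderYear, OrderMonth"
).fetchall()
print(first_run, second_run, third_run)
print(summary)  # prints [(2001, 8, 10), (2001, 9, 4), (2001, 10, 1)]
```

In this sketch the restart picks up at September because August was committed and flagged, and the adjustment run touches only August, whose summary becomes 5 + 7 - 2 = 10. No history row is appended twice, because each append is filtered on the Processed flag.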