Case
I have a Data Factory pipeline that should run each hour and collect all new files added to the data lake since the last run. What is the best activity, or do we need to write code?
No scripts, no loops
Solution
The Copy Data Activity has a wildcard filter that allows you to read multiple files (of the same type/format) at once, so there is no need for a ForEach Activity to process multiple files. Combine that with the start and end datetime filter options within that same Copy Data Activity and you can limit the files to a certain period.
Date filter
The End datetime property will be populated with the start-datetime of the current ADF pipeline. So files added during the run of the pipeline will be skipped and processed during the next run. This End datetime will also be stored afterwards for the next run.
The Start datetime will be retrieved from a database table. The previous run of the pipeline stored its End datetime as the Start datetime for the next run.
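The effect of the two filters can be sketched in T-SQL. This is only an illustration of the selection logic, not something the pipeline itself runs: the @Files table variable, its file names and its timestamps are made up, and it assumes that the Copy Data filter includes files modified at or after the Start datetime and before the End datetime.

```sql
-- Hypothetical list of files in the data lake with their last modified time (UTC)
DECLARE @Files TABLE ([FileName] nvarchar(100), [LastModified] datetime2(7));

INSERT INTO @Files ([FileName], [LastModified])
VALUES ('sales_01.csv', '2024-01-01T09:15:00')  -- already loaded by an earlier run
     , ('sales_02.csv', '2024-01-01T10:20:00')  -- new since the previous run
     , ('sales_03.csv', '2024-01-01T11:00:30'); -- added while the current run is busy

-- Start datetime: stored by the previous run
-- End datetime: start of the current run
DECLARE @StartDatetime datetime2(7) = '2024-01-01T10:00:00';
DECLARE @EndDatetime   datetime2(7) = '2024-01-01T11:00:00';

-- Only sales_02.csv falls inside this window
SELECT [FileName], [LastModified]
FROM @Files
WHERE [LastModified] >= @StartDatetime
AND   [LastModified] <  @EndDatetime;
```

In the next run the window shifts to 11:00 until 12:00, so sales_03.csv is picked up then and no file is loaded twice or skipped.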
The basic setup
1) Table and stored procedures
To store (and retrieve) the datetime from the pipeline we use a database table and some Stored Procedures. To keep it easy we kept it very very basic. Feel free to extend it to your own needs. In this solution there will be only one record per source. The SetLastRun stored procedure will either insert a new record or update the existing record via a MERGE statement. The GetLastRun stored procedure will retrieve the datetime of the last run and return a default date if there is no record available.
-- Create runs table
CREATE TABLE [dbo].[Runs](
    [SourceName] [nvarchar](50) NOT NULL,
    [LastRun] [datetime2](7) NULL,
    CONSTRAINT [PK_Runs] PRIMARY KEY CLUSTERED ([SourceName] ASC)
)
GO
-- Save the lastrundate
CREATE PROCEDURE SetLastRun
(
    @SourceName as nvarchar(50)
  , @LastRun as datetime2(7)
)
AS
BEGIN
    -- SET NOCOUNT ON added to prevent extra result sets from
    -- interfering with SELECT statements.
    SET NOCOUNT ON

    -- Check if there already is a record
    -- Then Insert or Update a record
    MERGE dbo.Runs AS [Target]
    USING (SELECT @SourceName, @LastRun) AS [Source] ([SourceName], [LastRun])
    ON ([Target].[SourceName] = [Source].[SourceName])
    WHEN MATCHED THEN
        UPDATE SET [LastRun] = [Source].[LastRun]
    WHEN NOT MATCHED THEN
        INSERT ([SourceName], [LastRun])
        VALUES ([Source].[SourceName], [Source].[LastRun]);
END
GO
-- Retrieve the lastrundate
CREATE PROCEDURE GetLastRun
(
    @SourceName as nvarchar(50)
)
AS
BEGIN
    DECLARE @DefaultDate as datetime2(7) = '1-1-1900'

    -- Retrieve lastrun and provide default date in case null
    SELECT ISNULL(MAX([LastRun]), @DefaultDate) as [LastRun]
    FROM [Runs]
    WHERE [SourceName] = @SourceName
END
GO
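A quick way to check the two stored procedures before building the pipeline is to call them by hand. The sketch below assumes the objects above were created in the default schema; the source name 'MyPipeline' and the datetimes are placeholders.

```sql
-- No record for this source yet: returns the default date (1 January 1900)
EXEC GetLastRun @SourceName = N'MyPipeline';

-- First call: MERGE finds no match and inserts a new record
EXEC SetLastRun @SourceName = N'MyPipeline', @LastRun = '2024-01-01T10:00:00';

-- Second call: MERGE finds the record and updates it
EXEC SetLastRun @SourceName = N'MyPipeline', @LastRun = '2024-01-01T11:00:00';

-- Now returns the datetime stored by the second call
EXEC GetLastRun @SourceName = N'MyPipeline';

-- Still only one record for this source
SELECT [SourceName], [LastRun]
FROM [dbo].[Runs]
WHERE [SourceName] = N'MyPipeline';
```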
2) Retrieve datetime last run
So the first step in the pipeline is to execute a stored procedure that retrieves the End datetime of the previous run. As mentioned, a default datetime will be returned if there is no previous run available. Because the Stored Procedure activity cannot pass output back to the pipeline, we will use the Lookup activity instead.
- Add a Lookup activity to the pipeline and give it a descriptive name
- On the Settings tab add or reuse a Source dataset (and Linked service) that points to the database containing the table and stored procedures of the previous step (don't point to a specific table).
- Choose Stored Procedure under the Use query property
- Select 'GetLastRun' as Stored procedure name and hit the Import button to get the parameters from the stored procedure
- Now either use a hardcoded source name or use an expression, for example @pipeline().Pipeline to use the pipeline name as the source name.
Execute Stored Procedure via Lookup to retrieve last rundate
3) Copy Data Activity
The second step is to retrieve the actual data from the data lake with a Copy Data activity. It uses two expressions: the first takes the datetime returned by the previous step as the Start time filter, and the second takes the Start datetime of the pipeline itself as the End time filter.
- Add the Copy Data Activity and set it up to load a specific file from the data lake to a SQL Server table (or your own destination)
- Now on the Source tab change the File path type to Wildcard file path
- Then set the Wildcard file name to, for example, *.csv to read all CSV files instead of a specific file.
- Next set the Start time (UTC) property under Filter by last modified to the following expression: @activity('Get Last Run').output.firstRow.LastRun, where Get Last Run is the name of the previous activity and LastRun is the column returned by the stored procedure.
- Also set the End time (UTC) property with the following expression: @pipeline().TriggerTime (this will get the actual start time of the pipeline)
- You also might want to add an extra metadata column with the Filename via the Additional columns option.
Set up wildcard and datetime filters
4) Store datetime for next run
The last step is to save the Start datetime of the pipeline itself as run datetime so that it can be retrieved in the next run. Since this Stored Procedure doesn't have any output parameters we can use the standard Stored Procedure Activity.
- Add the Stored Procedure activity and connect it to the previous activity
- On the Settings tab reuse the same Linked service as in step 2
- Select SetLastRun as the Stored procedure name
- Hit the import button and set the parameters
- LastRun should be filled with the startdatetime of the pipeline: @pipeline().TriggerTime
- SourceName should be filled with the same expression as in step 2
Add Stored Procedure to save rundate
5) Schedule
Now just schedule your pipeline every x minutes or x hours with a trigger to keep your database table up-to-date with files from the data lake. Then keep adding files to the data lake and watch your runs table (step 1) and the actual staging table to see the result. The optional metadata column of step 3 should make debugging and testing a lot easier.
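To watch the result after a few runs, queries along these lines could be used. The first reads the runs table from step 1. The second assumes a hypothetical staging table dbo.StagingSales with a FileName column filled by the optional metadata column of step 3, so adjust both names to your own destination.

```sql
-- Last run per source: LastRun should move forward after every successful run
SELECT [SourceName], [LastRun]
FROM [dbo].[Runs]
ORDER BY [SourceName];

-- Hypothetical staging table and column: number of rows loaded per source file
SELECT [FileName], COUNT(*) AS [NumberOfRows]
FROM [dbo].[StagingSales]
GROUP BY [FileName]
ORDER BY [FileName];
```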