Case
I have a Data Factory pipeline that should run every hour and collect all new files added to the data lake since the last run. Which activity is best for this, or do we need to write code?
|No scripts, no loops|
The Copy Data Activity has a wildcard filter which allows you to read multiple files (of the same type/format) at once, so there is no need for a ForEach Activity to process them one by one. Combine that with the start and end date filter options within that same Copy Data Activity and you can limit the files to a certain period.
|The basic setup|
- Add a Lookup activity to the pipeline and give it a descriptive name
- On the Settings tab add or reuse a Source dataset (and Linked service) that points to the database containing the table and stored procedures of the previous step (don't point to a specific table).
- Choose Stored Procedure under the Use query property
- Select 'GetLastRun' as Stored procedure name and hit the Import button to get the parameters from the stored procedure
- Now either use a hardcoded source name or use an expression, for example @pipeline().Pipeline to use the pipeline name as the source.
|Execute Stored Procedure via Lookup to retrieve last rundate|
- Add the Copy Data Activity and set it up to load a specific file from the data lake to a SQL Server table (or your own destination)
- Now on the Source tab change the File path type to Wildcard file path
- Then set the Wildcard file name to, for example, *.csv to read all CSV files instead of one specific file.
- Next set the Start time (UTC) property under Filter by last modified to the following expression:
@activity('Get Last Run').output.firstRow.LastRun, where 'Get Last Run' is the name of the previous activity and LastRun is the output column of the Stored Procedure (more details here).
- Also set the End time (UTC) property with the following expression:
@pipeline().TriggerTime (the time at which the pipeline run was actually triggered)
- You might also want to add an extra metadata column with the file name via the Additional columns option (more details here).
|Set up wildcard and datetime filters|
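To make the combined effect of the two filters easier to picture, here is a small Python sketch of the selection logic. It is only an illustration and not how Data Factory is implemented. The file names and timestamps are made up, `select_files` is a hypothetical helper, and the sketch assumes the start time is inclusive and the end time is exclusive.

```python
from datetime import datetime, timezone
from fnmatch import fnmatch


def select_files(files, pattern, start_utc, end_utc):
    """Return the names that match the wildcard and were last modified
    within [start_utc, end_utc). The inclusive start / exclusive end
    is an assumption of this sketch."""
    return [
        name
        for name, modified in files
        if fnmatch(name, pattern) and start_utc <= modified < end_utc
    ]


# Hypothetical data lake contents: (file name, last modified in UTC)
files = [
    ("sales_01.csv", datetime(2021, 6, 1, 8, 15, tzinfo=timezone.utc)),
    ("sales_02.csv", datetime(2021, 6, 1, 9, 5, tzinfo=timezone.utc)),
    ("notes.txt", datetime(2021, 6, 1, 9, 10, tzinfo=timezone.utc)),
    ("sales_03.csv", datetime(2021, 6, 1, 10, 0, tzinfo=timezone.utc)),
]

# Stands in for @activity('Get Last Run').output.firstRow.LastRun
last_run = datetime(2021, 6, 1, 9, 0, tzinfo=timezone.utc)
# Stands in for @pipeline().TriggerTime
trigger_time = datetime(2021, 6, 1, 10, 0, tzinfo=timezone.utc)

selected = select_files(files, "*.csv", last_run, trigger_time)
print(selected)  # ['sales_02.csv']
```

Under that assumption, a file modified exactly at the trigger time (sales_03.csv here) is not lost. It simply falls into the window of the next run.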
- Add the Stored Procedure activity and connect it to the previous activity
- On the Settings tab reuse the same Linked service as in step 2
- Select SetLastRun as the Stored procedure name
- Hit the Import button and set the parameters
- LastRun should be filled with the start date and time of the pipeline: @pipeline().TriggerTime
- SourceName should be filled with the same expression as in step 2
|Add Stored Procedure to save rundate|
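The three activities together form a simple watermark pattern: read the last run date, copy everything up to the trigger time, then store the trigger time as the new last run date. Below is a minimal Python sketch of that flow. The dictionary stands in for the table behind GetLastRun and SetLastRun, the functions are hypothetical stand-ins for the stored procedures, and the default date for a source without a previous run is an assumption of this sketch.

```python
from datetime import datetime, timedelta, timezone

# Stands in for the table behind GetLastRun / SetLastRun (hypothetical)
watermarks = {}

# Assumed start date for a source that has never run before
DEFAULT_START = datetime(1900, 1, 1, tzinfo=timezone.utc)


def get_last_run(source_name):
    """Mimics the Lookup activity calling GetLastRun."""
    return watermarks.get(source_name, DEFAULT_START)


def set_last_run(source_name, last_run):
    """Mimics the Stored Procedure activity calling SetLastRun."""
    watermarks[source_name] = last_run


def run_pipeline(source_name, trigger_time):
    """Return the (start, end) window that one run would copy."""
    start = get_last_run(source_name)  # Start time (UTC) filter
    end = trigger_time                 # End time (UTC) filter
    # ... the Copy Data Activity would copy the files in this window ...
    set_last_run(source_name, trigger_time)  # save the new last run date
    return start, end


first_trigger = datetime(2021, 6, 1, 9, 0, tzinfo=timezone.utc)
window_1 = run_pipeline("MyPipeline", first_trigger)
window_2 = run_pipeline("MyPipeline", first_trigger + timedelta(hours=1))

# The second window starts exactly where the first one ended
print(window_2[0] == window_1[1])  # True
```

Because the same trigger time is used as the end of the filter and as the new last run date, consecutive runs get adjoining windows without gaps or overlap.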