An AWS Step Function Use Case: Serverless ETL

Sep 28, 2020 | By Mark Panahi

With batching and looping techniques, FloQast used AWS Step Functions as a serverless ETL tool while respecting its limits on data size.

FloQast uses machine learning to perform zero-config, automatic reconciliations via our AutoRec tool. While it performs the bulk of reconciliations automatically, users can still make some adjustments and additional matches. We want to analyze these adjustments to make our algorithm better so that the user will need to make fewer adjustments the next time around. Specifically, we'd like to generate a daily report of important metrics data (e.g., false positives/negatives) for all users that performed reconciliations for a given day.

We want the process to be serverless, but we can't use a single Lambda function to do everything due to the 15-minute cap on running time. Therefore, we chose to break up the process into multiple Lambdas and tie them together with AWS Step Functions.

The Step Function workflow we created orchestrates Lambdas that in turn interact with MongoDB collections, DocumentDB collections, and S3 files. But as we will see, we ran into problems with the amount of data that can flow through the workflow at any given time. We accommodated this by looping through one batch of data at a time. While looping with AWS Step Functions is a known pattern, this blog post shows how to combine looping with a paging technique for MongoDB.

Our First Try

In the most basic sense, the workflow needs to grab a (potentially huge) list of records, perform some calculations on each of these records with data cross-referenced from other sources, and store the end results somewhere else.

We kick off the workflow with the period in which we want to collect data, e.g.:

{
    "date_begin": "2020-08-18T00:00:00",
    "date_end": "2020-08-20T23:59:59"
}

The step function passes these parameters to the Query Jobs lambda, which fetches a list of all user reconciliation jobs that fall into the date range from our "Jobs" MongoDB collection. The output is then fed into a step function Iterator, which concurrently invokes the Calc Metrics lambda for each job. This lambda function compares user transaction matches from a DocumentDB collection with transaction matches generated from our algorithm stored in S3 and writes the results to our "Metrics" MongoDB collection.

This is the workflow diagram generated from AWS:

This is the code for the Query Jobs lambda:

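from pymongo import MongoClient

# connection_string is assumed to be defined elsewhere (e.g., loaded from configuration)

def lambda_handler(event, context):
    query = {}

    # match jobs whose runDate falls within the requested period
    query['runDate'] = dict()
    query['runDate']['$gte'] = event['date_begin']
    query['runDate']['$lte'] = event['date_end']

    client = MongoClient(connection_string)
    db = client['Analytics']

    jobs = list(db.jobs.find(query))
    # ObjectId values are not JSON serializable, so convert them to strings
    for job in jobs:
        job['_id'] = str(job['_id'])
    # every matching document is returned in a single payload
    return jobs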

The problem with this approach is that the Query Jobs lambda can return thousands of documents. AWS currently limits the output size of a Step Function task to 256 KB. Without any special handling, this limit is easily exceeded, which causes the Step Function execution to fail.
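To get a feel for how quickly a result set outgrows that limit, here is a minimal sketch that measures the serialized size of a payload. It assumes the 256 KB quota means 256 * 1024 bytes of UTF-8 encoded JSON; the helper names and the fake job documents are hypothetical and only serve the illustration.

```python
import json

# Quota cited in the text above; assumed here to mean 256 * 1024 bytes
# of UTF-8 encoded JSON.
MAX_PAYLOAD_BYTES = 256 * 1024


def payload_size_bytes(payload):
    """Return the size of the payload once serialized as UTF-8 JSON."""
    return len(json.dumps(payload).encode("utf-8"))


def exceeds_limit(payload, limit=MAX_PAYLOAD_BYTES):
    """True if the serialized payload is larger than the limit."""
    return payload_size_bytes(payload) > limit


def make_fake_jobs(count):
    """Build hypothetical job documents, invented purely for illustration."""
    return [
        {"_id": f"{i:024x}", "runDate": "2020-08-18T00:00:00", "userId": f"user-{i}"}
        for i in range(count)
    ]
```

Even with documents this small, a few thousand of them are enough to cross the limit, while a batch of 20 stays far below it. Real job documents are likely larger, so the threshold would be reached sooner.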

A Scalable Approach

The solution we came up with was to:

  1. Limit the results coming out of the Query Jobs lambda,
  2. Process and store the comparisons on only this limited set,
  3. Detect if there are more results,
  4. Loop back to the Query Jobs lambda to repeat the process until there is no more data.

To do this, we limit the number of documents returned from the Query Jobs lambda for any given invocation by introducing a pagination scheme. Instead of selecting all documents, we set a query limit and a special batch_id that marks where the current page starts. If there are more pages, the lambda returns the batch_id for the next page of documents, to be used when the lambda is invoked again. The batch_id is similar to the LastEvaluatedKey used for pagination in DynamoDB. We sort the "Jobs" collection by its internal MongoDB _id field and use the last _id of each page as the batch_id.
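The paging idea can be sketched without a database. The example below is only an illustration: it uses an in-memory list in place of the "Jobs" collection and plain integers in place of ObjectId values, and the function name query_page is hypothetical.

```python
def query_page(documents, batch_size, batch_id=None):
    """Return one page of documents plus the batch_id for the next page.

    documents: list of dicts with a sortable '_id' (stand-in for the collection)
    batch_id:  the last '_id' of the previous page, or None for the first page
    """
    # Sort by _id so that pages are stable across invocations.
    ordered = sorted(documents, key=lambda doc: doc["_id"])

    # Resume strictly after the last _id seen on the previous page.
    if batch_id is not None:
        ordered = [doc for doc in ordered if doc["_id"] > batch_id]

    page = ordered[:batch_size]

    # A full page suggests there may be more data; a short page means we are done.
    next_batch_id = page[-1]["_id"] if len(page) >= batch_size else None
    return page, next_batch_id
```

With 45 documents and a batch size of 20, three calls return pages of 20, 20, and 5 documents, and the third call returns no batch_id.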

We now start the workflow with a batch size in addition to the date range:

{
    "date_begin": "2020-08-18T00:00:00",
    "date_end": "2020-08-20T23:59:59",
    "batch_size": 20
}

In this example, we limit the batch size to 20.

The new code below shows the Query Jobs lambda with added paging capabilities. We also make sure runDate is properly indexed in our "Jobs" collection.

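from bson.objectid import ObjectId
from pymongo import MongoClient

# connection_string is assumed to be defined elsewhere (e.g., loaded from configuration)

def lambda_handler(event, context):
    query = {}

    # batch_id is absent on the first invocation, so default to None
    batch_id = event.get('batch_id')

    query['runDate'] = dict()
    query['runDate']['$gte'] = event['date_begin']
    query['runDate']['$lte'] = event['date_end']

    # resume strictly after the last _id seen in the previous batch
    if batch_id:
        query['_id'] = dict()
        query['_id']['$gt'] = ObjectId(batch_id)

    client = MongoClient(connection_string)
    db = client['Analytics']

    jobs = list(db.jobs.find(query).sort('_id', 1).limit(event['batch_size']))

    # ObjectId values are not JSON serializable, so convert them to strings
    for job in jobs:
        job['_id'] = str(job['_id'])

    event['batch_id'] = None
    # if there are more results, set the batch_id for the next iteration in the sfn
    if len(jobs) >= event['batch_size']:
        event['batch_id'] = jobs[-1]['_id']

    return dict(
        jobs = jobs,
        queryParams = event
    )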

In addition to the requested documents, the lambda also returns the query parameters to use in the next iteration, including the batch_id for the next batch of data. The "Check For More Data" choice state sees that a batch_id exists and begins another iteration (see the workflow snippet below). In the next iteration, the batch_id holds the last _id seen in the previous results, and the query grabs the next batch of 20. The query criteria in the Query Jobs lambda use a strictly-greater-than clause ($gt) on _id if batch_id exists. This way, the query is guaranteed to continue where it left off the last time around.

Once the number of documents the query returns is less than the batch size, no batch_id is set. The "Check For More Data" choice state sees this and terminates the workflow.

 

"Check For More Data": {
    "Type": "Choice",
    "Choices": [
        {
            "Variable": "$.data.Payload.queryParams.batch_id",
            "IsNull": false,
            "Next": "Next Batch"
        },
        {
            "Variable": "$.data.Payload.queryParams.batch_id",
            "IsNull": true,
            "Next": "Success"
        }
    ],
    "Default": "Success"
},
"Next Batch": {
    "Type": "Pass",
    "Parameters": {
        "queryParams.$": "$.data.Payload.queryParams"
    },
    "Next": "Query Jobs"
}
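To see the loop end to end, here is a small local simulation of the pattern. It is a sketch, not our production workflow: run_workflow plays the role of the state machine, and make_fake_query_jobs is a hypothetical stand-in for the Query Jobs lambda that pages over integer ids instead of MongoDB documents.

```python
def make_fake_query_jobs(total_jobs):
    """Build a hypothetical stand-in for the Query Jobs lambda over ids 1..total_jobs."""
    def query_jobs(params):
        last_seen = params.get("batch_id") or 0
        size = params["batch_size"]
        # Strictly greater than the last id seen, in sorted order, limited to one batch.
        ids = [i for i in range(1, total_jobs + 1) if i > last_seen][:size]
        next_params = dict(params)
        next_params["batch_id"] = ids[-1] if len(ids) >= size else None
        return {"jobs": [{"_id": i} for i in ids], "queryParams": next_params}
    return query_jobs


def run_workflow(query_jobs, calc_metrics, query_params):
    """Query a batch, process it, and repeat while a batch_id is returned."""
    iterations = 0
    while True:
        result = query_jobs(query_params)
        for job in result["jobs"]:
            calc_metrics(job)
        iterations += 1
        # Equivalent of the "Check For More Data" choice state.
        query_params = result["queryParams"]
        if query_params.get("batch_id") is None:
            return iterations
```

One detail this makes visible: when the total number of jobs is an exact multiple of the batch size, the last full batch still yields a batch_id, so the loop runs one extra iteration that returns no documents before it terminates.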

There you have it! Now we can pass along a bunch of queried data through a workflow without having the thing blow up!

What We Learned

As always, the choice of ETL tool (or any AWS tool) depends on the use case. We learned that we could adapt an existing Step Function workflow to handle a moderate volume of data without the heavy lift of migrating to a full-blown serverless ETL tool like AWS Glue. We knew that we'd always be dealing with data at the scale of thousands of documents; a use case involving millions of documents would require a different approach.

Another limitation we didn't cover here is the limit of 25,000 events in the execution history of a single Step Function execution. Restarting the workflow as a new execution works around this limitation.
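As a rough way to reason about that limit, the sketch below estimates how many loop iterations fit into one execution. The number of history events per iteration depends entirely on the workflow and is an assumption the caller must supply (for example, by inspecting the history of a past execution); the function names are hypothetical.

```python
HISTORY_EVENT_LIMIT = 25_000  # quota cited in the text above


def max_iterations(events_per_iteration, overhead_events=0, limit=HISTORY_EVENT_LIMIT):
    """Estimate how many loop iterations fit into a single execution history.

    events_per_iteration: assumed number of history events one loop iteration emits
    overhead_events:      assumed events outside the loop (start, end, and so on)
    """
    if events_per_iteration <= 0:
        raise ValueError("events_per_iteration must be positive")
    return max((limit - overhead_events) // events_per_iteration, 0)


def should_restart(iterations_done, events_per_iteration, overhead_events=0):
    """True once another iteration is estimated not to fit in the history."""
    return iterations_done >= max_iterations(events_per_iteration, overhead_events)
```

A workflow could carry an iteration counter alongside the query parameters and use a check like this to decide when to hand off to a new execution.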

Mark Panahi
Mark is a Senior Software Engineer at FloQast. Mark loves chocolate croissants.
