
DynamoDB Streams – handy and tricky as well

10/18/2019

Sooner or later, during development based on a serverless architecture, you’re going to come across DynamoDB Streams. From the event-based model perspective, they seem to be an easy way to capture DynamoDB activity and trigger further actions. It looks easy, but as always, the devil is in the details, and those details might literally cost you some unnecessary money. So let’s not waste time: we’ll set up a stream, attach a Lambda function to it, and highlight those details.

Create the stream and bind it to the function

AWS maintains separate endpoints for DynamoDB and DynamoDB Streams. To work with database tables and indexes, your application must access a DynamoDB endpoint. To read and process DynamoDB Streams records, however, your application must access a DynamoDB Streams endpoint in the same Region.

A stream consists of stream records. Each stream record represents a single data modification in the DynamoDB table to which the stream belongs. Each stream record is assigned a sequence number, reflecting the order in which the record was published to the stream. Depending on the operation performed on the DynamoDB table, the application will receive a corresponding INSERT, MODIFY, or REMOVE event.

Stream records are organized into groups or shards. Each shard acts as a container for multiple stream records and contains the information required for accessing and iterating through these records.
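
To make that concrete, here is roughly what a single stream record looks like inside the event Lambda receives. This is a trimmed, illustrative shape, not a byte-for-byte sample:

# Illustrative shape of one DynamoDB Streams record (values are made up;
# with StreamViewType NEW_IMAGE, only the "NewImage" of the item is included).
record = {
    "eventID": "1a2b3c4d5e6f",
    "eventName": "MODIFY",  # INSERT | MODIFY | REMOVE
    "eventSource": "aws:dynamodb",
    "dynamodb": {
        "SequenceNumber": "111000000000000000001",
        "Keys": {"analysisRequestId": {"S": "req-123"}},
        "NewImage": {  # the item as it appears after the modification
            "analysisRequestId": {"S": "req-123"},
            "phase": {"S": "FILE_UPLOAD_COMPLETE"},
        },
        "StreamViewType": "NEW_IMAGE",
    },
}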

The easiest way I use to set up DynamoDB Streams is the Serverless Framework resources section in which I define my database. Generally, I prefer to put everything apart from the functions’ definitions into separate yml files and import them via ${file({directory_name}/{your_file_name}.yml)} in the serverless.yml file.

Resources:
  AnalysisMetadataTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: ${self:custom.analysis-tablename}
      AttributeDefinitions:
        - AttributeName: analysisRequestId
          AttributeType: S
        - AttributeName: requestorId
          AttributeType: S
      KeySchema:
        - AttributeName: analysisRequestId
          KeyType: HASH
      StreamSpecification:
        StreamViewType: NEW_IMAGE
      ProvisionedThroughput:
        ReadCapacityUnits: 10
        WriteCapacityUnits: 10
      GlobalSecondaryIndexes:
        - IndexName: requestorId-gsi
          KeySchema:
            - AttributeName: requestorId
              KeyType: HASH
          Projection:
            ProjectionType: ALL
          ProvisionedThroughput:
            ReadCapacityUnits: 5
            WriteCapacityUnits: 5
      Tags:
        - Key: Name
          Value: ${self:custom.analysis-tablename}
        - Key: Environment
          Value: ${self:custom.stage}
        - Key: Project
          Value: ${self:custom.app}
        - Key: Application
          Value: ${self:custom.service_acronym}
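
Once the table is deployed with this StreamSpecification, every write to it produces a stream record. A quick way to generate some test events, sketched with boto3 (the resolved table name below is illustrative):

import boto3

# Writing an item produces an INSERT stream record; updating it produces MODIFY.
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("analysis-metadata-dev")  # hypothetical resolved table name

table.put_item(Item={"analysisRequestId": "req-123", "requestorId": "user-42"})
table.update_item(
    Key={"analysisRequestId": "req-123"},
    UpdateExpression="SET #ph = :p",
    ExpressionAttributeNames={"#ph": "phase"},
    ExpressionAttributeValues={":p": "FILE_UPLOAD_COMPLETE"},
)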

You’ve probably noticed the “StreamSpecification” parameter, which defines how the stream is configured. You’ve got four options for the type of information that will be written to the stream whenever data in the table is modified:

  • KEYS_ONLY — Only the key attributes of the modified item.
  • NEW_IMAGE — The entire item, as it appears after it was modified.
  • OLD_IMAGE — The entire item, as it appeared before it was modified.
  • NEW_AND_OLD_IMAGES — Both the new and the old images of the item.

I chose NEW_IMAGE, but that’s not all we need. The second step is to define the Lambda function that is going to collect and process the data from the stream. To make that feasible, first and foremost we have to attach an IAM Role allowing the function to describe the stream and get records from it. As the resource, you set the ARN of the stream:

arn:aws:dynamodb:{REGION}:{ACCOUNT_ID}:table/{TABLE_NAME}/stream/{STREAM}

In the IAM policy below, I’ve put “*” in place of the stream label, because you cannot describe the stream unless it’s created. Of course, to maintain the “least-privilege” rule, you should afterwards limit the IAM Role to only the stream you’ve created.

The following permissions are needed to manage resources related to your DynamoDB Streams:

Action:
  - dynamodb:DescribeStream
  - dynamodb:GetRecords
  - dynamodb:GetShardIterator
  - dynamodb:ListStreams

Full IAM Role resource:

Resources:
  StreamProcessorRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: metadata-stream-processor-${self:custom.stage}
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource:
                  - arn:aws:dynamodb:#{AWS::Region}:#{AWS::AccountId}:table/${self:custom.analysis-tablename}/stream/*

The first problem you’re going to encounter with the Serverless Framework during the Lambda function integration is the definition of the stream as an event for the Lambda. Take a look at the example function I’ve been using for the data processing. We’ve got an “events” part, in which I’m defining the stream as a source event for the function:

events:
  - stream:
      type: dynamodb
      arn:
        Fn::GetAtt:
          - AnalysisMetadataTable
          - StreamArn
      batchSize: 10
      startingPosition: LATEST
      enabled: True

The point is that this alone doesn’t bind the source event to the Lambda function. To do that, you first have to create the stream for DynamoDB, and once it is visible in the console

AWS Console view

or via the boto3 DynamoDBStreams client method (describe_stream(**kwargs)), you must add another configuration line to the events part of the function section, where {YOUR_STREAM} must be replaced with the proper stream label:

- stream: arn:aws:dynamodb:#{AWS::Region}:#{AWS::AccountId}:table/${self:custom.analysis-tablename}/stream/{YOUR_STREAM}
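
If you don’t want to dig through the console for the stream label, a minimal boto3 sketch can look it up (the table name below is illustrative):

import boto3

# DynamoDB Streams has its own client, separate from the regular DynamoDB one.
streams = boto3.client("dynamodbstreams")

# Find the stream for the table; the label is the last part of the ARN.
resp = streams.list_streams(TableName="analysis-metadata-dev")  # hypothetical name
stream_arn = resp["Streams"][0]["StreamArn"]
stream_label = resp["Streams"][0]["StreamLabel"]

detail = streams.describe_stream(StreamArn=stream_arn)
print(stream_label, detail["StreamDescription"]["StreamStatus"])  # e.g. ENABLED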

Then, after rebuilding the stack with the sls deploy command, you should see the event source trigger attached to the particular Lambda function you’ve added it for.
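
You can also verify the binding without the console; a short boto3 sketch (the function name is illustrative):

import boto3

# List the event source mappings attached to the function to confirm that the
# stream trigger exists and is enabled.
lambda_client = boto3.client("lambda")

mappings = lambda_client.list_event_source_mappings(
    FunctionName="myapp-svc-analysis-info-processor"  # hypothetical name
)
for mapping in mappings["EventSourceMappings"]:
    print(mapping["EventSourceArn"], mapping["State"], mapping["BatchSize"])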

Here’s the full configuration of the Lambda function after changes:

analysis-info-processor:
    name: ${self:custom.app}-${self:custom.service_acronym}-analysis-info-processor
    description: Handles dynamodb stream and puts data into SQS for analysis worker-in
    handler: functions.analysis_metadata_processor.lambda_handler
    role: StreamProcessorRole
    runtime: python3.6
    reservedConcurrency: 50
    alarms: # merged with function alarms
      - functionErrors
    deadLetter:
      sqs: # New Queue with these properties
        queueName: ${self:custom.analysis-info-processor-dlq}
        delaySeconds: 10
        maximumMessageSize: 2048
        messageRetentionPeriod: 86400
        receiveMessageWaitTimeSeconds: 5
        visibilityTimeout: 300
    environment:
      tablename: ${self:custom.analysis-tablename}
    tags:
      Name: ${self:custom.app}-${self:custom.service_acronym}-analysis-info-processor
    layers:
      - arn:aws:lambda:${self:provider.region}:#{AWS::AccountId}:layer:aws-custom-logging-${self:custom.stage}:${self:custom.logging-layer-version}
    events:
      - stream: arn:aws:dynamodb:#{AWS::Region}:#{AWS::AccountId}:table/${self:custom.analysis-tablename}/stream/{YOUR_STREAM}
      - stream:
          type: dynamodb
          arn:
            Fn::GetAtt:
              - AnalysisMetadataTable
              - StreamArn
          batchSize: 10
          startingPosition: LATEST
          enabled: True

Process the data from the stream, but beware: unnoticed errors might cost you

Lambda reads records from the stream and invokes your function synchronously with an event that contains stream records. Lambda polls shards in your DynamoDB stream for records at a base rate of four times per second. When records are available, Lambda invokes your function and waits for the result.

IMPORTANT: Key points to remember:

  • If processing succeeds, Lambda resumes polling until it receives more records.
  • If your function returns an error, Lambda retries the batch until processing succeeds or the data expires. This might cost you, because Lambda will be invoked over and over until one of those two conditions is true (a sketch of failing loudly follows this list).
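
Given that retry loop, it pays to fail loudly. A minimal sketch (the per-record processing is a placeholder of mine) that logs context before re-raising, so that CloudWatch metrics and a functionErrors alarm actually surface the problem:

import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def process(record):
    """Placeholder for the real per-record processing."""


def lambda_handler(event, context):
    try:
        for record in event["Records"]:
            process(record)
    except Exception:
        # Log enough context to find the poisoned batch, then re-raise so the
        # invocation is counted as a failure; Lambda will retry the same batch.
        logger.exception("Failed batch of %d records", len(event["Records"]))
        raise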

Here, among all the received records, I am filtering out those with a MODIFY eventName and with another attribute value that I need:

for record in event['Records']:
    # Keep only updates where the item has just reached the phase we care about
    if record['eventName'] == 'MODIFY' and record['dynamodb']['NewImage']['phase']['S'] == 'FILE_UPLOAD_COMPLETE':
        ...  # further processing of the matching record
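
For context, here is a fuller sketch of how such a handler might look end to end. The queue URL environment variable and the message format are my assumptions, not the original code:

import json
import os

import boto3

sqs = boto3.client("sqs")
QUEUE_URL = os.environ["ANALYSIS_QUEUE_URL"]  # hypothetical environment variable


def lambda_handler(event, context):
    # Forward items that have just reached FILE_UPLOAD_COMPLETE to the worker queue.
    for record in event["Records"]:
        new_image = record["dynamodb"].get("NewImage", {})
        if (record["eventName"] == "MODIFY"
                and new_image.get("phase", {}).get("S") == "FILE_UPLOAD_COMPLETE"):
            sqs.send_message(
                QueueUrl=QUEUE_URL,
                MessageBody=json.dumps(new_image),
            )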

Those “unnoticed” costs I’ve mentioned happened to me when I filled the table with new “testing” data, and the stream started to pass it to the Lambda function. Unfortunately, because of my mistake, Lambda couldn’t process the data properly and was returning an error that I wasn’t notified about (no alarm configured). The stream was “silently” invoking my function throughout the night, and I only realized it the next morning. Twenty-four hours of data retention might be either great or, like in my case, a lesson to remember. On a greater scale, a costly lesson.

If you can handle more, then collect batches

By default, Lambda invokes your function as soon as records are available in the stream. If the batch it reads from the stream only has one record in it, Lambda only sends one record to the function. To avoid invoking the function inefficiently with a small number of records, you can tell the event source to buffer records for up to 5 minutes by configuring a batch window. Before invoking the function, Lambda continues to read records from the stream until it has gathered a full batch, or until the batch window expires.
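
If the trigger already exists, the batch window can also be set directly on the event source mapping. A hedged boto3 sketch (the function name is illustrative):

import boto3

lambda_client = boto3.client("lambda")

# Find the existing stream mapping for the function, then add a batch window.
mappings = lambda_client.list_event_source_mappings(
    FunctionName="myapp-svc-analysis-info-processor"  # hypothetical name
)["EventSourceMappings"]

lambda_client.update_event_source_mapping(
    UUID=mappings[0]["UUID"],
    BatchSize=100,
    MaximumBatchingWindowInSeconds=60,  # buffer records for up to 60 seconds
)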

With the Serverless Framework, you can specify these parameters in the events section:

  • Batch size – The number of records to read from a shard in each batch, up to 1,000. Lambda passes all of the records in the batch to the function in a single call, as long as the total size of the events doesn’t exceed the payload limit for synchronous invocation (6 MB).
  • Batch window – Specify the maximum amount of time to gather records before invoking the function, in seconds.
  • Starting position – Process only new records, or all existing records.
    • Latest – Process new records added to the stream.
    • Trim horizon – Process all records in the stream.

events:
  - stream: arn:aws:dynamodb:#{AWS::Region}:#{AWS::AccountId}:table/${self:custom.analysis-tablename}/stream/{YOUR_STREAM}
  - stream:
      type: dynamodb
      arn:
        Fn::GetAtt:
          - AnalysisMetadataTable
          - StreamArn
      batchSize: 10
      startingPosition: LATEST
      enabled: True

Your Lambda function will be invoked when one of the following three things happens:

  1. The total payload size reaches 6 MB;
  2. The batchWindow (not configured in my example) reaches its maximum value; or
  3. The batchSize reaches its maximum value.

Example of 5 REMOVE type stream records in one event

BUT, there is always a catch when taking more than a single record. A Lambda function cannot say to the DynamoDB stream, “Hey, I just processed these 10 events you sent me successfully, and these 10 unfortunately failed, so please resend me only those 10 that failed”. If you fail in the Lambda function, the DynamoDB stream will resend the entire batch again in the future.
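
Since the whole batch gets redelivered, the practical defence is to make per-record processing idempotent. A minimal sketch under that assumption (the dedup store and the process helper are mine; the warm-container set is only a stand-in for a durable store such as a DynamoDB table keyed by SequenceNumber):

processed = set()  # stand-in for a durable dedup store


def process(record):
    """Placeholder for the real per-record processing."""


def lambda_handler(event, context):
    failed = False
    for record in event["Records"]:
        seq = record["dynamodb"]["SequenceNumber"]
        if seq in processed:
            continue  # already handled during a previous, partially failed delivery
        try:
            process(record)
            processed.add(seq)  # mark as done only after success
        except Exception:
            failed = True  # keep going so the remaining records get their try
    if failed:
        # Re-raising makes Lambda redeliver the batch; the dedup check above
        # skips the already-processed records on the retry.
        raise RuntimeError("Some records failed; the batch will be redelivered")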

What I don’t recommend is setting your batchSize to 1, because the ability to process a greater number of events at once is a huge performance benefit. However, there’s no golden mean for handling errors, so be aware of the issues that might occur with a batch size greater than 1.

Conclusion

Basically, there are two lessons in this article that I want you to bear in mind. First of all, if your function returns an error, Lambda retries the batch until processing succeeds or the data expires – do not forget about that, as I did. Last but not least, if you only partially succeed with a batch, the stream cannot resend you only the records that failed. That’s a little bit problematic if you have already written part of your data, for example, to a table. Contrary to a DynamoDB stream, with SQS you can delete a single message from the queue so it does not get processed again. In DynamoDB Streams, there is no concept of removing a record, because the stream doesn’t track how its consumers read the events.