06.05, Katowice AWS Summit Poland

DynamoDB Streams — streaming to SQS with Lambda

Exploring the usefulness and rough edges of Amazon DynamoDB Streams, using AWS Lambda to stream table changes to Amazon SQS in near real-time.



Sooner or later in any development process based on a serverless architecture in AWS, you’re going to come across DynamoDB Streams. If you’re already familiar with event-based development patterns, then DynamoDB Streams can seem like an easy way to capture DynamoDB activity and use it to invoke further actions.

However, the devil is in the details, and in this case those details might cost you money unnecessarily. So let’s not waste time: we’ll set up a stream, attach a Lambda function to it, and highlight those details along the way.

The basics of DynamoDB Streams

AWS maintains separate endpoints for DynamoDB and DynamoDB Streams. To work with database tables and indexes, your application must access a DynamoDB endpoint. To read and process DynamoDB Streams records, however, it must access a DynamoDB Streams endpoint in the same Region.

A stream consists of stream records. Each stream record represents a single data change in the DynamoDB table to which the stream belongs. Each stream record is assigned a sequence number, reflecting the order in which the record was published to the stream.

Depending on the operation that was performed on the DynamoDB table, the application will receive a corresponding INSERT, MODIFY, or REMOVE event:

{
  "Records": [
    {
      "eventID": "1",
      "eventName": "INSERT",
      "eventVersion": "1.0",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "Keys": {
          "Id": { "N": "101" }
        },
        "NewImage": {
          "Message": { "S": "New item!" },
          "Id": { "N": "101" }
        },
        "SequenceNumber": "111",
        "SizeBytes": 26,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "stream-ARN"
    },
    {
      "eventID": "2",
      "eventName": "MODIFY",
      "eventVersion": "1.0",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "Keys": {
          "Id": { "N": "101" }
        },
        "NewImage": {
          "Message": { "S": "This item has changed" },
          "Id": { "N": "101" }
        },
        "OldImage": {
          "Message": { "S": "New item!" },
          "Id": { "N": "101" }
        },
        "SequenceNumber": "222",
        "SizeBytes": 59,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "stream-ARN"
    },
    {
      "eventID": "3",
      "eventName": "REMOVE",
      "eventVersion": "1.0",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "Keys": {
          "Id": { "N": "101" }
        },
        "OldImage": {
          "Message": { "S": "This item has changed" },
          "Id": { "N": "101" }
        },
        "SequenceNumber": "333",
        "SizeBytes": 38,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "stream-ARN"
    }
  ]
}
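To make the record shape more concrete, here is a minimal Python sketch that walks an event like the one above and flattens the DynamoDB-typed attributes into plain values. The helper names (plain_value, summarize) are made up for illustration, and only the S, N, BOOL, NULL, L and M type tags are handled; boto3 ships a fuller TypeDeserializer if you need the remaining types.

```python
from decimal import Decimal


def plain_value(attr):
    """Convert one DynamoDB-typed attribute, e.g. {"N": "101"}, to a Python value."""
    (tag, raw), = attr.items()  # every attribute carries exactly one type tag
    if tag == "S":
        return raw
    if tag == "N":
        return Decimal(raw)  # numbers are transported as strings
    if tag == "BOOL":
        return raw
    if tag == "NULL":
        return None
    if tag == "L":
        return [plain_value(item) for item in raw]
    if tag == "M":
        return {key: plain_value(value) for key, value in raw.items()}
    raise ValueError(f"unsupported type tag: {tag}")


def summarize(event):
    """Return (sequence number, event name, new image or None) for each record."""
    rows = []
    for record in event["Records"]:
        change = record["dynamodb"]
        image = change.get("NewImage")  # not present on REMOVE records
        plain = {k: plain_value(v) for k, v in image.items()} if image else None
        rows.append((change["SequenceNumber"], record["eventName"], plain))
    return rows
```

Called with the sample event, summarize returns one tuple per record, with None in place of the new image for the REMOVE record.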

Stream records are organized into groups or shards. Each shard acts as a container for multiple stream records and contains the information required for accessing and iterating through these records.
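When Lambda consumes the stream, it handles shards for you, but it can help to see what iterating through shards looks like with the low-level API. The following is a rough sketch, assuming a client that behaves like boto3.client("dynamodbstreams"); the function name read_all_records and the max_pages guard are my own additions, and the sketch ignores shard pagination and parent/child shard ordering.

```python
def read_all_records(client, stream_arn, max_pages=10):
    """Collect records from every shard of a stream, oldest first within each shard.

    `client` is expected to behave like boto3.client("dynamodbstreams").
    """
    records = []
    description = client.describe_stream(StreamArn=stream_arn)["StreamDescription"]
    for shard in description["Shards"]:
        iterator = client.get_shard_iterator(
            StreamArn=stream_arn,
            ShardId=shard["ShardId"],
            ShardIteratorType="TRIM_HORIZON",  # start at the oldest available record
        )["ShardIterator"]
        pages = 0
        while iterator and pages < max_pages:
            page = client.get_records(ShardIterator=iterator)
            records.extend(page["Records"])
            iterator = page.get("NextShardIterator")  # absent once a closed shard is drained
            pages += 1
            if not page["Records"]:
                break  # simplification: treat an empty page as "caught up"
    return records
```

With real credentials this would be called as read_all_records(boto3.client("dynamodbstreams"), stream_arn). An empty page does not strictly mean an open shard is exhausted, so a long-running reader would keep polling instead of stopping.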

Creating a DynamoDB Stream

Using the Serverless Framework, the simplest way to define a DynamoDB stream is alongside your table in the resources section of serverless.yml.

Normally, I prefer to separate everything — apart from function definitions — into dedicated .yml files and reference them via ${file({your_file_name}.yml)} instead, to keep everything neatly organized.

serverless.yml:

Resources:
  AnalysisMetadataTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: ${self:custom.analysis-tablename}
      AttributeDefinitions:
        - AttributeName: analysisRequestId
          AttributeType: S
        - AttributeName: requestorId
          AttributeType: S
      KeySchema:
        - AttributeName: analysisRequestId
          KeyType: HASH
      StreamSpecification:
        StreamViewType: NEW_IMAGE
      ProvisionedThroughput:
        ReadCapacityUnits: 10
        WriteCapacityUnits: 10
      GlobalSecondaryIndexes:
        - IndexName: requestorId-gsi
          KeySchema:
            - AttributeName: requestorId
              KeyType: HASH
          Projection:
            ProjectionType: ALL
          ProvisionedThroughput:
            ReadCapacityUnits: 5
            WriteCapacityUnits: 5
      Tags:
        - Key: Name
          Value: ${self:custom.analysis-tablename}
        - Key: Environment
          Value: ${self:custom.stage}
        - Key: Project
          Value: ${self:custom.app}
        - Key: Application
          Value: ${self:custom.service_acronym}

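StreamSpecification defines how the stream is configured. There are four options for the information that is written to the stream whenever data in the table is modified:

  1. KEYS_ONLY: only the key attributes of the modified item.
  2. NEW_IMAGE: the entire item as it appears after it was modified.
  3. OLD_IMAGE: the entire item as it appeared before it was modified.
  4. NEW_AND_OLD_IMAGES: both the new and the old images of the item.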

I chose NEW_IMAGE, but that’s not all we need.

Binding a Lambda to a DynamoDB Stream

The next step is to define the Lambda function responsible for collecting and processing the data from the stream.

First, we have to attach an IAM Role with the right permissions to manage resources related to our DynamoDB Streams. We’re going to need:

Action:
  - dynamodb:DescribeStream
  - dynamodb:GetRecords
  - dynamodb:GetShardIterator
  - dynamodb:ListStreams

Then, as the resource, we set the ARN of the stream:

arn:aws:dynamodb:{REGION}:{ACCOUNT_ID}:table/{TABLE_NAME}/stream/{STREAM}

And the full IAM Role resource could then be defined like this:

serverless.yml:

Resources:
  StreamProcessorRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: metadata-stream-processor-${self:custom.stage}
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource:
                  - arn:aws:dynamodb:#{AWS::Region}:#{AWS::AccountId}:table/${self:custom.analysis-tablename}/stream/*

Note: I’ve used * in place of STREAM in the ARN because the stream label isn’t known until the stream has been created. Of course, to honor the principle of least privilege, you should then restrict the IAM Role to the one stream you are actually going to read.
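As an illustration of that narrowing step, the sketch below builds the same policy statement for one concrete stream ARN and refuses wildcards. The function name stream_read_policy is hypothetical; the action list is the one from the role above.

```python
import json

STREAM_READ_ACTIONS = [
    "dynamodb:DescribeStream",
    "dynamodb:GetRecords",
    "dynamodb:GetShardIterator",
    "dynamodb:ListStreams",
]


def stream_read_policy(stream_arn):
    """Build a policy document that grants stream reads on exactly one stream."""
    if "/stream/" not in stream_arn or "*" in stream_arn:
        raise ValueError("expected the ARN of one concrete stream, without wildcards")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": list(STREAM_READ_ACTIONS),
                "Resource": [stream_arn],
            }
        ],
    }


def as_json(policy):
    """Serialize the policy the way the IAM APIs expect it (a JSON string)."""
    return json.dumps(policy, indent=2)
```

The resulting document can replace the wildcard Resource once the stream exists and its ARN is known.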


When using the Serverless Framework, the first problem you’re going to encounter integrating your Lambda function is actually defining the stream as an event for Lambda. Take a look at the example function I’ve been using for processing the stream data.

We’ve got an events section, wherein we define the DynamoDB Stream as a source event for our Lambda function:

events:
  - stream:
    type: dynamodb
    arn:
      Fn::GetAtt:
        - AnalysisMetadataTable
        - StreamArn
    batchSize: 10
    startingPosition: LATEST
    enabled: True

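The issue is that this definition alone didn’t bind the source event to the Lambda function. (If you hit the same problem, it’s worth comparing your indentation with the Serverless Framework documentation, where type, arn and the other properties are nested one level beneath the stream key.) To get the binding, you have to first create a stream for DynamoDB…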

AWS Console showing a created DynamoDB Stream and its ARN.

… and after it is visible in the console — or via the describe_stream() method of DynamoDBStreams available in boto3 — you must then define another stream in the events section…

- stream: arn:aws:dynamodb:#{AWS::Region}:#{AWS::AccountId}:table/${self:custom.analysis-tablename}/stream/{STREAM}

… with STREAM replaced with the proper key, of course.
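If you would rather not copy the ARN from the console, it can also be looked up programmatically. The sketch below is one way to do it, and it deliberately uses describe_table on the regular DynamoDB client (which reports LatestStreamArn) rather than the describe_stream() call mentioned above; the function name latest_stream_arn is my own.

```python
def latest_stream_arn(client, table_name):
    """Return the ARN of the table's current stream.

    `client` is expected to behave like boto3.client("dynamodb").
    """
    table = client.describe_table(TableName=table_name)["Table"]
    arn = table.get("LatestStreamArn")
    if arn is None:
        raise LookupError(f"table {table_name!r} has no stream enabled")
    return arn


def stream_label(stream_arn):
    """Extract the part after '/stream/', i.e. the value to use for STREAM."""
    return stream_arn.split("/stream/", 1)[1]
```

With boto3 installed and credentials configured, stream_label(latest_stream_arn(boto3.client("dynamodb"), "my-table")) gives the value to paste in place of STREAM; "my-table" is a placeholder.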

Finally, after the stack gets rebuilt with sls deploy, you should see the event source trigger attached to the Lambda function you added it for:

AWS Console showing DynamoDB as an event source trigger for AWS Lambda.

Here’s the full configuration of the Lambda function including those changes:

analysis-info-processor:
  name: ${self:custom.app}-${self:custom.service_acronym}-analysis-info-processor
  description: Handles a DynamoDB stream and queues data in SQS for an analysis worker
  handler: functions.analysis_metadata_processor.lambda_handler
  role: StreamProcessorRole
  runtime: python3.6
  reservedConcurrency: 50
  alarms: # Merged with function alarms
    - functionErrors
  deadLetter:
    sqs: # New Queue with these properties
      queueName: ${self:custom.analysis-info-processor-dlq}
      delaySeconds: 10
      maximumMessageSize: 2048
      messageRetentionPeriod: 86400
      receiveMessageWaitTimeSeconds: 5
      visibilityTimeout: 300
  environment:
    tablename: ${self:custom.analysis-tablename}
  tags:
    Name: ${self:custom.app}-${self:custom.service_acronym}-analysis-info-processor
  layers:
    - arn:aws:lambda:${self:provider.region}:#{AWS::AccountId}:layer:aws-custom-logging-${self:custom.stage}:${self:custom.logging-layer-version}
  events:
    - stream: arn:aws:dynamodb:#{AWS::Region}:#{AWS::AccountId}:table/${self:custom.analysis-tablename}/stream/{STREAM}
    - stream:
      type: dynamodb
      arn:
        Fn::GetAtt:
          - AnalysisMetadataTable
          - StreamArn
      batchSize: 10
      startingPosition: LATEST
      enabled: True

Watch out for processing errors

Lambda reads records from the stream and invokes your function synchronously with an event that contains stream records. Lambda polls the shards in your DynamoDB stream for records at a base rate of 4 times per second. When new records are available, Lambda invokes your function and waits for the result.

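Important: if your function returns an error, Lambda retries the batch until processing succeeds or the data expires.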

Learn from my mistake:

for record in event['Records']:
    if record['eventName'] == 'MODIFY' and record['dynamodb']['NewImage']['phase']['S'] == 'FILE_UPLOAD_COMPLETE':
        ...  # handling of the matching record is not shown here

Here, among all received records, I was looking for MODIFY events (eventName) whose phase attribute is FILE_UPLOAD_COMPLETE.

This did get costly for me once I filled the table with testing data, and DynamoDB started to pass it to my Lambda function. Unfortunately, because of my mistake, my code didn’t process the stream data properly and kept returning an error… that I didn’t get notified about (alarm not configured). The stream kept silently invoking my function during the night, and I only realized it the next day.

Twenty-four hours of data retention can be a great safety net or, as in my case, a costly lesson to remember.
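The article does not spell out what exactly went wrong in the condition above, so the following is only one plausible hardening, not a reconstruction of the fix. Chained subscripts raise KeyError as soon as a MODIFY record lacks the phase attribute, and an unhandled exception makes Lambda retry the whole batch. A more defensive filter, with hypothetical function names, could look like this:

```python
import logging

logger = logging.getLogger(__name__)


def is_upload_complete(record):
    """True only for MODIFY records whose new image has phase FILE_UPLOAD_COMPLETE."""
    if record.get("eventName") != "MODIFY":
        return False
    new_image = record.get("dynamodb", {}).get("NewImage", {})
    # .get() returns a default instead of raising when an attribute is missing.
    return new_image.get("phase", {}).get("S") == "FILE_UPLOAD_COMPLETE"


def select_completed_uploads(event):
    """Return the records worth processing; unexpected shapes are skipped, not fatal."""
    selected = []
    for record in event.get("Records", []):
        if is_upload_complete(record):
            selected.append(record)
        else:
            logger.debug("skipping record %s", record.get("eventID"))
    return selected
```

Skipping silently is a trade-off: it stops the retry loop, but it can also hide genuinely malformed data, so pairing it with an alarm on function errors (the piece I was missing) still matters.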

Batch when possible

By default, Lambda invokes your function as soon as records are available in the stream. If the batch it reads from the stream only has one record in it, Lambda only sends one record to the function. To avoid invoking the function inefficiently with a small number of records, you can tell the event source to buffer records for up to 5 minutes by configuring a batch window. Before invoking the function, Lambda continues to read records from the stream until it has gathered a full batch, or until the batch window expires.

With the Serverless Framework, you can specify these parameters when defining your events:

events:
  - stream: arn:aws:dynamodb:#{AWS::Region}:#{AWS::AccountId}:table/${self:custom.analysis-tablename}/stream/{YOUR_STREAM}
  - stream:
    type: dynamodb
    arn:
      Fn::GetAtt:
        - AnalysisMetadataTable
        - StreamArn
    batchSize: 10
    startingPosition: LATEST
    enabled: True

Your Lambda function will be invoked when one of the following three things happens:

  1. The total payload size reaches 6 MB;
  2. The batch window (batchWindow, not configured in my example) expires;
  3. The number of gathered records reaches batchSize.
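These three rules can be written down as a small decision function. This is only a toy model of the conditions listed above, not Lambda’s actual implementation; the name flush_reason, its parameters and the byte value used for the 6 MB limit are assumptions made for the sketch.

```python
MAX_PAYLOAD_BYTES = 6 * 1024 * 1024  # assumed byte value for the 6 MB payload limit


def flush_reason(record_count, payload_bytes, waited_seconds,
                 batch_size=10, batch_window=0):
    """Return which rule would trigger an invocation, or None to keep buffering."""
    if payload_bytes >= MAX_PAYLOAD_BYTES:
        return "payload"
    if record_count >= batch_size:
        return "batchSize"
    # With batch_window=0 (no window configured) any buffered record is sent at once.
    if record_count > 0 and waited_seconds >= batch_window:
        return "batchWindow"
    return None
```

For example, with batch_size=10 and batch_window=60, three buffered records after five seconds keep waiting, while the same three records with no window configured are delivered immediately.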

But there is always a catch when processing more than a single record. By default, a Lambda function cannot tell the DynamoDB stream, “Hey, I successfully processed these 10 events you sent me, but these other 10 failed, so please resend only the ones that failed.” If the function fails, Lambda retries the entire batch.

What I don’t recommend is to set your batchSize to 1, because the ability to process a greater number of events at once is a huge performance benefit. However, there’s no silver bullet to handling errors, so be aware of issues that might occur with batch sizes greater than 1.
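One thing that can soften this: Lambda event source mappings for DynamoDB Streams have an opt-in setting, ReportBatchItemFailures (under FunctionResponseTypes), that lets the handler return the sequence number of the first record it could not process, so that Lambda retries from that record instead of from the start of the batch. The sketch below assumes that setting has been enabled on the mapping, which is not part of the configuration shown in this article, and process_record is a hypothetical placeholder for the real work.

```python
import logging

logger = logging.getLogger(__name__)


def process_record(record):
    """Hypothetical placeholder for the real work, e.g. sending the item to SQS."""


def handle_batch(records, process):
    """Process records in order; report the first failure and stop there."""
    for record in records:
        try:
            process(record)
        except Exception:
            sequence_number = record["dynamodb"]["SequenceNumber"]
            logger.exception("failed at sequence number %s", sequence_number)
            # Lambda resumes from this sequence number on the next attempt.
            return {"batchItemFailures": [{"itemIdentifier": sequence_number}]}
    return {"batchItemFailures": []}  # an empty list marks the whole batch as done


def lambda_handler(event, context):
    return handle_batch(event["Records"], process_record)
```

Records that come after the failed one are delivered again as well, so the processing still has to tolerate duplicates; this narrows the retry, it does not make it exact.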

Conclusion

Besides our practical introduction to consuming DynamoDB Streams, there are two primary takeaways from this article:

  1. If your function returns an error, Lambda retries the batch until processing succeeds or the data expires — do not forget about that as I did.
  2. If processing a batch only partially succeeds, keep in mind that by default you cannot re-request only the records that failed. That is problematic if, for example, you have already written part of your data to a table. Compare this with SQS, which lets you delete a single message from the queue so that it does not get processed again. Removing a record from a DynamoDB stream is not possible, because the stream doesn’t track how its consumers are reading the events to begin with.
