Last stop on our serverless project journey
We’re going to walk through the automated flow using DynamoDB Streams, AWS CI/CD services and Lambda.
AWS Lambda
AWS CodePipeline
AWS CodeBuild
Amazon DynamoDB
Hi, it’s been a while since we last met. Today we’re reaching the end of our journey. Yes, it’s the final chapter. Last time, we talked about the AWS Step Functions flow for Lambda tests.
So, with that covered, it’s high time to push the algorithm changes to the frontend. We’re going to walk through the automated flow, using DynamoDB Streams, AWS CI/CD services and two customized Lambda functions to keep it simple and cost-effective. Ready? Then let’s start…
4 sources of change - multisource AWS CodePipeline
When we built our frontend, we relied on AWS CodePipeline, a fully managed continuous delivery service. Basically, our goal was to simplify this flow as much as possible and, even more importantly, to save time. Out of the box, CodePipeline offers seamless integration with third-party services such as GitHub, or with your own custom plugins.

Unfortunately, in our case that wasn’t enough, because we had to invoke the pipeline whenever a new change appeared not only in the GitHub repository but also in one of the S3 files containing metadata on active versions of the algorithm. These metadata files are nothing more than simple JSON files with the key/value data needed to render the webpage correctly. Therefore, we had to customize the CodePipeline Source stage (via Terraform) to meet our needs.
Generally, I prefer to use a mix of the Serverless Framework and Terraform in serverless-like projects. The former gives me unquestionable simplicity when defining Lambda functions, thanks to its huge variety of plugins. The latter adds flexibility when describing the parts of the infrastructure that do not change frequently.
Here is the Terraform snippet we used to define the multisource Source stage of CodePipeline:
resource "aws_codepipeline" "pipeline" {
name = "${var.app}-${var.service}-${var.info}-${var.stage}"
role_arn = "${aws_iam_role.codepipeline_role.arn}"
artifact_store {
location = "${aws_s3_bucket.codepipeline.bucket}"
type = "S3"
}
stage {
name = "${var.stage_1_name}"
action {
name = "${var.stage_1_action_1}"
category = "Source"
owner = "ThirdParty"
provider = "GitHub"
version = "1"
output_artifacts = ["${var.artifact1}"]
configuration {
Owner = "${var.repository_owner}"
Repo = "${var.repository}"
Branch = "${var.branch}"
OAuthToken = "${data.aws_ssm_parameter.github_token.value}"
}
}
action {
name = "${var.stage_1_action_2}"
category = "Source"
owner = "AWS"
provider = "S3"
version = "1"
output_artifacts = ["${var.artifact2}"]
configuration {
S3Bucket = "${var.source_bucketname}"
S3ObjectKey = "${var.source_path_in}"
}
}
action {
name = "${var.stage_1_action_3}"
category = "Source"
owner = "AWS"
provider = "S3"
version = "1"
output_artifacts = ["${var.artifact3}"]
configuration {
S3Bucket = "${var.source_bucketname}"
S3ObjectKey = "${var.source_path_opt}"
}
}
action {
name = "${var.stage_1_action_4}"
category = "Source"
owner = "AWS"
provider = "S3"
version = "1"
output_artifacts = ["${var.artifact4}"]
configuration {
S3Bucket = "${var.source_bucketname}"
S3ObjectKey = "${var.source_path_out}"
}
}
}
Below, you can see the resulting view in the AWS console. The reason for using three different source S3 objects is the number of algorithm types. We leverage a Lambda function to update the relevant S3 object after receiving data via DynamoDB Streams; the function is invoked when the phase attribute changes to PACKAGE_TESTS_COMPLETE. For us, this means the tests went through successfully and it’s time to make the changes visible in the user interface.
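As a rough illustration, here is a minimal sketch of the kind of handler that could sit behind that stream. The bucket name, object key and metadata layout are assumptions, not the project’s real values:

# Minimal sketch: a DynamoDB Streams handler that rewrites an S3 metadata
# object once the `phase` attribute becomes PACKAGE_TESTS_COMPLETE.
# METADATA_BUCKET, METADATA_KEY and the metadata layout are assumptions.
import json
import os

import boto3

s3 = boto3.client('s3')

METADATA_BUCKET = os.environ.get('METADATA_BUCKET', 'frontend-metadata')         # hypothetical
METADATA_KEY = os.environ.get('METADATA_KEY', 'metadata/active_versions.json')   # hypothetical


def handler(event, context):
    for record in event.get('Records', []):
        if record.get('eventName') != 'MODIFY':
            continue
        new_image = record['dynamodb'].get('NewImage', {})
        if new_image.get('phase', {}).get('S') != 'PACKAGE_TESTS_COMPLETE':
            continue

        # Example key/value payload for the frontend; the real files contain
        # whatever the page needs to display the active algorithm version.
        metadata = {
            'algorithm': new_image.get('algorithm', {}).get('S'),
            'version': new_image.get('version', {}).get('S'),
        }
        s3.put_object(
            Bucket=METADATA_BUCKET,
            Key=METADATA_KEY,
            Body=json.dumps(metadata).encode('utf-8'),
            ContentType='application/json',
        )
    return {'statusCode': 200}

Because the pipeline’s Source stage watches that very S3 object, the put_object call above is what ultimately kicks off a new release.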

Bob the Builder - AWS CodeBuild phase
A change in any of the available sources triggers the pipeline and moves the artifacts to the next stage, AWS CodeBuild, a fully managed continuous integration service that compiles source code, runs tests, and produces software packages that are ready to deploy. The only thing worth mentioning here is that in such a scenario you have to define multiple input_artifacts to be used in the build process (a sketch after the snippet below shows one way to consume them).
stage {
  name = "${var.stage_2_name}"

  action {
    name             = "${var.stage_2_action}"
    category         = "Build"
    owner            = "AWS"
    provider         = "CodeBuild"
    input_artifacts  = ["${var.artifact1}", "${var.artifact2}", "${var.artifact3}", "${var.artifact4}"]
    version          = "1"
    output_artifacts = ["${var.artifact}-build"]

    configuration {
      ProjectName   = "${var.codebuild_proj_id}"
      PrimarySource = "${var.artifact1}"
    }
  }
}
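To give a sense of how those input artifacts are consumed, below is a rough sketch of a build-time step that copies the S3 metadata files next to the GitHub checkout. CodeBuild exposes the primary source under CODEBUILD_SRC_DIR and each secondary input artifact under a CODEBUILD_SRC_DIR_* variable suffixed with the artifact name; the artifact names and destination path used here are placeholders, not the project’s real identifiers.

# Hypothetical build step: merge the JSON metadata from the secondary
# source artifacts into the frontend bundle before it gets packaged.
import os
import shutil

PRIMARY_DIR = os.environ['CODEBUILD_SRC_DIR']  # GitHub checkout (primary source)
SECONDARY_ARTIFACTS = ['metadata_in', 'metadata_opt', 'metadata_out']  # placeholder names


def collect_metadata(dest='public/config'):
    """Copy each metadata JSON file into the build output directory."""
    target = os.path.join(PRIMARY_DIR, dest)
    os.makedirs(target, exist_ok=True)
    for name in SECONDARY_ARTIFACTS:
        src_dir = os.environ['CODEBUILD_SRC_DIR_' + name]
        for filename in os.listdir(src_dir):
            if filename.endswith('.json'):
                shutil.copy(os.path.join(src_dir, filename), target)


if __name__ == '__main__':
    collect_metadata()

A script like this would be called from the buildspec right before the actual frontend build command.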

Tell me when it’s over
Whenever I design any piece of an automated flow, I constantly challenge myself, especially when working on notifications. I try to make them as helpful and understandable as possible so they won’t leave any doubts like ‘Is it finished, or did something go wrong?’ This time I used a well-known Lambda function for Slack notifications that sends acknowledgements of success in the format presented below; the same pattern, with the Lambda code I’ve provided, covers failures caught in the pipeline. A second, parallel Lambda function was added to address cache invalidation, which lets us remove an object from the CloudFront cache before it expires (a sketch of the notification function follows the cache-invalidation code).
import logging
import time

import boto3
from botocore.exceptions import ClientError

from helpers.exceptions import CloudFrontException

logger = logging.getLogger()
logger.setLevel(logging.INFO)


# helpers/exceptions.py snippet:
class CloudFrontException(Exception):
    def __init__(self, message='Something wrong happened with CloudFront :)', status_code=None, details=None):
        super(CloudFrontException, self).__init__(message, status_code, details)
        self.message = message
        self.status_code = status_code or 500
        self.details = details or {}


class CloudFront(object):
    def __init__(self):
        # boto3 client used to list distributions and create invalidations
        self.client = boto3.client('cloudfront')

    def list_distributions(self, bucket):
        """Return the ID of the distribution whose origin points at the given bucket."""
        try:
            logger.info("----Looking for CF distributions for bucket: {0}".format(bucket))
            response = self.client.list_distributions()
            for item in response['DistributionList']['Items']:
                for origin in item['Origins']['Items']:
                    if bucket in origin['DomainName']:
                        logger.info("----Found CF distribution {0} for bucket: {1}".format(item['Id'], bucket))
                        return {"statusCode": 200,
                                "id_": item['Id']}
            # No distribution origin matched the bucket
            logger.info("----Missing CF distributions for bucket: {0}".format(bucket))
            return {"statusCode": 204}
        except ClientError as err:
            logger.critical("----Client error: {0}".format(err))
            logger.critical("----HTTP code: {0}".format(err.response['ResponseMetadata']['HTTPStatusCode']))
            raise

    def invalidate_cache(self, bucket, path='/*'):
        """Invalidate the cache of the distribution that serves the given bucket."""
        try:
            logger.info("----Invalidating CF distributions for bucket: {0}".format(bucket))
            response = self.list_distributions(bucket)
            if response['statusCode'] == 200:
                invalidation = self.client.create_invalidation(
                    DistributionId=response['id_'],
                    InvalidationBatch={
                        'Paths': {
                            'Quantity': 1,
                            'Items': [path]
                        },
                        'CallerReference': str(time.time())
                    })
                return {"statusCode": 200,
                        "id_": invalidation['Invalidation']['Id']}
            elif response['statusCode'] == 204:
                return {"statusCode": 204}
            else:
                return {"statusCode": 400}
        except ClientError as err:
            raise CloudFrontException(details=err)
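
For reference, the notification function mentioned above can be as small as a single webhook call. The sketch below is only an assumption about how such a Lambda might look (the SLACK_WEBHOOK_URL variable, the event shape and the message layout are placeholders, not the project’s actual code):

# Hypothetical Slack notification Lambda, triggered by pipeline state-change
# events; it posts a short success message to an incoming webhook.
import json
import os
import urllib.request

SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']  # placeholder env var


def handler(event, context):
    detail = event.get('detail', {})
    message = {
        'text': ':white_check_mark: Pipeline *{0}* finished stage *{1}* with state *{2}*'.format(
            detail.get('pipeline', 'unknown'),
            detail.get('stage', 'unknown'),
            detail.get('state', 'unknown'),
        )
    }
    request = urllib.request.Request(
        SLACK_WEBHOOK_URL,
        data=json.dumps(message).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
    )
    with urllib.request.urlopen(request) as response:
        return {'statusCode': response.getcode()}

The failure variant follows exactly the same pattern, just with a different message and emoji.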


Here’s the full view of the aforementioned events, from the triggered DynamoDB stream up to the final Lambda invocations.

Further stops on the road
Being a CTO myself, I can tell you that the greatest value of the serverless approach I’ve seen so far is the time it saves, especially when time to value is a critical factor. In today’s world, time is the new currency of the innovation race. Don’t get me wrong: going serverless isn’t the only way to achieve success, but it definitely helps shift your attention towards business and development goals and away from maintenance work. Moreover, it’s more likely that you won’t have to involve the Operations team when validating a new business idea.