
AppSync – first jumps in at the deep end – part 2

02/22/2021

Last time we ended our voyage across the AppSync ocean with the creation of the serverless stack, which contained a simple GraphQL schema with the resolver we needed for our example. In case you forgot, I told you that with Unit Resolvers we need a Data Source to fetch the data from. This still applies, so it looks like we've got our hands full. Here's the plan for the second part:

  • Go through Resolver request/response mapping templates code
  • Avoid hogwash, mention something about Data Sources
  • As always, paste the code from serverless.yml
  • Proof that it’s working would be nice 🙂

In the repo (it's a tiny part of it) I left you with an undescribed mapping-templates directory. For simplicity and clarity, here are all the templates collected in a sub-directory:

├── mapping-templates

│   └── Query.getDevices.request.vtl

│   └── Query.getDevices.response.vtl

Just to remind you:

  • Request Template – great for changing the default behaviour in order to add authorisation or input validation.
  • Response Template – great for adding custom error handling or result transformations. Output is a part of the JSON response back to the client.

NOTE: For the sake of simplicity, follow the pattern {Type}.{Field}.request/response.vtl for your request and response mapping template files, where Type is Query, Mutation, or Subscription, and Field is the name of the respective Query/Mutation/Subscription in your schema.gql file. In my case it was:

type Query {
      getDevices(value: String, gsi: String, scanIndexForward: Boolean, limit: Int, nextToken: String): PaginatedDevices
}

We'll start with the request mapping template, Query.getDevices.request.vtl:

#set( $gsi = $util.defaultIfNull($ctx.args.gsi, "deviceStatus"))
#if( $gsi  == "deviceStatus" )
   #set( $exp = "d_status = :value" )
#end
#if( $gsi  == "deviceGroupId" )
   #set( $exp = "deviceGroupId = :value" )
#end
{
   "version" : "2017-02-28",
   "operation" : "Query",
   "index" : $util.toJson($gsi),
   "scanIndexForward": $util.defaultIfNull($ctx.args.scanIndexForward, false) ,
   "limit": $util.defaultIfNull($ctx.args.limit, 10),
   "nextToken": $util.toJson($util.defaultIfNullOrBlank($ctx.args.nextToken, null)),
   "query" : {
       "expression": $util.toJson($exp),
       "expressionValues" : {
           ":value" : {
               "S" : $util.toJson("$ctx.args.value")
           }
       }
   }
}

$ctx.args.gsi – ctx is nothing more than an alias for $context, a variable which is a map that holds all of the contextual information for your resolver invocation. args is an alias for the arguments key in the context variable, and gsi is my input variable representing the DynamoDB Global Secondary Index name.

{
   "arguments" : { ... },
   "source" : { ... },
   "result" : { ... },
   "identity" : { ... },
   "request" : { ... },
   "info" : { ... }
}

There is also a stash field, which is especially important in PIPELINE resolvers. Why? From the AWS documentation:

stash

The stash is a map that is made available inside each resolver and function mapping template. The same stash instance lives through a single resolver execution. This means that you can use the stash to pass arbitrary data across request and response mapping templates, and across functions in a pipeline resolver.

$util.defaultIfNull($ctx.args.gsi, "deviceStatus") – AppSync supports a set of utilities that can be leveraged within a GraphQL resolver to simplify interactions with data sources. This particular one returns the first object (the value from $ctx.args.gsi) if it is not null. Otherwise, it returns the second object as a "default object".
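To make the defaulting and branching at the top of the request template concrete, here's a minimal Python sketch of the same logic (the function names are mine, not AppSync's):

```python
# Hypothetical Python sketch of $util.defaultIfNull semantics, applied
# to the resolver arguments used in the request template above.
def default_if_null(value, default):
    """Return value unless it is None, otherwise return the default."""
    return default if value is None else value

def resolve_gsi_expression(args):
    """Mimic the #set/#if logic at the top of the request template."""
    gsi = default_if_null(args.get("gsi"), "deviceStatus")
    expressions = {
        "deviceStatus": "d_status = :value",
        "deviceGroupId": "deviceGroupId = :value",
    }
    return gsi, expressions.get(gsi)

# No gsi argument -> falls back to the "deviceStatus" index
print(resolve_gsi_expression({"value": "online"}))
# ('deviceStatus', 'd_status = :value')
```

The VTL version does exactly this: pick the index name, then choose the matching key condition expression.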

Next, we go through the simple variable assignments and reach the Query configuration, in which we've got something like this:

":value" : {
    "S" : $util.toJson("$ctx.args.value")
}

$util.toJson takes an object and returns a "stringified" JSON representation of that object. Alternatively, you could write it like this:

":value": $util.dynamodb.toDynamoDBJson("$ctx.args.value")

This helper automatically takes the value and converts it to the DynamoDB representation of a string attribute.
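If the `{"S": ...}` wrapper looks unfamiliar: DynamoDB transmits every attribute as a typed JSON object. Here's an illustrative Python sketch of what a `toDynamoDBJson`-style conversion does for a few common types (the real helper covers many more cases; this function is my own, not part of any SDK):

```python
import json

def to_dynamodb_json(value):
    """Illustrative sketch of converting a plain value into DynamoDB's
    typed attribute-value JSON, as $util.dynamodb.toDynamoDBJson does."""
    if value is None:
        return json.dumps({"NULL": True})
    if isinstance(value, bool):  # check bool before int: True is an int
        return json.dumps({"BOOL": value})
    if isinstance(value, (int, float)):
        # DynamoDB sends numbers over the wire as strings
        return json.dumps({"N": str(value)})
    if isinstance(value, str):
        return json.dumps({"S": value})
    raise TypeError(f"unsupported type: {type(value)!r}")

print(to_dynamodb_json("online"))  # {"S": "online"}
print(to_dynamodb_json(42))        # {"N": "42"}
```

So `":value": $util.dynamodb.toDynamoDBJson("$ctx.args.value")` produces the same `{"S": "..."}` shape that the template spells out by hand.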

The response mapping template, Query.getDevices.response.vtl, doesn't introduce any new helpers, except $util.defaultIfNullOrBlank(String, String): String, which returns the first String if it is not null or blank. Otherwise, it returns the second String as a "default String".

{
 "items": $util.toJson($ctx.result.items),
 "nextToken": $util.toJson($util.defaultIfNullOrBlank($ctx.result.nextToken, null))
}
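The nextToken passed through here is what makes pagination work end to end: the client echoes it back as an argument on the next call. A minimal client-side sketch, with a stubbed-in stand-in for the actual AppSync call:

```python
# Hypothetical client-side sketch: page through getDevices results by
# feeding the nextToken from each response into the next request.
def fetch_page(next_token=None):
    """Stand-in for an AppSync getDevices call; returns fake pages."""
    pages = {
        None: {"items": [{"id": "dev-1"}, {"id": "dev-2"}], "nextToken": "t1"},
        "t1": {"items": [{"id": "dev-3"}], "nextToken": None},
    }
    return pages[next_token]

def fetch_all_devices():
    items, token = [], None
    while True:
        page = fetch_page(token)
        items.extend(page["items"])
        token = page.get("nextToken")
        if not token:
            return items

print(len(fetch_all_devices()))  # 3
```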

Before testing anything, let's pause for a few minutes on Data Sources, just to be sure that we're on the same page.

Data Sources

AWS defines a data source as a persistent storage system or a trigger, along with credentials for accessing that system or trigger. Examples include NoSQL databases, relational databases, AWS Lambda functions, and HTTP APIs.

It’s worth mentioning that the data source is configured on a resolver independently of your defined schema. In other words, you decide which GraphQL types are resolved or manipulated through different data sources. Let’s take a hypothetical app as an example and assume that part of its business logic relies on queries to the NoSQL database (DynamoDB) whereas advanced search operations, such as keyword, fuzzy word matches or even geospatial lookups, are handled by the Elasticsearch cluster. With AppSync it’s nothing unusual.

Apart from the variety of data sources that you can use for your resolvers simultaneously, there is another really useful feature: the ability to leverage resolvers to alter the data returned from the data source you've picked. For example, you can return results from a database such as DynamoDB to the request initiator with some of the attributes changed, or from services like AWS Timestream.

Let's take the AWS Timestream database as an example. In this particular case we will use it as a datastore for our IoT things' status metrics. AWS Timestream is a serverless, fully managed time series database, designed for recording time series generated by IoT devices. It also has an SQL-like language for querying its data.

Here's my Query in the schema.gql file, which takes a request with two input parameters, time (hours) and pct, representing the time period and the percentage of the signalStrength metric we'd like to filter devices by. In other words, I'd like to filter out all the devices' metrics stored in AWS Timestream where signalStrength was below the pct (percentage) level over, let's say, the last 24 hours.

Below is the query I used in the Lambda for AWS Timestream data extraction, and the JSON I'm returning as a response if the Lambda succeeds. Notice the items key in the result, which I will later define as a field I want to get in the schema.gql file:

import logging
import os

from botocore.exceptions import ClientError

# aws_logging and timestream are project-specific helper modules
# (the logging layer is mentioned near the end of this article)


def lambda_handler(event, context):
    log_level = os.environ['log_level']
    aws_logging.setup(level=log_level,
                      aws_request_id=context.aws_request_id,
                      log_group_name=context.log_group_name,
                      function_ver=context.function_version,
                      event_=event)
    logging.info("Event: {0}".format(event))
    try:
        # signalStrength is reported on a 0-95 scale, hence the /95*100
        query = f"""
            WITH low_signal AS (
            SELECT serialNumber, version, status, signalStrength, voltage, (cast(signalStrength as double)/95)*100 AS signal_pct
            FROM "{database}"."{tablename}"
            WHERE time >= ago({event['filter']['time']}h)
            AND (cast(signalStrength as double)/95)*100 < {event['filter']['pct']}
            )
            SELECT DISTINCT serialNumber, version, status, signalStrength, signal_pct
            FROM low_signal
            """
        result = {
            "status": 200,
            "items": timestream.invoke_query(query_string=query),
            "event": event
        }
        logging.info("Returned data: {}".format(result))
        return result
    except ClientError as err:
        logging.critical("----Client error: {0}".format(err))
        logging.critical("----HTTP code: {0}".format(
            err.response['ResponseMetadata']['HTTPStatusCode']))
        return {
            "status": 500,
            "message": "Internal Error",
            "event": event
        }
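A side note on testability: the f-string interpolation is the only part of this handler worth unit-testing without AWS, so one option is to pull it out into a pure function. A hypothetical sketch of such a refactor (function name and sample table names are mine):

```python
# Hypothetical refactor: isolate the Timestream query construction so
# it can be tested without touching AWS. Column names follow the
# handler above; 95 is the signalStrength scale used in the original.
MAX_SIGNAL = 95

def build_signal_query(database, tablename, hours, pct):
    return f"""
        WITH low_signal AS (
        SELECT serialNumber, version, status, signalStrength, voltage,
               (cast(signalStrength as double)/{MAX_SIGNAL})*100 AS signal_pct
        FROM "{database}"."{tablename}"
        WHERE time >= ago({hours}h)
        AND (cast(signalStrength as double)/{MAX_SIGNAL})*100 < {pct}
        )
        SELECT DISTINCT serialNumber, version, status, signalStrength, signal_pct
        FROM low_signal
        """

query = build_signal_query("iot_db", "device_metrics", "48", "50")
print('ago(48h)' in query)  # True
```

Keep in mind that interpolating event values straight into a query string trusts the caller; here the input has already passed through the GraphQL type checks on the filter input.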

schema.gql required snippet:

type Query {
 getIoTMetadata(filter: IoTMetadataFilterInput): TimeStreamData
}

type TimeStreamData {
 items: TimeStreamPages
}

type TimeStreamPages {
 page: [TimeStreamRows]
}

type TimeStreamRows {
 rows: [String]
}

input IoTMetadataFilterInput {
 time: String!
 pct: String
}

and the appsync section in serverless.yml needed to use the Lambda as a resolver:

dataSources:
  - type: AWS_LAMBDA
    name: IoTMetadataResolver
    description: Lambda DataSource
    config:
      functionName: ${self:custom.app}-${self:service}-iot-metadata-resolver
      lambdaFunctionArn: arn:aws:lambda:${self:provider.region}:#{AWS::AccountId}:function:${self:custom.app}-${self:service}-iot-metadata-resolver
      serviceRoleArn: { Fn::GetAtt: [AppSyncLambdaServiceRole, Arn] }

The AppSyncLambdaServiceRole referenced by serviceRoleArn defines the needed role and allows AWS AppSync to invoke the Lambda function.

Resources:
 AppSyncLambdaServiceRole:
   Type: AWS::IAM::Role
   Properties:
     AssumeRolePolicyDocument:
       Version: "2012-10-17"
       Statement:
         - Effect: Allow
           Principal:
             Service:
               - appsync.amazonaws.com
           Action:
             - sts:AssumeRole
     Path: /
     Policies:
       - PolicyName: devices-appsync-listdevices-resolver-${self:custom.stage}
         PolicyDocument:
           Version: "2012-10-17"
           Statement:
             - Effect: Allow
               Action:
                  - lambda:InvokeFunction
               Resource:
                 - arn:aws:lambda:${self:provider.region}:#{AWS::AccountId}:function:${self:custom.app}-${self:service}-iot-metadata-resolver

Of course, we also need a Lambda function definition which, thanks to the Serverless Framework, is extremely easy to define via serverless.yml:

functions:
 devices-iot-metadata-resolver:
   name: ${self:custom.app}-${self:service}-iot-metadata-resolver
   description: Enables mutation on returned data from the TimeStream table
   handler: functions/resolver_iot_metadata.lambda_handler
   role: IoTMetadataResolver
   runtime: python3.6
   memorySize: 128
   reservedConcurrency: 5
   tags:
     Name: ${self:custom.app}-${self:service}-iot-metadata-resolver
    layers:
      - arn:aws:lambda:${self:provider.region}:#{AWS::AccountId}:layer:aws-custom-logging-${self:custom.stage}:${self:custom.logging-layer-version}

A quick note: to simplify custom AWS Lambda logging, I've attached a layer I used in previous projects. It allows me to structure the logs in a more human-friendly pattern. As this article is more about AppSync, I'll leave the Lambda code details for now. But if you'd like to start playing with AWS Timestream via Python, just ping me on LinkedIn and we'll share the experience.

Now, let's see what the AWS Console looks like after creating the whole stack (I assume you're familiar with the Serverless Framework). In the bottom right corner you can see our resolver, IoTMetadataResolver.

The corresponding request and response mapping templates are below. An observant reader will surely notice a Data Source named IoTMetadataResolver (in Edit Resolver) that represents the AWS Lambda function we've defined.

Let's execute the query and see the results. I've set a period of 48 hours and a 50% level of signalStrength. I've used a Visual Studio Code plugin in this particular test, but feel free to use whatever client you prefer.

POST https://xxxx.appsync-api.eu-west-1.amazonaws.com/graphql
x-api-key: {{ api-key }}
X-REQUEST-TYPE: GraphQL

query getIoTMetadataInfo {
 getIoTMetadata(filter: {time: "48", pct: "50"}) {
   items {
     page {
       rows
     }
   }
 }
}
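If you'd rather script the call than use an editor plugin, the request is just a plain POST with a JSON body holding the query. A hedged sketch using only the standard library; the endpoint, API key, and variable names are placeholders:

```python
import json
import urllib.request

# Hypothetical Python version of the same call: AppSync expects a POST
# with a JSON body containing "query" (and optionally "variables").
QUERY = """
query getIoTMetadataInfo($time: String!, $pct: String) {
  getIoTMetadata(filter: {time: $time, pct: $pct}) {
    items { page { rows } }
  }
}
"""

def build_request(endpoint, api_key, time, pct):
    body = json.dumps({
        "query": QUERY,
        "variables": {"time": time, "pct": pct},
    }).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        headers={"Content-Type": "application/json", "x-api-key": api_key},
        method="POST",
    )

req = build_request("https://example.appsync-api.eu-west-1.amazonaws.com/graphql",
                    "da2-xxxx", "48", "50")
print(req.get_method())  # POST
# urllib.request.urlopen(req) would send it for real
```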

The end result after invocation:

{
  "data": {
    "getIoTMetadata": {
      "items": {
        "page": [
          {
            "rows": [
              "[{serialNumber=00:00:00:00:04}, {version=v1}, {status=online}, {signalStrength=42}, {signal_pct=44.21052631578947}]",
              "[{serialNumber=00:00:00:00:02}, {version=v1}, {status=online}, {signalStrength=13}, {signal_pct=13.684210526315791}]",
              "[{serialNumber=00:00:00:00:09}, {version=v1}, {status=online}, {signalStrength=20}, {signal_pct=21.052631578947366}]"
            ]
          }
        ]
      }
    }
  }
}
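Notice that each row comes back as a flat string of `{key=value}` groups rather than structured JSON. If a client needs the fields individually, a small regex can recover them; this helper is my own illustration, not part of any AWS SDK:

```python
import re

# Hypothetical client-side helper: turn a Timestream row string like
# "[{serialNumber=...}, {version=v1}, ...]" into a plain dict.
ROW_PATTERN = re.compile(r"\{(\w+)=([^}]*)\}")

def parse_row(row):
    return dict(ROW_PATTERN.findall(row))

row = ("[{serialNumber=00:00:00:00:04}, {version=v1}, {status=online}, "
       "{signalStrength=42}, {signal_pct=44.21052631578947}]")
parsed = parse_row(row)
print(parsed["serialNumber"])    # 00:00:00:00:04
print(parsed["signalStrength"])  # 42
```

All values come out as strings, so numeric fields like signalStrength still need casting on the client side.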

The "everything fails all the time" paradigm highlights an important aspect of dealing with errors properly. Thankfully, AWS AppSync can surface errors raised from the following sources:

  • Request or response mapping template
  • Lambda function

Take a look below at the failure I received from my Lambda during the initial invocation, before I fixed the issue in the code. It's good to know that when your Lambda function raises an unhandled error, AWS AppSync uses the error message set by AWS Lambda.

If an error is thrown from the Lambda function, AWS AppSync fails to resolve the current field. Only the error message returned from Lambda will be set in the response.

{
  "data": {
    "getIoTMetadata": null
  },
  "errors": [
    {
      "path": [
        "getIoTMetadata"
      ],
      "data": null,
      "errorType": "Lambda:Unhandled",
      "errorInfo": null,
      "locations": [
        {
          "line": 2,
          "column": 3,
          "sourceName": null
        }
      ],
      "message": "module initialization error"
    }
  ]
}
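Since a GraphQL response can carry partial data alongside an errors array, a client should check the errors key before trusting the data. A minimal sketch of that check, built around the failed response above (the helper is hypothetical):

```python
# Hypothetical client-side sketch: inspect the "errors" array of a
# GraphQL response before using "data".
def extract_result(response):
    errors = response.get("errors") or []
    if errors:
        # Surface errorType/message pairs like the Lambda:Unhandled case
        raise RuntimeError("; ".join(
            f"{e.get('errorType')}: {e.get('message')}" for e in errors))
    return response["data"]

failed = {
    "data": {"getIoTMetadata": None},
    "errors": [{"errorType": "Lambda:Unhandled",
                "message": "module initialization error"}],
}
try:
    extract_result(failed)
except RuntimeError as err:
    print(err)  # Lambda:Unhandled: module initialization error
```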

When we meet again

I will stop at this point. When I come back with the next part, we will investigate DynamoDB design in connection with AWS AppSync. I'll also reveal our current project and talk about some DynamoDB concepts which I didn't take into consideration before.