Pattern overview
New Momentum is a research and consultancy agency at the interface of social media and social change. For their projects I had to scrape several social media platforms and save that data to a Neo4j graph database. In this blog post I'd like to share a simplified version of the pattern I used.
I chose to decouple saving the raw data from validating/formatting it and writing it to a structured database (i.e. Neo4j or RDS). This is a perfect use case for DynamoDB streams. Here is what happens:
- A Lambda function makes a call to the social media API and saves the response to DynamoDB
- The write to DynamoDB triggers the write of a stream record to the DynamoDB stream
- Another Lambda function polls the stream and is invoked synchronously* when it detects new records
- This Lambda validates and formats the data and writes it to a new endpoint. For New Momentum this was a Neo4j graph database, but it could of course also be another structured database.
Because we save the raw data we can easily reparse it later if our data model changes or if we want to use another structured database.
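To make that benefit concrete: a later backfill could be as simple as scanning the raw table and pushing every item through the same (or updated) formatting logic. A minimal sketch, where the table name and the format_and_save callable are illustrative assumptions and not part of the actual project:

import boto3

# Hypothetical backfill: re-read every raw item and re-apply the (possibly new) formatting logic.
raw_table = boto3.resource('dynamodb').Table('raw-social-media-data')

def reparse_all(format_and_save):
    scan_kwargs = {}
    while True:
        page = raw_table.scan(**scan_kwargs)
        for item in page['Items']:
            format_and_save(item)  # e.g. validate with pydantic and write to Neo4j/RDS
        if 'LastEvaluatedKey' not in page:
            break
        scan_kwargs['ExclusiveStartKey'] = page['LastEvaluatedKey']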
Let’s dive into some code
All the services we use are serverless, so we can define this in a SAM (Serverless Application Model) template with relatively few lines of code. You can find all the code here on GitHub. Let’s first go over the SAM YAML template.
First we define the DynamoDB Table:
DDBTable:
  Type: AWS::DynamoDB::Table
  Properties:
    AttributeDefinitions:
      - AttributeName: "id"
        AttributeType: "S"
    BillingMode: "PAY_PER_REQUEST"
    KeySchema:
      - AttributeName: "id"
        KeyType: "HASH"
    StreamSpecification:
      StreamViewType: 'NEW_IMAGE'
Unfortunately we can’t use SAM’s “AWS::Serverless::SimpleTable” resource, because we can’t define the stream there. Instead we use plain CloudFormation, and to activate the stream all we have to do is set the ‘StreamSpecification’ attribute. We choose NEW_IMAGE for the stream view type because we aren’t interested in changes to existing items, just the new items.
Globals:
  Function:
    Runtime: "python3.8"
    Timeout: 10

Both Lambda functions use the same runtime and timeout, so we can easily define these as globals.
SaveApiRespLambda:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: save_api_resp_to_ddb/
    Handler: app.lambda_handler
    Policies:
      - DynamoDBWritePolicy:
          TableName: !Ref DDBTable
    Environment:
      Variables:
        DDB_TABLE: !Ref DDBTable
        API_KEY: "{{resolve:secretsmanager:FakeSocialMediaApiKey:SecretString}}"
In SAM we don’t have to define the IAM role for the Lambda function explicitly; we only declare that the function may write to the DDB table and SAM takes care of the rest. And of course we don’t hardcode the API key: we use a dynamic reference to AWS Secrets Manager and pass it to the Lambda as an environment variable.
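As a side note, if you would rather not have the secret resolved into an environment variable at deploy time, you could fetch it at runtime instead. This is not what the template above does, just a possible variation, using the same secret name as the dynamic reference:

import boto3

# Variation sketch: fetch the API key at runtime instead of injecting it at deploy time.
# Requires the Lambda role to be allowed secretsmanager:GetSecretValue on this secret.
secrets_client = boto3.client('secretsmanager')
API_KEY = secrets_client.get_secret_value(SecretId='FakeSocialMediaApiKey')['SecretString']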
HandleDDBStream:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: handle_ddb_stream/
    Handler: app.lambda_handler
    Events:
      DDBStream:
        Type: DynamoDB
        Properties:
          Stream: !GetAtt DDBTable.StreamArn
          BatchSize: 10
          MaximumBatchingWindowInSeconds: 10
          StartingPosition: LATEST
Here we see how easy it is to define the DDB stream as a trigger for a Lambda function.
And that’s the whole infrastructure-as-code template! Let’s also have a look at the Python code for the Lambda functions:
import os

import boto3

from resources.social_media_wrapper import SocialMediaWrapper

API_KEY = os.environ['API_KEY']
DDB_TABLE = os.environ['DDB_TABLE']

# Initialize the connections outside the handler, a Lambda best practice
wrapper = SocialMediaWrapper(API_KEY)
ddb_client = boto3.resource('dynamodb')


def lambda_handler(event, context):
    # This would be replaced by your actual API call
    response = wrapper.get_dummy_response()
    table = ddb_client.Table(DDB_TABLE)
    table.put_item(Item=response)
The code is quite self-explanatory. I don’t use an actual social media API here; the ‘get_dummy_response’ function simulates one by generating a dictionary with some random values.
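The wrapper itself isn’t shown in this post; a minimal sketch of what it could look like is below. The class name matches the import above, but the body is an illustrative assumption rather than the repo’s exact code:

import random
import string
import uuid


class SocialMediaWrapper:
    """Stand-in for a real social media API client (sketch, not necessarily the repo's exact code)."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    def get_dummy_response(self) -> dict:
        def random_str() -> str:
            return ''.join(random.choices(string.ascii_lowercase, k=8))

        # Keys match the FormattedSocialMediaData model used by the stream handler
        return {
            'id': str(uuid.uuid4()),
            'key1': random_str(),
            'key2': random_str(),
            'key3': random_str(),
        }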
from typing import Optional

from boto3.dynamodb.types import TypeDeserializer
from pydantic import BaseModel

deserializer = TypeDeserializer()


class FormattedSocialMediaData(BaseModel):
    id: str
    key1: str
    key2: str
    key3: Optional[str]
    key4: Optional[str]


def dynamo_obj_to_python_obj(dynamo_obj: dict) -> dict:
    """
    Takes a DynamoDB low-level item (i.e. from a DDB stream)
    and parses it to a 'normal' json/dict
    """
    return {k: deserializer.deserialize(v) for k, v in dynamo_obj.items()}


def parse_low_level_event(event):
    raw_records = [
        dynamo_obj_to_python_obj(item["dynamodb"]["NewImage"])
        for item in event["Records"]
        if item["eventName"] == "INSERT"
    ]
    return raw_records


def lambda_handler(event, context):
    """
    The event contains the batched records from the DDB stream
    """
    raw_records = parse_low_level_event(event)
    for record in raw_records:
        formatted_record = FormattedSocialMediaData(**record)
        print(formatted_record.dict())
        # Here you would save the formatted record to a database/endpoint of your choice
Here we see some of the intricacies of working with stream records. When the Lambda is invoked it gets passed an ‘event’ object, a JSON-formatted document that contains some metadata and our stream records. An example:
{ "Records": [ { "eventID": "7de3041dd709b024af6f29e4fa13d34c", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "region", "dynamodb": { "ApproximateCreationDateTime": 1479499740, "Keys": { "Timestamp": { "S": "2016-11-18:12:09:36" }, "Username": { "S": "John Doe" } }, "NewImage": { "Timestamp": { "S": "2016-11-18:12:09:36" }, "Message": { "S": "This is a bark from the Woofer social network" }, "Username": { "S": "John Doe" } }, "SequenceNumber": "13021600000000001596893679", "SizeBytes": 112, "StreamViewType": "NEW_IMAGE" }, "eventSourceARN": "arn:aws:dynamodb:region:123456789012:table/BarkTable/stream/2016-11-16T20:42:48.104" } ] }
The records are in the low-level DDB API format and need to be parsed to a more usual format. Fortunately boto3 has built-in functionality for this (see the ‘dynamo_obj_to_python_obj’ function). After parsing we do some validation and formatting using the Python library ‘pydantic’ (highly recommended!). After that we are done and the nicely formatted record can be sent wherever you want.
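To make those two steps concrete, here is a quick, self-contained illustration (the ‘Example’ model is a throwaway stand-in, not part of the Lambda code):

from typing import Optional

from boto3.dynamodb.types import TypeDeserializer
from pydantic import BaseModel, ValidationError

deserializer = TypeDeserializer()

# A low-level DynamoDB attribute becomes a plain Python value
print(deserializer.deserialize({'S': 'hello'}))   # -> 'hello'
print(deserializer.deserialize({'N': '42'}))      # -> Decimal('42')


class Example(BaseModel):
    id: str
    key1: str
    key2: Optional[str]


# pydantic validates the parsed record; a missing required field raises a clear error
print(Example(id='abc', key1='x').dict())          # -> {'id': 'abc', 'key1': 'x', 'key2': None}
try:
    Example(id='abc')                              # key1 missing
except ValidationError as err:
    print(err)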
Next Steps
We probably don’t want to invoke the first Lambda function manually, and we would like to make more than just a few API calls. We also need some way to handle errors and to monitor whether everything is still running. For example, for New Momentum we had to request over 10 million Twitter followers; with the Twitter rate limit this took about three months! Luckily this can also be done within the SAM framework, by using AWS Step Functions. In the follow-up blog we will look into this and also see how we can easily test/develop our application with SAM Accelerate. A small preview of the updated architecture:
* A small note about the synchronous invocation: the sharding of the DynamoDB stream is fully managed by AWS and you have no control over it. Even though the Lambda is invoked synchronously per shard, we don’t know the number of shards, so ordering is not guaranteed.