In this post we are going to use AWS Glue to extract, transform, and load data from one S3 bucket to another. To create our CDK project we are going to use projen.
The first step is to execute npx projen new awscdk-app-ts --projenrc-ts
to create a TypeScript app that also uses TypeScript for the projen configuration file.
This would be the projen configuration file:
import { AwsCdkTypeScriptApp } from 'projen/lib/awscdk';
import { EndOfLine, NodePackageManager, TrailingComma } from 'projen/lib/javascript';

const project = new AwsCdkTypeScriptApp({
    cdkVersion: '2.95.1',
    defaultReleaseBranch: 'main',
    name: 'glue-load-data-cdk',
    projenrcTs: true,
    github: false,
    packageManager: NodePackageManager.NPM,
    prettier: true,
    prettierOptions: {
        settings: {
            bracketSpacing: true,
            printWidth: 120,
            singleQuote: true,
            tabWidth: 4,
            trailingComma: TrailingComma.ES5,
            endOfLine: EndOfLine.AUTO,
        },
        overrides: [
            {
                files: '*.yml',
                options: {
                    tabWidth: 2,
                },
            },
        ],
    },
    eslintOptions: {
        prettier: true,
        dirs: ['src'],
    },
    deps: ['config', '@types/config', '@aws-cdk/aws-glue-alpha'],
});
project.tsconfigDev.addInclude('config/*.ts');
project.synth();
The config Node.js module is used, and @aws-cdk/aws-glue-alpha also needs to be added as a dependency. After adding those changes, run npx projen to update the project dependencies. Any change to .projenrc.ts requires running npx projen again.
Since we are using config to handle the configuration of our project, a config folder needs to be created at the root of the project. Create a development.ts file and put it under the config folder:
import { GlueVersion, WorkerType } from '@aws-cdk/aws-glue-alpha';
import { RemovalPolicy } from 'aws-cdk-lib';
import { BucketEncryption } from 'aws-cdk-lib/aws-s3';
import { GlueLoadDataConfig } from '../src/shared';

const development: GlueLoadDataConfig = {
    /**
     * SSM parameter containing the name of the bucket where the bronze data
     * coming from the EC2 instance is stored.
     */
    bronzeBucketNameSSMPath: '/path/to/bronzeBucketName',
    /**
     * SSM parameter containing the name of the bucket where the silver data is
     * going to be stored after being transformed.
     */
    silverBucketNameSSMPath: '/path/to/silverBucketName',
    glueDatabase: {
        removalPolicy: RemovalPolicy.DESTROY,
    },
    glueCrawler: {
        schemaChangePolicy: {
            /**
             * A value of `LOG` specifies that if a table or partition is found to no longer exist,
             * do not delete it, only log that it was found to no longer exist.
             */
            deleteBehavior: 'LOG', // | DEPRECATE_IN_DATABASE | DELETE_FROM_DATABASE
            /**
             * A value of `LOG` specifies that if a table or a partition already exists, and a change
             * is detected, do not update it, only log that a change was detected. Add new tables and
             * new partitions (including on existing tables).
             */
            updateBehavior: 'LOG', // | UPDATE_IN_DATABASE
        },
    },
    glueJob: {
        scriptBucket: {
            autoDeleteObjects: true,
            removalPolicy: RemovalPolicy.DESTROY,
            encryption: BucketEncryption.UNENCRYPTED,
            versioned: false,
        },
        job: {
            properties: {
                maxRetries: 0,
                workerType: WorkerType.G_1X,
                numberOfWorkers: 2,
                timeout: 5,
                glueVersion: GlueVersion.V4_0,
                executionClass: 'STANDARD',
            },
            defaultArguments: {
                enableMetrics: 'true',
                enableJobInsights: 'false',
                enableGlueDatacatalog: 'true',
                enableContinuousCloudwatchLog: 'true',
                jobBookmarkOption: 'job-bookmark-enable',
                jobLanguage: 'python',
                enableAutoScaling: 'true',
            },
        },
    },
    scheduleTriggerExpression: 'cron(0 4 * * ? *)',
};

export default development;
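The config module resolves which file to load from the NODE_ENV environment variable, which defaults to development, so config/development.ts is picked up without extra setup (loading .ts config files assumes ts-node is available, which projen already provides). If stage-specific settings are needed later, a hypothetical config/production.ts could reuse the same shape and override only what differs; a minimal sketch:

// config/production.ts -- hypothetical stage-specific override, not part of this project
import { RemovalPolicy } from 'aws-cdk-lib';
import { GlueLoadDataConfig } from '../src/shared';
import development from './development';

const production: GlueLoadDataConfig = {
    ...development,
    // Assumption: in production we would rather keep the Glue database on stack deletion.
    glueDatabase: { removalPolicy: RemovalPolicy.RETAIN },
};

export default production;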
Then, under the src folder, create a shared folder containing an index.ts file: src/shared/index.ts
import { GlueVersion, WorkerType } from '@aws-cdk/aws-glue-alpha';
import { RemovalPolicy } from 'aws-cdk-lib';
import { BucketEncryption } from 'aws-cdk-lib/aws-s3';
import config from 'config';

export interface GlueLoadDataConfig {
    bronzeBucketNameSSMPath: string;
    silverBucketNameSSMPath: string;
    glueDatabase: {
        removalPolicy: RemovalPolicy;
    };
    glueCrawler: {
        schemaChangePolicy: {
            deleteBehavior: string;
            updateBehavior: string;
        };
    };
    glueJob: {
        scriptBucket: {
            autoDeleteObjects: boolean;
            removalPolicy: RemovalPolicy;
            encryption: BucketEncryption;
            versioned: boolean;
        };
        job: {
            properties: {
                maxRetries: number;
                workerType: WorkerType;
                numberOfWorkers: number;
                timeout: number;
                glueVersion: GlueVersion;
                executionClass: string;
            };
            defaultArguments: {
                enableMetrics: string;
                enableJobInsights: string;
                enableGlueDatacatalog: string;
                enableContinuousCloudwatchLog: string;
                jobBookmarkOption: string;
                jobLanguage: string;
                enableAutoScaling: string;
            };
        };
    };
    scheduleTriggerExpression: string;
}

export function getGlueConfig(): GlueLoadDataConfig {
    if (checkIfExtractConfigs(config)) {
        return config;
    }
    throw new Error('Missing config file.');
}

export function checkIfExtractConfigs(object: any): object is GlueLoadDataConfig {
    return 'bronzeBucketNameSSMPath' in object;
}
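Note that checkIfExtractConfigs is a shallow guard: it only checks for the bronzeBucketNameSSMPath key, which is enough to detect whether a config file was loaded at all. A minimal sketch of how the guard behaves (the object literals below are made up for illustration):

import { checkIfExtractConfigs } from '../src/shared'; // path assumes the snippet sits next to src

// true: the key the guard looks for is present
checkIfExtractConfigs({ bronzeBucketNameSSMPath: '/path/to/bronzeBucketName' });

// false: no config was loaded, so getGlueConfig() would throw 'Missing config file.'
checkIfExtractConfigs({});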
In our app entry point, src/main.ts, this would be the code:
import { App } from 'aws-cdk-lib';
import { GlueLoadDataStack } from './glue/glue-load-data-stack';
import { getGlueConfig } from './shared';

const app = new App();

new GlueLoadDataStack(app, 'GlueLoadDataStack', {
    env: {
        account: process.env.CDK_DEFAULT_ACCOUNT,
        region: process.env.CDK_DEFAULT_REGION,
    },
    tags: {
        CostAllocation: 'alfred',
    },
    description: 'Use Glue to load data from s3 bronze bucket filled by third party ec2 instance.',
    glueConfig: getGlueConfig(),
});

app.synth();
And finally, create a glue folder with a file to contain our Glue stack: src/glue/glue-load-data-stack.ts.
First, we create the Glue database, passing the removal policy from the config file:
private createGlueDatabase(removalPolicy: RemovalPolicy): IDatabase {
    const db = new Database(this, 'BronzeGlueDatabase');
    db.applyRemovalPolicy(removalPolicy);
    return db;
}
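Without props, the Database construct generates a database name from the construct path; if a stable, explicit name is preferred, it can be passed in. A sketch, where the name is hypothetical:

// Variant: give the Glue database an explicit name instead of a generated one.
const db = new Database(this, 'BronzeGlueDatabase', {
    databaseName: 'bronze_database', // hypothetical name
});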
Then we look up the bronze and silver buckets (these resources need to exist before our stack is deployed). Finally, we create a script bucket where the Glue job script will reside; a BucketDeployment uploads the Python script containing the transformation logic (described later) from the local glue-job-assets folder, which must contain the glue_etl_job.py file referenced by the job's scriptLocation.
private createBucketAndDeployGlueJobAssets(glueConfig: GlueLoadDataConfig): IBucket {
    const scriptBucket = new Bucket(this, 'GlueJobBucket', {
        ...glueConfig.glueJob.scriptBucket,
    });

    new BucketDeployment(this, 'GlueJobBucketDeployment', {
        sources: [Source.asset('./glue-job-assets')],
        destinationBucket: scriptBucket,
    });

    return scriptBucket;
}
Next, we create the role that Glue assumes: it gets read access to the bronze bucket, read/write access to the script and silver buckets, and the AWSGlueServiceRole managed policy for access to the Glue service resources:
private createGlueRole(): IRole {
    const glueRole = new Role(this, 'BronzeBucketCrawlerRole', {
        assumedBy: new ServicePrincipal('glue.amazonaws.com'),
        managedPolicies: [{ managedPolicyArn: 'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole' }],
    });

    this.bronzeBucket.grantRead(glueRole);
    this.scriptBucket.grantReadWrite(glueRole);
    this.silverBucket.grantReadWrite(glueRole);

    return glueRole;
}
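If Glue should only read part of the bronze bucket, grantRead also accepts an optional object key pattern; a small sketch, where the prefix is hypothetical:

// Variant: restrict Glue's read access to a hypothetical 'data/*' prefix of the bronze bucket.
this.bronzeBucket.grantRead(glueRole, 'data/*');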
After the role, we create the Glue crawler, which is in charge of crawling (scanning) the bronze bucket for data and updating the database with new changes:
private createGlueCrawler(glueConfig: GlueLoadDataConfig): CfnCrawler {
    return new CfnCrawler(this, 'BronzeBucketCrawler', {
        name: 'bronze-bucket-crawler',
        description: 'Crawler for Bronze Bucket',
        databaseName: this.glueDatabase.databaseName,
        role: this.glueRole.roleArn,
        targets: {
            s3Targets: [
                {
                    path: `s3://${this.bronzeBucket.bucketName}`,
                },
            ],
        },
        recrawlPolicy: {
            recrawlBehavior: 'CRAWL_NEW_FOLDERS_ONLY',
        },
        schemaChangePolicy: {
            deleteBehavior: glueConfig.glueCrawler.schemaChangePolicy.deleteBehavior,
            updateBehavior: glueConfig.glueCrawler.schemaChangePolicy.updateBehavior,
        },
    });
}
All the crawler configuration is set in the development.ts config file. Note that when recrawlBehavior is CRAWL_NEW_FOLDERS_ONLY, AWS Glue expects both schema change behaviors to be LOG, which is why the config uses LOG for both.
Finally, we create the Glue job, specifying which transform script to use, the role, the worker type, the Glue version, etc. Through the default arguments we can pass parameters to be used in the job, like bronze_bucket_url, silver_bucket_url, database_name, and table_name.
private createGlueJob(glueConfig: GlueLoadDataConfig): CfnJob {
    const jobProperties = glueConfig.glueJob.job.properties;
    const defaultArguments = glueConfig.glueJob.job.defaultArguments;

    return new CfnJob(this, 'GlueJob', {
        name: 'move_data_to_silver_bucket',
        description: 'Move transformed data from bronze to silver bucket',
        command: {
            name: 'glueetl',
            scriptLocation: this.scriptBucket.s3UrlForObject('glue_etl_job.py'),
            pythonVersion: '3',
        },
        role: this.glueRole.roleArn,
        maxRetries: jobProperties.maxRetries,
        workerType: jobProperties.workerType.name,
        numberOfWorkers: jobProperties.numberOfWorkers,
        timeout: jobProperties.timeout,
        glueVersion: jobProperties.glueVersion.name,
        executionClass: jobProperties.executionClass,
        defaultArguments: {
            '--enable-metrics': defaultArguments.enableMetrics,
            '--enable-job-insights': defaultArguments.enableJobInsights,
            '--enable-glue-datacatalog': defaultArguments.enableGlueDatacatalog,
            '--enable-continuous-cloudwatch-log': defaultArguments.enableContinuousCloudwatchLog,
            '--job-bookmark-option': defaultArguments.jobBookmarkOption,
            '--job-language': defaultArguments.jobLanguage,
            '--enable-auto-scaling': defaultArguments.enableAutoScaling,
            '--TempDir': `s3://${this.scriptBucket.bucketName}/temp/`,
            '--bronze_bucket_url': this.bronzeBucket.s3UrlForObject(''),
            '--silver_bucket_url': this.silverBucket.s3UrlForObject(''),
            '--database_name': this.glueDatabase.databaseName,
            '--table_name': this.bronzeBucket.bucketName,
        },
    });
}
This is the Glue job Python script, glue_etl_job.py, which lives in the glue-job-assets folder:
import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(
    sys.argv,
    ["JOB_NAME", "database_name", "table_name", "bronze_bucket_url", "silver_bucket_url"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# The table name is created by the crawler and references the crawled source,
# in this case an S3 bucket. The bucket name contains '-' and the crawler replaces them
# with '_'. This is a workaround to replace the '-' with '_' in the table name.
table_name = args["table_name"].replace("-", "_")

# Script generated for node Data Catalog table
DataCatalogtable_node = glueContext.create_dynamic_frame.from_catalog(
    database=args["database_name"],
    table_name=table_name,
    transformation_ctx="DataCatalogtable_node",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=DataCatalogtable_node,
    mappings=[
        ("partition_1", "string", "tenant", "string"),
        ("partition_2", "string", "date", "string"),
        # Here you add/remove the fields that should end up in the silver bucket, e.g.:
        # ("column_name", "string", "column_name", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node S3 bucket
silverBucket_node = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMapping_node2,
    connection_type="s3",
    format="parquet",
    connection_options={
        "path": args["silver_bucket_url"],
        "partitionKeys": ["tenant", "date"],
    },
    transformation_ctx="silverBucket_node",
)

job.commit()
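The triggers described next take care of running everything automatically, but while testing it is handy to start the job on demand. A minimal sketch using the AWS SDK for JavaScript v3, assuming @aws-sdk/client-glue is added as a dependency:

import { GlueClient, StartJobRunCommand } from '@aws-sdk/client-glue';

async function runJobOnDemand(): Promise<void> {
    const glue = new GlueClient({});
    // Starts a single run of the job defined in createGlueJob.
    const { JobRunId } = await glue.send(
        new StartJobRunCommand({ JobName: 'move_data_to_silver_bucket' })
    );
    console.log(`Started job run ${JobRunId}`);
}

void runJobOnDemand();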
At this point we have set up everything needed for our ETL process: the Glue crawler extracts the data and maps it into the Glue database, and the Glue job transforms the data and writes it to the silver bucket. Because the job runs with job-bookmark-enable and every node sets a transformation_ctx, data that was already processed is not picked up again on later runs. Next we need to create a scheduled trigger for the crawler and a conditional trigger for the job.
First, we create a scheduled trigger that runs the crawler every day at 4:00 AM UTC; the cron expression comes from the development.ts config file:
private createScheduledTrigger(crawlerToRun: CfnCrawler, scheduleExpression: string) {
    new CfnTrigger(this, 'GlueScheduledTrigger', {
        description: `Run ${crawlerToRun.name} on schedule`,
        name: `Run-${crawlerToRun.name}-on-schedule`,
        type: 'SCHEDULED',
        schedule: scheduleExpression,
        actions: [
            {
                crawlerName: crawlerToRun.name,
            },
        ],
        startOnCreation: true,
    });
}
Then we create a conditional trigger so the Glue job runs after the crawler finishes successfully:
private createGlueTrigger(jobToRun: CfnJob, crawlerToWatch: CfnCrawler) {
    new CfnTrigger(this, 'GlueTrigger', {
        name: `Run-${jobToRun.name}-on-${crawlerToWatch.name}-success`,
        description: `Run ${jobToRun.name} on ${crawlerToWatch.name} success`,
        type: 'CONDITIONAL',
        predicate: {
            conditions: [
                {
                    logicalOperator: 'EQUALS',
                    crawlerName: crawlerToWatch.name,
                    crawlState: 'SUCCEEDED',
                },
            ],
        },
        actions: [
            {
                jobName: jobToRun.name,
            },
        ],
        startOnCreation: true,
    });
}
Putting it all together, this is the whole stack code:
import { Database, IDatabase } from '@aws-cdk/aws-glue-alpha';
import { RemovalPolicy, Stack, StackProps } from 'aws-cdk-lib';
import { CfnCrawler, CfnJob, CfnTrigger } from 'aws-cdk-lib/aws-glue';
import { IRole, Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
import { Bucket, IBucket } from 'aws-cdk-lib/aws-s3';
import { BucketDeployment, Source } from 'aws-cdk-lib/aws-s3-deployment';
import { StringParameter } from 'aws-cdk-lib/aws-ssm';
import { Construct } from 'constructs';
import { GlueLoadDataConfig } from '../shared';

export interface GlueLoadDataStackProps extends StackProps {
    glueConfig: GlueLoadDataConfig;
}

export class GlueLoadDataStack extends Stack {
    private readonly glueRole: IRole;
    private readonly bronzeBucket: IBucket;
    private readonly silverBucket: IBucket;
    private readonly scriptBucket: IBucket;
    private readonly glueDatabase: IDatabase;

    constructor(scope: Construct, id: string, props: GlueLoadDataStackProps) {
        super(scope, id, props);
        const { glueConfig } = props;

        this.glueDatabase = this.createGlueDatabase(glueConfig.glueDatabase.removalPolicy);
        this.bronzeBucket = this.getCoreBucket(glueConfig.bronzeBucketNameSSMPath, 'BronzeBucket');
        this.silverBucket = this.getCoreBucket(glueConfig.silverBucketNameSSMPath, 'SilverBucket');
        this.scriptBucket = this.createBucketAndDeployGlueJobAssets(glueConfig);
        this.glueRole = this.createGlueRole();

        const crawler = this.createGlueCrawler(glueConfig);
        const job = this.createGlueJob(glueConfig);

        this.createScheduledTrigger(crawler, glueConfig.scheduleTriggerExpression);
        this.createGlueTrigger(job, crawler);
    }

    private getCoreBucket(ssmPath: string, bucketId: string): IBucket {
        const coreBucketName = StringParameter.fromStringParameterName(this, `${bucketId}NameSSMPath`, ssmPath);
        return Bucket.fromBucketName(this, bucketId, coreBucketName.stringValue);
    }

    private createBucketAndDeployGlueJobAssets(glueConfig: GlueLoadDataConfig): IBucket {
        const scriptBucket = new Bucket(this, 'GlueJobBucket', {
            ...glueConfig.glueJob.scriptBucket,
        });

        new BucketDeployment(this, 'GlueJobBucketDeployment', {
            sources: [Source.asset('./glue-job-assets')],
            destinationBucket: scriptBucket,
        });

        return scriptBucket;
    }

    private createGlueRole(): IRole {
        const glueRole = new Role(this, 'BronzeBucketCrawlerRole', {
            assumedBy: new ServicePrincipal('glue.amazonaws.com'),
            managedPolicies: [{ managedPolicyArn: 'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole' }],
        });

        this.bronzeBucket.grantRead(glueRole);
        this.scriptBucket.grantReadWrite(glueRole);
        this.silverBucket.grantReadWrite(glueRole);

        return glueRole;
    }

    private createGlueDatabase(removalPolicy: RemovalPolicy): IDatabase {
        const db = new Database(this, 'BronzeGlueDatabase');
        db.applyRemovalPolicy(removalPolicy);
        return db;
    }

    private createGlueCrawler(glueConfig: GlueLoadDataConfig): CfnCrawler {
        return new CfnCrawler(this, 'BronzeBucketCrawler', {
            name: 'bronze-bucket-crawler',
            description: 'Crawler for Bronze Bucket',
            databaseName: this.glueDatabase.databaseName,
            role: this.glueRole.roleArn,
            targets: {
                s3Targets: [
                    {
                        path: `s3://${this.bronzeBucket.bucketName}`,
                    },
                ],
            },
            recrawlPolicy: {
                recrawlBehavior: 'CRAWL_NEW_FOLDERS_ONLY',
            },
            schemaChangePolicy: {
                deleteBehavior: glueConfig.glueCrawler.schemaChangePolicy.deleteBehavior,
                updateBehavior: glueConfig.glueCrawler.schemaChangePolicy.updateBehavior,
            },
        });
    }

    private createGlueJob(glueConfig: GlueLoadDataConfig): CfnJob {
        const jobProperties = glueConfig.glueJob.job.properties;
        const defaultArguments = glueConfig.glueJob.job.defaultArguments;

        return new CfnJob(this, 'GlueJob', {
            name: 'move_data_to_silver_bucket',
            description: 'Move transformed data from bronze to silver bucket',
            command: {
                name: 'glueetl',
                scriptLocation: this.scriptBucket.s3UrlForObject('glue_etl_job.py'),
                pythonVersion: '3',
            },
            role: this.glueRole.roleArn,
            maxRetries: jobProperties.maxRetries,
            workerType: jobProperties.workerType.name,
            numberOfWorkers: jobProperties.numberOfWorkers,
            timeout: jobProperties.timeout,
            glueVersion: jobProperties.glueVersion.name,
            executionClass: jobProperties.executionClass,
            defaultArguments: {
                '--enable-metrics': defaultArguments.enableMetrics,
                '--enable-job-insights': defaultArguments.enableJobInsights,
                '--enable-glue-datacatalog': defaultArguments.enableGlueDatacatalog,
                '--enable-continuous-cloudwatch-log': defaultArguments.enableContinuousCloudwatchLog,
                '--job-bookmark-option': defaultArguments.jobBookmarkOption,
                '--job-language': defaultArguments.jobLanguage,
                '--enable-auto-scaling': defaultArguments.enableAutoScaling,
                '--TempDir': `s3://${this.scriptBucket.bucketName}/temp/`,
                '--bronze_bucket_url': this.bronzeBucket.s3UrlForObject(''),
                '--silver_bucket_url': this.silverBucket.s3UrlForObject(''),
                '--database_name': this.glueDatabase.databaseName,
                '--table_name': this.bronzeBucket.bucketName,
            },
        });
    }

    private createGlueTrigger(jobToRun: CfnJob, crawlerToWatch: CfnCrawler) {
        new CfnTrigger(this, 'GlueTrigger', {
            name: `Run-${jobToRun.name}-on-${crawlerToWatch.name}-success`,
            description: `Run ${jobToRun.name} on ${crawlerToWatch.name} success`,
            type: 'CONDITIONAL',
            predicate: {
                conditions: [
                    {
                        logicalOperator: 'EQUALS',
                        crawlerName: crawlerToWatch.name,
                        crawlState: 'SUCCEEDED',
                    },
                ],
            },
            actions: [
                {
                    jobName: jobToRun.name,
                },
            ],
            startOnCreation: true,
        });
    }

    private createScheduledTrigger(crawlerToRun: CfnCrawler, scheduleExpression: string) {
        new CfnTrigger(this, 'GlueScheduledTrigger', {
            description: `Run ${crawlerToRun.name} on schedule`,
            name: `Run-${crawlerToRun.name}-on-schedule`,
            type: 'SCHEDULED',
            schedule: scheduleExpression,
            actions: [
                {
                    crawlerName: crawlerToRun.name,
                },
            ],
            startOnCreation: true,
        });
    }
}
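For reference, these are the files created throughout this post (projen-generated files omitted):

.projenrc.ts
config/
    development.ts
glue-job-assets/
    glue_etl_job.py
src/
    main.ts
    glue/
        glue-load-data-stack.ts
    shared/
        index.ts
test/
    glue-load-data-stack.test.ts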
Lastly, we have the unit tests under test/glue-load-data-stack.test.ts (they can be run with npx projen test):
import { App } from 'aws-cdk-lib';
import { Template } from 'aws-cdk-lib/assertions';
import { GlueLoadDataStack } from '../src/glue/glue-load-data-stack';
import { getGlueConfig } from '../src/shared';

describe('Glue Data Stack', () => {
    let app: App;
    let stack: GlueLoadDataStack;
    let template: Template;
    const awsAccountTestId = '123456789012';

    beforeEach(() => {
        app = new App();
        stack = new GlueLoadDataStack(app, 'GlueLoadDataStackTest', {
            env: {
                account: awsAccountTestId,
                region: 'us-test-1',
            },
            tags: {
                CostAllocation: 'alfred-test',
            },
            description: 'Use Glue to load data from s3 bronze bucket filled by ec2 instance.',
            glueConfig: getGlueConfig(),
        });
        template = Template.fromStack(stack);
    });

    it('should generate a snapshot', () => {
        expect(template).toMatchSnapshot();
    });

    it('should have a Glue Database', () => {
        template.hasResource('AWS::Glue::Database', {
            DeletionPolicy: 'Delete',
            UpdateReplacePolicy: 'Delete',
            Properties: {
                CatalogId: awsAccountTestId,
            },
        });
    });

    it('should have a Glue Crawler', () => {
        template.hasResource('AWS::Glue::Crawler', {
            Properties: {
                Name: 'bronze-bucket-crawler',
                RecrawlPolicy: {
                    RecrawlBehavior: 'CRAWL_NEW_FOLDERS_ONLY',
                },
                SchemaChangePolicy: {
                    DeleteBehavior: 'LOG',
                    UpdateBehavior: 'LOG',
                },
                Targets: {
                    S3Targets: [
                        {
                            Path: {
                                'Fn::Join': [
                                    '',
                                    [
                                        's3://',
                                        {
                                            Ref: 'BronzeBucketNameSSMPathParameter',
                                        },
                                    ],
                                ],
                            },
                        },
                    ],
                },
            },
        });
    });

    it('should have a Glue Job', () => {
        template.hasResourceProperties('AWS::Glue::Job', {
            Command: {
                Name: 'glueetl',
                PythonVersion: '3',
            },
            DefaultArguments: {
                '--bronze_bucket_url': {
                    'Fn::Join': [
                        '',
                        [
                            's3://',
                            {
                                Ref: 'BronzeBucketNameSSMPathParameter',
                            },
                            '/',
                        ],
                    ],
                },
                '--enable-auto-scaling': 'true',
                '--enable-continuous-cloudwatch-log': 'true',
                '--enable-glue-datacatalog': 'true',
                '--enable-job-insights': 'false',
                '--enable-metrics': 'true',
                '--job-bookmark-option': 'job-bookmark-enable',
                '--job-language': 'python',
                '--silver_bucket_url': {
                    'Fn::Join': [
                        '',
                        [
                            's3://',
                            {
                                Ref: 'SilverBucketNameSSMPathParameter',
                            },
                            '/',
                        ],
                    ],
                },
                '--table_name': {
                    Ref: 'BronzeBucketNameSSMPathParameter',
                },
            },
            ExecutionClass: 'STANDARD',
            GlueVersion: '4.0',
            MaxRetries: 0,
            Name: 'move_data_to_silver_bucket',
            NumberOfWorkers: 2,
            Timeout: 5,
            WorkerType: 'G.1X',
        });
    });

    it('should have a Glue Job Role and one bucket', () => {
        template.hasResourceProperties('AWS::IAM::Role', {
            AssumeRolePolicyDocument: {
                Statement: [
                    {
                        Action: 'sts:AssumeRole',
                        Effect: 'Allow',
                        Principal: {
                            Service: 'glue.amazonaws.com',
                        },
                    },
                ],
            },
            ManagedPolicyArns: ['arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole'],
        });
        template.resourceCountIs('AWS::S3::Bucket', 1);
    });

    it('should have two glue triggers', () => {
        template.resourceCountIs('AWS::Glue::Trigger', 2);
        template.hasResourceProperties('AWS::Glue::Trigger', {
            Actions: [
                {
                    CrawlerName: 'bronze-bucket-crawler',
                },
            ],
            Type: 'SCHEDULED',
            Schedule: 'cron(0 4 * * ? *)',
            StartOnCreation: true,
        });
        template.hasResourceProperties('AWS::Glue::Trigger', {
            Name: 'Run-move_data_to_silver_bucket-on-bronze-bucket-crawler-success',
            Actions: [
                {
                    JobName: 'move_data_to_silver_bucket',
                },
            ],
            Type: 'CONDITIONAL',
            StartOnCreation: true,
            Predicate: {
                Conditions: [
                    {
                        CrawlState: 'SUCCEEDED',
                        CrawlerName: 'bronze-bucket-crawler',
                        LogicalOperator: 'EQUALS',
                    },
                ],
            },
        });
    });
});