I’m trying to create a DynamoDB table that stores data transformed by a Lambda Function processing records from Kinesis Firehose. This data will also be stored in an S3 Bucket. I’d like to use DynamoDB Table Streams to aggregate the data stored in DynamoDB.
I know that setting stream_enabled = true
in the DynamoDB table resource should be enough, but I’m not sure how to create the trigger for the DynamoDB Table Stream. I can currently upload the Lambda Function, but I don’t know how to create the “trigger” that invokes it.
# dynamoDB/main.tf
resource "aws_dynamodb_table" "table" {
  name           = var.table_name
  billing_mode   = "PROVISIONED"
  read_capacity  = var.read_capacity
  write_capacity = var.write_capacity
  hash_key       = "PK"
  range_key      = "SK"

  # Emit a change stream carrying the new item image for each write.
  # NOTE(review): enabling the stream alone does not invoke anything. To
  # "trigger" a Lambda from this stream, add an
  # aws_lambda_event_source_mapping resource with
  #   event_source_arn  = aws_dynamodb_table.table.stream_arn
  #   starting_position = "LATEST"
  # and grant the consuming Lambda's role dynamodb:GetRecords,
  # dynamodb:GetShardIterator, dynamodb:DescribeStream and
  # dynamodb:ListStreams on the stream ARN.
  stream_enabled   = true
  stream_view_type = "NEW_IMAGE"

  # Partition key and sort key of the entire table.
  attribute {
    name = "PK"
    type = "S"
  }
  attribute {
    name = "SK"
    type = "S"
  }

  # Partition key and sort key of the first Global Secondary Index.
  attribute {
    name = "GSI1PK"
    type = "S"
  }
  attribute {
    name = "GSI1SK"
    type = "S"
  }
  global_secondary_index {
    name            = "GSI1"
    hash_key        = "GSI1PK"
    range_key       = "GSI1SK"
    write_capacity  = var.gsi1_write_capacity
    read_capacity   = var.gsi1_read_capacity
    projection_type = "ALL"
  }

  # Partition key and sort key of the second Global Secondary Index.
  attribute {
    name = "GSI2PK"
    type = "S"
  }
  attribute {
    name = "GSI2SK"
    type = "S"
  }
  global_secondary_index {
    name            = "GSI2"
    hash_key        = "GSI2PK"
    range_key       = "GSI2SK"
    write_capacity  = var.gsi2_write_capacity
    read_capacity   = var.gsi2_read_capacity
    projection_type = "ALL"
  }

  # Keep point-in-time backups (rolling 35-day window).
  point_in_time_recovery {
    enabled = true
  }

  # Tags shared by every resource in this project.
  tags = {
    Project   = "Blog"
    Stage     = var.stage
    Developer = var.developer
  }
}
# Analytics/main.tf
# Create an S3 Bucket to store the kinesis data
# Destination bucket for the Firehose delivery stream's processed records.
resource "aws_s3_bucket" "bucket" {
  bucket = "blog-analytics-${var.stage}"

  # NOTE(review): the inline `acl` argument is deprecated in AWS provider v4+
  # in favor of a separate aws_s3_bucket_acl resource — confirm the pinned
  # provider version before upgrading.
  acl = "private"

  tags = {
    Project   = "Blog"
    Stage     = var.stage
    Developer = var.developer
  }
}
# Create a Lambda function to process the kinesis stream
# Permissions for the Firehose-processing Lambda. Split into two statements
# so each set of actions is scoped to the resources it actually targets —
# the original single statement granted DynamoDB actions on the logs ARN
# and logs actions on the table ARN.
data "aws_iam_policy_document" "lambda_policy_doc" {
  # DynamoDB item access, limited to the blog table.
  statement {
    sid    = "BlogTableAccess"
    effect = "Allow"
    actions = [
      "dynamodb:PutItem",
      "dynamodb:DeleteItem",
      "dynamodb:Scan",
      "dynamodb:Query",
      "dynamodb:UpdateItem",
    ]
    resources = [var.table_arn]
  }

  # CloudWatch Logs so the Lambda can write its own log group/streams.
  statement {
    sid    = "LambdaLogging"
    effect = "Allow"
    actions = [
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents",
    ]
    resources = ["arn:aws:logs:*:*:*"]
  }
}
# Package the Lambda handler's JavaScript source into a deployable zip.
data "archive_file" "getBlog" {
  type        = "zip"
  output_path = "${var.path}/${var.file_name}.zip"
  source_file = "${var.path}/${var.file_name}.js"
}
# Execution role for the Firehose-processing Lambda.
resource "aws_iam_role" "lambda_role" {
  name = "kinesis_process_role"

  # Trust policy: only the Lambda service may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}
# Attach the generated policy document to the Lambda execution role.
resource "aws_iam_role_policy" "lambda_policy" {
  role   = aws_iam_role.lambda_role.id
  policy = data.aws_iam_policy_document.lambda_policy_doc.json
}
# Lambda that transforms records flowing through the Firehose stream.
resource "aws_lambda_function" "kinesis_processor" {
  function_name = "firehose-lambda-processor-${var.stage}"
  role          = aws_iam_role.lambda_role.arn
  handler       = "${var.file_name}.handler"
  # NOTE(review): nodejs12.x has reached end of support in Lambda; move to a
  # current runtime (e.g. nodejs18.x) once the handler is verified on it.
  runtime = "nodejs12.x"

  # Reference the archive data source directly so the zip is built before
  # the function, and hash the package so Terraform redeploys when the code
  # changes — without source_code_hash a new zip is never detected.
  filename         = data.archive_file.getBlog.output_path
  source_code_hash = data.archive_file.getBlog.output_base64sha256

  # Attach the shared Lambda Layer.
  layers = [var.layer_arn]
}
# Allow Firehose to invoke the processor Lambda.
resource "aws_lambda_permission" "blog" {
  statement_id  = "AllowFirehoseInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.kinesis_processor.function_name
  principal     = "firehose.amazonaws.com"
  # Without source_arn, any Firehose delivery stream in any account could
  # invoke this function; scope the grant to our stream only.
  source_arn = aws_kinesis_firehose_delivery_stream.extended_s3_stream.arn
}
# Create the Firehose stream
# Role assumed by the Firehose delivery stream.
# NOTE(review): no permissions policy is attached to this role in this file —
# Firehose also needs s3:PutObject (and related actions) on the bucket plus
# lambda:InvokeFunction on the processor; confirm these are granted elsewhere.
resource "aws_iam_role" "kinesis_role" {
  name = "kinesis_role"

  # Trust policy: only the Firehose service may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "firehose.amazonaws.com"
        }
      }
    ]
  })
}
# Firehose delivery stream: incoming records are handed to the processor
# Lambda, and its output is delivered to the analytics S3 bucket.
resource "aws_kinesis_firehose_delivery_stream" "extended_s3_stream" {
  name        = "tylernorlund-blog-analytics"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn   = aws_iam_role.kinesis_role.arn
    bucket_arn = aws_s3_bucket.bucket.arn

    processing_configuration {
      enabled = "true"

      processors {
        type = "Lambda"

        parameters {
          parameter_name = "LambdaArn"
          # Firehose requires a qualified function ARN (version or alias).
          parameter_value = "${aws_lambda_function.kinesis_processor.arn}:$LATEST"
        }
      }
    }
  }
}