DynamoDB Table Stream

I’m trying to create a DynamoDB table that stores data transformed by a Lambda Function processing records from Kinesis Firehose; the Firehose stream also delivers the data to an S3 Bucket. I’d like to use DynamoDB Table Streams to aggregate the data stored in DynamoDB.

I know that setting stream_enabled = true in the DynamoDB table resource enables the stream, but I’m not sure how to create the trigger for the DynamoDB Table Stream. I can currently upload the Lambda Function, but I don’t know how to create that “trigger” that invokes the Lambda Function from the stream.

# dynamoDB/main.tf
resource "aws_dynamodb_table" "table" {
  name             = var.table_name
  billing_mode     = "PROVISIONED"
  read_capacity    = var.read_capacity
  write_capacity   = var.write_capacity
  hash_key         = "PK"
  range_key        = "SK"
  stream_view_type = "NEW_IMAGE"
  stream_enabled   = true

  # Partition key and sort key of the entire table
  attribute {
    name = "PK"
    type = "S"
  }
  attribute {
    name = "SK"
    type = "S"
  }

  # Partition key and sort key of the first Global Secondary Index
  attribute {
    name = "GSI1PK"
    type = "S"
  }
  attribute {
    name = "GSI1SK"
    type = "S"
  }
  global_secondary_index {
    name               = "GSI1"
    hash_key           = "GSI1PK"
    range_key          = "GSI1SK"
    write_capacity     = var.gsi1_write_capacity
    read_capacity      = var.gsi1_read_capacity
    projection_type    = "ALL"
  }

  # Partition key and sort key of the second Global Secondary Index
  attribute {
    name = "GSI2PK"
    type = "S"
  }
  attribute {
    name = "GSI2SK"
    type = "S"
  }
  global_secondary_index {
    name               = "GSI2"
    hash_key           = "GSI2PK"
    range_key          = "GSI2SK"
    write_capacity     = var.gsi2_write_capacity
    read_capacity      = var.gsi2_read_capacity
    projection_type    = "ALL"
  }

  # Store the DynamoDB items for the last 35 days.
  point_in_time_recovery {
    enabled = true
  }

  # The tags related to the table.
  tags = {
    Project   = "Blog"
    Stage     = var.stage
    Developer = var.developer
  }
}
# Analytics/main.tf
# Create an S3 Bucket to store the kinesis data
resource "aws_s3_bucket" "bucket" {
  bucket = "blog-analytics-${var.stage}"
  acl    = "private"
  tags = {
    Project   = "Blog"
    Stage     = var.stage
    Developer = var.developer
  }
}

# Create a Lambda function to process the kinesis stream
data "aws_iam_policy_document" "lambda_policy_doc" {
  statement {
    effect = "Allow"
    actions = [
      "dynamodb:PutItem",
      "dynamodb:DeleteItem",
      "dynamodb:Scan",
      "dynamodb:Query",
      "dynamodb:UpdateItem",
      "logs:CreateLogGroup",
      "logs:PutLogEvents",
      "logs:CreateLogStream"
    ]
    resources = [ 
      var.table_arn,
      "arn:aws:logs:*" 
    ]
    sid = "codecommitid"
  }
}
data "archive_file" "getBlog" {
  type = "zip"
  source_file = "${var.path}/${var.file_name}.js"
  output_path = "${var.path}/${var.file_name}.zip"
}
resource "aws_iam_role" "lambda_role" {
   name = "kinesis_process_role"

   assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow"
    }
  ]
}
EOF
}
resource "aws_iam_role_policy" "lambda_policy" {
  policy = data.aws_iam_policy_document.lambda_policy_doc.json
  role   = aws_iam_role.lambda_role.id
}
resource "aws_lambda_function" "kinesis_processor" {
  filename      = "${var.path}/${var.file_name}.zip"
  function_name = "firehose-lambda-processor-${var.stage}"
  role          = aws_iam_role.lambda_role.arn
  handler       = "${var.file_name}.handler"
  runtime       = "nodejs12.x"
  # Attach the Lambda Layer
  layers        = [ var.layer_arn ]
}
resource "aws_lambda_permission" "blog" {
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.kinesis_processor.function_name
  principal     = "firehose.amazonaws.com"
}

# Create the Firehose stream
resource "aws_iam_role" "kinesis_role" {
   name = "kinesis_role"
   assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Effect": "Allow"
    }
  ]
}
EOF
}
resource "aws_kinesis_firehose_delivery_stream" "extended_s3_stream" {
  name        = "tylernorlund-blog-analytics"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn   = aws_iam_role.kinesis_role.arn
    bucket_arn = aws_s3_bucket.bucket.arn

    processing_configuration {
      enabled = "true"
      processors {
        type = "Lambda"
        parameters {
          parameter_name  = "LambdaArn"
          parameter_value = "${aws_lambda_function.kinesis_processor.arn}:$LATEST"
        }
      }
    }
  }
}

Hi @tnorlund,
I guess you already came across this resource. Did you try it?

https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_event_source_mapping
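
If it helps, here is a minimal sketch of how that resource can wire the table stream to the Lambda doing the aggregation. The "table_stream" name and the aws_lambda_function.aggregator reference are placeholders, and it assumes the mapping lives in the same module as the table (otherwise expose the table’s stream_arn as an output and pass it in):

resource "aws_lambda_event_source_mapping" "table_stream" {
  # Placeholder references: point these at your table's stream and your
  # aggregation Lambda Function.
  event_source_arn  = aws_dynamodb_table.table.stream_arn
  function_name     = aws_lambda_function.aggregator.arn
  starting_position = "LATEST"
}

Note that the aggregation Lambda’s execution role also needs dynamodb:DescribeStream, dynamodb:GetRecords, dynamodb:GetShardIterator, and dynamodb:ListStreams on the stream ARN so it can poll the stream.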


It worked great. Thanks for the help!