import "tfplan"
import "strings"
import "types"
##### Functions #####
# Find all resources of a specific type from all modules using the tfplan import.
#
# Parameters:
#   type - either an exact resource type (e.g. "aws_instance") or a prefix
#          ending in "_" (e.g. "aws_"), which selects every resource type
#          that starts with that prefix.
#
# Returns a map of resource instances keyed by their full address, e.g.
# "module.network.aws_instance.web[0]".
find_resources_from_plan = func(type) {
	resources = {}

	# A real resource type never ends in "_", so a trailing underscore marks
	# the argument as a prefix rather than an exact type. Without this, a call
	# such as find_resources_from_plan("aws_") would look up the literal key
	# "aws_" and silently match nothing.
	match_prefix = strings.has_suffix(type, "_")

	# Iterate over all modules in the tfplan import
	for tfplan.module_paths as path {
		# Iterate over every resource type present in the module
		for tfplan.module(path).resources else {} as resource_type, named {
			matches = resource_type is type or (match_prefix and strings.has_prefix(resource_type, type))
			if not matches {
				continue
			}

			# Iterate over the named resources of the matching type
			for named as name, instances {
				# Iterate over resource instances
				for instances as index, r {
					# Build the address from the actual resource type so that
					# prefix matches still produce a correct address
					local_address = resource_type + "." + name + "[" + string(index) + "]"
					if length(path) == 0 {
						# root module
						address = local_address
					} else {
						# non-root module
						address = "module." + strings.join(path, ".module.") + "." + local_address
					}

					# Add the instance to resources map, setting the key to the address
					resources[address] = r
				}
			}
		}
	}

	return resources
}
# Validate that all instances of the specified type have a specified top-level
# map attribute that contains every key in a given list, and that its
# "managed-by" key is set to "Terraform".
#
# Parameters:
#   type            - passed unchanged to find_resources_from_plan
#   attribute       - name of the top-level attribute to inspect (e.g. "tags")
#   required_values - list of keys that must be present in the attribute map
#
# Returns true when every inspected instance passes, false otherwise.
# Instances being destroyed are skipped; instances whose attribute is
# computed are reported but do not fail validation.
validate_attribute_contains_list = func(type, attribute, required_values) {
	validated = true

	# Get all resource instances of the specified type
	resource_instances = find_resources_from_plan(type)

	# Loop through the resource instances
	for resource_instances as address, r {
		# Skip resource instances that are being destroyed
		# to avoid unnecessary policy violations.
		# Used to be: if length(r.diff) == 0
		if r.destroy and not r.requires_new {
			print("Skipping resource", address, "that is being destroyed.")
			continue
		}

		# Determine if the attribute is computed
		# We check "attribute.%" and "attribute.#" because an
		# attribute of type map or list won't show up in the diff
		if (r.diff[attribute + ".%"].computed else false) or
			(r.diff[attribute + ".#"].computed else false) {
			print("Resource", address, "has attribute", attribute,
				"that is computed.")
			# If you want computed values to cause the policy to fail,
			# uncomment the next line.
			# validated = false
			continue
		}

		# Validate that the attribute exists and is a map.
		# The parentheses make the intended grouping of "else" explicit.
		if not ((r.applied[attribute] else "") is not "" and
			types.type_of(r.applied[attribute]) is "map") {
			print("Resource", address, "is missing attribute", attribute,
				"or it is not a map")
			validated = false
			continue
		}

		attribute_map = r.applied[attribute]

		# Check that the "managed-by" key exists and has the value "Terraform".
		# The "else" guard turns a missing key into a normal failure instead of
		# letting an undefined value reach the if condition.
		if (attribute_map["managed-by"] else "") is not "Terraform" {
			print("Resource", address, "does not have the required 'managed-by' tag set to 'Terraform'")
			validated = false
		}

		# Check that every other required key is present in the map
		for required_values as required_key {
			# "managed-by" is already covered by the stricter value check above
			if required_key is "managed-by" {
				continue
			}
			if attribute_map not contains required_key {
				print("Resource", address, "is missing required key", required_key,
					"in attribute", attribute)
				validated = false
			}
		}
	} # end resource instances

	return validated
}
##### Rules #####
# Call the validation function for all AWS resources
# NOTE(review): "aws_" is a provider prefix, not a full resource type;
# confirm that find_resources_from_plan selects resources by prefix.
tags_validated = validate_attribute_contains_list("aws_", "tags", ["managed-by"])
# Main rule that evaluates results: the policy passes only when
# tags_validated is true.
main = rule {
	tags_validated
}