How to create an AWS Firehose to Iceberg destination with multiple-table routing (inline parsing)

Guys, I am writing Terraform code to create a Firehose delivery stream that will write data into Iceberg tables. I can't find an example anywhere in the HashiCorp docs or on the internet of enabling routing (inline parsing), which the AWS console UI shows in the "Inline parsing for routing information" section: you enable it via a checkbox and then define three JQ expressions. Can you please help me with this? Many thanks.

Hello, friend.

Try the example below. You need to configure the processing_configuration block:

processing_configuration {
  enabled = true
  
  # JQ processor example
  processors {
    type = "MetadataExtraction"
    parameters {
      parameter_name  = "JsonParsingEngine"
      parameter_value = "JQ-1.6"
    }
    parameters {
      parameter_name  = "MetadataExtractionQuery"
      parameter_value = "{destinationDatabaseName: .type.table_type, destinationTableName: .type.table_name, operation: \"insert\"}"
    }
  }
}
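For context, that block sits inside the aws_kinesis_firehose_delivery_stream resource with destination = "iceberg". Here is a rough end-to-end sketch; the iceberg_configuration attribute names are from memory of recent hashicorp/aws provider versions (Iceberg destination support is fairly new), and all ARNs, names, and referenced IAM/S3 resources are placeholders, so verify against the provider docs before using it:

```
# Sketch only: requires a recent hashicorp/aws provider version with
# Iceberg destination support; ARNs and names below are placeholders.
resource "aws_kinesis_firehose_delivery_stream" "iceberg" {
  name        = "example-iceberg-stream"
  destination = "iceberg"

  iceberg_configuration {
    role_arn    = aws_iam_role.firehose.arn                     # assumed IAM role resource
    catalog_arn = "arn:aws:glue:eu-west-1:111122223333:catalog" # placeholder catalog ARN

    buffering_size     = 5
    buffering_interval = 20

    s3_configuration {
      role_arn   = aws_iam_role.firehose.arn
      bucket_arn = aws_s3_bucket.firehose_backup.arn            # assumed backup bucket
    }

    # "Inline parsing for routing information" (the console checkbox plus the
    # three JQ expressions) maps to this MetadataExtraction processor.
    processing_configuration {
      enabled = true

      processors {
        type = "MetadataExtraction"
        parameters {
          parameter_name  = "JsonParsingEngine"
          parameter_value = "JQ-1.6"
        }
        parameters {
          parameter_name  = "MetadataExtractionQuery"
          parameter_value = "{destinationDatabaseName: .type.table_type, destinationTableName: .type.table_name, operation: \"insert\"}"
        }
      }
    }
  }
}
```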

Thanks for your reply. I am going to try this ASAP and will come back here to confirm it works.


Hello! I am back here to say that it worked, but now I need to concatenate the value of .type.table_name with ‘_’, and I am struggling to do this. I tried the following:

"" + .type.table_name
"
" + .type.table_name
("_" + .type.table_name)
and some other variations.
In all cases, I got a ‘MetaDataExtraction JQ Query must be a valid JSON’ error.
Any idea how to do this? Thanks.

Thanks a lot for this post, man. There is no other resource on the internet where this is covered. My issue was that I wanted to do something similar using a CloudFormation template, and your Terraform script was a good reference while developing it. Here is my CloudFormation template for other people to use.

  CleanIcebergKinesisFirehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: test-iceberg-delivery-stream
      IcebergDestinationConfiguration:
        AppendOnly: true
        BufferingHints:
          IntervalInSeconds: 20
          SizeInMBs: 5
        CatalogConfiguration:
          CatalogArn: <Specify ARN>
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref IcebergKinesisFirehoseLogGroup
          LogStreamName: !Ref IcebergKinesisFirehoseLogStream
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Parameters: 
                - ParameterName: "JsonParsingEngine"
                  ParameterValue: "JQ-1.6"
                - ParameterName: "MetadataExtractionQuery"
                  ParameterValue: "{destinationDatabaseName: \"my_db\", destinationTableName: .tableinfo.name, operation: \"insert\"}"                
              Type: MetadataExtraction 
        RetryOptions:
          DurationInSeconds: 30
        RoleARN: <Specify ARN>
        s3BackupMode: FailedDataOnly
        S3Configuration:
          BucketARN: <Specify ARN>
          ErrorOutputPrefix: "events/backup/"
          CompressionFormat: UNCOMPRESSED
          RoleARN: <Specify ARN>

Hello Bruno. I apologize for the delay in seeing your message.
table_name is a key for the JSON parsing engine. It means that when you send the message to Firehose, the payload must contain that key, and its value is your table name.

For example:

```
{destinationDatabaseName: .type.table_type, destinationTableName: .type.table_name, operation: "insert"}
```

It means that you need to send a payload with this format:

{
    "type": {
        "table_type": <your_database_name>,
        "table_name": <your_table_name>
    },
    "key_1": "value_1",
    "key_2": "value_2",
    ...
}

PS: For this to work, your table metadata must contain a column named “type” of type map<string,string>.
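If it helps, here is a rough Terraform sketch of a Glue-managed Iceberg table whose schema includes that “type” column. This is an assumption-based example only: the open_table_format_input / iceberg_input blocks are how recent hashicorp/aws provider versions create Glue Iceberg tables as far as I know, and the database, table, S3 location, and extra columns are placeholders you would replace with your own.

```
# Hypothetical sketch: an Iceberg table whose schema carries the routing
# column "type" as map<string,string>; all names and locations are placeholders.
resource "aws_glue_catalog_table" "my_table" {
  name          = "my_table"
  database_name = "my_db"
  table_type    = "EXTERNAL_TABLE"

  open_table_format_input {
    iceberg_input {
      metadata_operation = "CREATE"
      version            = "2"
    }
  }

  storage_descriptor {
    location = "s3://my-bucket/warehouse/my_table"

    columns {
      name = "type"
      type = "map<string,string>" # routing metadata read by the JQ query
    }
    columns {
      name = "key_1"
      type = "string"
    }
    columns {
      name = "key_2"
      type = "string"
    }
  }
}
```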