Guys, I am writing Terraform code to create a Firehose delivery stream that will write data into Iceberg tables. I can't find an example anywhere in the HashiCorp docs or on the internet of enabling routing (inline parsing), which is represented in the AWS console UI in the section "Inline parsing for routing information": it is enabled via a checkbox, and then you define 3 jq expressions. Can you please help me with this? Many thanks
Hello, friend.
Try the example below. You need to configure the processing_configuration block:
processing_configuration {
  enabled = true

  # JQ processor example
  processors {
    type = "MetadataExtraction"

    parameters {
      parameter_name  = "JsonParsingEngine"
      parameter_value = "JQ-1.6"
    }

    parameters {
      parameter_name  = "MetadataExtractionQuery"
      parameter_value = "{destinationDatabaseName: .type.table_type, destinationTableName: .type.table_name, operation: \"insert\"}"
    }
  }
}
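For context, here is a minimal sketch of where that block sits inside the full resource, assuming the Iceberg destination support in recent AWS provider releases (all ARNs, names, and the bucket below are placeholders):

resource "aws_kinesis_firehose_delivery_stream" "iceberg" {
  name        = "iceberg-delivery-stream" # placeholder name
  destination = "iceberg"

  iceberg_configuration {
    role_arn    = "arn:aws:iam::123456789012:role/firehose-role" # placeholder
    catalog_arn = "arn:aws:glue:us-east-1:123456789012:catalog"  # placeholder

    s3_configuration {
      role_arn   = "arn:aws:iam::123456789012:role/firehose-role" # placeholder
      bucket_arn = "arn:aws:s3:::my-firehose-bucket"              # placeholder
    }

    # Inline parsing for routing information: the JQ query below extracts
    # the target database and table from each record.
    processing_configuration {
      enabled = true

      processors {
        type = "MetadataExtraction"

        parameters {
          parameter_name  = "JsonParsingEngine"
          parameter_value = "JQ-1.6"
        }

        parameters {
          parameter_name  = "MetadataExtractionQuery"
          parameter_value = "{destinationDatabaseName: .type.table_type, destinationTableName: .type.table_name, operation: \"insert\"}"
        }
      }
    }
  }
}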
Thanks for your reply. I am going to try this ASAP and come back here to confirm it works.
Hello! I am back here to say that it worked, but now I need to concatenate the value .type.table_name with "_", and I am struggling to do this. I tried the following:

"_" + .type.table_name
("_" + .type.table_name)

and some other variations.
In all cases, I got a 'MetaDataExtraction JQ Query must be a valid JSON' error.
Any idea how to do this? Thanks
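For what it's worth, plain jq accepts a parenthesized concatenation as an object value, so one untested variation is to keep the whole query a single JSON object and parenthesize only the value expression (whether Firehose's JQ-1.6 validator accepts this is an assumption):

parameters {
  parameter_name = "MetadataExtractionQuery"
  # Untested guess: the error suggests the top-level query must produce a
  # JSON object, so the concatenation goes inside the object, in parentheses.
  parameter_value = "{destinationDatabaseName: .type.table_type, destinationTableName: (\"_\" + .type.table_name), operation: \"insert\"}"
}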
Thanks a lot for this post, man. There is no other resource on the internet where this is covered. My issue was that I wanted to do something similar using a CloudFormation template, and your Terraform script was a good reference while developing it. Here is my CloudFormation template for other people to use:
CleanIcebergKinesisFirehose:
  Type: AWS::KinesisFirehose::DeliveryStream
  Properties:
    DeliveryStreamName: test-iceberg-delivery-stream
    IcebergDestinationConfiguration:
      AppendOnly: true
      BufferingHints:
        IntervalInSeconds: 20
        SizeInMBs: 5
      CatalogConfiguration:
        CatalogArn: <Specify ARN>
      CloudWatchLoggingOptions:
        Enabled: true
        LogGroupName: !Ref IcebergKinesisFirehoseLogGroup
        LogStreamName: !Ref IcebergKinesisFirehoseLogStream
      ProcessingConfiguration:
        Enabled: true
        Processors:
          - Parameters:
              - ParameterName: "JsonParsingEngine"
                ParameterValue: "JQ-1.6"
              - ParameterName: "MetadataExtractionQuery"
                ParameterValue: "{destinationDatabaseName: \"my_db\", destinationTableName: .tableinfo.name, operation: \"insert\"}"
            Type: MetadataExtraction
      RetryOptions:
        DurationInSeconds: 30
      RoleARN: <Specify ARN>
      S3BackupMode: FailedDataOnly
      S3Configuration:
        BucketARN: <Specify ARN>
        ErrorOutputPrefix: "events/backup/"
        CompressionFormat: UNCOMPRESSED
        RoleARN: <Specify ARN>
Hello Bruno. I apologize for the delay in seeing your message.
table_name is a key for the JSON parsing engine. It means that when you send the message to Firehose, it must contain that key, and its value is your table name.
For example, given this query:

{destinationDatabaseName: .type.table_type, destinationTableName: .type.table_name, operation: "insert"}

it means that you need to send a payload in this format:
{
  "type": {
    "table_type": <your_database_name>,
    "table_name": <your_table_name>
  },
  "key_1": "value_1",
  "key_2": "value_2",
  ...
}
PS: Your table schema must contain a column named "type" of type map<string,string> for this to work.
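If it helps, here is a rough Terraform sketch of a Glue Iceberg table carrying that column, assuming the provider's open_table_format_input support; the names, location, and extra columns are placeholders:

resource "aws_glue_catalog_table" "events" {
  name          = "my_table" # placeholder
  database_name = "my_db"    # placeholder
  table_type    = "EXTERNAL_TABLE"

  # Create the table as an Iceberg table via Glue's open table format support.
  open_table_format_input {
    iceberg_input {
      metadata_operation = "CREATE"
      version            = "2"
    }
  }

  storage_descriptor {
    location = "s3://my-firehose-bucket/events/" # placeholder

    # The routing keys ride along in each record, so the schema carries them
    # as a map<string,string> column named "type".
    columns {
      name = "type"
      type = "map<string,string>"
    }

    columns {
      name = "key_1" # placeholder payload column
      type = "string"
    }
  }
}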