Aws - Kinesis Data Firehose Delivery Stream

About

Amazon Kinesis Data Firehose is a simple service for delivering real-time streaming data to destinations. It is part of the Kinesis streaming data platform.

Delivery streams load data, automatically and continuously, to the destinations that you specify.

Example

Architecture

Basic architecture: an AWS user click event processing pipeline (diagram).

Features

Stream management

  • auto-scaling (automatically provisions, manages, and scales the compute, memory, and network resources required)
  • monitoring (metrics through the console)
  • data can be loaded:
    • in batches (at an interval),
    • compressed,
    • and encrypted
  • Pay-as-you-go pricing

Data transformation

Kinesis Firehose can automatically deliver all of the records received by the stream to a serverless function created with AWS Lambda. You can:

  • use pre-built Lambda blueprints for converting common data sources such as Apache logs and system logs to JSON and CSV formats,
  • or write your own custom functions.

Firehose record processing flow (diagram)


Lambda

To return records from AWS Lambda to Kinesis Firehose after transformation, the Lambda function you invoke must be compliant with the required record transformation output model.
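As a minimal sketch in Python, assuming the documented transformation record model (each input record carries a recordId and base64-encoded data; each output record must return the same recordId, a result of Ok, Dropped, or ProcessingFailed, and re-encoded data), a compliant handler could look like:

import base64
import json

def lambda_handler(event, context):
    # Transform each record: decode, parse the JSON, tag it, re-encode.
    output = []
    for record in event["records"]:
        try:
            data = json.loads(base64.b64decode(record["data"]))
            data["processed"] = True  # illustrative transformation
            payload = (json.dumps(data) + "\n").encode()
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(payload).decode(),
            })
        except ValueError:
            # Unparseable records are flagged; with an S3 destination
            # they end up under the configured error prefix.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}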

Record format conversion

A columnar format (such as Apache Parquet or Apache ORC) is typically more efficient to query than JSON.

Kinesis Data Firehose can convert:

  • JSON-formatted source records, using a schema from a table defined in AWS Glue
  • other record formats, by first passing them through a Lambda function that converts them to JSON

Built-in data format conversion (diagram)
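A minimal boto3 sketch of enabling the conversion at stream creation; the Glue database/table, role, bucket, and region are placeholder assumptions (note that with conversion enabled the minimum buffer size is 64 MB):

import boto3

firehose = boto3.client("firehose", region_name="eu-central-1")

# Convert incoming JSON records to Apache Parquet using a schema
# from a table defined in AWS Glue (all names are placeholders).
firehose.create_delivery_stream(
    DeliveryStreamName="test-conversion-stream",
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::REPLACE_ME_ACCOUNT_ID:role/firehose_delivery_role",
        "BucketARN": "arn:aws:s3:::REPLACE_ME_BUCKET_NAME",
        "BufferingHints": {"SizeInMBs": 64, "IntervalInSeconds": 300},
        "DataFormatConversionConfiguration": {
            "Enabled": True,
            "InputFormatConfiguration": {"Deserializer": {"OpenXJsonSerDe": {}}},
            "OutputFormatConfiguration": {"Serializer": {"ParquetSerDe": {}}},
            "SchemaConfiguration": {
                "RoleARN": "arn:aws:iam::REPLACE_ME_ACCOUNT_ID:role/firehose_delivery_role",
                "DatabaseName": "REPLACE_ME_GLUE_DATABASE",
                "TableName": "REPLACE_ME_GLUE_TABLE",
                "Region": "eu-central-1",
            },
        },
    },
)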

Destination

S3 prefix

By default, Kinesis Data Firehose appends the prefix “YYYY/MM/DD/HH” (in UTC) to the data it delivers to Amazon S3. You can override this default by specifying a custom prefix that includes expressions that are evaluated at runtime. https://docs.aws.amazon.com/console/firehose/custom-s3-keys-prefix

i.e. a file will have the following path by default:

bucket_name/YYYY/MM/DD/HH/streamName-index-YYYY-MM-DD-HH-MM-SS-hash

Example:
ng-test-firehose/2019/04/13/18/test-1-2019-04-13-18-47-14-b7952580-9c0b-4508-abde-3001df436985

S3 error prefix

You can specify an S3 bucket prefix to be used in error conditions. This prefix can include expressions for Kinesis Data Firehose to evaluate at runtime https://docs.aws.amazon.com/console/firehose/expression-rules
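For example, a custom prefix and error prefix built from runtime expressions (the paths shown are illustrative) could be:

Prefix:            data/!{timestamp:yyyy/MM/dd}/
ErrorOutputPrefix: errors/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/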

S3 buffer

Firehose buffers incoming records before delivering them to your S3 bucket. Record delivery is triggered once either of these conditions has been satisfied: https://docs.aws.amazon.com/console/firehose/configure-buffer

  • Buffer size: between 1 and 128 MB
  • Buffer interval: between 60 and 900 seconds

S3 compression and encryption

Firehose can compress records before delivering them to your S3 bucket. Compressed records can also be encrypted in the S3 bucket using a KMS master key. https://docs.aws.amazon.com/console/firehose/configure-compression-encryption

  • S3 compression: GZIP, Snappy, or Zip
  • S3 encryption: optional, using a KMS master key

Data Structure

A request mapping template can be created in API Gateway to define the request payload structure. It restricts requests to an expected structure and transforms those well-formed requests into the structure that the Kinesis Firehose PutRecord API requires.
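A minimal sketch of such a mapping template (API Gateway VTL); the stream name is a placeholder assumption, and the template base64-encodes the incoming request body as the record payload, which is the shape the PutRecord API expects:

{
    "DeliveryStreamName": "test-stream",
    "Record": {
        "Data": "$util.base64Encode($input.json('$'))"
    }
}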

IAM Role

  • Kinesis Firehose requires a service role that allows it to deliver:
    • received records as events to the created Lambda function,
    • and the processed records to the destination S3 bucket.
  • The Amazon API Gateway API also requires a role that permits the API to invoke the PutRecord API within Kinesis Firehose for each received API request.

Firehose uses an IAM role to access your specified resources, such as the S3 bucket and KMS key.

Example policy document, where:

  • REPLACE_ME_ACCOUNT_ID is the account ID
  • REPLACE_ME_STREAM_NAME is the Firehose delivery stream name
  • REPLACE_ME_BUCKET_NAME is the S3 destination bucket name
  • REPLACE_ME_FUNCTION_NAME and REPLACE_ME_FUNCTION_VERSION are the transformation Lambda function name and version
  • REPLACE_ME_KEY_ID is the KMS key used to encrypt the source stream
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "glue:GetTableVersions"
      ],
      "Resource": "*"
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::REPLACE_ME_FIREHOSE_BUCKET_NAME",
        "arn:aws:s3:::REPLACE_ME_FIREHOSE_BUCKET_NAME/*",
        "arn:aws:s3:::%FIREHOSE_BUCKET_NAME%",
        "arn:aws:s3:::%FIREHOSE_BUCKET_NAME%/*"
      ]
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction",
        "lambda:GetFunctionConfiguration"
      ],
      "Resource": "arn:aws:lambda:eu-central-1:REPLACE_ME_ACCOUNT_ID:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%"
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:eu-central-1:REPLACE_ME_ACCOUNT_ID:log-group:/aws/kinesisfirehose/REPLACE_ME_FIREHOSE_STREAM_NAME:log-stream:*"
      ]
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords"
      ],
      "Resource": "arn:aws:kinesis:eu-central-1:REPLACE_ME_ACCOUNT_ID:stream/%FIREHOSE_STREAM_NAME%"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt"
      ],
      "Resource": [
        "arn:aws:kms:eu-central-1:REPLACE_ME_ACCOUNT_ID:key/%SSE_KEY_ID%"
      ],
      "Condition": {
        "StringEquals": {
          "kms:ViaService": "kinesis.%REGION_NAME%.amazonaws.com"
        },
        "StringLike": {
          "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:%REGION_NAME%:REPLACE_ME_ACCOUNT_ID:stream/%FIREHOSE_STREAM_NAME%"
        }
      }
    }
  ]
}

Record

A record can be as large as 1,000 KB.

Buffer size and buffer interval

Kinesis Data Firehose buffers incoming streaming data to a certain size or for a certain period of time before delivering it to destinations. Buffer Size is in MBs and Buffer Interval is in seconds.

Name

Acceptable characters are uppercase and lowercase letters, numbers, underscores, hyphens, and periods.

Management

Create

Amazon Kinesis Data Firehose provides a simple way to capture, transform, and load streaming data with just a few clicks in the AWS Management Console.

You can simply:

  • create a Firehose delivery stream,
  • select the destinations,
  • and start sending real-time data from hundreds of thousands of data sources simultaneously.
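A minimal boto3 sketch of the same flow, assuming the IAM role and destination bucket already exist (the stream name, region, and ARNs are placeholders):

import boto3

firehose = boto3.client("firehose", region_name="eu-central-1")

# Create a DirectPut delivery stream that writes to S3, buffering
# records up to 5 MB or 300 seconds and compressing them with GZIP.
firehose.create_delivery_stream(
    DeliveryStreamName="test-stream",
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::REPLACE_ME_ACCOUNT_ID:role/firehose_delivery_role",
        "BucketARN": "arn:aws:s3:::REPLACE_ME_BUCKET_NAME",
        "Prefix": "data/!{timestamp:yyyy/MM/dd}/",
        "ErrorOutputPrefix": "errors/!{firehose:error-output-type}/",
        "BufferingHints": {"SizeInMBs": 5, "IntervalInSeconds": 300},
        "CompressionFormat": "GZIP",
    },
)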

Interfaces: the AWS Management Console, or a CloudFormation template (resource type AWS::KinesisFirehose::DeliveryStream; see the CloudFormation documentation).

Update


Your Kinesis Data Firehose delivery stream remains in the ACTIVE state while your configuration is updated, and you can continue to send data. The updated configuration normally takes effect within a few minutes; the stream's version number is then incremented by 1 and is reflected in the delivered Amazon S3 object name (see Version below).
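A hedged boto3 sketch of such an update (the stream name and new buffering values are illustrative); the required version and destination ids come from DescribeDeliveryStream:

import boto3

firehose = boto3.client("firehose", region_name="eu-central-1")

# Fetch the current version id and destination id of the stream.
desc = firehose.describe_delivery_stream(DeliveryStreamName="test-stream")
stream = desc["DeliveryStreamDescription"]

# Update the S3 destination's buffering hints in place.
firehose.update_destination(
    DeliveryStreamName="test-stream",
    CurrentDeliveryStreamVersionId=stream["VersionId"],
    DestinationId=stream["Destinations"][0]["DestinationId"],
    ExtendedS3DestinationUpdate={
        "BufferingHints": {"SizeInMBs": 10, "IntervalInSeconds": 120},
    },
)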

Send Record

Send source records using:

  • the Firehose PUT API
  • or the Amazon Kinesis Agent.

API / SDK

Private

The Kinesis Data Firehose API offers two operations for sending data to your delivery stream:

  • PutRecord
  • and PutRecordBatch.

(for instance, to put user click event records, as in the sketch below)
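A minimal boto3 sketch of both operations (the stream name and payloads are illustrative):

import json
import boto3

firehose = boto3.client("firehose", region_name="eu-central-1")

def encode(event):
    # A trailing newline keeps records separated once Firehose
    # concatenates them into a single S3 object.
    return (json.dumps(event) + "\n").encode()

# PutRecord: one record per call.
firehose.put_record(
    DeliveryStreamName="test-stream",
    Record={"Data": encode({"event": "click", "page": "/home"})},
)

# PutRecordBatch: up to 500 records per call.
firehose.put_record_batch(
    DeliveryStreamName="test-stream",
    Records=[{"Data": encode({"event": "click", "id": i})} for i in range(10)],
)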


Public

Public RESTful endpoint: an AWS Service Proxy (via Amazon API Gateway) to the PutRecord API of Kinesis Firehose can be created so that requests do not need to be authenticated. This way, the website does not have to integrate directly with the Kinesis Firehose PutRecord API or hold AWS credentials to authorize those API requests.

Agent

The Amazon Kinesis Agent is a stand-alone Java software application that offers an easy way to collect and send source records to Firehose.

The agent continuously monitors a set of files and sends new data to your Kinesis Data Firehose delivery stream. The agent handles file rotation, checkpointing, and retry upon failures. It delivers all of your data in a reliable, timely, and simple manner. It also emits Amazon CloudWatch metrics to help you better monitor and troubleshoot the streaming process.
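A minimal sketch of the agent configuration (typically /etc/aws-kinesis/agent.json); the endpoint, file pattern, and stream name are placeholder assumptions:

{
  "firehose.endpoint": "firehose.eu-central-1.amazonaws.com",
  "flows": [
    {
      "filePattern": "/var/log/app/*.log",
      "deliveryStream": "test-stream"
    }
  ]
}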

AWS IoT

Create AWS IoT rules that send data from MQTT messages.

CloudWatch Logs

From Aws - Cloudwatch Logs: use subscription filters to deliver a real-time stream of log events.

CloudWatch Events

From Aws - CloudWatch Events: create rules to indicate which events are of interest to your application and what automated action to take when a rule matches an event.

Version

The version number of a Kinesis Data Firehose delivery stream is increased by 1 after you update the configuration. It is reflected in the delivered Amazon S3 object name.

Object name

https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#s3-object-name

Error logging

Firehose can log record delivery errors to CloudWatch Logs. If enabled, a CloudWatch log group and corresponding log streams are created on your behalf. https://docs.aws.amazon.com/console/firehose/monitor-with-cloudwatch
