Tuesday, 27 May 2025

Processing Real-Time Streams with Amazon Kinesis

Introduction

In today's data-driven world, the ability to process and analyze data in real time is crucial for businesses to make timely decisions and gain competitive advantages. Amazon Kinesis is a suite of services offered by Amazon Web Services (AWS) that makes it easy to collect, process, and analyze real-time streaming data so you can get timely insights and react quickly to new information.

What is Amazon Kinesis?

Amazon Kinesis is a scalable, fully managed service for real-time data streaming and analytics. It allows you to ingest massive volumes of data from hundreds of thousands of sources, process it in real time, and deliver it to various destinations for further analysis and storage.

Key Components

  • Kinesis Data Streams: Lets you create your own applications to handle data that's coming in continuously (like a live stream). Imagine you have a website and you want to track every click in real time. Kinesis Data Streams can collect all those click events so your application can process and analyze them as they happen.
  • Kinesis Data Firehose: Makes it easy to take the streaming data and send it directly to storage or analysis tools without much setup. Using the same website, instead of building a custom app to handle clicks, Kinesis Data Firehose can automatically send all the click data to Amazon S3 (a storage service) or Amazon Redshift (a data warehouse) for you to store and analyze later. (A minimal boto3 sketch contrasting Data Streams and Firehose follows this list.)
  • Kinesis Data Analytics: Lets you analyze the streaming data in real-time using regular SQL queries, which are like the commands you use to manage databases.  If you want to monitor your website clicks and get instant reports on how many clicks you’re getting each minute, Kinesis Data Analytics can run SQL queries on the incoming data to provide those real-time insights.
  • Kinesis Video Streams: Allows you to securely send video from devices (like cameras) to AWS for things like analysis, machine learning, or storing the videos. If you have security cameras at different locations, Kinesis Video Streams can stream the video footage to AWS where you can analyze it for any unusual activity or use it to train a machine learning model to recognize specific events.
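To make the difference between Data Streams and Firehose concrete, here is a minimal boto3 sketch. It is illustrative only: it assumes the stream and delivery stream used later in this lab already exist and that your AWS credentials are configured.

import json
import boto3

# Both clients assume us-east-1; adjust to your region.
kinesis = boto3.client("kinesis", region_name="us-east-1")
firehose = boto3.client("firehose", region_name="us-east-1")

click = {"userid": "user1", "page": "/home"}

# Kinesis Data Streams: you write records and your own consumers read them.
kinesis.put_record(
    StreamName="ClickstreamDataStream",
    Data=json.dumps(click),
    PartitionKey=click["userid"],
)

# Kinesis Data Firehose: the service itself delivers records to S3, Redshift, etc.
firehose.put_record(
    DeliveryStreamName="ClickstreamDeliveryStream",
    Record={"Data": json.dumps(click) + "\n"},
)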

Why is Kinesis Important in Cloud Computing?

  • Real-Time Processing: Processes data as it arrives, enabling immediate insights.
  • Scalability: Automatically scales to match the throughput of your data.
  • Cost-Effective: Pay only for the resources you use without upfront commitments.
  • Integration: Seamlessly integrates with other AWS services for storage, processing, and analytics.
  • Reliability: Provides durable storage of streaming data and ensures data is not lost.

Real-World Use Cases

  • Application Monitoring: Real-time analysis of logs and metrics to monitor application health.
  • E-commerce: Tracking user behavior in real time to provide personalized experiences.
  • IoT Devices: Processing sensor data from devices for immediate action or storage.
  • Financial Transactions: Monitoring transactions in real time to detect fraud.
  • Social Media Analytics: Analyzing social media feeds to gauge public sentiment.

Hands-on Lab: Building a Real-Time Data Processing Pipeline with Amazon Kinesis

Scenario Overview

Imagine you're a data engineer at a company that needs to process real-time clickstream data from a website to monitor user behavior and detect anomalies. You need to build a solution that can ingest, process, and store this streaming data for real-time analytics.

What You'll Learn

  • How to set up an Amazon Kinesis Data Stream to ingest real-time data.
  • How to use Amazon Kinesis Data Analytics to process and analyze the streaming data.
  • How to deliver processed data to an Amazon S3 bucket using Kinesis Data Firehose.
  • How to test the entire pipeline by generating sample data.
  • How to monitor and scale the solution.

Prerequisites

  • An AWS account with administrative access.
  • AWS Command Line Interface (CLI) installed and configured.
  • Python 3.x installed on your local machine.
  • Familiarity with basic Python programming.

Step 1: Set Up Your Development Environment

1.1 Install and Configure AWS CLI

Why? The AWS CLI allows you to interact with AWS services from your terminal, which is essential for managing Kinesis resources.

curl "https://awscli.amazonaws.com/AWSCLIV2.pkg" -o "AWSCLIV2.pkg"

sudo installer -pkg AWSCLIV2.pkg -target /

    • Linux:

curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"

unzip awscliv2.zip

sudo ./aws/install

  • Configure AWS CLI:

aws configure

Enter your Access Key ID, Secret Access Key, Default region name (e.g., us-east-1), and Default output format (json).

1.2 Install Python and Boto3

Why? We'll use Python scripts to generate sample data and interact with AWS services programmatically.

  • Verify Python Installation:

python3 --version

If not installed, download Python from the official website.

  • Install Boto3 (AWS SDK for Python):

pip install boto3
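Optional sanity check (not part of the lab itself): confirm that boto3 can load your credentials before you start creating resources.

import boto3

# Prints the AWS account ID that your configured credentials resolve to.
sts = boto3.client("sts")
print(sts.get_caller_identity()["Account"])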


Step 2: Create an Amazon Kinesis Data Stream

2.1 Log into AWS Management Console

2.2 Navigate to Amazon Kinesis

  • In the AWS Management Console, type Kinesis in the search bar at the top and select Kinesis.

2.3 Create a Data Stream

Why? A Kinesis Data Stream is required to ingest real-time streaming data.

  1. Choose "Kinesis Data Streams" in the Get started section.
  2. Click "Create data stream".
  3. Capacity mode: Choose "Provisioned".
  4. Data stream name: Enter ClickstreamDataStream.
  5. Provisioned shards: Leave as 1 (for this lab).
    • Note: A shard is a unit of capacity. Each shard can ingest up to 1 MB/sec or 1,000 records/sec.
  6. Click "Create data stream".

Wait until the status of the data stream changes to "Active".
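If you prefer scripting this step, a rough boto3 equivalent of the console steps above (assumes us-east-1 and a reasonably recent boto3 version that supports StreamModeDetails):

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# Provisioned mode with a single shard, matching the console settings above.
kinesis.create_stream(
    StreamName="ClickstreamDataStream",
    ShardCount=1,
    StreamModeDetails={"StreamMode": "PROVISIONED"},
)

# Block until the stream reaches the ACTIVE state.
kinesis.get_waiter("stream_exists").wait(StreamName="ClickstreamDataStream")
print("Stream is active")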

 

 

 

Step 3: Set Up Kinesis Data Firehose to Deliver Data to S3

3.1 Create an S3 Bucket

Why? We need a storage destination for our processed data.

  1. In the AWS Management Console, navigate to S3 under Storage.
  2. Click "Create bucket".
  3. Bucket name: Must be globally unique, e.g., your-unique-bucket-name-clickstream.
  4. Choose your region.
  5. Leave other settings as default.
  6. Click "Create bucket".
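A scripted alternative for this step, in case you prefer boto3 over the console (the bucket name is the placeholder used throughout this lab):

import boto3

s3 = boto3.client("s3", region_name="us-east-1")
bucket = "your-unique-bucket-name-clickstream"  # must be globally unique

# In us-east-1 the bucket is created without a location constraint.
s3.create_bucket(Bucket=bucket)

# For any other region, pass CreateBucketConfiguration instead, e.g.:
# s3.create_bucket(
#     Bucket=bucket,
#     CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
# )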

3.2 Create a Kinesis Data Firehose Stream

Why? Kinesis Data Firehose delivers streaming data to destinations like Amazon S3, Amazon Redshift, or Amazon OpenSearch Service.

How?

  1. Navigate back to Kinesis in the AWS Console.
  2. Click on "Amazon Data Firehose" in the left navigation pane.
  3. Click "Create Firehose stream".
  4. Source:
    • Choose "Amazon Kinesis Data Streams".
  5. Destination:
    • Choose "Amazon S3".
  6. Firehose stream name: Enter ClickstreamDeliveryStream.
  7. Source settings: For Kinesis data stream, enter the ARN of ClickstreamDataStream.
  8. Destination settings:
    • S3 bucket: Enter the bucket URI (e.g., s3://your-unique-bucket-name-clickstream).
    • New line delimiter: Enabled (helps separate records when viewing the data).
    • Leave the rest as default.
  9. Click "Create Firehose stream".
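If you would rather create the delivery stream from code, here is a rough boto3 sketch of the same configuration. Note the assumptions: the role ARN and account ID are placeholders, and unlike the console flow above, the API requires you to supply an IAM role that lets Firehose read from the data stream and write to the bucket.

import boto3

firehose = boto3.client("firehose", region_name="us-east-1")

# Placeholder ARNs; replace the account ID, role, and region with your own.
FIREHOSE_ROLE_ARN = "arn:aws:iam::123456789012:role/FirehoseDeliveryRole"
STREAM_ARN = "arn:aws:kinesis:us-east-1:123456789012:stream/ClickstreamDataStream"

firehose.create_delivery_stream(
    DeliveryStreamName="ClickstreamDeliveryStream",
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": STREAM_ARN,
        "RoleARN": FIREHOSE_ROLE_ARN,
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": FIREHOSE_ROLE_ARN,
        "BucketARN": "arn:aws:s3:::your-unique-bucket-name-clickstream",
    },
)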

 


Step 4: Set Up Kinesis Data Analytics for Real-Time Processing

4.1 Create an IAM Role for Kinesis Data Analytics

Why? An IAM role is needed to allow Kinesis Data Analytics to access your data streams and write to your destination.

  1. Navigate to IAM in the AWS Console.
  2. Click "Roles" in the left navigation pane.
  3. Click "Create role".
  4. Trusted entity type: Choose "AWS service".
  5. Use case: Select "Kinesis", then choose "Kinesis Analytics".
  6. Click "Next".
  7. Attach the following policy:
    • AmazonKinesisAnalyticsFullAccess
  8. Click "Next".
  9. Role name: Enter KinesisAnalyticsRole.
  10. Click "Create role".
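The same role can also be created with boto3. This sketch assumes the kinesisanalytics.amazonaws.com service principal (which is what the console's "Kinesis Analytics" use case configures) and attaches the managed policy named in the step above.

import json
import boto3

iam = boto3.client("iam")

# Trust policy allowing the Kinesis Data Analytics service to assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "kinesisanalytics.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

iam.create_role(
    RoleName="KinesisAnalyticsRole",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)
iam.attach_role_policy(
    RoleName="KinesisAnalyticsRole",
    PolicyArn="arn:aws:iam::aws:policy/AmazonKinesisAnalyticsFullAccess",
)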

 

 

4.2 Create a Kinesis Data Analytics Application

Why? Kinesis Data Analytics allows you to process and analyze streaming data using SQL.

How?

  1. Navigate back to Kinesis in the AWS Console.
  2. Click on "Managed Apache Flink" in the left navigation pane.
  3. Click "Create streaming application".
  4. Select "Create from scratch" for this lab.
  5. Application name: Enter ClickstreamAnalyticsApp.
  6. Template for application settings: Choose "Development" (low cost, suitable for testing and this lab).
  7. Service role: You can select the role you created earlier; here we let AWS create and manage one for the application.
  8. Click "Create application".
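For reference, the application can also be created through the kinesisanalyticsv2 API. This is only a sketch: the Flink runtime version and role ARN are assumptions you would adjust, and the application code location is configured afterwards (Step 5), just as in the console flow.

import boto3

kda = boto3.client("kinesisanalyticsv2", region_name="us-east-1")

# Placeholder role ARN; use the service role you created or one AWS manages for you.
kda.create_application(
    ApplicationName="ClickstreamAnalyticsApp",
    RuntimeEnvironment="FLINK-1_15",  # pick the runtime offered in your region
    ServiceExecutionRole="arn:aws:iam::123456789012:role/KinesisAnalyticsRole",
)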

4.3 Configure the Application Input

Why? To specify the streaming source for the application.

How? For a Managed Apache Flink application, the streaming source is defined directly in the application code rather than in the console, so there is nothing to configure on this screen; Step 5 below wires the application to ClickstreamDataStream. (If you use the legacy SQL-based Kinesis Data Analytics application instead, the console-based input configuration is covered in the alternative walkthrough later in this post.)

Step 5: Write and Deploy the Flink Application Code

1.     Create Your Flink Application Script (flink_clickstream.py):

from pyflink.table import EnvironmentSettings, TableEnvironment

# Note: the Flink SQL connectors for Kinesis and Firehose (from the
# flink-connector-aws project) must be packaged with the application.

def main():
    # Streaming Table API environment. (The connect()/descriptor API used in
    # older PyFlink examples has been removed; SQL DDL is the current approach.)
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # Source: the Kinesis data stream carrying raw click events.
    # `timestamp` is a reserved word in Flink SQL, so it is quoted with backticks.
    t_env.execute_sql("""
        CREATE TABLE source_table (
            userid STRING,
            page STRING,
            `timestamp` BIGINT,
            proctime AS PROCTIME()
        ) WITH (
            'connector' = 'kinesis',
            'stream' = 'ClickstreamDataStream',
            'aws.region' = 'us-east-1',
            'scan.stream.initpos' = 'LATEST',
            'format' = 'json'
        )
    """)

    # Sink: the Firehose delivery stream that writes results to S3.
    t_env.execute_sql("""
        CREATE TABLE sink_table (
            userid STRING,
            page_views BIGINT
        ) WITH (
            'connector' = 'firehose',
            'delivery-stream' = 'ClickstreamDeliveryStream',
            'aws.region' = 'us-east-1',
            'format' = 'json'
        )
    """)

    # Processing logic: count page views per user over 1-minute tumbling
    # windows (processing time) and write the results to the sink.
    # When testing locally, call .wait() on the returned TableResult.
    t_env.execute_sql("""
        INSERT INTO sink_table
        SELECT
            userid,
            COUNT(*) AS page_views
        FROM source_table
        GROUP BY
            userid,
            TUMBLE(proctime, INTERVAL '1' MINUTE)
    """)

if __name__ == '__main__':
    main()

Explanation:

    • Source Configuration: Connects to ClickstreamDataStream and defines the schema.
    • Processing Logic: Counts the number of page views per userid every minute.
    • Sink Configuration: Sends the processed data to ClickstreamDeliveryStream in JSON format.

2.     Upload the Script to S3:

    • Upload flink_clickstream.py to your S3 bucket:
aws s3 cp flink_clickstream.py s3://your-unique-bucket-name-clickstream/

 
 

 

3.     Update Flink Application Configuration:

    • On the Managed Apache Flink page, click your application ClickstreamAnalyticsApp.
    • Click "Configure".
      • Application code location:
        • Amazon S3 bucket: s3://your-unique-bucket-name-clickstream
        • Path to S3 object: flink_clickstream.py
      • Check "Turn on snapshots".
    • Click "Save changes" to apply.

4.     Start the Flink Application:

    • Ensure the application status is "Running". If not, start it manually.
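Starting the application can also be done through the kinesisanalyticsv2 API; a brief sketch (SKIP_RESTORE_FROM_SNAPSHOT is appropriate for a first start with no prior snapshots):

import boto3

kda = boto3.client("kinesisanalyticsv2", region_name="us-east-1")

# Start the Flink application without restoring from a snapshot.
kda.start_application(
    ApplicationName="ClickstreamAnalyticsApp",
    RunConfiguration={
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    },
)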

Step 6: Configure Output to Kinesis Data Firehose

Since the sink is already defined in your Flink application code (ClickstreamDeliveryStream), ensure that:

1.     Firehose Stream Configuration:

    • Destination: Your S3 bucket (your-unique-bucket-name-clickstream).
    • Format: JSON (as specified in the Flink script).
    • Buffer Settings: Use default settings to minimize costs.

2.     Verify IAM Role Permissions:

    • The IAM role attached to your Flink application must have permissions to write to ClickstreamDeliveryStream.

Step 7: Generate Sample Data to Test the Pipeline

1.     Ensure Your Python Environment is Set Up Correctly:

    • If you are using a virtual environment or AWS Cloud9, make sure the environment you run the script in has boto3 installed.

2.     Create data_generator.py:

import boto3
import random
import time
import json
 
# Uses us-east-1; change region_name if your resources are in another region
kinesis = boto3.client('kinesis', region_name='us-east-1')
 
user_ids = ['user1', 'user2', 'user3', 'user4', 'user5']
pages = ['/home', '/about', '/contact', '/products', '/cart']
 
while True:
    data = {
        'userid': random.choice(user_ids),
        'page': random.choice(pages),
        'timestamp': int(time.time() * 1000)
    }
    print(json.dumps(data))
    kinesis.put_record(
        StreamName='ClickstreamDataStream',
        Data=json.dumps(data),
        PartitionKey=data['userid']
    )
    time.sleep(1)

3.     Run the Data Generator:

python data_generator.py

    • This script will start sending sample data to ClickstreamDataStream every second.
    • Note: You should see printed JSON data in your terminal confirming data generation.

Step 8: Verify Data Flow and Results

1.     Monitor the Flink Application:

    • In Managed Apache Flink, select ClickstreamAnalyticsApp.
    • Click on "View CloudWatch Dashboard".
    • Monitor metrics like Records In, Records Processed, and Records Out to ensure data is being ingested and processed.

2.     Check Processed Data in S3:

    • Navigate to S3 in the AWS Console.
    • Open your bucket your-unique-bucket-name-clickstream.
    • You should see JSON files being created, containing userid and page_views.
    • Example Content:
{
  "userid": "user1",
  "page_views": 5
}
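If you prefer checking from a script rather than the console, a small boto3 sketch that prints the most recently delivered objects (bucket name is the lab placeholder):

import boto3

s3 = boto3.client("s3", region_name="us-east-1")
bucket = "your-unique-bucket-name-clickstream"

# List objects, take the three most recent, and print the first part of each.
resp = s3.list_objects_v2(Bucket=bucket)
newest = sorted(resp.get("Contents", []), key=lambda o: o["LastModified"])[-3:]
for obj in newest:
    body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read().decode()
    print(obj["Key"])
    print(body[:500])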

Step 9: Clean Up Resources

To prevent incurring unnecessary charges, delete the resources you've created once you're done with the lab.

1.     Stop the Data Generator Script:

    • In your terminal, press Ctrl + C to stop the data_generator.py script.

2.     Delete the Flink Application:

    • Navigate to Amazon Kinesis > Managed Apache Flink.
    • Select ClickstreamAnalyticsApp.
    • Click "Actions" > "Delete" and confirm.

3.     Delete Kinesis Data Firehose Delivery Stream:

    • Go to Amazon Kinesis > Data Firehose.
    • Select ClickstreamDeliveryStream.
    • Click "Delete" and confirm.

4.     Delete Kinesis Data Stream:

    • Navigate to Amazon Kinesis > Data Streams.
    • Select ClickstreamDataStream.
    • Click "Delete" and confirm.

5.     Empty and Delete the S3 Bucket:

    • Go to S3 in the AWS Console.
    • Open your-unique-bucket-name-clickstream.
    • Click "Empty" to remove all objects.
    • After emptying, select the bucket and click "Delete".

6.     Delete IAM Roles (Optional):

    • Navigate to IAM in the AWS Console.
    • Click on "Roles".
    • Locate kinesis-analytics-ClickstreamAnalyticsApp-us-east-1 and delete it if it's no longer needed.
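Most of this cleanup can also be scripted. A hedged boto3 sketch follows; the Managed Apache Flink application itself is easiest to delete from the console, since the delete API additionally requires the application's creation timestamp.

import boto3

region = "us-east-1"
bucket = "your-unique-bucket-name-clickstream"

# Delete the Firehose delivery stream and the Kinesis data stream.
boto3.client("firehose", region_name=region).delete_delivery_stream(
    DeliveryStreamName="ClickstreamDeliveryStream"
)
boto3.client("kinesis", region_name=region).delete_stream(
    StreamName="ClickstreamDataStream"
)

# Empty the bucket, then delete it (a bucket must be empty before deletion).
s3 = boto3.resource("s3", region_name=region)
s3.Bucket(bucket).objects.all().delete()
s3.Bucket(bucket).delete()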

 

 

 

 

Alternative: Legacy SQL-Based Kinesis Data Analytics Application

If you would rather build the same pipeline with the legacy SQL-based Kinesis Data Analytics application instead of Managed Apache Flink, the remaining sections walk through that approach, starting with the console-based input configuration referenced in Step 4.3.

  1. In the application details page, click "Connect streaming data".
  2. Source:
    • Streaming source: Choose "Kinesis data stream".
    • Kinesis stream: Select ClickstreamDataStream.
    • IAM role: Select KinesisAnalyticsRole.
  3. Discover schema:
    • Click "Discover schema".
    • Since we don't have any data yet, we'll define the schema manually.
  4. Manually specify the schema:
    • Click "Edit schema".
    • Add the following columns:

Column Name    Data Type
userid         VARCHAR
page           VARCHAR
timestamp      BIGINT

    • Click "Save and continue".
  5. Click "Create application".

Step 5: Write and Deploy the SQL Code for Data Analytics

5.1 Write the SQL Code

Why? To process the incoming data and perform real-time analytics.

Scenario Solution:

We want to count the number of page views per user in real time.

How?

  1. In the "SQL editor", replace the existing code with the following:

sql


CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
  "userid" VARCHAR(16),
  "page_views" INTEGER
);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
  SELECT STREAM
    "userid",
    COUNT(*) AS "page_views"
  FROM "SOURCE_SQL_STREAM_001"
  GROUP BY
    "userid",
    STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);

Explanation:

    • We create a destination stream to hold our results.
    • We select data from the source stream, counting page views per user over a 1-minute window.

5.2 Save and Run the Application

How?

  1. Click "Save and run SQL".
  2. The application status will change to "Running".

Step 6: Configure Output to Kinesis Data Firehose

Why?

To send the processed data from Kinesis Data Analytics to our S3 bucket via Kinesis Data Firehose.

How?

  1. In the application details page, click "Add output".
  2. Name: AnalyticsOutput.
  3. Destination:
    • Kinesis Data Firehose: Select ClickstreamDeliveryStream.
    • IAM role: Select KinesisAnalyticsRole.
  4. Stream mapping:
    • Map the columns from your SQL stream to the destination.
    • Ensure that the column names match.
  5. Click "Save and continue".

Step 7: Generate Sample Data to Test the Pipeline

7.1 Create a Data Generator Script

Why?

To simulate real-time clickstream data.

How?

  1. On your local machine, create a file named data_generator.py.
  2. Add the following code:

python


import boto3
import random
import time
import json

kinesis = boto3.client('kinesis', region_name='your-region')

user_ids = ['user1', 'user2', 'user3', 'user4', 'user5']
pages = ['/home', '/about', '/contact', '/products', '/cart']

while True:
    data = {
        'userid': random.choice(user_ids),
        'page': random.choice(pages),
        'timestamp': int(time.time() * 1000)
    }
    print(json.dumps(data))
    kinesis.put_record(
        StreamName='ClickstreamDataStream',
        Data=json.dumps(data),
        PartitionKey=data['userid']
    )
    time.sleep(1)

Explanation:

    • Generates random user activity data.
    • Sends data to the ClickstreamDataStream.
  3. Replace 'your-region' with your AWS region (e.g., 'us-east-1').

7.2 Run the Data Generator

How?

  • In your terminal, run:

bash


python data_generator.py

  • This will start sending data to your Kinesis Data Stream.

Step 8: Verify Data Flow and Results

8.1 Monitor Kinesis Data Analytics Application

Why?

To ensure the application is processing data correctly.

How?

  1. In the AWS Console, navigate to Kinesis Data Analytics.
  2. Select your application ClickstreamAnalyticsApp.
  3. Click on "Real-time analytics".
  4. You should see data flowing through your application.

8.2 Check Processed Data in S3

Why?

To verify that the processed data is being delivered to S3.

How?

  1. Navigate to S3 in the AWS Console.
  2. Open your bucket your-unique-bucket-name-clickstream.
  3. You should see data files in the bucket, organized by date and time.
  4. Download one of the files and open it to view the processed data.

Step 9: Clean Up Resources

Why?

To avoid incurring unnecessary charges.

How?

  1. Stop the Data Generator Script
    • Press Ctrl + C in your terminal to stop the script.
  2. Delete the Kinesis Data Analytics Application
    • Navigate to Kinesis Data Analytics.
    • Select ClickstreamAnalyticsApp.
    • Click "Delete".
  3. Delete the Kinesis Data Firehose Delivery Stream
    • Navigate to Kinesis Data Firehose.
    • Select ClickstreamDeliveryStream.
    • Click "Delete".
  4. Delete the Kinesis Data Stream
    • Navigate to Kinesis Data Streams.
    • Select ClickstreamDataStream.
    • Click "Delete".
  5. Empty and Delete the S3 Bucket
    • Navigate to S3.
    • Open your bucket.
    • Delete all objects within the bucket.
    • Return to the bucket list, select your bucket, and click "Delete".
  6. Delete IAM Roles (Optional)
    • Navigate to IAM.
    • Delete KinesisAnalyticsRole if not used elsewhere.

Testing and Validation

Verify Data Ingestion

  • Check Kinesis Data Stream Metrics:
    • Navigate to Kinesis Data Streams.
    • Select ClickstreamDataStream.
    • Click on "Monitoring".
    • Ensure that data is being ingested (you can also read a few records directly from the stream, as in the sketch below).
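A hedged boto3 sketch for reading a few records directly from the stream (works for the single-shard stream created in this lab):

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# Read a handful of records from the first (and only) shard.
shard = kinesis.list_shards(StreamName="ClickstreamDataStream")["Shards"][0]
iterator = kinesis.get_shard_iterator(
    StreamName="ClickstreamDataStream",
    ShardId=shard["ShardId"],
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

for record in kinesis.get_records(ShardIterator=iterator, Limit=5)["Records"]:
    print(record["Data"].decode())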

Verify Data Processing

  • Monitor Kinesis Data Analytics:
    • Check that the application is running and processing records.

Verify Data Delivery

  • Check S3 Bucket:
    • Ensure that processed data files are being written to your S3 bucket.

Summary

In this lab, you have:

  • Set Up a Kinesis Data Stream: Created a stream to ingest real-time data.
  • Configured Kinesis Data Firehose: Set up a delivery stream to send data to S3.
  • Used Kinesis Data Analytics: Processed streaming data in real time using SQL.
  • Generated Sample Data: Simulated real-time data ingestion.
  • Verified Data Flow: Ensured data was processed and stored correctly.
  • Cleaned Up Resources: Deleted resources to prevent unnecessary costs.