Real-Time Stock Data Pipeline Using AWS – Built for Speed & Scale!

Source: Real-time CSV files with stock prices

Target: JSON format files consumable by Data Analysts

Goal: Automate the transformation and cataloging for query-ready analytics

πŸ› οΈ Step-by-Step Pipeline with AWS Services
πŸ”Ή 1. CSV Files Drop into S3
Incoming files: stock data like stock_data_2025-06-16.csv

S3 Source Bucket: s3://reedx-stock-raw/

These files were pushed by upstream providers or batched ingestion tools.

StockID,Company,Price,Volume,Date
101,Apple,192.5,1200,2025-06-16
102,Google,2785.0,850,2025-06-16
πŸ”Ή 2. S3 Event Trigger to Invoke Lambda
S3 event notification set up for ObjectCreated

Trigger: Calls a Lambda function

Permissions: IAM role with s3:GetObject, s3:PutObject, logs:CreateLogGroup

πŸ”Ή 3. AWS Lambda – CSV to JSON Conversion
Language: Python

Action:

Read the CSV from source bucket

Parse and convert each row to JSON

Write JSON to a target S3 bucket: s3://reedx-stock-json/

βœ… Sample Lambda code snippet:

import boto3, csv, json
from io import StringIO

def lambda_handler(event, context):
s3 = boto3.client(‘s3’)

bucket_name = event['Records'][0]['s3']['bucket']['name']
key_name = event['Records'][0]['s3']['object']['key']

obj = s3.get_object(Bucket=bucket_name, Key=key_name)
data = obj['Body'].read().decode('utf-8')

reader = csv.DictReader(StringIO(data))
json_data = [row for row in reader]

output_key = key_name.replace('.csv', '.json')
s3.put_object(
    Bucket='reedx-stock-json',
    Key=output_key,
    Body=json.dumps(json_data)
)

πŸ”Ή 4. JSON Files Stored to S3
Partitioned based on date:
s3://reedx-stock-json/date=2025-06-16/stock_data.json

Helps Glue crawler and Athena work efficiently

πŸ”Ή 5. AWS Glue Crawler for Schema Inference
Crawls: reedx-stock-json/

Output: Creates or updates a Glue table: reedx_stock_data_json

Runs every 15 minutes (or via EventBridge rule)

Format: JSON with schema detected as:

Column Type
StockID int
Company string
Price float
Volume int
Date string

πŸ”Ή 6. Athena Queries by Data Analysts
Analysts query stock data directly with Athena using SQL.

Sample queries:

— View latest stock prices
SELECT * FROM reedx_stock_data_json
WHERE date = ‘2025-06-16’;

— Find highest traded volume
SELECT Company, MAX(Volume) as max_volume
FROM reedx_stock_data_json
GROUP BY Company;

— Filter high-priced stocks
SELECT * FROM reedx_stock_data_json
WHERE Price > 1000;
πŸ”Ή 7. Monitoring & Logging
CloudWatch used to log:

Lambda success/failure

Crawler run outcomes

Alerts set using CloudWatch Alarms on errors or missing files

βœ… IAM Permissions Required
πŸ”Έ Lambda Execution Role:

{
“Effect”: “Allow”,
“Action”: [
“s3:GetObject”,
“s3:PutObject”,
“logs:” ], “Resource”: “
}
πŸ”Έ Glue Role:
json
Copy
Edit
{
“Effect”: “Allow”,
“Action”: [
“s3:GetObject”,
“glue:” ], “Resource”: “
}

🎯 Business Benefits
Zero manual steps for conversion

Real-time availability for analysis

Serverless, scalable, and cost-effective

Leave a Reply

Your email address will not be published. Required fields are marked *