Source: Real-time CSV files with stock prices
Target: JSON format files consumable by Data Analysts
Goal: Automate the transformation and cataloging for query-ready analytics
Step-by-Step Pipeline with AWS Services
1. CSV Files Drop into S3
Incoming files: stock data like stock_data_2025-06-16.csv
S3 Source Bucket: s3://reedx-stock-raw/
These files are pushed by upstream providers or batch ingestion tools. Sample file contents:
StockID,Company,Price,Volume,Date
101,Apple,192.5,1200,2025-06-16
102,Google,2785.0,850,2025-06-16
2. S3 Event Trigger to Invoke Lambda
S3 event notification set up for ObjectCreated
Trigger: Calls a Lambda function
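Permissions: IAM role with s3:GetObject on the source bucket, s3:PutObject on the target bucket, and CloudWatch Logs write access (logs:CreateLogGroup, logs:CreateLogStream, logs:PutLogEvents)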
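To make the trigger concrete, here is a minimal sketch of pulling the bucket and key out of the notification payload that Lambda receives. The helper name `extract_s3_objects` and the trimmed sample event are illustrative assumptions, not part of the original pipeline; the `Records[].s3.bucket.name` and `Records[].s3.object.key` layout is the one the handler in the next step reads from. Object keys arrive URL-encoded, so the sketch decodes them.

```python
from urllib.parse import unquote_plus


def extract_s3_objects(event):
    """Return (bucket, key) pairs for every record in an S3 event.

    Hypothetical helper for illustration. Keys in S3 notifications are
    URL-encoded (a space arrives as '+'), so they are decoded here.
    """
    objects = []
    for record in event.get("Records", []):
        s3_info = record["s3"]
        bucket = s3_info["bucket"]["name"]
        key = unquote_plus(s3_info["object"]["key"])
        objects.append((bucket, key))
    return objects


# Trimmed-down example of an ObjectCreated notification
sample_event = {
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "reedx-stock-raw"},
                "object": {"key": "stock_data_2025-06-16.csv"},
            },
        }
    ]
}

print(extract_s3_objects(sample_event))
```

Looping over all records, rather than reading only `Records[0]`, keeps the handler correct if a notification ever carries more than one object.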
3. AWS Lambda: CSV to JSON Conversion
Language: Python
Action:
Read the CSV from source bucket
Parse and convert each row to JSON
Write JSON to a target S3 bucket: s3://reedx-stock-json/
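Sample Lambda code snippet:
import csv
import json
from io import StringIO
from urllib.parse import unquote_plus

import boto3

# Create the client once so warm invocations can reuse it
s3 = boto3.client('s3')

def lambda_handler(event, context):
    # S3 delivers object keys URL-encoded in the event payload
    record = event['Records'][0]['s3']
    bucket_name = record['bucket']['name']
    key_name = unquote_plus(record['object']['key'])

    # Read the CSV object and decode it to text
    obj = s3.get_object(Bucket=bucket_name, Key=key_name)
    data = obj['Body'].read().decode('utf-8')

    # Each CSV row becomes a dict keyed by the header row
    reader = csv.DictReader(StringIO(data))
    json_data = [row for row in reader]

    # Write the JSON under the same name with a .json extension
    output_key = key_name.replace('.csv', '.json')
    s3.put_object(
        Bucket='reedx-stock-json',
        Key=output_key,
        Body=json.dumps(json_data)
    )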
4. JSON Files Stored to S3
Partitioned based on date:
s3://reedx-stock-json/date=2025-06-16/stock_data.json
Helps Glue crawler and Athena work efficiently
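The handler shown earlier writes the JSON under the same key as the CSV, so reaching a `date=YYYY-MM-DD/` layout needs an extra step. The following is a minimal sketch of that step, assuming the date is always embedded in the file name as in `stock_data_2025-06-16.csv`; `partitioned_key` and `FILENAME_PATTERN` are hypothetical names introduced here for illustration.

```python
import re

# Matches names like "stock_data_2025-06-16.csv" (assumed naming pattern)
FILENAME_PATTERN = re.compile(r"^(?P<stem>.+)_(?P<date>\d{4}-\d{2}-\d{2})\.csv$")


def partitioned_key(source_key):
    """Map a raw CSV key to a Hive-style date-partitioned JSON key."""
    # Ignore any prefix ("folder") in front of the file name
    filename = source_key.rsplit("/", 1)[-1]
    match = FILENAME_PATTERN.match(filename)
    if match is None:
        raise ValueError(f"Unexpected file name: {source_key!r}")
    return f"date={match.group('date')}/{match.group('stem')}.json"


print(partitioned_key("stock_data_2025-06-16.csv"))
# date=2025-06-16/stock_data.json
```

Raising on an unexpected name, instead of silently writing to an unpartitioned key, lets the failure surface in the Lambda logs.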
5. AWS Glue Crawler for Schema Inference
Crawls: reedx-stock-json/
Output: Creates or updates a Glue table: reedx_stock_data_json
Runs every 15 minutes (or via EventBridge rule)
Format: JSON with schema detected as:
Column    Type
StockID   int
Company   string
Price     float
Volume    int
Date      string
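One detail worth checking against this schema: `csv.DictReader` returns every field as a string, so a row serialized as-is carries `"Price": "192.5"` rather than a number, and a crawler would likely infer `string` for it. The sketch below is one way to cast values before writing, assuming the column names from the sample CSV. The `COLUMN_TYPES` mapping and the `rows_to_json_lines` helper are illustrative names, not part of the original Lambda. It also emits one JSON object per line, which is the layout Athena's JSON SerDes generally expect.

```python
import csv
import json
from io import StringIO

# Assumed mapping, taken from the schema listed above
COLUMN_TYPES = {
    "StockID": int,
    "Company": str,
    "Price": float,
    "Volume": int,
    "Date": str,
}


def rows_to_json_lines(csv_text):
    """Convert CSV text to newline-delimited JSON with typed values."""
    reader = csv.DictReader(StringIO(csv_text))
    lines = []
    for row in reader:
        # Cast each field; columns not in the mapping stay as strings
        typed = {
            name: COLUMN_TYPES.get(name, str)(value)
            for name, value in row.items()
        }
        lines.append(json.dumps(typed))
    return "\n".join(lines)


sample_csv = (
    "StockID,Company,Price,Volume,Date\n"
    "101,Apple,192.5,1200,2025-06-16\n"
    "102,Google,2785.0,850,2025-06-16\n"
)

print(rows_to_json_lines(sample_csv))
```

This sketch does not handle blank or malformed numeric fields; a production version would need to decide whether to skip, default, or reject such rows.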
6. Athena Queries by Data Analysts
Analysts query stock data directly with Athena using SQL.
Sample queries:
-- View latest stock prices
SELECT * FROM reedx_stock_data_json
WHERE date = '2025-06-16';
-- Find highest traded volume
SELECT Company, MAX(Volume) AS max_volume
FROM reedx_stock_data_json
GROUP BY Company;
-- Filter high-priced stocks
SELECT * FROM reedx_stock_data_json
WHERE Price > 1000;
7. Monitoring & Logging
CloudWatch is used to log:
Lambda success/failure
Crawler run outcomes
Alerts set using CloudWatch Alarms on errors or missing files
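As one possible shape for the error alerting described above, the sketch below builds the arguments for an alarm on the Lambda `Errors` metric. The function name `reedx-csv-to-json`, the SNS topic ARN, and the helper `lambda_error_alarm` are hypothetical placeholders; the thresholds are assumptions to adjust, not values taken from the original setup.

```python
def lambda_error_alarm(function_name, sns_topic_arn):
    """Build keyword arguments for CloudWatch's put_metric_alarm call."""
    return {
        "AlarmName": f"{function_name}-errors",
        "Namespace": "AWS/Lambda",
        "MetricName": "Errors",
        "Dimensions": [{"Name": "FunctionName", "Value": function_name}],
        "Statistic": "Sum",
        "Period": 300,  # evaluate in 5-minute windows (assumed)
        "EvaluationPeriods": 1,
        "Threshold": 1,  # alarm on the first failed invocation (assumed)
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
        "TreatMissingData": "notBreaching",  # no invocations is not an error
        "AlarmActions": [sns_topic_arn],
    }


# Hypothetical function name and topic ARN, for illustration only
params = lambda_error_alarm(
    "reedx-csv-to-json",
    "arn:aws:sns:us-east-1:123456789012:pipeline-alerts",
)
print(params["AlarmName"])
```

With credentials configured, the dictionary can be passed to `boto3.client("cloudwatch").put_metric_alarm(**params)`. Detecting missing files would need a separate alarm, for example on the function's `Invocations` metric with missing data treated as breaching.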
IAM Permissions Required
Lambda Execution Role:
{
  "Effect": "Allow",
  "Action": [
    "s3:GetObject",
    "s3:PutObject",
    "logs:*"
  ],
  "Resource": "*"
}
Glue Role:
{
  "Effect": "Allow",
  "Action": [
    "s3:GetObject",
    "glue:*"
  ],
  "Resource": "*"
}
Business Benefits
Zero manual steps for conversion
Near-real-time availability for analysis: files are converted on arrival and cataloged on the next crawler run
Serverless, scalable, and cost-effective