Problem Statement
Truecaller deals with millions of user-settings change events daily. Each event looks like this:

- id (long)
- name (string)
- value (string)
- timestamp (long)
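In Spark terms, the same schema can be declared up front instead of inferred at read time. A minimal sketch (field names and types taken directly from the list above; `event_schema` is just an illustrative name):

```python
from pyspark.sql.types import StructType, StructField, LongType, StringType

# Explicit schema for the settings-change events, matching the fields listed above.
event_schema = StructType([
    StructField("id", LongType()),
    StructField("name", StringType()),
    StructField("value", StringType()),
    StructField("timestamp", LongType()),
])
```

Passing this to `spark.read.csv(..., schema=event_schema)` avoids the extra pass over the data that schema inference requires.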
The goal:

- Group events by id.
- Convert the (name, value) pairs into a map.
- For each key, keep the value with the latest timestamp.
- Output a partitioned table for faster downstream queries.
Example:

| id | name         | value | timestamp  |
|----|--------------|-------|------------|
| 1  | notification | true  | 1546333200 |
| 1  | notification | false | 1546335647 |
| 1  | background   | true  | 1546333546 |

Output:

| id | settings |
|----|----------|
| 1  | {"notification": "false", "background": "true"} |
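To reproduce this locally, the example rows can be loaded into a small DataFrame (only the session setup is added; the values are exactly those in the input table):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The three example rows from the input table above.
events = spark.createDataFrame(
    [
        (1, "notification", "true", 1546333200),
        (1, "notification", "false", 1546335647),
        (1, "background", "true", 1546333546),
    ],
    "id long, name string, value string, timestamp long",
)
events.show()
```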
PySpark Solution
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, collect_list, map_from_entries, struct
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("TruecallerETL") \
    .getOrCreate()

# Read the raw settings-change events.
df = spark.read.csv("path/to/events.csv", header=True, inferSchema=True)

# Rank rows within each (id, name) pair by timestamp, newest first.
window_spec = Window.partitionBy("id", "name").orderBy(col("timestamp").desc())

# Keep only the latest row per (id, name), then drop the helper columns.
latest_df = df.withColumn("rn", row_number().over(window_spec)) \
    .filter(col("rn") == 1) \
    .drop("rn", "timestamp")

# Collapse the surviving (name, value) rows for each id into a single map column.
result_df = latest_df.groupBy("id") \
    .agg(map_from_entries(collect_list(struct(col("name"), col("value")))).alias("settings"))

# Write the aggregated settings as Parquet (partitioning is covered below).
result_df.write.mode("overwrite").parquet("path/to/output")
```
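Once written, the settings map can be queried by key, which is what makes this layout convenient for downstream consumers. A quick sketch, reusing the hypothetical output path from above:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

settings_df = spark.read.parquet("path/to/output")

# Pull individual settings straight out of the map column.
settings_df.select(
    col("id"),
    col("settings").getItem("notification").alias("notification"),
    col("settings").getItem("background").alias("background"),
).show(truncate=False)
```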
Optimizations Applied
✅ Window functions for latest value selection.
✅ Minimal shuffles by filtering early.
✅ map_from_entries for direct map creation.
✅ Parquet output with partitioning for efficient storage and queries (see the partitioned-write sketch below).
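The snippet above writes a single, unpartitioned Parquet directory. One common way to get the partitioned layout is to stamp each run with a partition column and write with `partitionBy`. This is a sketch under assumptions: the `dt` snapshot-date column, its hard-coded value, and the paths are illustrative, since the original job does not specify the partition key.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()

# Toy version of the aggregated output from the article's example.
result_df = spark.createDataFrame(
    [(1, {"notification": "false", "background": "true"})],
    "id long, settings map<string,string>",
)

# Tag the run with a partition column (a real job would pass the run date in)
# and write one directory per dt value so downstream queries can prune by date.
result_df.withColumn("dt", lit("2019-01-01")) \
    .write.mode("overwrite") \
    .partitionBy("dt") \
    .parquet("path/to/output")
```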
Design Considerations for Scale
If processing hundreds of millions of rows:
- Partition input data by date/hour.
- Use broadcast joins only when the smaller side comfortably fits in executor memory.
- Tune spark.sql.shuffle.partitions for your cluster size.
- Use checkpointing if job stages are reused downstream (all four are sketched below).
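A rough sketch of how these knobs look in code; the partition count, the paths, and the small_dim lookup table are illustrative assumptions, not tuned recommendations:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col

spark = SparkSession.builder.getOrCreate()

# Shuffle parallelism: size to total executor cores and data volume (2000 is a placeholder).
spark.conf.set("spark.sql.shuffle.partitions", "2000")

# Partition pruning: if the input is laid out by date/hour, filter on the partition
# columns so Spark reads only the slices it needs.
events = spark.read.parquet("path/to/events") \
    .where((col("date") == "2019-01-01") & (col("hour") == "10"))

# Broadcast join: only when the lookup side comfortably fits in executor memory.
small_dim = spark.read.parquet("path/to/small_dim")
enriched = events.join(broadcast(small_dim), "id")

# Checkpointing: truncate the lineage when an intermediate result is reused by
# several downstream stages.
spark.sparkContext.setCheckpointDir("path/to/checkpoints")
enriched = enriched.checkpoint()
```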
💡 Final Thoughts
While the business logic here is simple, performance tuning in PySpark is what separates a job that finishes in minutes from one that takes hours. At Truecaller's scale, that difference is critical.