AI in Data Engineering: Revolutionising Spark Usage

Context

I have been using Cursor for the last year on a personal iOS app project. In that time I have watched it grow from an overconfident, irritating trainee that would triumphantly generate code that did not compile into a seasoned professional that generates elegant code, builds it, tests it and, most of the time, gets it right the first time around.

Cursor has become such a trusted partner in my project that I even ask it for new feature ideas.

But all of that applies to traditional computing, so I was curious to see how this tool would fare with distributed programming. I had the perfect code for this: the code I used for my previous Cloudera blog post, a deep dive into Spark performance. It’s still available on the Cloudera website but, because that site lacks a code display feature, it is a difficult read, so I have added it to my own blog here.

Code analysis

So I created a new Cursor project, added the original prepare_data_baseline function and provided the following prompt:

Please analyze the code of the prepare_data_baseline function and outline the steps you take to fix this code, explicitly referencing the best practices you implement.

And here is Cursor’s output:

Here is a concise analysis of prepare_data_baseline and a fix plan tied to Spark and Python practices.

What the code does

The function:

  1. Marks rows where page == 'Cancellation Confirmation' with a binary Churn flag.
  2. Builds lists of user IDs who ever churned vs. everyone else, then filters the main df into all_cancelled and stayed.
  3. Adds a label column: 0 if the user is treated as “stayed”, 1 otherwise.

Issues (ordered by severity)

1. Driver collect() + Python loops (scalability and stability)
distinct().collect() on users (lines 26–27 and 37–40) pulls all distinct IDs to the driver. That does not scale, risks OOM, and wastes cluster parallelism.

2. Row-at-a-time Python UDFs for simple predicates
The first UDF (lines 17–20) only compares page to a string; the second (lines 53–57) only checks membership in a Python set. Both skip Catalyst’s optimized expressions and add Python UDF overhead (serialization, row-by-row execution) where native column ops would suffice.

3. Fragile / inconsistent “lists” logic
canc_list filters empty strings from gb, but stayed_list uses set(gh) - set(gb) (line 41), not set(gh) - set(canc_list). If '' appears in gb, behavior between all_cancelled / stayed / stayed_list can disagree. Naming is also mixed (canceled vs cancelled, Churn vs churn in filters).

4. Large isin lists
where(col('userId').isin(canc_list)) (lines 50–51) can become a huge IN list and hit planner/parser limits or poor plans at scale; a small dimension table + join (often broadcast) is the usual pattern.

5. Documentation and style
The docstring (lines 5–14) is not indented as part of the function body (invalid/normal Python would fail — if this runs, it may have been fixed elsewhere or the file is illustrative). PEP 257 expects an indented docstring under def.

Please ignore the point above; I may have introduced it myself while copying and pasting the code.

6. Side effects in a “prepare” function
print calls (lines 34, 47) mix I/O with data logic, which hurts reuse, testing, and production jobs (logs/noise, non-deterministic ordering in distributed runs).

Steps I would take to fix it (and which practices they implement)

| Step | Change | Practice |
| --- | --- | --- |
| A | Replace the first UDF with when(col("page") == lit("Cancellation Confirmation"), 1).otherwise(0) (or equivalent expr). | Prefer built-in Column expressions so Catalyst can optimize and avoid Python UDF cost. |
| B | Compute “ever churned” users with groupBy("userId").agg(max("Churn").alias("ever_churned")) (or bool_or / max on 0/1), after defining Churn without a UDF. | Use distributed aggregates instead of driver-side lists. |
| C | Define stayed users as users with ever_churned == 0 (and handle null/empty userId explicitly with filter conditions, not ad hoc list diffs). | Single source of truth for churn vs stay; avoids set(gh)-set(gb) inconsistencies. |
| D | Join the main df to a small user-level churn flag DataFrame (join(…, "userId", "left")) to create label (and optionally drop the helper). Prefer broadcast on the small side when you know it’s small. | Joins over isin giant lists; broadcast for small dimensions per Spark tuning guidance. |
| E | Build stayed and all_cancelled by joining df to the stayed-user set / churned-user set DataFrames (or semi joins), not isin(python_list). | Keeps execution fully on executors; same semantics without driver materialization. |
| F | Normalize naming: one spelling for cancelled, consistent churn column casing in filter/col. | Consistent API and fewer case-resolution surprises across Spark versions. |
| G | Fix the docstring (indent under def, Args/Returns aligned with real types: Spark DataFrames). | PEP 257 and clearer contracts. |
| H | Remove or replace print with optional logging or return counts as DataFrames/small structs if needed for debugging. | Separation of concerns and testability. |

Summary

The baseline mixes correct high-level intent (per-user churn label and splits) with patterns that break down in Spark: driver collect, Python UDFs for trivial logic, and set arithmetic on collected lists. The fix is to express churn as aggregates and joins on DataFrames, use column expressions instead of UDFs where possible, align empty userId handling, and reserve broadcast joins for small user-level tables—matching common Spark performance and reliability practice.

I can turn this into a concrete patched prepare_data_baseline.py in your repo if you want that next.

I think it’s fair to say that Cursor nailed it: it even caught the inconsistent column naming that I had joked about (step F)!
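The list inconsistency from issue 3 can be reproduced in plain Python, without Spark. The sketch below is illustrative only: the user IDs are made up, the meanings given to gh and gb are assumptions based on Cursor's description, and the labelling rule (0 if the user is in stayed_list, 1 otherwise) is taken from the "What the code does" summary rather than from the original function.

```python
# Hypothetical user IDs; "" stands for rows logged without a user.
gh = ["", "u1", "u2", "u3"]   # every distinct userId (assumed meaning of gh)
gb = ["", "u2"]               # userIds with at least one churn row (assumed meaning of gb)

# Lists built the way issue 3 describes them.
canc_list = [u for u in gb if u != ""]   # empty string filtered out
stayed_list = set(gh) - set(gb)          # diff against the unfiltered gb


def label(user_id):
    # Assumed labelling rule: 0 if the user is in stayed_list, 1 otherwise.
    return 0 if user_id in stayed_list else 1


# "" is in neither list, yet it is labelled as churned.
print("" in canc_list, "" in stayed_list, label(""))   # False False 1

# Diffing against the filtered list, the alternative the analysis mentions,
# puts "" back on the stayed side so the views agree again.
consistent_stayed = set(gh) - set(canc_list)
print(sorted(consistent_stayed))   # ['', 'u1', 'u3']
```

With this data, a row with an empty userId would get label 1 while appearing in neither all_cancelled nor stayed, which is the kind of disagreement the analysis warns about.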

Code refactoring

Here is the rest of the chat.

Please go ahead

Implementing the refactored function: reading the file and applying the full replacement.

Writing the refactored implementation. No other files reference Churn, so we’ll use the consistent lowercase churn column name.

Here is what was implemented in prepare_data_baseline.py:

Changes

  1. Native column expressions: Replaced both UDFs with when(col("page") == lit("Cancellation Confirmation"), lit(1)).otherwise(lit(0)) so Catalyst can optimize and avoid Python row UDFs.
  2. Distributed logic: Replaced collect() and Python set logic with groupBy("userId").agg(F.max("churn").alias("ever_churned")), then inner joins to build stayed and all_cancelled, so work stays on executors.
  3. Single definition of churned vs stayed: churned_users and stayed_users both come from user_churn with ever_churned and the same valid_user filter, so there is no mismatch between gb and canc_list.
  4. valid_user: userId not null and not '', aligned with the old list filtering.
  5. Label: label is ever_churned from the join (same as before: 1 if the user ever churned, 0 otherwise).
  6. Docstring: Indented, PEP 257-style, with accurate Args/Returns.
  7. Imports: Dropped udf, IntegerType; added functions as F and when / lit.
  8. Naming: Row flag is churn (lowercase) instead of Churn. Nothing else in the repo referenced the old name; if notebooks expect Churn, rename the column in one line or say if you want it kept.
  9. No print in the pipeline: Keeps the function pure and easier to test or schedule.

Note: user_churn is not wrapped in broadcast(). With many distinct users, a shuffle join is appropriate; Spark can still auto-broadcast small sides under AQE. If you know the user dimension is always small, you can wrap user_churn in broadcast() for a broadcast hash join.

prepare_data_baseline.py
Lines 1-58
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, when
def prepare_data_baseline(df):
    """Prepare a Spark DataFrame with row-level churn flags and user-level labels.
    ...
    """
    df = df.withColumn(
        "churn",
        when(col("page") == lit("Cancellation Confirmation"), lit(1)).otherwise(lit(0)),
    )
    user_churn = df.groupBy("userId").agg(F.max("churn").alias("ever_churned"))
    valid_user = col("userId").isNotNull() & (col("userId") != lit(""))
    churned_users = user_churn.filter((col("ever_churned") == 1) & valid_user).select(
        "userId"
    )
    stayed_users = user_churn.filter((col("ever_churned") == 0) & valid_user).select(
        "userId"
    )
    df = (
        df.join(user_churn, on="userId", how="left")
        .withColumn("label", col("ever_churned"))
        .drop("ever_churned")
    )
    all_cancelled = df.join(churned_users, on="userId", how="inner")
    stayed = df.join(stayed_users, on="userId", how="inner")
    return df, stayed, all_cancelled

Implications

First, I can answer the initial question: 

Yes, AI code generators are knowledgeable about distributed processing.

But that answer prompts a new set of questions.

Have I just become useless?

Well, not quite.

The generated code still needs to be checked. I can tell that it is correct because I, too, am knowledgeable about distributed processing.
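One lightweight way to do that checking is to compare the Spark output against a tiny plain-Python reference on a handful of rows. The sketch below is only an illustration of that idea: expected_split, CANCEL_PAGE and the sample rows are hypothetical names and data, and the rules it encodes (a user has churned if any of their rows has the cancellation page; null and empty user IDs are excluded) are my reading of the refactored function above.

```python
CANCEL_PAGE = "Cancellation Confirmation"


def expected_split(rows):
    """Return (churned_users, stayed_users) for an iterable of (user_id, page) pairs."""
    ever_churned = {}
    for user_id, page in rows:
        # Mirror the valid_user filter: skip null and empty user IDs.
        if user_id is None or user_id == "":
            continue
        flag = 1 if page == CANCEL_PAGE else 0
        # Same idea as max("churn") per user: one churn row is enough.
        ever_churned[user_id] = max(ever_churned.get(user_id, 0), flag)
    churned = {u for u, f in ever_churned.items() if f == 1}
    stayed = {u for u, f in ever_churned.items() if f == 0}
    return churned, stayed


# Hypothetical sample: u1 cancels, u2 stays, two rows have no usable userId.
sample = [
    ("u1", "Home"),
    ("u1", CANCEL_PAGE),
    ("u2", "Home"),
    ("", "Home"),
    (None, "Home"),
]
churned, stayed = expected_split(sample)
print(churned, stayed)  # {'u1'} {'u2'}
```

If the same rows are loaded into a small DataFrame, the distinct userId values in all_cancelled and stayed should match these two sets; a mismatch would point to a difference between the intended and the generated behaviour.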

What are the implications of this for Spark?

Checking is much less time-consuming than doing, so this results in a massive productivity boost which, in turn, lowers the cost of ownership of Spark code.

The cost of ownership of Spark code has long been used as a deterrent by proponents of ‘no code’ or ‘low code’ solutions. Now, with the help of AI generators, Spark (and Flink) become unrivalled in terms of flexibility, performance and cost.

What are the implications of this for data engineers?

They need to learn the art of prompting. Those with experience in testing will have a head start because they should already have the essential skill: specifying expected behaviour without ambiguity.

What are the implications of this for organisations?

Another popular strategy for mitigating the cost of ownership of code was outsourcing. I no longer see the value of this model. Organisations that outsourced have developed the skills needed to specify their requirements unambiguously; now they can get quality results back instantly, without the overhead of contracts, different time zones and extra costs.

It’s a new world

Those are some of the immediate questions that arise from this limited experiment; I am sure there are many more. We are discovering a new world in which knowledge, once captured, can be distributed and conveniently applied to the resolution of specific problems. That does not imply that expertise has become redundant, because those models need to be trained and their output needs to be checked, but it does mean that expertise no longer needs to be as ubiquitous as it used to be.