Zen of Batch Pipelines - A recipe to reduce cognitive load

In this post, I’ll share some hard-earned tricks for building and maintaining many, many batch data pipelines. It’s all about forcing yourself to keep the mental overhead to a minimum.

For the past 5 years I worked at Schibsted, a multinational marketplace & media giant in the tiny Nordics. It was the deep learning ambitions and the massive streams of event data that lured me in, and the great people and engineering culture that kept me there.

My team ran the machine learning pipelines for personalization and advertising (roughly: behavioural events in \(\rightarrow\) predicted user traits out). We maintained some 25 pipelines × 10s of stages × 3 countries × dev/pre/pro under strict privacy, quality, and downtime constraints.

Over the years I had many reasons to think about scaling a data team and its limiting factors. This matters because I firmly believe ML/data teams must run systems in production to create value. Center of expertise? Temporary consultants? No. More jobs \(\propto\) more value, and the written-in-flesh limit is the shared context of a team of around 12 people.

To be able to debug and develop on an already vast error surface, cognitive load - and hence variation - had to be kept in check. This was learnt collectively from thousands of hours sifting through confs, Spark logs, stack traces, S3, code, monitors, metrics, etc.

I wrote this down because my team needed a shared mental model of our battle-tested design principles. It would end up serving as a portal piece for many design discussions.

Maybe it’s of interest to the internet even if you’re not using an analogous stack to what we had (Scala/Spark/Python/K8s/Luigi/S3). You can read this with one finger solemnly raised if it helps:

The Zen of batch pipelines

The purpose of this document is to keep track of and align the programming principles for our batch pipelines. We should update and revisit this doc to keep track of style/design decisions. If possible, keep it short/readable/declarative with the intended readers in mind:

  • New team members trying to understand why things are like they are.
  • Old team members referencing this doc in a design decision/PR/review.

Guiding Principles / Desired state

Our data pipelines should follow functional programming principles: reproducibility, atomicity, and readability.

Infra

travis         {on commit}        =>
Spinnaker      {on merge}         =>
k8s cronjob    {every 20 min}     =>
k8s Job        repeat             =>
luigi worker   {if not completed} =>
batch stage => {s3://../_SUCCESS}
  • We automate deployments to pre/pro.
  • Kubernetes Cronjobs schedule luigi pods
  • luigi launches batch job stages
  • A batch job stage is uniquely defined by its luigi input arguments
  • Batch job is dockerized
  • The main branch intends to reflect production code. Whatever is in pro-environment can be assumed to be production - it’s depended on.

A stage:

  • is a transformation Inputs => Output launched by luigi. Typically s3 => s3
    • Example: s3://../events-raw/.. => s3://../events-preprocessed/..
  • Is idempotent: Same luigi input params produce the same output.
    • We don’t like side effects. If unavoidable, all of them should be easily understood from the luigi task.
    • Output can be traced to a commit.
    • We strive for deterministic programs. Fixing seeds or maintaining sort order helps.
  • Is atomic. Has completeness defined by its single _SUCCESS file.
  • Is versioned. Its path /version=./ is always bumped for backwards incompatible changes.
  • The work of a stage is done in either Spark/Scala or Python. We avoid inlining it in luigi.
  • Has an easily found and enforced output format/schema. The output of a Spark stage is a Dataset[Type].
  • A single stage is better than more stages.
  • Pipelines should be self-contained:
    • Keep the interdependencies between pipelines to a minimum. Each pipeline produces its own derived datasets (aggregations, features, models) rather than depend on other pipelines. Redundant compute is cheaper than blocked developers.
    • External systems are accessed through standardized datasets or shared libraries. Ex: Don’t call an API to fetch data into your stage. Read data from a stage that persisted data from an API call.
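Luigi handles this in production, but the atomic-completeness pattern in the bullets above is small enough to sketch in plain Python. Everything below (names, paths, the toy stage) is hypothetical:

```python
import os
import tempfile


def is_complete(output_dir: str) -> bool:
    """A stage is complete iff its single _SUCCESS marker exists."""
    return os.path.exists(os.path.join(output_dir, "_SUCCESS"))


def run_stage(input_rows, output_dir: str):
    """Idempotent stage: same inputs -> same output; rerunning is a no-op."""
    if is_complete(output_dir):
        return  # already done, so schedulers can retry freely
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, "part-00000"), "w") as f:
        for row in sorted(input_rows):  # fixed sort order keeps output deterministic
            f.write(row + "\n")
    # The marker is written last: partial output is never mistaken for complete.
    open(os.path.join(output_dir, "_SUCCESS"), "w").close()


out = os.path.join(tempfile.mkdtemp(), "events-preprocessed", "version=1")
run_stage(["b", "a"], out)
print(is_complete(out))  # True
```

Writing the marker last is what buys atomicity: any consumer that checks `_SUCCESS` before reading can never observe a half-written dataset.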

Naming

  • A luigi task is named after its output dataset (side effect). Think “what’s a good SQL table name?”
  • Dataset names explain their content, rather than the transformations used to get there.
  • Transformation code (eg. Scala main) can be named after what it does if it’s used more than once.
  • It’s best if luigi, s3, and scala names are identical enough for a dumb IDE to help us with search-replace-refactoring.
  • Ideally we want to be able to infer from the data path:
    • Who wrote it (pipeline)
    • What triggered it (luigi task)
    • What code was involved (Scala/Python main class)
    • What’s in the dataset and what output schema it has (name of the output case class)
    • Where the data comes from (column names)

Examples

Good:
  • EventsWithLocations luigi task
  • s3://.../events-with-locations/../
  • Scala main class under com.org.pipelinename.jobs:
    • EventsWithLocationsJob.scala
  • Output type is Dataset[EventWithLocation]

A full path could look like

s3://bucket-name/retention=5/pipeline-name/events-with-locations/version=1/lookback=25/year=2024/month=1/day=1/hour=1/_SUCCESS

This is long indeed but everything serves its purpose.

Bad:
  • EventAttributeEnrichment luigi task
  • s3://bucket-name/pipeline-name/ecosystem/predicted-events/version=0.0.3/..
  • EventPredictor.scala
  • Output type is an untyped DataFrame
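The convention in the good example is mechanical enough that a dumb script can derive most of the names from the luigi task name (singularizing the output case class is left to humans). A hypothetical sketch:

```python
import re


def kebab(name: str) -> str:
    """EventsWithLocations -> events-with-locations."""
    return re.sub(r"(?<!^)(?=[A-Z])", "-", name).lower()


def names_for(task: str) -> dict:
    """Derive the sibling names from a luigi task name."""
    return {
        "luigi_task": task,         # EventsWithLocations
        "s3_dataset": kebab(task),  # s3://.../events-with-locations/..
        "scala_main": task + "Job", # EventsWithLocationsJob.scala
    }


print(names_for("EventsWithLocations")["s3_dataset"])  # events-with-locations
```

Keeping the mapping this mechanical is what makes the “dumb IDE search-replace-refactoring” bullet work in practice.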

Implications

  • Luigi does all the path checking/manipulations.
  • If your stage is not easily named such that its luigi task explains its side effect - you’re likely trying to do something we shouldn’t do.
  • If you don’t know exactly what the output format of a stage is - a consumer won’t either.
  • The mental model described here may not always fit how we want to write code, but if we try to conform to any kind of consistent mental model, it will help reading and reasoning about the system and (mainly) the data afterwards.

Testing

  • Goal: if travis says ✅ then the k8s job runs without failure.
  • Luigi tests should explicitly and verbosely verify expected side effects - i.e. the exact function calls, such as the input/output paths of a stage.
  • Scala main should only contain IO and calls to a function which is tested.
  • We prefer explicit and dumb unit tests.
  • If it’s hard to test - you probably shouldn’t do it.
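The “main should only contain IO” rule is what makes the tests dumb and explicit. A hypothetical Python sketch of the same split: the pure transformation is trivially unit-tested, while the main only wires IO to it.

```python
def with_locations(events: list, locations: dict) -> list:
    """Pure, deterministic transformation: no IO, trivial to test."""
    return [{**e, "location": locations.get(e["user"])} for e in events]


def main(read, write):
    # The main only does IO and one call to the tested function.
    write(with_locations(read("events"), read("locations")))


# Explicit, dumb unit test: exact inputs, exact expected output.
assert with_locations([{"user": "u1"}], {"u1": "oslo"}) == [
    {"user": "u1", "location": "oslo"}
]
```

No mocks, no fixtures, no cleverness: if the expected output ever changes, the diff in the test says exactly what changed.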

Style

  • We try to minimize library dependencies.
  • Formatting is automatically enforced via pre-commit and scalafmt.
  • We enforce mypy typing. Every function should be typed.
  • We gravitate towards the Spark Dataset API and shun the org.apache.spark.ml.Transformer API.
  • We avoid Spark UDFs.
  • We keep luigi task parameters to a minimum.
    • Parameters are passed explicitly. Luigi tasks do not .clone().
    • We avoid default parameters. Differences between pipelines are documented in the Pipeline Differences Google doc.
  • (Some Pipeline Name) is the gold standard. Changes should be propagated to it and from it. We try to keep other pipelines in sync with this pipeline.

Keras WTTE-RNN and Noisy signals

I was really happy to find daynebatten’s post about implementing WTTE-RNN in keras. Since then I’ve done some work to fully cram WTTE-RNN into Keras and get it up and running. Some things become outright hacky (like the target having to be the same shape as the prediction), but Keras is also a really nice place to mock up networks and tests and get work done.

If you haven’t checked out the updated Github-project, here’s a quick taste.

Evenly spaced points revisited

I like this example. We know the truth and can modify the signal. The problem is to predict the Time To Event (TTE, black) knowing only what happened up until the point you’re predicting from. An added problem is that you can only train on what you have data for (the whole time window), leading to censoring (in red). Here we loop through the possible sequences:

Add some noise

Let’s set up a 200-step RNN. As input to the network, feed whether there was an event in the last step. You use this to predict the number of steps to the next event.

Imagine now that this signal is corrupted by noise. Stacking 160 such sequences on top of each other looks something like this:

Here the x-axis is time and y the individual sequences. Note how there are black events sprinkled randomly.
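To make the setup concrete, here is a rough numpy sketch of such data - not the notebook’s exact code, and the spacing and noise rate are made up. `tte` is the time to the next event, walked backwards; steps after the last observed event have no next event and are censored (NaN here):

```python
import numpy as np

np.random.seed(1)
n_timesteps, spacing = 200, 30

# Event indicator: an event every `spacing` steps, plus random noise events.
x = np.zeros(n_timesteps)
x[spacing - 1 :: spacing] = 1
x = np.clip(x + (np.random.rand(n_timesteps) < 0.05), 0, 1)

# Time-to-event, walked backwards from each observed event.
tte = np.empty(n_timesteps)
countdown = np.nan  # no event observed yet -> censored
for t in range(n_timesteps - 1, -1, -1):
    countdown = 0.0 if x[t] == 1 else countdown + 1
    tte[t] = countdown
```

Training targets then come from `tte` with the censored tail flagged, which is exactly the situation the Weibull loss is built to handle.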

Now let’s train an RNN to see what it thinks about it. With a few lines of Keras we can define a small network:

import numpy as np
import wtte.wtte as wtte
from keras.models import Sequential
from keras.layers import Dense, Lambda, GRU
from keras.callbacks import History
from keras.optimizers import Adam

print('init_alpha: ', init_alpha)

np.random.seed(1)
# Store some history
history = History()

# Start building the model
model = Sequential()
model.add(GRU(1, input_shape=(n_timesteps, n_features), return_sequences=True))

model.add(Dense(2))
model.add(Lambda(wtte.output_lambda, arguments={"init_alpha": init_alpha,
                                                "max_beta_value": 4.0}))
loss = wtte.loss(kind='discrete').loss_function

model.compile(loss=loss, optimizer=Adam(lr=.01))

model.summary()
init_alpha:  43.4425042957
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
gru_1 (GRU)                  (None, 200, 1)            9         
_________________________________________________________________
dense_1 (Dense)              (None, 200, 2)            4         
_________________________________________________________________
lambda_1 (Lambda)            (None, 200, 2)            0         
=================================================================
Total params: 13.0
Trainable params: 13.0
Non-trainable params: 0.0
_________________________________________________________________

After some epochs it works pretty well. Stacking the predicted Weibull parameters it’s clear that it learned something:

Above we loop through the sequences (top to bottom) and show the predicted quantiles. With just one GRU cell it’s clearly fooled by noise, but it still seems to have learned some type of Bayesian reasoning. In particular, even though it was only trained on the censored TTE, it manages to predict the actual TTE quite well.
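Those quantiles fall straight out of the predicted Weibull parameters: for each step the network emits \(\alpha, \beta\), and the quantile function is \(F^{-1}(p) = \alpha(-\log(1-p))^{1/\beta}\). A minimal sketch (the parameter values are made up):

```python
import numpy as np


def weibull_quantile(p, alpha, beta):
    """Inverse CDF of Weibull(alpha, beta): the p-quantile of predicted TTE."""
    return alpha * np.power(-np.log1p(-p), 1.0 / beta)


# Hypothetical predicted parameters for one timestep:
alpha, beta = 40.0, 2.0
print(weibull_quantile(0.5, alpha, beta))  # median predicted time to event
```

Plotting a few such quantiles per timestep gives the prediction bands shown above, with \(\beta\) controlling how wide (i.e. how uncertain) they are.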

To get the code and improve the results, check out the Github-project and the Jupyter Notebook.

More to come!