Creating ETL Pipelines With Bodo

July 7, 2022
Dillon Roach
The data used in this post comes from NYC parking tickets issued during the years 2016 and 2017. The original CSV files are available in an S3 bucket (s3://bodo-examples-data/nyc-parking-tickets). You can alternatively get the data from Kaggle.

Reading and Converting Data

While these files may be read directly from source into dataframes, we prefer to avoid performance issues related to networking and focus on dealing with the files locally.  You can download the requisite files with the following script:
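A minimal sketch of such a download step, assuming the AWS CLI is installed and the bucket allows anonymous reads (both are assumptions on our part). The bucket prefix comes from this post; the helper names plan_downloads and download_files are hypothetical, and the object names inside the bucket are deliberately left as a parameter because they are not listed here.

```python
import subprocess
from pathlib import Path

# Bucket prefix as given in this post.
BUCKET_PREFIX = "s3://bodo-examples-data/nyc-parking-tickets"


def plan_downloads(file_names, dest_dir="data"):
    """Pair each remote object with the local path it should be saved to."""
    dest = Path(dest_dir)
    return [(f"{BUCKET_PREFIX}/{name}", dest / name) for name in file_names]


def download_files(file_names, dest_dir="data"):
    """Copy each file with the AWS CLI, skipping files that already exist.

    ASSUMPTION: `aws` is on PATH and the bucket is publicly readable,
    hence --no-sign-request.
    """
    for src, dst in plan_downloads(file_names, dest_dir):
        dst.parent.mkdir(parents=True, exist_ok=True)
        if dst.exists():
            continue
        subprocess.run(
            ["aws", "s3", "cp", "--no-sign-request", src, str(dst)],
            check=True,
        )
```

Calling download_files with the real object names (list the bucket to find them) leaves local copies under data/, and re-running it does not download anything twice.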

This gets us the .CSV files, but we'll also want the same data in Parquet format to cover some other points along the way.  In theory this is as simple as calling pandas' DataFrame.to_parquet(), but the input files are not entirely set up for it: some columns first need an explicit type.  You'll likely need to run the following first to prepare the file:
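As a rough illustration of that preparation step, the sketch below casts a chosen set of columns to pandas' string dtype so that mixed-type columns no longer trip up the Parquet writer. The helper name cast_columns_to_string is hypothetical, and which columns of the real files need this treatment is something to check against your own copy of the data.

```python
import pandas as pd


def cast_columns_to_string(df, columns):
    """Return a copy of df with the given columns cast to the 'string' dtype.

    Mixed values such as 1 and "x" both become strings; missing values
    stay missing.
    """
    return df.astype({col: "string" for col in columns})
```

For example, cast_columns_to_string(df, ["A", "B"]) leaves every other column untouched.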

You can, of course, also simply write statements of the form df = df.astype({'A':'string','B':'string'}); we are extra verbose here for clarity. We then write out two versions of the Parquet file: first with the default settings, and again with row_group_size set to 100k.

The same can be done for the 2017 dataset, as shown here.

Comparing Read Metrics

In our ETL pipeline we want to run a piece of code that looks like this:

We read in a file, perform a simple df.fillna(), and then call df.groupby(), followed by some data cleaning. All of this leads up to the final analysis goal: a new column that gives, for each row of data, the per-violation-code percentage of summonses. Finally, we return the resulting dataframe for use in other steps. On a modern desktop, this can take upwards of 40 seconds to run, and because the entire .CSV needs to be read, it can eat up a good chunk of RAM.
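The steps described above can be sketched in plain pandas as follows. This is an illustration under assumptions rather than the exact function from the notebook: the column names ("Summons Number", "Violation County", "Violation Code"), the fill value, and the derived column names are our guesses at a reasonable layout.

```python
import pandas as pd


def load_parking_tickets(source):
    """Read tickets, count summonses per group, and add a percentage column."""
    df = pd.read_csv(source)
    # ASSUMPTION: missing counties are filled with a placeholder value.
    df = df.fillna({"Violation County": "UNKNOWN"})
    # Count summonses per (county, violation code) pair.
    grouped = df.groupby(
        ["Violation County", "Violation Code"], as_index=False
    )["Summons Number"].count()
    grouped = grouped.rename(columns={"Summons Number": "Summons Count"})
    # Total summonses per violation code, aligned row by row with `grouped`.
    totals = grouped.groupby("Violation Code")["Summons Count"].transform("sum")
    # Share of each violation code's summonses that falls in this row.
    grouped["Pct of Code"] = 100.0 * grouped["Summons Count"] / totals
    return grouped
```

Each row of the result therefore says what percentage of all summonses for a given violation code came from that row's county.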

Enter Bodo: we can run the exact same function with minimal changes and drop that number down to 6 seconds using 8 cores.  Let's see how.

If you're running your code in a Jupyter notebook, you'll want to launch an IPyParallel cluster to start:

Note: when running on the platform, this step is unnecessary and already taken care of.

With a cluster available, we then run our cells with the %%px magic as below:

This is identical to our previous function, with the simple change that we've now included the @bodo.jit() decorator and we ensure the summons dictionary is replicated to all workers with bodo.allgatherv(sums_temp).  Bodo now compiles our code with a just-in-time compiler and runs on our available compute engines, without having to hassle with MPI code by hand.  On our particular hardware this code now runs in just under 4 seconds.  A good step forward, but there's still some room for improvement.

With the same imports we now run the same script again, but now lean on the benefits of using Parquet.

On the same hardware, all of this now took less than 1.1 seconds. This is possible in part because Parquet uses columnar storage. While the CSV files must be read in their entirety, Parquet allows Bodo to efficiently extract only the subset of columns needed.

Should you run the same code yet again, but read in the Parquet files written with the smaller row groups, that roughly 1 second drops further to about 0.5 seconds. This is not as exciting when you're just running it once on small files, but if you had a process that had to be run millions of times on lots of data, that's a hefty chunk of compute cost saved.

From 40 seconds down to 0.5 seconds with little hassle
For a single file you'll also be paying some time up front for compilation, but as the job grows that compile time is still only paid once and becomes negligible for large-scale work.

Boxing and Unboxing

One thing you may have noticed already: in the code examples above, when we call load_parking_tickets() and assign the result to a variable, we take an object created in compiled @bodo.jit() code and pass that object back out to the general Python namespace.

The issue is that Bodo and native Python do not always represent data structures in the same way—Bodo represents data using efficient native data structures. This means that passing data from the top-level Python namespace into a Bodo-jitted function involves 'unboxing' that object from its native Python representation. Similarly, returning an object back to the top-level Python namespace from a Bodo-jitted function requires 'boxing' that object back into Python's native object representation.

For a single function, like above, this is likely of little concern. But should you find yourself building a long series of functions that need to be called one after another, a final @bodo.jit() that wraps all of the functions may be advisable:

Given pseudo-code:

This simply means that rather than running

You would instead wrap these in another @bodo.jit() to keep the objects in the same representation as long as possible:
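A small sketch of the wrapping pattern, with made-up step functions (keep_positive, total_value, and pipeline are hypothetical names, as is the "value" column). The try/except fallback is not part of Bodo; it only swaps in a no-op decorator so the sketch can also be run where Bodo is not installed.

```python
import pandas as pd

try:
    import bodo
    jit = bodo.jit
except ImportError:
    # Fallback for illustration only: a no-op decorator, so the code below
    # runs as ordinary Python when Bodo is unavailable.
    def jit(func):
        return func


@jit
def keep_positive(df):
    # Step 1: keep only rows with a positive value.
    return df[df["value"] > 0]


@jit
def total_value(df):
    # Step 2: reduce the filtered frame to a single number.
    return df["value"].sum()


@jit
def pipeline(df):
    # Both steps run inside one jitted function, so the intermediate
    # dataframe does not have to be handed back to regular Python.
    return total_value(keep_positive(df))
```

Calling total_value(keep_positive(df)) from regular Python gives the same answer as pipeline(df), but the intermediate frame crosses the boundary between compiled code and Python on the way out of the first call and again on the way into the second.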

If you would like to read more about this topic, we take this notion and apply it to the load_parking_tickets() example, including the entire nyc_parking_tickets notebook, here.

For More

If you've made it this far and are still interested in knowing more, we suggest looking at the final three notebooks in this repo.  We cover a few final details about scaling these ideas for production and how to generalize a pipeline to work with many different data sources.
