Spotify Luigi for Building Data Engineering Pipelines

Sundara
Dec 4, 2020

I came across the Luigi package while reading about building industry-grade pipelines to automate the end-to-end flow of data.

I started my journey with Luigi one weekend, and by the end of it I had fallen in love with it.

I am not comparing Luigi with other tools such as Apache Airflow here; instead, I am documenting my journey with Luigi and what's classy about it.

I use Windows. In general, I have noticed that not many tools are compatible with Windows, and even when they do support it, the installation can take us for a ride. To my surprise, installing Luigi was astonishingly simple. You just install it with pip:

pip install luigi

I started with the examples provided in the documentation, HelloWorld and NameSubstituter. Each is a simple object-oriented Python program in which the class inherits from luigi.Task. These examples helped me understand how to pass a parameter, and also how to run the same job on the central scheduler. Luigi offers two schedulers: the local scheduler and the central scheduler.

I started with a simple use case:

  1. Read the data from a locally available comma-separated values (CSV) file
  2. Perform basic data cleanup
  3. Build a data transformation and data engineering task
  4. Create simple moving averages
  5. Run this at scale
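Step 4 is the one piece of real arithmetic in this list: a simple moving average over a window of n values is the mean of the most recent n values (for example, closing prices). As a minimal pure-Python sketch (the function name, window size and sample values are illustrative, and StockPrice.py may compute it differently, for example with pandas), it can be maintained with a running sum:

```python
from collections import deque
from typing import Iterable, List, Optional


def simple_moving_average(values: Iterable[float], window: int) -> List[Optional[float]]:
    """Return the SMA at each position; None until `window` values have been seen."""
    if window <= 0:
        raise ValueError("window must be positive")
    buffer: deque = deque()
    running_sum = 0.0
    result: List[Optional[float]] = []
    for value in values:
        buffer.append(value)
        running_sum += value
        if len(buffer) > window:
            # Drop the oldest value so the sum covers exactly `window` items
            running_sum -= buffer.popleft()
        result.append(running_sum / window if len(buffer) == window else None)
    return result


print(simple_moving_average([10, 11, 12, 13, 14], window=3))
# [None, None, 11.0, 12.0, 13.0]
```

Keeping a running sum makes each step constant-time instead of re-summing the whole window for every new value.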

I translated the above flow into a chain of Luigi tasks. Luigi resolves dependencies from the bottom up: we trigger the topmost task, but it is executed last, after every task it depends on has completed. The dependency tree is built so that the “Task Manager” depends on “Data Transformer” and “Signal Generator”. These two tasks in turn depend on the “Data Cleaner” task, which depends on “Source File Reader”. The Source File Reader fetches the data from a local file, in this case a CSV.

Dependencies are declared in the “requires” method. The “output” method returns the target that the results are written to, and Luigi checks that target to decide whether the task has already completed. The processing itself happens in the “run” method, which can call other functions (class methods or module-level functions).

This structure gives a cleaner implementation and makes the whole pipeline modular, which in turn enables code reuse.

The inputs to each task are declared inside the class as Luigi parameters. A downstream task can pass parameter values on to the tasks it requires, and the output of each upstream task is made available to the next task as its input.

The pipeline can be triggered from a main function through Luigi’s build function. This function takes a list of task instances (not task names), along with options such as the number of workers and whether to use the local or the central scheduler.

The code is available in the GitHub repository https://github.com/snaray12/luigi-demo/ in the file StockPrice.py.
