In the previous post I created a data engineering pipeline with Luigi. However, the pipeline had to be executed manually, and, as we know, Luigi will not run a task again once it has completed.

In this post I describe how I overcame both issues, i.e. running the pipeline on a schedule, and running it any number of times without Luigi skipping it.

Scheduling the pipeline execution

I used the Python module schedule to run the Luigi pipeline automatically. The schedule module is intuitive and powerful: to run a job every 10 seconds, a single line is enough:

schedule.every(10).seconds.do(job)

For more details on the schedule module, please refer to its documentation.
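To make the mechanics a little more concrete, here is a minimal, stdlib-only sketch of what registering a job with every(10).seconds.do(job) and then polling run_pending() roughly amounts to. The names IntervalJob, TinyScheduler and every_seconds are hypothetical and exist only for this illustration; this is a simplified model, not the schedule library's actual implementation. The clock is injectable so the behaviour can be checked without waiting in real time.

```python
import time
from typing import Callable, List


class IntervalJob:
    """Hypothetical, simplified stand-in for a job registered with
    schedule.every(n).seconds.do(func); not the library's own class."""

    def __init__(self, interval: float, func: Callable[[], None], now: float) -> None:
        self.interval = interval
        self.func = func
        self.next_run = now + interval  # first run one interval after registration

    def should_run(self, now: float) -> bool:
        return now >= self.next_run

    def run(self, now: float) -> None:
        self.func()
        self.next_run = now + self.interval  # reschedule relative to this run


class TinyScheduler:
    """Hypothetical model of the run_pending() polling loop."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self.clock = clock
        self.jobs: List[IntervalJob] = []

    def every_seconds(self, interval: float, func: Callable[[], None]) -> IntervalJob:
        job = IntervalJob(interval, func, self.clock())
        self.jobs.append(job)
        return job

    def run_pending(self) -> None:
        # Run every job whose due time has passed; others are left alone.
        now = self.clock()
        for job in self.jobs:
            if job.should_run(now):
                job.run(now)


# Usage with a fake clock that advances one second per loop iteration.
fake_now = [0.0]
scheduler = TinyScheduler(clock=lambda: fake_now[0])
scheduler.every_seconds(10, lambda: print("I'm working at", fake_now[0]))
for t in range(35):
    fake_now[0] = float(t)
    scheduler.run_pending()
```

The point of the model is that nothing runs in the background: the job only fires when run_pending() is called after its due time, which is why the pipeline script below keeps calling it inside a loop.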

The next item is to run the Luigi pipeline regardless of previous executions. I achieved this with a simple trick: I added another parameter to the tasks and passed the execution start time as its value. By default, Luigi considers a task complete when all of its outputs exist. Because the timestamp is part of the output file name, every run points at a file that does not exist yet, so Luigi treats each run as a new task with new data and executes it even if the previous run completed.
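The idea can be illustrated without Luigi itself. The sketch below is a simplified, hypothetical model of Luigi's default completeness check (a task counts as complete when its output exists); output_path and run_if_incomplete are illustrative names, not Luigi APIs. Reusing the same parameter value makes the second run a no-op, while a fresh timestamp yields a new output path, so the task runs again.

```python
import os
import tempfile
from datetime import datetime


def output_path(workdir: str, now: str) -> str:
    # Same naming scheme as the HelloWorld task: helloworld_<timestamp>.txt
    return os.path.join(workdir, "helloworld_" + now + ".txt")


def run_if_incomplete(workdir: str, now: str) -> bool:
    """Hypothetical model of a task run: skip if the output already exists.
    Returns True if the task actually ran."""
    path = output_path(workdir, now)
    if os.path.exists(path):
        return False  # output exists, so the task counts as complete
    with open(path, "w") as outfile:
        outfile.write("Hello World!\n")
    return True


with tempfile.TemporaryDirectory() as workdir:
    # Same parameter value twice: the second run is skipped.
    print(run_if_incomplete(workdir, "fixed"), run_if_incomplete(workdir, "fixed"))
    # A new timestamp per run: both runs execute.
    first = datetime(2021, 1, 1, 9, 0, 0).strftime("%d_%m_%Y_%H_%M_%S")
    second = datetime(2021, 1, 1, 9, 0, 10).strftime("%d_%m_%Y_%H_%M_%S")
    print(run_if_incomplete(workdir, first), run_if_incomplete(workdir, second))
```

This prints "True False" for the fixed value and "True True" for the two timestamps.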

The full code is listed below. It runs the HelloWorld and NameSubstituter tasks every 10 seconds. Each run generates two files, and because the execution timestamp is part of the file names, every run writes new files instead of finding the old ones.

import luigi
import time
from datetime import datetime
import schedule


class HelloWorld(luigi.Task):
    name = luigi.Parameter()
    now = luigi.Parameter()  # execution timestamp; makes every run a new task

    def requires(self):
        return None

    def output(self):
        # The timestamp in the file name means this target does not exist before a new run
        return luigi.LocalTarget('helloworld_' + self.now + '.txt')

    def run(self):
        time.sleep(1)
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n')
            outfile.write(self.name)
        time.sleep(1)


class NameSubstituter(luigi.Task):
    name = luigi.Parameter()
    now = luigi.Parameter()

    def requires(self):
        # Pass the same timestamp down so both tasks belong to the same run
        return HelloWorld("My Name", self.now)

    def output(self):
        return luigi.LocalTarget(self.input().path + '.name_' + self.name + self.now)

    def run(self):
        time.sleep(1)
        with self.input().open() as infile, self.output().open('w') as outfile:
            text = infile.read()
            text = text.replace('World', self.name)
            outfile.write(text)
        time.sleep(1)


def job():
    print("I'm working...")
    now = datetime.now()
    dt_string = now.strftime('%d_%m_%Y_%H_%M_%S')  # e.g. 05_03_2021_14_07_09
    luigi.build([NameSubstituter("Simba", dt_string)], workers=1, local_scheduler=True)


schedule.every(10).seconds.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
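As a worked example of the naming, the helper below rebuilds the two file names that one run writes, using the same strftime format and string concatenation as job(), HelloWorld.output() and NameSubstituter.output(). The function name output_names is hypothetical and not part of the pipeline, and the sketch assumes the LocalTarget path stays relative, exactly as it was passed in.

```python
from datetime import datetime


def output_names(name: str, when: datetime) -> tuple:
    """Rebuild the two file names one pipeline run writes (illustrative helper)."""
    dt_string = when.strftime("%d_%m_%Y_%H_%M_%S")  # same format as job()
    hello = "helloworld_" + dt_string + ".txt"  # HelloWorld.output()
    substituted = hello + ".name_" + name + dt_string  # NameSubstituter.output()
    return hello, substituted


print(output_names("Simba", datetime(2021, 3, 5, 14, 7, 9)))
```

For a run started at 14:07:09 on 5 March 2021 this prints ('helloworld_05_03_2021_14_07_09.txt', 'helloworld_05_03_2021_14_07_09.txt.name_Simba05_03_2021_14_07_09'). A run ten seconds later produces different names, which is exactly why Luigi never finds the previous run's outputs.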