Using Airflow to replace cron on your home network

Making a backup with Git on my developer server from my Airflow server, using the SSHOperator. Look ma, no cron!

I love cron. I mean it: it is powerful enough to automate workloads, and before Ansible, Nomad jobs and K8s jobs it was good old cron showing up to the party…

But it has its issues, like the lack of pipelines and the fact that it is not distributed. So for the sake of argument, let me show you a snippet of a few cron jobs I have:

# Security scan every Friday at Midnight
0 0 * * 5 $HOME/bin/urlscan_batch.sh > $HOME/logs/urlscan_batch.log 2>&1
0 0 * * 5 $HOME/bin/wpscan_batch.sh > $HOME/logs/wpscan_batch.log 2>&1
0 0 * * 5 $HOME/bin/nikto_batch.sh > $HOME/logs/nikto_batch.log 2>&1

Above is an example of some scripts I run on one of my machines using cron. In order to monitor the jobs I get an email or an alert in Telegram (using a homemade bot). But if something fails completely I need to log in and check the logs. Not the end of the world, but I am slowly adding more and more workloads.

Also, this is only one machine. I have a few small computers sitting around; wouldn't it be nice to have a central location to set them up and monitor their status?

Ah, now I’m dreaming big. What about being able to run on restricted hardware?

# Yeah, late 2006 and still kicking strong with Linux :-)
description: Low Profile Desktop Computer
product: Macmini2,1 (System SKUNumber)
vendor: Apple Inc.
version: 1.0
width: 64 bits
capabilities: smbios-2.4 dmi-2.4 smp vsyscall32
CPU: Intel(R) Core(TM)2 CPU T5600 @ 1.83GHz
*-memory
description: System Memory
physical id: 11f
slot: System board or motherboard
size: 2GiB
Linux macmini2 5.6.13-100.fc30.x86_64 #1 SMP Fri May 15 00:36:06 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

Yeah, Linux on Mac hardware rocks.

There is a tool called ‘Apache Airflow’ (with a shiny 2.0.1 version at the time of this writing) that passes all these checks.

Let me show you how I migrated a few of my cron job workflows.

NOTE: This installation will run under my username, josevnz. I do not need root or a system account to perform any of these tasks.

Installing Airflow, bare metal

You can see the duration of all your jobs once they start to run

PostgreSQL

Follow these Fedora-specific steps to get some dependencies installed. In particular, I want to be able to run more than one task at a time, and the executor that works with SQLite cannot do that:

sudo dnf install -y postgresql.x86_64
sudo dnf install -y postgresql-server.x86_64
sudo postgresql-setup --initdb --unit postgresql

Time to enable remote network access to the database:

# sudo vi /var/lib/pgsql/data/pg_hba.conf
# Access from home network
host airflow_db airflow_user 192.168.1.0/24 md5

And…

# sudo vi /var/lib/pgsql/data/postgresql.conf
listen_addresses = '*' # what IP address(es) to listen on;

Then start PostgreSQL (OK, fine, we need root for this step :-))

sudo systemctl enable postgresql --now

And finally, create the Airflow database user and the database itself:

[josevnz@macmini2 airflow]$ sudo -u postgres -i
[postgres@macmini2 ~]$ psql
psql (11.7)
Type "help" for help.
postgres=# CREATE USER airflow_user WITH PASSWORD 'XXXX';
postgres=# CREATE DATABASE airflow_db;

Test it from a different host:

psql --host macmini2 --username airflow_user --password airflow_db
Password:
psql (12.6, server 11.7)
Type "help" for help.
airflow_db=> quit
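
If you prefer Python over the psql client, a quick psycopg2 check works too. This is a minimal sketch: the host, database and user come from the setup above, and the password is whatever you chose for airflow_user.

#!/usr/bin/env python3
"""Quick connectivity check against the new airflow_db using psycopg2."""
import psycopg2

# Credentials match the CREATE USER / CREATE DATABASE steps above
connection = psycopg2.connect(
    host="macmini2",
    dbname="airflow_db",
    user="airflow_user",
    password="XXXX",
)
with connection, connection.cursor() as cursor:
    cursor.execute("SELECT version()")
    print(cursor.fetchone()[0])
connection.close()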

Easy, huh? But I’m not here to run a database just for the sake of it, so let’s keep moving…

Airflow using virtual environment and PIP

No root? No problem!

Now that the PostgreSQL database is set up, I can focus on getting Airflow up and running using PIP:

python3 -m venv ~/virtualenv/airflow
. ~/virtualenv/airflow/bin/activate
/bin/mkdir -p /home/josevnz/airflow
(airflow) [josevnz@macmini2 airflow]$ cat .airflow
export AIRFLOW_HOME=/home/josevnz/airflow
export AIRFLOW_VERSION=2.0.1
export PYTHON_VERSION="$(/usr/bin/python3 --version | /bin/cut -d " " -f 2| /usr/bin/cut -d "." -f 1-2)"
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
export AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow_user:XXXX@macmini/airflow_db"
. .airflow
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
pip install psycopg2-binary
airflow db init
...
[2021-04-09 22:44:48,157] {dag.py:2273} INFO - Setting next_dagrun for example_subdag_operator.section-2 to None
Initialization done

The database was initialized; now create the admin user:

(airflow) [josevnz@macmini2 airflow]$ airflow users create --username admin --firstname Jose --lastname Nunez --role Admin --email YYYY 
Password:
Repeat for confirmation:
Admin user admin created

I want to make sure I can send emails from my Airflow server, so I test that first using Python + SMTP (you can use one of my scripts for that; make sure you check your ISP provider instructions to figure out your mail exchanger, allowed outgoing ports, etc.):

[josevnz@macmini2 ~]$ ~/bin/test_smtp.py --relay smtpout.secureserver.net --fromemail XXX@domain.com --to YYY@anotherdomain.com This is a test
Type your password and press enter:
send: 'mail FROM:<XXX@domain.com> size=199\r\n'
reply: b'250 <XXX@domain.com> sender ok\r\n'
reply: retcode (250); Msg: b'<XXX@domain.com> sender ok'
send: 'rcpt TO:<YYY@anotherdomain.com>\r\n'
reply: b'250 <YYY@anotherdomain.com> recipient ok\r\n'
reply: retcode (250); Msg: b'<YYY@anotherdomain.com> recipient ok'
...
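
The test_smtp.py script above is one of my own; if you want to roll your own quick check, a minimal smtplib sketch along the same lines could look like this (the relay and addresses are placeholders, adjust them to your provider):

#!/usr/bin/env python3
"""Minimal SMTP relay test, in the spirit of the test_smtp.py script mentioned above."""
import getpass
import smtplib
from email.message import EmailMessage

RELAY = "smtpout.secureserver.net"   # your ISP/hosting mail exchanger
SENDER = "XXX@domain.com"
RECIPIENT = "YYY@anotherdomain.com"

message = EmailMessage()
message["From"] = SENDER
message["To"] = RECIPIENT
message["Subject"] = "Airflow SMTP test"
message.set_content("This is a test")

password = getpass.getpass("Type your password and press enter: ")
with smtplib.SMTP_SSL(RELAY, 465, timeout=30) as server:
    server.set_debuglevel(1)  # prints the 'send:' / 'reply:' lines shown above
    server.login(SENDER, password)
    server.send_message(message)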

With outgoing email confirmed, let’s make sure these settings are part of the $AIRFLOW_HOME/airflow.cfg file. I will also change the executor from ‘SequentialExecutor’ to ‘LocalExecutor’ to get proper support for parallel tasks (possible now that we are using PostgreSQL as the back-end instead of SQLite):

[core]
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow_user:XXXX@192.168.1.16/airflow_db
...
[smtp]
smtp_host = smtpout.XXX.net
smtp_starttls = False
smtp_ssl = True
smtp_user = user@domain.com
smtp_password = ZZZZ
smtp_port = 465
smtp_mail_from = user@domain.com
smtp_timeout = 30
smtp_retry_limit = 5
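
A quick way to double-check that Airflow picks up the new values is to read them back with Airflow's own configuration module. This is a small sketch, run inside the virtual environment with AIRFLOW_HOME set:

#!/usr/bin/env python3
"""Read back a few airflow.cfg values to confirm the edits took effect."""
from airflow.configuration import conf

print(conf.get("core", "executor"))        # expect LocalExecutor
print(conf.get("smtp", "smtp_host"))       # expect your SMTP relay
print(conf.get("smtp", "smtp_mail_from"))  # expect your from address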

Run the web server and the job scheduler as daemons:

(airflow) [josevnz@macmini2 airflow]$ airflow webserver --daemon --port 8080
(airflow) [josevnz@macmini2 airflow]$ airflow scheduler --daemon
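
Once both daemons are up, you can poke the webserver's /health endpoint to confirm that the scheduler and the metadata database report as healthy. A small sketch using only the standard library:

#!/usr/bin/env python3
"""Poll the Airflow webserver health endpoint after starting the daemons."""
import json
import urllib.request

with urllib.request.urlopen("http://localhost:8080/health", timeout=10) as response:
    health = json.load(response)
print(health)  # expect 'healthy' for both the metadatabase and the scheduler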

This is great. But could the installation be easier if I used a Docker image? After all, that’s the whole point of containers for a development environment, right?

What about Docker? (Why I did not use Docker compose)

The instructions are much easier to follow using the official ‘Airflow Docker’ image and ‘Docker compose’. They also have the advantage that if something goes wrong you can remove the images and start from scratch.

So I downloaded the image and tested the plumbing:

docker run --interactive --tty apache/airflow:2.0.1 airflow config get-value core sql_alchemy_conn
sqlite:////opt/airflow/airflow.db
# Set the URL to something like postgresql+psycopg2://<user>:<password>@<host>/<db>
export AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow_user:XXXX@airflow_psql/airflow_db
/usr/bin/docker run --link airflow_psql --interactive --tty -e AIRFLOW__CORE__SQL_ALCHEMY_CONN=$AIRFLOW__CORE__SQL_ALCHEMY_CONN apache/airflow:2.0.1 airflow config get-value core sql_alchemy_conn
BACKEND=postgresql+psycopg2
DB_HOST=airflow_psql
DB_PORT=5432

Then I initialized the database:

[josevnz@macmini2 airflow]$ /usr/bin/docker run --link airflow_psql --interactive --tty -e AIRFLOW__CORE__SQL_ALCHEMY_CONN=$AIRFLOW__CORE__SQL_ALCHEMY_CONN apache/airflow:2.0.1 airflow db init
BACKEND=postgresql+psycopg2
DB_HOST=airflow_psql
DB_PORT=5432
DB: postgresql+psycopg2://airflow_user:***@airflow_psql/airflow_db
[2021-04-10 00:18:03,149] {db.py:674} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.

Good so far. Now it’s time to create the admin user:

export AIRFLOW_UID=$(/bin/id --user)
export AIRFLOW_GID=$(/bin/id --group)
/usr/bin/docker run --user $AIRFLOW_UID:$AIRFLOW_GID --link airflow_psql --interactive --tty -e AIRFLOW__CORE__SQL_ALCHEMY_CONN=$AIRFLOW__CORE__SQL_ALCHEMY_CONN --volume /home/josevnz/airflow/dags:/opt/airflow/dags:rw --volume /home/josevnz/airflow/logs:/opt/airflow/logs --volume /home/josevnz/airflow/plugins:/opt/airflow/plugins:rw apache/airflow:2.0.1 airflow users create --username admin --firstname Jose --lastname Nunez --role Admin --email XXX@domain.com

But then I hit a bug related to some internal libraries used by Airflow running on old hardware. Remember, at the beginning of the article I said I wanted to use an old machine to run Airflow. It turns out my MacMini 2 is very Very VERY old hardware.

[josevnz@macmini2 airflow]$ /usr/bin/docker run --name airflow_scheduler --user $AIRFLOW_UID:$AIRFLOW_GID --link airflow_psql --detach -e AIRFLOW__CORE__SQL_ALCHEMY_CONN=$AIRFLOW__CORE__SQL_ALCHEMY_CONN --volume /home/josevnz/airflow/dags:/opt/airflow/dags:rw --volume /home/josevnz/airflow/logs:/opt/airflow/logs --volume /home/josevnz/airflow/plugins:/opt/airflow/plugins:rw --publish 8080:8080 apache/airflow:2.0.1 airflow scheduler
fda84fa26a8b68d30e8db759a767555a7526e86105a123e467bcc9d03ba90a59
[josevnz@macmini2 airflow]$ docker logs airflow_scheduler
BACKEND=postgresql+psycopg2
DB_HOST=airflow_psql
DB_PORT=5432
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 5, in <module>
from airflow.__main__ import main
ModuleNotFoundError: No module named 'airflow'
[josevnz@macmini2 airflow]$ docker rm airflow_scheduler
airflow_scheduler

I re-tested this on more recent and powerful hardware and it worked as advertised. So there is no point for me to go the Docker way, especially since the bare-metal option worked well. I could have tried an older version of the Docker image, but that would limit my ability to perform upgrades.

So bye bye Docker on my old MacMini2… (for this at least)

Migrating the workflow from Cron to Python

If you like Python you will feel at home. You don’t need to give up your cron syntax either, and you can tag your jobs so you can filter them in the GUI.

I will not cover the basics of Airflow DAG execution or task writing here, but I will show you how the equivalent cron jobs look in Python (I saved the DAG as $AIRFLOW_HOME/dags/blog_security.py). I also created another virtual environment on a separate machine, where I can run PyCharm to edit my code.

NOTE 1: Please note the extra space after the script name in ‘bash_command’. It has to do with how Airflow handles script template parsing with Jinja.

NOTE 2: I installed the Kubernetes module dependency to silence an annoying warning in my development Airflow environment:

# Not really required, but I also want to study the example K8s tasks provided with Airflow.
pip install apache-airflow['cncf.kubernetes']

Here is the code. It is very self-explanatory if you have used Python before; most of the environmental constraints are defined in the DAG, and then you get busy writing tasks and defining the relationships between them:

"""
# Tasks to run several security scans against my websites
## Replacing the following cron jobs
# Security scan every Friday at Midnight
0 0 * * 5 $HOME/bin/urlscan_batch.sh > $HOME/logs/urlscan_batch.log 2>&1
0 0 * * 5 $HOME/bin/wpscan_batch.sh > $HOME/logs/wpscan_batch.log 2>&1
0 0 * * 5 $HOME/bin/nikto_batch.sh > $HOME/logs/nikto_batch.log 2>&1
"""
import os
from pathlib import Path
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

BIN = os.path.join(str(Path.home()), 'bin')

default_args = {
    'owner': 'josevnz',
    'depends_on_past': False,
    'email': ['kodegeek.com@protonmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'queue': 'security_queue'
}

security_dag = DAG(
    'website_security',
    default_args=default_args,
    description='Security scans for Jose websites',
    schedule_interval='0 0 * * 5',
    start_date=days_ago(2),
    tags=['security', 'websites'],
)
security_dag.doc_md = __doc__

urlscan = BashOperator(
    task_id='urlscan',
    depends_on_past=False,
    bash_command=os.path.join(BIN, 'urlscan_batch.sh '),
    dag=security_dag,
    sla=timedelta(hours=1.5)
)
urlscan.doc_md = """\
#### Check websites using URLSCAN
[urlscan.io is a free service to scan and analyse websites.](https://urlscan.io/)
"""

wpscan = BashOperator(
    task_id='wpscan',
    depends_on_past=False,
    bash_command=os.path.join(BIN, 'wpscan_batch.sh '),
    dag=security_dag,
    sla=timedelta(hours=11)
)
wpscan.doc_md = """\
#### Check websites using WPScan
* [WPScan](https://wpscan.com/)
* This scan is VERY slow, takes around 10 hours to complete
"""

nikto = BashOperator(
    task_id='nikto',
    depends_on_past=False,
    bash_command=os.path.join(BIN, 'nikto_batch.sh '),
    dag=security_dag,
    sla=timedelta(minutes=30)
)
nikto.doc_md = """\
#### Check websites using NIKTO
[Nikto is an Open Source (GPL) web server scanner](https://www.cirt.net/Nikto2)
"""
# The 3 tasks below will run in parallel at the designated time
urlscan
wpscan
nikto
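
A small aside: the three tasks above have no dependencies between them, which is exactly what makes them run in parallel. If I ever wanted them to run one after another instead, Airflow's bitshift syntax would do it (just a sketch, not what I deployed):

# Sequential alternative (not used here): run the scans one after another
urlscan >> wpscan >> nikto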

So now let’s do a few tests on the new code:

# Initialize the database -- a no-op if you did it before
airflow db init
...
# Make sure the website_security dag is in there
airflow dags list|grep website
website_security | blog_security.py | josevnz | True
# What tasks are in the website_security DAG?
josevnz > airflow tasks list website_security --tree
[2021-04-10 20:47:21,852] {dagbag.py:448} INFO - Filling up the DagBag from /home/josevnz/Documents/airflow/dags
[2021-04-10 20:47:21,875]
<Task(BashOperator): urlscan>
<Task(BashOperator): wpscan>
<Task(BashOperator): nikto>

So the DAG looks decent. Let’s run one of the tasks using the Airflow CLI:

airflow tasks test website_security urlscan 2021-04-11
# Output showing the task running ...
...
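
You can also sanity-check the DAG from plain Python, which is handy on the development machine where I run PyCharm. A minimal sketch, assuming the file sits in the local dags folder:

#!/usr/bin/env python3
"""Quick sanity check: does the DAG file import cleanly and contain the expected tasks?"""
from airflow.models import DagBag

dag_bag = DagBag(dag_folder="/home/josevnz/airflow/dags", include_examples=False)
assert not dag_bag.import_errors, dag_bag.import_errors
dag = dag_bag.get_dag("website_security")
print(sorted(task.task_id for task in dag.tasks))  # ['nikto', 'urlscan', 'wpscan']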

I really like this! It is infrastructure as code: you can save it in Git, make improvements and re-deploy. That is possible with cron and Ansible too, but the Airflow framework gives you much more than the ‘BashOperator’ (please check the documentation to see the full list of operators).

Also, because it is written in Python, you can create your own operators.
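
To give you an idea, here is a bare-bones custom operator. This is a hypothetical example, not something from my setup; a real one would of course do something useful in execute():

from airflow.models.baseoperator import BaseOperator


class TelegramAlertOperator(BaseOperator):
    """Toy operator that could push an alert to my homemade Telegram bot."""

    def __init__(self, message: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.message = message

    def execute(self, context):
        # A real implementation would call the Telegram Bot API here
        self.log.info("Sending alert: %s", self.message)
        return self.message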

Extra: How to start Airflow automatically using a systemd unit.

I do not want to use cron to start Airflow if the machine gets rebooted (with the @reboot tag) and I definitely do not want to write a custom script to do this. So let’s do it the right way, using systemd units.

Airflow already ships a bunch of systemd units, but I will need to tweak them as they are not exactly what I need for a non-privileged user. Also keep in mind that I will be running Airflow under my user ‘josevnz’, so there is no need for a system-wide systemd unit installation.

But before that, a small tweak to ensure systemd works well with my personal user.

I want to make sure my resources do not get cleaned up when the last session exits, and that I can start services at boot without being logged in.

So I enable session lingering for josevnz:

loginctl enable-linger josevnz

That’s it, painless. Then we need to create a common environment file so the Python virtual environment gets set up properly:

[josevnz@macmini2 ~]$ cat /home/josevnz/airflow/.airflow
AIRFLOW_HOME="/home/josevnz/airflow"
AIRFLOW_VERSION="2.0.1"
PYTHON_VERSION="3.7"
# Systemd environment files do not expand variables so make sure you do
PATH="/home/josevnz/virtualenv/airflow/bin:/home/josevnz/.local/bin:/home/josevnz/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin"
VIRTUAL_ENV="/home/josevnz/virtualenv/airflow"

Then create the airflow-webserver.service systemd unit (systemctl --user edit --full --force airflow-webserver.service):

#
# Systemd unit to allow running Airflow webserver as user josevnz
# Intended to be used on a small home network
# Please see the following (original file)
# https://github.com/apache/airflow/blob/master/scripts/systemd/airflow-webserver.service
# Make sure you run: loginctl enable-linger josevnz
# Created with: systemctl --user edit --full --force airflow-webserver.service
#
[Unit]
Description=Airflow webserver daemon for josevnz
After=network.target postgresql.service
Wants=postgresql.service
[Service]
EnvironmentFile=/home/josevnz/airflow/.airflow
Type=simple
WorkingDirectory=/home/josevnz/airflow
ExecStart=/bin/bash -c '. /home/josevnz/virtualenv/airflow/bin/activate && airflow webserver --port 8080 --pid /home/josevnz/airflow/webserver.pid'
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
# default.target is the user-manager equivalent of multi-user.target
WantedBy=default.target

And do the same for airflow-scheduler.service (systemctl --user edit --full --force airflow-scheduler.service):

#
# Systemd unit to allow running Airflow scheduler as user josevnz
# Intended to be used on a small home network
# Please see the following (original file)
# https://github.com/apache/airflow/blob/master/scripts/systemd/airflow-scheduler.service
# Make sure you run: loginctl enable-linger josevnz
# Created with: systemctl --user edit --full --force airflow-scheduler.service
#
[Unit]
Description=Airflow scheduler daemon for josevnz
After=network.target postgresql.service
Wants=postgresql.service
[Service]
EnvironmentFile=/home/josevnz/airflow/.airflow
Type=simple
ExecStart=/bin/bash -c '. /home/josevnz/virtualenv/airflow/bin/activate && airflow scheduler --pid /home/josevnz/airflow/scheduler.pid'
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
# default.target is the user-manager equivalent of multi-user.target
WantedBy=default.target

Note that I’m not using the Celery or Kubernetes executor, so I did not bother creating a worker systemd unit. I will see later if my simple home setup needs that.

Time to bring the services up:

[josevnz@macmini2 ~]$ systemctl --user enable airflow-webserver.service --now
[josevnz@macmini2 ~]$ systemctl --user enable airflow-scheduler.service --now

You can check what’s going on with the command below (also, please fire up your browser and go to http://$yourserver:8080; you remember the user and password you set up in a previous step, right? :-)):

journalctl --user --follow

Epilogue

You still have your logs. For all your runs. As detailed as before

The initial setup of Airflow to manage your home jobs is definitely more complex than setting up simple cron jobs, but after the initial boilerplate is done you get an extensible and easy-to-use tool to schedule workflows across your machines. Also, if you write Python scripts you will feel right at home.

It is very good news that you can use Airflow on low-resource machines, as opposed to hungrier alternatives.

So what is next? Well, I’m planning to monitor my Airflow instance with Grafana and statsd, among other things. I also implemented a few tasks using the SSHOperator (it uses Python Paramiko under the hood), but that will be the topic of another article.
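
As a teaser, a task using it could look roughly like this. This is only a sketch: it assumes the apache-airflow-providers-ssh package is installed and that an Airflow connection named 'ssh_dev_server' exists, both of which are hypothetical placeholders here.

from airflow.providers.ssh.operators.ssh import SSHOperator

git_backup = SSHOperator(
    task_id='git_backup',
    ssh_conn_id='ssh_dev_server',  # hypothetical connection pointing at my developer server
    command='cd $HOME/repos && git bundle create $HOME/backups/repos.bundle --all',
    dag=security_dag,              # in practice this would live in its own backup DAG
)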

Please clap if you like this article! I also want to know what you think, so please leave me a message in the comments section.
