BigQuery Dynamic SQL using Jinja Template

Dynamic SQL using Jinja templating language for Python

Soumendra Mishra
Google Cloud - Community


Writing SQL by hand is cumbersome and time-consuming, and automation can remove much of that effort. A Jinja template exposes variables and parameters from which SQL queries are generated dynamically, so the same template can be reused for many queries. Organizations adopt this technique to improve the accuracy of their SQL queries and to reduce their dependency on IT operations.

You can achieve this automation in three steps:

  1. Create a configuration file
  2. Create a Jinja template and macros
  3. Write the Python source code

Configuration File

The configuration file (config.yaml) contains the input parameters required to generate the dynamic SQL, and it is the only file that needs to be customized. The following example defines a single BigQuery view; to create several views in one go, add more entries to the list in the configuration file.

- viewName: mydataset.vw_job_profile
  baseTables:
    - mydataset.emp_profile
    - mydataset.emp_department
    - mydataset.emp_payroll
  joinKey: emp_id
  filterConditions:
    - emp_profile.management_level > 87
    - emp_department.dept_name = 'IT'
    - emp_payroll.year = 2020
  excludeColumns:
    - insert_timestamp
    - update_timestamp
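Because every other file depends on this configuration, it can help to check an entry before rendering any SQL. The following is a minimal sketch, not part of the original solution: validateConfig and REQUIRED_KEYS are hypothetical names, and the dictionary is written out by hand to mirror what yaml.safe_load would return for the file above. The dataset.table check reflects the way the source code later splits each table name on the dot.

```python
# Shape that yaml.safe_load would return for the config.yaml shown above.
configList = [
    {
        "viewName": "mydataset.vw_job_profile",
        "baseTables": [
            "mydataset.emp_profile",
            "mydataset.emp_department",
            "mydataset.emp_payroll",
        ],
        "joinKey": "emp_id",
        "filterConditions": [
            "emp_profile.management_level > 87",
            "emp_department.dept_name = 'IT'",
            "emp_payroll.year = 2020",
        ],
        "excludeColumns": ["insert_timestamp", "update_timestamp"],
    }
]

# Keys that main.py reads from every entry.
REQUIRED_KEYS = ("viewName", "baseTables", "joinKey",
                 "filterConditions", "excludeColumns")


def validateConfig(entry):
    """Hypothetical helper: return a list of problems found in one entry."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in entry]
    for tableId in entry.get("baseTables", []):
        # The source code expects exactly one dot: dataset.table
        if tableId.count(".") != 1:
            problems.append(f"expected dataset.table, got: {tableId}")
    return problems


for entry in configList:
    print(entry["viewName"], validateConfig(entry))
```

An empty list means the entry has every key the script reads and that each base table is written as dataset.table.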

Jinja Template and Macros

The Jinja template (sql_template.sql) is used to create a BigQuery view dynamically. The template contains variables/expressions, which get replaced with values when a template is rendered.

{% from 'macro.sql' import macro_join_condition %}
{% from 'macro.sql' import macro_filter_condition %}
CREATE OR REPLACE VIEW {{ params.viewName }}
AS
SELECT
{{ params.viewColumnList|join(',') }}
FROM
{{ params.baseTables|join(',') }}
WHERE
{{ macro_filter_condition(params.filterConditions) }}
{% if params.tableCount > 1 %}
{{ macro_join_condition(params.tableL, params.tableR, params.joinKey)}}
{% endif %}

Jinja macros are used in this solution to build reusable SQL fragments. The {% macro %} tag defines a snippet that can be imported into other templates; the two macros below live in macro.sql, the file that the template imports from.

{% macro macro_join_condition(l_tbl, r_tbls, column) %}
{% for r_tbl in r_tbls %}
AND {{ l_tbl }}.{{ column }} = {{ r_tbl }}.{{ column }}
{% endfor %}
{% endmacro %}
{% macro macro_filter_condition(filters) %}
{% for tbl_filter in filters %}
{% if loop.first %}
{{ tbl_filter }}
{% else %}
AND {{ tbl_filter }}
{% endif %}
{% endfor %}
{% endmacro %}
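To see what the template and macros produce together, the following minimal sketch renders them from in-memory strings with jinja2's DictLoader instead of files on disk. The two column names in viewColumnList are assumptions made up for illustration; the other parameters come from the sample configuration. Whitespace is collapsed afterwards only to make the result easier to read.

```python
from jinja2 import Environment, DictLoader

# Macro file content, copied from macro.sql above.
MACROS = """
{% macro macro_join_condition(l_tbl, r_tbls, column) %}
{% for r_tbl in r_tbls %}
AND {{ l_tbl }}.{{ column }} = {{ r_tbl }}.{{ column }}
{% endfor %}
{% endmacro %}
{% macro macro_filter_condition(filters) %}
{% for tbl_filter in filters %}
{% if loop.first %}
{{ tbl_filter }}
{% else %}
AND {{ tbl_filter }}
{% endif %}
{% endfor %}
{% endmacro %}
"""

# View template, copied from sql_template.sql above.
TEMPLATE = """
{% from 'macro.sql' import macro_join_condition %}
{% from 'macro.sql' import macro_filter_condition %}
CREATE OR REPLACE VIEW {{ params.viewName }}
AS
SELECT
{{ params.viewColumnList|join(',') }}
FROM
{{ params.baseTables|join(',') }}
WHERE
{{ macro_filter_condition(params.filterConditions) }}
{% if params.tableCount > 1 %}
{{ macro_join_condition(params.tableL, params.tableR, params.joinKey)}}
{% endif %}
"""

env = Environment(loader=DictLoader({
    "macro.sql": MACROS,
    "sql_template.sql": TEMPLATE,
}))

params = {
    "viewName": "mydataset.vw_job_profile",
    "baseTables": ["mydataset.emp_profile", "mydataset.emp_department",
                   "mydataset.emp_payroll"],
    # Assumed column names, for illustration only.
    "viewColumnList": ["emp_profile.emp_id", "emp_department.dept_name"],
    "filterConditions": ["emp_profile.management_level > 87",
                         "emp_department.dept_name = 'IT'",
                         "emp_payroll.year = 2020"],
    "tableCount": 3,
    "tableL": "emp_profile",
    "tableR": ["emp_department", "emp_payroll"],
    "joinKey": "emp_id",
}

rendered = env.get_template("sql_template.sql").render(params=params)
# Collapse the blank lines left behind by the block tags.
sql = " ".join(rendered.split())
print(sql)
```

The filter conditions come first in the WHERE clause, and the join macro then appends one AND condition for each table on the right-hand side of the join.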

Source Code

The source code for this solution performs several tasks: it reads the configuration file, fetches table metadata from BigQuery, and renders the Jinja SQL template with the resulting parameters.

Source Code (main.py)

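import yaml
from jinja2 import Environment, FileSystemLoader
from google.cloud import bigquery

# getTableColumns, getViewColumns, getTableJoin and runSql are the
# user-defined functions listed in the next section; they must be
# defined or imported before the loop below runs.

# Load the list of view definitions
with open("config.yaml") as configFile:
    configList = yaml.safe_load(configFile)

# Load the SQL template from the current directory
env = Environment(loader=FileSystemLoader("./"))
sqlTemplate = env.get_template('sql_template.sql')

for tableConf in configList:
    viewName = tableConf['viewName']
    baseTables = tableConf['baseTables']
    joinKey = tableConf['joinKey']
    filterConditions = tableConf['filterConditions']
    excludeColumns = tableConf['excludeColumns']

    # Build the column list of the view from the BigQuery table schemas
    tableColumnList = getTableColumns(baseTables)
    viewColumnList = getViewColumns(tableColumnList, excludeColumns)

    params = {
        'viewName': viewName,
        'baseTables': baseTables,
        'viewColumnList': viewColumnList,
        'filterConditions': filterConditions,
        'tableCount': len(baseTables)
    }

    # Join parameters are only needed when there is more than one table
    if len(baseTables) > 1:
        tableL, tableR = getTableJoin(baseTables)
        params['tableL'] = tableL
        params['tableR'] = tableR
        params['joinKey'] = joinKey

    # Render the SQL for this view and run it in BigQuery
    sqlQuery = sqlTemplate.render(params=params)
    runSql(sqlQuery)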

Source Code (User-defined modules)

def getTableColumns(baseTables):
    # Collect qualified column names (table.column) from every base table,
    # keeping only the first occurrence of each column name.
    client = bigquery.Client()
    columnList = []
    seenColumns = set()
    for tableId in baseTables:
        tableName = tableId.split(".")[1]
        # get_table accepts a "dataset.table" string and resolves it
        # against the client's default project
        table = client.get_table(tableId)
        for column in table.schema:
            if column.name not in seenColumns:
                seenColumns.add(column.name)
                columnList.append(tableName + "." + column.name)
    return columnList

def getViewColumns(columnList, excludeColumns):
    # excludeColumns holds bare column names, so compare on the part
    # after the dot rather than on the qualified name
    excluded = set(excludeColumns)
    viewColumns = [c for c in columnList if c.split(".")[1] not in excluded]
    return sorted(viewColumns)

def getTableJoin(baseTables):
    # Return the first table name and the list of the remaining table names
    tableNames = [tableId.split(".")[1] for tableId in baseTables]
    return tableNames[0], tableNames[1:]

def runSql(sqlQuery):
    client = bigquery.Client()
    sqlJob = client.query(sqlQuery)
    sqlJob.result()  # wait for the query job to finish
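The intended column selection (keep the first occurrence of each column name across the base tables, then drop the excluded columns) can be tried without a BigQuery connection. The sketch below is an offline stand-in under stated assumptions: selectViewColumns is a hypothetical helper, and the schemas dictionary uses made-up column names in place of the metadata that BigQuery would return.

```python
def selectViewColumns(schemas, excludeColumns):
    """Hypothetical offline stand-in for the column helpers.

    schemas maps a table name to its column names, listed in table order.
    """
    excluded = set(excludeColumns)
    seen = set()
    selected = []
    for tableName, columns in schemas.items():
        for name in columns:
            # Skip excluded columns and names already taken by an earlier table
            if name in seen or name in excluded:
                continue
            seen.add(name)
            selected.append(f"{tableName}.{name}")
    return sorted(selected)


# Assumed schemas, for illustration only.
schemas = {
    "emp_profile": ["emp_id", "emp_name", "management_level", "insert_timestamp"],
    "emp_department": ["emp_id", "dept_name", "update_timestamp"],
    "emp_payroll": ["emp_id", "year", "salary"],
}

viewColumns = selectViewColumns(schemas, ["insert_timestamp", "update_timestamp"])
print(viewColumns)
```

With these sample schemas, emp_id is taken once from emp_profile, which avoids duplicate column names in the view, and neither timestamp column appears in the result.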

Source Code Repository

https://github.com/soumendra-mishra/jinja-sql-template.git

How to Use?

  1. Create the BigQuery dataset and the base tables required for the dynamic view(s)
  2. Download the source code from the GitHub repository
  3. Edit the configuration file (config.yaml) to set the BigQuery dataset, tables, join key, filter conditions, and excluded columns
  4. Run main.py
