BigQuery Dynamic SQL using Jinja Template
Dynamic SQL using Jinja templating language for Python
Writing SQL by hand is cumbersome and time-consuming, and much of that work can be automated. Jinja, a templating language for Python, lets you generate SQL queries dynamically from variables and parameters, which saves considerable time and effort. Organizations adopt this technique to reduce errors in hand-written SQL and to lessen their dependency on IT operations.
You can perform the following steps to achieve this automation.
- Step 1: Create a Configuration file
- Step 2: Create a Jinja Template and Macro
- Step 3: Python Source Code
Configuration File
The configuration file (config.yaml) contains the input parameters required to generate the dynamic SQL, and it is the only file that needs to be customized. The following example creates a single BigQuery view, but you can create multiple views in one go by adding more entries to the configuration file.
- viewName: mydataset.vw_job_profile
  baseTables:
    - mydataset.emp_profile
    - mydataset.emp_department
    - mydataset.emp_payroll
  joinKey: emp_id
  filterConditions:
    - emp_profile.management_level > 87
    - emp_department.dept_name = 'IT'
    - emp_payroll.year = 2020
  excludeColumns:
    - insert_timestamp
    - update_timestamp
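Because every view is driven by this one file, it can help to check each entry before any SQL is generated. The following is a minimal sketch, not part of the original solution: the function name validate_view_config and the specific checks are assumptions, and the dictionary mirrors what yaml.safe_load would return for the entry above.

```python
# Keys that every entry in config.yaml is expected to carry (taken from the example above).
REQUIRED_KEYS = {"viewName", "baseTables", "joinKey", "filterConditions", "excludeColumns"}


def validate_view_config(entry):
    """Return a list of human-readable problems found in one config entry (hypothetical helper)."""
    problems = []
    missing = REQUIRED_KEYS - entry.keys()
    if missing:
        problems.append("missing keys: " + ", ".join(sorted(missing)))
    if not entry.get("baseTables"):
        problems.append("baseTables must list at least one table")
    for table in entry.get("baseTables", []):
        # The helper functions split table names on ".", so they expect dataset.table.
        if table.count(".") != 1:
            problems.append(f"table '{table}' is not in dataset.table form")
    return problems


# Same structure that yaml.safe_load would produce for the example entry.
example = {
    "viewName": "mydataset.vw_job_profile",
    "baseTables": ["mydataset.emp_profile", "mydataset.emp_department", "mydataset.emp_payroll"],
    "joinKey": "emp_id",
    "filterConditions": [
        "emp_profile.management_level > 87",
        "emp_department.dept_name = 'IT'",
        "emp_payroll.year = 2020",
    ],
    "excludeColumns": ["insert_timestamp", "update_timestamp"],
}

print(validate_view_config(example))  # prints []
```

Running such a check over every entry before rendering lets a typo in the configuration surface as a clear message rather than as a failed BigQuery job.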
Jinja Template and Macros
The Jinja template (sql_template.sql) is used to create a BigQuery view dynamically. The template contains variables and expressions that are replaced with values when the template is rendered.
{% from 'macro.sql' import macro_join_condition %}
{% from 'macro.sql' import macro_filter_condition %}
CREATE OR REPLACE VIEW {{ params.viewName }}
AS
SELECT
    {{ params.viewColumnList|join(',') }}
FROM
    {{ params.baseTables|join(',') }}
WHERE
    {{ macro_filter_condition(params.filterConditions) }}
    {% if params.tableCount > 1 %}
    {{ macro_join_condition(params.tableL, params.tableR, params.joinKey) }}
    {% endif %}
Jinja macros (macro.sql) are used in this solution to build reusable SQL fragments. The {% macro %} tag defines a snippet of content that can be reused across templates.
{% macro macro_join_condition(l_tbl, r_tbls, column) %}
    {% for r_tbl in r_tbls %}
        AND {{ l_tbl }}.{{ column }} = {{ r_tbl }}.{{ column }}
    {% endfor %}
{% endmacro %}

{% macro macro_filter_condition(filters) %}
    {% for tbl_filter in filters %}
        {% if loop.first %}
            {{ tbl_filter }}
        {% else %}
            AND {{ tbl_filter }}
        {% endif %}
    {% endfor %}
{% endmacro %}
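To see what the template and macros produce without touching BigQuery, they can be rendered from in-memory strings. This is a rough sketch under several assumptions: it uses jinja2's DictLoader instead of files on disk, the column list is made up rather than fetched from table metadata, and the trim_blocks and lstrip_blocks options are added here only to tidy whitespace (the original code does not set them).

```python
from jinja2 import DictLoader, Environment

MACROS = """\
{% macro macro_join_condition(l_tbl, r_tbls, column) %}
{% for r_tbl in r_tbls %}
AND {{ l_tbl }}.{{ column }} = {{ r_tbl }}.{{ column }}
{% endfor %}
{% endmacro %}
{% macro macro_filter_condition(filters) %}
{% for tbl_filter in filters %}
{% if loop.first %}
{{ tbl_filter }}
{% else %}
AND {{ tbl_filter }}
{% endif %}
{% endfor %}
{% endmacro %}
"""

TEMPLATE = """\
{% from 'macro.sql' import macro_join_condition %}
{% from 'macro.sql' import macro_filter_condition %}
CREATE OR REPLACE VIEW {{ params.viewName }}
AS
SELECT
{{ params.viewColumnList|join(',') }}
FROM
{{ params.baseTables|join(',') }}
WHERE
{{ macro_filter_condition(params.filterConditions) }}
{% if params.tableCount > 1 %}
{{ macro_join_condition(params.tableL, params.tableR, params.joinKey) }}
{% endif %}
"""

# In-memory stand-ins for macro.sql and sql_template.sql.
env = Environment(
    loader=DictLoader({"macro.sql": MACROS, "sql_template.sql": TEMPLATE}),
    trim_blocks=True,    # drop the newline that follows a block tag
    lstrip_blocks=True,  # drop indentation in front of a block tag
)

params = {
    "viewName": "mydataset.vw_job_profile",
    "baseTables": ["mydataset.emp_profile", "mydataset.emp_department", "mydataset.emp_payroll"],
    # Made-up columns; the real list comes from BigQuery table metadata.
    "viewColumnList": ["emp_department.dept_name", "emp_profile.emp_id", "emp_payroll.salary"],
    "filterConditions": [
        "emp_profile.management_level > 87",
        "emp_department.dept_name = 'IT'",
        "emp_payroll.year = 2020",
    ],
    "tableCount": 3,
    "tableL": "emp_profile",
    "tableR": ["emp_department", "emp_payroll"],
    "joinKey": "emp_id",
}

sql = env.get_template("sql_template.sql").render(params=params)
print(sql)
```

Printing the rendered statement is a cheap way to review the generated SQL before it is sent to BigQuery.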
Source Code
The source code for this solution performs several tasks: reading the configuration file, fetching table metadata from BigQuery, and rendering the Jinja SQL template with the resulting parameters.
Source Code (main.py)
import yaml
from jinja2 import Environment, FileSystemLoader
from google.cloud import bigquery

# The helper functions used below (getTableColumns, getViewColumns,
# getTableJoin, runSql) are listed under "User-defined modules" and
# must be defined before this code runs.

# Load the list of view definitions.
with open("config.yaml") as configFile:
    configList = yaml.safe_load(configFile)

env = Environment(loader=FileSystemLoader("./"))
sqlTemplate = env.get_template('sql_template.sql')

for tableConf in configList:
    viewName = tableConf['viewName']
    baseTables = tableConf['baseTables']
    joinKey = tableConf['joinKey']
    filterConditions = tableConf['filterConditions']
    excludeColumns = tableConf['excludeColumns']

    # Fetch column names from BigQuery and drop the excluded ones.
    tableColumnList = getTableColumns(baseTables)
    viewColumnList = getViewColumns(tableColumnList, excludeColumns)

    params = {
        'viewName': viewName,
        'baseTables': baseTables,
        'viewColumnList': viewColumnList,
        'filterConditions': filterConditions,
        'tableCount': len(baseTables)
    }

    # Join parameters are needed only when there is more than one base table.
    if len(baseTables) > 1:
        tableL, tableR = getTableJoin(baseTables)
        params['tableL'] = tableL
        params['tableR'] = tableR
        params['joinKey'] = joinKey

    sqlQuery = sqlTemplate.render(params=params)
    runSql(sqlQuery)
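The parameter assembly in main.py does not depend on BigQuery, so it can be pulled into a pure function and checked on its own. The sketch below is one possible arrangement, not the author's code: the name build_params is hypothetical, and the column list is passed in rather than fetched.

```python
def build_params(table_conf, view_column_list):
    """Assemble the dictionary handed to the template (hypothetical helper)."""
    base_tables = table_conf["baseTables"]
    params = {
        "viewName": table_conf["viewName"],
        "baseTables": base_tables,
        "viewColumnList": view_column_list,
        "filterConditions": table_conf["filterConditions"],
        "tableCount": len(base_tables),
    }
    if len(base_tables) > 1:
        # Strip the dataset prefix; the first table is the left side of every join.
        table_names = [name.split(".")[1] for name in base_tables]
        params["tableL"] = table_names[0]
        params["tableR"] = table_names[1:]
        params["joinKey"] = table_conf["joinKey"]
    return params


conf = {
    "viewName": "mydataset.vw_job_profile",
    "baseTables": ["mydataset.emp_profile", "mydataset.emp_department", "mydataset.emp_payroll"],
    "joinKey": "emp_id",
    "filterConditions": ["emp_payroll.year = 2020"],
    "excludeColumns": ["insert_timestamp", "update_timestamp"],
}

params = build_params(conf, ["emp_profile.emp_id"])
print(params["tableL"], params["tableR"])  # prints: emp_profile ['emp_department', 'emp_payroll']
```

With a single base table, the join keys are simply absent from the dictionary, which matches the tableCount check in the template.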
Source Code (User-defined modules)
def getTableColumns(baseTables):
    # Return qualified column names (table.column). A column name shared by
    # several tables, such as the join key, is kept only for the first table.
    client = bigquery.Client()
    columnList = []
    seenNames = set()
    for tables in baseTables:
        tableName = tables.split(".")[1]
        # get_table accepts a "dataset.table" string and uses the client's default project.
        table = client.get_table(tables)
        for column in table.schema:
            if column.name not in seenNames:
                seenNames.add(column.name)
                columnList.append(tableName + "." + column.name)
    return columnList


def getViewColumns(columnList, excludeColumns):
    # Compare on the unqualified column name, because columnList holds
    # table.column entries while excludeColumns holds bare column names.
    viewColumns = [c for c in columnList if c.split(".")[-1] not in excludeColumns]
    return sorted(viewColumns)


def getTableJoin(baseTables):
    # The first table is the left side of the join; the rest are joined to it.
    columnList = []
    for tables in baseTables[1:]:
        tableName = tables.split(".")[1]
        columnList.append(tableName)
    return baseTables[0].split(".")[1], columnList


def runSql(sqlQuery):
    client = bigquery.Client()
    sqlJob = client.query(sqlQuery)
    sqlJob.result()  # Wait for the job to finish.
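The column-selection rule these helpers appear to aim for (one entry per distinct column name, minus the excluded columns) can be checked offline with made-up schemas. The following sketch assumes that rule; the function name select_view_columns and the sample schemas are illustrative only and do not come from the repository.

```python
def select_view_columns(schemas, exclude_columns):
    """Pick view columns from table schemas (hypothetical helper).

    schemas maps a table name to its column names, in base-table order.
    """
    seen = set(exclude_columns)  # excluded names are treated as already taken
    selected = []
    for table_name, column_names in schemas.items():
        for name in column_names:
            if name in seen:
                continue  # excluded, or already provided by an earlier table
            seen.add(name)
            selected.append(f"{table_name}.{name}")
    return sorted(selected)


# Made-up schemas standing in for BigQuery table metadata.
schemas = {
    "emp_profile": ["emp_id", "name", "insert_timestamp"],
    "emp_department": ["emp_id", "dept_name", "update_timestamp"],
}

columns = select_view_columns(schemas, ["insert_timestamp", "update_timestamp"])
print(columns)  # prints ['emp_department.dept_name', 'emp_profile.emp_id', 'emp_profile.name']
```

Matching on exact names with a set avoids accidental substring matches, for example a column named id being mistaken for a duplicate of emp_id.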
Source Code Repository
https://github.com/soumendra-mishra/jinja-sql-template.git
How to Use?
- Create the BigQuery dataset and the base tables required for the dynamic view(s)
- Download the source code from the GitHub repository
- Edit the configuration file (config.yaml) to set the BigQuery dataset, tables, join key, filter conditions, and excluded columns
- Run main.py