Procedural SQL Extension in the Lakehouse Platform — New Capabilities for Data Work
This article describes the procedural extension for the MPP SQL engine of the Alphyn Lakehouse data platform, which is now available to users.
It covers the capabilities, applicability, and use cases of a procedural language in an analytical data platform.
Motivation
The presence of a procedural extension in legacy DBMSs such as Oracle, MS SQL Server, or Postgres/Greenplum is a fairly common reason for ruling out Lakehouse solutions when choosing a new analytical data warehouse. Not everyone is prepared to accept that procedural SQL code built and used for years at enterprise scale will suddenly become unavailable. All accumulated expertise would be reset to zero, forcing a fresh start: rebuilding pipeline execution and data processing with tools like Airflow and dbt, or other analogues that support conditional operators or SQL code generation.
Frankly, I personally consider that approach correct: use platform-independent orchestration and generation tools like Airflow, especially when building a new framework with minimal coupling to yet another SQL dialect. If you decide to change technologies again in a few years, it will be much simpler and faster.
At the same time, developing a new framework clearly requires significant time, resources, and knowledge of a new technical domain. Moreover, even after a move to an Airflow/dbt stack, some of the functionality available in procedural extensions remains out of reach. The biggest gap, according to users themselves, is the ability to return a result dataset from procedural code to the client side: the orchestration-based approach has no equivalent.
Imagine you have dozens or even hundreds of services in a legacy consumer system that reads data from a corporate data warehouse, all working on the following principle:
- The client side calls a procedure with a set of session variables;
- Inside the procedure, executable code is generated depending on the input variables;
- The SQL code performs DML operations — selecting and calculating data, creating temporary objects, updating permanent database objects as the result of all manipulations;
- A resulting SELECT/CURSOR on a permanent database object is returned to the client side.
This is exactly the situation our users faced, and the problem was compounded by the fact that the owners of the consumer services could not or were not willing to change the functionality that had been stable for years on their side.
With the release of Spark 4 and its inclusion in the data platform, it seemed that a procedural extension had arrived and the problem was solved. Spark, however, is not designed for BI access with fast response times and high throughput.
In the end, we had no choice but to implement a procedural extension for the platform's MPP SQL engine.
Capabilities of the Alphyn Lakehouse Procedural SQL (LPSQL) Extension
Note: Information about current capabilities is accurate as of the publication date.
The first release of the procedural extension provides the following functionality:
- Saving stored procedures in the system metastore for subsequent invocation
- Parameterized execution of stored procedures (input parameters)
- Working with variables: declaration, assignment of values, including from SQL queries
- Dynamic SQL via the EXECUTE construct
- Numeric FOR LOOP and WHILE END loops
- Working with explicit and implicit cursors
- Returning variables from a procedure to the client side
- Returning a dataset via SELECT from a procedure to the client side
- Special functions for working with the data dictionary for use in generating dynamic queries
- Conditional operators
- Exception handling
Usage Examples
Let's see how this looks in practice and walk through several examples demonstrating the main capabilities.
CREATE PROCEDURE demo_proc(IN X INT)
--PRC Example 1
--Declare vars
DECLARE row_nums INT;
BEGIN
--Set up session parameters
EXECUTE IMMEDIATE 'set mem_limit = 100m';
EXECUTE IMMEDIATE 'set mt_dop = 2';
--ReCreate temporary table
DROP TABLE IF EXISTS default.temp_table;
CREATE TABLE default.temp_table (
product_id_int BIGINT,
product_id_char STRING,
name STRING
);
--Create loop with counter
FOR i IN 1..X LOOP
--Let's transform the data: multiply the rows according to the counter
INSERT INTO default.temp_table
SELECT * FROM default.product;
END LOOP;
--Let's count the total number of multiplied rows and put it into the variable
SELECT COUNT(*) INTO row_nums FROM default.temp_table;
--Return total number of rows from the procedure
SELECT row_nums as Result;
END;
To validate and save a procedure, you currently need to use the LPSQL command with the procedure text enclosed in quotation marks:
LPSQL "text of procedure";
Calling a saved procedure is done with the CALL command:
LPSQL "CALL demo_proc(3);"
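To make the expected result of this call concrete, here is a minimal Python sketch that mimics the logic of demo_proc, with the standard-library sqlite3 module standing in for the platform's SQL engine. The three sample product rows and the helper names are assumptions made for illustration; only the table layout and the loop come from the procedure above.

```python
import sqlite3


def make_sample_db() -> sqlite3.Connection:
    """Build an in-memory database with a hypothetical three-row product table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE product ("
        "product_id_int INTEGER, product_id_char TEXT, name TEXT)"
    )
    conn.executemany(
        "INSERT INTO product VALUES (?, ?, ?)",
        [(1, "1", "apple"), (2, "2", "pear"), (3, "3", "plum")],
    )
    return conn


def demo_proc(conn: sqlite3.Connection, x: int) -> int:
    """Mimic demo_proc: copy every product row x times, then count the copies."""
    cur = conn.cursor()
    # Recreate the working table, as the procedure does on every call.
    cur.execute("DROP TABLE IF EXISTS temp_table")
    cur.execute(
        "CREATE TABLE temp_table ("
        "product_id_int INTEGER, product_id_char TEXT, name TEXT)"
    )
    # Numeric loop, the counterpart of FOR i IN 1..X LOOP ... END LOOP.
    for _ in range(x):
        cur.execute("INSERT INTO temp_table SELECT * FROM product")
    # Counterpart of SELECT COUNT(*) INTO row_nums.
    (row_nums,) = cur.execute("SELECT COUNT(*) FROM temp_table").fetchone()
    return row_nums


if __name__ == "__main__":
    # Three loop passes over three product rows.
    print(demo_proc(make_sample_db(), 3))  # prints 9
```

In other words, the value returned to the client is the loop bound multiplied by the number of rows in the product table.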
Let's look at a second example that prints a list of tables in a database. It uses a loop based not on an integer auto-increment counter, but on a cursor, and returns a dataset from a SELECT statement to the client side as the result.
CREATE PROCEDURE show_table_cursor()
-- Example of dynamic SQL query with a FOR LOOP
-- Iterates through the list of tables in a schema and writes the result to a table
BEGIN
DECLARE c1 CURSOR;
--ReCreate temp table
DROP TABLE IF EXISTS default.cursor_result_table;
CREATE TABLE default.cursor_result_table
(
TABLE_NAME STRING
) STORED AS PARQUET;
-- Cursor over results of SHOW TABLES IN default
FOR c1 IN ('SHOW TABLES IN default') LOOP
-- c1.name is the table name from the SHOW TABLES result
EXECUTE IMMEDIATE
'INSERT INTO default.cursor_result_table VALUES("' || c1.name || '")';
END LOOP;
-- Output the result
SELECT * FROM default.cursor_result_table ORDER BY 1;
END;
The output is the list of all tables in the default schema, returned to the client side as a dataset.
Now let's implement an example with nested loops, returning cursor values into session variables, and using dynamic SQL with those variables.
-- Example of a nested cursor implementation
CREATE PROCEDURE create_runtime_dict()
BEGIN
DECLARE name STRING;
DECLARE Column STRING;
DECLARE Column_Type STRING;
DECLARE rn INT;
-- Drop and create result table
EXECUTE IMMEDIATE 'DROP TABLE IF EXISTS default.cursor_result_table';
EXECUTE IMMEDIATE 'CREATE TABLE default.cursor_result_table
(
TABLE_NAME STRING,
COLUMN_NAME STRING,
COLUMN_TYPE STRING,
DIST_ROWS INT
) STORED AS PARQUET';
-- Declare cursor via query string
DECLARE c1 CURSOR FOR 'SHOW TABLES IN default';
OPEN c1;
LOOP
FETCH c1 INTO name;
IF SQLCODE <> 0 THEN LEAVE;
END IF; -- Exit if data is exhausted
-- Nested cursor (dynamic)
BEGIN
DECLARE c2 CURSOR FOR 'SHOW COLUMN STATS default.' || name;
OPEN c2;
LOOP
FETCH c2 INTO Column, Column_Type, rn;
IF SQLCODE <> 0 THEN LEAVE; END IF;
EXECUTE IMMEDIATE 'INSERT INTO default.cursor_result_table VALUES (''' || name || ''', ''' || Column || ''', ''' || Column_Type || ''', '|| rn ||')';
END LOOP;
CLOSE c2;
END;
END LOOP;
CLOSE c1;
END;
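The dynamic INSERT in this procedure splices table and column names into string literals by hand. The Python sketch below reproduces that concatenation and adds one step that is not part of the original procedure: single quotes inside a value are doubled, which is the standard SQL way of escaping them. The helper names are hypothetical.

```python
def sql_literal(value: str) -> str:
    """Render a string as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_insert(table: str, column: str, column_type: str, distinct_rows: int) -> str:
    """Build the INSERT statement that the nested loop runs for one column."""
    return (
        "INSERT INTO default.cursor_result_table VALUES ("
        + sql_literal(table) + ", "
        + sql_literal(column) + ", "
        + sql_literal(column_type) + ", "
        + str(int(distinct_rows)) + ")"
    )


if __name__ == "__main__":
    print(build_insert("product", "name", "STRING", 42))
```

Without the escaping step, a value containing a single quote would end the literal early and break the generated statement.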
Now let's create another procedure with a nested call to the previous one. It returns the tables in the database in which at least one column has more distinct values than the input parameter.
CREATE PROCEDURE show_tables(n INT)
-- Example of nested call
BEGIN
--Drop view
EXECUTE 'DROP VIEW IF EXISTS default.V_show_table';
--Build the runtime dictionary table by calling the previous procedure
CALL create_runtime_dict();
--Let's get the list of tables where any column has more than n distinct values
EXECUTE 'CREATE VIEW default.V_show_table AS SELECT DISTINCT table_name FROM default.cursor_result_table WHERE dist_rows >' || n;
SELECT * FROM default.V_show_table;
END;
Now let's verify the results and query tables with more than 10,000 distinct values — the procedure correctly filters and returns only the matching tables.
Enough abstract examples. It's time to move on to solving the most common task in a data warehouse: implementing a simple loader using the Slowly Changing Dimensions Type I (SCD1) scenario with procedural code.
CREATE OR REPLACE PROCEDURE scd1_load(schema_nm STRING, tabname STRING, PK STRING)
-- schema_nm - target table schema
-- tabname - target table
-- PK - primary key. Comma-separated list if the key is composite
BEGIN
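DECLARE Column STRING;
DECLARE Column_Type STRING; -- Column type from SHOW COLUMN STATS (fetched, not used)
DECLARE rn INT; -- Distinct-value count from SHOW COLUMN STATS (fetched, not used)
DECLARE UPD_FLDS STRING; -- List of columns for update
DECLARE INS_FLDS STRING; -- List of columns for insert
DECLARE PK_FLDS STRING; -- Expression for primary key equality
DECLARE EQ_FLDS STRING; -- List of business fields for comparison
DECLARE MERGE_SQL STRING; -- Merge script for loading the target table
DECLARE counter INT; -- Position of the current key column in PK
DECLARE tar_tab STRING; -- Fully qualified target table
DECLARE int_tab STRING; -- Fully qualified source (INT_) table with incoming data
DECLARE buf = '';
-- Start the generated lists empty so that concatenation never begins from NULL
UPD_FLDS = '';
INS_FLDS = '';
EQ_FLDS = '';
PK_FLDS = '1=1';
counter = 1;
tar_tab = schema_nm || '.' || tabname;
int_tab = schema_nm || '.INT_' || tabname;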
LOOP -- Build primary key equality expression in a loop
SELECT SPLIT_PART(PK, ',', counter) INTO buf;
IF buf = '' THEN LEAVE; END IF;
PK_FLDS = PK_FLDS || ' AND old.' || buf || ' = new.' || buf;
counter = counter + 1;
END LOOP;
-- Technical fields
DECLARE task_id_fld = 'tech_task_id'; -- Current load identifier
DECLARE del_fld = 'tech_deleted_flg'; -- Logical deletion flag
DECLARE chng_dttm_fld = 'tech_changed_dttm'; -- Record change date and time
DECLARE c1 CURSOR FOR 'SHOW COLUMN STATS ' || tar_tab;
OPEN c1;
LOOP
FETCH c1 INTO Column, Column_Type, rn;
IF SQLCODE <> 0 THEN LEAVE; END IF;
IF Column != task_id_fld AND Column != del_fld AND Column != chng_dttm_fld
THEN
INS_FLDS = INS_FLDS || '\tnew.' || Column || ', \n';
UPD_FLDS = UPD_FLDS || '\told.' || Column || ' = new.' || Column || ',\n';
EQ_FLDS = EQ_FLDS || '\told.' || Column || ' != new.' || Column || ' OR ';
END IF;
END LOOP;
MERGE_SQL = '
MERGE INTO ' || tar_tab || ' AS old
USING ' || int_tab || ' AS new
ON ' || PK_FLDS || '
WHEN MATCHED AND new.' || del_fld || ' != 1 AND (' || EQ_FLDS || ' 1 = 0)
THEN UPDATE SET
' || UPD_FLDS || '\told.' || task_id_fld || ' = CAST(UNIX_TIMESTAMP() AS INT),
\told.' || del_fld || ' = 0,
\told.' || chng_dttm_fld || ' = CURRENT_TIMESTAMP()
WHEN MATCHED AND new.' || del_fld || ' = 1 AND old.' || del_fld || ' = 0
THEN UPDATE SET
old.' || task_id_fld || ' = CAST(UNIX_TIMESTAMP() AS INT),
old.' || del_fld || ' = 1,
old.' || chng_dttm_fld || ' = CURRENT_TIMESTAMP()
WHEN NOT MATCHED BY SOURCE AND old.' || del_fld || ' = 0
THEN UPDATE SET
old.' || del_fld || ' = 1,
old.' || chng_dttm_fld || ' = CURRENT_TIMESTAMP(),
old.' || task_id_fld || ' = CAST(UNIX_TIMESTAMP() AS INT)
WHEN NOT MATCHED AND new.' || del_fld || ' != 1
THEN INSERT VALUES (
' || INS_FLDS || 'CURRENT_TIMESTAMP(),
CAST(UNIX_TIMESTAMP() AS INT),
new.' || del_fld || '
)
';
EXECUTE MERGE_SQL;
END;
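The four WHEN branches of the generated MERGE are easier to follow on a small in-memory model. The Python sketch below applies the same branch order to two dictionaries keyed by primary key. It is an illustration under stated assumptions, not the platform's implementation: the function names are hypothetical, the task identifier and timestamp are passed in by the caller, and SQL NULL comparison semantics are ignored.

```python
from datetime import datetime
from typing import Dict, Tuple

Row = Dict[str, object]
Key = Tuple[object, ...]

TECH_FIELDS = ("tech_task_id", "tech_deleted_flg", "tech_changed_dttm")


def stamp(row: Row, task_id: int, deleted: object, now: datetime) -> None:
    """Set the three technical fields on a target row."""
    row["tech_task_id"] = task_id
    row["tech_deleted_flg"] = deleted
    row["tech_changed_dttm"] = now


def scd1_merge(target: Dict[Key, Row], source: Dict[Key, Row],
               task_id: int, now: datetime) -> None:
    """Apply the four WHEN branches of the generated MERGE to target in place."""
    for key, old in target.items():
        new = source.get(key)
        if new is None:
            # WHEN NOT MATCHED BY SOURCE AND old.tech_deleted_flg = 0
            if old["tech_deleted_flg"] == 0:
                stamp(old, task_id, 1, now)
            continue
        business = {c: v for c, v in new.items() if c not in TECH_FIELDS}
        changed = any(old.get(c) != v for c, v in business.items())
        if new["tech_deleted_flg"] != 1 and changed:
            # WHEN MATCHED, not deleted in source, some business field differs
            old.update(business)
            stamp(old, task_id, 0, now)
        elif new["tech_deleted_flg"] == 1 and old["tech_deleted_flg"] == 0:
            # WHEN MATCHED, deleted in source, still live in target
            stamp(old, task_id, 1, now)
    for key, incoming in source.items():
        # WHEN NOT MATCHED AND new.tech_deleted_flg != 1: insert a new row
        if key not in target and incoming["tech_deleted_flg"] != 1:
            row = {c: v for c, v in incoming.items() if c not in TECH_FIELDS}
            stamp(row, task_id, incoming["tech_deleted_flg"], now)
            target[key] = row


if __name__ == "__main__":
    now = datetime(2024, 1, 1)
    target = {(1,): {"id": 1, "name": "a", "tech_task_id": 0,
                     "tech_deleted_flg": 0, "tech_changed_dttm": None}}
    source = {(1,): {"id": 1, "name": "b", "tech_deleted_flg": 0}}
    scd1_merge(target, source, 7, now)
    print(target[(1,)]["name"])  # prints b
```

One consequence of the branch conditions is visible in the model: a matched row with no business changes is left untouched, so its technical fields keep the values from the previous load.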
Example call:
LPSQL "CALL scd1_load('DDS', 'ACCOUNT_TAB', 'id, fld1')";
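It can help to see what the primary key loop builds for this call. The Python sketch below mirrors the loop, assuming that SPLIT_PART returns an empty string once the index runs past the last part, which is the behaviour the loop's exit condition relies on. The function names and the optional trim flag are hypothetical additions.

```python
def split_part(text: str, delimiter: str, index: int) -> str:
    """Return the 1-based part of text, or '' when the index is past the end."""
    parts = text.split(delimiter)
    return parts[index - 1] if 1 <= index <= len(parts) else ""


def build_pk_predicate(pk: str, trim: bool = False) -> str:
    """Mirror the procedure's loop that builds the ON clause of the MERGE."""
    predicate = "1=1"
    counter = 1
    while True:
        buf = split_part(pk, ",", counter)
        if trim:
            buf = buf.strip()  # not in the original procedure
        if buf == "":
            break
        predicate += " AND old." + buf + " = new." + buf
        counter += 1
    return predicate


if __name__ == "__main__":
    print(build_pk_predicate("id, fld1"))
    print(build_pk_predicate("id, fld1", trim=True))
```

With the key list written as 'id, fld1', the space after the comma is carried into the predicate as `old. fld1 = new. fld1`. The source does not say whether the engine accepts that form, so passing 'id,fld1' or trimming each part is the safer choice.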
Let's prepare one more example: a routine for collecting table statistics within a database. Besides the schema name, the procedure takes two mode parameters. The first chooses between forced recalculation for every table and collection only for tables whose statistics have never been calculated; the second chooses between incremental and full recalculation. Incidentally, our data platform implements incremental statistics collection for Iceberg tables, which saves significant time and compute cluster resources.
CREATE OR REPLACE PROCEDURE stats_missing_proc(cur_schema STRING, proc_mode INT, inc_mode INT)
-- cur_schema - schema to check tables in
-- proc_mode - procedure mode: 0 = forced collection | 1 = only tables without statistics
-- inc_mode - 1 = incremental statistics | 0 = full statistics
BEGIN
DECLARE name STRING;
DECLARE clmn STRING;
DECLARE tp STRING;
DECLARE dv INT;
DECLARE nlls INT;
-- Technical table
CREATE TABLE IF NOT EXISTS default.tech_compactor_proc
(
tabname STRING,
got_stats INT
)
STORED AS PARQUET;
-- Logging table
CREATE TABLE IF NOT EXISTS default.tech_compactor_logs
(
tabname STRING,
query STRING,
dttm TIMESTAMP
)
STORED AS PARQUET;
TRUNCATE TABLE default.tech_compactor_proc;
TRUNCATE TABLE default.tech_compactor_logs;
-- Open cursor to iterate over all objects in the schema
-- The nested block scopes the cursors and gives the exception handler below a matching BEGIN
BEGIN
DECLARE c1 CURSOR FOR 'SHOW TABLES IN ' || cur_schema;
OPEN c1;
LOOP
FETCH c1 INTO name;
IF SQLCODE <> 0 THEN LEAVE;
END IF; -- Exit if data is exhausted
-- Open nested cursor to get statistics for each object
DECLARE c2 CURSOR FOR 'SHOW COLUMN STATS ' || cur_schema || '.' || name;
OPEN c2;
LOOP
FETCH c2 INTO clmn, tp, dv, nlls;
IF SQLCODE <> 0 THEN LEAVE;
END IF;
IF (dv < 0 AND proc_mode = 1) OR (proc_mode = 0) THEN
EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_proc SELECT ''' || name || ''', ' || dv || '';
END IF;
END LOOP;
CLOSE c2;
END LOOP;
CLOSE c1;
-- Exception handling in case the schema contains views
EXCEPTION WHEN OTHERS THEN
END;
-- Full statistics collection
IF inc_mode = 0 THEN
DECLARE c1 CURSOR FOR 'SELECT DISTINCT tabname FROM default.tech_compactor_proc';
OPEN c1;
LOOP
FETCH c1 INTO name;
IF SQLCODE <> 0 THEN LEAVE;
END IF; -- Exit when done
EXECUTE IMMEDIATE 'COMPUTE STATS ' || cur_schema || '.' || name;
EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_logs(tabname, query, dttm) SELECT ''' || cur_schema || '.' || name || ''', ''COMPUTE STATS ' || cur_schema || '.' || name || ''', NOW()';
END LOOP;
END IF;
-- Incremental statistics collection
IF inc_mode = 1 THEN
DECLARE c1 CURSOR FOR 'SELECT DISTINCT tabname FROM default.tech_compactor_proc';
OPEN c1;
LOOP
FETCH c1 INTO name;
IF SQLCODE <> 0 THEN LEAVE;
END IF; -- Exit when done
EXECUTE IMMEDIATE 'COMPUTE INCREMENTAL STATS ' || cur_schema || '.' || name;
EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_logs(tabname, query, dttm) SELECT ''' || cur_schema || '.' || name || ''', ''COMPUTE INCREMENTAL STATS ' || cur_schema || '.' || name || ''', NOW()';
END LOOP;
END IF;
END;
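The way the two mode flags combine can be checked with a short Python sketch. It takes the distinct-value counts that SHOW COLUMN STATS would report per table and returns the statements the procedure would run. The function name and the input shape are hypothetical, and the result is sorted only to make the output deterministic.

```python
from typing import Dict, List


def plan_stats(schema: str, distinct_values: Dict[str, List[int]],
               proc_mode: int, inc_mode: int) -> List[str]:
    """Return the COMPUTE statements the procedure would run for a schema."""
    if inc_mode == 0:
        command = "COMPUTE STATS"
    elif inc_mode == 1:
        command = "COMPUTE INCREMENTAL STATS"
    else:
        return []  # the procedure has no branch for other values
    selected = []
    for table, dvs in distinct_values.items():
        # Same condition as in the inner cursor loop: a negative count marks
        # a column the procedure treats as having no statistics.
        if any((dv < 0 and proc_mode == 1) or proc_mode == 0 for dv in dvs):
            selected.append(table)
    return [f"{command} {schema}.{table}" for table in sorted(selected)]


if __name__ == "__main__":
    stats = {"a": [-1, -1], "b": [10, 3], "c": [5, -1]}
    for statement in plan_stats("dds", stats, proc_mode=1, inc_mode=0):
        print(statement)
```

With proc_mode = 1 only tables a and c are selected, because table b already has a non-negative count for every column.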
Conclusion
As you can see, the procedural extension offers extensive capabilities for independently building auxiliary user solutions with a low barrier to entry — especially for those migrating their expertise from traditional computing systems.
The next production release of the platform will expand procedural code support to additional processing engines. We want end users to have a choice of execution engine without needing to rewrite the procedure code itself, as long as the SQL operators it contains are universal. We also plan to:
- Expand compatibility with popular procedural dialects T-SQL and PL/SQL
- Make the metastore API available for direct work with the data dictionary from engines inside procedures
- Implement an analogue of the ALL_SOURCE view familiar to all Oracle users
- Add output of informational messages to the OUTPUT log
In parallel, we are conducting research and development on a query transpiler from Postgres and Greenplum to Alphyn Lakehouse SQL engines. The goal is not just to ease the migration of functionality into the platform, but to make migration unnecessary — so that client applications require no changes on their side, and they effectively "think" they are still sending queries to and receiving responses from Greenplum or Postgres.