Product · 12 min read

Procedural SQL Extension in the Lakehouse Platform — New Capabilities for Data Work

LPSQL brings stored procedures, cursors, dynamic SQL, and SCD1 loading to the lakehouse — enabling migration from Oracle, SQL Server, and PostgreSQL without rewriting procedural logic.

Alphyn.ai Engineering Team

This article describes the procedural extension for the MPP SQL engine of the Alphyn Lakehouse data platform, which is now available to users.

It covers the capabilities, applicability, and use cases of a procedural language in an analytical data platform.

Motivation

Legacy DBMSs such as Oracle, MS SQL Server, and Postgres/Greenplum ship with a procedural extension, and its absence is a fairly common reason for ruling out Lakehouse solutions when choosing a new analytical data warehouse. Not everyone is prepared to accept that procedural SQL code built and used for years at enterprise scale will suddenly become unavailable. The accumulated expertise would be reset to zero, and pipeline execution and data processing would have to be rebuilt from scratch with tools such as Airflow and dbt, or with analogues that support conditional operators or SQL code generation.

Frankly, I consider that approach correct: use platform-independent orchestration and generation tools such as Airflow, especially when building a new framework with minimal coupling to yet another SQL dialect. If you decide to change technologies again in a few years, the move will be much simpler and faster.

At the same time, developing a new framework clearly requires significant time, resources, and knowledge of a new technical domain. Moreover, even after a transition to an Airflow/dbt stack, some of the functionality available in procedural extensions remains out of reach. The biggest gap, according to users themselves, is the ability to return a result dataset from procedural code to the client side. That capability simply does not exist in the orchestration-based approach.

Imagine you have dozens or even hundreds of services in a legacy consumer system that reads data from a corporate data warehouse, all working on the following principle:

  • The client side calls a procedure with a set of session variables;
  • Inside the procedure, executable code is generated depending on the input variables;
  • The SQL code performs DML operations — selecting and calculating data, creating temporary objects, updating permanent database objects as the result of all manipulations;
  • A resulting SELECT/CURSOR on a permanent database object is returned to the client side.

This is exactly the situation our users faced, and the problem was compounded by the fact that the owners of the consumer services could not or were not willing to change the functionality that had been stable for years on their side.

At first it seemed that the release of Spark 4, which brought a procedural extension, and its inclusion in the data platform was the answer. Spark, however, is not really aimed at BI access with fast response times and high throughput.

In the end, we had no choice but to implement a procedural extension for the platform's MPP SQL engine.

Capabilities of the Alphyn Lakehouse Procedural SQL (LPSQL) Extension

Note: Information about current capabilities is accurate as of the publication date.

The first release of the procedural extension provides the following functionality:

  • Saving stored procedures in the system metastore for subsequent invocation
  • Parameterized execution of stored procedures (input parameters)
  • Working with variables: declaration, assignment of values, including from SQL queries
  • Dynamic SQL via the EXECUTE construct
  • Numeric FOR LOOP and WHILE END loops
  • Working with explicit and implicit cursors
  • Returning variables from a procedure to the client side
  • Returning a dataset via SELECT from a procedure to the client side
  • Special functions for working with the data dictionary for use in generating dynamic queries
  • Conditional operators
  • Exception handling

Usage Examples

Let's see how this looks in practice and walk through several examples demonstrating the main capabilities.

CREATE PROCEDURE demo_proc(IN X INT)
--PRC Example 1
--Declare vars
DECLARE row_nums INT;
BEGIN

	--Set up session parameters
	EXECUTE IMMEDIATE 'set mem_limit = 100m';
	EXECUTE IMMEDIATE 'set mt_dop = 2';

	--Recreate the working table
	DROP TABLE IF EXISTS default.temp_table;
	CREATE TABLE default.temp_table (
		product_id_int BIGINT,
		product_id_char STRING,
		name STRING
	);
	--Loop with a counter from 1 to X
	FOR i IN 1..X LOOP
		--Copy all rows of default.product on each pass, so the table ends up with X copies
		INSERT INTO default.temp_table
		SELECT * FROM default.product;
	END LOOP;

	--Count the multiplied rows and store the total in the variable
	SELECT COUNT(*) INTO row_nums FROM default.temp_table;

	--Return the total number of rows from the procedure
	SELECT row_nums as Result;
END;

To validate and save a procedure, you currently need to use the LPSQL command with the procedure text enclosed in quotation marks:

LPSQL "text of procedure";

Calling a saved procedure is done with the CALL command:

LPSQL "CALL demo_proc(3)";

Let's look at a second example that prints a list of tables in a database. It uses a loop based not on an integer auto-increment counter, but on a cursor, and returns a dataset from a SELECT statement to the client side as the result.

CREATE PROCEDURE show_table_cursor()

-- Example of dynamic SQL query with a FOR LOOP
-- Iterates through the list of tables in a schema and writes the result to a table

BEGIN

 DECLARE c1 CURSOR;

	--ReCreate temp table
 	DROP TABLE IF EXISTS default.cursor_result_table;
 	CREATE TABLE default.cursor_result_table
	(
   TABLE_NAME STRING
 	) STORED AS PARQUET;

 -- Cursor over results of SHOW TABLES IN default
 FOR c1 IN ('SHOW TABLES IN default') LOOP
   -- c1.name is the table name from the SHOW TABLES result
   EXECUTE IMMEDIATE
     'INSERT INTO default.cursor_result_table VALUES("'  || c1.name ||  '")';

 END LOOP;
 -- Output the result

 SELECT * FROM default.cursor_result_table ORDER BY 1;
END;

The output is the list of all tables in the default schema, returned to the client side as a dataset.

Now let's implement an example with nested loops, returning cursor values into session variables, and using dynamic SQL with those variables.

-- Example of a nested cursor implementation

CREATE PROCEDURE create_runtime_dict()

BEGIN
 DECLARE name STRING;
 DECLARE Column STRING;
 DECLARE Column_Type STRING;
 DECLARE rn INT;

  -- Drop and create result table
 EXECUTE IMMEDIATE 'DROP TABLE IF EXISTS default.cursor_result_table';
 EXECUTE IMMEDIATE 'CREATE TABLE default.cursor_result_table
 (
   TABLE_NAME STRING,
   COLUMN_NAME STRING,
   COLUMN_TYPE STRING,
   DIST_ROWS INT
 ) STORED AS PARQUET';

 -- Declare cursor via query string
 DECLARE c1 CURSOR FOR  'SHOW TABLES IN default';
 OPEN c1;
 LOOP
   FETCH c1 INTO name;
   IF SQLCODE <> 0 THEN LEAVE;
   END IF; -- Exit if data is exhausted

   -- Nested cursor (dynamic)
   BEGIN
     DECLARE c2 CURSOR FOR  'SHOW COLUMN STATS default.' || name;

     OPEN c2;

     LOOP
       FETCH c2 INTO Column, Column_Type, rn;
       IF SQLCODE <> 0 THEN LEAVE; END IF;
       EXECUTE IMMEDIATE 'INSERT INTO default.cursor_result_table VALUES ('''  || name ||  ''', '''  || Column ||  ''', '''  || Column_Type ||  ''', '|| rn ||')';
     END LOOP;

     CLOSE c2;

   END;

 END LOOP;
 CLOSE c1;
END;
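
The dynamic INSERT in the inner loop is the easiest line to get wrong, because the three string values need surrounding single quotes while the numeric value does not. As a rough illustration outside the platform, the Python sketch below reproduces the same concatenation so the resulting statement can be inspected. The function names and the sample row are hypothetical, and the check that rejects embedded quotes is a precaution of this sketch that the procedure above does not perform.

```python
def quoted(value: str) -> str:
    """Wrap a value in single quotes; refuse values that would break the literal."""
    if "'" in value:
        raise ValueError(f"value contains a single quote: {value!r}")
    return "'" + value + "'"


def build_insert(table_name: str, column: str, column_type: str, distinct_rows: int) -> str:
    """Assemble the INSERT text in the same order as the inner loop does."""
    return (
        "INSERT INTO default.cursor_result_table VALUES ("
        + quoted(table_name) + ", "
        + quoted(column) + ", "
        + quoted(column_type) + ", "
        + str(int(distinct_rows))  # numeric value, so no quotes
        + ")"
    )


if __name__ == "__main__":
    # Hypothetical row for one column of one table
    print(build_insert("product", "name", "STRING", 1200))
```

Printing the statement before executing it is a cheap way to check the quoting when a dynamic query fails.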

Now let's create another procedure with a nested call to the previous one. It returns the tables in the database in which at least one column has more distinct values than the input parameter.

CREATE PROCEDURE show_tables(n INT)
-- Example of nested call

BEGIN

	--Drop view
	EXECUTE  'DROP VIEW IF EXISTS default.V_show_table';

	--Create session temp dictionary
	CALL create_runtime_dict();

	--Lets get list of tables with more than n distinct rows of any column

	EXECUTE  'CREATE VIEW default.V_show_table AS SELECT DISTINCT table_name FROM default.cursor_result_table WHERE dist_rows >' || n;

SELECT * FROM  default.V_show_table;

END;

Calling the procedure with 10000 as the argument returns only the tables that have a column with more than 10,000 distinct values.

Enough abstract examples. It's time to move on to the most common task in a data warehouse: implementing a simple loader for the Slowly Changing Dimensions Type 1 (SCD1) scenario in procedural code.

CREATE OR REPLACE PROCEDURE scd1_load(schema_nm STRING, tabname STRING, PK STRING)

-- schema_nm - target table schema
-- tabname - target table
-- PK - primary key. Comma-separated list if the key is composite

BEGIN

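	DECLARE Column STRING;
	DECLARE Column_Type STRING; -- Fetched from SHOW COLUMN STATS, not used further
	DECLARE rn INT;             -- Fetched from SHOW COLUMN STATS, not used further
	DECLARE UPD_FLDS STRING; -- List of columns for update
	DECLARE INS_FLDS STRING; -- List of columns for insert
	DECLARE PK_FLDS STRING;  -- Expression for primary key equality
	DECLARE EQ_FLDS STRING;  -- List of business fields for comparison
	DECLARE MERGE_SQL STRING; -- Merge script for loading the target table
	DECLARE counter INT;     -- Position of the current key field in PK
	DECLARE tar_tab STRING;  -- Fully qualified target table
	DECLARE int_tab STRING;  -- Fully qualified source table (INT_ prefix)

	DECLARE buf = '';

	-- Start from empty strings so that concatenation in the loops does not yield NULL
	UPD_FLDS = '';
	INS_FLDS = '';
	EQ_FLDS = '';
	PK_FLDS = '1=1';

	counter = 1;

	tar_tab = schema_nm || '.' || tabname;
	int_tab = schema_nm || '.INT_' || tabname;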

	LOOP -- Build primary key equality expression in a loop

		-- TRIM drops the space after the comma in a list such as 'id, fld1'
		SELECT TRIM(SPLIT_PART(PK, ',', counter)) INTO buf;
		IF buf = '' THEN LEAVE; END IF;
		PK_FLDS = PK_FLDS || ' AND old.' || buf || ' = new.' || buf;
		counter = counter + 1;

	END LOOP;

	-- Technical fields
	DECLARE task_id_fld = 'tech_task_id';       -- Current load identifier
	DECLARE del_fld = 'tech_deleted_flg';        -- Logical deletion flag
	DECLARE chng_dttm_fld = 'tech_changed_dttm'; -- Record change date and time
	DECLARE c1 CURSOR FOR  'SHOW COLUMN STATS ' || tar_tab;

   OPEN c1;

	LOOP
		FETCH c1 INTO Column, Column_Type, rn;
		IF SQLCODE <> 0 THEN LEAVE; END IF;
		IF Column != task_id_fld AND Column != del_fld AND Column != chng_dttm_fld
		THEN
			INS_FLDS = INS_FLDS  || '\tnew.' || Column || ', \n';
			UPD_FLDS = UPD_FLDS  || '\told.' || Column || ' = new.' || Column || ',\n';
			EQ_FLDS  = EQ_FLDS   || '\told.' || Column || ' != new.' || Column || ' OR ';
		END IF;
	END LOOP;
	CLOSE c1;

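	-- The INSERT below is positional: it assumes the technical columns are the last
	-- three columns of the target table, in the order change timestamp, task id, deletion flag
	MERGE_SQL = '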
		MERGE INTO ' || tar_tab || ' AS old
		USING ' || int_tab || ' AS new
		ON ' || PK_FLDS || '
		WHEN MATCHED AND new.' || del_fld || ' != 1 AND (' || EQ_FLDS || ' 1 = 0)
		THEN UPDATE SET
		' || UPD_FLDS || '\told.' || task_id_fld || ' = CAST(UNIX_TIMESTAMP() AS INT),
		\told.' || del_fld || ' = 0,
		\told.' || chng_dttm_fld || ' = CURRENT_TIMESTAMP()
		WHEN MATCHED AND new.' || del_fld || ' = 1 AND old.' || del_fld || ' = 0
		THEN UPDATE SET
			old.' || task_id_fld || ' = CAST(UNIX_TIMESTAMP() AS INT),
			old.' || del_fld || ' = 1,
			old.' || chng_dttm_fld || ' = CURRENT_TIMESTAMP()
		WHEN NOT MATCHED BY SOURCE AND old.' || del_fld || ' = 0
		THEN UPDATE SET
			old.' || del_fld || ' = 1,
			old.' || chng_dttm_fld || ' = CURRENT_TIMESTAMP(),
			old.' || task_id_fld || ' = CAST(UNIX_TIMESTAMP() AS INT)
		WHEN NOT MATCHED AND new.' || del_fld || ' != 1
		THEN INSERT VALUES (
		' || INS_FLDS || 'CURRENT_TIMESTAMP(),
		CAST(UNIX_TIMESTAMP() AS INT),
			new.' || del_fld || '
		)
		';

	EXECUTE MERGE_SQL;

END;

Example call:

LPSQL "CALL scd1_load('DDS', 'ACCOUNT_TAB', 'id, fld1')";
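
To see what the loader assembles before it runs the MERGE, the string building can be emulated outside the platform. The Python sketch below mirrors the two loops of the procedure for a hypothetical table with the columns id, fld1, and amount plus the three technical fields. The function names and the sample column list are assumptions made for illustration, and stripping whitespace around each key name is a choice of this sketch.

```python
TECH_FIELDS = ("tech_task_id", "tech_deleted_flg", "tech_changed_dttm")


def pk_predicate(pk: str) -> str:
    """Build the ON expression from a comma-separated list of key fields."""
    predicate = "1=1"
    for part in pk.split(","):
        field = part.strip()
        if not field:
            break  # mirrors the LEAVE on an empty SPLIT_PART result
        predicate += f" AND old.{field} = new.{field}"
    return predicate


def column_fragments(columns: list[str]) -> tuple[str, str, str]:
    """Return (insert list, update list, inequality test) for business columns."""
    ins, upd, eq = "", "", ""
    for col in columns:
        if col in TECH_FIELDS:
            continue  # technical fields are set explicitly in the MERGE text
        ins += f"\tnew.{col}, \n"
        upd += f"\told.{col} = new.{col},\n"
        eq += f"\told.{col} != new.{col} OR "
    return ins, upd, eq


if __name__ == "__main__":
    # Hypothetical column order, as the cursor over the target table might return it
    cols = ["id", "fld1", "amount",
            "tech_changed_dttm", "tech_task_id", "tech_deleted_flg"]
    ins, upd, eq = column_fragments(cols)
    print("ON " + pk_predicate("id, fld1"))
    print("WHEN MATCHED AND new.tech_deleted_flg != 1 AND (" + eq + " 1 = 0)")
    print("THEN UPDATE SET\n" + upd)
```

The trailing `1 = 0` in the procedure exists because the inequality list always ends with `OR`; the sketch keeps that shape so the printed condition matches what the MERGE receives.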

Let's prepare one more example: a routine for collecting table statistics within a database. Besides the schema name, the procedure takes two mode parameters. The first chooses between forced recalculation and collection only for tables that have never had statistics computed; the second chooses between incremental and full recalculation. Incidentally, in our data platform we implemented incremental statistics collection for Iceberg tables, which saves significant time and compute cluster resources.

CREATE OR REPLACE PROCEDURE stats_missing_proc(cur_schema STRING, proc_mode INT, inc_mode INT)

-- cur_schema - schema to check tables in
-- proc_mode - procedure mode: 0 = forced collection | 1 = only tables without statistics
-- inc_mode  - 1 = incremental statistics | 0 = full statistics

BEGIN

	DECLARE name STRING;
	DECLARE clmn STRING;
	DECLARE tp STRING;
	DECLARE dv INT;
	DECLARE nlls INT;

	-- Technical table
  	CREATE TABLE IF NOT EXISTS default.tech_compactor_proc
	(
		tabname STRING,
		got_stats INT
	)
	STORED AS PARQUET;

	-- Logging table
	CREATE TABLE IF NOT EXISTS default.tech_compactor_logs
	(
		tabname STRING,
		query STRING,
		dttm TIMESTAMP
	)
	STORED AS PARQUET;

	TRUNCATE TABLE default.tech_compactor_proc;
	TRUNCATE TABLE default.tech_compactor_logs;

	-- Nested block, so that the exception handler below has a matching BEGIN
	BEGIN
		-- Open cursor to iterate over all objects in the schema
		DECLARE c1 CURSOR FOR 'SHOW TABLES IN ' || cur_schema;
		OPEN c1;
		LOOP
			FETCH c1 INTO name;
			IF SQLCODE <> 0 THEN LEAVE; END IF; -- Exit if data is exhausted

			-- Open nested cursor to get statistics for each object
			DECLARE c2 CURSOR FOR 'SHOW COLUMN STATS ' || cur_schema || '.' || name;
			OPEN c2;
			LOOP
				FETCH c2 INTO clmn, tp, dv, nlls;
				IF SQLCODE <> 0 THEN LEAVE; END IF;
				-- Forced mode takes every table; otherwise only columns without statistics (dv < 0)
				IF (dv < 0 AND proc_mode = 1) OR (proc_mode = 0) THEN
					EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_proc SELECT ''' || name || ''', ' || dv || '';
				END IF;
			END LOOP;
			CLOSE c2;

		END LOOP;
		CLOSE c1;

	-- Exception handling in case the schema contains views
	EXCEPTION WHEN OTHERS THEN
	END;

	-- Full statistics collection
	IF inc_mode = 0 THEN
		DECLARE c1 CURSOR FOR 'SELECT DISTINCT tabname FROM default.tech_compactor_proc';

		OPEN c1;
		LOOP
			FETCH c1 INTO name;
		    IF SQLCODE <> 0 THEN LEAVE;
			END IF; -- Exit when done
			EXECUTE IMMEDIATE 'COMPUTE STATS ' || cur_schema || '.' || name;
			EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_logs(tabname, query, dttm) SELECT ''' || cur_schema || '.' || name || ''', ''COMPUTE STATS ' || cur_schema || '.' || name || ''', NOW()';
		END LOOP;
		CLOSE c1;

	END IF;

	-- Incremental statistics collection
	IF inc_mode = 1 THEN
		DECLARE c1 CURSOR FOR 'SELECT DISTINCT tabname FROM default.tech_compactor_proc';
		OPEN c1;
		LOOP
			FETCH c1 INTO name;
		    IF SQLCODE <> 0 THEN LEAVE;
			END IF; -- Exit when done

			EXECUTE IMMEDIATE 'COMPUTE INCREMENTAL STATS ' || cur_schema || '.' || name;
			EXECUTE IMMEDIATE 'INSERT INTO default.tech_compactor_logs(tabname, query, dttm) SELECT ''' || cur_schema || '.' || name || ''', ''COMPUTE INCREMENTAL STATS ' || cur_schema || '.' || name || ''', NOW()';

		END LOOP;
		CLOSE c1;
	END IF;

END;
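
The selection logic of the procedure is compact enough to check on paper. As a rough model, the Python sketch below takes per-column statistics as (table, distinct values) pairs, applies the same condition as the nested loop, and returns the statements that the second half of the procedure would execute. The function names and sample data are hypothetical; reading a negative distinct-value count as "statistics never computed" follows the condition in the procedure.

```python
def needs_stats(distinct_values: int, proc_mode: int) -> bool:
    """Same condition as the IF inside the nested cursor loop."""
    return (distinct_values < 0 and proc_mode == 1) or proc_mode == 0


def plan_statements(schema, column_stats, proc_mode, inc_mode):
    """column_stats: iterable of (table, distinct_values) pairs, one per column."""
    selected = []
    for table, dv in column_stats:
        if needs_stats(dv, proc_mode) and table not in selected:
            selected.append(table)  # plays the role of SELECT DISTINCT tabname
    if inc_mode == 1:
        command = "COMPUTE INCREMENTAL STATS"
    elif inc_mode == 0:
        command = "COMPUTE STATS"
    else:
        return []  # the procedure has no branch for other values
    return [f"{command} {schema}.{table}" for table in selected]


if __name__ == "__main__":
    # Hypothetical statistics: table a has none, table b has them
    stats = [("a", -1), ("a", -1), ("b", 10)]
    for statement in plan_statements("dds", stats, proc_mode=1, inc_mode=0):
        print(statement)
```

With proc_mode set to 1 only table a is selected in this sample, while proc_mode 0 selects both tables regardless of their existing statistics.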

Conclusion

As you can see, the procedural extension offers extensive capabilities for independently building auxiliary user solutions with a low barrier to entry — especially for those migrating their expertise from traditional computing systems.

The next production release of the platform will expand procedural code support to additional processing engines. We want end users to have a choice of execution engine without needing to rewrite the procedure code itself, as long as the SQL operators it contains are universal. We also plan to:

  • Expand compatibility with popular procedural dialects T-SQL and PL/SQL
  • Make the metastore API available for direct work with the data dictionary from engines inside procedures
  • Implement an analogue of the ALL_SOURCE view familiar to all Oracle users
  • Add output of informational messages to the OUTPUT log

In parallel, we are conducting research and development on a query transpiler from Postgres and Greenplum to Alphyn Lakehouse SQL engines. The goal is not just to ease the migration of functionality into the platform, but to make migration unnecessary — so that client applications require no changes on their side, and they effectively "think" they are still sending queries to and receiving responses from Greenplum or Postgres.

Topics

lpsql, procedural-sql, migration, oracle, stored-procedures, lakehouse
