Parallel PL/SQL launcher

This is an experimental package for submitting PL/SQL calls in parallel using the Oracle SQL Parallel Query mechanism.

Many have complained that PL/SQL has no multi-threading support, as Java (for example) does. The conventional solution is to use DBMS_JOB or DBMS_SCHEDULER, and if necessary write your own wrapper to handle completion status reporting (Has it finished yet? Did it succeed?) - for an example of this approach, see job_ot.

However, SQL does have a parallel threading mechanism, in the form of Parallel Query (PQ). You simply execute your query using a PARALLEL() hint, or against tables with a parallel degree defined. This is particularly effective with partitioned tables, where the partitions are divided among the available PQ slave processes (depending on the resources available) and the results are combined at the end. You can think of this as a PQ controller dividing up the job, handing out tasks to PQ slaves, and collating the results. Meanwhile over in PL/SQL, you can define parallel-enabled pipelined functions that accept a ref cursor. The PQ controller decides how to divide the ref cursor's rows among the available PQ slaves, and each slave runs its own instance of the function against its share of the rows.
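As a minimal sketch of such a function (number_tt, big_table and double_it are made-up names for illustration, not part of the package below), the following accepts a ref cursor, doubles each value it fetches, and pipes the results back. With PARTITION ... BY ANY, Oracle is free to hand the cursor's rows to the PQ slaves however it likes, so this only shows the shape of the declaration; whether it actually runs in parallel depends on the execution plan chosen.

```sql
-- Hypothetical collection type for the piped results
create type number_tt as table of number
/

-- Hypothetical source table with a parallel degree defined
create table big_table ( id number not null ) parallel 4;

create or replace function double_it
   ( p_cur sys_refcursor )
   return number_tt
   parallel_enable(partition p_cur by any)
   pipelined
is
   v_id number;
begin
   loop
      fetch p_cur into v_id;
      exit when p_cur%notfound;
      pipe row (v_id * 2);   -- each instance pipes back only the rows it was given
   end loop;
   return;
end double_it;
/

-- The cursor expression is what the PQ controller divides among the slaves
select column_value
from   table(double_it(cursor(select /*+ parallel(t,4) */ id from big_table t)));
```

A weak ref cursor (sys_refcursor) is enough here because the partitioning is BY ANY; partitioning by range or hash would need a strongly typed ref cursor declared in a package spec.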

Now, if you create a table "PQ_DRIVER" with 4 partitions and a parallel degree of 4, with one row in each partition, and you execute a query along the lines of "select /*+ parallel(pq,4) */ thread_id from pq_driver pq", that should persuade the PQ controller to set four PQ slave processes to work on it (one per partition). And if you pass that query as a cursor parameter to a parallel-enabled pipelined function, then shouldn't that create a situation where each row is processed by a separate PQ slave process? So here is a way to use (alright, hack) the PQ engine so that it processes arbitrary PL/SQL procedure calls in parallel.
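One way to check whether the optimizer is prepared to parallelise the driving query is to look at its execution plan. This is only a sketch, assuming the PQ_DRIVER table from the script further down has been created and a plan table is available. Parallel plans normally contain PX operations (PX COORDINATOR, PX SEND and so on), although a parallel plan does not guarantee that slaves will be available at run time.

```sql
-- Generate the plan for the driving query without executing it
explain plan for
select /*+ parallel(pq,4) */ thread_id
from   pq_driver pq;

-- Display the plan just generated
select * from table(dbms_xplan.display);
```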

Connected to:
Oracle Database 10g Enterprise Edition Release 10.2.0.1.0 - Production
With the Partitioning, OLAP and Data Mining options

SQL> set timing on
SQL> exec parallel_launch.test
sid=151 serial#=39222: Degree of parallelism: requested 4, actual 4: 20:04:16 - 20:04:18
sid=137 serial#=27074: Degree of parallelism: requested 4, actual 4: 20:04:16 - 20:04:18
sid=146 serial#=33830: Degree of parallelism: requested 4, actual 4: 20:04:16 - 20:04:18
sid=132 serial#=11582: Degree of parallelism: requested 4, actual 4: 20:04:16 - 20:04:18

PL/SQL procedure successfully completed.

Elapsed: 00:00:03.55


Connected to:
Oracle Database 11g Enterprise Edition Release 11.1.0.6.0 - Production
With the Partitioning, OLAP, Data Mining and Real Application Testing options

SQL> set timing on
SQL> exec parallel_launch.test
sid=123 serial#=21233: Degree of parallelism: requested 4, actual 2: 06:07:47 - 06:07:49
sid=123 serial#=21233: Degree of parallelism: requested 4, actual 2: 06:07:49 - 06:07:51
sid=127 serial#=21506: Degree of parallelism: requested 4, actual 2: 06:07:47 - 06:07:49
sid=127 serial#=21506: Degree of parallelism: requested 4, actual 2: 06:07:49 - 06:07:51

PL/SQL procedure successfully completed.

Elapsed: 00:00:06.83

The degree of parallelism is lower in 11g than in 10.2 - possibly 11g is more intelligent and decides that four slaves would be excessive for a table with only four rows, especially in my single-processor RAM-starved desktop VM.

I also tested in 10.1.0.3.0:

sid=439 serial#=58467: Degree of parallelism: requested 4, actual 4: 16:18:57 - 16:18:59
sid=439 serial#=58467: Degree of parallelism: requested 4, actual 4: 16:18:59 - 16:19:01
sid=439 serial#=58467: Degree of parallelism: requested 4, actual 4: 16:19:01 - 16:19:03
sid=439 serial#=58467: Degree of parallelism: requested 4, actual 4: 16:19:03 - 16:19:05

I got the four PQ slaves I asked for, but everything was executed by just one of them, one call after another (note the start and end times). What this was really telling me was that I didn't fully understand PQ.
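To see how the rows were actually shared out, one option is to query V$PQ_TQSTAT from the same session immediately after the parallel statement. This is a sketch rather than something taken from the package: it assumes select privilege on the view, and that the view is populated for a query issued from within PL/SQL, which is worth verifying on your own version.

```sql
-- Rows handled by each process in the last parallel statement of this session
select dfo_number, tq_id, server_type, process, num_rows
from   v$pq_tqstat
order  by dfo_number, tq_id, server_type, process;
```

A single producer process accounting for all four rows would match the serialised behaviour shown above.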

I had another look at the initialisation parameters in my 11.1.0.6.0 test database, and I noticed that CPU_COUNT was set to 1 (which was true, there is only one CPU) and PARALLEL_THREADS_PER_CPU was 2 (the default setting). I found that setting PARALLEL_THREADS_PER_CPU = 4 (or CPU_COUNT = 2) enabled the full four parallel threads requested in parallel_launch.test(). Also, gathering statistics on the PQ_DRIVER table had the effect of serialising queries against it, and by default the background stats job will do exactly that, so I added a call to dbms_stats.lock_table_stats(user,'pq_driver'). I still don't know which statistics cause this or why, and it's a little puzzling, because I would have expected dynamic sampling (enabled by default since 10g) to give the same results.
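The checks and changes described above might look something like the following. Treat it as a sketch: it assumes select privilege on v$parameter and the ALTER SYSTEM privilege, and parallel_max_servers is included only because it also caps the number of slaves (it is not one of the parameters I changed).

```sql
-- Current settings
select name, value
from   v$parameter
where  name in ('cpu_count', 'parallel_threads_per_cpu', 'parallel_max_servers');

-- Allow four parallel threads on a single-CPU machine
alter system set parallel_threads_per_cpu = 4;

-- Stop the stats job (or anyone else) gathering statistics on the driver table
call dbms_stats.lock_table_stats(user, 'pq_driver');
```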

(To investigate further, it might be worth looking at some of the trace events, such as 10975 trace execution of parallel propagation, 10383 auto parallelization testing event and 10384 parallel dataflow scheduler tracing.)

Also I noticed a potential bug in the pq_submit function. It queries v$px_session as a self-diagnostic check, but if for any reason no parallel slaves were used there will be no rows in it, leading to a no_data_found exception because I used a SELECT INTO construction. Since the SQL engine silently swallows a no_data_found exception raised by a function it calls, my pq_submit might appear to return successfully after mysteriously doing nothing. That's now fixed: the query outer-joins to v$px_session, so it always returns a row.
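The masking effect is easy to reproduce with a scalar function. In this sketch ndf_demo is a made-up name, and the example illustrates the general behaviour rather than the pq_submit code itself.

```sql
-- Hypothetical function: the SELECT INTO finds no rows and raises no_data_found
create or replace function ndf_demo
   return number
is
   v_dummy number;
begin
   select 1 into v_dummy from dual where 1 = 0;
   return v_dummy;
end ndf_demo;
/

-- Called from SQL: no error is reported, the function result is simply null
select ndf_demo from dual;

-- Called from PL/SQL: fails with ORA-01403: no data found
begin
   dbms_output.put_line(ndf_demo);
end;
/
```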

This has not been stress tested and I make no promises that it will work. Let me know what you find.

-- Parallel PL/SQL Launcher,  William Robertson 2007 www.williamrobertson.net
-- Experimental package for submitting PL/SQL in parallel using the Oracle SQL Parallel Query mechanism.
--
-- Many have complained that PL/SQL has no multi-threading support, as Java (for example) does. The conventional solution,
-- if one is really needed (arguably it is never a good idea anyway), is to use DBMS_JOB or DBMS_SCHEDULER, and if necessary
-- write your own wrapper to handle completion status reporting (Has it finished yet? Did it succeed?) - for an example of
-- this approach, see http://www.williamrobertson.net/code/job_ot.typ.txt
--
-- However, SQL does have a parallel threading mechanism, in the form of Parallel Query (PQ). You simply execute your
-- query using a PARALLEL() hint, or against tables with a parallel degree defined - particularly effective with partitioned
-- tables, where each PQ slave can take one partition (depending on resources available) and combine results at the end.
-- You can think of this as a PQ controller dividing up the job, handing out tasks to PQ slaves, and collating the results.
-- Also, PL/SQL lets you define parallel-enabled pipelined functions that accept a ref cursor, fetch the rows in parallel,
-- and return the results as they come back from the PQ slaves.
--
-- Now, if you create a table "PQ_DRIVER" with 4 partitions and a parallel degree of 4, with one row in each partition, and
-- you execute a query along the lines of "SELECT /*+ PARALLEL(pq,4) */ thread_id FROM pq_driver pq", that should persuade the
-- PQ controller to set four PQ slave processes to work on it (one per partition). And if you pass that query as a cursor
-- parameter to a parallel-enabled pipelined function, then shouldn't that create a situation where each row is
-- processed by a separate PQ slave process? So here is a way to use (alright, hack) the PQ engine so that it processes
-- arbitrary PL/SQL procedure calls in parallel.
--
-- This has not been stress tested and I make no promises that it will work. In particular I have found that the degree of
-- parallelism is different (lower) in 11g than in 10g - possibly 11g is more intelligent and decides that four slaves would
-- be excessive for a table with only four rows. Let me know what you find.

-- Update: 2008-08-01:
-- In my home 11.1.0.6.0 EE test database, I noticed initialisation parameters CPU_COUNT=1 (which was true, there is only one CPU)
-- and PARALLEL_THREADS_PER_CPU=2 (the default setting, and again reasonable). I found that setting PARALLEL_THREADS_PER_CPU = 4
-- (or CPU_COUNT = 2) enabled the full four parallel threads requested in parallel_launch.test. Also, since gathering statistics
-- on the PQ_DRIVER table had the effect of serialising queries against it, and since by default the background stats job will
-- do this, I added a call to dbms_stats.lock_table_stats(user,'pq_driver').
--
-- Update: Oracle 11g Release 2 introduced DBMS_PARALLEL_EXECUTE, although this is primarily aimed at DML:
-- https://docs.oracle.com/cd/E11882_01/appdev.112/e40758/d_parallel_ex.htm

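-- Note also, the schema requires select privilege on v$mystat, v$px_session and v$session (i.e. grant select on v_$mystat etc),
-- and execute privilege on dbms_lock for the test procedure slow_proc. These must be granted directly, not via a role.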


drop table pq_driver purge;
drop table log_times purge;

create type varchar2_tt as table of varchar2(4000)
/

grant execute on varchar2_tt to public;

create table pq_driver
( thread_id number(1) not null primary key )
partition by list(thread_id)
( partition p1 values(1)
, partition p2 values(2)
, partition p3 values(3)
, partition p4 values(4) )
parallel 4
/

insert all
into pq_driver values (1)
into pq_driver values (2)
into pq_driver values (3)
into pq_driver values (4)
select * from dual;

comment on table pq_driver is 'Control table for generating parallel ref cursors with package parallel_launch';

call dbms_stats.lock_table_stats(user,'pq_driver');

-- PQ selection is sensitive to table stats. Analyzing the table with
--    exec dbms_stats.gather_table_stats(user,'pq_driver')
-- causes serialisation, and by default the background stats job would do the same,
-- hence the call to lock_table_stats above.

create table log_times
( thread_id   integer not null constraint log_times_pk primary key
, what        varchar2(100)
, sid         integer
, serial#     integer
, start_time  timestamp default systimestamp not null
, end_time    timestamp
, label       varchar2(30)
, errors      varchar2(1000) );

create trigger log_times_defaults_trg
before insert on log_times
for each row
begin
   select sid, serial#
   into   :new.sid, :new.serial#
   from   v$session
   where  sid in
          ( select sid from v$mystat );
end;
/

show errors



create or replace package parallel_launch
as
   type rc_pq_driver is ref cursor return pq_driver%rowtype;

   -- PQ launch vehicle:
   -- Must be strongly typed ref cursor to allow partition by range or hash
   -- Must be in package spec to be visible to SQL
   -- Must be an autonomous transaction if we are doing any DML in the submitted procedures

   function pq_submit
      ( p_job_list  varchar2_tt
      , p_pq_refcur rc_pq_driver )
      return varchar2_tt
      parallel_enable(partition p_pq_refcur by any)
      pipelined;

   procedure submit
      ( p_job1 varchar2
      , p_job2 varchar2
      , p_job3 varchar2
      , p_job4 varchar2 );

   -- Convenient test procedure - calls 'parallel_launch.slow_proc()' 4 times to see whether they all run at once:
   procedure test;

   -- Dummy procedure for testing - calls dbms_lock.sleep(2)
   procedure slow_proc( p_id integer );
end parallel_launch;
/

show errors

create or replace package body parallel_launch
as
   type stats_rec is record
   ( thread_id           log_times.thread_id%type
   , what                log_times.what%type
   , start_timestr       varchar2(8)
   , sid                 v$mystat.sid%type
   , serial#             v$px_session.serial#%type
   , pq_actual_degree    number
   , pq_requested_degree number
   , rowcount            pls_integer := 0 );

   g_clear_stats_rec stats_rec;


   procedure log_start
      ( p_thread_id  log_times.thread_id%type
      , p_what       log_times.what%type )
   is
      pragma autonomous_transaction;
   begin
      delete log_times where thread_id = p_thread_id;

      insert into log_times
      ( thread_id, what )
      values
      ( p_thread_id, p_what );

      commit;
   end log_start;


   procedure log_end
      ( p_thread_id  log_times.thread_id%type
      , p_errors     log_times.errors%type default null )
   is
      pragma autonomous_transaction;
   begin
      update log_times
      set    end_time = systimestamp
           , errors = p_errors
      where  thread_id = p_thread_id;

      commit;
   end log_end;


   procedure execute_command
      ( p_what log_times.what%type )
   is
      pragma autonomous_transaction;
   begin
      execute immediate p_what;
      commit;
   end execute_command;


   function pq_submit
      ( p_job_list  varchar2_tt
      , p_pq_refcur rc_pq_driver )
      return varchar2_tt
      parallel_enable(partition p_pq_refcur by any)
      pipelined
   is
      v_error_text varchar2(2000);
      r pq_driver%rowtype;
      r_row_stats stats_rec;
   begin
      loop
         fetch p_pq_refcur into r;
         exit when p_pq_refcur%notfound;

         select to_char(sysdate,'HH24:MI:SS')
              , s.sid
              , pqs.serial#
              , pqs.degree
              , pqs.req_degree
         into   r_row_stats.start_timestr
              , r_row_stats.sid
              , r_row_stats.serial#
              , r_row_stats.pq_actual_degree
              , r_row_stats.pq_requested_degree
         from   ( select sid from v$mystat where rownum = 1 ) s
                left join v$px_session pqs on pqs.sid = s.sid;

         r_row_stats.thread_id := r.thread_id;
         r_row_stats.rowcount := p_pq_refcur%rowcount;
         r_row_stats.what := 'begin ' || rtrim(p_job_list(r.thread_id),';') || '; end;';

         begin
            log_start(r.thread_id, r_row_stats.what);

            execute_command(r_row_stats.what);

            log_end(r.thread_id);

            pipe row
            ( rpad('sid=' || r_row_stats.sid || ' serial#=' || r_row_stats.serial# || ':',25) ||
              'Degree of parallelism: requested '  || r_row_stats.pq_requested_degree ||
              ', actual ' || r_row_stats.pq_actual_degree || ': ' ||
              r_row_stats.start_timestr || ' - ' || to_char(sysdate,'HH24:MI:SS') );
         exception
            when others then
               log_end(r_row_stats.thread_id, sqlerrm);
               pipe row('sid=' || r_row_stats.sid || ' serial#=' || r_row_stats.serial# || ': ' || sqlerrm);
         end;
      end loop;

      if r_row_stats.rowcount = 0 then
         raise_application_error
         ( -20000
         , 'Cursor returned no rows' );
      end if;

      return;
   end pq_submit;


   procedure submit
      ( p_job1 varchar2
      , p_job2 varchar2
      , p_job3 varchar2
      , p_job4 varchar2 )
   is
      v_results varchar2_tt;
   begin
      select /*+ parallel(4) */ column_value bulk collect into v_results
      from table(
          parallel_launch.pq_submit
          ( varchar2_tt(p_job1,p_job2,p_job3,p_job4)
          , cursor(select thread_id from pq_driver)
          )
      );

      if v_results.count = 0 then
         v_results.extend;
         v_results(1) := 'parallel_launch.SUBMIT: No rows returned from table function PQ_SUBMIT';
      end if;

      for i in v_results.first..v_results.last loop
         dbms_output.put_line(v_results(i));
      end loop;
   end submit;


   procedure test
   is
   begin
      submit
      ( 'parallel_launch.slow_proc(1)'
      , 'parallel_launch.slow_proc(2)'
      , 'parallel_launch.slow_proc(3)'
      , 'parallel_launch.slow_proc(4)' );
   end test;


   procedure slow_proc
      ( p_id integer )
   is
   begin
      dbms_lock.sleep(2);
   end slow_proc;
end parallel_launch;
/

show errors

prompt Executing parallel_launch.test():
prompt Submits four PL/SQL procedure calls using parallel_launch.submit()
prompt For the test, this uses parallel_launch.slow_proc() which is just a call to DBMS_LOCK.SLEEP(2).
prompt Four 2-second sleep() procedures run end to end should take 8 seconds:
set timi on
exec parallel_launch.test
set timi off

-- Got these results in 10.1.0.3.0:
-- sid=439 serial#=58467:   Degree of parallelism: requested 4, actual 4: 16:18:57 - 16:18:59
-- sid=439 serial#=58467:   Degree of parallelism: requested 4, actual 4: 16:18:59 - 16:19:01
-- sid=439 serial#=58467:   Degree of parallelism: requested 4, actual 4: 16:19:01 - 16:19:03
-- sid=439 serial#=58467:   Degree of parallelism: requested 4, actual 4: 16:19:03 - 16:19:05
--
-- Looks as though I got my PQ slaves but used them one at a time anyway (note start/end times). I don't get it.
-- db_file_multiblock_read_count was 16: maybe CBO decided it can read all of PQ_DRIVER table in one go?
-- (Would it do that for a partitioned table?)
-- parallel_query_mode enabled (default)
-- parallel_threads_per_cpu: 2 (default) and more than 1 CPU on DB server

-- Update 2008-08-01:
-- Possibly if I could repeat the 10.1 test (which I can't now) I would find a PX COORDINATOR FORCED SERIAL 
-- somewhere in the trace; i.e. I forced degree 4 so it generated a parallel plan but could only execute it
-- serially, perhaps feeling that was better than not executing it at all.

-- In home single-CPU 11.1.0.6.0 EE system with parallel_threads_per_cpu = 4 (the default is 2) and
-- taking care to lock the statistics on table pq_driver, I get these results:
--
-- sid=129 serial#=446:    Degree of parallelism: requested 4, actual 4: 19:19:25 - 19:19:27
-- sid=128 serial#=1097:   Degree of parallelism: requested 4, actual 4: 19:19:25 - 19:19:27
-- sid=122 serial#=119:    Degree of parallelism: requested 4, actual 4: 19:19:25 - 19:19:27
-- sid=126 serial#=289:    Degree of parallelism: requested 4, actual 4: 19:19:25 - 19:19:27
--
-- PL/SQL procedure successfully completed.
--
-- Elapsed: 00:00:02.14