-- Job control object
-- Job control modelled on Unix: declare "job" objects, submit them, and wait for them to complete. Uses DBMS_JOB to submit tasks and DBMS_ALERT for jobs to pass back status and error messages, so the calling procedure can easily track the background jobs it started.
-- PL/SQL job control modelled on Unix: submit one or more tasks, wait for them to complete.
-- Uses DBMS_JOB to submit tasks, DBMS_ALERT for jobs to pass back started/completed/failed messages.
-- The DBMS_ALERT mechanism is hidden from the submit/wait interface.
--
-- Object-based approach allows you to declare a job object then call myjob.submit() etc.
-- (See demo block for examples.)
--
-- Prerequisites: execute permission on sys.dbms_alert (the demo block also needs execute permission on sys.dbms_lock).
--
-- William Robertson 2004, www.williamrobertson.net
-- Package (spec only) for constants used in type body:
create or replace package job_pkg
as
    -- Shared constants for the job_ot object type (specification only, no package body).

    -- Error number that job_ot passes to raise_application_error:
    k_error_code          constant pls_integer := -20000;

    -- Separator between the status and the message text packed into an alert message:
    k_delimiter           constant varchar2(3) := chr(0);

    -- job_ot.submit waits twice this long for a submitted job to confirm that it has started:
    k_job_queue_interval  constant interval day(0) to second(0) := interval '5' second;

    -- Values assigned to job_ot.run_status:
    k_submitted           constant varchar2(30) := 'Submitted';
    k_not_submitted       constant varchar2(30) := 'Not submitted';
    k_waiting_confirm     constant varchar2(30) := 'Awaiting start confirmation';
    k_submit_failed       constant varchar2(30) := 'Submit failed';
    k_in_progress         constant varchar2(30) := 'In Progress';
    k_wait_timed_out      constant varchar2(30) := 'Wait timeout';
    k_failed              constant varchar2(30) := 'Abnormal Termination';
    k_completed_ok        constant varchar2(30) := 'Completed Successfully';
end job_pkg;
/
show errors
-- job_ot object type:
-- Submit one or more tasks, wait for them to complete.
-- Uses dbms_job to submit tasks, dbms_alert to pass back started/completed/failed messages.
-- The dbms_alert mechanism is hidden from the submit/wait interface.
--
-- Object-based approach allows you to declare a new job_ot object then call myjob.submit() etc.
-- Supplies the numeric suffix that the job_ot constructor appends to the job name to form
-- job_ot.handle. The sequence cycles after 999, so a name/number combination can repeat
-- once the sequence has wrapped around.
create sequence job_seq start with 1 maxvalue 999 cycle;
create or replace type job_ot as object
    -- One background task: construct it, submit() it to the job queue, then wait() for it.
    ( name            varchar2(30)               -- Job name as passed to the constructor
    , job_number      number                     -- Job number returned by dbms_job.submit
    , handle          varchar2(30)               -- Upper-cased name + '_' + job_seq number; also the completion alert name
    , command         varchar2(4000)             -- PL/SQL command to execute
    , description     varchar2(500)              -- Free-text description
    , message         varchar2(1000)             -- Message returned from execution; error text if the job failed
    , wait_timeout    interval day to second(0)  -- Timeout used by wait() when none is passed in
    , created_time    timestamp                  -- Set by the constructor
    , submitted_time  timestamp                  -- Set by submit() just before the job is queued
    , started_time    timestamp                  -- Set by submit() once the job has confirmed receipt
    , completed_time  timestamp                  -- Set by wait() when the completion alert arrives
    , run_status      varchar2(50)               -- One of the job_pkg status constants (in progress, failed etc)
    -- Queue the command as a background job and wait for it to confirm that it has started.
    , member procedure submit
    -- Wait for an alert from the job and record the outcome in run_status / message.
    , member procedure wait
        ( p_timeout    interval day to second default null  -- Defaults to self.wait_timeout
        , p_alertname  varchar2 default null )              -- What to wait for: defaults to self.handle
    -- Create a job in the 'Not submitted' state.
    , constructor function job_ot
        ( p_name         varchar2
        , p_command      varchar2
        , p_description  varchar2 default null
        , p_timeout      interval day to second default interval '1' minute )
        return self as result
    -- Entry point run by the job queue: signals receipt, runs the command, signals completion.
    , static procedure execute
        ( p_command           varchar2    -- What to execute
        , p_receipt_alert     varchar2    -- Signal name for confirming receipt to submitting procedure
        , p_completed_alert   varchar2    -- Signal name for confirming job completion to submitting procedure
        , p_invoking_session  varchar2 )  -- Session id of submitting process
    -- Write the job's attributes to dbms_output.
    , member procedure print
    )
/
show errors
create or replace type body job_ot as

    -- Constructor: creates a job in the 'Not submitted' state.
    --   p_name         Job name; also the basis of the generated handle ('alert' is used if null).
    --   p_command      PL/SQL command to run in the background.
    --   p_description  Optional free-text description.
    --   p_timeout      Default timeout for wait(); a null value is replaced by one minute.
    constructor function job_ot
        ( p_name         varchar2
        , p_command      varchar2
        , p_description  varchar2 default null
        , p_timeout      interval day to second default interval '1' minute )
        return self as result
    is
        -- submit() builds the receipt alert name as handle || '_receipt' (8 characters) and
        -- holds it in a varchar2(30), so the handle itself may be at most 22 characters.
        -- (The previous limit of 27 let 23-27 character handles through, and submit() then
        -- failed with a value error when it appended the suffix.)
        k_max_handle_length constant pls_integer := 22;

        v_seq     pls_integer;
        v_handle  varchar2(3000); -- Bigger than required to ensure assignment does not fail
    begin
        select job_seq.nextval
        into   v_seq
        from   dual;

        v_handle := substr(nvl(upper(p_name),'alert') || '_' || v_seq,1,30);

        -- Too long: rebuild from a 14-character name prefix so that the sequence number
        -- is kept intact (at most 14 + 1 + 3 = 18 characters).
        if length(v_handle) > k_max_handle_length then
            v_handle := substr(upper(p_name),1,14) || '_' || v_seq;
        end if;

        self.name         := p_name;
        self.handle       := v_handle;
        self.command      := p_command;
        self.description  := p_description;
        self.wait_timeout := nvl(p_timeout,interval '1' minute);
        self.created_time := systimestamp;
        self.run_status   := job_pkg.k_not_submitted;

        return;
    end;

    -- Submits self.command to the job queue via dbms_job, then waits (for twice
    -- job_pkg.k_job_queue_interval) for the job to confirm that it has started.
    -- On success run_status is 'In Progress' and started_time is set.
    -- Raises job_pkg.k_error_code if the command or handle is missing, if dbms_job.submit
    -- fails, or if no start confirmation arrives in time.
    -- Runs as an autonomous transaction, so its commits do not affect the caller's transaction.
    member procedure submit
    is
        pragma autonomous_transaction;

        k_session_id    constant varchar2(30) := dbms_session.unique_session_id;
        k_wait_timeout  constant interval day(0) to second(0) := job_pkg.k_job_queue_interval * 2;

        -- NOTE(review): this compares user_jobs.what with the raw command, but jobs queued
        -- below store the job_ot.execute(...) wrapper text, so the cleanup loop looks like
        -- it can never match a job submitted by this type - confirm the intended behaviour.
        cursor c_jobs(cp_command varchar2 := self.command)
        is
            select job
            from   user_jobs
            where  what = cp_command;

        v_command       varchar2(5000);
        v_receipt_name  varchar2(30);
    begin
        if self.command is null then
            raise_application_error
            ( job_pkg.k_error_code
            , 'Command must be specified' );
        elsif self.handle is null then
            raise_application_error
            ( job_pkg.k_error_code
            , 'Job handle must be specified' );
        end if;

        v_receipt_name := self.handle || '_receipt';

        -- Wrap the command in a call to job_ot.execute, doubling any embedded quotes:
        v_command :=
            'job_ot.execute(''begin '
            || replace(rtrim(self.command,'; '),'''','''''')
            || '; end;'', '''
            || v_receipt_name || ''', '''
            || self.handle
            || ''','''
            || k_session_id || ''');';

        -- In case same command already in job queue, remove any jobs for same command:
        for r_job in c_jobs(self.command)
        loop
            dbms_job.remove(r_job.job);
        end loop;

        -- Register interest in both receipt and completion alerts.
        -- submit waits for receipt; calling procedure waits for completion.
        dbms_alert.register(v_receipt_name);
        dbms_alert.register(self.handle);
        commit;

        self.submitted_time := systimestamp;

        begin
            dbms_job.submit
            ( self.job_number  -- 'out' parameter: job number returned by dbms_job.submit
            , v_command        -- in format 'job_ot.execute('begin cmd; end;', receipt_alert, completed_alert, session);'
            , sysdate          -- First run
            , null );          -- Interval (none = no repeat)

            commit;
            self.run_status := job_pkg.k_submitted;
        exception
            when others then
                raise_application_error
                ( job_pkg.k_error_code
                , 'dbms_job.submit failed'
                , true );
        end;

        -- Wait for the background job just submitted to confirm that it is ready to begin.
        -- The job sends an initial confirmation before doing anything (picked up here),
        -- followed by a completion signal on success/failure (picked up by the caller's wait()).
        -- wait() reports a timeout through run_status rather than raising, and it
        -- deregisters the alert it waited on, so no further dbms_alert.remove is needed here.
        self.wait(k_wait_timeout, v_receipt_name);

        if self.run_status = job_pkg.k_wait_timed_out then
            raise_application_error
            ( job_pkg.k_error_code
            , 'Submitted task failed to start (no confirmation received after ' ||
              ltrim(to_char(k_wait_timeout),'+') || ')'
            , true );
        end if;

        self.started_time := systimestamp;
        self.run_status   := job_pkg.k_in_progress;
        commit;
    exception
        when others then
            -- Note that when a member procedure fails with an exception, changes to the
            -- object state are lost, e.g. run_status will revert to 'Not submitted' - we
            -- cannot both raise an exception and change an attribute value.
            self.run_status := job_pkg.k_submit_failed;
            raise_application_error
            ( job_pkg.k_error_code
            , 'Failed to submit job ' || v_command
            , true );
    end submit;

    -- Waits for an alert from the background job and records the outcome.
    --   p_timeout    How long to wait; defaults to self.wait_timeout, and to
    --                dbms_alert.maxwait if that is null too.
    --   p_alertname  Alert to wait for; defaults to self.handle (the completion alert).
    -- A message of the form "status[delimiter]text" is split into run_status and message.
    -- A timeout is reported by setting run_status to job_pkg.k_wait_timed_out, not by
    -- raising an exception; callers should check run_status afterwards.
    -- NOTE(review): Oracle documents an implicit commit before dbms_alert.waitone runs -
    -- confirm that callers outside submit() are happy to have their transaction committed.
    member procedure wait
        ( p_timeout    interval day to second default null
        , p_alertname  varchar2 default null ) -- What to wait for: defaults to self.handle
    is
        k_alertname      constant varchar2(50) := nvl(p_alertname,self.handle);
        -- True when waiting for the completion alert; false for any other alert, such
        -- as the start-confirmation ("receipt") alert that submit() waits on:
        k_is_completion  constant boolean := k_alertname = self.handle;
        k_timeout        constant interval day to second := nvl(p_timeout,self.wait_timeout);
        -- Timeout converted to seconds for dbms_alert:
        k_maxwait        constant pls_integer :=
            nvl
            ( extract(day    from k_timeout) * 86400
            + extract(hour   from k_timeout) * 3600
            + extract(minute from k_timeout) * 60
            + extract(second from k_timeout)
            , dbms_alert.maxwait );

        v_delimiter_position  pls_integer;
        v_message             varchar2(32767);
        v_dbms_alert_status   integer;
    begin
        -- 'Awaiting start confirmation' only applies while waiting for the receipt alert.
        -- (Previously the test was inverted, so this status was set while waiting for
        -- completion and never while waiting for the start confirmation.)
        if not k_is_completion then
            self.run_status := job_pkg.k_waiting_confirm;
        end if;

        dbms_alert.waitone
        ( k_alertname
        , v_message
        , v_dbms_alert_status
        , k_maxwait );

        -- Deregister whether the alert arrived or the wait timed out:
        dbms_alert.remove(k_alertname);

        v_delimiter_position := instr(v_message, job_pkg.k_delimiter);

        -- The returned message may be in the form "status[delimiter]message":
        -- if so, extract into run_status and message:
        if v_delimiter_position > 0 then
            self.run_status := substr(v_message,1,v_delimiter_position -1);
            self.message    := substr(v_message,v_delimiter_position + length(job_pkg.k_delimiter));
        else
            self.message := v_message;
        end if;

        if v_dbms_alert_status = 1 then
            -- Check status returned by dbms_alert: 1 = "timed out while waiting"
            self.run_status := job_pkg.k_wait_timed_out;
            self.message :=
                'Timeout after waiting ' || k_maxwait
                || ' seconds for job ' || self.name || ': "'
                || substr(self.command,1,80)
                || case when length(self.command) > 80 then '...' else null end
                || '"';
            -- Not a good idea to have wait() fail, as then we lose any state changes that
            -- wait() made. Instead, the calling application should check run_status.
        elsif k_is_completion then
            self.completed_time := systimestamp;
            self.message        := nvl(self.message,job_pkg.k_completed_ok);
        end if;
    end wait;

    -- Entry point executed by the job queue (see the command text built in submit()).
    -- Signals p_receipt_alert, runs p_command, then signals p_completed_alert with
    -- "status[delimiter]message". A failing command is reported through the completion
    -- alert (status job_pkg.k_failed plus sqlerrm) rather than raised.
    --   p_command           What to execute
    --   p_receipt_alert     Signal name for confirming receipt to submitting procedure
    --   p_completed_alert   Signal name for confirming job completion to submitting procedure
    --   p_invoking_session  Session id of submitting process (not referenced in the body)
    static procedure execute
        ( p_command           varchar2
        , p_receipt_alert     varchar2
        , p_completed_alert   varchar2
        , p_invoking_session  varchar2 )
    is
        pragma autonomous_transaction;

        v_status   varchar2(30)   := job_pkg.k_in_progress;
        v_message  varchar2(1000) := job_pkg.k_in_progress;
    begin
        dbms_application_info.set_module('job_ot.execute', p_command);

        -- Send initial message to confirm instruction received; the commit after each
        -- signal is what makes it reach the waiting session:
        dbms_alert.signal(p_receipt_alert,v_status);
        commit;

        begin
            execute immediate(p_command);

            v_message :=
                job_pkg.k_completed_ok ||
                job_pkg.k_delimiter ||
                job_pkg.k_completed_ok;
        exception
            when others then
                v_message :=
                    job_pkg.k_failed ||
                    job_pkg.k_delimiter ||
                    sqlerrm;
        end;

        -- Send specified alert to inform waiting process that task has completed:
        dbms_alert.signal(p_completed_alert,v_message);
        commit;
    end execute;

    -- Writes the job's name, command, description, message, timestamps and run status
    -- to dbms_output, one labelled line each, followed by a line containing a tab.
    member procedure print
    is
        -- Prints one "label: value" line with the label padded to 15 characters.
        procedure print_item
            ( p_name   varchar2
            , p_value  varchar2 )
        is
        begin
            dbms_output.put_line(rpad(p_name || ':', 15) || p_value);
        end print_item;
    begin
        print_item('Job name', name);
        print_item('Command', command);
        print_item('Description', description);
        print_item('Message', message);
        print_item('Created', created_time);
        print_item('Submitted', submitted_time);
        print_item('Started', started_time);
        print_item('Completed', completed_time);
        print_item('Run status', run_status);
        dbms_output.put_line(chr(9));
    end print;

end;
/
show errors
set serverout on size 100000
prompt
prompt Demo/test (allow 20 seconds for submit/wait/output cycle to complete):
prompt
set echo on
declare
    -- Three sample jobs: one that succeeds, one whose command does not exist,
    -- and one that runs for longer than its own wait timeout.
    l_good_job  job_ot := new job_ot('should_work', 'null;', 'Background job test: successful job');
    l_bad_job   job_ot := new job_ot('should_fail', 'nosuchcommand', 'Background job test: handle failure');
    l_long_job  job_ot :=
        new job_ot
        ( 'should_timeout'
        , 'dbms_lock.sleep(20)'  -- User must have execute permission on sys.dbms_lock
        , 'Background job demo 2: trapping failure'
        , interval '5' second ); -- Job will fail to complete within time allowed

    -- Prints a heading followed by a blank line.
    procedure heading(p_text varchar2)
    is
    begin
        dbms_output.put_line(p_text || chr(10));
    end heading;

    -- Displays the current details of all three jobs.
    procedure print_all
    is
    begin
        l_good_job.print();
        l_bad_job.print();
        l_long_job.print();
    end print_all;
begin
    heading('Defined the following jobs:');
    print_all;
    dbms_output.new_line;

    -- Submit each job in the background:
    heading('Calling submit() method of each job in turn...');
    l_good_job.submit();
    l_bad_job.submit();
    l_long_job.submit();

    dbms_output.put_line('Jobs have now been submitted in background.');
    dbms_output.put_line('We can now get on with something else until we are ready to wait for them to complete.');
    dbms_output.put_line(chr(10)||'...'||chr(10));

    -- Collect the outcome of each job:
    heading('Calling wait() method of each job in turn...');
    l_good_job.wait();
    l_bad_job.wait();
    l_long_job.wait();

    heading('Job details after all jobs have returned:');
    print_all;
end;
/
set echo off