We will write three blog posts on the performance of Oracle multi-consumer AQ dequeue with dbms_aq.listen.
The first blog runs tests that show the wait events "library cache: mutex X" on the queue object and "latch free" on the latch "channel operations parent latch".
The second blog (II) builds a performance model and reveals implementation problems.
The third blog (III) traces the dequeue operation with different numbers of parallel concurrent sessions and concludes with the performance analysis.
Note: tested on Oracle 19.17 on Linux and 19.7 on Solaris.
1. Test Setup
We create one multiple-consumer queue, then compare two dequeue methods: listen_deq and subscr_deq.
listen_deq waits with dbms_aq.listen and then dequeues; subscr_deq dequeues directly as a named subscriber (consumer_name).
At first, we create 100 subscribers (see appended Test Code):
exec create_subscribers('msg_queue_multiple', 100);
2. Test Run
We run two tests as follows:
exec test_run(80, 1, 10*60, 1); -- listen dequeue
exec test_run(80, 2, 10*60, 1); -- subscriber dequeue
The first call starts 80 enqueue jobs and 80 listen_deq jobs for 10 minutes and gets an AWR report. The second call starts 80 enqueue jobs and 80 subscr_deq jobs for 10 minutes and gets an AWR report.
2.1 listen_deq
AWR shows heavy "library cache: mutex X" waits on queue object: msg_queue_multiple:
Top 10 Foreground Events by Total Wait Time
Event Waits Total Wait Time (sec) Avg Wait % DB time Wait Class
----------------------- --------- --------------------- --------- --------- ----------
library cache: mutex X 4,075,073 9538.6 2.34ms 14.5 Concurrency
latch free 68,333 1195.5 17.49ms 1.8 Other
Top Event P1/P2/P3 Values
Event % Event P1, P2, P3 Values % Activity Parameter 1 Parameter 2 Parameter 3
----------------------- ------- ------------- ----- ------------------- ----------- ----------- ----------- -----------
library cache: mutex X 14.28 "198468696", "0", "23028197302599684" 0.47 idn value where
latch free 2.18 "3012157976", "98", "3012157904" 2.16 address number why
Note: "library cache: mutex X" on idn=198468696, V$DB_OBJECT_CACHE.name='MSG_QUEUE_MULTIPLE', V$DB_OBJECT_CACHE.hash_value=198468696.
"latch free" on Latch "channel operations parent latch" (V$LATCH.latch#=98), child: 144 (V$LATCH_CHILDREN.child#=144 for latch#=98).
2.2 subscr_deq
AWR shows fewer "library cache: mutex X" waits on the queue object msg_queue_multiple,
and "latch free" on latch "channel operations parent latch" does not appear in the Top 10 events.
Top 10 Foreground Events by Total Wait Time
Event Waits Total Wait Time (sec) Avg Wait % DB time Wait Class
----------------------- --------- --------------------- --------- --------- ----------
library cache: mutex X 1,070,942 3235 3.02ms 3.5 Concurrency
Top Event P1/P2/P3 Values
Event % Event P1, P2, P3 Values % Activity Parameter 1 Parameter 2 Parameter 3
----------------------- ------- ------------- ----- ------------------- ----------- ----------- ----------- -----------
library cache: mutex X 3.40 "198468696", "0", "23028197302599684" 0.26 idn value where
3. Test Code
3.1 Queue Table, Queue and Subscribers
-- Oracle Advanced Queuing by Example (https://docs.oracle.com/cd/B10500_01/appdev.920/a96587/apexampl.htm)
-- Tear down objects left over from a previous run.
-- On a fresh schema these statements are expected to report errors
-- (objects do not exist yet); those errors can be ignored.
exec dbms_aqadm.stop_queue(queue_name => 'msg_queue_multiple');
exec dbms_aqadm.drop_queue(queue_name => 'msg_queue_multiple');
exec dbms_aqadm.drop_queue_table(queue_table => 'MultiConsumerMsgs_qtab', force => TRUE);
drop type Message_typ force;
/* Payload object type carried by the multi-consumer queue: */
create or replace noneditionable type message_typ as object
(
  subject    varchar2(30),
  text       varchar2(256),
  return_msg clob
);
/
----- 2. Creating a Multiconsumer Queue Table and Queue
-- Queue table whose messages can have several consumers (subscribers),
-- sorted by priority and then by enqueue time.
begin
  dbms_aqadm.create_queue_table(
    queue_table        => 'MultiConsumerMsgs_qtab',
    queue_payload_type => 'Message_typ',
    multiple_consumers => TRUE,
    sort_list          => 'PRIORITY,ENQ_TIME',
    compatible         => '10.0.0');
end;
/
-- Queue in the multi-consumer table: 5 retries, 2 s retry delay,
-- dequeued messages retained for 86400 s (one day).
begin
  dbms_aqadm.create_queue(
    queue_name     => 'msg_queue_multiple',
    queue_table    => 'MultiConsumerMsgs_qtab',
    retention_time => 86400,
    retry_delay    => 2,
    max_retries    => 5);
end;
/
-- exec dbms_aqadm.alter_queue('MSG_QUEUE_MULTIPLE', 10, 2, 86400);
-- Open the queue for enqueue and dequeue.
begin
  dbms_aqadm.start_queue(queue_name => 'msg_queue_multiple');
end;
/
create or replace procedure purge_queue_table as
  -- Empties MultiConsumerMsgs_qtab before a test run.
  l_purge_opts dbms_aqadm.aq$_purge_options_t; -- default purge options
begin
  -- NULL purge condition: no message filter is applied.
  dbms_aqadm.purge_queue_table(
    queue_table     => 'MultiConsumerMsgs_qtab',
    purge_condition => null,
    purge_options   => l_purge_opts);
end;
/
create or replace procedure create_subscribers(p_queue_name varchar2, p_cnt number) as
  -- Registers subscribers KSUB_1 .. KSUB_<p_cnt> on queue p_queue_name.
  -- An error while adding one subscriber is printed via dbms_output and the
  -- loop continues with the next one; COMMIT is issued only in that error path.
  l_agent sys.aq$_agent;
begin
  for n in 1 .. p_cnt loop
    begin
      l_agent := sys.aq$_agent('KSUB_' || n, null, null);
      dbms_aqadm.add_subscriber(
        queue_name => p_queue_name,
        subscriber => l_agent);
    exception
      when others then
        dbms_output.put_line('Error ' || sqlerrm);
        commit;
    end;
  end loop;
end;
/
-- Test
--exec create_subscribers('msg_queue_multiple', 100);
create or replace procedure remove_subscribers(p_queue_name varchar2, p_cnt number) as
  -- Removes every subscriber registered on queue p_queue_name.
  -- p_queue_name: unqualified queue name. DBA_QUEUE_SUBSCRIBERS stores unquoted names
  --               in upper case, so the name is matched both as given and upper-cased;
  --               'msg_queue_multiple' and 'MSG_QUEUE_MULTIPLE' therefore both work.
  -- p_cnt       : not used; kept so the signature mirrors create_subscribers.
  l_subscriber sys.aq$_agent;
begin
  for idx in (select a.consumer_name
                from dba_queue_subscribers a
               where a.queue_name in (p_queue_name, upper(p_queue_name))
               --and a.consumer_name like 'KSUB%'
             ) loop
    l_subscriber := sys.aq$_agent(idx.consumer_name, null, null);
    dbms_aqadm.remove_subscriber(p_queue_name, l_subscriber);
  end loop;
end;
/
-- exec remove_subscribers('MSG_QUEUE_MULTIPLE', 100);
3.2 Job Cleanup Helper (clearup_test) and cpu_burner Simulator
create or replace procedure clearup_test as
  -- Best-effort cleanup of the test scheduler jobs (job names containing 'TEST_JOB'):
  --   1. disable them, so no further runs are scheduled;
  --   2. force-stop instances that are still running;
  --   3. drop them.
  -- A failure on one job is printed via dbms_output and the cleanup continues.
  -- '_' is a LIKE wildcard, so it is escaped to match the literal text 'TEST_JOB' only.

  -- Print which scheduler call failed for which job, and why.
  procedure report(p_step varchar2, p_job varchar2, p_err varchar2) is
  begin
    dbms_output.put_line('clearup_test: ' || p_step || ' ' || p_job || ' - ' || p_err);
  end;
begin
  for c in (select job_name
              from dba_scheduler_jobs
             where job_name like '%TEST\_JOB%' escape '\') loop
    begin
      -- sets DBA_SCHEDULER_JOBS.enabled = FALSE
      dbms_scheduler.disable(c.job_name, force => true, commit_semantics => 'ABSORB_ERRORS');
      -- To make the job schedulable again (enabled = TRUE, state can become 'RUNNING'):
      --   dbms_scheduler.enable(c.job_name, commit_semantics => 'ABSORB_ERRORS');
    exception
      when others then report('disable', c.job_name, sqlerrm);
    end;
  end loop;

  for c in (select job_name
              from dba_scheduler_running_jobs
             where job_name like '%TEST\_JOB%' escape '\') loop
    begin
      -- force => FALSE: stop gracefully; the slave process can update the job status.
      -- force => TRUE : the Scheduler terminates the job slave immediately.
      -- A repeating, enabled job with start_date => systimestamp restarts immediately
      -- (state SCHEDULED -> RUNNING) and DBA_SCHEDULER_JOBS.run_count increases by 1.
      dbms_scheduler.stop_job(c.job_name, force => true, commit_semantics => 'ABSORB_ERRORS');
    exception
      when others then report('stop_job', c.job_name, sqlerrm);
    end;
  end loop;

  for c in (select job_name
              from dba_scheduler_jobs
             where job_name like '%TEST\_JOB%' escape '\') loop
    begin
      -- force => TRUE: the Scheduler first tries to stop running instances
      -- (STOP_JOB with force => FALSE), then drops the job.
      -- If a job still cannot be dropped, try:
      --   dbms_scheduler.drop_job(c.job_name, force => true);
      dbms_scheduler.drop_job(c.job_name, force => true, commit_semantics => 'ABSORB_ERRORS');
    exception
      when others then report('drop_job', c.job_name, sqlerrm);
    end;
  end loop;
end;
/
--exec clearup_test;
-- Simulate CPU Load
create or replace procedure cpu_burner (n number) as
  -- Simulates CPU load: runs n iterations of cheap arithmetic (mod + sqrt).
  -- The computed value is discarded; only the CPU time spent matters.
  l_sink number := 0;
begin
  for k in 1 .. n loop
    l_sink := mod(n, 999999) + sqrt(k);
  end loop;
end;
/
3.3 Enqueue, Dequeue (listen_deq and subscr_deq)
create or replace procedure enq(p_sub number) as
  -- Enqueues one message addressed to subscriber KSUB_<p_sub> on K.MSG_QUEUE_MULTIPLE.
  -- The subscriber name is used both as the single recipient and as the correlation id;
  -- the payload's return_msg CLOB holds 4000 characters.
  c_queue_name constant varchar2(30) := 'K.MSG_QUEUE_MULTIPLE';
  l_opts     dbms_aq.enqueue_options_t;
  l_props    dbms_aq.message_properties_t;
  l_to       dbms_aq.aq$_recipient_list_t;
  l_msgid    raw(16);
  l_payload  message_typ;
  l_body     clob;
  l_filler   varchar2(32000); -- only used by the commented-out dbms_lob variant below
  l_subject  varchar2(100);
  l_consumer varchar2(100);
begin
  -- Visibility options:
  --   dbms_aq.immediate : no commit required; the message is enqueued and committed in an
  --                       autonomous transaction and is visible for dequeue immediately.
  --   dbms_aq.on_commit : (default) commit required; the message becomes visible after COMMIT,
  --                       and the enqueuing session holds an open transaction until then.
  l_opts.visibility    := dbms_aq.immediate;
  l_opts.delivery_mode := dbms_aq.persistent;

  l_subject := 'Message for KSUB_' || p_sub;

  -- Variant kept for reference: building return_msg with dbms_lob showed
  -- "library cache: mutex X" on DBMS_LOB, and appending a large CLOB this way
  -- caused heavy "enq: HW - contention" waits.
  -- l_filler := rpad('ABC', 4000, 'x');
  -- dbms_lob.createtemporary(l_body, TRUE, dbms_lob.session);
  -- l_body := l_subject;
  -- for i in 1..10 loop
  --   dbms_lob.writeappend(l_body, length(l_filler), l_filler);
  -- end loop;
  l_body := rpad('ABC', 4000, 'x');

  l_payload := message_typ(
    l_subject,
    to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS') || '. This message is queued for recipient: Subscriber_' || p_sub,
    l_body);

  l_consumer := 'KSUB_' || p_sub; -- subscriber, M_IDEN
  l_to(1) := sys.aq$_agent(l_consumer, null, null); -- (name, address, protocol)
  l_props.recipient_list := l_to;
  l_props.correlation    := l_consumer;

  dbms_aq.enqueue(
    queue_name         => c_queue_name,
    enqueue_options    => l_opts,
    message_properties => l_props,
    payload            => l_payload,
    msgid              => l_msgid);

  dbms_output.put_line(to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS') || ' --- enqueue ' || l_subject ||
                       ', corr_id=' || l_props.correlation || ', text=' || l_payload.text);
  commit; -- not required by the enqueue itself (visibility = immediate)
end;
/
-- exec enq(1);
-- With sys.aq$_agent (dbms_aq.listen): heavy "latch free" on "channel operations parent latch" (latch#=98)
-- on a particular child: v$latch_children.CHILD#=144
-- Event: wait for unread message on broadcast channel
create or replace procedure listen_deq(p_sub number, p_wait number := 10) as
  -- Dequeues at most one message for subscriber KSUB_<p_sub> from K.MSG_QUEUE_MULTIPLE:
  -- dbms_aq.listen waits for a message for that agent, then dbms_aq.dequeue (NO_WAIT)
  -- fetches it.
  -- p_sub : subscriber number; the agent / consumer name is 'KSUB_' || p_sub.
  -- p_wait: seconds passed to each dbms_aq.listen call, and also the overall time budget
  --         of the retry loop. A retry happens only when dequeue raises ORA-25228 after a
  --         listen wakeup (false wakeup).
  -- Any other error is printed and ends the procedure; no exception is propagated.
  c_queue_name constant varchar2(30) := 'K.MSG_QUEUE_MULTIPLE';
  c_wrap       constant number := 4294967296; -- 2**32: dbms_utility.get_time wraps around
  l_dequeue_options    dbms_aq.dequeue_options_t;
  l_message_properties dbms_aq.message_properties_t;
  l_message_handle     raw(16);
  l_message            message_typ;
  l_listen_list        dbms_aq.aq$_agent_list_t;
  l_agent              sys.aq$_agent;
  l_agt_name           varchar2(100);
  l_start_time         number;
  l_timeout            pls_integer := p_wait * 100; -- centiseconds
  -- Holds either message_typ.text (up to 256 chars) or SQLERRM (up to 512 chars);
  -- a 100-char buffer raised VALUE_ERROR inside the exception handler.
  l_ret                varchar2(4000);
  no_messages exception;
  pragma exception_init (no_messages, -25228);

  -- Centiseconds elapsed since l_start_time. mod() keeps the result correct when
  -- get_time wraps around; a plain difference would become hugely negative.
  function elapsed_cs return number is
  begin
    return mod(dbms_utility.get_time() - l_start_time + c_wrap, c_wrap);
  end;
begin
  l_agt_name := 'KSUB_' || p_sub;
  l_listen_list(1) := sys.aq$_agent(l_agt_name, c_queue_name, null);

  -- Dequeue options that do not depend on the listen result.
  -- visibility = immediate: no open transaction is left after dequeue
  -- (with on_commit the dequeue would require a commit).
  l_dequeue_options.visibility    := dbms_aq.immediate;
  l_dequeue_options.delivery_mode := dbms_aq.persistent;
  l_dequeue_options.navigation    := dbms_aq.first_message;
  l_dequeue_options.wait          := dbms_aq.no_wait;

  l_start_time := dbms_utility.get_time();
  while l_ret is null and elapsed_cs() < l_timeout loop
    begin
      -- dbms_aq.listen can wake up falsely if two or more sessions are listening
      -- but only one enqueued message has arrived.
      -- With dequeue_options.visibility = dbms_aq.on_commit (default, open transaction,
      -- requires commit), listen returns a false positive while another session has an
      -- uncommitted dequeue on a multi-consumer queue: the message is still in the queue
      -- table, so listen reports it. The other session may also roll back before commit.
      dbms_aq.listen(
        agent_list => l_listen_list,
        wait       => p_wait,
        agent      => l_agent);

      l_dequeue_options.consumer_name := l_agent.name;
      dbms_aq.dequeue(
        queue_name         => c_queue_name,
        dequeue_options    => l_dequeue_options,
        message_properties => l_message_properties,
        payload            => l_message,
        msgid              => l_message_handle);
      commit;
      dbms_output.put_line (to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS')||' >>> dequeue ' || l_message.subject||
        ', corr_id=' || l_message_properties.correlation||
        ', text=' || l_message.text|| ' , return_msg Length=' || length(l_message.return_msg) ||
        ' -- dequeue_options.consumer_name for '||l_agt_name);
      l_ret := l_message.text;
      exit; -- dequeue succeeded
    exception
      when no_messages then
        -- false wakeup: nothing to dequeue for this agent; listen again
        dbms_output.put_line (to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS')||' -- Error for '||l_agt_name||', '||SQLERRM);
        commit;
      when others then
        dbms_output.put_line (to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS')||' -- Error for '||l_agt_name||', '||SQLERRM);
        l_ret := SQLERRM; -- non-null l_ret ends the loop
        commit;
    end;
  end loop;
end;
/
-- exec listen_deq(1, 100);
-- Without using sys.aq$_agent, avoid "latch free" on "channel operations parent latch" (latch#=98)
-- Event: Streams AQ: waiting for messages in the queue
-- Same Event in non-multi-consumer(i.e, single) queue
create or replace procedure subscr_deq(p_sub number, p_wait number := 10) as
  -- Dequeues (at most) one message for subscriber KSUB_<p_sub> directly from
  -- K.MSG_QUEUE_MULTIPLE, blocking up to p_wait seconds inside dbms_aq.dequeue.
  -- ORA-25228 (nothing dequeued within p_wait) is printed and swallowed;
  -- any other error propagates to the caller.
  c_queue_name constant varchar2(30) := 'K.MSG_QUEUE_MULTIPLE';
  c_max_tries  constant number := 1; -- only dequeue one message
  l_consumer   varchar2(100);
  l_deq_opts   dbms_aq.dequeue_options_t;
  l_props      dbms_aq.message_properties_t;
  l_msgid      raw(16);
  l_payload    message_typ;
  l_got        number := 0;
  e_no_messages exception;
  pragma exception_init (e_no_messages, -25228);
begin
  l_consumer := 'KSUB_' || p_sub;

  l_deq_opts.consumer_name := l_consumer;
  l_deq_opts.visibility    := dbms_aq.immediate;
  l_deq_opts.delivery_mode := dbms_aq.persistent;
  l_deq_opts.navigation    := dbms_aq.first_message;
  l_deq_opts.wait          := p_wait; -- seconds (DBMS_AQ.NO_WAIT would not block)

  for l_try in 1 .. c_max_tries loop
    begin
      dbms_aq.dequeue(
        queue_name         => c_queue_name,
        dequeue_options    => l_deq_opts,
        message_properties => l_props,
        payload            => l_payload,
        msgid              => l_msgid);
      commit;
      dbms_output.put_line (to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS')||' >>> dequeue ' || l_payload.subject||
        ', corr_id=' || l_props.correlation||
        ', text=' || l_payload.text|| ' , return_msg Length=' || length(l_payload.return_msg)||
        ' -- dequeue_options.consumer_name for '||l_consumer);
      l_got := l_got + 1;
    exception
      when e_no_messages then
        dbms_output.put_line (to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS')||' -- Error for '||l_consumer||', '||SQLERRM);
        commit;
    end;
    exit when l_got > 0;
  end loop;
end;
/
-- exec subscr_deq(1, 10);
-- create or replace type sys.aq$_agent as object
-- (name varchar2 (30), -- M_IDEN, name of a message producer or consumer
-- address varchar2 (1024), -- address where message must be sent
-- protocol number -- protocol for communication, must be 0
-- );
--
-- alter type aq$_agent modify attribute (name varchar2 (512)) cascade
-- /
--
-- DBMS_AQ.LISTEN (
-- agent_list IN AQ$_AGENT_LIST_T,
-- wait IN BINARY_INTEGER DEFAULT DBMS_AQ.FOREVER,
-- agent OUT SYS.AQ$_AGENT);
3.4 Test Jobs
-- enqueue loop
-- adjust cpu_burner
create or replace procedure enq_loop(p_sub number, p_dur_seconds number) as
  -- Enqueue driver for one job. For p_dur_seconds seconds, it repeatedly burns a little CPU
  -- (cpu_burner(1e4); adjust to tune the enqueue rate) and enqueues one message for
  -- KSUB_<p_sub>. At the end it prints the number of messages and the measured duration.
  c_wrap       constant number := 4294967296; -- 2**32: dbms_utility.get_time wraps around
  l_timeout    pls_integer := p_dur_seconds * 100; -- centiseconds
  l_start_time number := dbms_utility.get_time();
  l_cnt        number := 0;

  -- Centiseconds elapsed since l_start_time. mod() keeps the result correct across a
  -- get_time wrap-around; a plain difference would turn hugely negative and the loop
  -- would keep running long after p_dur_seconds.
  function elapsed_cs return number is
  begin
    return mod(dbms_utility.get_time() - l_start_time + c_wrap, c_wrap);
  end;
begin
  dbms_application_info.set_action('Enq Job '||p_sub);
  while elapsed_cs() < l_timeout loop
    cpu_burner(1e4);
    enq(p_sub);
    l_cnt := l_cnt + 1;
  end loop;
  dbms_output.put_line('Enq Messages = '||l_cnt||', Duration (seconds) = '||(elapsed_cs() / 100));
end;
/
-- exec enq_loop(1, 10);
-- adjust cpu_burner
create or replace procedure start_enq_jobs(p_cnt number, p_dur_seconds number) as
  -- Launches p_cnt enqueue jobs named TEST_JOB_ENQ_<n>; job n runs enq_loop(n, p_dur_seconds).
  -- Each job starts immediately, has no repeat interval and is auto-dropped afterwards.
  l_job    varchar2(50);
  l_action varchar2(4000);
begin
  for n in 1 .. p_cnt loop
    l_job := 'TEST_JOB_ENQ_' || n;
    -- Anonymous block executed by the job (literal deliberately left unindented).
    l_action := 'begin
enq_loop('||n||', '||p_dur_seconds||');
end;';
    dbms_scheduler.create_job(
      job_name   => l_job,
      job_type   => 'PLSQL_BLOCK',
      job_action => l_action,
      start_date => systimestamp,
      auto_drop  => true,
      enabled    => true);
  end loop;
end;
/
-- dequeue loop
-- adjust cpu_burner
create or replace procedure sel_deq_loop(p_sub number, p_sel number := 1, p_dur_seconds number, p_wait number := 10) as
  -- Dequeue driver for one job. For p_dur_seconds seconds, it burns a little CPU
  -- (cpu_burner(1e2); adjust to tune the dequeue rate) and makes one dequeue call
  -- for subscriber KSUB_<p_sub>:
  --   p_sel = 1  -> listen_deq(p_sub, p_wait)
  --   otherwise  -> subscr_deq(p_sub, p_wait)
  -- At the end it prints the number of dequeue calls (successful or not) and the
  -- measured duration.
  c_wrap       constant number := 4294967296; -- 2**32: dbms_utility.get_time wraps around
  l_timeout    pls_integer := p_dur_seconds * 100; -- centiseconds
  l_start_time number := dbms_utility.get_time();
  l_cnt        number := 0;

  -- Centiseconds elapsed since l_start_time. mod() keeps the result correct across a
  -- get_time wrap-around; a plain difference would turn hugely negative and the loop
  -- would keep running long after p_dur_seconds.
  function elapsed_cs return number is
  begin
    return mod(dbms_utility.get_time() - l_start_time + c_wrap, c_wrap);
  end;
begin
  dbms_application_info.set_action('Deq Job '||p_sub);
  while elapsed_cs() < l_timeout loop
    cpu_burner(1e2);
    if p_sel = 1 then
      listen_deq(p_sub, p_wait);
    else
      subscr_deq(p_sub, p_wait);
    end if;
    l_cnt := l_cnt + 1;
  end loop;
  if p_sel = 1 then
    dbms_output.put_line('Listener Deq Execs (SUC and Failed) = '||l_cnt||', Duration (seconds) = '||(elapsed_cs() / 100));
  else
    dbms_output.put_line('Subscriber Deq Execs (SUC and Failed) = '||l_cnt||', Duration (seconds) = '||(elapsed_cs() / 100));
  end if;
end;
/
-- exec sel_deq_loop(1, 1, 10); -- listen dequeue
-- exec sel_deq_loop(1, 2, 10); -- subscriber dequeue
create or replace procedure start_deq_jobs(p_cnt number, p_sel number := 1, p_dur_seconds number) as
  -- Launches p_cnt dequeue jobs named TEST_JOB_DEQ_<n>; job n runs
  -- sel_deq_loop(n, p_sel, p_dur_seconds) (p_sel = 1: listen_deq, otherwise subscr_deq).
  -- Each job starts immediately, has no repeat interval and is auto-dropped afterwards.
  l_job    varchar2(50);
  l_action varchar2(4000);
begin
  for n in 1 .. p_cnt loop
    l_job := 'TEST_JOB_DEQ_' || n;
    -- Anonymous block executed by the job (literal deliberately left unindented).
    l_action := 'begin
sel_deq_loop('||n||','||p_sel||', '||p_dur_seconds||');
end;';
    dbms_scheduler.create_job(
      job_name   => l_job,
      job_type   => 'PLSQL_BLOCK',
      job_action => l_action,
      start_date => systimestamp,
      auto_drop  => true,
      enabled    => true);
  end loop;
end;
/
3.5 Test Run: listen vs. subscriber
create or replace procedure test_run (p_jobs number, p_sel number, p_dur_seconds number, is_awr number := 0) as
  -- One benchmark run:
  --   1. remove leftover test jobs and purge the queue table;
  --   2. take an AWR snapshot if is_awr > 0;
  --   3. start p_jobs enqueue jobs and p_jobs dequeue jobs
  --      (p_sel = 1: listen_deq, otherwise subscr_deq), each running p_dur_seconds;
  --   4. sleep p_dur_seconds, take the closing AWR snapshot if is_awr > 0,
  --      and remove the test jobs again.
  l_take_awr constant boolean := is_awr > 0;
begin
  clearup_test;
  purge_queue_table;

  if l_take_awr then
    SYS.DBMS_WORKLOAD_REPOSITORY.create_snapshot('ALL');
  end if;

  start_enq_jobs(p_jobs, p_dur_seconds);
  start_deq_jobs(p_jobs, p_sel, p_dur_seconds);

  -- let the jobs run for the test duration, then close the AWR interval and stop them
  dbms_session.sleep(p_dur_seconds);

  if l_take_awr then
    SYS.DBMS_WORKLOAD_REPOSITORY.create_snapshot('ALL');
  end if;

  clearup_test;
end;
/
-- Start test run for 10 minutes
-- exec test_run(80, 1, 10*60, 1); -- listen dequeue
-- exec test_run(80, 2, 10*60, 1); -- subscriber dequeue