Monday, April 10, 2023

Oracle Multi-Consumer AQ dequeue with dbms_aq.listen Wait Events (I)

Wait Events (I)           Performance Model (II)           Performance Analysis and Workaround (III)     


This is the first of three blog posts discussing the performance of Oracle multi-consumer AQ dequeue with dbms_aq.listen.

The first Blog (I) runs a test to show the Wait Events: "library cache: mutex X" on the queue object and "latch free" on the Latch "channel operations parent latch".

The second Blog (II) builds a Performance Model and reveals implementation problems.

The third Blog (III) traces the dequeue operation with different numbers of parallel concurrent sessions, and concludes with the Performance Analysis.

Note: tested on Oracle 19.17 in Linux and 19.7 on Solaris


1. Test Setup


We create one multiple_consumer queue, then compare two dequeue methods: listen_deq and subscr_deq.
listen_deq first calls dbms_aq.listen and then dequeues; subscr_deq dequeues directly as a named subscriber (consumer_name), without dbms_aq.listen.

At first, we create 100 subscribers (see appended Test Code):

   exec create_subscribers('msg_queue_multiple', 100);


2. Test Run


We run two tests as follows:

   exec test_run(80, 1, 10*60, 1);   -- listen dequeue 
   
   exec test_run(80, 2, 10*60, 1);   -- subscriber dequeue
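The first call starts 80 enqueue Jobs and 80 listen_deq Jobs, lets them run for 10 minutes, and takes an AWR snapshot before and after the run.
The second call does the same with 80 enqueue Jobs and 80 subscr_deq Jobs, again for 10 minutes with AWR snapshots before and after.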


2.1 listen_deq


AWR shows heavy "library cache: mutex X" waits on queue object: msg_queue_multiple:

Top 10 Foreground Events by Total Wait Time

  Event                   Waits     Total Wait Time (sec) Avg Wait  % DB time Wait Class
  ----------------------- --------- --------------------- --------- --------- ----------
  library cache: mutex X  4,075,073 9538.6                2.34ms    14.5      Concurrency
  latch free                 68,333 1195.5                17.49ms   1.8       Other

Top Event P1/P2/P3 Values

  Event                   % Event P1,           P2,   P3 Values           % Activity  Parameter 1 Parameter 2 Parameter 3
  ----------------------- ------- ------------- ----- ------------------- ----------- ----------- ----------- -----------
  library cache: mutex X  14.28   "198468696",  "0",  "23028197302599684" 0.47        idn         value       where
  latch free               2.18   "3012157976", "98", "3012157904"        2.16        address     number      why


Note: "library cache: mutex X" on idn=198468696, V$DB_OBJECT_CACHE.name='MSG_QUEUE_MULTIPLE', V$DB_OBJECT_CACHE.hash_value=198468696.

      "latch free" on Latch "channel operations parent latch" (V$LATCH.latch#=98), child: 144 (V$LATCH_CHILDREN.child#=144 for latch#=98).


2.2 subscr_deq


AWR shows fewer "library cache: mutex X" waits on queue object: msg_queue_multiple,
and no "latch free" on Latch "channel operations parent latch" is listed in Top 10 Events.

Top 10 Foreground Events by Total Wait Time

  Event                   Waits     Total Wait Time (sec) Avg Wait  % DB time Wait Class
  ----------------------- --------- --------------------- --------- --------- ----------
  library cache: mutex X  1,070,942 3235                  3.02ms    3.5       Concurrency

Top Event P1/P2/P3 Values

  Event                   % Event P1,           P2,   P3 Values           % Activity  Parameter 1 Parameter 2 Parameter 3
  ----------------------- ------- ------------- ----- ------------------- ----------- ----------- ----------- -----------
  library cache: mutex X  3.40    "198468696",  "0",  "23028197302599684" 0.26        idn         value       where


3. Test Code



3.1 Queue Table, Queue and Subscribers



--  Oracle Advanced Queuing by Example (https://docs.oracle.com/cd/B10500_01/appdev.920/a96587/apexampl.htm)

-- Tear down the objects of a previous test run so the script can be re-run.
-- Each call is its own anonymous block (what SQL*Plus EXECUTE expands to), so a
-- failure in one step (e.g. the queue does not exist yet) does not prevent the
-- following steps from being attempted.
begin
  dbms_aqadm.stop_queue(queue_name => 'msg_queue_multiple');
end;
/

begin
  dbms_aqadm.drop_queue(queue_name => 'msg_queue_multiple');
end;
/

begin
  dbms_aqadm.drop_queue_table(queue_table => 'MultiConsumerMsgs_qtab', force => TRUE);
end;
/

-- Drop the payload type last: the queue table created below uses it as payload type.
drop type Message_typ force;

/* Creating a message type: */
-- Payload object type of the test queue:
--   subject    : short label of the message (enq stores 'Message for KSUB_<n>')
--   text       : timestamped description of the message
--   return_msg : CLOB body (enq fills it with a 4000 character string)
create or replace noneditionable type message_typ as object
(
  subject    varchar2(30)
, text       varchar2(256)
, return_msg clob
);
/

----- 2. Creating a Multiconsumer Queue Table and Queue

-- Step 1: queue table for a multi-consumer queue with Message_typ payload,
--         messages ordered by priority and then by enqueue time.
declare
  c_queue_table constant varchar2(30) := 'MultiConsumerMsgs_qtab';
begin
  dbms_aqadm.create_queue_table(
      queue_table        => c_queue_table
    , queue_payload_type => 'Message_typ'
    , sort_list          => 'PRIORITY,ENQ_TIME'
    , multiple_consumers => TRUE
    , compatible         => '10.0.0'
  );
end;
/

-- Step 2: the queue itself, created in the queue table from step 1.
declare
  c_queue_table constant varchar2(30) := 'MultiConsumerMsgs_qtab';
  c_queue_name  constant varchar2(30) := 'msg_queue_multiple';
begin
  dbms_aqadm.create_queue(
      queue_name     => c_queue_name
    , queue_table    => c_queue_table
    , max_retries    => 5
    , retry_delay    => 2
    , retention_time => 86400
  );
end;
/

-- exec dbms_aqadm.alter_queue('MSG_QUEUE_MULTIPLE', 10, 2, 86400);

-- Step 3: start the queue.
declare
  c_queue_name constant varchar2(30) := 'msg_queue_multiple';
begin
  dbms_aqadm.start_queue(queue_name => c_queue_name);
end;
/


-- Purges the test queue table 'MultiConsumerMsgs_qtab'.
-- No purge condition is given (NULL) and the purge options record is passed
-- exactly as declared, i.e. with its default field values.
create or replace procedure purge_queue_table as
  l_purge_options dbms_aqadm.aq$_purge_options_t;
begin
  dbms_aqadm.purge_queue_table(
      queue_table     => 'MultiConsumerMsgs_qtab'
    , purge_condition => null
    , purge_options   => l_purge_options
  );
end;
/

-- Adds the subscribers KSUB_1 .. KSUB_<p_cnt> to a multi-consumer queue.
--   p_queue_name : queue to which the subscribers are added
--   p_cnt        : number of subscribers to create
-- A failure for one subscriber is reported through dbms_output (followed by a
-- commit) and does not stop the remaining subscribers from being processed.
create or replace procedure create_subscribers(p_queue_name varchar2, p_cnt number) as
begin
  for l_sub_no in 1 .. p_cnt loop
    begin
      dbms_aqadm.add_subscriber(
          queue_name => p_queue_name
        , subscriber => sys.aq$_agent('KSUB_'||l_sub_no, null, null)   -- (name, address, protocol)
      );
    exception
      when others then
        dbms_output.put_line ('Error '||sqlerrm);
        commit;
    end;
  end loop;
end;
/

--  Test
--exec create_subscribers('msg_queue_multiple', 100);

-- Removes every subscriber that DBA_QUEUE_SUBSCRIBERS lists for the given queue.
--   p_queue_name : queue whose subscribers are removed
--   p_cnt        : not used; kept so that existing callers keep working
-- The lookup accepts the name either exactly as passed or upper-cased, so the
-- same lowercase literal that works for create_subscribers (e.g.
-- 'msg_queue_multiple') also works here. The original exact-match lookup found
-- no rows for a lowercase name and silently removed nothing.
-- NOTE(review): the lookup is not restricted by OWNER; a queue of the same name
-- in another schema would contribute its consumer names as well.
create or replace procedure remove_subscribers(p_queue_name varchar2, p_cnt number) as
   l_subscriber sys.aq$_agent;
begin
   for idx in (select a.consumer_name
                 from dba_queue_subscribers a
                where a.queue_name in (p_queue_name, upper(p_queue_name))
                                                    --and consumer_name like 'KSUB%'
   ) loop
      l_subscriber := sys.aq$_agent(idx.consumer_name, null, null);   -- (name, address, protocol)
      dbms_aqadm.remove_subscriber(p_queue_name, l_subscriber);
   end loop;
end;
/

-- exec remove_subscribers('MSG_QUEUE_MULTIPLE', 100);


3.2 Job clearup Helper and cpu_burner simulator



-- Best-effort cleanup of all scheduler jobs whose name contains 'TEST_JOB':
--   1. disable them, 2. stop the running ones, 3. drop them.
-- Errors of individual jobs are deliberately ignored so that one job that
-- cannot be handled does not block the cleanup of the others.
--
-- Changes compared with the first version:
--   * only JOB_NAME is selected (it is the only column used) instead of "*";
--   * the underscore in the LIKE pattern is escaped. Unescaped, "_" matches
--     any single character, so e.g. a job named TESTXJOB would also have been
--     disabled, stopped and dropped.
create or replace procedure clearup_test as
begin
  -- Step 1: disable (sets DBA_SCHEDULER_JOBS.enabled = FALSE).
  -- To re-enable a job so that it can be scheduled again (state = 'RUNNING'):
  --   dbms_scheduler.enable (c.job_name, commit_semantics =>'ABSORB_ERRORS');
  for c in (select job_name
              from dba_scheduler_jobs
             where job_name like '%TEST!_JOB%' escape '!') loop
    begin
      dbms_scheduler.disable (c.job_name, force => true, commit_semantics =>'ABSORB_ERRORS');
    exception
      when others then null;   -- best effort
    end;
  end loop;

  -- Step 2: stop the jobs that are currently running.
  -- If force = FALSE, the job is stopped gracefully and the slave process can
  -- update the status of the job in the job queue.
  -- If force = TRUE, the Scheduler immediately terminates the job slave.
  -- A repeating job with "start_date => systimestamp" and enabled = TRUE is
  -- re-started immediately (state changes from 'SCHEDULED' to 'RUNNING') and
  -- DBA_SCHEDULER_JOBS.run_count increases by 1.
  for c in (select job_name
              from dba_scheduler_running_jobs
             where job_name like '%TEST!_JOB%' escape '!') loop
    begin
      dbms_scheduler.stop_job (c.job_name, force => true, commit_semantics =>'ABSORB_ERRORS');
    exception
      when others then null;   -- best effort
    end;
  end loop;

  -- Step 3: drop the jobs.
  -- If force = TRUE, the Scheduler first attempts to stop the running job
  -- instances (by issuing STOP_JOB with the force flag set to false), and then
  -- drops the jobs.
  -- If there are still jobs that cannot be killed, try:
  --   dbms_scheduler.drop_job (c.job_name, force => true);
  for c in (select job_name
              from dba_scheduler_jobs
             where job_name like '%TEST!_JOB%' escape '!') loop
    begin
      dbms_scheduler.drop_job (c.job_name, force => true, commit_semantics =>'ABSORB_ERRORS');
    exception
      when others then null;   -- best effort
    end;
  end loop;
end;
/

--exec clearup_test;

-- Simulate CPU Load
-- Spends CPU time by evaluating a small arithmetic expression n times.
--   n : number of loop iterations
-- The computed value is only stored in a local variable and never returned.
create or replace procedure cpu_burner (n number) as
  l_sink number := 0;
begin
  for l_iter in 1 .. n
  loop
    l_sink := sqrt (l_iter) + mod (n, 999999);
  end loop;
end;
/


3.3 Enqueue, Dequeue (listen_deq and subscr_deq)



-- Enqueues one persistent message into K.MSG_QUEUE_MULTIPLE, addressed to the
-- single recipient KSUB_<p_sub>; the correlation id is set to the same name.
--   p_sub : subscriber number; selects the recipient agent KSUB_<p_sub>
-- The enqueue uses immediate visibility, the result is reported through
-- dbms_output, and the procedure ends with a COMMIT.
create or replace procedure enq(p_sub number) as
   c_queue_name   constant varchar2(30)  := 'K.MSG_QUEUE_MULTIPLE';
   l_recipient    varchar2(100);          -- subscriber, M_IDEN
   l_subject      varchar2(100);
   l_body         clob;
   l_rep_string   varchar2(32000);        -- only needed by the commented-out dbms_lob variant below
   l_payload      message_typ;
   l_enq_opts     dbms_aq.enqueue_options_t;
   l_msg_props    dbms_aq.message_properties_t;
   l_rcpt_list    dbms_aq.aq$_recipient_list_t;
   l_msgid        raw(16);
begin
   l_recipient := 'KSUB_'||p_sub;
   l_subject   := 'Message for KSUB_'||p_sub;

   -- visibility:
   --   dbms_aq.immediate: no commit is required, the message is enqueued and
   --       committed for dequeue immediately in an autonomous transaction.
   --   dbms_aq.on_commit (default): commit is required, the message is visible
   --       for dequeue after COMMIT; the enqueue session has an open
   --       transaction until then.
   l_enq_opts.delivery_mode := dbms_aq.persistent;
   l_enq_opts.visibility    := dbms_aq.immediate;

   ----- when dbms_lob is used: "library cache: mutex X" on dbms_lob
   -- l_rep_string := rpad('ABC', 4000, 'x');
   -- dbms_lob.createtemporary(l_body, TRUE, dbms_lob.session);
   -- l_body := l_subject;
   -- write 300 KB CLOB, It will cause heavy "enq: HW - contention" wait event.
   --  for i in 1..10 loop
   --    dbms_lob.writeappend(l_body, length(l_rep_string), l_rep_string);
   --  end loop;

   l_body    := rpad('ABC', 4000, 'x');
   l_payload := message_typ(
                   l_subject,
                   to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS')||
                      '. This message is queued for recipient: Subscriber_'||p_sub,
                   l_body);

   l_rcpt_list(1)             := sys.aq$_agent(l_recipient, null, null);   -- (name, address, protocol)
   l_msg_props.recipient_list := l_rcpt_list;
   l_msg_props.correlation    := l_recipient;

   dbms_aq.enqueue(
       queue_name         => c_queue_name
     , enqueue_options    => l_enq_opts
     , message_properties => l_msg_props
     , payload            => l_payload
     , msgid              => l_msgid
   );

   --debug('enq '||l_subject);
   dbms_output.put_line (to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS')||' --- enqueue '||l_subject||
                         ', corr_id=' || l_msg_props.correlation||', text=' || l_payload.text);

   -- The enqueue itself needs no commit with dbms_aq.immediate.
   commit;
end;
/

-- exec enq(1);

-- When using sys.aq$_agent with dbms_aq.listen: heavy "latch free" on "channel operations parent latch" (latch#=98)
--    on a particular child: v$latch_children.CHILD#=144
--    Event: wait for unread message on broadcast channel
-- Waits with dbms_aq.listen for a message for agent KSUB_<p_sub> on
-- K.MSG_QUEUE_MULTIPLE and then dequeues one message (NO_WAIT, immediate
-- visibility) for the agent returned by listen.
--   p_sub  : subscriber number; selects the agent KSUB_<p_sub>
--   p_wait : wait time passed to dbms_aq.listen; p_wait*100 centiseconds is
--            also the overall time budget of the retry loop
-- The loop ends when a message was dequeued, when any error other than
-- ORA-25228 occurred (this includes the listen call failing), or when the time
-- budget is used up. ORA-25228 on dequeue is reported and the listen/dequeue
-- cycle is retried. Nothing is raised to the caller for errors inside the loop.
--
-- Changes compared with the first version:
--   * elapsed time is computed modulo 2**32, so the loop still terminates
--     correctly when dbms_utility.get_time wraps around;
--   * l_ret is large enough for SQLERRM and for message_typ.text; the former
--     varchar2(100) could raise ORA-06502 inside the exception handler;
--   * unused locals removed.
create or replace procedure listen_deq(p_sub number, p_wait number := 10) as
   c_queue_name          constant varchar2(30) := 'K.MSG_QUEUE_MULTIPLE';
   c_time_modulus        constant number := 4294967296;   -- 2**32, size of the get_time value range
   l_dequeue_options     dbms_aq.dequeue_options_t;
   l_message_properties  dbms_aq.message_properties_t;
   l_message_handle      raw(16);
   l_message             message_typ;
   l_listen_list         dbms_aq.aq$_agent_list_t;
   l_agent               sys.aq$_agent;
   l_agt_name            varchar2(100);
   l_start_time          number;
   l_timeout             pls_integer := p_wait * 100;     -- centiseconds
   l_ret                 varchar2(1000);                  -- non-null ends the loop
   no_messages           exception;
   pragma exception_init (no_messages, -25228);

   -- Centiseconds since l_start_time, correct across a get_time wrap-around.
   function elapsed_cs return number is
   begin
      return mod(dbms_utility.get_time() - l_start_time + c_time_modulus, c_time_modulus);
   end elapsed_cs;
begin
   l_agt_name       := 'KSUB_'||p_sub;
   l_listen_list(1) := sys.aq$_agent(l_agt_name, c_queue_name, null);
   l_start_time     := dbms_utility.get_time();

   while l_ret is null and elapsed_cs < l_timeout loop
      begin
         -- dbms_aq.listen can have a false wakeup if two or more sessions are
         -- waiting with dbms_aq.listen, but only one enqueued message arrived.
         --
         -- When dequeue_options.visibility = dbms_aq.on_commit (the default; it
         -- has an open transaction and requires commit), the listen call returns
         -- a false positive when another session has an uncommitted dequeue on a
         -- multi-consumer queue. Listen is supposed to return true if it finds
         -- one message for a subscriber. While a dequeue is uncommitted, the
         -- message is still in the queue table, so listen returns true. The
         -- first session might also roll back just before commit.
         dbms_aq.listen(
             agent_list => l_listen_list
           , wait       => p_wait
           , agent      => l_agent
         );

         l_dequeue_options.consumer_name := l_agent.name;
         l_dequeue_options.visibility    := dbms_aq.immediate;    -- with on_commit the dequeue leaves an open transaction that requires commit
         l_dequeue_options.delivery_mode := dbms_aq.persistent;
         l_dequeue_options.navigation    := dbms_aq.first_message;
         l_dequeue_options.wait          := dbms_aq.no_wait;

         dbms_aq.dequeue(
             queue_name         => c_queue_name
           , dequeue_options    => l_dequeue_options
           , message_properties => l_message_properties
           , payload            => l_message
           , msgid              => l_message_handle
         );
         commit;

         dbms_output.put_line (to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS')||' >>> dequeue ' || l_message.subject||
                               ', corr_id=' || l_message_properties.correlation||
                               ', text=' || l_message.text|| ' , return_msg Length=' || length(l_message.return_msg) ||
                               ' -- dequeue_options.consumer_name for '||l_agt_name);
         --dequeue_options.navigation := DBMS_AQ.NEXT_MESSAGE;
         --debug('deq '||l_message.subject);
         l_ret := l_message.text;
         exit;   -- dequeue succeeded; leave the loop even if the text is NULL

      exception
         when no_messages then
            -- listen reported a message but the dequeue found none: retry.
            dbms_output.put_line (to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS')||' -- Error for '||l_agt_name||', '||SQLERRM);
            commit;
         when others then
            dbms_output.put_line (to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS')||' -- Error for '||l_agt_name||', '||SQLERRM);
            l_ret := substrb(SQLERRM, 1, 1000);   -- non-null value ends the loop
            commit;
      end;
   end loop;
end;
/

-- exec listen_deq(1, 100);

-- Without using sys.aq$_agent, avoid "latch free" on "channel operations parent latch" (latch#=98)
--     Event: Streams AQ: waiting for messages in the queue
--     Same Event in non-multi-consumer(i.e, single) queue
-- Dequeues at most one message from K.MSG_QUEUE_MULTIPLE as consumer
-- KSUB_<p_sub>, without calling dbms_aq.listen.
--   p_sub  : subscriber number; selects the consumer name KSUB_<p_sub>
--   p_wait : dequeue wait time (dequeue_options.wait)
-- A successful dequeue is committed and reported through dbms_output.
-- ORA-25228 is reported through dbms_output and followed by a commit; any
-- other exception propagates to the caller.
create or replace procedure subscr_deq(p_sub number, p_wait number := 10) as
   c_queue_name    varchar2(30) := 'K.MSG_QUEUE_MULTIPLE';
   c_max_attempts  number := 1;      --- only dequeue one message
   l_attempts      number := 0;
   l_dequeued      number := 0;
   l_consumer      varchar2(100);
   l_deq_opts      dbms_aq.dequeue_options_t;
   l_msg_props     dbms_aq.message_properties_t;
   l_msgid         raw(16);
   l_payload       message_typ;
   no_messages     exception;
   pragma exception_init (no_messages, -25228);
begin
   l_consumer := 'KSUB_'||p_sub;

   l_deq_opts.consumer_name := l_consumer;
   l_deq_opts.navigation    := dbms_aq.first_message;
   --l_deq_opts.navigation  := dbms_aq.next_message;
   l_deq_opts.wait          := p_wait;     --DBMS_AQ.NO_WAIT;   -- wait 10 seconds
   l_deq_opts.delivery_mode := dbms_aq.persistent;
   l_deq_opts.visibility    := dbms_aq.immediate;

   loop
      l_attempts := l_attempts + 1;

      begin
         dbms_aq.dequeue(
             queue_name         => c_queue_name
           , dequeue_options    => l_deq_opts
           , message_properties => l_msg_props
           , payload            => l_payload
           , msgid              => l_msgid
         );
         commit;

         dbms_output.put_line (to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS')||' >>> dequeue ' || l_payload.subject||
                               ', corr_id=' || l_msg_props.correlation||
                               ', text=' || l_payload.text|| ' , return_msg Length=' || length(l_payload.return_msg)||
                               ' -- dequeue_options.consumer_name for '||l_consumer);

         --debug('deq '||l_payload.subject);
         l_dequeued := l_dequeued + 1;
      exception
         when no_messages then
            dbms_output.put_line (to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS')||' -- Error for '||l_consumer||', '||SQLERRM);
            commit;
      end;

      -- Stop after the first message or once the attempt limit is reached.
      exit when l_dequeued > 0 or l_attempts >= c_max_attempts;
   end loop;
end;
/

-- exec subscr_deq(1, 10);


--  create or replace type sys.aq$_agent as object
--  (name     varchar2 (30),     -- M_IDEN, name of a message producer or consumer
--   address  varchar2 (1024),   -- address where message must be sent
--   protocol number             -- protocol for communication, must be 0
--  );
--  
--  alter type aq$_agent modify attribute (name varchar2 (512)) cascade
--  /
-- 
--  DBMS_AQ.LISTEN (
--     agent_list            IN    AQ$_AGENT_LIST_T,
--     wait                  IN    BINARY_INTEGER DEFAULT DBMS_AQ.FOREVER,
--     agent                 OUT   SYS.AQ$_AGENT);


3.4 Test Jobs



-- enqueue loop
-- adjust cpu_burner
-- Repeatedly burns some CPU and enqueues one message for subscriber
-- KSUB_<p_sub> until p_dur_seconds have elapsed, then prints the number of
-- enqueued messages and the measured duration.
--   p_sub         : subscriber number passed to enq
--   p_dur_seconds : how long to keep enqueuing, in seconds
-- Elapsed time is computed modulo 2**32 so that a wrap-around of
-- dbms_utility.get_time (the value range is limited and wraps) neither ends
-- the loop early nor keeps it running far too long.
-- Adjust the cpu_burner argument to change the load per enqueue.
create or replace procedure enq_loop(p_sub number, p_dur_seconds number) as
  c_time_modulus constant number := 4294967296;       -- 2**32, size of the get_time value range
  l_timeout      pls_integer := p_dur_seconds * 100;  -- centiseconds
  l_start_time   number := dbms_utility.get_time();
  l_cnt          number := 0;

  -- Centiseconds since l_start_time, correct across a get_time wrap-around.
  function elapsed_cs return number is
  begin
    return mod(dbms_utility.get_time() - l_start_time + c_time_modulus, c_time_modulus);
  end elapsed_cs;
begin
  dbms_application_info.set_action('Enq Job '||p_sub);
  while elapsed_cs < l_timeout
  loop
    cpu_burner(1*1e4);
    enq(p_sub);
    l_cnt := l_cnt + 1;
  end loop;
  -- The output text (including its spelling) is kept unchanged for anything that parses it.
  dbms_output.put_line('Enq Messages = '||l_cnt||', Dureation (seconds) = '||(elapsed_cs/100));
end;
/

-- exec enq_loop(1, 10);

-- adjust cpu_burner
-- Creates p_cnt scheduler jobs TEST_JOB_ENQ_1 .. TEST_JOB_ENQ_<p_cnt>.
-- Job number n runs enq_loop(n, p_dur_seconds); every job is created enabled
-- with start_date = systimestamp and auto_drop = true.
--   p_cnt         : number of enqueue jobs to create
--   p_dur_seconds : duration handed to each enq_loop call, in seconds
create or replace procedure start_enq_jobs(p_cnt number, p_dur_seconds number) as
begin
  for l_job_no in 1 .. p_cnt
  loop
    dbms_scheduler.create_job (
      job_name        => 'TEST_JOB_ENQ_'||l_job_no,
      job_type        => 'PLSQL_BLOCK',
      job_action      => 
        'begin 
           enq_loop('||l_job_no||', '||p_dur_seconds||');
        end;',    
      enabled         => true,
      auto_drop       => true,
      --repeat_interval => 'systimestamp',
      start_date      => systimestamp);
  end loop;
end;
/

-- dequeue loop
-- adjust cpu_burner
-- Repeatedly burns some CPU and performs one dequeue call for subscriber
-- KSUB_<p_sub> until p_dur_seconds have elapsed, then prints the number of
-- dequeue calls (successful and failed) and the measured duration.
--   p_sub         : subscriber number passed to the dequeue procedure
--   p_sel         : 1 = listen_deq (dbms_aq.listen), any other value = subscr_deq
--   p_dur_seconds : how long to keep dequeuing, in seconds
--   p_wait        : wait time handed to the dequeue procedure
-- Elapsed time is computed modulo 2**32 so that a wrap-around of
-- dbms_utility.get_time (the value range is limited and wraps) neither ends
-- the loop early nor keeps it running far too long.
-- Adjust the cpu_burner argument to change the load per dequeue.
create or replace procedure sel_deq_loop(p_sub number, p_sel number := 1, p_dur_seconds number, p_wait number := 10) as
  c_time_modulus constant number := 4294967296;       -- 2**32, size of the get_time value range
  l_timeout      pls_integer := p_dur_seconds * 100;  -- centiseconds
  l_start_time   number := dbms_utility.get_time();
  l_cnt          number := 0;

  -- Centiseconds since l_start_time, correct across a get_time wrap-around.
  function elapsed_cs return number is
  begin
    return mod(dbms_utility.get_time() - l_start_time + c_time_modulus, c_time_modulus);
  end elapsed_cs;
begin
  dbms_application_info.set_action('Deq Job '||p_sub);
  while elapsed_cs < l_timeout
  loop
    cpu_burner(1*1e2);
    if p_sel = 1 then
      listen_deq(p_sub, p_wait);
    else
      subscr_deq(p_sub, p_wait);
    end if;
    l_cnt := l_cnt + 1;
  end loop;

  if p_sel = 1 then
    dbms_output.put_line('Listener Deq Execs (SUC and Failed) = '||l_cnt||', Duration (seconds) = '||(elapsed_cs/100));
  else
    dbms_output.put_line('Subscriber Deq Execs (SUC and Failed) = '||l_cnt||', Duration (seconds) = '||(elapsed_cs/100));
  end if;
end;
/

-- exec sel_deq_loop(1, 1, 10);   -- listen dequeue 
-- exec sel_deq_loop(1, 2, 10);   -- subscriber dequeue

-- Creates p_cnt scheduler jobs TEST_JOB_DEQ_1 .. TEST_JOB_DEQ_<p_cnt>.
-- Job number n runs sel_deq_loop(n, p_sel, p_dur_seconds); every job is
-- created enabled with start_date = systimestamp and auto_drop = true.
--   p_cnt         : number of dequeue jobs to create
--   p_sel         : dequeue method selector handed to sel_deq_loop
--   p_dur_seconds : duration handed to each sel_deq_loop call, in seconds
create or replace procedure start_deq_jobs(p_cnt number, p_sel number := 1, p_dur_seconds number) as
begin
  for l_job_no in 1 .. p_cnt
  loop
    dbms_scheduler.create_job (
      job_name        => 'TEST_JOB_DEQ_'||l_job_no,
      job_type        => 'PLSQL_BLOCK',
      job_action      => 
        'begin 
           sel_deq_loop('||l_job_no||','||p_sel||', '||p_dur_seconds||');
        end;',    
      enabled         => true,
      auto_drop       => true,
      --repeat_interval => 'systimestamp',
      start_date      => systimestamp);
  end loop;
end;
/


3.5 Test Run: listen vs. subscriber



-- Runs one complete test: cleans up old jobs, purges the queue table, starts
-- p_jobs enqueue jobs and p_jobs dequeue jobs, sleeps for p_dur_seconds and
-- cleans up the jobs again.
--   p_jobs        : number of enqueue jobs and number of dequeue jobs
--   p_sel         : dequeue method selector handed to start_deq_jobs
--   p_dur_seconds : run time of the jobs and sleep time of this procedure, in seconds
--   is_awr        : when > 0, an AWR snapshot is created before the jobs are
--                   started and again after the sleep
create or replace procedure test_run (p_jobs number, p_sel number, p_dur_seconds number, is_awr number := 0) as

  -- Creates an AWR snapshot only when the caller asked for one.
  procedure snapshot_if_requested is
  begin
    if is_awr > 0 then
      SYS.DBMS_WORKLOAD_REPOSITORY.create_snapshot('ALL');
    end if;
  end snapshot_if_requested;

begin
  -- Start from a clean state.
  clearup_test;
  purge_queue_table;

  snapshot_if_requested;
  start_enq_jobs(p_jobs, p_dur_seconds);
  start_deq_jobs(p_jobs, p_sel, p_dur_seconds);     -- dequeue method chosen by p_sel

  -- Let the jobs run for the requested duration, then take the closing snapshot.
  dbms_session.sleep(p_dur_seconds);

  snapshot_if_requested;
  clearup_test;
end;
/


-- Start test run for 10 minutes
--   exec test_run(80, 1, 10*60, 1);   -- listen dequeue 
--   exec test_run(80, 2, 10*60, 1);   -- subscriber dequeue