
    Ihk                       d Z ddlmZ ddlZddlZddlZddlZddlmZm	Z	m
Z
mZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZ ddlmZmZm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z(m)Z) ddl'm*Z+ ddl,m*Z- ddl.m*Z/ ddl0m1Z2 ddl3m4Z4 ddl5m6Z6 ddl7m8Z8m9Z9 ddl:m;Z;m<Z< ddl=m>Z> ddl?m@Z@  e6d      ZA eBej                        h dz
  ZD eEd       eEd      ieDD  ci c]  }  eE|        eEd       c} ZF G d  d!      ZG G d" d#      ZHej                   G d$ d%             ZJ G d& d'e@j                        ZK G d( d)e@j                        ZLyc c} w )*ap  GCP Pub/Sub transport module for kombu.

More information about GCP Pub/Sub:
https://cloud.google.com/pubsub

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: No
* Supports Fanout: Yes
* Supports Priority: No
* Supports TTL: No

Connection String
=================

Connection string has the following formats:

.. code-block::

    gcpubsub://projects/project-name

Transport Options
=================
* ``queue_name_prefix``: (str) Prefix for queue names.
* ``ack_deadline_seconds``: (int) The maximum time after receiving a message
  and acknowledging it before pub/sub redelivers the message.
* ``expiration_seconds``: (int) Subscriptions without any subscriber
  activity or changes made to their properties are removed after this period.
  Examples of subscriber activities include open connections,
  active pulls, or successful pushes.
* ``wait_time_seconds``: (int) The maximum time to wait for new messages.
  Defaults to 10.
* ``retry_timeout_seconds``: (int) The maximum time to wait before retrying.
* ``bulk_max_messages``: (int) The maximum number of messages to pull in bulk.
  Defaults to 32.
    )annotationsN)FIRST_COMPLETEDFutureThreadPoolExecutorwait)suppress)getpid)Empty)Lock)	monotonicsleep)NAMESPACE_OIDuuid3)gethostnametimeout)AlreadyExistsDeadlineExceededPermissionDenied)Retry)monitoring_v3)query)PublisherClientSubscriberClient)
exceptions)gapic_version)TRANSIENT_DELIVERY_MODE)
get_logger)bytes_to_strsafe_str)dumpsloads)cached_property   )virtualzkombu.transport.gcpubsub>   _-.r(   r'   r&   c                  >    e Zd ZdZd Zd Zd
dZddZd Zd Z	d Z
y	)
UnackedIdszThreadsafe list of ack_ids.c                0    g | _         t               | _        y N)_listr   _lockselfs    Q/var/www/html/planif/env/lib/python3.12/site-packages/kombu/transport/gcpubsub.py__init__zUnackedIds.__init__Z   s    
V
    c                :    | j                   j                  |       y r,   )r-   appendr0   vals     r1   r5   zUnackedIds.append^   s    

#r3   c                :    | j                   j                  |       y r,   )r-   extend)r0   valss     r1   r9   zUnackedIds.extendb   s    

$r3   c                |    | j                   5  | j                  j                  |      cd d d        S # 1 sw Y   y xY wr,   )r.   r-   pop)r0   indexs     r1   r<   zUnackedIds.popf   s.    ZZ 	)::>>%(	) 	) 	)s   2;c                    | j                   5  t        t              5  | j                  j	                  |       d d d        d d d        y # 1 sw Y   xY w# 1 sw Y   y xY wr,   )r.   r   
ValueErrorr-   remover6   s     r1   r@   zUnackedIds.removej   sL    ZZ 	#*- 	#JJc"	# 	# 	# 	# 	# 	#s!   AA
A
A	AAc                p    | j                   5  t        | j                        cd d d        S # 1 sw Y   y xY wr,   )r.   lenr-   r/   s    r1   __len__zUnackedIds.__len__n   s)    ZZ 	#tzz?	# 	# 	#s   ,5c                     | j                   |   S r,   )r-   )r0   items     r1   __getitem__zUnackedIds.__getitem__r   s    zz$r3   N)r:   list))__name__
__module____qualname____doc__r2   r5   r9   r<   r@   rC   rF    r3   r1   r*   r*   W   s(    % )## r3   r*   c                  .    e Zd ZdZddZddZddZd Zy)	AtomicCounterzIThreadsafe counter.

    Returns the value after inc/dec operations.
    c                0    || _         t               | _        y r,   )_valuer   r.   )r0   initials     r1   r2   zAtomicCounter.__init__}   s    V
r3   c                    | j                   5  | xj                  |z  c_        | j                  cd d d        S # 1 sw Y   y xY wr,   r.   rQ   r0   ns     r1   inczAtomicCounter.inc   3    ZZ 	KK1K;;	 	 		   !8Ac                    | j                   5  | xj                  |z  c_        | j                  cd d d        S # 1 sw Y   y xY wr,   rT   rU   s     r1   deczAtomicCounter.dec   rX   rY   c                ^    | j                   5  | j                  cd d d        S # 1 sw Y   y xY wr,   rT   r/   s    r1   getzAtomicCounter.get   s%    ZZ 	;;	 	 	s   #,N)r   )r$   )rI   rJ   rK   rL   r2   rW   r[   r]   rM   r3   r1   rO   rO   w   s    


r3   rO   c                  j    e Zd ZU dZded<   ded<   ded<   ded<    ej                  e      Zded	<   y
)QueueDescriptorzPub/Sub queue descriptor.strname
topic_pathsubscription_idsubscription_path)default_factoryr*   unacked_idsN)	rI   rJ   rK   rL   __annotations__dataclassesfieldr*   rf   rM   r3   r1   r_   r_      s2    #
IO/k//
KKKr3   r_   c                  R    e Zd ZU dZdZdZdZdZdZdZ	dZ
dZ e       Zd	Zd
ed<    ej"                         Z e       Zi Zded<    e       Zded<    fdZefd1dZd Z	 d2	 	 	 	 	 	 	 d3dZd4dZ	 	 	 	 	 	 d5	 	 	 	 	 	 	 	 	 	 	 d6dZd Zd Zd Z d2d7dZ!d8dZ"d7dZ#d9dZ$d2 fd	Z%d:dZ&d; fd	Z'd<d Z(d=d!Z)d" Z*d=d#Z+e,d$        Z-e,d%        Z.e,d&        Z/e0d'        Z1e0d(        Z2e,d)        Z3e,d*        Z4e,d+        Z5e,d,        Z6e,d-        Z7e,d.        Z8 fd/Z9e:d0        Z; xZ<S )>ChannelzGCP Pub/Sub channel.TF
      iQ i,      Nzthreading.Thread_unacked_extenderzdict[str, QueueDescriptor]_queue_cachezset[str]_tmp_subscriptionsc                   t        |   |i | t               | _        t        j                  d| j                  j                         t        j                  | j                  j                        | _
        | j                  j                         dk(  rct        j                  | j                  d      t         _        | j$                  j'                          t         j"                  j)                          y y )Nznew GCP pub/sub channel: %sr$   T)targetdaemon)superr2   r   poolloggerinfoconninfohostname	Transport	parse_uri
project_id_n_channelsrW   	threadingThread_extend_unacked_deadlinerk   ro   _stop_extenderclearstart)r0   argskwargs	__class__s      r1   r2   zChannel.__init__   s    $)&)&(	14==3I3IJ#--dmm.D.DE!Q&(1(8(844)G% %%'%%++- 'r3   c                    |j                  | j                        s| j                  |z   }t        t        |            j	                  |      S )z7Format AMQP queue name into a valid Pub/Sub queue name.)
startswithqueue_name_prefixr`   r    	translate)r0   ra   tables      r1   entity_namezChannel.entity_name   s>    t556))D0D8D>",,U33r3   c                   | j                  |      j                  }| j                  |      }t        j	                  d||||       i }|dk(  r;dd| di}| j
                  j                  | j                  |      }| j                  }n|dk(  rt        t        t                dt                       }	| d|	 }
| j
                  j                  | j                  |
      }| j                  j                  |       | j                  j                  |       d	}nt!        d
| d      | j#                  | j                  ||      }| j%                  ||||       t'        ||||      }|| j(                  |<   y )Nz9binding queue: %s to %s exchange: %s with routing_key: %sdirectfilterzattributes.routing_key=""fanoutr(   r'   iX  zexchange type z not implemented)rb   rd   filter_argsmsg_retention)ra   rb   rc   rd   )typeoftyper   rw   debug
subscriberrd   r}   expiration_secondsr   r   r   r	   rq   add_fanout_exchangesNotImplementedError_create_topic_create_subscriptionr_   rp   )r0   exchangerouting_keypatternqueueexchange_typer   rd   message_retention_durationuiduniq_sub_nameexchange_topicqdescs                r1   _queue_bindzChannel._queue_bind   s   H-22  'G	
 H$ 4[MCK !% A A! *.)@)@&h& =[]O1VXJ*GHIC$gQse,M $ A A! ##''(9:""&&x0),&% /?@  ++OOX'A
 	!!%/#4	 	" 	
  %!/	
 $)% r3   c                2   | j                   j                  ||      }| j                  |      rt        j	                  d|       |S 	 t        j	                  d|       d|i}|r| d|d<   | j                   j                  |       |S # t        $ r Y |S w xY w)Nztopic: %s existszcreating topic: %sra   sr   request)	publisherrb   _is_topic_existsrw   r   create_topicr   )r0   r}   topic_idr   rb   r   s         r1   r   zChannel._create_topic  s     ^^..z8D
  ,LL+Z8		LL-z:z*G) 22!4 0 NN'''8   		s   A B	 		BBc                    | j                   j                  dd| j                   i      }|D ]  }|j                  |k(  s y y)Nprojectz	projects/r   TF)r   list_topicsr}   ra   )r0   rb   topicsts       r1   r   zChannel._is_topic_exists  sT    ++)DOO+< => , 
  	Avv#	 r3   c                   |xs& | j                   j                  | j                  |      }|xs | j                  j	                  ||      }	 t
        j                  d|||       |xs | j                  }| j                   j                  ||| j                  d| j                   di| dd|xs i        |S # t        $ r Y |S w xY w)Nz0creating subscription: %s, topic: %s, filter: %sttlr   )ra   topicack_deadline_secondsexpiration_policyr   r   )r   rd   r}   r   rb   rw   r   r   create_subscriptionr   r   )r0   r}   r   rb   rd   r   r   s          r1   r   zChannel._create_subscription#  s      L00(K 	   
4>>#<#<$

	LLB!	 *DT-D-DMOO//-',0,E,E$"9"9!:!<* 6CO12E	 #(b	 0  !   	  	s   A,B: :	CCc                   | j                  |      }t        j                  d|       | j                  j	                  |      }|sy| j
                  j                  d|j                  i       | j                  j                  |d       y)zDelete a queue by name.zdeleting queue: %sNsubscriptionr   )	r   rw   rx   rp   r]   r   delete_subscriptionrd   r<   )r0   r   r   r   r   s        r1   _deletezChannel._deleteK  sz      '(%0!!%%e,++#U%<%<= 	, 	
 	eT*r3   c                .   | j                  |      }| j                  |   }| j                  |      }t        j	                  d||j
                  |       t        |      }| j                  j                  |j
                  |j                  d      |       y)zPut a message onto the queue.z8putting message to queue: %s, topic: %s, routing_key: %sutf-8)r   N)
r   rp   _get_routing_keyrw   r   rb   r!   r   publishencode)r0   r   messager   r   r   encoded_messages          r1   _putzChannel._putW  s      '!!%(++G4F		
  .""7+# 	 	
r3   c                :   | j                  ||       | j                  j                  | j                  |      }t        j                  d||       t        |      }| j                  j                  ||j                  d      t        | j                               y)z#Put a message onto fanout exchange.z-putting msg to fanout exchange: %s, topic: %sr   deadline)retryN)_lookupr   rb   r}   rw   r   r!   r   r   r   retry_timeout_seconds)r0   r   r   r   r   rb   r   s          r1   _put_fanoutzChannel._put_fanouti  s    X{+^^..tI
;	

  .""7+!;!;< 	 	
r3   c                &   | j                  |      }| j                  |   }	 | j                  j                  |j                  ddt        | j                        |xs | j                        }t        |j                        dk(  r
t               |j                  d   }|j                  }t        |j                  j                        }|d   d   }t         j#                  d|||d          | j%                  |d         r5t         j#                  d	|       | j'                  |g|j                         |S |||j                  j(                  |j                  d
|d<   |j*                  j-                  |       |S # t        $ r t               w xY w)z(Retrieves a single message from a queue.r$   r   max_messagesr   r   r   r   r   
propertiesdelivery_infoz-queue:%s got message, ack_id: %s, payload: %szauto acking message ack_id: %sr   ack_id
message_idrd   gcpubsub_message)r   rp   r   pullrd   r   r   wait_time_secondsr   r
   rB   received_messagesr   r"   r   datarw   r   _is_auto_ack_do_ackr   rf   r5   )	r0   r   r   r   responser   r   payloadr   s	            r1   _getzChannel._gety  s     '!!%(
	++$)$;$;$% T%?%?@94#9#9 , H x))*a/'M,,Q/,,--o>;L!		
 W\23LL96BLL&5#:#:;   %oo88%*%<%<	1M,- $$V,9   	'M	s   AE< <Fc                N    |d   d   }|d   }|t         k(  xs || j                  v S )Nr   r   delivery_mode)r   r   )r0   payload_propertiesr   r   s       r1   r   zChannel._is_auto_ack  s<    %o6zB*?;44 24111	
r3   c                   | j                  |      }| j                  |   }| j                         }|s
t               	 | j                  j                  |j                  |dt        | j                        |xs | j                        }|j                  }t        |      dk(  r
t               g }g }	t        j                  dt        |      |       |D ]  }
|
j                  }t!        t#        |
j$                  j&                              }|d   d   }|||
j$                  j(                  |j                  d|d	<   | j+                  |d         r|j-                  |       n|j.                  j-                  |       |	j-                  |        |r2t        j                  d
|       | j1                  ||j                         ||	fS # t        $ r t               w xY w)z(Retrieves bulk of messages from a queue.r   r   r   r   z#batching %d messages from queue: %sr   r   r   r   zauto acking ack_ids: %s)r   rp   _get_max_messages_estimater
   r   r   rd   r   r   r   r   r   rB   rw   r   r   r"   r   r   r   r   r   r5   rf   r   )r0   r   r   prefixed_queuer   r   r   r   auto_ack_idsret_payloadsr   r   r   r   s                 r1   	_get_bulkzChannel._get_bulk  s   ))%0!!.1668'M
	++$)$;$;$0 T%?%?@94#9#9 , H %66 !Q&'M1!"	

 ) 	)G^^FL)=)=>?G#L1/BM' %oo88%*%<%<	1M,-   !67##F+!!((0(	) LL2LALLu'>'>?l""C   	'M	s   AG Gc                Z    | j                   j                         }| j                  }||S |S r,   )qoscan_consume_max_estimatebulk_max_messages)r0   max_allowedmax_if_unlimiteds      r1   r   z"Channel._get_max_messages_estimate  s1    hh77911#.#6GKGr3   c                :   | j                   j                  j                  |i       }|st        |   |||      S | j                  |      j                  | j                  |      |||      }|r|S t        j                  d|       | j                  |||       |gS )Nz3no queues bound to exchange: %s, binding on the fly)state	exchangesr]   ru   r   r   lookup	get_tablerw   r   
queue_bind)r0   r   r   defaultexchange_inforetr   s         r1   r   zChannel._lookup  s    

,,002>7?8['BBkk(#**NN8$	
 JA	
 	(K8zr3   c                   | j                  |      }|| j                  vry| j                  |   }t        j                  | j                  | j
                  dt        j                  j                         d      j                  |j                        }t        t              5  t        d |D              cddd       S # 1 sw Y   yxY w)	zReturn the number of messages in a queue.

        This is a *rough* estimation, as Pub/Sub doesn't provide
        an exact API.
        r   z;pubsub.googleapis.com/subscription/num_undelivered_messagesr$   )end_timeminutes)rc   c              3  b   K   | ]'  }|j                   d    j                  j                   ) yw)r   N)pointsvalueint64_value).0contents     r1   	<genexpr>z Channel._size.<locals>.<genexpr>  s*      8?q!''33s   -/NrH   )r   rp   r   Querymonitorr}   datetimenowselect_resourcesrc   r   r   sum)r0   r   r   results       r1   _sizezChannel._size  s       ')))!!%(LLOOI&&**,
 
5+@+@

A 	 &' 	 CI 	 	 	 s   #B??Cc                X   |rt        d      | j                  j                  |      j                  }|d   }|d   }|d   }t        j                  d||       |d   }| j                  |g|       | j                  |   }|j                  j                  |       t        	| -  |       y)zAcknowledge one message.zmultiple acks not implementedr   r   r   z!ack message. queue: %s ack_id: %srd   N)r   r   r]   r   rw   r   r   rp   rf   r@   ru   	basic_ack)
r0   delivery_tagmultipler   pubsub_messager   r   rd   r   r   s
            r1   r  zChannel.basic_ack  s    %&EFF\2@@&'9:)w'8%H*+>?fX01!!%(  (,'r3   c                l    | j                   j                  ||dt        | j                               y )N)r   ack_idsr   )r   r   )r   acknowledger   r   )r0   r  rd   s      r1   r   zChannel._do_ack#  s/    ##%67K!;!;< 	$ 	
r3   c                   | j                  |      }| j                  j                  |      }|sy| j                  |      }| j                  j                  |j                  t        j                  j                         d       |S )z'Delete all current messages in a queue.N)r   timer   )	r   rp   r]   r  r   seekrd   r  r  )r0   r   r   rV   s       r1   _purgezChannel._purge)  sz      '!!%%e,JJu % 7 7 ))--/ 	 	
 r3   c           
        t        j                         }t        j                  d|       | j                  dz  }t        || j                  dz        }| j                  j                  |      s| j                  j                         D ]  }t        |j                        dk(  r"t        j                  d||j                         =t        j                  d||j                  t        |j                        t        |j                               | j                   j#                  |j                  t        |j                        | j                  d        | j                  j                  |      st        j                  d	|       y )
Nz/unacked deadline extension thread: [%s] started      r   z'thread [%s]: no unacked messages for %sz5thread [%s]: extend ack deadline for %s: %d msgs [%s])r   r  r   r   z.unacked deadline extension thread [%s] stopped)r   get_native_idrw   rx   _min_ack_deadlinemaxr   r   r   rp   valuesrB   rf   r   rd   rG   r   modify_ack_deadline)r0   	thread_idmin_deadline_sleep
sleep_timer   s        r1   r   z Channel._extend_unacked_deadline9  sP   ++-	=	
 "33a7+T-F-F-JK
%%**:6**113 u(()Q.LLA!//
 K++))***+ 33(-(?(?#'(9(9#:040I0I 4  %%**:6. 	<i	
r3   c                    | j                  |      }| j                  j                  | j                  |      }t        j                  d||       | j                  j                  |       y )Nz0after_reply_message_received: queue: %s, sub: %s)r   r   rd   r}   rw   r   rq   r   )r0   r   subs      r1   after_reply_message_receivedz$Channel.after_reply_message_received\  sU      'oo//G>s	
 	##C(r3   c                    t               S r,   )r   r/   s    r1   r   zChannel.subscriberd  s    !!r3   c                    t               S r,   )r   r/   s    r1   r   zChannel.publisherh  s      r3   c                *    t        j                         S r,   )r   MetricServiceClientr/   s    r1   r  zChannel.monitorl  s    0022r3   c                .    | j                   j                  S r,   )
connectionclientr/   s    r1   ry   zChannel.conninfop  s    %%%r3   c                B    | j                   j                  j                  S r,   )r,  r-  transport_optionsr/   s    r1   r/  zChannel.transport_optionst  s    %%777r3   c                N    | j                   j                  d| j                        S )Nr   )r/  r]   default_wait_time_secondsr/   s    r1   r   zChannel.wait_time_secondsx  &    %%))!?!?
 	
r3   c                N    | j                   j                  d| j                        S )Nr   )r/  r]   default_retry_timeout_secondsr/   s    r1   r   zChannel.retry_timeout_seconds~  s&    %%))#T%G%G
 	
r3   c                N    | j                   j                  d| j                        S )Nr   )r/  r]   default_ack_deadline_secondsr/   s    r1   r   zChannel.ack_deadline_seconds  s&    %%))"D$E$E
 	
r3   c                :    | j                   j                  dd      S )Nr   zkombu-)r/  r]   r/   s    r1   r   zChannel.queue_name_prefix  s    %%))*=xHHr3   c                N    | j                   j                  d| j                        S )Nr   )r/  r]   default_expiration_secondsr/   s    r1   r   zChannel.expiration_seconds  s&    %%)) $"A"A
 	
r3   c                N    | j                   j                  d| j                        S )Nr   )r/  r]   default_bulk_max_messagesr/   s    r1   r   zChannel.bulk_max_messages  r2  r3   c                   t         j                  d       | j                  rs| j                  j                         }t	        t
              5  t         j                  d|       | j                  j                  d|i       ddd       | j                  rs| j                  j                         s8| j                  j                          t        j                  j                          t        | A          y# 1 sw Y   wxY w)zClose the channel.zclosing channelzdeleting subscription: %sr   r   N)rw   r   rq   r<   r   	Exceptionr   r   r~   r[   r   setrk   ro   joinru   close)r0   r%  r   s     r1   r@  zChannel.close  s    &'%%))--/C)$ 8#>33+S1 4  %% ##%##%%%**, s   5C77D c                P    | d   j                  di       j                  dd      }|S )Nr   r   r    )r]   )r   r   s     r1   r   zChannel._get_routing_key  s1     L!S"%S# 	
 r3   )ra   r`   returnr`   r,   )r}   r`   r   r`   r   intrC  r`   )rb   r`   rC  bool)NNNNNN)r}   r`   r   r`   rb   r`   rd   r`   r   rD  rC  r`   )r   r`   r   float)r   dict)rC  rD  )r   r`   rC  rD  )F)r  z	list[str]rd   r`   )r   r`   )=rI   rJ   rK   rL   supports_fanout
do_restorer1  r6  r9  r4  r;  r  r>  r   ro   rg   r   Eventr   rO   r~   rp   rq   r2   CHARS_REPLACE_TABLEr   r   r   r   r   r   r   r   r   r   r   r   r   r  r  r   r  r   r&  r#   r   r   r  propertyry   r/  r   r   r   r   r   r   r@  staticmethodr   __classcell__r   s   @r1   rk   rk      sT   OJ "#& !&$'! "*.'.$Y__&N/K/1L,1#&5(. ,? 4=)F +/	  %(	
 
0 !%!&!&! &! 	&!
 &! &! 
&!P
+
$
 )V
1#fH
&8( 
 !
F) " " ! ! 3 3 & & 8 8 
 

 
 

 
 

 I I 
 

 
 

  r3   rk   c                      e Zd ZdZeZdZdZej                  j                  e
j                  fz   Z	ej                  j                  ej                  ej                  ej                   ej                  ej"                  fz   ej&                  fz   ZdZdZej                  j,                  j/                   eddg            Z fd	Zd
 Zedd       Zeddd       ZddZd Z d Z!d Z" xZ#S )r{   zGCP Pub/Sub transport.Tg?gcpubsub	pubsub_v1r   r   )r   c                b    t        |   |fi | t               | _        t	               | _        y r,   )ru   r2   r   _poolrG  _get_bulk_future_to_queue)r0   r-  r   r   s      r1   r2   zTransport.__init__  s)    *6*')
<@F&r3   c                "    t         j                  S r,   )package_version__version__r/   s    r1   driver_versionzTransport.driver_version  s    ***r3   c                L    | j                  d      d   }|j                  d      S )Nzgcpubsub://projects/r$   /)splitstrip)urir   s     r1   r|   zTransport.parse_uri  s'    
 ))23A6}}S!!r3   c                    |xs dS )Nzgcpubsub://rM   )r0   r^  include_passwordmasks       r1   as_urizTransport.as_uri  s    #m#r3   c                    t               }| j                  }|r	|r||kD  r|}	 	 | j                  |       y # t        $ r, |rt               |z
  |k\  r
t	               |rt        |       Y nw xY wM)Nr   )r   polling_interval_drain_from_active_queuesr
   socket_timeoutr   )r0   r,  r   
time_startrd  s        r1   drain_eventszTransport.drain_events  s    [
00',<w,F&..w.?   ,y{Z77B(**#*+	, s   7 2A,+A,c                   | j                          | j                  d       t        | j                  |t              \  }}|D ch c]  }|j                         s| }}||z  }|D ]  }| j                  j                  |d          |s
t               t        j                  dt        |             |D ]  }|j                         \  }}|D ]O  }t        j                  d|       || j                  vrt        j                  d|       >| j                  ||       Q | j                  j                  |d         y c c}w )Nrl   r   )r   return_whenzgot %d done get_bulk tasksz consuming message from queue: %sz&Message for queue %s without consumers)_rm_empty_bulk_requests_submit_get_bulk_requestsr   rU  r   	exceptionr<   r
   rw   r   rB   r  
_callbackswarning_deliver)	r0   r   doner&   femptyr   payloadsr   s	            r1   re  z#Transport._drain_from_active_queues  s1   $$&
 	&&r&2**'
a
 !2qAKKM22 	8A**..q$7	8 'M13t9= 
	8AhhjOE8# .?G/NN@% gu-. **..q$7
	8 3s   E E c                    | j                   D ch c]$  }|j                         r|j                         r|& }}|D ]  }| j                   j                  |d          y c c}w r,   )rU  rq  rm  r<   )r0   rr  rs  s      r1   rk  z!Transport._rm_empty_bulk_requests  sd     33
vvxAKKM 
 

  	8A**..q$7	8
s   )Ac                   t        | j                  j                               }| j                  D ]N  }|j                  D ]=  }||v r| j
                  j                  |j                  ||      }|| j                  |<   ? P y r,   )r>  rU  r  channels_active_queuesrT  submitr   )r0   r   queues_with_submitted_get_bulkchannelr   futures         r1   rl  z#Transport._submit_get_bulk_requests   s    ),**113*
& }} 	?G // ?::**7+<+<eWM9>..v6	?	?r3   )r^  r`   rC  r`   )Fz**r,   )$rI   rJ   rK   rL   rk   can_parse_urlrd  r%   r{   connection_errorspubsub_exceptionsTimeoutErrorchannel_errorspublisher_exceptionsFlowControlLimitErrorMessageTooLargeErrorPublishError#PublishToPausedOrderingKeyExceptionsubscriber_exceptionsAcknowledgeErrordriver_typedriver_name
implementsr9   	frozensetr2   rY  rM  r|   classmethodrb  rh  re  rk  rl  rN  rO  s   @r1   r{   r{     s    GM));;&&?  	(( 66 55 -- -- DD
	
 !11
3	4  KK""--448 45 5 JC
+ " " $ $ !8F8
?r3   r{   )MrL   
__future__r   rh   r  stringr   concurrent.futuresr   r   r   r   
contextlibr   osr	   r   r
   r   r  r   r   uuidr   r   _socketr   r   rf  google.api_core.exceptionsr   r   r   google.api_core.retryr   google.cloudr   google.cloud.monitoring_v3r   google.cloud.pubsub_v1r   r   r   r   google.cloud.pubsub_v1.publisherr  !google.cloud.pubsub_v1.subscriberr  google.pubsub_v1r   rW  kombu.entityr   	kombu.logr   kombu.utils.encodingr   r    kombu.utils.jsonr!   r"   kombu.utils.objectsr#   rB  r%   rw   r>  punctuationPUNCTUATIONS_TO_REPLACEordrK  r*   rO   	dataclassr_   rk   r{   )cs   0r1   <module>r     s0  %N #    & &     ! %  -: : ' & , D B O( = 0   7 ) / 	.	/ f001OC Hc#h!89As1vs3x9    @ 2 L L LUgoo Upw?!! w?A :s   /E!