
    Ih5                       d Z ddlmZ ddlZddlZddlZddlmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZmZmZmZ dd	lmZmZ d
dlmZmZ d
dlmZ d
dlmZ d
dlmZ dZdZ  ee!      Z"da#d Z$d Z%d'dZ& G d de      Z'd Z(d(dZ)d Z*d Z+d Z,d)dZ-	 	 d)dZ.d*dZ/	 d+dZ0d Z1d  Z2e	d!        Z3d,d"Z4d,d#Z5d-d$Z6 G d% d&      Z7y).zCommon Utilities.    )annotationsN)deque)contextmanager)partial)count)NAMESPACE_OIDuuid3uuid4uuid5)ChannelErrorRecoverableConnectionError   )ExchangeQueue)
get_logger)registry)uuid)		Broadcastmaybe_declarer   itermessages
send_replycollect_repliesinsureddrain_consumer	eventloopi  c                 B    t         t               j                  a t         S N)_node_idr
   int     E/var/www/html/planif/env/lib/python3.12/site-packages/kombu/common.pyget_node_idr#   "   s    7;;Or!   c                    dj                  | ||t        |            }	 t        t        t        |            }|S # t
        $ r t        t        t        |            }Y |S w xY w)Nz{:x}-{:x}-{:x}-{:x})formatidstrr	   r   
ValueErrorr   )node_id
process_id	thread_idinstanceentrets         r"   generate_oidr/   )   sc    

&
&Y86C-%s+, J  -%s+,J-s   : "A A c                    t        t               t        j                         |rt	        j
                         |       S d|       S Nr   )r/   r#   osgetpid	threading	get_ident)r,   threadss     r"   oid_fromr7   3   s@    
		!(		  /0	 r!   c                  N     e Zd ZdZej
                  dz   Z	 	 	 	 	 	 d fd	Z xZS )r   a  Broadcast queue.

    Convenience class used to define broadcast queues.

    Every queue instance will have a unique name,
    and both the queue and exchange is configured with auto deletion.

    Arguments:
    ---------
        name (str): This is used as the name of the exchange.
        queue (str): By default a unique id is used for the queue
            name for every consumer.  You can specify a custom
            queue name here.
        unique (bool): Always create a unique queue
            even if a queue name is supplied.
        **kwargs (Any): See :class:`~kombu.Queue` for a list
            of additional keyword arguments supported.
    ))queueNc                    |rdj                  |xs dt                     }n|xs dt                }t        |   d|xs ||||||nt	        |d      d| y )Nz{}.{}bcastzbcast.fanout)type)aliasr9   nameauto_deleteexchanger    )r%   r   super__init__r   )	selfr?   r9   uniquer@   rA   r>   kwargs	__class__s	           r"   rC   zBroadcast.__init__R   sp     NN5#3GTV<E.vdfX.E 	
-4#"*"6h#Dx8	
 	
r!   )NNFTNN)__name__
__module____qualname____doc__r   attrsrC   __classcell__)rG   s   @r"   r   r   <   s7    & KK,,E !
 
r!   r   c                F    | |j                   j                  j                  v S r   )
connectionclientdeclared_entities)entitychannels     r"   declaration_cachedrT   i   s    W''..@@@@r!   c                8    |rt        | |fi |S t        | |      S )zDeclare entity (cached).)_imaybe_declare_maybe_declare)rR   rS   retryretry_policys       r"   r   r   m   s$    vw?,??&'**r!   c                j    | j                   }|s$|st        d| d|        | j                  |      } | S )zMake sure the channel is bound to the entity.

    :param entity: generic kombu nomenclature, generally an exchange or queue
    :param channel: channel to bind to the entity
    :return: the updated entity
    zCannot bind channel z to entity )is_boundr   bind)rR   rS   r[   s      r"   _ensure_channel_is_boundr]   t   sE     H&wi{6(CE EW%Mr!   c                   | }t        | |       ||j                  '| j                  st        d|  d      | j                  }d x}}|j                  r<| j
                  r0|j                  j                  j                  }t        |       }||v ry|j                  st        d      | j                  |       ||r|j                  |       || j                  |_        y)Nzchannel is None and entity z not bound.Fchannel disconnected)rS   T)r]   rO   r[   r   rS   can_cache_declarationrP   rQ   hashr   declareaddr?   )rR   rS   origdeclaredidents        r"   rW   rW      s    DVW-',,4 -fX[AC C..Huf::%%,,>>VH()?@@
NN7N#UKK	r!   c                    t        | |      } | j                  j                  st        d        | j                  j                  j                  j
                  | t        fi || |      S )Nr_   )r]   rS   rO   r   rP   ensurerW   )rR   rS   rY   s      r"   rV   rV      sj    %fg6F>>$$()?@@026>>$$++220".006A Ar!   c              #  "  K   t               fd}|g|xs g z   | _        | 5  t        | j                  j                  j
                  ||d      D ]  }	 j                           	 ddd       y# t        $ r Y -w xY w# 1 sw Y   yxY ww)z&Drain messages from consumer instance.c                ,    j                  | |f       y r   )append)bodymessageaccs     r"   
on_messagez"drain_consumer.<locals>.on_message   s    

D'?#r!   T)limittimeoutignore_timeoutsN)r   	callbacksr   rS   rO   rP   popleft
IndexError)consumerrp   rq   rs   ro   _rn   s         @r"   r   r      s     
'C$ %b9H	 8++66==!&O 	Akkm#	 
   s@   !B1BA4(B+	B4	B =B?B  BBBc                H    t         | j                  d|g|d||||      S )zIterator over messages.)queuesrS   )rp   rq   rs   r    )r   Consumer)connrS   r9   rp   rq   rs   rF   s          r"   r   r      s2     @eWg@@W	 r!   c              #     K   |xr t        |      xs
 t               D ]  }	 | j                  |        y# t        j                  $ r |r|s Y 5w xY ww)a   Best practice generator wrapper around ``Connection.drain_events``.

    Able to drain events forever, with a limit, and optionally ignoring
    timeout errors (a timeout of 1 is often used in environments where
    the socket can get "stuck", and is a best practice for Kombu consumers).

    ``eventloop`` is a generator.

    Examples
    --------
        >>> from kombu.common import eventloop

        >>> def run(conn):
        ...     it = eventloop(conn, timeout=1, ignore_timeouts=True)
        ...     next(it)   # one event consumed, or timed out.
        ...
        ...     for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
        ...         pass  # loop forever.

    It also takes an optional limit parameter, and timeout errors
    are propagated by default::

        for _ in eventloop(connection, limit=1, timeout=1):
            pass

    See Also
    --------
        :func:`itermessages`, which is an event loop bound to one or more
        consumers, that yields any messages received.
    )rq   N)ranger   drain_eventssocketrq   )r{   rp   rq   rr   is        r"   r   r      s]     > #uU|.uw 	##G#44 ~~ 		s%   A9AAAAAc                     |j                   |f| ||dt        |j                  d   |j                  j                  d      t        j
                  |j                     |j                  dfi |S )a  Send reply for request.

    Arguments:
    ---------
        exchange (kombu.Exchange, str): Reply exchange
        req (~kombu.Message): Original request, a message with
            a ``reply_to`` property.
        producer (kombu.Producer): Producer instance
        retry (bool): If true must retry according to
            the ``reply_policy`` argument.
        retry_policy (Dict): Retry settings.
        **props (Any): Extra properties.
    )rA   rX   rY   reply_tocorrelation_id)routing_keyr   
serializercontent_encoding)publishdict
propertiesgetserializerstype_to_namecontent_typer   )rA   reqmsgproducerrX   rY   propss          r"   r   r      s     8, s~~j9"%.."4"45E"F)66s7G7GH$'$8$8: D >CD r!   c              /    K   |j                  dd      }d}	 t        | ||g|i |D ]  \  }}|s|j                          d}|  	 |r|j                  |j                         yy# |r|j                  |j                         w w xY ww)z,Generator collecting replies from ``queue``.no_ackTFN)
setdefaultr   ackafter_reply_message_receivedr?   )	r{   rS   r9   argsrF   r   receivedrl   rm   s	            r"   r   r     s     x.FH	=)$ ;+/;39; 	MD'HJ	 00< 800< s   B1A) 
B) B		Bc                6    t         j                  d| |d       y )Nz#Connection error: %r. Retry in %ss
T)exc_info)loggererror)excintervals     r"   _ensure_errbackr     s    
LL.X  r!   c              #  Z   K   	 d  y # | j                   | j                  z   $ r Y y w xY wwr   )connection_errorschannel_errors)r{   s    r"   _ignore_errorsr     s0     !!D$7$77 s   +	 +(+(+c                    |rt        |       5   ||i |cddd       S t        |       S # 1 sw Y   t        |       S xY w)a  Ignore connection and channel errors.

    The first argument must be a connection object, or any other object
    with ``connection_error`` and ``channel_error`` attributes.

    Can be used as a function:

    .. code-block:: python

        def example(connection):
            ignore_errors(connection, consumer.channel.close)

    or as a context manager:

    .. code-block:: python

        def example(connection):
            with ignore_errors(connection):
                consumer.channel.close()


    Note:
    ----
        Connection and channel errors should be properly handled,
        and not ignored.  Using this function is only acceptable in a cleanup
        phase, like when a connection is lost or at shutdown.
    N)r   )r{   funr   rF   s       r"   ignore_errorsr   '  sG    8 D! 	(''	( 	($	($s   +>c                    |r	 ||       y y r   r    )rO   rS   	on_revives      r"   revive_connectionr   I  s    ' r!   c           	     $   |xs t         }| j                  d      5 }|j                  |       |j                  }t	        t
        ||      }	 |j                  ||f||	d|}
 |
|i t        ||      \  }}|cddd       S # 1 sw Y   yxY w)zFunction wrapper to handle connection errors.

    Ensures function performing broker commands completes
    despite intermittent connection failures.
    T)block)errback)r   )r   r   )rO   N)r   acquireensure_connectiondefault_channelr   r   	autoretryr   )poolr   r   rF   r   r   optsr{   rS   reviver   retvalrw   s                r"   r   r   N  s     (G	D	! 	Tw/ &&*DIF $..g ;w+1;59;TCT&T%BC		 	 	s   ABBc                  6    e Zd ZdZdZd ZddZddZd Zd Z	y)	QoSa  Thread safe increment/decrement of a channels prefetch_count.

    Arguments:
    ---------
        callback (Callable): Function used to set new prefetch count,
            e.g. ``consumer.qos`` or ``channel.basic_qos``.  Will be called
            with a single ``prefetch_count`` keyword argument.
        initial_value (int): Initial prefetch count value..

    Example:
    -------
        >>> from kombu import Consumer, Connection
        >>> connection = Connection('amqp://')
        >>> consumer = Consumer(connection)
        >>> qos = QoS(consumer.qos, initial_prefetch_count=2)
        >>> qos.update()  # set initial

        >>> qos.value
        2

        >>> def in_some_thread():
        ...     qos.increment_eventually()

        >>> def in_some_other_thread():
        ...     qos.decrement_eventually()

        >>> while 1:
        ...    if qos.prev != qos.value:
        ...        qos.update()  # prefetch changed so update.

    It can be used with any function supporting a ``prefetch_count`` keyword
    argument::

        >>> channel = connection.channel()
        >>> QoS(channel.basic_qos, 10)


        >>> def set_qos(prefetch_count):
        ...     print('prefetch count now: %r' % (prefetch_count,))
        >>> QoS(set_qos, 10)
    Nc                Z    || _         t        j                         | _        |xs d| _        y r1   )callbackr4   RLock_mutexvalue)rD   r   initial_values      r"   rC   zQoS.__init__  s#     oo'"'a
r!   c                    | j                   5  | j                  r| j                  t        |d      z   | _        ddd       | j                  S # 1 sw Y   | j                  S xY w)zIncrement the value, but do not update the channels QoS.

        Note:
        ----
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   N)r   r   maxrD   ns     r"   increment_eventuallyzQoS.increment_eventually  sP     [[ 	4zz!ZZ#a)3
	4 zz	4 zzs   +AA c                    | j                   5  | j                  r+| xj                  |z  c_        | j                  dk  rd| _        ddd       | j                  S # 1 sw Y   | j                  S xY w)zDecrement the value, but do not update the channels QoS.

        Note:
        ----
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   N)r   r   r   s     r"   decrement_eventuallyzQoS.decrement_eventually  sY     [[ 	#zz

a
::>!"DJ		#
 zz	#
 zzs   8AA-c                    || j                   k7  rV|}|t        kD  rt        j                  dt               d}t        j	                  d|       | j                  |       || _         |S )z#Set channel prefetch_count setting.z(QoS: Disabled: prefetch_count exceeds %rr   zbasic.qos: prefetch_count->%s)prefetch_count)prevPREFETCH_COUNT_MAXr   warningdebugr   )rD   pcount	new_values      r"   setzQoS.set  s\    TYYI**I13	LL8)DMMM3DIr!   c                |    | j                   5  | j                  | j                        cddd       S # 1 sw Y   yxY w)z)Update prefetch count with current value.N)r   r   r   )rD   s    r"   updatez
QoS.update  s.    [[ 	(88DJJ'	( 	( 	(s   2;)r   )
rH   rI   rJ   rK   r   rC   r   r   r   r   r    r!   r"   r   r   b  s(    (T D(
(r!   r   )T)NF)r   NN)NNF)NFNr   )NN)8rK   
__future__r   r2   r   r4   collectionsr   
contextlibr   	functoolsr   	itertoolsr   r   r   r	   r
   r   amqpr   r   rR   r   r   logr   serializationr   r   
utils.uuid__all__r   rH   r   r   r#   r/   r7   r   rT   r   r]   rW   rV   r   r   r   r   r   r   r   r   r   r   r   r    r!   r"   <module>r      s     " 	    %   3 3 9 #  2   	H	*
 *
ZA+ >A$ 9=$P 9=2=    D
(^( ^(r!   