
    Ih                         d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ dd	lmZ d
dlmZ dZdZ edd      Z G d de	      Zy)zEvent receiver implementation.    N)
itemgetter)Queue)maybe_channel)ConsumerMixin)uuid)app_or_default)adjust_timestamp   )get_exchange)EventReceiver	utcoffset	timestampc                       e Zd ZdZdZ	 	 	 ddZd Zd Z	 ddZddZ	dd	Z
dd
Zdej                  eeefdZeefdZed        Zy)r   a?  Capture events.

    Arguments:
        connection (kombu.Connection): Connection to the broker.
        handlers (Mapping[Callable]): Event handlers.
            This is  a map of event type names and their handlers.
            The special handler `"*"` captures all events that don't have a
            handler.
    Nc
           	         t        |xs | j                        | _        t        |      | _        |i n|| _        || _        |xs
 t               | _        |xs  | j                  j                  j                  | _
        t        | j                  xs | j                  j                         | j                  j                  j                        | _        | | j                  j                  j                   }|	 | j                  j                  j"                  }	t%        dj'                  | j                  | j                  g      | j                  | j
                  dd||	      | _        | j                  j*                  | _        | j*                  j,                  | _        | j*                  j0                  | _        |"| j                  j                  j4                  dh}|| _        y )N)name.TF)exchangerouting_keyauto_deletedurablemessage_ttlexpiresjson)r   appr   channelhandlersr   r   node_idconfevent_queue_prefixqueue_prefixr   
connectionconnection_for_writeevent_exchanger   event_queue_ttlevent_queue_expiresr   joinqueueclockadjustadjust_clockforwardforward_clockevent_serializeraccept)
selfr   r   r   r   r   r!   r/   	queue_ttlqueue_expiress
             O/var/www/html/planif/env/lib/python3.12/site-packages/celery/events/receiver.py__init__zEventReceiver.__init__#   sh    "#/2$W-&.H&($&(LDHHMM,L,L$OO>txx<<>--/ 55I  HHMM==MHHd''67]]((e!!

 XX^^
 JJ--!ZZ//>hhmm44f=F    c                     | j                   j                  |      xs | j                   j                  d      }|xr
  ||       y y)z3Process event by dispatching to configured handler.*N)r   get)r0   typeeventhandlers       r3   processzEventReceiver.processB   s9    --##D)CT]]->->s-C"GEN"r5   c                 \     || j                   g| j                  gd| j                        gS )NT)queues	callbacksno_ackr/   )r(   _receiver/   )r0   Consumerr   s      r3   get_consumerszEventReceiver.get_consumersG   s.    $(MM?4 $- . 	.r5   Tc                 .    |r| j                  |       y y )N)r   )wakeup_workers)r0   r"   r   	consumerswakeupkwargss         r3   on_consume_readyzEventReceiver.on_consume_readyL   s    0 r5   c                 *    | j                  |||      S )NlimittimeoutrG   consume)r0   rL   rM   rG   s       r3   itercapturezEventReceiver.itercaptureQ   s    ||%|HHr5   c                 6    | j                  |||      D ]  } y)zOpen up a consumer capturing events.

        This has to run in the main process, and it will never stop
        unless :attr:`EventDispatcher.should_stop` is set to True, or
        forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
        rK   NrN   )r0   rL   rM   rG   _s        r3   capturezEventReceiver.captureT   s%     E76J 	A	r5   c                 h    | j                   j                  j                  d| j                  |       y )N	heartbeat)r"   r   )r   control	broadcastr"   )r0   r   s     r3   rE   zEventReceiver.wakeup_workers^   s+    "";.2oo+2 	# 	4r5   c                 ^   |d   }|dk(  r4| j                   j                  xs d|z   x}|d<   | j                  |       n	 |d   }	| j                  |	       |r	  ||      \  }
} |||
      |d<    |       |d<   ||fS # t        $ r | j	                         |d<   Y Fw xY w# t        $ r Y ;w xY w)Nr9   z	task-sentr
   r)   r   local_received)r)   valuer+   KeyErrorr-   )r0   bodylocalizenowtzfieldsr	   CLIENT_CLOCK_SKEWr9   _cr)   offsetr   s               r3   event_from_messagez EventReceiver.event_from_messagec   s     F|;"&**"2"2"7a;L!LLBgb!)W !!%(H$,TN!	 %5Y$G[!!$Tz  5 $ 2 2 4W5  s$    A> B  >BB 	B,+B,c                      |||      r2| j                   | j                  }}|D cg c]  } | ||        c} y  | j                   | j                  |        y c c}w N)r<   rc   )r0   r\   messagelist
isinstancer<   from_messager:   s           r3   rA   zEventReceiver._receive~   sU    dD!$(LL$2I2I\G8<=uWl5)*=DLL$11$78 >s   Ac                 ^    | j                   r | j                   j                  j                  S d S re   )r   r"   client)r0   s    r3   r"   zEventReceiver.connection   s#    15t||&&--G4Gr5   )N#NNNNNN)T)NNTre   )__name__
__module____qualname____doc__r   r4   r<   rC   rI   rP   rS   rE   time	_TZGETTERr	   r`   rc   rg   rh   rA   propertyr"    r5   r3   r   r      s{     C;>6:<@>#
. !%1
I4
 15#yy9,<->6 ,0J 9 H Hr5   r   )rp   rq   operatorr   kombur   kombu.connectionr   kombu.mixinsr   celeryr   
celery.appr   celery.utils.timer	   r:   r   __all__r`   rr   r   rt   r5   r3   <module>r}      sJ    $    * &  % . 
 {K0	qHM qHr5   