
    Ih-/                         d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZmZ dd	lmZ dd
lmZmZ dZdZ G d de      Zd Z G d de      Z G d dej2                  e      Zy)zqThe ``RPC`` result backend for AMQP brokers.

RPC-style result backend, using reply-to and one queue per client.
    N)maybe_declare)register_after_fork)cached_property)states)current_tasktask_join_will_block   )base)AsyncBackendMixinBaseResultConsumer)BacklogLimitExceeded
RPCBackendz
The "rpc" result backend does not support chords!

Note that a group chained with a task is also upgraded to be a chord,
as this pattern requires synchronization.

Result backends that supports chords: Redis, Database, Memcached, and more.
c                       e Zd ZdZy)r   z'Too much state history to fast-forward.N)__name__
__module____qualname____doc__     L/var/www/html/planif/env/lib/python3.12/site-packages/celery/backends/rpc.pyr   r      s    1r   r   c                 $    | j                          y N)_after_fork)backends    r   _on_after_fork_cleanup_backendr   "   s    r   c                   f     e Zd Zej                  ZdZdZ fdZd	dZd
dZ	d Z
d Zd Zd Z xZS )ResultConsumerNc                 Z    t        |   |i | | j                  j                  | _        y r   )super__init__r   _create_bindingselfargskwargs	__class__s      r   r    zResultConsumer.__init__,   s'    $)&)#||;;r   c                 "   | j                   j                         | _        | j                  |      }| j	                  | j                  j
                  |g| j                  g|| j                        | _        | j                  j                          y )N)	callbacksno_ackaccept)
app
connection_connectionr!   Consumerdefault_channelon_state_changer*   	_consumerconsume)r#   initial_task_idr)   r%   initial_queues        r   startzResultConsumer.start0   sv    88..0,,_=,,}o++,V;; '   	 r   c                     | j                   r| j                   j                  |      S |rt        j                  |       y y )N)timeout)r-   drain_eventstimesleep)r#   r7   s     r   r8   zResultConsumer.drain_events9   s9    ##000AAJJw r   c                     	 | j                   j                          | j                  j                          y # | j                  j                          w xY wr   )r1   cancelr-   closer#   s    r   stopzResultConsumer.stop?   s<    	%NN!!#""$D""$s	   7 Ac                 n    d | _         | j                  "| j                  j                          d | _        y y r   )r1   r-   collectr>   s    r   on_after_forkzResultConsumer.on_after_forkE   s4    '$$&#D (r   c                    | j                   | j                  |      S | j                  |      }| j                   j                  |      s6| j                   j	                  |       | j                   j                          y y r   )r1   r5   r!   consuming_from	add_queuer2   )r#   task_idqueues      r   consume_fromzResultConsumer.consume_fromK   sd    >>!::g&&$$W-~~,,U3NN$$U+NN""$ 4r   c                     | j                   r5| j                   j                  | j                  |      j                         y y r   )r1   cancel_by_queuer!   namer#   rF   s     r   
cancel_forzResultConsumer.cancel_forS   s1    >>NN**4+?+?+H+M+MN r   Tr   )r   r   r   kombur.   r-   r1   r    r5   r8   r?   rB   rH   rM   __classcell__r&   s   @r   r   r   &   s:    ~~HKI<! %$%Or   r   c                       e Zd ZdZej
                  Zej                  ZeZeZdZ	dZ
dZdddddZ G d d	ej                        Z G d
 dej                        Z	 	 d& fd	Zd Zd'dZd Zd Zd Zd Zd Zd Zd(dZ	 d)dZd Zd Zd*dZeZd Z	 d+dZd Z d Z!d Z"d Z#d  Z$d(d!Z%d" Z&d, fd#	Z'e(d$        Z)e*d%        Z+ xZ,S )-r   z&Base class for the RPC result backend.FT   r   r	   )max_retriesinterval_startinterval_stepinterval_maxc                       e Zd ZdZdZy)RPCBackend.Consumerz4Consumer that requires manual declaration of queues.FN)r   r   r   r   auto_declarer   r   r   r.   rY   m   s
    Br   r.   c                       e Zd ZdZdZy)RPCBackend.Queuez$Queue that never caches declaration.FN)r   r   r   r   can_cache_declarationr   r   r   Queuer\   r   s    2 %r   r^   c                 2   t        
|   |fi | | j                  j                  }	|| _        i | _        | j                  |      | _        | j                  rdnd| _        |xs |	j                  }|xs |	j                  }| j                  ||| j                        | _        |xs |	j                  | _        || _        | j!                  | | j                  | j"                  | j$                  | j&                        | _        t*        t+        | t,               y y )N   r	   )r   r    r+   confr-   _out_of_bandprepare_persistent
persistentdelivery_moderesult_exchangeresult_exchange_type_create_exchangeexchangeresult_serializer
serializerauto_deleter   r*   _pending_results_pending_messagesresult_consumerr   r   )r#   r+   r,   ri   exchange_typerd   rk   rl   r%   ra   r&   s             r   r    zRPCBackend.__init__w   s    ''xx}}%11*="&//Qq3t33%B)B)B--mT%7%7
 %>(>(>&#22$((DKK!!4#9#9 
 *&DE +r   c                 l    | j                   j                          | j                  j                          y r   )rm   clearro   r   r>   s    r   r   zRPCBackend._after_fork   s&    ##%((*r   c                 $    | j                  d       S r   )Exchange)r#   rK   typere   s       r   rh   zRPCBackend._create_exchange   s    }}T""r   c                     | j                   S )z$Create new binding for task with id.)bindingrL   s     r   r!   zRPCBackend._create_binding   s     ||r   c                 <    t        t        j                               r   )NotImplementedErrorE_NO_CHORD_SUPPORTstripr>   s    r   ensure_chords_allowedz RPCBackend.ensure_chords_allowed   s    !"4":":"<==r   c                 f    t               s't        | j                  |j                        d       y y )NT)retry)r   r   rw   channel)r#   producerrF   s      r   on_task_callzRPCBackend.on_task_call   s(    
 $%$,,x'7'78E &r   c                     	 |xs t         j                  }|j                  |j
                  xs |fS # t        $ r t        d|      w xY w)zGet the destination for result by task id.

        Returns:
            Tuple[str, str]: tuple of ``(reply_to, correlation_id)``.
        z%RPC backend missing task request for )r   requestAttributeErrorRuntimeErrorreply_tocorrelation_id)r#   rF   r   s      r   destination_forzRPCBackend.destination_for   sb    	E5!5!5G !7!7!B7BB  	E7{CE E	Es	   2 A
c                      y r   r   rL   s     r   on_reply_declarezRPCBackend.on_reply_declare   s     	r   c                      y r   r   )r#   results     r   on_result_fulfilledzRPCBackend.on_result_fulfilled   s     	r   c                      y)Nzrpc://r   )r#   include_passwords     r   as_urizRPCBackend.as_uri   s    r   c                    | j                  ||      \  }}|sy| j                  j                  j                  j	                  d      5 }	|	j                  | j                  |||||      | j                  ||| j                  d| j                  | j                  |      | j                  	       ddd       |S # 1 sw Y   |S xY w)z!Send task return value and state.NTblock)ri   routing_keyr   rk   r~   retry_policydeclarere   )r   r+   amqpproducer_poolacquirepublish
_to_resultri   rk   r   r   re   )
r#   rF   r   state	tracebackr   r%   r   r   r   s
             r   store_resultzRPCBackend.store_result   s     '+&:&:7G&L#^XX]]((00t0< 
		7K'-??):):--g6"00  	
	 
	 s   	A%B88Cc                 P    ||| j                  ||      || j                  |      dS )N)rF   statusr   r   children)encode_resultcurrent_task_children)r#   rF   r   r   r   r   s         r   r   zRPCBackend._to_result   s3    ((7"227;
 	
r   c                 p    | j                   r| j                   j                  |       || j                  |<   y r   )ro   on_out_of_band_resultrb   )r#   rF   messages      r   r   z RPCBackend.on_out_of_band_result   s1    
   66w?%,'"r   c                 L   | j                   j                  |d       }|r| j                  ||      S i }d }| j                  || j                  |      D ]?  }| j                  |      }|j                  |      |c}||<   |s.|j                          d }A |j                  |d       }|j                         D ]  \  }}	| j                  ||	        |r"|j                          | j                  ||      S 	 | j                  |   S # t        $ r t        j                  d dcY S w xY w)N)r   r   )rb   pop_set_cache_by_message_slurp_from_queuer*   _get_message_task_idgetackitemsr   requeue_cacheKeyErrorr   PENDING)
r#   rF   backlog_limitbufferedlatest_by_idprevacctidlatestmsgs
             r   get_task_metazRPCBackend.get_task_meta   s4   $$(($7--gx@@ ))'4;;N 	C++C0C&2&6&6s&;S#D,s# 
	 !!'40$**, 	1HC&&sC0	1 NN--gv>>B{{7++ B"(..DAABs   5D D#"D#c                 Z    | j                  |j                        x}| j                  |<   |S r   )meta_from_decodedpayloadr   )r#   rF   r   r   s       r   r   z RPCBackend._set_cache_by_message	  s.    )-)?)?OO* 	$++g&r   c              #   P  K   | j                   j                  j                  d      5 \  }} | j                  |      |      }|j	                          t        |      D ]  }|j                  ||      }|s n|  | j                  |      	 d d d        y # 1 sw Y   y xY ww)NTr   )r*   r)   )r+   poolacquire_channelr!   r   ranger   r   )	r#   rF   r*   limitr)   _r   rw   r   s	            r   r   zRPCBackend._slurp_from_queue  s     XX]]***6 
	9,1g3d**73G<GOO5\ 9kkk?		9 //88 
	9 
	9 
	9s   'B&A'B	B&B#B&c                 j    	 |j                   d   S # t        t        f$ r |j                  d   cY S w xY w)Nr   rF   )
propertiesr   r   r   )r#   r   s     r   r   zRPCBackend._get_message_task_id  s>    	. %%&677) 	.??9--	.s    22c                      y r   r   )r#   r   s     r   revivezRPCBackend.revive%  s    r   c                     t        d      )Nz4reload_task_result is not supported by this backend.ry   rL   s     r   reload_task_resultzRPCBackend.reload_task_result(  s    !BD 	Dr   c                     t        d      )z<Reload group result, even if it has been previously fetched.z5reload_group_result is not supported by this backend.r   rL   s     r   reload_group_resultzRPCBackend.reload_group_result,  s    !CE 	Er   c                     t        d      )Nz,save_group is not supported by this backend.r   )r#   group_idr   s      r   
save_groupzRPCBackend.save_group1  s    !:< 	<r   c                     t        d      )Nz/restore_group is not supported by this backend.r   )r#   r   caches      r   restore_groupzRPCBackend.restore_group5  s    !=? 	?r   c                     t        d      )Nz.delete_group is not supported by this backend.r   )r#   r   s     r   delete_groupzRPCBackend.delete_group9  s    !<> 	>r   c                    |si n|}t         |   |t        || j                  | j                  j
                  | j                  j                  | j                  | j                  | j                  | j                              S )N)r,   ri   rp   rd   rk   rl   expires)r   
__reduce__dictr-   ri   rK   ru   rd   rk   rl   r   r"   s      r   r   zRPCBackend.__reduce__=  sk    !vw!$'']]''--,,((LL	)
 	 		r   c                     | j                  | j                  | j                  | j                  dd| j                        S )NFT)durablerl   r   )r^   oidri   r   r>   s    r   rw   zRPCBackend.bindingJ  s8    zzHHdmmTXXLL	  
 	
r   c                 .    | j                   j                  S r   )r+   
thread_oidr>   s    r   r   zRPCBackend.oidS  s     xx"""r   )NNNNNT)directr`   rN   )NN)  )r   F)r   N)-r   r   r   r   rO   rt   Producerr   r   rd   supports_autoexpiresupports_native_joinr   r.   r^   r    r   rh   r!   r|   r   r   r   r   r   r   r   r   r   pollr   r   r   r   r   r   r   r   r   r   propertyrw   r   r   rP   rQ   s   @r   r   r   X   s,   0~~H~~H#N 0J 	L5>> 
& &
 KO?CF,+
#
>FC
 .2&
-B> D .39.DE
<?> 
 
 # #r   r   )r   r9   rO   kombu.commonr   kombu.utils.compatr   kombu.utils.objectsr   celeryr   celery._stater   r    r
   asynchronousr   r   __all__rz   	Exceptionr   r   r   Backendr   r   r   r   <module>r      sj      & 2 /  <  ?
0 29 2/O' /Od~#0 ~#r   