
    Ih=                    $   d Z ddlmZ ddlZddlmZ ddlZddlmZmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ ddlmZ dZdZ G d d      Z G d dej@                        Z  G d dejB                        Z!y)a  MongoDB transport module for kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
 *Unreviewed*

Transport Options
=================

* ``connect_timeout``,
* ``ssl``,
* ``ttl``,
* ``capped_queue_size``,
* ``default_hostname``,
* ``default_port``,
* ``default_database``,
* ``messages_collection``,
* ``routing_collection``,
* ``broadcast_collection``,
* ``queues_collection``,
* ``calc_queue_size``,
    )annotationsN)Empty)MongoClienterrors
uri_parser)
CursorType)VersionMismatch)_detect_environment)bytes_to_str)dumpsloads)cached_property)maybe_sanitize_url   )virtualto_rabbitmq_queue_argumentsz3Kombu requires MongoDB version 1.3+ (server is {0})zKKombu requires MongoDB version 2.2+ (server is {0}) for TTL indexes supportc                  :    e Zd ZdZd Zd Zd Zd	dZd Zd Z	e	Z
y)
BroadcastCursorzCursor for broadcast queues.c                D    || _         d| _        | j                  d       y )Nr   F)rewind)_cursor_offsetpurge)selfcursors     P/var/www/html/planif/env/lib/python3.12/site-packages/kombu/transport/mongodb.py__init__zBroadcastCursor.__init__C   s    

%
     c                f    | j                   j                  j                  i       | j                  z
  S N)r   
collectioncount_documentsr   r   s    r   get_sizezBroadcastCursor.get_sizeH   s&    ||&&66r:T\\IIr   c                8    | j                   j                          y r!   )r   closer$   s    r   r'   zBroadcastCursor.closeK   s    r   c                    |r| j                   j                          | j                   j                  j                  i       | _        | j                   j                  | j                        | _         y r!   )r   r   r"   r#   r   skip)r   r   s     r   r   zBroadcastCursor.purgeN   sM    LL! ||..>>rB||((6r   c                    | S r!    r$   s    r   __iter__zBroadcastCursor.__iter__V   s    r   c                    	 	 t        | j                        }	 | xj                  dz  c_        |S # t        j                  j                  $ r(}dt        |      v r| j                          Y d }~n d }~ww xY w)Nznot valid at serverr   )nextr   pymongor   OperationFailurestrr   r   )r   msgexcs      r   __next__zBroadcastCursor.__next__Y   si    4<<( 
 >>22  )CH4JJLs   0 A5A0/A00A5N)T)__name__
__module____qualname____doc__r   r%   r'   r   r,   r4   r.   r+   r   r   r   r   @   s+    &!
J7& Dr   r   c                  l    e Zd ZdZdZi ZdZdZdZdZ	dZ
dZdZdZd	Zd
ZdZdZej&                  j(                  dz   Z fdZd Zd Z fdZd Zd Zd Zd Zd Z fdZd*dZd Z d Z!d*dZ"d Z#d Z$d Z%e&d        Z'e&d         Z(e&d!        Z)e&d"        Z*e&d#        Z+d$ Z,d% Z-d& Z.d' Z/d( Z0d) Z1 xZ2S )+ChannelzMongoDB Channel.TFNi z	127.0.0.1ii  kombu_defaultmessageszmessages.routingzmessages.broadcastzmessages.queues)connect_timeoutsslttlcapped_queue_sizedefault_hostnamedefault_portdefault_databasemessages_collectionrouting_collectionbroadcast_collectionqueues_collectioncalc_queue_sizec                J    t        |   |i | i | _        | j                   y r!   )superr   _broadcast_cursorsclient)r   vargskwargs	__class__s      r   r   zChannel.__init__   s&    %*6*"$ 	r   c           
         | j                   r7| j                  j                  d|id||| j                  |d      did       y y )N_id$set	x-expires)rQ   options	expire_atTupsert)r?   queues
update_one_get_queue_expire)r   queuerN   s      r   
_new_queuezChannel._new_queue   sW    88KK""$#)%)%;%;"K&  #  r   c                X   || j                   v r	 t        | j                  |            }n0| j                  j                  d|idt        j                  fg      }| j                  r| j                  |       |
t               t        t        |d               S # t        $ r d }Y Mw xY w)Nr[   priority)sortpayload)_fanout_queuesr.   _get_broadcast_cursorStopIterationr<   find_one_and_deleter/   	ASCENDINGr?   _update_queues_expirer   r   r   )r   r[   r2   s      r   _getzChannel._get   s    D'''455e<= --33% !7#4#456 4 C
 88&&u-;'M\#i.122 ! s   B B)(B)c                    | j                   st        | 	  |      S || j                  v r| j	                  |      j                         S | j                  j                  d|i      S Nr[   )rH   rJ   _sizera   rb   r%   r<   r#   )r   r[   rO   s     r   rj   zChannel._size   s_     ##7=''D'''--e4==??}},,gu-=>>r   c                   t        |      || j                  |d      d}| j                  r:| j                  |d      |d<   | j	                  |      }||d   ||d   k  r||d<   | j
                  j                  |       y )NT)reverse)r`   r[   r^   zx-message-ttlrU   )r   _get_message_priorityr?   rZ   _get_message_expirer<   
insert_one)r   r[   messagerN   data
msg_expires         r   _putzChannel._put   s    W~227D2I
 88 $ 6 6uo ND11':J%[!)Z${:K-K$.[!  &r   c                R    | j                   j                  t        |      |d       y )N)r`   r[   )	broadcastro   r   )r   exchangerp   routing_keyrN   s        r   _put_fanoutzChannel._put_fanout   s"    !!eGn+3#5 	6r   c                    | j                  |      }|| j                  v r!| j                  |      j                          |S | j                  j                  d|i       |S ri   )rj   ra   rb   r   r<   delete_many)r   r[   sizes      r   _purgezChannel._purge   sZ    zz% D'''&&u-335  MM%%w&67r   c                    t        | j                  j                  |   d         }| j                  j	                  d|i      }|t        d |D              z  S )Ntablerv   c              3  8   K   | ]  }|d    |d   |d   f  yw)rw   patternr[   Nr+   ).0rs     r   	<genexpr>z$Channel.get_table.<locals>.<genexpr>   s,      '
 }q|QwZ8'
s   )	frozensetstate	exchangesroutingfind)r   rv   localRoutesbrokerRoutess       r   	get_tablezChannel.get_table   s_    

 4 4X >w GH||(("
 Y '
!'
 
 
 	
r   c                6   | j                  |      j                  dk(  r#| j                  ||||       || j                  |<   ||||d}|j	                         }| j
                  r| j                  |d      |d<   | j                  j                  |d|id       y )Nfanout)rv   r[   rw   r   rS   rU   rR   TrV   )	typeoftype_create_broadcast_cursorra   copyr?   rZ   r   rY   )r   rv   rw   r   r[   lookuprq   s          r   _queue_bindzChannel._queue_bind   s    ;;x %%1))+w7)1D& !&	
 {{}88 $ 6 6uk JDtDr   c                |   | j                   j                  d|i       | j                  r| j                  j	                  d|i       t        |   |fi | || j                  v rH	 | j                  j                  |      }|j                          | j                  j                  |       y y # t        $ r Y y w xY w)Nr[   rQ   )r   rz   r?   rX   
delete_onerJ   queue_deletera   rK   popr'   KeyError)r   r[   rN   r   rO   s       r   r   zChannel.queue_delete  s      '5!1288KK""E5>2U-f-D'''/0044U; ##''. (  s   'B/ /	B;:B;c                8   | j                   j                  }|j                  }|j                  d      rd}d|z   }|j                  |      s||z   }|t	        |      d  s|| j
                  z  }|j                  rPd|vrL|j                  d      \  }}|j                  }|j                  r|d|j                  z   z  }|dz   |z   dz   |z   }|j                  r|j                  n| j                  }t        j                  ||d      }|d	   xs |j                  }	|	d
v r| j                  }	d| j                  | j                   rt#        | j                   dz        nd d}
|
j%                  |d          | j'                  |
      }
d|
v r|
j)                  d       ||	|
fS )Nzsrv://zmongodb+srv://zmongodb+@z://:F)validatedatabase)/NTi  )auto_start_requestr>   connectTimeoutMSrT   tlsr>   )
connectionrL   hostname
startswithlenrA   useridsplitpasswordportrB   r   	parse_urivirtual_hostrC   r>   r=   intupdate_prepare_client_optionsr   )r   schemerL   r   headtailcredentialsr   parseddbnamerT   s              r   
_parse_urizChannel._parse_uri  s    ''??x(%F!H,H""6*(HF%---H==S0!.JD$ --KsV__44e|k1C7$>H$kkv{{t/@/@ %%huE
#:v':':[ **F #'88$($8$8 "%T%9%9D%@!A>B	
 	vi()..w7GKK((r   c                    t         j                  dk\  rV|j                  dd        t        |j	                  d      t
              r%t         j                  j                  }||d      |d<   |S )N   r   readpreference)r/   version_tupler   
isinstancegetr   read_preferences_MONGOS_MODES)r   rT   modess      r   r   zChannel._prepare_client_optionsK  s_      D(KK,d3'++&67=00>>,1':J2K,L()r   c                    t        |fi |S r!   r   )r   	argumentsrN   s      r   prepare_queue_argumentszChannel.prepare_queue_argumentsS  s    *9???r   c                   | j                  |      \  }}}||d<   t               }|dk(  rddlm} |j	                          n|dk(  rddlm}  |        t        di |}||   }	|j                         d   }
|
j                  d	      d   }
t        t        t        |
j                  d
                  }|dk  rt        t        j                  |
            | j                   r#|dk  rt        t"        j                  |
            |	S )N)r   hostgeventr   )monkeyeventlet)monkey_patchversion-.)r   r   )   r   r+   )r   r
   r   r   	patch_allr   r   r   server_infor   tuplemapr   r	   E_SERVER_VERSIONformatr?   E_NO_TTL_INDEXES)r   r   r   r   confenvr   r   	mongoconnr   version_strr   s               r   _openzChannel._openV  s    !%!?&$V!#(?%J-N'$'	V$++-i8!'',Q/C!2!23!789V!"2"9"9+"FGGXX'F*!"2"9"9+"FGGr   c                    | j                   |j                         v ry|j                  | j                   | j                  d       y)z0Create capped collection for broadcast messages.NT)r{   capped)rF   list_collection_namescreate_collectionr@   r   r   s     r   _create_broadcastzChannel._create_broadcastq  sC    $$(F(F(HH""4#<#<(,(>(>*. 	# 	0r   c                z   || j                      }|j                  g dd       || j                     j                  dg       || j                     }|j                  ddg       | j                  rJ|j                  dgd       |j                  dgd       || j
                     j                  dgd       y	y	)
zEnsure indexes on collections.)r[   r   )r^   r   )rQ   r   T)
backgroundr   )rv   r   )rU   r   r   )expireAfterSecondsN)rD   create_indexrF   rE   r?   rG   )r   r   r<   r   s       r   _ensure_indexeszChannel._ensure_indexesz  s    D4457D 	 	
 	**+88,H4223lO<=88!!#3"4!K  "2!3 JT++,99!"q : :	 r   c                j    | j                         }| j                  |       | j                  |       |S )zActually creates connection.)r   r   r   r   s     r   _create_clientzChannel._create_client  s.    ::<x(X&r   c                "    | j                         S r!   )r   r$   s    r   rL   zChannel.client  s    ""$$r   c                4    | j                   | j                     S r!   )rL   rD   r$   s    r   r<   zChannel.messages  s    {{43344r   c                4    | j                   | j                     S r!   )rL   rE   r$   s    r   r   zChannel.routing  s    {{42233r   c                4    | j                   | j                     S r!   )rL   rF   r$   s    r   ru   zChannel.broadcast  s    {{44455r   c                4    | j                   | j                     S r!   )rL   rG   r$   s    r   rX   zChannel.queues  s    {{41122r   c                    	 | j                   |   S # t        $ r$ | j                  | j                  |   d d |      cY S w xY wr!   )rK   r   r   ra   )r   r[   s     r   rb   zChannel._get_broadcast_cursor  sP    	**511 	 00##E*D$ 		s    *>>c                    t         j                  dk\  rd|it        j                  d}nd|idd} | j                  j
                  di |}t        |      x}| j                  |<   |S )Nr   r[   )filtercursor_typeT)querytailabler+   )r/   r   r   TAILABLEru   r   r   rK   )r   rv   rw   r   r[   r   r   rets           r   r   z Channel._create_broadcast_cursor  sv      E)"H-)22E "8, E
 %$$-u-/>v/FFd%%e,
r   c                    |j                  di       j                  d      }|0| j                         t        j                  t	        |            z   S y )N
properties
expirationmilliseconds)r   get_nowdatetime	timedeltar   )r   rp   values      r   rn   zChannel._get_message_expire  sH    L"-11,?<<>H$6$6CJ$OOO r   c                    t        |t              r&| j                  j                  d|i      }|sy|d   }n|}	 |d   |   }| j                         t        j                  |      z   S # t        t
        f$ r Y yw xY w)zGet expiration header named `argument` of queue definition.

        Note:
        ----
            `queue` must be either queue name or options itself.
        rQ   NrT   r   r   )	r   r1   rX   find_oner   	TypeErrorr   r   r   )r   r[   argumentdocrq   r   s         r   rZ   zChannel._get_queue_expire  s     eS!++&&u~6Cy>DD	%h/E ||~ 2 2 FFF )$ 		s   A) )A;:A;c                    | j                  |d      }|sy| j                  j                  d|idd|ii       | j                  j                  d|idd|ii       y)z,Update expiration field on queues documents.rS   Nr[   rR   rU   rQ   )rZ   r   update_manyrX   )r   r[   rU   s      r   rf   zChannel._update_queues_expire  sg    **5+>	  evY'?@	BENVk9%=>	@r   c                >    t         j                   j                         S )zReturn current time in UTC.)r   utcnowr$   s    r   r   zChannel.get_now  s      ''))r   )
mongodb://)3r5   r6   r7   r8   supports_fanoutra   r>   r?   r=   r@   rH   rA   rB   rC   rD   rE   rF   rG   r   r:   from_transport_optionsr   r\   rg   rj   rs   rx   r|   r   r   r   r   r   r   r   r   r   r   r   rL   r<   r   ru   rX   rb   r   rn   rZ   rf   r   __classcell__)rO   s   @r   r:   r:   o   sO   O N C
COO"L&$+/)%ooDD H 3(	?'"6	
E(/(0)d@60:& % % 5 5 4 4 6 6 3 3	 P
G0
@*r   r:   c                  N   e Zd ZdZeZdZdZej                  Zej                  j                  ej                  fz   Z
ej                  j                  ej                  ej                  fz   ZdZdZej                  j"                  j%                   eg d            Zd Zddd	Zy
)	TransportzMongoDB Transport.Tr   mongodbr/   )directtopicr   )exchange_typec                "    t         j                  S r!   )r/   r   r$   s    r   driver_versionzTransport.driver_version	  s    r   c                    |sy|r|S d|vrt        |      S |j                  dd      \  }}dj                  t        |      |g      S )Nr  ,r   )r   r   join)r   uriinclude_passwordmaskuri1	remainders         r   as_urizTransport.as_uri  sP    Jc>%c**))C+ixx+D19=>>r   N)Fz**)r  r1   returnr1   )r5   r6   r7   r8   r:   can_parse_urlpolling_intervalrB   r   r  connection_errorsr   ConnectionFailurechannel_errorsr0   driver_typedriver_name
implementsextendr   r  r  r+   r   r   r  r    s    GM''L++v/G/G.II  	(($$##,% 	% 
 KK""--44 => 5 J
?r   r  )"r8   
__future__r   r   r[   r   r/   r   r   r   pymongo.cursorr   kombu.exceptionsr	   kombu.utils.compatr
   kombu.utils.encodingr   kombu.utils.jsonr   r   kombu.utils.objectsr   kombu.utils.urlr    r   baser   r   r   r   r:   r  r+   r   r   <module>r/     sz   @ #    3 3 % , 2 - ) / .  -  
, ,^@*goo @*F$?!! $?r   