
    IhG                    (   d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl	m
Z
 d	Zd
Z G d de	j                        Z G d dej                  e	j                        Z G d dej                         Z G d de	j"                        Z G d de      Zy)a  pyamqp transport module for Kombu.

Pure-Python amqp transport using py-amqp library.

Features
========
* Type: Native
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
Connection string can have the following formats:

.. code-block::

    amqp://[USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]
    [USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]
    amqp://

For TLS encryption use:

.. code-block::

    amqps://[USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]

Transport Options
=================
Transport Options are passed to constructor of underlying py-amqp
:class:`~kombu.connection.Connection` class.

Using TLS
=========
Transport over TLS can be enabled by ``ssl`` parameter of
:class:`~kombu.Connection` class. By setting ``ssl=True``, TLS transport is
used::

    conn = Connect('amqp://', ssl=True)

This is equivalent to ``amqps://`` transport URI::

    conn = Connect('amqps://')

For adding additional parameters to underlying TLS, ``ssl`` parameter should
be set with dict instead of True::

    conn = Connect('amqp://broker.example.com', ssl={
            'keyfile': '/path/to/keyfile'
            'certfile': '/path/to/certfile',
            'ca_certs': '/path/to/ca_certfile'
        }
    )

All parameters are passed to ``ssl`` parameter of
:class:`amqp.connection.Connection` class.

SSL option ``server_hostname`` can be set to ``None`` which is causing using
hostname from broker URL. This is useful when failover is used to fill
``server_hostname`` with currently used broker::

    conn = Connect('amqp://broker1.example.com;broker2.example.com', ssl={
            'server_hostname': None
        }
    )
    )annotationsN)get_manager)version_string_as_tuple   )baseto_rabbitmq_queue_argumentsi(  i'  c                  $     e Zd ZdZd fd	Z xZS )MessagezAMQP Message.c                    |j                   }t        |   d|j                  ||j                  |j                  d      |j                  d      |j                  |j                   |j                  d      xs i d| y )Ncontent_typecontent_encodingapplication_headers)bodychanneldelivery_tagr   r   delivery_info
propertiesheaders )r   super__init__r   r   getr   )selfmsgr   kwargsprops	__class__s        O/var/www/html/planif/env/lib/python3.12/site-packages/kombu/transport/pyamqp.pyr   zMessage.__init__X   sv     		))>2"YY'9:++~~II34:		 		    N__name__
__module____qualname____doc__r   __classcell__r   s   @r   r   r   U   s     r    r   c                  H    e Zd ZdZeZdddddej                  fdZd Zd Zy)ChannelzAMQP Channel.Nc                &     ||f||||d|xs i S )z<Prepare message so that it can be sent using this transport.)priorityr   r   r   r   )r   r   r,   r   r   r   r   _Messages           r   prepare_messagezChannel.prepare_messagek   s4     
%- '
 B
 	
r    c                    t        |fi |S r!   r   )r   	argumentsr   s      r   prepare_queue_argumentszChannel.prepare_queue_argumentsx   s    *9???r    c                (    | j                  ||       S )z4Convert encoded message body back to a Python value.r   )r   )r   raw_messages     r   message_to_pythonzChannel.message_to_python{   s    ||K|66r    )	r#   r$   r%   r&   r   amqpr.   r1   r5   r   r    r   r*   r*   f   s-    G-1%)D $
@7r    r*   c                      e Zd ZdZeZy)
ConnectionzAMQP Connection.N)r#   r$   r%   r&   r*   r   r    r   r8   r8      s
    Gr    r8   c                     e Zd ZdZeZeZeZe	j                  j                  Z
e	j                  j                  Ze	j                  j                  Ze	j                  j                  ZdZdZej"                  j$                  j'                  dd      Z	 ddZd Zd	 Zd
 Zd Zd Zd Zd Zd Zd ZddZd Ze d        Z!d Z"y)	TransportzAMQP Transport.zpy-amqpr6   T)asynchronous
heartbeatsNc                f    || _         |xs | j                  | _        |xs | j                  | _        y r!   )clientdefault_portdefault_ssl_port)r   r>   r?   r@   r   s        r   r   zTransport.__init__   s/    (=D,=,= 0 ID4I4Ir    c                "    t         j                  S r!   )r6   __version__r   s    r   driver_versionzTransport.driver_version   s    r    c                "    |j                         S r!   r3   r   
connections     r   create_channelzTransport.create_channel   s    !!##r    c                &     |j                   di |S )Nr   )drain_events)r   rG   r   s      r   rJ   zTransport.drain_events   s    &z&&000r    c                *    ||j                          y y r!   )collectrF   s     r   _collectzTransport._collect   s    !  "r    c                   | j                   }| j                  j                         D ]   \  }}t        ||d      rt	        |||       " |j
                  dk(  rd|_        t        |j                  t              r6d|j                  v r(|j                  d   |j
                  |j                  d<   t        |j                  |j                  |j                  |j                  |j                  |j                  |j                  |j                  |j                   d	fi |j"                  xs i } | j$                  di |}| j                   |_         |j'                          |S )z(Establish connection to the AMQP broker.N	localhostz	127.0.0.1server_hostname)	hostuseridpasswordlogin_methodvirtual_hostinsistsslconnect_timeout	heartbeatr   )r>   default_connection_paramsitemsgetattrsetattrhostname
isinstancerW   dictrQ   rR   rS   rT   rU   rV   rX   rY   transport_optionsr8   connect)r   conninfonamedefault_valueoptsconns         r   establish_connectionzTransport.establish_connection   s8   ;;#'#A#A#G#G#I 	7D-8T40$6	7 + +HhllD)!X\\1./7.6.?.?HLL*+MMoo ))$11$11oo<<'77!++

 
/ ''-2
/ t&&kkr    c                    |j                   S r!   )	connectedrF   s     r   verify_connectionzTransport.verify_connection       ###r    c                2    d|_         |j                          y)z!Close the AMQP broker connection.N)r>   closerF   s     r   close_connectionzTransport.close_connection   s     
r    c                    |j                   S r!   )rY   rF   s     r   get_heartbeat_intervalz Transport.get_heartbeat_interval   rl   r    c                v    d|j                   _        |j                  |j                  | j                  ||       y NT)	transportraise_on_initial_eintr
add_readersockon_readable)r   rG   loops      r   register_with_event_loopz"Transport.register_with_event_loop   s,    6:
3
)9)9:tLr    c                &    |j                  |      S )N)rate)heartbeat_tick)r   rG   r|   s      r   heartbeat_checkzTransport.heartbeat_check   s    ((d(33r    c                f    |j                   }|j                  d      dk(  rt        |d         dk  S y)NproductRabbitMQversion)   r   T)server_propertiesr   r   )r   rG   r   s      r   qos_semantics_matches_specz$Transport.qos_semantics_matches_spec   s6    ,,99Y:-*5+;<vEEr    c                j    dd| j                   j                  r| j                  n| j                  dddS )NguestrO   PLAIN)rR   rS   portr^   rT   )r>   rW   r@   r?   rC   s    r   rZ   z#Transport.default_connection_params   s6     .2kkooT**++##
 	
r    c                4    t        | j                  g|i |S r!   )r   r>   )r   argsr   s      r   r   zTransport.get_manager   s    4;;8888r    )NN)   )#r#   r$   r%   r&   r8   DEFAULT_PORTr?   DEFAULT_SSL_PORTr@   r6   connection_errorschannel_errorsrecoverable_connection_errorsrecoverable_channel_errorsdriver_namedriver_typer   r:   
implementsextendr   rD   rH   rJ   rM   rh   rk   ro   rq   rz   r~   r   propertyrZ   r   r   r    r   r:   r:      s    JL' 99__33N55 "!%!K!KKK**11 2 J 6:J $1!:$
$M4 
 
9r    r:   c                  "     e Zd ZdZ fdZ xZS )SSLTransportzAMQP SSL Transport.c                t    t        |   |i | | j                  j                  sd| j                  _        y y rs   )r   r   r>   rW   )r   r   r   r   s      r   r   zSSLTransport.__init__   s2    $)&) {{"DKKO r    r"   r(   s   @r   r   r      s    # #r    r   )r&   
__future__r   r6   kombu.utils.amq_managerr   kombu.utils.textr    r   r	   r   r   r   r*   
StdChannelr8   r:   r   r   r    r   <module>r      s   CL #  / 4  - dll "7dllDOO 74 l9 l9^#9 #r    