
    Ih(                    4   d Z ddlmZ ddlZddlZddlZddlZddlmZ ddl	m
Z
 ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZmZ ddlmZ dZdj9                   eee            Zej@                  dk(  r>ddl!Z!ddl"Z"ddl#Z#e"jH                  Z%dZ&e"jN                  Z( e!jR                         Z*d Z+d Z,n*ej@                  dk(  rddl-Z-ddl-m%Z%m&Z& d Z+d Z,n e.d       edg d      Z/ G d dej`                        Z0 G d dejb                        Z1y)a	  File-system Transport module for kombu.

Transport using the file-system as the message store. Messages written to the
queue are stored in `data_folder_in` directory and
messages read from the queue are read from `data_folder_out` directory. Both
directories must be created manually. Simple example:

* Producer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_in', 'data_folder_out': 'data_out'
        }
    )
    conn.connect()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            producer = kombu.Producer(channel)
            producer.publish(
                        {'hello': 'world'},
                        retry=True,
                        exchange=test_queue.exchange,
                        routing_key=test_queue.routing_key,
                        declare=[test_queue],
                        serializer='pickle'
            )

* Consumer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_out', 'data_folder_out': 'data_in'
        }
    )
    conn.connect()

    def callback(body, message):
        print(body, message)
        message.ack()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            consumer = kombu.Consumer(
                conn, [test_queue], accept=['pickle']
            )
            consumer.register_callback(callback)
            with consumer:
                conn.drain_events(timeout=1)

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: No
* Supports TTL: No

Connection String
=================
Connection string is in the following format:

.. code-block::

    filesystem://

Transport Options
=================
* ``data_folder_in`` - directory where are messages stored when written
  to queue.
* ``data_folder_out`` - directory from which are messages read when read from
  queue.
* ``store_processed`` - if set to True, all processed messages are backed up to
  ``processed_folder``.
* ``processed_folder`` - directory where are backed up processed files.
* ``control_folder`` - directory where are exchange-queue table stored.
    )annotationsN)
namedtuple)Path)Empty)	monotonic)ChannelError)virtual)bytes_to_strstr_to_bytes)dumpsloads)cached_property)   r   r   .ntc                    t        j                  | j                               }t        j                  ||ddt               y)Create file lock.r         N)	win32file_get_osfhandlefileno
LockFileEx__overlapped)fileflagshfiles      S/var/www/html/planif/env/lib/python3.12/site-packages/kombu/transport/filesystem.pylockr   }   s.    ((7UE1j,G    c                    t        j                  | j                               }t        j                  |ddt               y)Remove file lock.r   r   N)r   r   r   UnlockFileExr   )r   r   s     r   unlockr#      s,    ((7ua\Br   posix)LOCK_EXLOCK_SHc                L    t        j                  | j                         |       y)r   N)fcntlflockr   )r   r   s     r   r   r      s    DKKM5)r   c                h    t        j                  | j                         t         j                         y)r!   N)r(   r)   r   LOCK_UN)r   s    r   r#   r#      s    DKKM5==1r   z9Filesystem plugin only defined for NT and POSIX platformsexchange_queue_t)routing_keypatternqueuec                      e Zd ZdZdZd Zd Zd Zd Zd Z	d Z
d	 Zed
        Zed        Zed        Zed        Zed        Zed        Zy)ChannelzFilesystem Channel.Tc                   | j                   | dz  }	 |j                  d      }	 t        |t               t	        t        |j                                     }|D cg c]
  }t        |  c}t        |       |j                          S c c}w # t        |       |j                          w xY w# t        $ r g cY S t        $ r t        d|       w xY w)N	.exchangerzCannot open )control_folderopenr   r&   r   r
   readr,   r#   closeFileNotFoundErrorOSErrorr   )selfexchanger   f_objexchange_tableqs         r   	get_tablezChannel.get_table   s    ""z%;;	6IIcNEUG$!&|EJJL'A!B6DE(!,Eu Fu  	I 	6dV455	6s:   B. 6B B	+B -B. 	B B++B. .C;Cc                   | j                   | dz  }| j                   j                  d       t        |xs d|xs d|xs d      }	 |j                         r|j	                  dd      }t        |t               t        t        |j                                     }|D 	cg c]
  }	t        |	  }
}	||
vr|
j                  d|       |j                  d       |j                  t        t        |
                   nI|j	                  dd      }t        |t               |g}
|j                  t        t        |
                   t        |       |j!                          y c c}	w # t               |j!                          w xY w)	Nr3   T)exist_ok zrb+r   	bufferingwb)r5   mkdirr,   existsr6   r   r%   r   r
   r7   insertseekwriter   r   r#   r8   )r;   r<   r-   r.   r/   r   	queue_valr=   r>   r?   queuess              r   _queue_bindzChannel._queue_bind   s>   ""z%;;!!4!0$[%6B2%*[b2		{{}		%1	5UG$!&|EJJL'A!B8FG1*A.GGF*MM!Y/JJqMKKU6] ;<		$!	4UG$#Lv785MKKM H 5MKKMs    	AE( "E#1BE( #E( (Fc                l    | j                  |      D ]   } | j                  |j                  |fi | " y N)r@   _putr/   )r;   r<   payloadr-   kwargsr?   s         r   _put_fanoutzChannel._put_fanout   s3    ) 	2ADIIaggw1&1	2r   c                   dj                  t        t        t               dz              t	        j
                         |      }t        j                  j                  | j                  |      }	 t        |dd      }t        |t               |j                  t        t        |                   	 t%        |       |j'                          y# t         $ r t#        d|d      w xY w# t%               |j'                          w xY w)	zPut `message` onto `queue`.z{}_{}.{}.msgi  rF   r   rD   zCannot add file z to directoryN)formatintroundr   uuiduuid4ospathjoindata_folder_outr6   r   r%   rK   r   r   r:   r   r#   r8   )r;   r/   rR   rS   filenamefs         r   rQ   zChannel._put   s    !((U9;3E-F)G)-u>77<< 4 4h?		Xtq1AGGGLw01
 1IGGI  	>"8,m<> >	> 1IGGIs   .AC C%%C( (Dc                   d|z   dz   }t        j                  | j                        }t        |      }t	        |      dkD  r|j                  d      }|j                  |      dk  r5| j                  r| j                  }nt        j                         }	 t        j                  t         j                  j                  | j                  |      |       t         j                  j                  ||      }	 t!        |d      }|j#                         }|j%                          | j                  st        j&                  |       t+        t-        |            S t/               # t        $ r Y 9w xY w# t        $ r t)        d|d      w xY w)zGet next message from `queue`.r   .msgr   rbzCannot read file z from queue.)r[   listdirdata_folder_insortedlenpopfindstore_processedprocessed_foldertempfile
gettempdirshutilmover\   r]   r:   r6   r7   r8   remover   r   r
   r   )r;   r/   
queue_findfolderr_   rk   r`   rR   s           r   _getzChannel._get   sV   5[6)
D//0&kAozz!}H }}Z(1,###'#8#8 #+#6#6#8 BGGLL)<)<hG,. ww||$4h?HB4(&&(	++IIh'
 g.//g#    B"'|<@B BBs   >E  5AE0  	E-,E-0F	c                   d}d|z   dz   }t        j                  | j                        }t        |      dkD  ry|j	                         }	 |j                  |      dk  r4t         j                  j                  | j                  |      }t        j                  |       |dz  }t        |      dkD  ry|S # t        $ r Y w xY w)z!Remove all messages from `queue`.r   r   rb   r   )
r[   rd   re   rg   rh   ri   r\   r]   rp   r:   r;   r/   countrq   rr   r_   s         r   _purgezChannel._purge	  s    5[6)
D//0&kAozz|H==,q077<<(;(;XF		(#
 &kAo"    s   	B3 AB3 3	B?>B?c                    d}d| d}t        j                  | j                        }t        |      dkD  r9|j	                         }|j                  |      dk  r3|dz  }t        |      dkD  r9|S )z<Return the number of messages in `queue` as an :class:`int`.r   r   rb   r   )r[   rd   re   rg   rh   ri   ru   s         r   _sizezChannel._size"  sq    t_
D//0&kAozz|H }}Z(1,QJE &kAo r   c                B    | j                   j                  j                  S rP   )
connectionclienttransport_optionsr;   s    r   r}   zChannel.transport_options3  s    %%777r   c                :    | j                   j                  dd      S )Nre   data_inr}   getr~   s    r   re   zChannel.data_folder_in7  s    %%))*:IFFr   c                :    | j                   j                  dd      S )Nr^   data_outr   r~   s    r   r^   zChannel.data_folder_out;  s    %%))*;ZHHr   c                :    | j                   j                  dd      S )Nrj   Fr   r~   s    r   rj   zChannel.store_processed?  s    %%))*;UCCr   c                :    | j                   j                  dd      S )Nrk   	processedr   r~   s    r   rk   zChannel.processed_folderC  s    %%))*<kJJr   c                L    t        | j                  j                  dd            S )Nr5   control)r   r}   r   r~   s    r   r5   zChannel.control_folderG  s!    D**../?KLLr   N)__name__
__module____qualname____doc__supports_fanoutr@   rN   rT   rQ   rs   rw   ry   propertyr}   r   re   r^   rj   rk   r5    r   r   r1   r1      s    O6 02"&P2" 8 8 G G I I D D K K M Mr   r1   c                       e Zd ZdZej
                  j                  j                  d eg d            Ze	Z	 ej                         ZdZdZdZ fdZd Z xZS )		TransportzFilesystem Transport.F)directtopicfanout)asynchronousexchange_typer   
filesystemc                H    t        |   |fi | | j                  | _        y rP   )super__init__global_statestate)r;   r|   rS   	__class__s      r   r   zTransport.__init__[  s"    *6*&&
r   c                     y)NzN/Ar   r~   s    r   driver_versionzTransport.driver_version_  s    r   )r   r   r   r   r	   r   
implementsextend	frozensetr1   BrokerStater   default_portdriver_typedriver_namer   r   __classcell__)r   s   @r   r   r   L  sc    ""--44 => 5 J
 G&7&&(LLKK'r   r   )2r   
__future__r   r[   rn   rl   rY   collectionsr   pathlibr   r/   r   timer   kombu.exceptionsr   kombu.transportr	   kombu.utils.encodingr
   r   kombu.utils.jsonr   r   kombu.utils.objectsr   VERSIONr]   mapstr__version__name
pywintypeswin32conr   LOCKFILE_EXCLUSIVE_LOCKr%   r&   LOCKFILE_FAIL_IMMEDIATELYLOCK_NB
OVERLAPPEDr   r   r#   r(   RuntimeErrorr,   r1   r   r   r   r   <module>r      s  Yv # 	    "    ) # ; ) /
hhs3() 77d?..GG00G(:((*LH
C WW&*2 CE E 0AC jMgoo jMZ!! r   