HEX
Server: nginx/1.24.0
System: Linux webserver 6.8.0-85-generic #85-Ubuntu SMP PREEMPT_DYNAMIC Thu Sep 18 15:26:59 UTC 2025 x86_64
User: wpuser (1002)
PHP: 8.3.6
Disabled: NONE
Upload Files
File: //lib/python3/dist-packages/twisted/test/__pycache__/test_threadable.cpython-312.pyc
�

Ϫ�f

����dZddlZddlZddlmZ	ddlZdZddlm	Z	ddl
mZmZGd�d	�Z
e	je
�Gd
�de�ZGd�d
e�Zy#e$rdZY�JwxYw)z)
Tests for L{twisted.python.threadable}.
�N)�skipIfFT)�
threadable)�FailTest�SynchronousTestCasec� �eZdZdgZdZdZd�Zy)�
TestObject�aMethod����c���td�D]g}|j|jc|_|_|j|jz|_|jdk(r�TJd|jfz��y)N�
rzz == %d, not 0 as expected)�range�y�x�z)�self�is  �>/usr/lib/python3/dist-packages/twisted/test/test_threadable.pyr	zTestObject.aMethodsb���r��	I�A�!�V�V�T�V�V�N�D�F�D�F��V�V�d�f�f�_�D�F��6�6�Q�;�H� <����y� H�H�;�	I�N)�__name__�
__module__�__qualname__�synchronizedrrr	�rrrrs���;�L�
�A�	�A�Irrc�Z�eZdZd�Zd�Zeed�d��Zeed�d��Zd�Z	y)�SynchronizationTestsc��|jtjtj��tjd�y)z�
        Reduce the CPython check interval so that thread switches happen much
        more often, hopefully exercising more possible race conditions.  Also,
        delay actual test startup until the reactor has been started.
        gH�����z>N)�
addCleanup�sys�setswitchinterval�getswitchinterval�rs r�setUpzSynchronizationTests.setUp)s/��	
����-�-�s�/D�/D�/F�G����i�(rc�X�|jdtjj�y)zk
        The name of a synchronized method is inaffected by the synchronization
        decorator.
        r	N)�assertEqualrr	rr"s r�test_synchronizedNamez*SynchronizationTests.test_synchronizedName2s ��
	
����J�$6�$6�$?�$?�@r�!Platform does not support threadsc���tj�g�tj�fd���}|j	�|j�|j
�dd�|jtj�d�y)z�
        L{threadable.isInIOThread} returns C{True} if and only if it is called
        in the same thread as L{threadable.registerAsIOThread}.
        c�J���jtj��S�N)�appendr�isInIOThread)�
foreignResults�r�<lambda>z8SynchronizationTests.test_isInIOThread.<locals>.<lambda>Bs���=�/�/�
�0G�0G�0I�J�r��targetrz#Non-IO thread reported as IO threadz#IO thread reported as not IO threadN)	r�registerAsIOThread�	threading�Thread�start�join�assertFalse�
assertTruer,)r�tr-s  @r�test_isInIOThreadz&SynchronizationTests.test_isInIOThread9sn���	�%�%�'��
����J�
��	
���	�	��������q�)�+P�Q�����#�#�%�'L�	
rc����t��g���fd�}g}td�D]9}tj|��}|j	|�|j��;|D]}|j
���rt���y)Nc���	td�D]}�j��y#t$r$}�jt	|��Yd}~yd}~wwxYw�Ni�)rr	�AssertionErrorr+�str)r�e�errors�os  ��r�callMethodLotszHSynchronizationTests.testThreadedSynchronization.<locals>.callMethodLotsQsG���
&��t�� �A��I�I�K� ��!�
&��
�
�c�!�f�%�%��
&�s� $�	A�A�A�r/)rrr2r3r+r4r5r)rrB�threadsrr8r@rAs     @@r�testThreadedSynchronizationz0SynchronizationTests.testThreadedSynchronizationKs�����L����	&����q��	�A�� � ��7�A��N�N�1��
�G�G�I�	�
�	�A�
�F�F�H�	���6�"�"�rc�X�t�}td�D]}|j��yr<)rrr	)rrArs   r�testUnthreadedSynchronizationz2SynchronizationTests.testUnthreadedSynchronizationds%���L���t��	�A�
�I�I�K�	rN)
rrrr#r&r�
threadingSkipr9rErGrrrrr(sL��)�A��M�>�?�
�@�
�"�M�>�?�#�@�#�0rrc�0�eZdZeed�d��Zd�Zy)�SerializationTestsr'c��tj�}t|�}tj|�}tj
|�}|j
||�yr*)r�XLock�type�pickle�dumps�loads�assertIsInstance)r�lock�lockType�
lockPickle�newLocks     r�testPicklingzSerializationTests.testPicklingksE�����!����:���\�\�$�'�
��,�,�z�*�����g�x�0rc��d}tj|�}tj|d�}tj|�y)Ns6ctwisted.python.threadable
unpickle_lock
p0
(tp1
Rp2
.�)rNrPrO)rrTrR�	newPickles    r�testUnpicklingz!SerializationTests.testUnpicklingss1��S�
��|�|�J�'���L�L��q�)�	����Y�rN)rrrrrHrVrZrrrrJrJjs"���M�>�?�1�@�1� rrJ)�__doc__rNr�unittestrr2rH�ImportError�twisted.pythonr�twisted.trial.unittestrrr�synchronizerrJrrr�<module>rasz���
�
�����M�%�@�
I�
I��
���z�"�?�.�?�D
 �,�
 ��w���M��s�A�A#�"A#