Python: threads controlling events notified by other threads

I am developing a multithreaded application in Python. In particular, in this application, a thread must be able to generate an event that is delivered to one (or more) other threads; the threads that receive an event notification must interrupt their execution and run a specific function. At the end of this service function, they should return to what they were doing before the event was generated.

To do something like this, I thought about using some kind of publish / subscribe module. I found one that is very easy to use: PyPubSub . You can find here a very simple example of how to use it.

However, when I started using it, I realized that it does what I was looking for, but only when you work with processes. If you have multiple threads, it suspends the whole process (and therefore all the threads in it) to run the specific routine. This is not the behavior I was looking for. Unfortunately, I cannot change my application from multi-threaded to multi-process.

Do you know of any module that can help me do what I'm trying to do in a multi-threaded application? Thanks.

+3
source share
1 answer

In Python, there is no true concurrency except through the multiprocessing module, since the GIL is then not part of the picture.

, , , . , Pypubsub , , ( pubsub , , :) , mp, concurrency , ?

, , , , , , , . , "" : , , . , . .

3 , :

from multiprocessing import Process, Queue, Lock
from Queue import Empty as QueueEmpty
from random import randint


def log(lock, threadId, msg):
    """Print one line tagged with ``threadId`` while holding ``lock``.

    Args:
        lock: a lock usable as a context manager; it is held only for the
            duration of the print so output lines are not interleaved.
        threadId: identifier shown after the word ``Thread``.
        msg: the message to print (converted with ``%s``).

    The lock is always released, even if printing raises.
    """
    # Format outside the critical section: a failing str() conversion then
    # never touches the lock, and the lock is held for as short as possible.
    line = 'Thread %s : %s' % (threadId, msg)
    with lock:
        # Parenthesised single argument: same output on Python 2 and 3.
        print(line)


def auxThread(id, lock, sendQueue, recvQueue, genType):
    """Worker loop: occasionally post an event, then wait for one message.

    Args:
        id: identifier used in log lines and stored as ``fromId`` in the
            events this worker posts.
        lock: lock handed to ``log`` so printed lines are serialised.
        sendQueue: queue that receives the events generated here.
        recvQueue: queue this worker reads its incoming messages from.
        genType: value stored under ``type`` in every event posted here.

    The loop ends when a received message has ``val`` equal to ``'DONE'``.
    """
    log(lock, id, 'starting')
    maxWait = 1  # seconds to wait for an incoming message per iteration
    while True:
        # send a message (once in a while!)
        if randint(1, 10) > 7:
            event = dict(type=genType, fromId=id, val=randint(1, 10))
            log(lock, id, 'putting message type "%(type)s" = %(val)s' % event)
            sendQueue.put(event)

        # Wait up to maxWait seconds for a message.  The first argument must
        # be True: with block=False the timeout is ignored, get() returns
        # immediately, and this loop would spin and flood sendQueue.
        try:
            msg = recvQueue.get(True, maxWait)
        except QueueEmpty:
            continue

        log(lock, id, 'got message type "%(type)s" = %(val)s from thread %(fromId)s' % msg)
        if msg['val'] == 'DONE':
            break

    log(lock, id, 'done')


def createThread(id, lock, postOffice, genType):
    """Build (without starting) one worker process and its private inbox.

    The worker runs ``auxThread`` with ``postOffice`` as its outgoing queue
    and a freshly created queue as its incoming one; the process is marked
    as a daemon.

    Returns:
        dict with keys ``'q'`` (the worker's inbox queue), ``'p'`` (the
        not-yet-started process) and ``'id'`` (the given identifier).
    """
    inbox = Queue()
    worker = Process(
        target=auxThread,
        args=(id, lock, postOffice, inbox, genType),
    )
    worker.daemon = True
    return {'q': inbox, 'p': worker, 'id': id}


def mainThread():
    """Run the demo: start three workers and route events between them.

    Every worker posts its events to a shared ``postOffice`` queue.  This
    function reads that queue and forwards each message to the inbox of every
    worker listed for the message's ``type`` in ``dispatch``.  After 21
    messages have been routed it sends each worker a ``'DONE'`` command and
    joins the worker processes.
    """
    postOffice = Queue()   # where all threads post their messages
    lock = Lock()          # so print can be synchronized

    # setup threads:
    msgThreads = [
        createThread(1, lock, postOffice, 'heartbeat'),
        createThread(2, lock, postOffice, 'new_socket'),
        createThread(3, lock, postOffice, 'keypress'),
    ]

    # Look workers up by their id instead of by list position, so routing
    # does not silently depend on ids being 1..N in creation order.
    threadsById = dict((th['id'], th) for th in msgThreads)

    # identify which threads listen for which messages
    dispatch = dict(
        heartbeat  = (2,),
        keypress   = (1,),
        new_socket = (3,),
    )

    # start all threads
    for th in msgThreads:
        th['p'].start()

    # process messages
    maxWait = 1  # seconds to wait for a posted message per iteration
    count = 0
    while count <= 20:
        # The first argument must be True: with block=False the timeout is
        # ignored and this loop would busy-spin instead of waiting.
        try:
            msg = postOffice.get(True, maxWait)
        except QueueEmpty:
            continue

        for threadId in dispatch[msg['type']]:
            threadsById[threadId]['q'].put(msg)
        count += 1

    log(lock, 0, "Main thread sending exit signal to aux threads")
    for th in msgThreads:
        th['q'].put(dict(type='command', val='DONE', fromId=0))

    for th in msgThreads:
        th['p'].join()
        log(lock, th['id'], 'joined main')
    log(lock, 0, "DONE")


# Entry point: run the demo only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    mainThread()

, pypubsub, pypubsub, , - , pypubsub . , mp ( ), pypubsub / , .

+3

All Articles