How to run twisted transport in python correctly?

I was asked to write a class that connects to a server, sends various commands to it asynchronously, and then provides the returned data to the client. I was asked to do this in Python, which is a new language for me. I started digging around and found the Twisted framework, which offers some very nice abstractions (Protocol, ProtocolFactory, Reactor) that do a lot of what I would have to do myself if I rolled my own socket-based application. It seems to be the right choice, given the problem I have to solve.

I have looked at a lot of examples on the Internet (mainly Krondo's), but I still have not seen a good example of a client that sends several commands over the wire while keeping the connection it created open. The server (which I do not control) does not disconnect after sending a response. So what is the right way to design a client so that I can tickle the server in different ways?

Now I am doing this:

class TestProtocol(Protocol):
    """Protocol that writes the factory's message as soon as it connects."""

    def connectionMade(self):
        # The only write this class performs: the message currently held by
        # the factory, sent once when the connection is established.
        self.transport.write(self.factory.message)

class TestProtocolFactory(Factory):
    """Factory that holds the message TestProtocol sends on connect."""

    # Factory.buildProtocol() instantiates this attribute; without it no
    # TestProtocol can be created for a new connection.
    protocol = TestProtocol

    # Payload written by TestProtocol.connectionMade(); empty until set.
    message = ''

    def setMessage(self, msg):
        """Replace the message that is written when a connection is made."""
        self.message = msg

def main():
    """Build the factory, give it a message, connect, and run the reactor."""
    factory = TestProtocolFactory()
    factory.setMessage("my message")
    # The connection arguments are elided in this example.
    reactor.connectTCP(...)
    reactor.run()

What I really want to do is call self.transport.write(...) through the reactor (in effect, call TestProtocolFactory.setMessage() on demand from another thread of execution), and not just when the connection is created.

+3
source share
3 answers

It depends. Here are a few possibilities:

I guess

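1. You have a list of commands to send to the server and, for some reason, cannot send them all at once. In that case, send the next one when the reply to the previous one arrives: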

class proto(parentProtocol):
    def stringReceived(self, data):
        """Handle one server response, then take the next queued command."""
        self.handle_server_response(data)
        if not self.command_queue:
            # Nothing left to send. Popping an empty list or deque would
            # raise IndexError on the response to the final command.
            return
        next_command = self.command_queue.pop()
        # do stuff

2. What you send to the server depends on what the server sends you:

class proto(parentProtocol):
    # Reply to send for each recognised server message.
    _REPLIES = {
        "this": "that",
        "foo": "bar",
        # and so on
    }

    def stringReceived(self, data):
        """Answer a recognised server message with its paired reply."""
        reply = self._REPLIES.get(data)
        if reply is not None:
            self.sendString(reply)

3. You do not care what the server sends back; you just want to send commands periodically:

class proto(parentProtocol):
    def callback(self):
        """Take the next queued command, if there is one."""
        if not self.command_queue:
            # An exception raised here would stop the LoopingCall for good,
            # so an empty queue is skipped until more work arrives.
            return
        next_command = self.command_queue.pop()
        # do stuff

    def connectionMade(self):
        """Start calling callback() at a 1.0 second interval."""
        from twisted.internet import task
        self.task_id = task.LoopingCall(self.callback)
        self.task_id.start(1.0)

    def connectionLost(self, reason):
        """Stop the periodic call so it does not outlive the connection."""
        if self.task_id.running:
            self.task_id.stop()
        parentProtocol.connectionLost(self, reason)

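4: Your edit now mentions triggering the send from another thread. Check the Twisted documentation to find out whether proto.sendString is thread-safe; you may be able to call it directly, but I do not know. Approach 3 is thread-safe, though: just fill the queue (which must itself be thread-safe) from the other thread.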

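Basically, you can keep any amount of state in your protocol; it stays around until you are done with it. You either send commands to the server in response to its messages, or you set up some kind of scheduling to do your work, or both.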

+4

You may want to use a Service.

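Services are pieces of functionality within a Twisted application that can be started and stopped, and they are a nice abstraction for other parts of your code to interact with. For example, in this case you might have a SayStuffToServerService (a terrible name, I know, but without knowing more about its job it is the best I can do :)) that exposes something like this: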

class SayStuffToServerService:
    """Interface sketch of a service that relays lines to a remote server."""

    def __init__(self, host, port):
        """Take the host and port to connect to."""

    def sendToServer(self, whatToSend):
        """Send some line to the remote server."""

    def startService(self):
        """Call before using the service; starts outgoing connection efforts."""

    def stopService(self):
        """Stop outgoing connection efforts.

        Clean reactor shutdowns should call this method.
        """

(There may be other interfaces you need, but you can work those out yourself.)

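The startService() and stopService() methods here are exactly what Twisted Services expose. Helpfully, there is a ready-made Twisted Service that acts as a TCP client and takes care of all the reactor work for you. It is twisted.application.internet.TCPClient, which takes the remote host and port as arguments, along with a ProtocolFactory that handles the actual connection attempt.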

Here is SayStuffToServerService, implemented as a subclass of TCPClient:

from twisted.application import internet

class SayStuffToServerService(internet.TCPClient):
    """TCP client service that owns the factory used for its connection."""

    # Factory type instantiated once per service in __init__.
    factoryclass = SayStuffToServerProtocolFactory

    def __init__(self, host, port):
        """Create the factory and hand it to TCPClient with host and port."""
        self.factory = self.factoryclass()
        internet.TCPClient.__init__(self, host, port, self.factory)

    def sendToServer(self, whatToSend):
        """Placeholder: sending whatToSend is not implemented at this step."""
        # we'll do stuff here

(See below for SayStuffToServerProtocolFactory.)

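Using this Service architecture is convenient in many ways; you can group Services together in one container so that they are all started and stopped together when different parts of your application need to be active. It may make sense to implement other parts of your application as separate Services. You can set Services as children of application - the magic name that twistd looks for in order to know how to initialize, daemonize, and run your application. In fact, let's add the code to do that now.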

from twisted.application import service

...

# Module-level Application object named 'say-stuff'.
application = service.Application('say-stuff')

# Create the client service for localhost:65432 and make it a child of the
# application's service collection.
sttss = SayStuffToServerService('localhost', 65432)
sttss.setServiceParent(service.IServiceCollection(application))

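That's all. Now, when you run this module under twistd (e.g., for debugging, twistd -noy saystuff.py), the application is started under the right reactor, and it in turn starts the SayStuffToServerService, which begins connecting to localhost:65432, using the service's factory attribute to set up the connection and the Protocol. You no longer need to call reactor.run() or attach anything to the reactor yourself.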

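We have not implemented SayStuffToServerProtocolFactory yet. Since it sounds like you would like the client to reconnect when it loses its connection (so that callers of sendToServer can usually assume there is a working connection), I will build the factory on top of ReconnectingClientFactory.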

from twisted.internet import protocol

class SayStuffToServerProtocolFactory(protocol.ReconnectingClientFactory):
    # Protocol instance for the live connection. SayStuffToServerProtocol
    # assigns itself here in connectionMade() and resets it to None in
    # connectionLost().
    _my_live_proto = None
    # Protocol class used for connections made through this factory.
    protocol = SayStuffToServerProtocol

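This is a fairly minimal definition, which keeps trying to make outgoing TCP connections to the host and port we specified and instantiates a SayStuffToServerProtocol each time. When a connection attempt fails, this class performs well-behaved exponential backoff so that your network is not hammered (and you can set a maximum wait time). It is the Protocol's responsibility to assign to _my_live_proto and to call the factory's resetDelay() method, so that the exponential backoff keeps working as expected. Here is that Protocol: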

class SayStuffToServerProtocol(basic.LineReceiver):
    """Line-based protocol that registers itself with its factory while live."""

    def connectionMade(self):
        """Reset the factory's retry delay and publish this protocol as live."""
        # if there are things you need to do on connecting to ensure the
        # connection is "all right" (maybe authenticate?) then do that
        # before calling:
        self.factory.resetDelay()
        self.factory._my_live_proto = self

    def connectionLost(self, reason):
        """Unregister from the factory and drop the reference to it."""
        # Only clear the slot if it still points at this instance, so a
        # stale protocol cannot wipe out a different live one.
        if self.factory._my_live_proto is self:
            self.factory._my_live_proto = None
        del self.factory

    def sayStuff(self, stuff):
        """Send stuff to the server as one line."""
        self.sendLine(stuff)

    def lineReceived(self, line):
        """Receive one line from the server; currently ignored."""
        # do whatever you want to do with incoming lines. often it makes sense
        # to have a queue of Deferreds on a protocol instance like this, and
        # each incoming response gets sent to the next queued Deferred (which
        # may have been pushed on the queue after sending some outgoing
        # message in sayStuff(), or whatever).
        pass

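This is implemented on top of twisted.protocols.basic.LineReceiver, but it would work just as well with any other kind of Protocol, in case your protocol is not line-oriented.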

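The only thing left is hooking the Service up to the right Protocol instance. This is why the factory keeps a _my_live_proto attribute, which is set when a connection is successfully made and cleared (set to None) when that connection is lost. Here is the new implementation of SayStuffToServerService.sendToServer: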

class NotConnectedError(Exception):
    """Raised when something is to be sent but no connection is live."""

class SayStuffToServerService(internet.TCPClient):

    ...

    def sendToServer(self, whatToSend):
        """Pass whatToSend to the live protocol, or raise NotConnectedError."""
        live_proto = self.factory._my_live_proto
        if live_proto is not None:
            live_proto.sayStuff(whatToSend)
            return
        # define here whatever behavior is appropriate when there is no
        # current connection (in case the client can't connect or
        # reconnect)
        raise NotConnectedError

And now, to tie it all together in one place:

from twisted.application import internet, service
from twisted.internet import protocol
from twisted.protocols import basic

class SayStuffToServerProtocol(basic.LineReceiver):
    """Line-based protocol that registers itself with its factory while live."""

    def connectionMade(self):
        """Reset the factory's retry delay and publish this protocol as live."""
        # if there are things you need to do on connecting to ensure the
        # connection is "all right" (maybe authenticate?) then do that
        # before calling:
        self.factory.resetDelay()
        self.factory._my_live_proto = self

    def connectionLost(self, reason):
        """Unregister from the factory and drop the reference to it."""
        # Only clear the slot if it still points at this instance, so a
        # stale protocol cannot wipe out a different live one.
        if self.factory._my_live_proto is self:
            self.factory._my_live_proto = None
        del self.factory

    def sayStuff(self, stuff):
        """Send stuff to the server as one line."""
        self.sendLine(stuff)

    def lineReceived(self, line):
        """Receive one line from the server; currently ignored."""
        # do whatever you want to do with incoming lines. often it makes sense
        # to have a queue of Deferreds on a protocol instance like this, and
        # each incoming response gets sent to the next queued Deferred (which
        # may have been pushed on the queue after sending some outgoing
        # message in sayStuff(), or whatever).
        pass

class SayStuffToServerProtocolFactory(protocol.ReconnectingClientFactory):
    # Protocol instance for the live connection. SayStuffToServerProtocol
    # assigns itself here in connectionMade() and resets it to None in
    # connectionLost(); SayStuffToServerService.sendToServer() reads it.
    _my_live_proto = None
    # Protocol class used for connections made through this factory.
    protocol = SayStuffToServerProtocol

class NotConnectedError(Exception):
    """Raised by sendToServer() when no protocol is currently connected."""

class SayStuffToServerService(internet.TCPClient):
    """TCP client service exposing sendToServer() on top of its factory."""

    factoryclass = SayStuffToServerProtocolFactory

    def __init__(self, host, port):
        """Create the factory and hand it to TCPClient with host and port."""
        factory = self.factoryclass()
        self.factory = factory
        internet.TCPClient.__init__(self, host, port, factory)

    def sendToServer(self, whatToSend):
        """Pass whatToSend to the live protocol, or raise NotConnectedError."""
        live_proto = self.factory._my_live_proto
        if live_proto is not None:
            live_proto.sayStuff(whatToSend)
            return
        # define here whatever behavior is appropriate when there is no
        # current connection (in case the client can't connect or
        # reconnect)
        raise NotConnectedError

# Module-level Application object named 'say-stuff'.
application = service.Application('say-stuff')

# Create the client service for localhost:65432 and make it a child of the
# application's service collection.
sttss = SayStuffToServerService('localhost', 65432)
sttss.setServiceParent(service.IServiceCollection(application))

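Hopefully that gives you enough of a framework to start with. There is sometimes a lot of plumbing to do to handle client-side disconnections the way you want, or to handle out-of-order responses from the server, various kinds of timeouts, cancelling pending requests, allowing multiple pooled connections, and so on, but this should help.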

+3

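Twisted is an event-driven framework; its methods are all called asynchronously, and results are delivered through Deferred objects.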

The framework's nature is well suited to protocol development; you just need to change your way of thinking from traditional sequential programming. The Protocol class is like a finite state machine (FSM) with events such as: connection made, connection lost, and data received. You can turn your client code into an FSM, and then it fits easily into the Protocol class.

Below is an example of what I mean. It is a little rough, but it is what I can provide right now:

class SyncTransport(Protocol):
    """Small state machine driven by connection events and send() calls.

    I_AM_LIVING and WAITING_DEAD are state constants that are not defined in
    this snippet; they must be provided by the surrounding module.
    """

    # No connection yet; connectionMade() moves this to I_AM_LIVING. Having a
    # default lets send() be called before connecting without AttributeError.
    state = None

    # protocol
    def dataReceived(self, data):
        """Print every chunk of data received from the peer."""
        print('receive data %s' % (data,))

    def connectionMade(self):
        """Write the opening 'x' and enter the I_AM_LIVING state."""
        print('i made a sync connection, wow')
        self.transport.write('x')
        self.state = I_AM_LIVING

    def connectionLost(self, reason=None):
        """Report the lost connection.

        reason is accepted (and ignored) because Twisted passes it when it
        invokes this callback.
        """
        print('i lost my sync connection, sight')

    def send(self, data):
        """Advance the state machine for data and write the matching reply."""
        if self.state == I_AM_LIVING:
            if data == 'x':
                self.transport.write('y')
            # NOTE(review): upper-case 'Y' is kept from the original; confirm
            # it is not meant to be the 'y' written above.
            if data == 'Y':
                self.transport.write('z')
                self.state = WAITING_DEAD
        if self.state == WAITING_DEAD:
            # Transports are closed with loseConnection(); there is no close().
            self.transport.loseConnection()
0
source

All Articles