All pastes #1883073 Raw Edit

WebSockets/AMQP chat client

public python v1 · immutable
#1883073 · published 2010-06-14 18:36 UTC
rendered paste body
# -*- coding: utf-8 -*-
"""WebSockets/AMQP chat client.

A Tornado application serves a small chat page and a WebSocket endpoint.
Messages received over a WebSocket are published to an AMQP exchange, and
messages consumed from the AMQP queue are pushed to every open WebSocket.
A CherryPy process bus carries the messages between the two sides.
"""
import sys
import random
import time
import select
import logging
from logging import handlers

if hasattr(select, "poll"):
    from asyncore import poll2 as poll
else:
    from asyncore import poll

import pika
import tornado
import tornado.httpserver
import tornado.ioloop
# Imported explicitly: RequestHandler and Application are used below and
# should not depend on another tornado module importing tornado.web for us.
import tornado.web
from tornado import websocket
from cherrypy.process import wspbus, plugins


class MyBus(wspbus.Bus):
    """Process bus whose "main" channel is driven by the Tornado IOLoop."""

    # Seconds between two successive publications on the "main" channel.
    main_interval = 0.1

    def __init__(self, name=""):
        """Set up logging for *name* and schedule the first "main" tick."""
        wspbus.Bus.__init__(self)
        self.open_logger(name)
        self.subscribe("log", self._log)
        self.ioloop = tornado.ioloop.IOLoop.instance()
        self.ioloop.add_callback(self.call_main)

    def call_main(self):
        """Publish on the "main" channel, then reschedule this method."""
        self.publish('main')
        # Use an IOLoop timeout rather than time.sleep(): sleeping inside a
        # callback would stall every HTTP/WebSocket handler for the whole
        # interval, whereas a timeout lets the loop keep serving I/O.
        self.ioloop.add_timeout(time.time() + self.main_interval,
                                self.call_main)

    def block(self):
        """Run the IOLoop; on Ctrl-C stop the loop and exit the bus."""
        ioloop = tornado.ioloop.IOLoop.instance()
        try:
            ioloop.start()
        except KeyboardInterrupt:
            ioloop.stop()
            self.exit()

    def exit(self):
        """Exit the bus, then flush and close the log handlers."""
        wspbus.Bus.exit(self)
        self.close_logger()

    def open_logger(self, name=""):
        """Create the logger *name* writing INFO+ records to stdout."""
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.INFO)
        h.setFormatter(logging.Formatter(
            "[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))
        logger.addHandler(h)
        self.logger = logger

    def close_logger(self):
        """Flush and close every handler attached to the bus logger."""
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()

    def _log(self, msg="", level=logging.INFO):
        """Subscriber of the "log" channel: forward to the bus logger."""
        self.logger.log(level, msg)


class WS2AMQPPlugin(plugins.SimplePlugin):
    """Bridge between the bus and an AMQP broker on localhost.

    Declares exchange "X" and queue "Q", binds them with an empty routing
    key, consumes from "Q" and republishes what it receives on the bus
    channel "amqp2ws"; messages arriving on the bus channel "ws2amqp" are
    published to exchange "X".
    """

    def __init__(self, bus):
        """Connect to the broker, declare the AMQP objects and subscribe."""
        plugins.SimplePlugin.__init__(self, bus)
        self.conn = pika.AsyncoreConnection(
            pika.ConnectionParameters('localhost'))
        self.channel = self.conn.channel()
        self.channel.exchange_declare(exchange="X", type="direct",
                                      durable=False)
        self.channel.queue_declare(queue="Q", durable=False, exclusive=False)
        self.channel.queue_bind(queue="Q", exchange="X", routing_key="")
        self.channel.basic_consume(self.amqp2ws, queue="Q")

        self.bus.subscribe("ws2amqp", self.ws2amqp)
        self.bus.subscribe("stop", self.cleanup)

    def cleanup(self):
        """On bus "stop": unsubscribe, delete queue/exchange, disconnect."""
        self.bus.unsubscribe("ws2amqp", self.ws2amqp)
        self.bus.unsubscribe("stop", self.cleanup)
        try:
            self.channel.queue_delete(queue="Q")
            self.channel.exchange_delete(exchange="X")
        finally:
            # Close the connection even if deleting the queue or the
            # exchange fails, so the socket is not leaked.
            self.conn.close()

    def amqp2ws(self, ch, method, header, body):
        """AMQP consumer callback: relay *body* to the bus, then ack it."""
        self.bus.publish("amqp2ws", body)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def ws2amqp(self, message):
        """Bus subscriber: log *message* and publish it to exchange "X"."""
        self.bus.log("Publishing to AMQP: %s" % message)
        self.channel.basic_publish(exchange="X", routing_key="",
                                   body=message)


bus = MyBus()


class MainHandler(tornado.web.RequestHandler):
    """Serve the chat page."""

    def get(self):
        """Write the chat page with a random "User<N>" name (N in 0..100).

        The page opens its WebSocket against the host it was loaded from
        (window.location.host) instead of a fixed address, so it works
        from whichever interface or hostname the server is reached on.
        """
        username = "User%d" % random.randint(0, 100)
        self.write("""<html>
        <head>
          <script type='application/javascript' src='/static/jquery-1.4.2.min.js'> </script>
          <script type='application/javascript'>
            $(document).ready(function() {
              var ws = new WebSocket('ws://' + window.location.host + '/ws');
              ws.onmessage = function (evt) {
                 $('#chat').val($('#chat').val() + evt.data + '\\n');
              };
              $('#chatform').submit(function() {
                 ws.send('%(username)s: ' + $('#message').val());
                 $('#message').val("");
                 return false;
              });
            });
          </script>
        </head>
        <body>
        <form action='/ws' id='chatform' method='post'>
          <textarea id='chat' cols='35' rows='10'></textarea>
          <br />
          <label for='message'>%(username)s: </label><input type='text' id='message' />
          <input type='submit' value='Send' />
          </form>
        </body>
        </html>
        """ % {'username': username})


class WebSocket2AMQP(websocket.WebSocketHandler):
    """WebSocket endpoint relaying messages to and from the bus.

    The bus instance is read from the application settings key 'bus'.
    """

    def __init__(self, *args, **kwargs):
        """Subscribe this connection to the bus channel "amqp2ws"."""
        websocket.WebSocketHandler.__init__(self, *args, **kwargs)
        self.settings['bus'].subscribe("amqp2ws", self.push_message)

    def open(self):
        """Start waiting for the first message from the client."""
        self.receive_message(self.on_message)

    def on_message(self, message):
        """Publish *message* on "ws2amqp" and wait for the next one."""
        self.settings['bus'].publish("ws2amqp", message)
        self.receive_message(self.on_message)

    def on_connection_close(self):
        """Stop receiving "amqp2ws" publications for this connection."""
        self.settings['bus'].unsubscribe("amqp2ws", self.push_message)

    def push_message(self, message):
        """Bus subscriber: send *message* to the connected client."""
        self.write_message(message)


if __name__ == '__main__':
    application = tornado.web.Application([
        (r"/", MainHandler),
        (r"/ws", WebSocket2AMQP),
        ], static_path=".", bus=bus)

    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)

    # Each "main" tick runs one asyncore poll, which drives the pika
    # AsyncoreConnection from inside the Tornado loop.
    bus.subscribe("main", poll)
    WS2AMQPPlugin(bus).subscribe()
    bus.start()

    bus.block()