https://github.com/housleyjk/aiopyramid hoạt động tốt với tôi. Xem tài liệu về WebSocket: http://aiopyramid.readthedocs.io/features.html#websockets
UPD: máy chủ WebSocket chạy cùng môi trường Pyramid:
import aiohttp
import asyncio
from aiohttp import web
from webtest import TestApp
from pyramid.config import Configurator
from pyramid.response import Response
async def websocket_handler(request):
    """Serve one WebSocket connection, answering text frames via Pyramid.

    For every text frame other than ``'close'`` the handler issues a GET
    for ``/`` against the Pyramid WSGI app stored on ``request.app.pyramid``
    and sends the response text back to the peer. A text frame equal to
    ``'close'`` closes the socket. Frames that are neither text, close nor
    error are answered with ``'Hi'``.

    Args:
        request: The aiohttp request that initiated the WebSocket upgrade.

    Returns:
        The ``web.WebSocketResponse`` once the connection is closed.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    while not ws.closed:
        msg = await ws.receive()

        # ``msg.type`` / ``aiohttp.WSMsgType`` replace the removed
        # ``msg.tp`` / ``aiohttp.MsgType`` spellings.
        if msg.type == aiohttp.WSMsgType.TEXT:
            if msg.data == 'close':
                await ws.close()
            else:
                # NOTE: TestApp.get() is a synchronous WSGI call, so the
                # event loop is blocked while Pyramid renders the view.
                pyramid_response = TestApp(request.app.pyramid).get('/')
                # send_str() is a coroutine; without ``await`` the frame
                # is never written.
                await ws.send_str(pyramid_response.text)
        elif msg.type == aiohttp.WSMsgType.CLOSE:
            print('websocket connection closed')
        elif msg.type == aiohttp.WSMsgType.ERROR:
            print('ws connection closed with exception %s' %
                  ws.exception())
        else:
            await ws.send_str('Hi')

    return ws
def hello(request):
    """Pyramid view that answers with the fixed body ``'Hello world!'``.

    Args:
        request: The incoming Pyramid request (not inspected).

    Returns:
        A ``Response`` built from the string ``'Hello world!'``.
    """
    greeting = Response('Hello world!')
    return greeting
async def init(loop):
    """Build the aiohttp app with an embedded Pyramid app and start serving.

    The Pyramid WSGI application (one route, ``/``, handled by ``hello``)
    is attached to the aiohttp application as ``app.pyramid`` so that
    ``websocket_handler`` can reach it through ``request.app``.

    Args:
        loop: The asyncio event loop used to create the app and the server.

    Returns:
        The server object returned by ``loop.create_server``.
    """
    # Pyramid side: a single route at "/" rendered by the ``hello`` view.
    pyramid_config = Configurator()
    pyramid_config.add_route('hello_world', '/')
    pyramid_config.add_view(hello, route_name='hello_world')

    # aiohttp side: every GET on "/<name>" is upgraded to a WebSocket.
    # NOTE(review): ``loop=`` and ``make_handler()`` look like older aiohttp
    # idioms -- confirm they are supported by the installed version.
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/{name}', websocket_handler)
    app.pyramid = pyramid_config.make_wsgi_app()

    server = await loop.create_server(
        app.make_handler(), '127.0.0.1', 8080)
    print("Server started at http://127.0.0.1:8080")
    return server
# Start the server, then serve until interrupted with Ctrl+C.
loop = asyncio.get_event_loop()
srv = loop.run_until_complete(init(loop))
try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl+C is the intended way to stop this demo server.
    pass
finally:
    # Stop listening and release the loop's resources instead of leaving
    # both to interpreter shutdown.
    srv.close()
    loop.close()
Máy khách WebSocket:
import asyncio
import aiohttp
async def client():
    """Talk to the demo WebSocket server until the connection ends.

    Each iteration sends ``'Hi'``, waits two seconds and reads one frame.
    A text frame is printed; if its payload is ``'close'`` the socket is
    closed and the loop ends, otherwise the payload is echoed back with
    ``'/client'`` appended. Any close or error frame ends the loop.

    The client session is created inside the coroutine and managed with
    ``async with`` so that it is always closed; it was previously created
    at module level and never closed.
    """
    async with aiohttp.ClientSession() as session:
        # Connect to the address the server binds to (127.0.0.1) rather
        # than 0.0.0.0, which is not a portable destination address.
        async with session.ws_connect('http://127.0.0.1:8080/foo') as ws:
            while True:
                # send_str() is a coroutine and must be awaited.
                await ws.send_str('Hi')
                await asyncio.sleep(2)
                msg = await ws.receive()

                if msg.type == aiohttp.WSMsgType.TEXT:
                    print('MSG: ', msg)
                    if msg.data == 'close':
                        await ws.close()
                        break
                    await ws.send_str(msg.data + '/client')
                elif msg.type in (
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.ERROR,
                ):
                    # Stop once the connection is going away, so the next
                    # iteration does not write to a closed socket.
                    break
# Run the client to completion; close the loop even if client() raises.
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(client())
finally:
    loop.close()
Tôi không chắc về WebSocket trên Python 3 (gevent-socketio dựa vào gevent, mà tôi không chắc gevent có được hỗ trợ trên Python 3 hay không). Nhưng bạn đã xem thử Server-Sent Events (sự kiện do máy chủ gửi) chưa? Ví dụ: https://github.com/antoineleclair/zmq-sse-chat/blob/master/sse/views.py