Tornado简单聊天室,原理探究
分类:www.澳门新萄京赌场

python websocket

在项目中用到socket.io加强时推送,遂花了点时间看了socket.io完毕,做个轻巧拆解解析,如有错漏,迎接指正。

python3知识点

jquery.min.js

澳门新萄京 1

运用python-kafka类库开拓kafka临蓐者&花费者&顾客端

译者说

Tornado 4.3于二零一六年十月6日颁发,该版本正式补助Python3.5async/await主要字,况且用旧版本CPython编写翻译Tornado一样能够选用那五个关键字,那活脱脱是大器晚成种发展。其次,那是倒数支撑Python2.6Python3.2的版本了,在继续的本子了会移除对它们的合营。今后互连网上还并未有Tornado4.3的国语文书档案,所认为了让越多的意中人能接触并就学到它,作者起来了这几个翻译项目,希望感兴趣的友人可以合营参与翻译,项目地址是tornado-zh on Github,翻译好的文书档案在Read the Docs上直白能够看来。款待Issues or P陆风X8。本节多谢@thisisx7翻译

安装

1 概述

socket.io是叁个依据WebSocket的CS的实时通讯库,它底层基于engine.io。engine.io使用WebSocket和xhr-polling(或jsonp)封装了后生可畏套自个儿的公约,在不帮忙WebSocket的低版本浏览器中(扶植websocket的浏览器版本见这里)使用了长轮询(long polling)来代替。socket.io在engine.io的根底上平添了namespace,room,自动重连等特色。

正文接下去会先简介websocket协议,然后在那底子上上课下engine.io和socket.io公约以至源码深入分析,后续再通过例子表明socket.io的工作流程。

web服务器代码:

#coding=utf-8

importtornado.websocket

importtornado.web

importtornado.ioloop

importdatetime

classIndexHandler(tornado.web.RequestHandler):

defget(self, *args, **kwargs):

self.render('templates/index.html')

classWebHandler(tornado.websocket.WebSocketHandler):

users =set()#存放在线客商

defopen(self, *args, **kwargs):

self.users.add(self)#把创设连接后的客户增加到客商容器中

foruserinself.users:#向在线的客商发送步向消息

user.write_message("[%s]-[%s]-步入闲谈室"% (self.request.remote_ip,

datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

defon_close(self):

self.users.remove(self)# 顾客关闭连接后从容器中移除客户

foruserinself.users:

user.write_message("[%s]-[%s]-离开闲聊室"% (self.request.remote_ip,

datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

defon_message(self, message):

foruserinself.users:#向在线顾客发送闲扯消息

user.write_message("[%s]-[%s]-说:%s"% (self.request.remote_ip,

datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), message))

defcheck_origin(self, origin):

return True# 允许WebSocket的跨域伏乞

importos

BASE_DIR = os.path.dirname(__file__)

settings = {

'static_path':os.path.join(BASE_DIR,'static'),

"websocket_ping_interval":1,

"websocket_ping_timeout":10

}

app = tornado.web.Application([(r'/',IndexHandler),

(r'/chat',WebHandler)],

**settings)

app.listen(8009)

tornado.ioloop.IOLoop.instance().start()


By: 授客 QQ:1033553122

PS:本节最佳直接在https://tornado-zh.readthedocs.org或者http://tornado.moelove.info/阅读,以获得更好的翻阅体验(格式扶持)。原谅自个儿没排好版QAQ

pip install websocket-client

2 WebSocket协议

大家知晓,在HTTP 契约开垦的时候,并非为着双向通讯程序筹划的,初叶的 web 应用程序只要求 “须求-响应” 就够了。由于历史由来,在创制具有双向通讯机制的 web 应用程序时,就只可以选拔 HTTP 轮询的不二秘技,因此发出了 “短轮询” 和 “长轮询”(注意区分短连接和长连接)。

短轮询通过客商端依期轮询来打听服务端是或不是有新的音信发出,劣势也是路人皆知,轮询间隔大了则音信相当不够实时,轮询间隔过小又会消耗过多的流量,增添服务器的承负。长轮询是对短轮询的优化,供给服务端做相应的校勘来支撑。顾客端向服务端发送诉求时,假设那个时候服务端没有新的新闻发生,并不比时回到,而是Hang住意气风发段时间等有新的消息大概逾期再再次回到,客户端收到服务器的答应后持续轮询。能够看看长轮询比短轮询能够减去大气没用的央浼,并且客商端接收取新音信也会实时不菲。

纵然如此长轮询比短轮询优化了好多,不过每趟诉求如故都要带上HTTP央浼尾部,况且在长轮询的总是达成以往,服务器端累积的新音信要等到后一次顾客端连接时才干传递。更加好的办法是只用二个TCP连接来促成客商端和服务端的双向通讯,WebSocket协商就是为此而生。WebSocket是依靠TCP的三个单独的商业事务,它与HTTP公约的唯豆蔻梢头涉及就是它的拉手伏乞能够看成三个Upgrade request经由HTTP服务器解析,且与HTTP使用相像的端口。WebSocket私下认可对平时性乞请使用80端口,合同为ws://,对TLS加密央浼使用443端口,公约为wss://

拉手是由此叁个HTTP Upgrade request千帆竞发的,贰个央求和响应尾部示举例下(去掉了非亲非故的头顶)。WebSocket握手供给尾部与HTTP供给底部是同盟的(见奥迪Q7FC2616卡塔尔。

## Request Headers ##
Connection: Upgrade
Host: socket.io.demo.com
Origin: http://socket.io.demo.com
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: mupA9l2rXciZKoMNQ9LphA==
Sec-WebSocket-Version: 13
Upgrade: websocket

## Response Headers ##
101 Web Socket Protocol Handshake
Connection: upgrade
Sec-WebSocket-Accept: s4VAqh7eedG0a11ziQlwTzJUY3s=
Sec-WebSocket-Origin: http://socket.io.demo.com
Server: nginx/1.6.2
Upgrade: WebSocket
  • Upgrade 是HTTP/1.第11中学明确的用来转移当前连接的应用层左券的头顶,表示顾客端希望用现成的连天调换来新的应用层合同WebSocket左券。

  • Origin 用于防止跨站攻击,浏览器平常会选拔那些来标记原始域,对于非浏览器的顾客端应用能够依照须要接受。

  • 乞求头中的 Sec-WebSocket-Version 是WebSocket版本号,Sec-WebSocket-Key 是用来握手的密钥。Sec-WebSocket-Extensions 和 Sec-WebSocket-Protocol 是可选择,暂不钻探。

  • 响应头中的 Sec-WebSocket-Accept 是将诉求头中的 Sec-WebSocket-Key 的值加上三个定位魔数258EAFA5-E914-47DA-95CA-C5AB0DC85B11经SHA1 base64编码后拿走。总计进度的python代码示例(uwsgi中的完成见 core/websockets.c的 uwsgi_websocket_handshake函数):

    magic_number = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
    key = 'mupA9l2rXciZKoMNQ9LphA=='
    accept = base64.b64encode(hashlib.sha1(key   magic_number).digest())
    assert(accept == 's4VAqh7eedG0a11ziQlwTzJUY3s=')
    
  • 顾客端会检查响应头中的status code 和 Sec-WebSocket-Accept 值是否是期望的值,假若发掘Accept的值不得法或许状态码不是101,则不会树立WebSocket连接,也不会发送WebSocket数据帧。

WebSocket合计使用帧(Frame卡塔尔国收发数据,帧格式如下。基于安然考虑衡量,顾客端发送给服务端的帧必需经过4字节的掩码(Masking-key卡塔尔国加密,服务端收到新闻后,用掩码对数据帧的Payload Data实行异或运算解码得到数码(详见uwsgi的 core/websockets.c 中的uwsgi_websockets_parse函数卡塔 尔(英语:State of Qatar),若是服务端收到未经掩码加密的数据帧,则应该及时关闭该WebSocket。而服务端发给顾客端的数额则没有需求掩码加密,顾客端假如选拔了服务端的掩码加密的多寡,则也一定要关闭它。

 0                   1                   2                   3
      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
      - - - - ------- - ------------- ------------------------------- 
     |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
     |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
     |N|V|V|V|       |S|             |   (if payload len==126/127)   |
     | |1|2|3|       |K|             |                               |
      - - - - ------- - -------------  - - - - - - - - - - - - - - -  
     |     Extended payload length continued, if payload len == 127  |
       - - - - - - - - - - - - - - -  ------------------------------- 
     |                               |Masking-key, if MASK set to 1  |
      ------------------------------- ------------------------------- 
     | Masking-key (continued)       |          Payload Data         |
      -------------------------------- - - - - - - - - - - - - - - -  
     :                     Payload Data continued ...                :
      --------------------------------------------------------------- 

帧分为调控帧和数据帧,调控帧不能够分片,数据帧能够分片。首要字段表明如下:

  • FIN: 未有分片的帧的FIN为1,分片帧的率先个分片的FIN为0,最终叁个分片FIN为1。
  • opcode: 帧类型编号,个中央调节制帧:0x8 (Close), 0x9 (Ping), and 0xA (Pong),数据帧首要有:0x1 (Text), 0x2 (Binary)。
  • MASK:顾客端发给服务端的帧MASK为1,Masking-key为加密掩码。服务端发往顾客端的MASK为0,Masking-key为空。
  • Payload len和Payload Data分别是帧的数码长度和数目内容。

HTML代码:python3知识点

微信QQ

#chatcontent{

/*呈现内容使用的*/

width:500px;

height:200px;

background-color:pink;

overflow-y:scroll;

overflow-x:scroll;

}

发送

ws=newWebSocket('ws://192.168.1.27:8009/chat')

//服务器给浏览器推送音信的时候回调

ws.onmessage=function(p1) {

$('#chatcontent').append('

' p1.data '

')

}

functionsend() {

varcontent=$('#msg_container').val()

ws.send(content)

$('#msg_container').val('')

}

1.测量试验情形

tornado.websocket — 浏览器与服务器双向通信

WebSocket 公约的得以完成

WebSockets 允许浏览器和服务器之间进行 双向通讯

负有主流浏览器的今世版本都协助WebSockets(扶助情形详见:http://caniuse.com/websockets)

该模块依据最新 WebSocket 左券 CR-VFC 6455 完成.

在 4.0 版更改: Removed support for the draft 76 protocol version.

 

3 engine.io和socket.io

前方提到socket.io是基于engine.io的包装,engine.io(合同版本3卡塔 尔(阿拉伯语:قطر‎有一套自身的交涉,任何engine.io服务器都必须要扶持polling(蕴含jsonp和xhr)和websocket二种传输形式。engine.io使用websocket时有生机勃勃套自个儿的ping/pong机制,使用的是opcode为0x1(Text)类型的数据帧,不是websocket商业事务明显的ping/pong类型的帧,规范的 ping/pong 帧被uwsgi使用

engine.io的数据编码分为Packet和Payload,个中 Packet是数据包,有6连串型:

  • 0 open:从服务端发出,标志三个新的传输方式已经展开。
  • 1 close:央浼关闭那条传输连接,不过它自身并不停业这几个三回九转。
  • 2 ping:客商端周期性发送ping,服务端响应pong。注意那些与uwsgi自带的ping/pong不一致等,uwsgi里面发送ping,而浏览器重临pong。
  • 3 pong:服务端发送。
  • 4 message:实际发送的音讯。
  • 5 upgrade:在转移transport前,engine.io会发送探测包测验新的transport(如websocket卡塔尔国是不是可用,借使OK,则客商端会发送七个upgrade音讯给服务端,服务端关闭老的transport然后切换来新的transport。
  • 6 noop:空操作数据包,客商端收到noop新闻会将事先等待暂停的轮询暂停,用于在吸取到五个新的websocket强制多少个新的轮询周期。

而Payload是指风姿浪漫雨后春笋绑定到意气风发道的编码后的Packet,它只用在poll中,websocket里面使用websocket帧里面包车型大巴Payload字段来传输数据。假如客户端不援救XHEvoque2,则payload格式如下,个中length是多少包Packet的尺寸,而packet则是编码后的数目包内容。

<length1>:<packet1>[<length2>:<packet2>[...]]

若扶助XH汉兰达2,则payload中剧情全方位以二进制编码,当中第4位0表示字符串,1象征二进制数据,而背后跟着的数字则是意味着packet长度,然后以xff结尾。假设二个长短为109的字符类型的数据包,则前边长度编码是 x00x01x00x09xff,然后后边接packet内容。

<0 for string data, 1 for binary data><Any number of numbers between 0 and 9><The number 255><packet1 (first type,
then data)>[...]

engine.io服务器维护了三个socket的字典结构用于管理总是到该机的顾客端,而客商端的标志正是sid。即便有多少个worker,则需求保障同二个顾客端的连续几天落在相像台worker上(能够布置nginx根据sid分发)。因为各样worker只保证了生龙活虎局地客商端连接,假若要扶植广播,room等特征,则后端需求利用 redis 恐怕 RabbitMQ 消息队列,使用redis的话则是通过redis的订阅公布机制贯彻多机多worker之间的音讯推送。

socket.io是engine.io的包装,在其底蕴上平添了自动重连,多路复用,namespace,room等特色。socket.io自身也是有朝气蓬勃套左券,它Packet类型分为(CONNECT 0, DISCONNECT 1, EVENT 2, ACK 3, ERROR 4, BINARY_EVENT 5, BINARY_ACK 6)。注意与engine.io的Packet类型有所区别,然而socket.io的packet实际是依靠的engine.io的Message类型发送的,在前面实例中得以看出Packet的编码方式。当连接出错的时候,socket.io会通过自动重连机制再度连接。

python 3.4

class tornado.websocket.WebSocketHandler(application, request, **kwargs)

经过持续该类来创建三个主干的 WebSocket handler.

重写 on_message 来拍卖收到的音信, 使用 write_message 来发送音信到客商端. 你也足以重写 open 和 on_close 来拍卖连接展开和关闭那多个动作.

关于JavaScript 接口的详细消息: http://dev.w3.org/html5/websockets/ 具体的商量: http://tools.ietf.org/html/rfc6455

一个精短的 WebSocket handler 的实例: 服务端直接回到全部接纳的消息给客商端

class EchoWebSocket(tornado.websocket.WebSocketHandler):
    def open(self):
        print("WebSocket opened")

    def on_message(self, message):
        self.write_message(u"You said: "   message)

    def on_close(self):
        print("WebSocket closed")

WebSockets 而不是正规的 HTTP 连接. “握手”动作切合 HTTP 标准,可是在”握手”动作之后, 左券是依赖新闻的. 由此,Tornado 里超过二分之一的 HTTP 工具对于这类 handler 都以不可用的. 用来报导的办法唯有write_message() , ping() , 和 close() . 肖似的,你的 request handler 类里应该使用 open() 并非 get() 或许 post()

风流倜傥经你在接受上将以此 handler 分配到 /websocket, 你可以经过如下代码实现:

var ws = new WebSocket("ws://localhost:8888/websocket");
ws.onopen = function() {
   ws.send("Hello, world");
};
ws.onmessage = function (evt) {
   alert(evt.data);
};

以此剧本将会弹出叁个提醒框 :”You said: Hello, world”

浏览器并不曾依据同源战略(same-origin policy),相应的同意了随意站点使用 javascript 发起任意 WebSocket 连接来决定别的互联网.那令人惊异,并且是贰个神秘的安全漏洞,所以 从 Tornado 4.0 起头 WebSocketHandler 须要对愿意选拔跨域诉求的使用通过重写.

check_origin (详细音信请查看文书档案中有关该方法的片段)来进展设置. 未有科学配置这一个特性,在确立 WebSocket 连接时候很或者会引致 403 错误.

当使用安全的 websocket 连接(wss://) 时, 来自浏览器的连年也许会战败,因为 websocket 未有位置输出 “认证成功” 的对话. 你在 websocket 连接营形成功从前,必得 使用相像的证书采访一个平常化的 HTML 页面.

 

4 源码剖析

在创立连接后,每一个socket会被电动踏向到三个默许的命名空间/。在每一种命名空间中,socket会被私下认可参与多个名字为Nonesid的房间。None的房间用于广播,而sid是现阶段客户端的session id,用于单播。除私下认可的屋家外,我们能够依照必要将对应socket参加自定义房间,roomid唯黄金年代就能够。socket.io基于engine.io,协助websocket和long polling。如若是long polling,会准时发送GET, POST诉求,当没有数量时,GET供给在拉取队列音信时会hang住(超时时间为pingTimeout),即便hang住中间服务器一贯非常少爆发,则须要等到顾客端发送下八个POST央浼时,这个时候服务器会往队列中积累POST央求中的音信,那样上七个GET央求才会回来。借使upgrade到了websocket连接,则会准时ping/pong来保活连接。

为方便描述,下边提到的engine.io服务器对应源文件是engineio/server.py,engine.io套接字对应源文件engineio/socket.py,而socket.io服务器则对应socketio/server.py。上边剖判下socket.io连接创立、音信采用和出殡和安葬、连接关闭进度。socket.io版本为1.9.0,engine.io版本为2.0.4。

zookeeper-3.4.13.tar.gz

Event handlers

先来看一下,长连接调用情势:

总是建设构造

第风流罗曼蒂克,客户端会发送二个polling央求来确立连接。这时的央求参数没有sid,表示要树立连接。 engine.io服务器通过handle_get_request()handle_post_request()艺术来分别管理初叶化连接以至长轮询中的 GET 和 POST 伏乞。

socket.io在发轫化时便登记了3个事件到engine.io的handlers中,分别是connect(处理函数_handle_eio_connect),message(_handle_eio_message),disconnect(_handle_eio_disconnect),在engine.io套接字选拔到了上述四个档期的顺序的消息后,在本身做了对应管理后都会触发socket.io中的对应的管理函数做进一层处理。

当接过到GET央浼且没有sid参数时,则engine.io服务器会调用 _handle_connect()艺术来确立连接。这么些措施首要工作是为当下顾客端生成sid,创立Socket对象并保存到engine.io服务器的sockets群集中。做了那个开头化专业后,engine.io服务器会发送三个OPEN类型的数量包给客商端,接着会触发socket.io服务器的connect事件。

顾客端第二遍三番五次的时候,socket.io也要做一些开首化的办事,那是在socket.io服务器的_handle_eio_connect()管理的。这里做的事务根本有几点:

  • 开端化manager,比方用的是redis做后端队列的话,则需求早先化redis_manager,包蕴安装redis连接配置,订阅频道,暗中同意频道是"socket.io",若是接收flask_socketio则频道是"flask_socketio",要是用到gevent,则还要对redis模块的socket库打monkey-patch等。

  • 将该顾客端加入到暗中认可房间None,sid中。

  • 调用代码中对connect事件注册的函数。如上边这一个,注意下,socket.io中也是有个用于事件管理的handlers,它保存的是在后端代码中对socket.io事件注册的函数(开采者定义的),而engine.io的handlers中保存的函数是socket.io注册的那七个针对connect,message和disconnect事件的定势的管理函数。

    socketio.on("connect")
    def test_connect():
        print "client connected"
    
  • 出殡二个sockeio的connect数据包给客商端。

末段在响应中engine.io会为顾客端设置三个名称为io值为sid的cookie,响应内容payload满含三个数据包,一个是engine.io的OPEN数据包,内容为sid,pingTimeout等配备和参数;另三个是socket.io的connect数据包,内容为40。个中4意味的是engine.io的message音讯,0则代表socket.io的connect音讯,以字节流回到。这里的pingTimeout顾客端和服务端分享那些布局,用于检查实验对端是或不是过期。

随之会发送三个轮询央求和websocket握手央浼,假若websocket握手成功后客商端会发送2 probe探测帧,服务端回应3 probe,然后顾客端会发送内容为5的Upgrade帧,服务端回应内容为6的noop帧。探测帧检查通过后,客商端甘休轮询央求,将传输通道转到websocket连接,转到websocket后,接下去就开头为期(暗中同意是25秒)的 ping/pong(这是socket.io自定义的ping/pong,除此而外,uwsgi也会定时(默许30秒)对客户端ping,客户端回应pong,这一个在chrome的Frames里面是看不到的,须要信赖wireshark恐怕用其余浏览器插件来侦察)。

下载地址1:

WebSocketHandler.open(*args, **kwargs)

当展开三个新的 WebSocket 时调用

open 的参数是从 tornado.web.UTucsonLSpec 通过正则表明式获取的, 就像是获取 tornado.web.RequestHandler.get 的参数相符

    ws = websocket.WebSocketApp("ws://echo.websocket.org/",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

服务端新闻接纳流程

对吸收接纳新闻的则统一通过engine.io套接字的receive()函数处理:

  • 对于轮询,黄金时代旦接收了polling的POST乞求,则会调用receive往该socket的音讯队列之中发送消息,从而释放以前hang住的GET央浼。
  • 对于websocket:
    • 接到了ping,则会即时响应一个pong。
    • 接到到了upgrade音讯,则随时发送三个noop音讯。
    • 收下到了message,则调用socket.io注册到engine.io的_handle_eio_message措施来拍卖socket.io自身定义的各类音讯。

WebSocketHandler.on_message(message)

管理在 WebSocket 中选拔的信息

其一艺术必得被重写

 

服务端音讯发送流程

而服务端要给客商端发送消息,则供给通过socket.io服务器的emit方法,注意emit方法是本着room来发送新闻的,要是是context-aware的,则emit私下认可是对namespace为/且room名称为sid的房子发送,假使是context-free的,则默许是广播即对具备连接的顾客端发送新闻(当然在context-free的情状上边,你也得以钦点room来只给钦点room推送音信卡塔尔。

socket.io要兑现多进度以至广播,房间等职能,势必定要连接二个redis之类的消息队列,进而socket.io的emit会调用对应队列微机pubsub_manager的emit方法,比方用redis做音信队列则最后调用 redis_manager中的_publish() 方法通过redis的订阅宣布功用将新闻推送到flask_socketio频道。其他方面,全数的socket在接二连三时都订阅了 flask_socketio频道,何况都有二个体协会程(或线程)在监听频道中是还是不是有信息,大器晚成旦有新闻,就能调用pubsub_manager._handle_emit()艺术对本机对应的socket发送对应的消息,最终是通过socket.io服务器的_emit_internal()措施达成对本机中room为sid的享有socket发送新闻的,假若room为None,则正是广播,即对持有连接到本机的享有客商端推送音讯。

socket.io服务器发送音讯要基于engine.io音信包装,所以归纳到底还是调用的engine.io套接字中的send()方法。engine.io为各样客户端都会爱戴叁个音信队列,发送数据都以先存到行列之中待拉取,websocket除了探测帧之外的任何数据帧也都是透过该音讯队列发送。

WebSocketHandler.on_close()

当关闭该 WebSocket 时调用

当连接被透顶关闭并且扶助 status code 或 reason phtase 的时候, 能够因而self.close_code 和 self.close_reason 那三个属性来博取它们

在 4.0 版更改: Added close_code and close_reason attributes. 添加 close_code 和 close_reason 那多少个特性

 长连接,参数介绍:

闭馆连接(只解析websocket)

websocket恐怕特别关闭的图景多多。比方客商端发了ping后等候pong超时关闭,服务端接受到ping跟上两个ping之间超过了pingTimeout;用的uwsgi的话,uwsgi发送ping,假如在websockets-pong-tolerance(私下认可3秒)内选拔不到pong回应,也会停业连接;还宛假设nginx的proxy_read_timeout配置的比pingInterval小等。

设若不是顾客端主动关闭连接,socket.io就能够在一而再出错后连连重试以建构连接。重试间距和重试次数由reconnectionDelayMax(默认5秒)reconnectionAttempts(默许一向重连卡塔 尔(英语:State of Qatar)设定。下边探究客商端平常关闭的场馆,种种特别关闭状态请具体意况具体解析。

顾客端主动关闭

假定客户端调用socket.close()责无旁贷关闭websocket连接,则会头阵送三个信息41(4:engine.io的message,1:socket.io的disconnect)再关闭连接。如前方提到,engine.io套接字接收到消息后会交给socket.io服务器注册的 _handle_eio_message()拍卖。最后是调用的socket.io的_handle_disconnect(),该函数职业包罗调用socketio.on("disconnect")登记的函数,将该客户端从参与的房间中移除,清理情状变量等。

uwsgi而选拔到顾客端关闭websocket连接音信后会关闭服务端到客商端的连续几日。engine.io服务器的websocket数据选取例程ws.wait()因为老是关闭报IOError,触发服务端循环收发数据经过甘休,并从维护的sockets集结中移除那一个闭馆的sid。然后调用engine.io套接字的close(wait=True, abort=True)办法,由于是顾客端主动关闭,这里就不会再给顾客端发送三个CLOSE音讯。而 engine.io服务器的close方法相近会触发socket.io在此以前注册的disconnect事件处理函数,由于前面已经调用_handle_disconnect()管理了关闭连接事件,所以那边_handle_eio_disconnect()无需再做其余操作(那个操作不是多余的,其作用见后意气风发节卡塔尔国。

浏览器关闭

直接关闭浏览器发送的是websocket的标准CLOSE音讯,opcode为8。socket.io服务端处理格局基本豆蔻梢头致,由于这种状态下并不曾发送socket.io的停业新闻41,socket.io的闭馆操作必要等到engine.io触发的_handle_eio_disconnect()中拍卖,那正是前风度翩翩节中缘何engine.io服务器前面还要多调用三回 _handle_eio_disconnect()的原故所在。

WebSocketHandler.select_subprotocol(subprotocols)

当一个新的 WebSocket 伏乞特定子左券(subprotocols)时调用

subprotocols 是一个由一文山会海能够被顾客端准确识别出相应的子公约(subprotocols卡塔尔国的字符串构成的 list . 那几个主意也许会被重载,用来回到 list 中某 个相配字符串, 未有匹配到则赶回 None. 若无找到相应的子公约,即使服务端并 不会活动关闭 WebSocket 连接,然而客商端能够筛选关闭连接.

(1)url: websocket的地址。

5 实例

共谋表达轻易令人多少头晕,websocket,engine.io,socket.io,各自行车运动协会议是如何是好事的,看看实例只怕会比较清楚,为了有帮忙测量试验,作者写了个Dockerfile,安装了docker的童鞋能够拉代替码试行 bin/start.sh 就能够运转具有完全的 nginx uwsgi gevent flask_socketio测量检验意况的器皿初始测量检验,浏览器张开http://127.0.0.1即可测验。async_mode用的是gevent_uwsgi,完整代码见 这里。

对此不协理websocket的低版本浏览器,socket.io会退化为长轮询的点子,通过定时的出殡GET, POST诉求来拉取数据。非常的少时,会将呼吁数据的GET诉求hang住,直到服务端有数量发生恐怕顾客端的POST乞求将GET央求释放,释放之后会跟着再度发送三个GET要求,除此而外,公约剖判和拍卖流程与websocket形式基本后生可畏致。实例只针对利用websocket的举办深入分析

为了考察socket.io客商端的调用流程,可以设置localStorage.debug = '*';,测量检验的前段代码片段如下(完整代码见货仓):

 <script type="text/javascript" charset="utf-8">
    var socket = io.connect('/', {
        "reconnectionDelayMax": 10000,
        "reconnectionAttempts": 10
    });
    socket.on('connect', function() {
        $('#log').append('<br>'   $('<div/>').text('connected').html());
    })

    $(document).ready(function() {

        socket.on('server_response', function(msg) {
            $('#log').append('<br>'   $('<div/>').text('Received from server: '   ': '   msg.data).html());
        });

        $('form#emit').submit(function(event) {
            socket.emit('client_event', {data: $('#emit_data').val()});
            return false;
        });
    });

 </script>

测验代码比较简单,引进socket.io的js库文件,然后在接连成功后在页面突显“connected”,在输入框输入文字,能够因而连接发送至服务器,然后服务器将浏览器发送的字符串加上server标志回显回来。

下载地址2:

Output

(2卡塔 尔(英语:State of Qatar)header: 客商发送websocket握手央求的央求头,{'head1:value1','head2:value2'}。

确立连接

在chrome中开垦页面能够看看发了3个央求,分别是:

1 http://127.0.0.1/socket.io/?EIO=3&transport=polling&t=MAkXxBR
2 http://127.0.0.1/socket.io/? EIO=3&transport=polling&t=MAkXxEz&sid=9c54f9c1759c4dbab8f3ce20c1fe43a4
3 ws://127.0.0.1/socket.io/?EIO=3&transport=websocket&sid=9c54f9c1759c4dbab8f3ce20c1fe43a4

诉求暗中同意路径是/socket.io,注意命名空间并不会在路径中,而是在参数中传送。第四个要求是polling,EIO是engine.io公约的版本号,t是一个率性字符串,第二个诉求时还还从未生成sid。服务端接纳到音讯后会调用engine.io/server.py_handle_connect()创建连接。

重回的结果是

## Response Headers: Content-Type: application/octet-stream ##
�ÿ0{"pingInterval":25000,"pingTimeout":60000,"upgrades":["websocket"],"sid":"9c54f9c1759c4dbab8f3ce20c1fe43a4"}�ÿ40

能够看看,这里再次来到的是字节流的payload,content-type为"application/octet-stream"。那几个payload其实包罗七个packet,第三个packet是engine.io的OPEN音信,类型为0,它的故事情节为pingInterval,pingTimeout,sid等;第3个packet类型是4(message),而它的多寡内容是0,表示socket.io的CONNECT。而内部的看起来乱码的大器晚成对其实是日前提到的payload编码中的长度的编码x00x01x00x09xffx00x02xff

  • 第四个央求是轮询乞请,假使websocket创立并测验成功(使用内容为probe的ping/pong帧)后,会暂停轮询央浼。能够看来轮询诉求平素hang住到websocket创建并测量检验成功后才回去,响应结果是�ÿ6,前边乱码部分是payload长度编码x00x01xff,后边的数字6是engine.io的noop音讯。

  • 首个伏乞是websocket握手乞请,握手成功后,能够在chrome的Frames中间见到websocket的数码帧交互作用流程,能够看见如前方深入分析,确实是头阵的探测帧,然后是Upgrade帧,接着就是定时的ping/pong帧了。

    2probe
    3probe
    5
    2
    3
    ...
    

WebSocketHandler.write_message(message, binary=False)

将交给的 message 发送到客商端

message 能够是 string 也许 dict(将会被编码成 json ) 纵然 binary 为 false, message 将会以 utf8 的编码发送; 在 binary 形式下 message 可以是 任何 byte string.

万三回九转接已经关门, 则会触发 WebSocketClosedError

在 3.2 版修改: 增添了 WebSocketClosedError (在前边版本会触发 AttributeError)

在 4.3 版修改: 重临能够被用来 flow control 的 Future.

(3)on_open:在创造Websocket握手时调用的可调用对象,那么些主意唯有三个参数,正是此类本身。

客商端发送消息给服务端

如果要发送消息给服务器,在浏览器输入框输入test,点击echo按键,能够看出websocket发送的帧的内容如下,个中4是engine.io的message类型标志,2是socket.io的EVENT类型标志,而前边则是事件名称和数目,数据能够是字符串,字典,列表等项目。

42["client_event",{"data":"test"}]

kafka_2.12-2.1.0.tgz

WebSocketHandler.close(code=None, reason=None)

关门当前 WebSocket

假设挥手动作成功,socket将会被关闭.

code 恐怕是多个数字组合的状态码, 选择 MuranoFC 6455 section 7.4.1. 定义的值.

reason 或者是描述连接关闭的文件音信. 这么些值被提给顾客端,不过不会被 WebSocket 合同单独解释.

在 4.0 版更改: Added the code and reason arguments.

(4)on_message:那几个指标在选拔到服务器重回的音讯时调用。有三个参数,叁个是此类本身,贰个是我们从服务器获取的字符串(utf-8格式卡塔尔。

服务端接纳消息流程

而服务端选取新闻并回到叁个新的event为"server_response",数据为"TEST",代码如下,此中socketio是flask_socketio模块的SocketIO对象,它提供了装饰器方法 on将自定义的client_event和拍卖函数test_client_event注册到sockerio服务器的handlers中。

当选拔到 client_event 消息时,会通过sockerio/server.py中的 _handle_eio_message()艺术管理新闻,对于socket.io的EVENT类型的新闻最后会通过_trigger_event()方法管理,该办法也便是从handlers中拿到client_event对应的管理函数并调用之。

from flask_socketio import SocketIO, emit
socketio = SocketIO(...)

@socketio.on("client_event")
def test_client_event(msg):
    emit("server_response", {"data": msg["data"].upper()})

下载地址1:

Configuration

(5)on_error:这几个目的在遇到错误时调用,有五个参数,第三个是此类本身,第叁个是老大对象。

服务端发送新闻到客户端

服务端发送音讯通过 flask_socketio提供的emit方法落成,如前后生可畏节解析的,最后依然通过的engine.io包装成engine.io的消息格式后发生。

42["server_response",{"data":"TEST"}]

WebSocketHandler.check_origin(origin)

经过重写那个措施来实现域的切换

参数 origin 的值来自 HTTP header 中的Origin,url 担负先河化那些供给. 这几个主意并非讲求顾客端不发送那样的 heder;那样的倡议一贯被允许(因为具有的浏览器 实现的 websockets 都扶植这几个 header ,並且非浏览器顾客端从未同样的跨域安全难点.

再次来到 True 代表选拔,相应的回来 False 代表回绝.默认谢绝除 host 外其余域的央求.

其一是二个浏览器制止 XSS 攻击的安全计策,因为 WebSocket 允许绕过平凡的同源计策 以致不选择 CO昂CoraS 头.

要允许具有跨域通讯的话(那在 Tornado 4.0 早前是暗中认可的卡塔尔国,只要轻易的重写那几个艺术 让它直接再次回到 true 就能够了:

def check_origin(self, origin):
    return True

要允许全部全部子域下的连天,能够如此完成:

def check_origin(self, origin):
    parsed_origin = urllib.parse.urlparse(origin)
    return parsed_origin.netloc.endswith(".mydomain.com")

4.0 新版效率.

(6)on_close:在碰着接二连三关闭的景况时调用,参数唯有三个,就是此类本人。

关门连接

顾客端要积极关闭连接,在JS中调用 socket.close() 就能够,当时出殡的数据包为 41,在那之中4象征的是engine.io的新闻类型message,而数据1则是指的socket.io的音讯类型disconnect,关闭流程见上大器晚成章的验证。

下载地址2:

WebSocketHandler.get_compression_options()

重写该措施重临当前线总指挥部是的 compression 选项

假使这一个措施重临 None (私下认可), compression 将会被禁止使用. 假诺它回到 dict (即便 是空的),compression 都会被开启. dict 的内容将会被用来决定 compression 所 使用的内存和CPU.可是那类的安装未来还从未被完毕.

4.1 新版功效.

(7)on_cont_message:那一个指标在选用到连年帧数据时被调用,有多少个参数,分别是:类本身,从服务器选拔的字符串(utf-8卡塔 尔(英语:State of Qatar),再三再四标记。

6 总结

正文示例中,为了便利剖判,只用了暗中认可的namespace和room,而在实际上项目中得以依照作业需求使用namespace,room等高等性格。

nginx uwsgi应用socket.io时,当用到websocket时,注意nginx的晚点配置proxy_read_timeout和uwsgi的websocket超时配置websocket-ping-freq和websockets-pong-tolerance,配置不当会诱致socke.io因为websocket的ping/pong超时而持续重连。

WebSocketHandler.set_nodelay(value)

为当前 stream 设置 no-delay

在私下认可意况下, 小块数据会被延迟和/或归并以减小发送包的数量. 那在有一点点时候会因为 Nagle’s 算法和 TCP ACKs 相互影响会引致 200-500ms 的延迟.在 WebSocket 连接 已经创设的景色下,能够透过设置 self.set_nodelay(True) 来下滑延迟(那或者 会占用越来越多带宽卡塔尔

越来越多详细音信: BaseIOStream.set_nodelay.

在 BaseIOStream.set_nodelay 查看详细音讯.

3.1 新版功效.

(8)on_data:当从服务器收到到新闻时被调用,有多个参数,分别是:该类本人,接纳到的字符串(utf-8卡塔尔国,数据类型,接二连三标记。

参谋资料

  • https://tools.ietf.org/html/rfc6455
  • https://www.nginx.com/blog/websocket-nginx/
  • https://security.stackexchange.com/questions/36930/how-does-websocket-frame-masking-protect-against-cache-poisoning
  • https://github.com/suexcxine/blog/blob/master/source/_posts/websocket.md
  • https://github.com/abbshr/abbshr.github.io/issues/47
  • https://socket.io/docs/logging-and-debugging/
  • http://uwsgi-docs.readthedocs.io/en/latest/WebSockets.html
  • https://flask-socketio.readthedocs.io/en/latest/

pip-18.1.tar.gz

Other

(9)keep_running:二个二进制的标识位,要是为True,那个app的主循环将四处运作,暗中认可值为True。

下载地址:

WebSocketHandler.ping(data)

出殡 ping 包到远端.

(10)get_mask_key:用于产生叁个掩码。

表达:推行中窥见,pip版本比较旧的话,没有办法安装whl文件

WebSocketHandler.on_Tornado简单聊天室,原理探究。pong(data)

当收到ping 包的响应时试行.

(11卡塔尔国subprotocols:少年老成组可用的子协议,暗中认可为空。

kafka_python-1.4.4-py2.py3-none-any.whl

exception tornado.websocket.WebSocketClosedError

并发关闭连接错误触发.

3.2 新版功用.

 

下载地址1:

Client-side support

长连接重要措施:ws.run_forever(ping_interval=60,ping_timeout=5)

tornado.websocket.websocket_connect(url, io_loop=None, callback=None, connect_timeout=None, on_message_callback=None, compression_options=None)

顾客端 WebSocket 扶助 供给钦点 url, 重临八个结实为 WebSocketClientConnection 的 Future 对象

compression_options 作为 WebSocketHandler.get_compression_options 的 重返值, 将会以相符的艺术执行.

其延续续援救两种等级次序的操作.在协程风格下,应用程序常常在三个循环里调用~.WebSocket ClientConnection.read_message:

conn = yield websocket_connect(url)
while True:
    msg = yield conn.read_message()
    if msg is None: break
    # Do something with msg

在回调风格下,必要传递 on_message_callback 到 websocket_connect 里. 在此二种风格里,八个剧情是 None 的 message 都标识着 WebSocket 连接已经.

在 3.2 版更正: 允许使用 HTTPRequest 对象来顶替 urls.

在 4.1 版更改: 添加 compression_options 和 on_message_callback .

不赞成采纳 compression_options .

 要是持续开关闭websocket连接,会向来不通下去。别的那个函数带七个参数,如若传的话,运维心跳包发送。

class tornado.websocket.WebSocketClientConnection(io_loop, request, on_message_callback=None, compression_options=None)

WebSocket 客商端连接

本条类不该直接被实例化, 请使用 websocket_connect

 

下载地址2:

close(code=None, reason=None)

关闭 websocket 连接

code 和 reason 的文书档案在 WebSocketHandler.close 下已给出.

3.2 新版功用.

在 4.0 版校订: 增添 code 和 reason 这多少个参数

ping_interval:自动发送“ping”命令,每种钦赐的时刻(秒),倘使设置为0,则不会活动发送。

write_message(message, binary=False)

发送音讯到 websocket 服务器.

ping_timeout:若无选拔pong新闻,则为超时(秒)。

python_snappy-0.5.3-cp34-cp34m-win_amd64.whl

read_message(callback=None)

读取来自 WebSocket 服务器的新闻.

如若在 WebSocket 初叶化时钦定了 on_message_callback ,那么这几个方法永世不会重回音讯

如果延续已经关门,重临结果会是叁个结果是 message 的 future 对象大概是 None. 要是 future 给出了回调参数, 那几个参数将会在 future 达成时调用.


能够经过上面二维码订阅小编的稿子公众号【MoeLove】

澳门新萄京 2

ws.run_forever(ping_interval=60,ping_timeout=5)

#ping_interval心跳发送间隔时间

#ping_timeout 设置,发送ping到收到pong的超时时间

下载地址1:

 

我们看源代码,会开掘这么生机勃勃断代码:

下载地址2:

ping的超时时间,要超越ping间距时间

 

说明:

        if not ping_timeout or ping_timeout <= 0:
            ping_timeout = None
        if ping_timeout and ping_interval and ping_interval <= ping_timeout:
            raise WebSocketException("Ensure ping_interval > ping_timeout")

kafka-python帮忙gzip压缩/解压缩。假诺要花费lz4情势减弱的音讯,则供给设置python-lz4,假设要帮忙snappy方式减少/解压缩则要求设置,不然或者会报错:kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found.

 

创设坐褥者对象时,可经过compression_type 参数钦赐由相应分娩者坐蓐的新闻数据的滑坡方式,只怕在producer.properties配置中布局compression.type参数。

 

参照链接:

 

 

长连接:

2.代码施行

示例1:

生产者

#-- encoding:utf-8 -*-*

__author__ = 'shouke'

from kafka import KafkaProducer

import json

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

for i in range:

producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)

# Block直到单条音讯发送完或许逾期

future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg')

result = future.get(timeout=60)

print

# Block直到全数梗塞的音信发送到网络

# 注意: 该操作不保证传输也许音讯发送成功,仅在安排了linger_ms的事态下有用。(It is really only useful if you configure internal batching using linger_ms

# 序列化json数据

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps.encode('utf-8'))

producer.send('MY_TOPIC1', {'shouke':'kafka'})

# 系列化字符串key

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)

producer.send('MY_TOPIC1', b'shouke', key='strKey')

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip')

for i in range:

producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))

# 音讯记录辅导header

producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])

# 获取品质数据(注意,实践开采分区很多的境况下,该操作相比耗费时间

metrics = producer.metrics()

print

producer.flush()

实施中相见错误: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,设计方案如下:

跻身到布置目录,编辑server.properties文件,

 

追寻并设置listener,配置监听端口,格式:listeners

listener_name://host_name:port,供kafka顾客端连接用的ip和端口,例中配置如下:

listeners=PLAINTEXT://127.0.0.1:9092

API及常用参数表达:

class kafka.KafkaProducer(**configs)

bootstrap_servers–'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),当中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,暗中认可值为 localhost, port私下认可值为9092,这里能够不用填写全体broker的host和port,但必需确认保证最稀有叁个broker卡塔 尔(阿拉伯语:قطر‎

key_serializer –用于转移顾客提供的key值为字节,必需重回字节数据。 假若为None,则等同调用f。 暗许值: None.

澳门新萄京,value_serializer – 用于转移客商提供的value新闻值为字节,必需回到字节数据。 如若为None,则等同调用f。 暗中认可值: None.

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

topic – 设置音信就要公布到的宗旨,即音信所属主旨

value – 音信内容,必得为字节数据,可能通过value_serializer连串化后的字节数据。借使为None,则key必填,音信无差异于“删除”。( If value is None, key is required and message acts as a ‘delete’卡塔尔

partition – 内定分区。如果未安装,则运用布署的partitioner

key – 和消息对应的key,可用来决定新闻发送到哪个分区。若是平partition为None,则一点差异也未有于key的音讯会被透露到均等分区(可是假使key为None,则随机筛选分区卡塔尔(If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly)). 必需为字节数据或然通过配备的key_serializer系列化后的字节数据.

headers – 设置音信header,header-value键值对表示的list。list项为元组:格式 (str_header,bytes_value)

timestamp_ms –微秒数 (从一九七〇 5月1日 UTC算起) ,作为音信时间戳。暗中同意为日前光阴

函数再次回到FutureRecordMetadata类型的RecordMetadata数据

flush(timeout=None)

出殡全部可以即时收获的缓冲消息(即时linger_ms大于0),线程block直到这一个记录发送完成。当三个线程等待flush调用完了而block时,别的线程能够三番两次发送音讯。

留意:flush调用不保障记录发送成功

metrics(raw=False)

收获生产者品质指标。

参考API:

注:生产者代码是线程安全的,支持多线程,而花费者则不然

import websocket
try:
    import thread
except ImportError:
    import _thread as thread
import time

def on_message(ws, message):
    print(message)

def on_error(ws, error):
    print(error)

def on_close(ws):
    print("### closed ###")


def on_open(ws):
    def run(*args):
        ws.send("hello1")
        time.sleep(1)
        ws.close()
    thread.start_new_thread(run,())

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("ws://echo.websocket.org/",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
    ws.on_open = on_open
    ws.run_forever(ping_interval=60,ping_timeout=5)

消费者

#-- encoding:utf-8 -*-*

__author__ = 'shouke'

from kafka import KafkaConsumer

from kafka import TopicPartition

import json

consumer = KafkaConsumer('MY_TOPIC1',
bootstrap_servers=['127.0.0.1:9092'],
#auto_offset_reset='', auto_offset_reset='latest',# 花销kafka中方今的数目,假若设置为earliest则开销最初的数量,不管那么些多少是还是不是花费 enable_auto_commit=True, # 自动提交开销者的offset auto_commit_interval_ms=3000, ## 自动提交花费者offset的时光间隔 group_id='MY_GROUP1',
consumer_timeout_ms= 10000, # 倘若10秒内kafka中绝非可供成本的数目,自动退出 client_id='consumer-python3' )

for msg in consumer:

print

print('topic: ', msg.topic)

print('partition: ', msg.partition)

print('key: ', msg.key, 'value: ', msg.value)

print('offset:', msg.offset)

print('headers:', msg.headers)

# Get consumer metrics

metrics = consumer.metrics()

print

运维效果

经过assign、subscribe两者之生龙活虎为成本者设置开支的宗旨

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'],

auto_offset_reset='latest',

enable_auto_commit=True, # 自动提交开支数量的offset

consumer_timeout_ms= 10000, # 要是1秒内kafka中并未可供花费的多少,自动退出

value_deserializer=lambda m: json.loads(m.decode('ascii')), #开支json 格式的音讯

client_id='consumer-python3'

)

# consumer.assign([TopicPartition('MY_TOPIC1', 0)])

# msg = next

# print

consumer.subscribe('MY_TOPIC1')

for msg in consumer:

print

API及常用参数表达:

class kafka.KafkaConsumer(*topics, **configs)

*topics – 可选,设置供给订阅的topic,假若未安装,须要在开支记录前调用subscribe或然assign。

client_id – 客商端名称,暗中认可值: ‘kafka-python-{version}’

group_id(str or None) – 花费组名称。假设为None,则经过group coordinator auto-partition分区分配,offset提交被剥夺。默以为None

auto_offset_reset – 重新设置offset战略: 'earliest'将活动到最老的可用音讯, 'latest'将移步到前段时间音讯。 设置为此外任何值将抛出特别。暗许值:'latest'。

enable_auto_commit – 假设为True,将活动定期提交开销者offset。默以为True。

auto_commit_interval_ms – 自动提交offset之间的间隔皮秒数。借使enable_auto_commit 为true,默许值为: 5000。

value_deserializer - 指引原始消息value并赶回反体系化后的value

subscribe, pattern=None, listener=None)

订阅需求的主旨

topics – 供给订阅的大旨列表

pattern – 用于相称可用核心的形式,即正则表明式。注意:必得提供topics、pattern两个参数之风流洒脱,但不可能并且提供双方。

metrics(raw=False)

获取花费者品质目的。

参考API:

 

客户端

#-- encoding:utf-8 -*-*

__author__ = 'shouke'

from kafka.client import KafkaClient

client = KafkaClient(bootstrap_servers=['127.0.0.1:9092'], request_timeout_ms=3000)

# 获取具有broker

brokers = client.cluster.brokers()

for broker in brokers:

print('broker: ', broker)

print('broker nodeId: ', broker.nodeId)

# 获取核心的具有分区

topic = 'MY_TOPIC1'

partitions = client.cluster.available_partitions_for_topic

print(partitions)

partition_dict = {}

partition_dict[topic] = [partition for partition in partitions]

print(partition_dict)

运转结果:

broker: BrokerMetadata(nodeId=0, host='127.0.0.1', port=9092, rack=None)

broker nodeId: 0

{0}

{'MY_TOPIC1': [0]}

API及常用参数说明:

class kafka.client.KafkaClient(**configs)

bootstrap_servers–'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),个中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默许值为 localhost, port暗中同意值为9092,这里能够不要填写全体broker的host和port,但一定要确定保证最稀少二个broker卡塔 尔(阿拉伯语:قطر‎

client_id – 客商端名称,暗许值: ‘kafka-python-{version}’

request_timeout_ms – 客商端央求超时时间,单位飞秒。默许值: 30000.

参考API:

brokers()

拿到具备broker元数据

available_partitions_for_topic

回到核心的有所分区

参考API:

示例2:

import websocket
from threading import Thread
import time
import sys


class MyApp(websocket.WebSocketApp):
    def on_message(self, message):
        print(message)

    def on_error(self, error):
        print(error)

    def on_close(self):
        print("### closed ###")

    def on_open(self):
        def run(*args):
            for i in range(3):
                # send the message, then wait
                # so thread doesn't exit and socket
                # isn't closed
                self.send("Hello %d" % i)
                time.sleep(1)

            time.sleep(1)
            self.close()
            print("Thread terminating...")

        Thread(target=run).start()


if __name__ == "__main__":
    websocket.enableTrace(True)
    if len(sys.argv) < 2:
        host = "ws://echo.websocket.org/"
    else:
        host = sys.argv[1]
    ws = MyApp(host)
    ws.run_forever()

 

 

短连接:

from websocket import create_connection
ws = create_connection("ws://echo.websocket.org/")
print("Sending 'Hello, World'...")
ws.send("Hello, World")
print("Sent")
print("Receiving...")
result =  ws.recv()
print("Received '%s'" % result)
ws.close()

 

——

本文由澳门新萄京发布于www.澳门新萄京赌场,转载请注明出处:Tornado简单聊天室,原理探究

上一篇:内建函数getattr工厂方式,python元类编制程序 下一篇:没有了
猜你喜欢
热门排行
精彩图文