python 并发编程 非阻塞IO模型原理解析

yipeiwu_com6年前Python基础

非阻塞IO(non-blocking IO)

Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的,这段是本地拷贝,copy data ),然后返回。

也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,
此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,
循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,
进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel操作系统内存 数据准备好了没有。

非阻塞IO示例

  • 设置socket接口为 非阻塞IO接口
  • 默认是True 为阻塞
  • server.setblocking(False)
  • 处理一下这个异常

BlockingIOError: [WinError 10035] 无法立即完成一个非阻止性套接字操作。

from socket import *
server = socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8000))
server.listen(5)
# 设置socket接口为 非阻塞IO接口
# 默认是True 为阻塞
server.setblocking(False)
print("starting...")
while True:
  try:
    conn,addr = server.accept()
    print(addr)

  except BlockingIOError:
    print("干其他的工作")
server.close()

执行结果,如上面的图,一直返回error消息

starting...
干其他的工作
干其他的工作
干其他的工作
干其他的工作

服务端 可以与 多个客户端建立连接,实现服务端可以不停的建立连接

from socket import *
server = socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8000))
server.listen(5)
# 设置socket接口为 非阻塞IO接口
# 默认是True 为阻塞
server.setblocking(False)
r_list = []
print("starting...")
while True:
  try:
    conn,addr = server.accept()
    r_list.append(conn)
    print(r_list)
  except BlockingIOError:
    pass
server.close()

起三个客户端与服务端建立连接

r_list 存着所有建立的连接

有连接来,就建立连接,没有连接来,就抛出异常

实现IO非阻塞 并发 多个连接

from socket import *
server = socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8000))
server.listen(5)
# 设置socket接口为 非阻塞IO接口
# 默认是True 为阻塞
server.setblocking(False)
r_list = []
print("starting...")
while True:
  try:
    conn,addr = server.accept()
    r_list.append(conn)
    print(r_list)
  except BlockingIOError:
    # 定义删除连接列表
    del_rlist = []
    for conn in r_list:
      try:
        data = conn.recv(1024)
        # 收空数据时候
        if not data:
          del_rlist.append(conn)
          continue
        conn.send(data.upper())
      # 没有连接,抛出异常,就结束这次循环,继续
      except BlockingIOError:
        continue
      # 套接字出现异常,客户端单方面连接断开
      except Exception:
        conn.close()
        del_rlist.append(conn)
        break
    # 结束上面循环之后,循环del_list 连接元素 删除连接
    for conn in del_rlist:
      del_rlist.remove(conn)
server.close()

BUG:send也是IO阻塞接口

当send在数据量过大时候,也会阻塞。

send操作是,把应用程序把数据发送到操作系统缓存区里,而操作系统缓存区空间也是有限的。缓存区也会满了,后面还有数据需要发送,那只能等缓存区清掉数据,有空间了,才能发送数据。所以在这里缓存区满了,就阻塞。

修改后服务端的代码 可以自己检测IO,遇到IO切换单个线程的其他任务,去运行,实现单线程并发

from socket import *
server = socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8000))
server.listen(5)
# 设置socket接口为 非阻塞IO接口
# 默认是True 为阻塞
server.setblocking(False)
r_list = []
w_list = []
print("starting...")
while True:
  try:
    conn,addr = server.accept()
    r_list.append(conn)
    print(r_list)
  except BlockingIOError:
    # 收消息
    # 定义删除连接列表
    del_rlist = []
    for conn in r_list:
      try:
        data = conn.recv(1024)
        # 收空数据时候
        if not data:
          del_rlist.append(conn)
          continue
        '''加入元祖 元祖有两个元素 
        1.存放套接字连接
        2.准备要发送的的数据
        '''
        w_list.append((conn, data.upper()))
      # 没有连接,抛出异常,就结束这次循环,继续
      except BlockingIOError:
        continue
      # 套接字出现异常,客户端单方面连接断开
      except Exception:
        conn.close()
        del_rlist.append(conn)
        break
    # 发消息
    # 用于 发成功数据后,删除套接字连接的列表
    del_wlist = []
        for item in w_list:
     try:
        conn = item[0]
        data = item[1]
        conn.send(data)
        # 发成功后,从列表删除连接
        del_wlist.append(item)
      # send 有可能出现异常 没发完情况
      except BlockingIOError:
        pass
    # 结束上面循环之后,循环del_wlist 连接元素 删除连接
    for item in del_wlist:
      del_wlist.remove(item)
    # 结束上面循环之后,循环del_rlist 连接元素 删除连接
    for conn in del_rlist:
      del_rlist.remove(conn)
server.close()

这就是非阻塞IO

但是非阻塞IO模型绝不被推荐。
我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。

干其他活时候,有可能来新的连接,新的连接来了,不能及时响应与该新的连接,建立连接。所以会导致问题:数据不会及时响应

但是也难掩其缺点:

1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况

2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。
这会导致整体数据吞吐量的降低。

3.死循环While True会导致CPU的无用的耗用、占用

此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否活跃

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

Python3多线程版TCP端口扫描器

本文实例为大家分享了Python3多线程版TCP端口扫描器的具体代码,供大家参考,具体内容如下 使用命令 python BannerDemo.py -H 192.168.200.10...

总结网络IO模型与select模型的Python实例讲解

总结网络IO模型与select模型的Python实例讲解

网络I/O模型 人多了,就会有问题。web刚出现的时候,光顾的人很少。近年来网络应用规模逐渐扩大,应用的架构也需要随之改变。C10k的问题,让工程师们需要思考服务的性能与应用的并发能力。...

python实现解数独程序代码

python实现解数独程序代码

偶然发现linux系统附带的一个数独游戏,打开玩了几把。无奈是个数独菜鸟,以前没玩过,根本就走不出几步就一团浆糊了。 于是就打算借助计算机的强大运算力来暴力解数独,还是很有乐趣的。 下面...

Python中functools模块的常用函数解析

1.partial 首先是partial函数,它可以重新绑定函数的可选参数,生成一个callable的partial对象: >>> int('10') # 实际上等...

如何使用Python破解ZIP或RAR压缩文件密码

如何使用Python破解ZIP或RAR压缩文件密码

这篇文章主要介绍了如何使用Python破解ZIP或RAR压缩文件密码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 我们经常会从网络...