基于python生成器封装的协程类

yipeiwu_com6年前Python基础

自从python2.2提供了yield关键字之后,python的生成器的很大一部分用途就是可以用来构建协同程序,能够将函数挂起返回中间值并能从上次离开的地方继续执行。python2.5的时候,这种生成器更加接近完全的协程,因为提供了将值和异常传递回到一个继续执行的函数中,当等待生成器的时候,生成器能返回控制。

python提供的生成器设施:

  • yield:能够将自己挂起,并提供一个返回值给等待方
  • send:唤起一个被挂起的生成器,并能够传递一个参数,可以在生成器中抛出异常
  • next:本质上相当于send(None),对每个生成器的第一次调用必须不能传递参数
  • close:主动退出一个生成器

python封装

虽然python3提供了asyncio这样的异步IO库,而且也有greenlet等其他协程库,但目前的需求并不是实际的网络IO并发操作,而是需要模拟状态机的运行,因此使用协程可以很方便的模拟,并加入认为的控制,下面是封装的一个python类。

class Coroutine(object):

  """ Base class of the general coroutine object """

  STATE_RUNNING = 0
  STATE_WAITING = 1
  STATE_CLOSING = 2

  def __init__(self):
    self.state = Coroutine.STATE_WAITING
    self.started = False
    self.args = None
    self.routine = self._co()

  def _co(self):
    self.ret = None
    while True:
      self.args = yield self.ret
      if not self.started:
        self.started = True
        continue
      else:
        self.state = Coroutine.STATE_RUNNING
        self.ret = self.run(self.args)
      if self.state == Coroutine.STATE_CLOSING:
        break
      self.state = Coroutine.STATE_WAITING

  def start(self):
    """ Start the generator """
    if self.routine is None:
      raise RuntimeError('NO task to start running!')
    self.started = True
    self.routine.next()

  def finish(self):
    """ Finish the execution of this routine """
    self.state = Coroutine.STATE_CLOSING
    self.routine.close()

  def run(self, args):
    """ The runing method to be executed every once time"""
    raise NotImplementedError

  def execute(self, arg_obj):
    """ Awake this routine to execute once time """
    return self.routine.send(arg_obj)

基于上述封装,下面实现了一个协同的生产者消费者示例:

class ProducerCoroutine(Coroutine):

  """ The Producer concrete coroutine """

  def __init__(self, cnsmr):
    if not isinstance(cnsmr, Coroutine):
      raise RuntimeError('Consumer is not a Coroutine object')
    self.consumer = cnsmr
    self.consumer.start()
    super(ProducerCoroutine, self).__init__()

  def run(self, args):
    print 'produce ', args
    ret = self.consumer.execute(args)
    print 'consumer return:', ret

  def __call__(self, args):
    """ Custom method for the specific logic """
    self.start()
    while len(args) > 0:
      p = args.pop()
      self.execute(p)
    self.finish()


class ConsumerCoroutine(Coroutine):

  """ The Consumer concrete coroutine """

  def __init__(self):
    super(ConsumerCoroutine, self).__init__()

  def run(self, args):
    print 'consumer get args: ', args
    return 'hahaha' + repr(args)

运行结果如下:

produce 4
consumer get args: 4
consumer return: hahaha4
produce 3
consumer get args: 3
consumer return: hahaha3
produce 2
consumer get args: 2
consumer return: hahaha2
produce 1
consumer get args: 1
consumer return: hahaha1
produce 0
consumer get args: 0
consumer return: hahaha0

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

Django REST framework 如何实现内置访问频率控制

对匿名用户采用 IP 控制访问频率,对登录用户采用 用户名 控制访问频率。 from rest_framework.throttling import SimpleRateThrot...

使用APScheduler3.0.1 实现定时任务的方法

需求是在某一指定的时刻执行操作 网上的建议多为通过调用Scheduler的add_date_job实现 不过APScheduler 3.0.1与之前差异较大, 无法通过上述方法实现 参考...

对pandas的行列名更改与数据选择详解

对pandas的行列名更改与数据选择详解

记录一些pandas选择数据的内容,此前首先说行列名的获取和更改,以方便获取数据。此文作为学习巩固。 这篇博的内容顺序大概就是: 行列名的获取 —> 行列名的更改 —> 数据...

Django Admin中增加导出Excel功能过程解析

Django Admin中增加导出Excel功能过程解析

在使用Django Admin时, 对于列表我们有时需要提供数据导出功能, 如下图: 增加导出Excel功能 在Django Admin中每个模型的Admin类(继承至admin.M...

python聚类算法解决方案(rest接口/mpp数据库/json数据/下载图片及数据)

python聚类算法解决方案(rest接口/mpp数据库/json数据/下载图片及数据)

1. 场景描述 一直做java,因项目原因,需要封装一些经典的算法到平台上去,就一边学习python,一边网上寻找经典算法代码,今天介绍下经典的K-means聚类算法,算法原理就不介绍...