Reactor

Reactor事件驱动的两种设计实现:面向对象 VS 函数式编程

Reactor事件驱动的两种设计实现:面向对象 VS 函数式编程

这里的函数式编程的设计以muduo为例进行对比说明;

Reactor实现架构对比

面向对象的设计类图如下:

 oo_class

函数式编程以muduo为例,设计类图如下:

muduo 

面向对象的Reactor方案设计

我们先看看面向对象的设计方案,想想为什么这么做; 
拿出Reactor事件驱动的模式设计图,对比来看,清晰明了;

 reactor_model 

从左边开始,事件驱动,需要一个事件循环和IO分发器,EventLoop和Poller很好理解;为了让事件驱动支持多平台,Poller上加一个继承结构,实现select、epoller等IO分发器选用;

Channel是要监听的事件封装类,核心成员:fd文件句柄; 
成员方法围绕着fd展开展开,如关注fd的读写事件、取消关注fd的读写事件; 
核心方法: 
enableReading/Writing; 
disableReading/Writing; 
以及事件到来后的处理方法: 
handleEvent; 
在OO设计这里,handleEvent设计成一个虚函数,回调上层实际的数据处理;

AcceptChannel和ConnetionChannel派生自Channel,负责实际的网络数据处理;根据职责的不同而区分,AcceptChannel用于监听套接字,接收新连接请求;有新的请求到来时,生成新的socket并加入到事件循环,关注读事件; 
ConnetionChannel用于真实的用户数据处理,处理用户的读写请求;涉及到具体的数据处理,当然,在这里会需要用到应用层的缓存区;

比较困难的是用户逻辑层的设计;放在哪里合适? 
先看看需求,用户逻辑层需要知道的事件点(在这之后可能会有应用层的逻辑): 
连接建立、消息到来、消息发送完毕、连接关闭; 
这四个事件的源头是Channel的handleEvent(),直接调用者应该Channel的派生类(AcceptChannel和ConnetionChannel),貌似可以将用户逻辑层的指针放到Channel里; 
且不说架构上是否合理,单是实现上右边Channel这一块(含AcceptChannel和ConnetionChannel)对用户是透明的,用户只需要关注以上四个事件点,底层的细节用户层并不关心(比如是否该在事件循环中关注某个事件,取消关注某个事件,对用户都是透明的),所以外部用户无法直接将用户逻辑层的指针给Channel;

想想用户与网络库的接口在哪里? 
IO分发器对用户也是透明的,用户可见就是EventLoop,在main方法中:

EventLoop loop; 
loop.loop();

用户逻辑层也就只有通过EventLoop与Channel的派生类关联上; 
这样,就形成的最终的设计类图,在main方法中:

UserLogicCallBack callback;
EventLoop loop(&callback); //在定义 EventLoop时,将callback的指针传入,供后续使用;
loop.loop();

而网络层调用业务层代码时,则通过eventloop_的过渡调用到业务逻辑的函数; 
比如ConnetionChannel中数据到达的处理:

eventloop_->getCallBack()->onMessage(this);

函数式编程的Reactor设计

函数式编程中,类之间的关系主要通过组合来实现,而不是通过派生实现; 
整个类图中仅有Poller处使用了继承关系;其它的都没有使用; 
这也是函数式编程的一个设计理念,更多的使用组合而不是继承来实现类之间的关系,而支撑其能够这样设计的根源在于function()+bind()带来的函数自由传递,实现回调非常简单; 
而OO设计中,只能使用基于虚函数/多态来实现回调,不可避免的使用继承结构;

下面再看看各个类的实现; 
事件循环EventLoop和IO分发器没有区别; 
Channel的职责也和上面类似,封装事件,所不同的是,Channel不再是继承结构中的基类,而是作为一个实体; 
这样,handleEvent方法就不再是一个纯虚函数,而是包含具体的逻辑处理,当然,只有最基本的事件判断,然后调用上层的读写回调:

void Channel::handleEvent()
{
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
  {
    if (readCallback_) readCallback_();
  }
  if (revents_ & POLLOUT)
  {
    if (writeCallback_) writeCallback_();
  }
}

这样的关键是设置一堆回调函数,通过boost::function()+boost::bind()可以轻松的做到;

Acceptor 和TcpConnection

Acceptor类,这个对应到上面的AcceptChannel,但实现不是通过继承,而是通过组合实现; 
Acceptor用于监听,关注连接,建立连接后,由TCPConnection来接管处理; 
这个类没有业务处理,用来处理监听和连接请求到来后的逻辑; 
所有与事件循环相关的都是channel,Acceptor不直接和EventLoop打交道,所以在这个类中需要有一个channel的成员,并包含将channel挂到事件循环中的逻辑(listen()); 
TcpConnection,处理连接建立后的收发数据;业务处理回调完成;

TCPServer

TCPServer就是胶水,作用有二:

  1. 作为最终用户的接口方,和外部打交道通过TCPServer交互,而业务逻辑处理将回调函数传入到底层,这种传递函数的方式犹如数据的传递一样自然和方便;
  2. 作用Acceptor和TcpConnection的粘合剂,调用Acceptor开始监听连接并设置回调,连接请求到来后,在回调中新建TcpConnection连接,设置TcpConnection的回调(将用户的业务处理回调函数传入,包括:连接建立后,读请求处理、写完后的处理,连接关闭后的处理),从这里可以看到,业务逻辑的传递就跟数据传递一样,多么漂亮;

示例对比

通过一个示例来体会这两种实现中回调实现的差别; 
示例:分析读事件到来时,底层如何将消息传递给用户逻辑层函数来处理的?

OO实现

channel作为事件的监听接口,加入到事件循环中,当读事件到来时,需要调用 
ConnetionChannel上的handleEvent();而异步数据的读请求最终需要业务逻辑层来判断是否读到相应的数据,这就需要从ConnetionChannel中调用用户逻辑层上的OnMessage(); 
看看这段逻辑的OO实现序列图:

oo_seq_msg 

代码层面的实现: 
定义用户逻辑处理类UserLogicCallBack,接收消息的处理函数为onMessage(); 
我们关注最终底层是如何调用到业务逻辑层的onMessage()的;

int main()
{
    UserLogicCallBack urlLogic;
    EventLoop loop(urlLogic);//将用户逻辑对象与事件循环对象关联起来
    loop.loop();
}

callback_用户逻辑层的对象在EventLoop初始化时传入:

class EventLoop{
    EventLoop(CallBack & callback):
        callback_(callback)
    {
    }
    CallBack* getCallBack()
    {
        return &callback_;
    }
    CallBack& callback_; //回调方法基类
}

当读事件到来,在ConnectionChannel中通过eventloop对象作为桥梁,回调消息业务处理onMesssage();

void ConnectionChannel::handleRead(){
      int savedErrno = 0;
    //返回缓存区可读的位置,返回所有读到的字节,具体到是否收全,
    //是否达到业务需要的数据字节数,由业务层来判断处理
    ssize_t n = inputBuffer_.readFd(fd_, &savedErrno);
    if (n > 0)
    {    
                //通过eventloop作为中介,调用业务层的回调逻辑
        loop_->getCallBack()->onMesssage(this,&inputBuffer_);
    }
    else if (n == 0)
    {
        handleClose();
    }
    else
    {
        errno = savedErrno;
        handleError();
    }
}

函数式编程实现

而muduo的回调,使用boost::function()+boost::bind()实现,通过这两个神器,将使用者和实现者解耦; 
通过TcpServer,将用户逻辑层的函数传递到底层;读事件到来,回调用户逻辑;

以下是时序

fun_seq_msg 

代码层面,我们看看用户逻辑层的代码是如何传入的: 
UserLogicCallBack中包含TcpServer的对象;

TcpServer server_;

在构造函数中,将onMessage传递给TcpServer,这是第一次传递:

UserLogicCallBack::UserLogicCallBack(muduo::net::EventLoop* loop,
                       const muduo::net::InetAddress& listenAddr)
  : server_(loop, listenAddr, "UserLogicCallBack")
{
  server_.setConnectionCallback(
      boost::bind(&UserLogicCallBack::onConnection, this, _1));
  //这里将onMessage传递给TcpServer
  server_.setMessageCallback(
      boost::bind(&UserLogicCallBack::onMessage, this, _1, _2, _3));
}

TcpServer中的相关细节:

class TcpServer{
    void setMessageCallback(const MessageCallback& cb)
    { messageCallback_ = cb; }

    typedef boost::function<void (const TcpConnectionPtr&,
                                  Buffer*,
                                  Timestamp)> MessageCallback;
    MessageCallback messageCallback_;
};

TcpServer新建连接时,将用户层的回调函数继续往底层传递,这是第二次传递:

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  conn->setConnectionCallback(connectionCallback_);
  // 这里将onMessage()传递给TcpConnection
  conn->setMessageCallback(messageCallback_); 
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(boost::bind(&TcpServer::removeConnection, this, _1)); 
  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
}

通过这两次传递,messageCallback_作为成员变量保存在TcpConnection中; 
当读事件到来时,TcpConnection中就可以直接调用业务层的回调逻辑:

void TcpConnection::handleRead(Timestamp receiveTime)
{
  //返回缓存区可读的位置,返回所有读到的字节,具体到是否收全,
  //是否达到业务需要的数据字节数,由业务层来判断处理
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    //回调业务层的逻辑
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    handleError();
  }
}

完整时序详见最后一节;源代码来自muduo库;

两者的时序图对比

Reactor的面向对象编程时序:

 oo_sequence

 

Reacotr的函数式编程时序:

EchoServer_sequence 

结论

在面向对象的设计中,事件底层回调上层逻辑,本来和loop这个发动机没有任何关系的一件事,却需要使用它来作为中转;EventLoop作为回调的中间桥梁,实在是迫不得已的实现; 
而muduo的设计中加入了TcpServer这一胶水层,整个架构就清晰多了; 
boost::function()+boost::bind()让我们在回调的实现上有了更大的自由度,不用再依赖于基于虚函数的多态继承结构;但更大的自由度,也更容易带来糟糕的设计,使用boost::function()+boost::bind()基于对象的设计,还需要多多体会,多加应用;

Posted by: 大CC | 30DEC,2015 


=====================================================

=====================================================

======================================================

一. Reactor模式简介

Reactor释义“反应堆”,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,

而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,

Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。

二. moduo库Reactor模式的实现

muduo主要通过3个类来实现Reactor模式:EventLoop,Channel,Poller。

1. EventLoop

事件循环。moduo的线程模型为one loop per thread,即每个线程只能有一个EventLoop对象。
EventLoop对象的生命周期通常和其所属的线程一样长。

数据成员:

       const pid_t threadId_;保存当前EventLoop所属线程id

       boost::scoped_ptr poller_; 实现I/O复用 boost::scoped_ptr timerQueue_;

       int wakeupFd_;

       boost::scoped_ptr wakeupChannel_; 用于处理wakeupFd_上的可读事件,将事件分发到handlRead() ChannelList activeChannels_; 有事件就绪的              Channel Channel* currentActiveChannel_;

       MutexLock mutex_; pendingFunctors_回暴露给其他线程,所以需要加锁 std::vectorpendingFunctors_; 

主要功能函数:

     loop(),在该函数中会循环执行以下过程:调用Poller::poll(),通过此调用获得一个vector<channel*>activeChannels_的就绪事件集合,再遍历该容器,执行每个Channel的Channel::handleEvent()完成相应就绪事件回调,最后执行pendingFunctors_排队的函数。上述一次循环就是一次Reactor模式完成。

     runInLoop(boost::function<void()>),实现用户指定任务回调,若是EventLoop隶属的线程调用EventLoop::runInLoop()则EventLoop马上执行;若是其它线程调用则执行EventLoop::queueInLoop(boost::function<void()>将任务添加到队列中(线程转移)。EventLoop如何获得有任务这一事实呢?通过eventfd可以实现线程间通信,具体做法是:其它线程向EventLoop::vector<boost::function<void()> >添加任务T,然后通过EventLoop::wakeup()向eventfd写一个int,eventfd的回调函数EventLoop::handleRead()读取这个int,从而相当于EventLoop被唤醒,此时loop中遍历队列执行堆积的任务。这里采用Channel管理eventfd,Poller侦听eventfd体现了eventfd可以统一事件源的优势。

    queueInLoop(Functor& cb),将cb放入队列,并在必要时唤醒IO线程。有两种情况需要唤醒IO线程,1 调用queueInLoop()的线程不是IO线程,2 调用queueInLoop()的线程是IO线程,而此时正在调用pengding functor。

2. Channel

  事件分发器。每个Channel只属于一个EventLoop,每个Channel只负责一个文件描述符fd的IO事件分发,但其不拥有fd。

数据成员:

     int fd_文件描述符,

     int events_ 文件描述符注册事件,

     int revents_文件描述符的就绪事件,由Poller::poll设置

     readCallback_,writeCallback...各种事件回调,会在拥有该Channel类的构造函数中被注册,例如TcpConnction会在构造函数中TcpConnection::handlRead()注册给Channel::readCallback

主要功能函数:

     setCallback()系列函数,接受Channel所属的类注册相应的事件回调函数

     enableReading(),update(), 当一个fd想要注册可读事件时,首先通过Channel::enableReading()-->Channel::update(this)->EventLoop::updateChannel(Channel)->Poller::updateChannel(Channel*)调用链向poll系统调用的侦听事件表注册或者修改注册事件。

    handleEvent(), Channel作为是事件分发器其核心结构是Channel::handleEvent(),该函数调用Channel::handleEventWithGuard(),在其内根据Channel::revents的值分发调用相应的事件回调。

3. Poller

Poller是IO multiplexing的封装,封装了poll和epoll。Poller是EventLoop的间接成员,
只供拥有该Poller的EventLoop在IO线程调用。生命期与EventLoop相等。

数据成员:

    vector pollfds_事件结构体数组用于poll的第一个参数;

    map<int,channel*> channels_用于文件描述符fd到Channel的映射便于快速查找到相应的Channel

主要功能函数:

    updateChannel(Channel*) 用于将传入的Channel关心的事件注册给Poller。

    poll(int timeoutMs,vector<channel*> activeChannels)其调用poll侦听事件集合,将就绪事件所属的Channel调用fillActiveChannels()加入到activeChannels_中。

其他类

EventLoopThread: 启动一个线程执行一个EventLoop,其语义和"one loop per thread“相吻合。注意这里用到了互斥量和条件变量,这是因为线程A创建一个EventLoopThread对象后一个运行EventLoop的线程已经开始创建了,可以通过EventLoopThread::startLoop()获取这个EventLoop对象,但是若EventLoop线程还没有创建好,则会出错。所以在创建EventLoop完成后会执行condititon.notify()通知线程A,线程A调用EventLoopThread::startLoop()时调用condition.wai()等待,从而保证获取一个创建完成的EventLoop.毕竟线程A创建的EventLoop线程,A可能还会调用EventLoop执行一些任务回调呢。


版权声明:本文为lusic01原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/lusic01/article/details/80622575