TCP网络编程最本质的是处理三个半事件
什么都不做的EventLoop
意思是说每个线程最多只能有一个EventLoop对象。
EventLoop对象构造时会检查当前线程是否已经创建了其他EventLoop对象,如果已创建,则终止程序(LOG_FATAL)。
EventLoop构造函数会记住本对象所属线程(threadId_)。
EventLoop.h
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H
#include <boost/noncopyable.hpp>
#include <muduo/base/CurrentThread.h>
#include <muduo/base/Thread.h>
namespace muduo
{
namespace net
{
///
/// Reactor, at most one per thread.
///
/// This is an interface class, so don't expose too much details.
class EventLoop : boost::noncopyable
{
 public:
  EventLoop();
  ~EventLoop();  // force out-line dtor, for scoped_ptr members.

  ///
  /// Runs the event loop.
  ///
  /// Must be called in the same thread as creation of the object.
  ///
  void loop();

  /// Verifies that the caller runs on the thread stored in threadId_;
  /// hands off to abortNotInLoopThread() when it does not.
  void assertInLoopThread()
  {
    if (isInLoopThread())
    {
      return;
    }
    abortNotInLoopThread();
  }

  /// True when the calling thread's id equals threadId_.
  bool isInLoopThread() const
  {
    return CurrentThread::tid() == threadId_;
  }

  static EventLoop* getEventLoopOfCurrentThread();

 private:
  void abortNotInLoopThread();

  bool looping_;          // whether loop() is currently running (original note: "atomic")
  const pid_t threadId_;  // id of the thread this object belongs to
};
}
}
#endif // MUDUO_NET_EVENTLOOP_H
EventLoop.cpp
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>
#include <poll.h>
using namespace muduo;
using namespace muduo::net;
namespace
{
// Pointer to the EventLoop object of the current thread.
// __thread makes this thread-local storage: every thread gets its own copy.
// Without __thread the pointer would be shared by all threads, which would
// defeat the per-thread bookkeeping this variable exists for.
// It starts out as 0, i.e. no EventLoop recorded for the thread yet.
__thread EventLoop* t_loopInThisThread = 0;
}
// Returns the value of t_loopInThisThread as seen by the calling thread.
EventLoop* EventLoop::getEventLoopOfCurrentThread()
{
  EventLoop* const current = t_loopInThisThread;
  return current;
}
// Binds the new loop to the thread that constructs it.
EventLoop::EventLoop()
  : looping_(false),                 // not inside loop() yet
    threadId_(CurrentThread::tid())  // remember the constructing thread
{
  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
  if (t_loopInThisThread == 0)
  {
    // No loop recorded for this thread so far: record this one.
    t_loopInThisThread = this;
  }
  else
  {
    // This thread already has an EventLoop; report it through LOG_FATAL.
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
}
// Clears the calling thread's loop pointer.
EventLoop::~EventLoop()
{
  t_loopInThisThread = 0;
}
// Event loop entry point. Must not be called across threads: only the
// thread that created this object may call it.
void EventLoop::loop()
{
  // The loop must not already be running.
  assert(!looping_);
  // The caller must be the thread that created this object.
  assertInLoopThread();

  looping_ = true;
  LOG_TRACE << "EventLoop " << this << " start looping";

  // Placeholder body for this test version: watch no descriptors and
  // simply wait for the timeout.
  const int kPollTimeMs = 5 * 1000;
  ::poll(NULL, 0, kPollTimeMs);

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}
// Reports, through LOG_FATAL, that the loop is being used from a thread
// other than the one recorded in threadId_.
void EventLoop::abortNotInLoopThread()
{
  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
            << " was created in threadId_ = " << threadId_
            << ", current thread id = " << CurrentThread::tid();
}
#include <muduo/net/EventLoop.h>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
/*
 * This program checks the "one EventLoop per thread" rule: the main thread
 * and a worker thread each create and run their own EventLoop.
 */
// Worker-thread entry: prints the process and thread ids, then creates an
// EventLoop on this thread's stack and runs it.
void threadFunc()
{
  // The line ends with a real newline escape ("\n"); the two characters
  // "/n" would be printed literally and leave the line unterminated.
  printf("threadFunc(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());

  EventLoop loop;
  loop.loop();
}
// Creates an EventLoop in the main thread, starts a worker that creates its
// own, runs the main loop and then waits for the worker.
int main(void)
{
  // The line ends with a real newline escape ("\n"); the two characters
  // "/n" would be printed literally and leave the line unterminated.
  printf("main(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());

  EventLoop loop;

  Thread t(threadFunc);
  t.start();

  loop.loop();
  t.join();
  return 0;
}
[root@localhost bin]# ./reactor_test01
main(): pid = 2724, tid = 2724
20131018 04:12:55.305234Z 2724 TRACE EventLoop EventLoop created 0xBFA6B2C0 in thread 2724 - EventLoop.cc:36
20131018 04:12:55.305599Z 2724 TRACE loop EventLoop 0xBFA6B2C0 start looping - EventLoop.cc:62
threadFunc(): pid = 2724, tid = 2725
20131018 04:12:55.305792Z 2725 TRACE EventLoop EventLoop created 0xB77E0068 in thread 2725 - EventLoop.cc:36
20131018 04:12:55.305809Z 2725 TRACE loop EventLoop 0xB77E0068 start looping - EventLoop.cc:62
20131018 04:13:00.321713Z 2724 TRACE loop EventLoop 0xBFA6B2C0 stop looping - EventLoop.cc:66
20131018 04:13:00.321779Z 2725 TRACE loop EventLoop 0xB77E0068 stop looping - EventLoop.cc:66
[root@localhost bin]#
#include <muduo/net/EventLoop.h>
#include <stdio.h>
/**
该程序主要用来测试,多个线程使用同一个eventloop时,程序将会错误终止!!
**/
using namespace muduo;
using namespace muduo::net;
EventLoop* g_loop;
void threadFunc()
{
g_loop->loop();
}
int main(void)
{
EventLoop loop;
g_loop = &loop;
Thread t(threadFunc);
t.start();
t.join();
return 0;
}
程序输出
[root@localhost bin]# ./reactor_test02
20131018 04:15:09.010234Z 2730 TRACE EventLoop EventLoop created 0xBFD53730 in thread 2730 - EventLoop.cc:36
20131018 04:15:09.010768Z 2731 FATAL EventLoop::abortNotInLoopThread - EventLoop 0xBFD53730 was created in threadId_ = 2730, current thread id = 2731 - EventLoop.cc:72
Aborted
[root@localhost bin]#
epollpller.h
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H
#define MUDUO_NET_POLLER_EPOLLPOLLER_H
#include <muduo/net/Poller.h>
#include <map>
#include <vector>
struct epoll_event;
namespace muduo
{
namespace net
{
///
/// IO Multiplexing with epoll(4).
///
class EPollPoller : public Poller
{
 public:
  EPollPoller(EventLoop* loop);
  virtual ~EPollPoller();

  /// timeoutMs:      timeout handed to the wait call.
  /// activeChannels: receives the channels that have pending events.
  virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels);

  /// Registers a channel or updates an already known one.
  virtual void updateChannel(Channel* channel);

  /// Removes a channel.
  virtual void removeChannel(Channel* channel);

 private:
  typedef std::vector<struct epoll_event> EventList;
  typedef std::map<int, Channel*> ChannelMap;

  /// Initial number of slots in events_.
  static const int kInitEventListSize = 16;

  void fillActiveChannels(int numEvents,
                          ChannelList* activeChannels) const;

  /// Applies one epoll operation to a channel.
  void update(int operation, Channel* channel);

  /// Descriptor obtained from epoll_create1(EPOLL_CLOEXEC); identifies the
  /// set of descriptors being watched.
  int epollfd_;
  /// Buffer that receives the ready events reported by epoll_wait.
  EventList events_;
  /// Known channels, keyed by file descriptor.
  ChannelMap channels_;
};
}
}
#endif // MUDUO_NET_POLLER_EPOLLPOLLER_H
epollpoller.cc
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#include <muduo/net/poller/EPollPoller.h>
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <boost/static_assert.hpp>
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <sys/epoll.h>
using namespace muduo;
using namespace muduo::net;
// On Linux, the constants of poll(2) and epoll(4)
// are expected to be the same.
BOOST_STATIC_ASSERT(EPOLLIN == POLLIN);
BOOST_STATIC_ASSERT(EPOLLPRI == POLLPRI);
BOOST_STATIC_ASSERT(EPOLLOUT == POLLOUT);
BOOST_STATIC_ASSERT(EPOLLRDHUP == POLLRDHUP);
BOOST_STATIC_ASSERT(EPOLLERR == POLLERR);
BOOST_STATIC_ASSERT(EPOLLHUP == POLLHUP);
namespace
{
// Values stored in a channel's index to track its state in this poller.
const int kNew = -1; // new channel: not yet known to the poller
const int kAdded = 1; // channel is being watched
const int kDeleted = 2; // channel has been taken out of the watched set
}
// Creates the epoll instance (close-on-exec) and sizes the event buffer to
// kInitEventListSize entries.
EPollPoller::EPollPoller(EventLoop* loop)
  : Poller(loop),
    epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
    events_(kInitEventListSize)
{
  if (epollfd_ >= 0)
  {
    return;
  }
  // epoll_create1 failed.
  LOG_SYSFATAL << "EPollPoller::EPollPoller";
}
// Closes the epoll descriptor opened by the constructor.
EPollPoller::~EPollPoller()
{
  ::close(epollfd_);
}
// Runs in the IO thread.
//
// Waits up to timeoutMs for events on the watched descriptors, appends the
// channels that became ready to activeChannels and returns the time taken
// right after the wait finished.
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  // Wait for events to arrive.
  int numEvents = ::epoll_wait(epollfd_,
                               &*events_.begin(),
                               static_cast<int>(events_.size()),
                               timeoutMs);
  // Capture errno immediately: the calls below may overwrite it before the
  // error branch gets to report it.
  const int savedErrno = errno;
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happended";
    // Hand the ready channels to the caller.
    fillActiveChannels(numEvents, activeChannels);
    // Every slot was used, so more events may be pending: double the
    // buffer for the next call.
    if (implicit_cast<size_t>(numEvents) == events_.size())
    {
      events_.resize(events_.size()*2);
    }
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happended";
  }
  else
  {
    // EINTR only means a signal interrupted the wait; it is not a failure
    // of the poller, so it is not reported.
    if (savedErrno != EINTR)
    {
      errno = savedErrno;  // LOG_SYSERR reports the current errno
      LOG_SYSERR << "EPollPoller::poll()";
    }
  }
  return now;
}
/*添加活动通道channel*/
void EPollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
assert(implicit_cast<size_t>(numEvents) <= events_.size());
for (int i = 0; i < numEvents; ++i)
{
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
//如果是调试状态,则
#ifndef NDEBUG
int fd = channel->fd();
ChannelMap::const_iterator it = channels_.find(fd);
assert(it != channels_.end());
assert(it->second == channel);
#endif
//否则直接跳到这里
// 设置channel的“可用事件”
channel->set_revents(events_[i].events);
// 加入活动通道容器
activeChannels->push_back(channel);
}
}
// Brings the epoll registration of a channel in line with the events the
// channel is currently interested in.
void EPollPoller::updateChannel(Channel* channel)
{
  // Must run in the IO thread.
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();

  const int state = channel->index();
  const int fd = channel->fd();

  if (state == kNew || state == kDeleted)
  {
    // Either a brand-new channel, or one that was dropped from epoll
    // earlier and is being watched again: add with EPOLL_CTL_ADD.
    if (state == kNew)
    {
      // A new channel must not be present in channels_ yet.
      assert(channels_.find(fd) == channels_.end());
      channels_[fd] = channel;
    }
    else  // state == kDeleted
    {
      assert(channels_.find(fd) != channels_.end());
      assert(channels_[fd] == channel);
    }
    channel->set_index(kAdded);
    update(EPOLL_CTL_ADD, channel);
    return;
  }

  // Existing channel: update it with EPOLL_CTL_MOD or EPOLL_CTL_DEL.
  (void)fd;
  assert(channels_.find(fd) != channels_.end());
  assert(channels_[fd] == channel);
  // It must already be known and registered with epollfd_.
  assert(state == kAdded);

  if (channel->isNoneEvent())
  {
    // No events of interest are left: take it out of epollfd_ and remember
    // that through index = kDeleted (it stays in channels_).
    update(EPOLL_CTL_DEL, channel);
    channel->set_index(kDeleted);
  }
  else
  {
    update(EPOLL_CTL_MOD, channel);
  }
}
// Forgets a channel completely: erases it from channels_ and, if it is
// still registered, from epollfd_.
void EPollPoller::removeChannel(Channel* channel)
{
  // Must run in the IO thread.
  Poller::assertInLoopThread();
  const int fd = channel->fd();
  LOG_TRACE << "fd = " << fd;

  // The channel must be the one stored for this fd ...
  assert(channels_.find(fd) != channels_.end());
  assert(channels_[fd] == channel);
  // ... and it must no longer be interested in any event.
  assert(channel->isNoneEvent());

  const int state = channel->index();
  assert(state == kAdded || state == kDeleted);

  // Drop the bookkeeping entry.
  const size_t erased = channels_.erase(fd);
  (void)erased;
  assert(erased == 1);

  // A kAdded channel is still registered with epoll; unregister it.
  if (state == kAdded)
  {
    update(EPOLL_CTL_DEL, channel);
  }
  // From the poller's point of view the channel is new again.
  channel->set_index(kNew);
}
// Issues one epoll_ctl call (operation is EPOLL_CTL_ADD/MOD/DEL) for the
// channel, using the channel's current event mask and storing the channel
// pointer as the event's user data.
void EPollPoller::update(int operation, Channel* channel)
{
  const int fd = channel->fd();

  struct epoll_event event;
  bzero(&event, sizeof event);
  event.events = channel->events();
  event.data.ptr = channel;

  if (::epoll_ctl(epollfd_, operation, fd, &event) >= 0)
  {
    return;
  }

  // The call failed: a failed delete is only logged as an error, any other
  // failed operation is logged as fatal.
  if (operation == EPOLL_CTL_DEL)
  {
    LOG_SYSERR << "epoll_ctl op=" << operation << " fd=" << fd;
  }
  else
  {
    LOG_SYSFATAL << "epoll_ctl op=" << operation << " fd=" << fd;
  }
}
Reactor_test03.cc
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <boost/bind.hpp>
#include <stdio.h>
#include <sys/timerfd.h>
using namespace muduo;
using namespace muduo::net;
// Loop created in main(); timeout() uses it to stop the loop.
EventLoop* g_loop;
// The timerfd created in main() and drained in timeout().
int timerfd;

// Read callback of the timerfd channel: announces the timeout, drains the
// timerfd and asks the loop to quit.
void timeout(Timestamp receiveTime)
{
  // The line ends with a real newline escape ("\n"); the two characters
  // "/n" would be printed literally.
  printf("Timeout!\n");

  // Reading the timerfd consumes the pending expirations. The result is
  // checked instead of being silently dropped.
  uint64_t howmany = 0;
  const ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
  if (n != static_cast<ssize_t>(sizeof howmany))
  {
    fprintf(stderr, "timeout(): read %zd bytes instead of 8\n", n);
  }

  g_loop->quit();
}
// Registers a timerfd with the loop through a Channel, arms it for one
// second and runs the loop; timeout() is the channel's read callback.
int main(void)
{
  EventLoop loop;
  g_loop = &loop;

  // One-shot expiration after 1 second (interval left at zero).
  struct itimerspec oneShot;
  bzero(&oneShot, sizeof oneShot);
  oneShot.it_value.tv_sec = 1;

  timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);

  Channel channel(&loop, timerfd);
  channel.setReadCallback(boost::bind(timeout, _1));
  channel.enableReading();

  ::timerfd_settime(timerfd, 0, &oneShot, NULL);

  loop.loop();

  ::close(timerfd);
  return 0;
}
程序输出
[root@localhost bin]# ./reactor_test03
20131020 02:24:05.657327Z 4009 TRACE EventLoop EventLoop created 0xBFD2AAD4 in thread 4009 - EventLoop.cc:42
20131020 02:24:05.657513Z 4009 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131020 02:24:05.657554Z 4009 TRACE loop EventLoop 0xBFD2AAD4 start looping - EventLoop.cc:68
20131020 02:24:06.658756Z 4009 TRACE poll 1 events happended - EPollPoller.cc:65
20131020 02:24:06.658972Z 4009 TRACE printActiveChannels {4: IN } - EventLoop.cc:139
Timeout!
20131020 02:24:06.659008Z 4009 TRACE loop EventLoop 0xBFD2AAD4 stop looping - EventLoop.cc:93
[root@localhost bin]#
[28] muduo的定时器由三个类实现,TimerId(定时器)、Timer(最上层的抽象)、TimerQueue(定时器的列表)
EventLoop
runAt 在某个时刻运行定时器------>TimerQueue.addTimer
runAfter 过一段时间运行定时器---->TimerQueue.addTimer
runEvery 每隔一段时间运行定时器->TimerQueue.addTimer
cancel 取消定时器 ---->TimerQueue.cancel
TimerQueue数据结构的选择:既要能快速根据当前时间找到已到期的定时器,也要能高效地添加和删除Timer,因而可以用二叉搜索树,即map或者set。如果选择map(以时间戳为key)是有问题的:TimerQueue里面可能有时间戳相同但定时器Timer*不同的条目。可以考虑multimap,不过最后没有采用,而是使用以pair<Timestamp, Timer*>为元素的set:
typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;
timer.h
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
#ifndef MUDUO_NET_TIMER_H
#define MUDUO_NET_TIMER_H
#include <boost/noncopyable.hpp>
#include <muduo/base/Atomic.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>
namespace muduo
{
namespace net
{
///
/// Internal class for timer event.
///
class Timer : boost::noncopyable
{
 public:
  /// cb:       callback invoked by run().
  /// when:     first expiration time.
  /// interval: repeat interval; the timer repeats only when it is > 0.
  Timer(const TimerCallback& cb, Timestamp when, double interval)
    : callback_(cb),
      expiration_(when),
      interval_(interval),
      repeat_(interval > 0.0),
      sequence_(s_numCreated_.incrementAndGet())
  {
  }

  /// Invokes the stored callback.
  void run() const
  {
    callback_();
  }

  Timestamp expiration() const
  {
    return expiration_;
  }

  bool repeat() const
  {
    return repeat_;
  }

  int64_t sequence() const
  {
    return sequence_;
  }

  void restart(Timestamp now);

  static int64_t numCreated()
  {
    return s_numCreated_.get();
  }

 private:
  const TimerCallback callback_;  // timer callback
  Timestamp expiration_;          // next expiration time
  const double interval_;         // repeat interval; 0 for a one-shot timer
  const bool repeat_;             // whether the timer repeats
  const int64_t sequence_;        // sequence number of this timer

  static AtomicInt64 s_numCreated_;  // number of timers created so far
};
}
}
#endif // MUDUO_NET_TIMER_H
timer.cc
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#include <muduo/net/Timer.h>
using namespace muduo;
using namespace muduo::net;
// Storage for the counter incremented by every Timer constructor.
AtomicInt64 Timer::s_numCreated_;

// Re-arms the timer relative to `now`: a repeating timer expires again one
// interval later, any other timer gets an invalid expiration.
void Timer::restart(Timestamp now)
{
  expiration_ = repeat_ ? addTime(now, interval_)
                        : Timestamp::invalid();
}
timerid.h
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
#ifndef MUDUO_NET_TIMERID_H
#define MUDUO_NET_TIMERID_H
#include <muduo/base/copyable.h>
namespace muduo
{
namespace net
{
class Timer;
///是一个不透明的ID ,是外部可见的一个类
/// An opaque identifier, for canceling Timer.
///
class TimerId : public muduo::copyable
{
 public:
  /// An id that refers to no timer.
  TimerId() : timer_(NULL), sequence_(0) {}

  /// An id for `timer` with sequence number `seq`.
  TimerId(Timer* timer, int64_t seq) : timer_(timer), sequence_(seq) {}

  // default copy-ctor, dtor and assignment are okay

  friend class TimerQueue;

 private:
  Timer* timer_;      // address of the timer
  int64_t sequence_;  // sequence number of the timer
};
}
}
#endif // MUDUO_NET_TIMERID_H
timerqueue.h(注:下面贴出的内容与上文 timer.h 完全相同,并非 TimerQueue 类的声明)
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
#ifndef MUDUO_NET_TIMER_H
#define MUDUO_NET_TIMER_H
#include <boost/noncopyable.hpp>
#include <muduo/base/Atomic.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>
namespace muduo
{
namespace net
{
///
/// Internal class for timer event.
///
class Timer : boost::noncopyable
{
 public:
  /// cb:       callback invoked by run().
  /// when:     first expiration time.
  /// interval: repeat interval; the timer repeats only when it is > 0.
  Timer(const TimerCallback& cb, Timestamp when, double interval)
    : callback_(cb),
      expiration_(when),
      interval_(interval),
      repeat_(interval > 0.0),
      sequence_(s_numCreated_.incrementAndGet())
  {
  }

  /// Invokes the stored callback.
  void run() const
  {
    callback_();
  }

  Timestamp expiration() const
  {
    return expiration_;
  }

  bool repeat() const
  {
    return repeat_;
  }

  int64_t sequence() const
  {
    return sequence_;
  }

  void restart(Timestamp now);

  static int64_t numCreated()
  {
    return s_numCreated_.get();
  }

 private:
  const TimerCallback callback_;  // timer callback
  Timestamp expiration_;          // next expiration time
  const double interval_;         // repeat interval; 0 for a one-shot timer
  const bool repeat_;             // whether the timer repeats
  const int64_t sequence_;        // sequence number of this timer

  static AtomicInt64 s_numCreated_;  // number of timers created so far
};
}
}
#endif // MUDUO_NET_TIMER_H
timerqueue.cc
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#define __STDC_LIMIT_MACROS
#include <muduo/net/TimerQueue.h>
#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Timer.h>
#include <muduo/net/TimerId.h>
#include <boost/bind.hpp>
#include <sys/timerfd.h>
namespace muduo
{
namespace net
{
namespace detail
{
// Creates a non-blocking, close-on-exec timerfd on CLOCK_MONOTONIC and
// returns its descriptor; a failure is reported through LOG_SYSFATAL.
int createTimerfd()
{
  const int flags = TFD_NONBLOCK | TFD_CLOEXEC;
  const int fd = ::timerfd_create(CLOCK_MONOTONIC, flags);
  if (fd < 0)
  {
    LOG_SYSFATAL << "Failed in timerfd_create";
  }
  return fd;
}
// Converts the distance between `when` and the current time into a
// timespec. Distances below 100 microseconds (including negative ones) are
// raised to 100 microseconds.
struct timespec howMuchTimeFromNow(Timestamp when)
{
  const int64_t kMinDelayUs = 100;
  const int64_t delta = when.microSecondsSinceEpoch()
                        - Timestamp::now().microSecondsSinceEpoch();
  const int64_t delayUs = (delta < kMinDelayUs) ? kMinDelayUs : delta;

  const int64_t wholeSeconds = delayUs / Timestamp::kMicroSecondsPerSecond;
  const int64_t restUs = delayUs % Timestamp::kMicroSecondsPerSecond;

  struct timespec ts;
  ts.tv_sec = static_cast<time_t>(wholeSeconds);
  ts.tv_nsec = static_cast<long>(restUs * 1000);  // microseconds -> nanoseconds
  return ts;
}
// Drains the timerfd so that it stops reporting as readable.
//
// timerfd: descriptor to read from.
// now:     timestamp used only for the trace message.
void readTimerfd(int timerfd, Timestamp now)
{
  // Initialised so that the trace line below prints a defined value even
  // when read() fails and stores nothing.
  uint64_t howmany = 0;
  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
  if (n != sizeof howmany)
  {
    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
  }
}
// Re-arms the timerfd so that it fires at `expiration` (one-shot: the
// interval part of the setting stays zero).
void resetTimerfd(int timerfd, Timestamp expiration)
{
  // wake up loop by timerfd_settime()
  struct itimerspec previous;
  struct itimerspec next;
  bzero(&previous, sizeof previous);
  bzero(&next, sizeof next);
  next.it_value = howMuchTimeFromNow(expiration);

  const int rc = ::timerfd_settime(timerfd, 0, &next, &previous);
  if (rc != 0)
  {
    LOG_SYSERR << "timerfd_settime()";
  }
}
}
}
}
using namespace muduo;
using namespace muduo::net;
using namespace muduo::net::detail;
// Creates the timerfd, wraps it in a channel owned by `loop` and starts
// watching it for readability; handleRead() is the read callback.
TimerQueue::TimerQueue(EventLoop* loop)
  : loop_(loop),
    timerfd_(createTimerfd()),
    timerfdChannel_(loop, timerfd_),
    timers_(),
    callingExpiredTimers_(false)
{
  timerfdChannel_.setReadCallback(
      boost::bind(&TimerQueue::handleRead, this));
  // we are always reading the timerfd, we disarm it with timerfd_settime.
  // Hand the timerfd to the poller so that it is watched.
  timerfdChannel_.enableReading();
}
// Closes the timerfd and deletes every timer still stored in timers_.
TimerQueue::~TimerQueue()
{
  ::close(timerfd_);
  // do not remove channel, since we're in EventLoop::dtor();
  TimerList::iterator cur = timers_.begin();
  while (cur != timers_.end())
  {
    delete cur->second;
    ++cur;
  }
}
// Adds a timer: addTimer() -> addTimerInLoop().
//
// cb:       callback to run on expiration.
// when:     first expiration time.
// interval: repeat interval handed to the Timer constructor.
// Returns an id built from the new timer and its sequence number.
TimerId TimerQueue::addTimer(const TimerCallback& cb,
                             Timestamp when,
                             double interval)
{
  // NOTE: addTimer is meant to be thread safe, but the code providing that
  // (kept below for reference) is commented out in this version, so this
  // function has to be called from the owning EventLoop's IO thread.
  //
  //   loop_->runInLoop(
  //       boost::bind(&TimerQueue::addTimerInLoop, this, timer));
  Timer* const created = new Timer(cb, when, interval);
  addTimerInLoop(created);
  return TimerId(created, created->sequence());
}
// Cancels the timer identified by timerId by calling cancelInLoop()
// directly.
void TimerQueue::cancel(TimerId timerId)
{
  // The cross-thread dispatch is commented out in this version and kept
  // for reference:
  //
  //   loop_->runInLoop(
  //       boost::bind(&TimerQueue::cancelInLoop, this, timerId));
  cancelInLoop(timerId);
}
// Stores the timer; runs in the owning EventLoop's IO thread.
void TimerQueue::addTimerInLoop(Timer* timer)
{
  loop_->assertInLoopThread();
  // Inserting a timer may change which timer expires first; in that case
  // the timerfd is re-armed (timerfd_settime) for the new expiration.
  if (insert(timer))
  {
    resetTimerfd(timerfd_, timer->expiration());
  }
}
// Cancels one timer; runs in the owning EventLoop's IO thread.
void TimerQueue::cancelInLoop(TimerId timerId)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());

  const ActiveTimer key(timerId.timer_, timerId.sequence_);
  ActiveTimerSet::iterator found = activeTimers_.find(key);
  if (found != activeTimers_.end())
  {
    // Still pending: remove it from both sets and free it.
    Timer* const victim = found->first;
    size_t n = timers_.erase(Entry(victim->expiration(), victim));
    assert(n == 1); (void)n;
    delete victim;  // FIXME: no delete please (a unique_ptr would make the manual delete unnecessary)
    activeTimers_.erase(found);
  }
  else if (callingExpiredTimers_)
  {
    // The timer has already expired and the expired callbacks are being
    // run right now. Remember it in cancelingTimers_ so that
    // reset(expired, now) does not restart it afterwards.
    cancelingTimers_.insert(key);
  }

  assert(timers_.size() == activeTimers_.size());
}
/*定时器触发的回调函数*/
void TimerQueue::handleRead()
{
loop_->assertInLoopThread();
Timestamp now(Timestamp::now());
readTimerfd(timerfd_, now); // 清除该事件,避免一直触发
// 获取该时刻之前所有的定时器列表(即超时定时器列表)
std::vector<Entry> expired = getExpired(now);
//已经处于处理到期定时器当中
callingExpiredTimers_ = true;
//清除上一次的超时定时器队列
cancelingTimers_.clear();
// safe to callback outside critical section
for (std::vector<Entry>::iterator it = expired.begin();
it != expired.end(); ++it)
{
// 这里回调定时器处理函数
it->second->run();
}
callingExpiredTimers_ = false;
// 不是一次性定时器,需要重启
reset(expired, now);
}
// Removes and returns every timer whose expiration is not later than `now`
// (returned by value; the original note points at RVO).
//
// Example: with expirations 1,1,1,3,4,5,7,9, the expiry of the first "1"
// means the other two "1" entries have expired as well; the timerfd is only
// ever armed for the first timer in the queue.
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
  assert(timers_.size() == activeTimers_.size());

  // The sentinel pairs `now` with the largest pointer value. lower_bound
  // returns the first element >= sentinel, so the element it points at
  // (if any) has a timestamp strictly greater than `now`.
  const Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
  TimerList::iterator firstAlive = timers_.lower_bound(sentry);
  assert(firstAlive == timers_.end() || now < firstAlive->first);

  // Take the expired range out of timers_ ...
  std::vector<Entry> expired(timers_.begin(), firstAlive);
  timers_.erase(timers_.begin(), firstAlive);

  // ... and drop the same timers from activeTimers_.
  for (size_t i = 0; i < expired.size(); ++i)
  {
    Timer* const t = expired[i].second;
    const ActiveTimer key(t, t->sequence());
    size_t n = activeTimers_.erase(key);
    assert(n == 1); (void)n;
  }

  assert(timers_.size() == activeTimers_.size());
  return expired;
}
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
Timestamp nextExpire;
for (std::vector<Entry>::const_iterator it = expired.begin();
it != expired.end(); ++it)
{
ActiveTimer timer(it->second, it->second->sequence());
// 如果是重复的定时器并且是未取消定时器(未被其他线程取消掉),则重启该定时器
// cancelingTimers_.find(timer) == cancelingTimers_.end() 在取消的队列中找到不到timer
if (it->second->repeat()&& cancelingTimers_.find(timer) == cancelingTimers_.end())
{
it->second->restart(now);
insert(it->second);
}
else
{
// 一次性定时器或者已被取消的定时器是不能重置的,因此删除该定时器
// FIXME move to a free list
delete it->second; // FIXME: no delete please
}
}
if (!timers_.empty())
{
// 获取最早到期的定时器超时时间
nextExpire = timers_.begin()->second->expiration();
}
if (nextExpire.valid())
{
// 重置定时器的超时时刻(timerfd_settime)
resetTimerfd(timerfd_, nextExpire);
}
}
// Stores the timer in both timers_ and activeTimers_.
// Returns true when the new timer expires before every timer that was
// already stored (or when the queue was empty).
bool TimerQueue::insert(Timer* timer)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());

  const Timestamp when = timer->expiration();
  // Decided before inserting: is this the new earliest expiration?
  const bool earliestChanged =
      timers_.empty() || when < timers_.begin()->first;

  // Both insertions must add a new element.
  const bool addedToTimers = timers_.insert(Entry(when, timer)).second;
  assert(addedToTimers); (void)addedToTimers;

  const bool addedToActive =
      activeTimers_.insert(ActiveTimer(timer, timer->sequence())).second;
  assert(addedToActive); (void)addedToActive;

  assert(timers_.size() == activeTimers_.size());
  return earliestChanged;
}