加载中...

基于最小堆的定时器主备切换

基于最小堆的定时器主备切换

定时器

总体结构:为一个最小堆,也就是STL中的优先队列,存储定时任务,线程池去定时获取任务并执行

New Image

最小堆

任务分为三种:固定时间执行,cron表达式执行,间隔执行,这些定时任务统一加入到最小堆,通过计算下一次执行时间做为key。

c++
代码解读
复制代码
typedef std::priority_queue<CSharedPtr<ITaskWrap>, std::vector< CSharedPtr<ITaskWrap> >, Compare> CCronTaskQueue; struct Compare { bool operator()(const CSharedPtr<ITaskWrap>& a, const CSharedPtr<ITaskWrap>& b) { return a->GetNextTime() > b->GetNextTime(); } };

下一次执行最近的任务将处于堆顶

任务添加

往最小堆加入定时任务包括三个来源:

  1. 程序启动加载时,从数据库加载定时任务:定时任务通过脚本维护
  2. 程序启动中,动态添加/删除/清空定时任务
    • 从web端添加定时任务时,只是放入action队列,由定时任务触发器(单线程运行)将这些动作封装成定时任务置入最小堆,并负责最小堆的出堆
    • 其他插件动态添加/删除/清空定时任务,如后面要讲的初始化/主备切换定时任务
c++
代码解读
复制代码
long CTimeTrigger::Run() { // 1. 将ActionQueue的任务入堆 while (true) { CSharedPtr<CCronTaskAction> lpAction = m_ActionQueue.Pop(TIME_TIGGER_CHECK_INTERVAL); while(lpAction.IsNotNull()) { if(lpAction->GetAction() == CCronTaskAction::ACTION_ADD) { // 创建定时任务,入堆... } else if(lpAction->GetAction() == CCronTaskAction::ACTION_REMOVE) { // 根据任务id删除定时任务节点... } else if(lpAction->GetAction() == CCronTaskAction::ACTION_CLEAR) { // 清空定时器... } lpAction = m_ActionQueue.PopNoWait(); } // 2. 将最小堆的定时任务出堆,加入到定时任务线程池的消息队列 time_t iNow = time(NULL); if(m_CronTaskQueue.size() > 0) { CSharedPtr<ITaskWrap> lpTaskWrap = m_CronTaskQueue.top(); while(lpTaskWrap.IsNotNull()) { // 检测下一次执行时间,触发定时任务... lpTaskWrap->OnTimer(iNow); } } }
  1. 定时任务执行后,触发回调下一次继续执行

定时任务线程池

线程池初始化,多个线程从定时任务消息队列取任务

c++
代码解读
复制代码
CWorkThreadPool::CWorkThreadPool(ISchedule* lpOwner,const std::string& szThreadPoolName,int iThreadCount) :m_lpOwner(lpOwner),m_iThreadCount(iThreadCount),m_szThreadPoolName(szThreadPoolName) { m_lpQueue = new CBlockingQueue< CSharedPtr<IWorkThreadTask> >(); // 消息队列 for(int i=0; i<m_iThreadCount; i++) { std::string szThreadName = szThreadPoolName + "-" + std::to_string((long long) i); m_Threads.push_back(new CScheduleWorkThread(m_lpOwner,m_lpQueue,i,szThreadName)); } }

线程池运行:取消息队列任务执行

c++
代码解读
复制代码
void CWorkThreadPool::Start() { for(int i=0; i<m_iThreadCount; i++) { m_Threads[i]->Start(); } } // 线程执行体 long CScheduleWorkThread::Run() { while(true) { CSharedPtr<IWorkThreadTask> lpTask = m_lpQueue->Pop(); // 取消息队列消息 if(lpTask.IsNotNull()) { try { lpTask->Execute(lpTask); // 执行定时任务api } catch(IError& e) { LogError("运行错误 error_no: " << e.GetErrorNo() << " error_info: " << e.GetErrorMsg()); } } } return 0; }

执行api,返回错误信息,并计算下一次执行时间将任务再次入堆:

c++
代码解读
复制代码
void CLocalScheduleTask::OnProcess(const CSharedPtr<IWorkThreadTask>& lpTask) { if(m_iTaskStatus == TASK_STATUS_DISABLE) { // 任务状态异常,直接退出... return; } if(m_iStep == STEP_EXEC) // 任务执行 { // 执行任务,返回错误信息,并计算下一次执行时间将任务再次入堆 ExecTask(lpTask,lpTaskInfo); Next(STEP_CALLBACK); } }

主备切换

主机和备机连接的同一个实体数据库,内存数据库UFTDB通过NFS挂载和redo文件进行实时同步,所以二者的数据是一样的,不一样的是内存中的数据

当主机宕机后,备机成为主机,继续执行定时任务,而主机未执行完成的任务丢弃

框架启动插件加载流程:

  1. 主线程启动,按照配置文件顺序加载各个插件库文件(so或dll)
  2. 调用各个插件库文件的OnInit接口进行初始化,一般来说此时各个插件都只有一个主线程运行
  3. 调用各个插件库文件的OnStart接口进行启动,一般来说此时各个插件的内部线程在此时启动

至此,程序启动时,会调用到定时器的入口函数:

c++
代码解读
复制代码
void CScheduleAgentImpl::OnStart(PFOnSetTimeOut pfOnSetTimeOut) { if (pfOnSetTimeOut) pfOnSetTimeOut(5000); // 向路由插件注册 mf_RegSvr(); m_lpTimeTrigger->Start(); m_lpThreadPool->Start(); // 执行初始化任务 CSharedPtr<CInitTask> lpInitTask = new CInitTask(this); CSharedPtr<ICronTask> lpCronTask = new CCronTask(this,0,"*/2 * * * * ?",-1,lpInitTask); AddCronTask(lpCronTask); // CSharedPtr<CInitTask> lpInitTask = new CInitTask(this); // m_lpThreadPool->CommitTask(lpInitTask); printf("schedule_agent started\n"); }

在进行初始化时,会添加一些系统级任务,如初始化、心跳、主备切换定时任务

c++
代码解读
复制代码
void CInitTask::OnProcess(const CSharedPtr<IWorkThreadTask>& lpTask) { if(m_iStep == STEP_CONNECT_MANAGER) { // 定时器插件连接所在节点 } else if(m_iStep == STEP_REFRESH_CRON_TASK) { // 刷新定时任务 RefreshCronTask(lpTask); } } void CInitTask::RefreshCronTask(const CSharedPtr<IWorkThreadTask>& lpTask) { CAutoPtr<IESBMessage> lpRsp = GetRspMessage(); if(lpRsp.IsNotNull()) { int iErrorNo = lpRsp->GetItem(TAG_ERROR_NO)->GetInt(); if(iErrorNo == 0) { m_lpOwner->RemoveCronTask(-1); if (m_lpOwner->IsMaster()) // Master启动心跳任务 { // 添加心跳任务 CSharedPtr<CHeartBeatTask> lpTask = new CHeartBeatTask(m_lpOwner); CSharedPtr<ICronTask> lpCronTask = new CCronTask(m_lpOwner, 0, "*/3 * * * * ?", 0, lpTask); m_lpOwner->AddCronTask(lpCronTask); } else { // Slave节点开启ChangeMasterTask检查主备切换 3s检查一次状态 LogInfo("定时任务管理器 备机开启主备切换检查"); CSharedPtr<CChangeMasterTask> lpTask = new CChangeMasterTask(m_lpOwner); CSharedPtr<ICronTask> lpCronTask = new CCronTask(m_lpOwner, 0, "*/3 * * * * ?", -2, lpTask); m_lpOwner->AddCronTask(lpCronTask); } } } }

主机执行心跳定时任务,而备机执行主备切换定时任务

c++
代码解读
复制代码
void CChangeMasterTask::OnProcess(const CSharedPtr<IWorkThreadTask>& lpTask) { if(m_lpOwner->IsMaster()) // 检查当前节点是否是主节点 { LogInfo("定时任务管理器 主备切换"); // 备机在切换成主机时,需要重新添加主备切换定时任务 m_lpOwner->RemoveCronTask(-2); CSharedPtr<IWorkThreadTask> lpTask = new CInitTask(m_lpOwner); CSharedPtr<ICronTask> lpCronTask = new CCronTask(m_lpOwner,0,"*/2 * * * * ?",-1,lpTask); m_lpOwner->AddCronTask(lpCronTask); } Complete(); }

主备切换检测定时任务通过CInitTask::OnProcess第一次触发后,后续的触发逻辑:

在主备正常时:

  • 主机会执行心跳的定时任务,保持插件和主机的连接
  • 备机会检测是否发生主备切换

在主备切换时:

  • 主机宕机,UFTDB将备机设置为主机
  • 备机检测自身成为主机,删除主备切换定时任务,通过初始化定时任务添加心跳定时任务,这使得它后续不会再执行主备切换的定时任务了
  • 原来的主机重启成为备机后,将执行主备切换定时任务