目录
线程池的实现
线程池已基于C++11重写
:
前言
初学C++,想封装点常用的C++类,已经写好了mutex,cond,thread的类,想用起来写点东西,于是就决定写线程池了,这里拙笔记录下学习笔记.
本文主要内容包括:线程池的概念
、使用原因
、适用场景
、线程池的实现
、任务调度逻辑
、样例测试
. 线程池的概念
线程池是指在一个多线程程序中创建一个线程集合,在执行新的任务的时候不是新建一个线程,而是使用线程池中已创建好的线程,一旦任务执行完毕,线程就会休眠等待新的任务分配下来,线程池中的线程数量取决于机器给进程所能分配的内存大小,以及应用程序的需求.
使用原因及适用场合
1.在服务器端编程中,最原始的方法我们使用顺序化的结构,一个服务器只能处理一个客户,如果同时2个客户端链接上来了,服务器只能先处理了先到达的那个个,这样第二个客户端只能等了,影响客户的响应时间.它只适用于客户量少的短连接.这时候有方案2.
2.在多线程服务器端编程中,一个服务器如果要处理多条链接的客户端,当链接很少的时候我们可以每来一条链接创建一个线程。但当并发量很大的时候呢,不停地的增加线程,在某个时间计算机资源可能耗尽.于是有了方案3
3.为了弥补方案2中每个请求创建线程的缺陷,我们使用固定大小线程池,全部IO交给IO复用线程解决(本文不涉及),而任务计算交给线程池.如果任务彼此独立,IO压力不大,那么这种方案非常适合.
当然服务器模型远不止这3种,还有很多方案,本文不涉.
线程池的实现原理
线程池类主要维系两个队列:任务队列
,线程队列
线程池通过take方法从线程队列提取任务,到一个线程中去执行; 有任务就提取执行,无任务则阻塞线程休眠.
任务队列
可以单独写个任务类出来,也可以写个任务类基类,预留虚任务函数接口,继承下来泛化.
typedef void (*Task)(void);
线程队列
线程队列通过我自己写的线程类实现.
#includeclass Thread{public:typedef void (*threadFun_t)(void *arg); explicit Thread(const threadFun_t &threadRoutine, void *arg); ~Thread(); void start(); void join(); static void *threadGuide(void *arg); pthread_t getThreadId() const{ return m_threadId; }private: pthread_t m_threadId; bool m_isRuning; threadFun_t m_threadRoutine; void *m_threadArg;};Thread::Thread(const threadFun_t &threadRoutine, void *arg) :m_isRuning(false), m_threadId(0), m_threadRoutine(threadRoutine), m_threadArg(arg){}Thread::~Thread(){ if(m_isRuning){//如果线程正在执行,则分离此线程. CHECK(!pthread_detach(m_threadId)); }}void *Thread::threadGuide(void *arg){ Thread *p = static_cast (arg); p->m_threadRoutine(p->m_threadArg); return NULL;}void Thread::join(){ VERIFY(m_isRuning); CHECK(!pthread_join(m_threadId, NULL)); m_isRuning = false;}void Thread::start(){ pthread_attr_t attr; CHECK(!pthread_attr_init(&attr)); //CHECK(!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)); //set thread separation state property CHECK(!pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED)); //Set thread inheritance CHECK(!pthread_attr_setschedpolicy(&attr, SCHED_OTHER)); //set thread scheduling policy CHECK(!pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)); //Set thread scope CHECK(!pthread_create(&m_threadId, &attr, threadGuide, this)); m_isRuning = true;}{ MutexLockGuard lock(m_mutex); m_isRuning = false; }}
构造传入带一个空类型参数指针(为什么要带这个空类型指针后面会提
)函数指针,通过start()方法创建线程,然后执行threadGuide()方法调用构造时候传入的函数指针执行咱们想运行的函数实现Thread类.
这两个队列在线程池中的定义如下:
private: std::vectorm_threads; std::deque m_tasks;
任务调度逻辑
任务分配逻辑主要靠两个条件变量实现,(条件变量本文不做详述)
1.任务队列是否空. 2.任务队列是否满. 其逻辑如下图所示:start()方法是线程池的运行方法.通过它创建线程池.
threadRoutine()就是我们就是线程池中创建的线程, 线程跑起来后,通过 isRunning 控制线程循环是否退出. stop()方法关闭线程池,回收资源. 循环中判断 : 有任务则执行,无任务则wait 阻塞等待.void ThreadPool::start(){ m_isRuning = true; m_threads.reserve(m_threadsSize); for(size_t i = 0; i < m_threadsSize; i++){ m_threads.push_back(new Thread(threadRoutine, this)); m_threads[i]->start(); }}
Thread(threadRoutine, this) 这里就是为什么我线程类要带一个无符号类型指针参数的原因,因为静态函数无法调用c++的类成员函数(主要原因是类在编译期间未实例化没有明确的地址.)我们只能通过线程池对象的this指针调用它的成员.
任务调度的源码实现:
ThreadPool::ThreadPool(size_t tasksSize, size_t threadsSize) :m_tasksSzie(tasksSize), m_threadsSize(threadsSize), m_mutex(), m_tasksEmpty(m_mutex), m_tasksFull(m_mutex), m_isRuning(false){}ThreadPool::~ThreadPool(){ if(m_isRuning){ stop(); }}void ThreadPool::threadRoutine(void *arg){ ThreadPool *p = static_cast(arg); while(p->m_isRuning){ ThreadPool::Task task(p->take()); if(task){ task(); } }}ThreadPool::Task ThreadPool::take(){ MutexLockGuard lock(m_mutex); while(m_tasks.empty() && m_isRuning){ m_tasksEmpty.wait(); } if(!m_tasks.empty()){ Task task = m_tasks.front(); m_tasks.pop_front(); m_tasksFull.notify(); return task; } return NULL;}void ThreadPool::addTask(Task task){ if(m_threads.empty()){//如果线程池是空的,直接跑任务. task(); } else{ MutexLockGuard lock(m_mutex); while(m_tasksSzie > 0 && m_tasks.size() >= m_tasksSzie){ m_tasksFull.wait(); } m_tasks.push_back(task); m_tasksEmpty.notify(); }}void ThreadPool::start(){ m_isRuning = true; m_threads.reserve(m_threadsSize); for(size_t i = 0; i < m_threadsSize; i++){ m_threads.push_back(new Thread(threadRoutine, this)); m_threads[i]->start(); }}void ThreadPool::stop(){ { MutexLockGuard lock(m_mutex); m_isRuning = false; m_tasksEmpty.notifyAll(); } for(int i = m_threadsSize - 1; i >= 0; i--){ m_threads[i]->join(); delete(m_threads[i]); m_threads.pop_back(); }}
程序测试
测试代码:
创建一个有两个线程,拥有5个任务的任务队列,执行8个加数任务.#include#include #include #include "MutexLock.hh"#include "Thread.hh"#include #include "Condition.hh"#include "ThreadPool.hh"#include //threadPool testMutexLock CntLock;int cnt = 0;void test(void){ unsigned long i = 0xfffffff; //MutexLockGuard loo(CntLock); //CntLock.lock(); while(i--); printf("%d\n", ++cnt); //CntLock.unlock(); sleep(1);}int main(){//ThreadPool Test ThreadPool tp(5, 2); tp.start(); sleep(3); for(int i = 0; i < 8; i++) tp.addTask(test); getchar(); return 0;}
简单test结果:
thread 140068496353024 run taskthread 140068504745728 run task12thread 140068504745728 run taskthread 140068496353024 run task34thread 140068496353024 run taskthread 140068504745728 run task56thread 140068496353024 run taskthread 140068504745728 run task78