#include "ThreadPool.h"int main() { CThreadPool threadPool(10); CMyTask taskObj[100]; char szTmp[1024] = {0}; int i = 0; for (i = 0; i < 10; i++) { snprintf(szTmp, sizeof(szTmp), "this is the [%d] thread running", i); taskObj[i].SetData(szTmp); threadPool.AddTask(&taskObj[i]); }int n = 0; while (1) { int taskSize = threadPool.getTaskSize(); if (0 == taskSize && !threadPool.isAnyThreadBusy()) { if (threadPool.StopAll() == 0) { cout << "Now I will exit from main" << endl; return 0; } }// 添加新的任务 sleep(1); if (n % 4) { snprintf(szTmp, sizeof(szTmp), "this is the [%d] thread running", n+10); taskObj[1].SetData(szTmp); threadPool.AddTask(&taskObj[1]); } n++; } return 0; }

#ifndef __THREADPOOL_H #define __THREADPOOL_H#include "task.h"/** * 线程结构体 */ struct CThread { pthread_t pthread_id; //线程id int iStat; //线程状态CThread():iStat(0) { }bool operator == (const CThread &obj) const { return (long)&pthread_id == (long)&obj.pthread_id; } }; /** * 线程池管理类的实现 */ class CThreadPool { public: CThreadPool(int threadNum = 10); int AddTask(CTask *task); // 把任务添加到任务队列中 int getTaskSize(); // 获取当前任务队列中的任务数 int StopAll(); // 使线程池中的线程退出 bool isAnyThreadBusy(); // 是否有处于繁忙状态的线程 int getTaskNum(); // 获取当前等待执行的任务数protected: int CreateThread(); // 创建线程池中的线程 static void* ThreadFunc(void * threadData); // 新线程的线程回调函数 static int MoveToIdle(CThread *pThread); // 线程执行结束后,状态置为空闲0 static int MoveToBusy(CThread *pThread); // 线程开始执行,状态置为运行1private: staticvector m_vecTaskList; //任务列表 staticbool shutdown; //线程退出标志 intm_iThreadNum; //线程池中启动的线程数 staticvector m_vecThread; //线程列表 static pthread_mutex_t m_pthreadMutex; //线程同步锁 static pthread_cond_t m_pthreadCond; //线程同步的条件变量 }; #endif

#include "ThreadPool.h" #include #include using namespace std; #define MAX_TASK_NUM 100//最大任务数vector CThreadPool::m_vecTaskList; bool CThreadPool::shutdown = false; vector CThreadPool::m_vecThread; pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER; /** * 线程池管理类构造函数 */ CThreadPool::CThreadPool(int threadNum) { this->m_iThreadNum = threadNum; cout << "threadNum:" << threadNum << " threads will be created." << endl; CreateThread(); //创建线程 }/** * 创建线程 */ int CThreadPool::CreateThread() { m_vecThread.resize(m_iThreadNum); for (size_t i = 0; i < m_vecThread.size(); i++) { pthread_create(&m_vecThread[i].pthread_id, NULL, ThreadFunc, &m_vecThread[i]); } return 0; }/** * 线程回调函数 */ void* CThreadPool::ThreadFunc(void* threadData) { CThread *pThread = (CThread*)threadData; while (1) { pthread_mutex_lock(&m_pthreadMutex); //lock只有第一个空闲的线程能获取到锁,其他空闲线程阻塞 while (m_vecTaskList.size() == 0 && !shutdown) { pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex); // 第一个空闲的线程获取到锁后,阻塞等待信号量 /* pthread_cond_wait前要先加锁 pthread_cond_wait把线程放进阻塞队列后,内部会解锁,然后等待条件变量被其它线程唤醒 pthread_cond_wait被唤醒后会再自动加锁 */ }if (shutdown) { pthread_mutex_unlock(&m_pthreadMutex); cout << "thread:" << (long)&pThread->pthread_id << " will exit." << endl; pthread_exit(NULL); }//线程状态置1 MoveToBusy(pThread); //取出一个任务 CTask* task = NULL; vector::iterator iter = m_vecTaskList.begin(); if (iter != m_vecTaskList.end()) { task = *iter; m_vecTaskList.erase(iter); }pthread_mutex_unlock(&m_pthreadMutex); //unlock// 执行任务 if (task) { task->Run(); }// 任务执行完线程状态置0 MoveToIdle(pThread); } return (void*)0; }int CThreadPool::MoveToIdle(CThread *pThread) { vector::iterator iter_thread = std::find(m_vecThread.begin(), m_vecThread.end(), *pThread); if (iter_thread != m_vecThread.end()) { iter_thread->iStat = 0; cout << "tid:" << (long)&pThread->pthread_id << " idle." << endl; } return 0; }int CThreadPool::MoveToBusy(CThread *pThread) { vector::iterator iter_thread = std::find(m_vecThread.begin(), m_vecThread.end(), *pThread); if (iter_thread != m_vecThread.end()) { iter_thread->iStat = 1; cout << "tid:" << (long)&pThread->pthread_id << " run." << endl; } return 0; }bool CThreadPool::isAnyThreadBusy() { for (int i = 0; i < m_iThreadNum; i++) { if (1 == m_vecThread[i].iStat) return true; }return false; }/** * 往任务队列里边添加任务并发出线程同步信号 */ int CThreadPool::AddTask(CTask *task) { pthread_mutex_lock(&m_pthreadMutex); if (MAX_TASK_NUM > this->m_vecTaskList.size()) { this->m_vecTaskList.push_back(task); pthread_cond_signal(&m_pthreadCond); } else { pthread_mutex_unlock(&m_pthreadMutex); return -1; }pthread_mutex_unlock(&m_pthreadMutex); return 0; }/** * 获取当前队列中任务数 */ int CThreadPool::getTaskSize() { return m_vecTaskList.size(); }/** * 停止所有线程 */ int CThreadPool::StopAll() { /** 避免重复调用 */ if (shutdown) { return -1; }cout << "All threads will be stoped." << endl; /** 唤醒所有等待线程,线程池要销毁了 */ shutdown = true; pthread_cond_broadcast(&m_pthreadCond); /** 阻塞等待线程退出,否则就成僵尸了 */ for (size_t i = 0; i < m_vecThread.size(); i++) { pthread_join(m_vecThread[i].pthread_id, NULL); } m_vecThread.clear(); m_vecTaskList.clear(); /** 销毁条件变量和互斥体 */ pthread_mutex_destroy(&m_pthreadMutex); pthread_cond_destroy(&m_pthreadCond); return 0; }

#ifndef _TASK_H_ #define _TASK_H_#include #include #include #include #include #include #include using namespace std; /** * 执行任务的类,设置任务数据、定义执行方法(纯虚函数) */ class CTask { protected: string m_strTaskName; //任务的名称 char m_ptrData[1024]; //具体数据 public: CTask() {} CTask(string taskName) { m_strTaskName = taskName; memset(m_ptrData, 0, 1024); } virtual int Run() = 0; //任务执行方法 void SetData(constchar *data); //设置任务数据public: virtual ~CTask() {} }; class CMyTask : public CTask { public: CMyTask() {}inline int Run() { int n = 0; while(1) { cout << (char*)this->m_ptrData << endl; sleep(3); n++; if (n > 3) break; } return 0; } }; #endif

#include "task.h"void CTask::SetData(const char *data) { strcpy(m_ptrData, data); }

TARGET:=threadpool INC:= -I./ LIB_PATH:= LIB:= -lpthreadCFLAGS:=-Wall -g -O0 -D_REENTRANT -Wl,-rpath=./ $(INC) $(LIB_PATH) CPPFLAGS:=$(CFLAGS)SRC:=$(shell echo *.cpp) OBJ:=$(patsubst %.cpp,%.o,$(SRC))all: $(TARGET)$(TARGET): $(OBJ) $(CXX) $^ $(CFLAGS) $(LIB) -o $@clean: rm -f $(OBJ) rm -f $(TARGET)

