C++实现线程池 一、业务流程图与说明
1、主业务线程
(1)创建线程池
(2)判断任务状态,关闭线程池
2、线程池中的线程
(1)无任务时阻塞
(2)有任务时执行
文章图片
二、实现代码
1、main.cpp
#include "ThreadPool.h"int main()
{
CThreadPool threadPool(10);
CMyTask taskObj[100];
char szTmp[1024] = {0};
int i = 0;
for (i = 0;
i < 10;
i++)
{
snprintf(szTmp, sizeof(szTmp), "this is the [%d] thread running", i);
taskObj[i].SetData(szTmp);
threadPool.AddTask(&taskObj[i]);
}int n = 0;
while (1)
{
int taskSize = threadPool.getTaskSize();
if (0 == taskSize && !threadPool.isAnyThreadBusy())
{
if (threadPool.StopAll() == 0)
{
cout << "Now I will exit from main" << endl;
return 0;
}
}// 添加新的任务
sleep(1);
if (n % 4)
{
snprintf(szTmp, sizeof(szTmp), "this is the [%d] thread running", n+10);
taskObj[1].SetData(szTmp);
threadPool.AddTask(&taskObj[1]);
}
n++;
}
return 0;
}
2、ThreadPool.h
#ifndef __THREADPOOL_H
#define __THREADPOOL_H#include "task.h"/**
* 线程结构体
*/
struct CThread
{
pthread_t pthread_id;
//线程id
int iStat;
//线程状态CThread():iStat(0) { }bool operator == (const CThread &obj) const
{
return (long)&pthread_id == (long)&obj.pthread_id;
}
};
/**
* 线程池管理类的实现
*/
class CThreadPool
{
public:
CThreadPool(int threadNum = 10);
int AddTask(CTask *task);
// 把任务添加到任务队列中
int getTaskSize();
// 获取当前任务队列中的任务数
int StopAll();
// 使线程池中的线程退出
bool isAnyThreadBusy();
// 是否有处于繁忙状态的线程
int getTaskNum();
// 获取当前等待执行的任务数protected:
int CreateThread();
// 创建线程池中的线程
static void* ThreadFunc(void * threadData);
// 新线程的线程回调函数
static int MoveToIdle(CThread *pThread);
// 线程执行结束后,状态置为空闲0
static int MoveToBusy(CThread *pThread);
// 线程开始执行,状态置为运行1private:
staticvector m_vecTaskList;
//任务列表
staticbool shutdown;
//线程退出标志
intm_iThreadNum;
//线程池中启动的线程数
staticvector m_vecThread;
//线程列表
static pthread_mutex_t m_pthreadMutex;
//线程同步锁
static pthread_cond_t m_pthreadCond;
//线程同步的条件变量
};
#endif
3、ThreadPool.cpp
#include "ThreadPool.h"
#include
#include
using namespace std;
#define MAX_TASK_NUM 100//最大任务数vector CThreadPool::m_vecTaskList;
bool CThreadPool::shutdown = false;
vector CThreadPool::m_vecThread;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
/**
* 线程池管理类构造函数
*/
CThreadPool::CThreadPool(int threadNum)
{
this->m_iThreadNum = threadNum;
cout << "threadNum:" << threadNum << " threads will be created." << endl;
CreateThread();
//创建线程
}/**
* 创建线程
*/
int CThreadPool::CreateThread()
{
m_vecThread.resize(m_iThreadNum);
for (size_t i = 0;
i < m_vecThread.size();
i++)
{
pthread_create(&m_vecThread[i].pthread_id, NULL, ThreadFunc, &m_vecThread[i]);
}
return 0;
}/**
* 线程回调函数
*/
void* CThreadPool::ThreadFunc(void* threadData)
{
CThread *pThread = (CThread*)threadData;
while (1)
{
pthread_mutex_lock(&m_pthreadMutex);
//lock只有第一个空闲的线程能获取到锁,其他空闲线程阻塞
while (m_vecTaskList.size() == 0 && !shutdown)
{
pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
// 第一个空闲的线程获取到锁后,阻塞等待信号量
/*
pthread_cond_wait前要先加锁
pthread_cond_wait把线程放进阻塞队列后,内部会解锁,然后等待条件变量被其它线程唤醒
pthread_cond_wait被唤醒后会再自动加锁
*/
}if (shutdown)
{
pthread_mutex_unlock(&m_pthreadMutex);
cout << "thread:" << (long)&pThread->pthread_id << " will exit." << endl;
pthread_exit(NULL);
}//线程状态置1
MoveToBusy(pThread);
//取出一个任务
CTask* task = NULL;
vector::iterator iter = m_vecTaskList.begin();
if (iter != m_vecTaskList.end())
{
task = *iter;
m_vecTaskList.erase(iter);
}pthread_mutex_unlock(&m_pthreadMutex);
//unlock// 执行任务
if (task)
{
task->Run();
}// 任务执行完线程状态置0
MoveToIdle(pThread);
}
return (void*)0;
}int CThreadPool::MoveToIdle(CThread *pThread)
{
vector::iterator iter_thread = std::find(m_vecThread.begin(), m_vecThread.end(), *pThread);
if (iter_thread != m_vecThread.end())
{
iter_thread->iStat = 0;
cout << "tid:" << (long)&pThread->pthread_id << " idle." << endl;
}
return 0;
}int CThreadPool::MoveToBusy(CThread *pThread)
{
vector::iterator iter_thread = std::find(m_vecThread.begin(), m_vecThread.end(), *pThread);
if (iter_thread != m_vecThread.end())
{
iter_thread->iStat = 1;
cout << "tid:" << (long)&pThread->pthread_id << " run." << endl;
}
return 0;
}bool CThreadPool::isAnyThreadBusy()
{
for (int i = 0;
i < m_iThreadNum;
i++)
{
if (1 == m_vecThread[i].iStat)
return true;
}return false;
}/**
* 往任务队列里边添加任务并发出线程同步信号
*/
int CThreadPool::AddTask(CTask *task)
{
pthread_mutex_lock(&m_pthreadMutex);
if (MAX_TASK_NUM > this->m_vecTaskList.size())
{
this->m_vecTaskList.push_back(task);
pthread_cond_signal(&m_pthreadCond);
}
else
{
pthread_mutex_unlock(&m_pthreadMutex);
return -1;
}pthread_mutex_unlock(&m_pthreadMutex);
return 0;
}/**
* 获取当前队列中任务数
*/
int CThreadPool::getTaskSize()
{
return m_vecTaskList.size();
}/**
* 停止所有线程
*/
int CThreadPool::StopAll()
{
/** 避免重复调用 */
if (shutdown)
{
return -1;
}cout << "All threads will be stoped." << endl;
/** 唤醒所有等待线程,线程池要销毁了 */
shutdown = true;
pthread_cond_broadcast(&m_pthreadCond);
/** 阻塞等待线程退出,否则就成僵尸了 */
for (size_t i = 0;
i < m_vecThread.size();
i++)
{
pthread_join(m_vecThread[i].pthread_id, NULL);
}
m_vecThread.clear();
m_vecTaskList.clear();
/** 销毁条件变量和互斥体 */
pthread_mutex_destroy(&m_pthreadMutex);
pthread_cond_destroy(&m_pthreadCond);
return 0;
}
4、task.h
#ifndef _TASK_H_
#define _TASK_H_#include
#include
#include
#include #include
#include
#include using namespace std;
/**
* 执行任务的类,设置任务数据、定义执行方法(纯虚函数)
*/
class CTask
{
protected:
string m_strTaskName;
//任务的名称
char m_ptrData[1024];
//具体数据
public:
CTask() {}
CTask(string taskName)
{
m_strTaskName = taskName;
memset(m_ptrData, 0, 1024);
}
virtual int Run() = 0;
//任务执行方法
void SetData(constchar *data);
//设置任务数据public:
virtual ~CTask() {}
};
class CMyTask : public CTask
{
public:
CMyTask() {}inline int Run()
{
int n = 0;
while(1)
{
cout << (char*)this->m_ptrData << endl;
sleep(3);
n++;
if (n > 3)
break;
}
return 0;
}
};
#endif
5、task.cpp
#include "task.h"void CTask::SetData(const char *data)
{
strcpy(m_ptrData, data);
}
6、Makefile
TARGET:=threadpool
INC:= -I./
LIB_PATH:=
LIB:= -lpthreadCFLAGS:=-Wall -g -O0 -D_REENTRANT -Wl,-rpath=./ $(INC) $(LIB_PATH)
CPPFLAGS:=$(CFLAGS)SRC:=$(shell echo *.cpp)
OBJ:=$(patsubst %.cpp,%.o,$(SRC))all: $(TARGET)$(TARGET): $(OBJ)
$(CXX) $^ $(CFLAGS) $(LIB) -o $@clean:
rm -f $(OBJ)
rm -f $(TARGET)
欢迎转载请注明出处:海漩涡
【C++实现线程池示例】http://blog.csdn.net/tanhuifang520
推荐阅读
- Linux|109 个实用 shell 脚本
- linux笔记|linux 常用命令汇总(面向面试)
- Linux|Linux--网络基础
- linux|apt update和apt upgrade命令 - 有什么区别()
- linux|2022年云原生趋势
- Go|Docker后端部署详解(Go+Nginx)
- 开源生态|GPL、MIT、Apache...开发者如何选择开源协议(一文讲清根本区别)
- GitHub|7 款可替代 top 命令的工具