高并发|项目(高并发内存池)


目录

  • 项目介绍
    • 内存池介绍
      • 池化技术
      • 内存池
      • 内存池主要解决的问题
  • 定长内存池
    • 代码展示
    • 效果演示
  • 高并发内存池整体框架设计
    • 高并发内存池--thread cache
      • thread cache代码框架:
      • 自由链表的哈希桶跟对象大小的映射关系
    • 高并发内存池--central cache
      • central cache的工作过程
    • 高并发内存池--page cache

项目介绍 1.这个项目做的是什么?
当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。
2.项目目标
模拟实现出一个自己的高并发内存池,在多线程环境下缓解了锁竞争问题,相比于malloc/free效率提高了25%左右,将内存碎片保持在10%左右。
内存池介绍 池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
内存池
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
内存池主要解决的问题
内存池主要解决的当然是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。那么什么是内存碎片呢?
高并发|项目(高并发内存池)
文章图片

定长内存池 【高并发|项目(高并发内存池)】作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能,下面我们就先来设计一个定长内存池做个开胃菜,当然这个定长内存池在我们后面的高并发内存池中也是有价值的,所以学习他目的有两层,先熟悉一下简单内存池是如何控制的,第二他会作为我们后面内存池的一个基础组件。
高并发|项目(高并发内存池)
文章图片

定长内存池之所以高效:是因为它可以切除固定大小的内存,供线程使用。还可以回收,线程释放的内存链接在自由链表中,供下一次线程申请内存使用。
高并发|项目(高并发内存池)
文章图片

代码展示
#pragma once#include #include #include #include using std::cout; using std::endl; // 直接去堆上按页申请空间 inline static void* SystemAlloc(size_t kpage) { void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); if (ptr == nullptr) throw std::bad_alloc(); return ptr; }template class ObjectPool { public: T* New() { T* obj = nullptr; // 优先把还回来内存块对象,再次重复利用 if (_freeList) { //头删 void* next = *((void**)_freeList); //将链表的第一个空间给obj使用,freeList存的就是第一个小内存的地址 obj = (T*)_freeList; _freeList = next; } else { // 剩余内存不够一个对象大小时,则重新开大块空间 if (_remainBytes < sizeof(T)) { _remainBytes = 128 * 1024; //16页 //_memory = (char*)malloc(_remainBytes); //SystemAlloc(x)直接向系统申请内存,x表示申请的页数 _memory = (char*)SystemAlloc(_remainBytes >> 13); //申请16页 if (_memory == nullptr) { throw std::bad_alloc(); } }obj = (T*)_memory; //一个对象的大小 ,小于指针大小,就给一个指针大小 size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); _memory += objSize; //指针往后走一个小块空间 _remainBytes -= objSize; //每用一小块空间,剩余空间更新 }// 定位new,显示调用T的构造函数初始化 new(obj)T; return obj; } void Delete(T* obj) { // 显示调用析构函数清理对象 obj->~T(); // 头插,将不用的小块空间,插入自由链表中 *(void**)obj = _freeList; //*(void**) 解引用拿到 void*,在32/64位下大小为 4/8 _freeList = obj; }private: char* _memory = nullptr; // 指向大块内存的指针(向系统申请的大块内存) size_t _remainBytes = 0; // 大块内存在切分过程中剩余字节数 void* _freeList = nullptr; // 还回来过程中链接的自由链表的头指针 }; struct TreeNode { int _val; TreeNode* _left; TreeNode* _right; TreeNode() :_val(0) , _left(nullptr) , _right(nullptr) {} }; void TestObjectPool() { // 申请释放的轮次 const size_t Rounds = 5; // 每轮申请释放多少次 const size_t N = 100000; std::vector v1; v1.reserve(N); size_t begin1 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v1.push_back(new TreeNode); } for (int i = 0; i < N; ++i) { delete v1[i]; } v1.clear(); } size_t end1 = clock(); std::vector v2; v2.reserve(N); ObjectPool TNPool; size_t begin2 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v2.push_back(TNPool.New()); } for (int i = 0; i < N; ++i) { TNPool.Delete(v2[i]); } v2.clear(); } size_t end2 = clock(); cout << "new cost time:" << end1 - begin1 << endl; cout << "object pool cost time:" << end2 - begin2 << endl; }int main() { TestObjectPool(); return 0; }

效果演示 高并发|项目(高并发内存池)
文章图片

可以看出,使用定长内存池,率率比使用malloc申请空间要高的多。
高并发内存池整体框架设计 现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
1. 性能问题。
2. 多线程环境下,锁竞争问题。
3. 内存碎片问题。
thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
高并发|项目(高并发内存池)
文章图片

高并发内存池–thread cache thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的
高并发|项目(高并发内存池)
文章图片

申请内存:
  1. 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
  2. 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
  3. 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
4. 当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
5. 当链表的长度过长,则回收一部分内存对象到central cache。
如何保证线程可以创建属于自己的thread cache?
线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。
thread cache代码框架:
#pragma once#include "Common.h"class ThreadCache { public: // 申请和释放内存对象 void* Allocate(size_t size); void Deallocate(void* ptr, size_t size); // 从中心缓存获取对象 void* FetchFromCentralCache(size_t index, size_t size); // 释放对象时,链表过长时,回收内存回到中心缓存 void ListTooLong(FreeList& list, size_t size); private: FreeList _freeLists[NFREELIST]; }; // TLS thread local storage(线程本地存储,每个线程都有自己的线程本地存储) //有了TLS,线程来访问就不需要加锁了,被static修饰,只在当前文件可见 static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr; // 管理切分好的小对象的自由链表 class FreeList { public: void Push(void* obj) { assert(obj); // 头插 //*(void**)obj = _freeList; //*(void**)obj取obj头上4个或8个字节指向_freeList NextObj(obj) = _freeList; _freeList = obj; ++_size; } void PushRange(void* start, void* end, size_t n) { NextObj(end) = _freeList; _freeList = start; // 测试验证+条件断点 /*int i = 0; void* cur = start; while (cur) { cur = NextObj(cur); ++i; }if (n != i) { int x = 0; }*/_size += n; } void PopRange(void*& start, void*& end, size_t n) { assert(n >= _size); start = _freeList; end = start; for (size_t i = 0; i < n - 1; ++i) { end = NextObj(end); }_freeList = NextObj(end); NextObj(end) = nullptr; _size -= n; } void* Pop() { assert(_freeList); // 头删 void* obj = _freeList; _freeList = NextObj(obj); --_size; return obj; } bool Empty() { return _freeList == nullptr; } size_t& MaxSize() { return _maxSize; } size_t Size() { return _size; }private: void* _freeList = nullptr; size_t _maxSize = 1; size_t _size = 0; };

自由链表的哈希桶跟对象大小的映射关系
1、内存对其
在此项目中,申请的内存小于256KB的,都会走三层缓存。那是不是我们要建立256KB个哈希桶呢?1-256KB都有对应的自由链表呢?
显然不是,那样的话太消耗资源。
本文采用内存对齐的方法,来建立哈希桶。
  • 申请字节数在[1,128]Byte,采用8字节对其。就是你申请1到8字节空间,就给你8字节的内存。申请9到16字节的空间,就给你16字节的内存。
  • 申请字节数在[1,1024]Byte,采用16字节对齐规则。
  • 申请字节数在[1K,8K]Byte,采用128字节对齐规则。
  • 申请字节数在[8K,64K]Byte,采用1K字节对齐规则。
  • 申请字节数在[64K,256K]Byte,采用8K字节对齐规则。
将内存碎片控制在11%左右。
例:
假设需要129字节内存,对其之后就是144字节。系统会分配144字节空间。那么就浪费了15个字节的空间。15/144=10.4%
2、怎么实现内存对其呢?
高并发|项目(高并发内存池)
文章图片

3、怎么实现对其齐后的数据向自由链表申请内存呢?
知道了对齐后的数据,怎么找到对应的自由链表呢
高并发|项目(高并发内存池)
文章图片

// 计算对象大小的对齐映射规则 class SizeClass { public: // 整体控制在最多10%左右的内碎片浪费 // [1,128]8byte对齐freelist[0,16) //假设需要129字节,会分配给你144字节给你,就有15字节的浪费 15/144=0.104 // [128+1,1024]16byte对齐freelist[16,72) //假设需要1025个字节,会分配给你1152字节给你,就有127字节的浪费 127/1152=0.11 // [1024+1,8*1024]128byte对齐freelist[72,128) // [8*1024+1,64*1024]1024byte对齐freelist[128,184) // [64*1024+1,256*1024]8*1024byte对齐freelist[184,208) /*size_t _RoundUp(size_t size, size_t alignNum) { size_t alignSize; if (size % alignNum != 0) { alignSize = (size / alignNum + 1)*alignNum; } else { alignSize = size; }return alignSize; }*/ // 1-8 static inline size_t _RoundUp(size_t bytes, size_t alignNum) { return ((bytes + alignNum - 1) & ~(alignNum - 1)); } static inline size_t RoundUp(size_t size) { if (size <= 128) { return _RoundUp(size, 8); } else if (size <= 1024) { return _RoundUp(size, 16); } else if (size <= 8*1024) { return _RoundUp(size, 128); } else if (size <= 64*1024) { return _RoundUp(size, 1024); } else if (size <= 256 * 1024) { return _RoundUp(size, 8*1024); } else//>256KB { return _RoundUp(size, 1<> align_shift) - 1; } // 计算映射的哪一个自由链表桶 static inline size_t Index(size_t bytes) { assert(bytes <= MAX_BYTES); // 每个区间有多少个链 static int group_array[4] = { 16, 56, 56, 56 }; if (bytes <= 128){ return _Index(bytes, 3); } else if (bytes <= 1024){ return _Index(bytes - 128, 4) + group_array[0]; } else if (bytes <= 8 * 1024){ return _Index(bytes - 1024, 7) + group_array[1] + group_array[0]; } else if (bytes <= 64 * 1024){ return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0]; } else if (bytes <= 256 * 1024){ return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0]; } else{ assert(false); }return -1; }

高并发内存池–central cache central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。
高并发|项目(高并发内存池)
文章图片

申请内存:
  1. 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
  2. central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。
  3. central cache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给threadcache,就++use_count
释放内存:
当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时–use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
高并发|项目(高并发内存池)
文章图片

CentralCache 代码框架:
#pragma once#include "Common.h"// 单例模式-饿汉(相当于一个全局静态变量) class CentralCache { public: static CentralCache* GetInstance()//获取一个实例对象,全局只有一个 { return &_sInst; } // 获取一个非空的span Span* GetOneSpan(SpanList& list, size_t byte_size); // 从中心缓存获取一定数量的对象给thread cache size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size); // 将一定数量的对象释放到span跨度 void ReleaseListToSpans(void* start, size_t byte_size); private: SpanList _spanLists[NFREELIST]; private: CentralCache()//将构造函数私有,就不能随便创建对象 {} CentralCache(const CentralCache&) = delete; //拷贝构造也封死 static CentralCache _sInst; };

Span结构和Span的自由链表
// 管理多个连续页大块内存跨度结构 struct Span { PAGE_ID _pageId = 0; // 大块内存起始页的页号 size_t_n = 0; // 页的数量 Span* _next = nullptr; // 双向链表的结构 Span* _prev = nullptr; size_t _objSize = 0; // 切好的小对象的大小 size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数 void* _freeList = nullptr; // 切好的小块内存的自由链表 bool _isUse = false; // 是否在被使用 }; // 带头双向循环链表 class SpanList { public: SpanList() { _head = new Span; _head->_next = _head; _head->_prev = _head; } Span* Begin() { return _head->_next; } Span* End() { return _head; } bool Empty() { return _head->_next == _head; } void PushFront(Span* span) { Insert(Begin(), span); } Span* PopFront() { Span* front = _head->_next; Erase(front); return front; } void Insert(Span* pos, Span* newSpan) { assert(pos); assert(newSpan); Span* prev = pos->_prev; // prev newspan pos prev->_next = newSpan; newSpan->_prev = prev; newSpan->_next = pos; pos->_prev = newSpan; } void Erase(Span* pos) { assert(pos); assert(pos != _head); //不能把头删了// 1、条件断点 // 2、查看栈帧 /*if (pos == _head) { int x = 0; }*/Span* prev = pos->_prev; Span* next = pos->_next; prev->_next = next; next->_prev = prev; }private: Span* _head; public: std::mutex _mtx; // 桶锁,不同的线程访问同一个桶才会有竞争 };

central cache的工作过程
1、映射到对应的桶中,去查看对应的Spanlist上的span是否有空间。
2、去遍历Spanlist上不为空的span,如果span中的内存不够一个批量的,那么有多少小块内存,就给多少,如果够一个批量的内存,就切除一个批量的空间给thread cache
3、如果Spanlist上的所有span都没有空间,则central cache需要向下一层pagecache 申请空间。申请到之后,对申请的空间进行切分,分成和桶对应大小的字节,组成一个span,挂在对应Spanlist上。建议切分的时候,采用尾插的方式,这样组成的span,内存地址连续,分配出去之后,可以提高CPU缓存利用率。
高并发|项目(高并发内存池)
文章图片

central cache释放内存给thread cache,以及从thread cache回收内存框架
#include "CentralCache.h" #include "PageCache.h"CentralCache CentralCache::_sInst; // 获取一个非空的span Span* CentralCache::GetOneSpan(SpanList& list, size_t size) { // 查看当前的spanlist中是否有还有未分配对象的span Span* it = list.Begin(); while (it != list.End()) { if (it->_freeList != nullptr)//span下面有对象 { return it; } else//找下一个span { it = it->_next; } } // 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞 list._mtx.unlock(); // 走到这里说没有空闲span了,只能找page cache要 PageCache::GetInstance()->_pageMtx.lock(); Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size)); span->_isUse = true; span->_objSize = size; PageCache::GetInstance()->_pageMtx.unlock(); // 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span // 计算span的大块内存的起始地址和大块内存的大小(字节数) char* start = (char*)(span->_pageId << PAGE_SHIFT); size_t bytes = span->_n << PAGE_SHIFT; char* end = start + bytes; // 把大块内存切成自由链表链接起来 // 1、先切一块下来去做头,方便尾插 span->_freeList = start; start += size; void* tail = span->_freeList; int i = 1; while (start < end) { ++i; NextObj(tail) = start; tail = NextObj(tail); // tail = start; start += size; } NextObj(tail) = nullptr; // 1、条件断点 // 2、疑似死循环,可以中断程序,程序会在正在运行的地方停下来 //int j = 0; //void* cur = span->_freeList; //while (cur) //{ // cur = NextObj(cur); // ++j; //} //if (j != (bytes / size)) //{ // int x = 0; //} // 切好span以后,需要把span挂到桶里面去的时候,再加锁 list._mtx.lock(); list.PushFront(span); return span; }// 从中心缓存获取一定数量的对象给thread cache size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size) { //算出是哪个桶的,size是单个对象大小 size_t index = SizeClass::Index(size); _spanLists[index]._mtx.lock(); //加锁 // 获取一个非空的span Span* span = GetOneSpan(_spanLists[index], size); assert(span); assert(span->_freeList); // 从span中获取batchNum个对象 // 如果不够batchNum个,有多少拿多少 start = span->_freeList; end = start; size_t i = 0; size_t actualNum = 1; //actualNum实际获取的对象 //往后走batchNum - 1步 while ( i < batchNum - 1 && NextObj(end) != nullptr) { end = NextObj(end); ++i; ++actualNum; } span->_freeList = NextObj(end); NextObj(end) = nullptr; span->_useCount += actualNum; 条件断点 int j = 0; void* cur = start; while (cur) { cur = NextObj(cur); ++j; } if (j != actualNum) { int x = 0; } _spanLists[index]._mtx.unlock(); return actualNum; }void CentralCache::ReleaseListToSpans(void* start, size_t size) { //先找到属于哪个桶 size_t index = SizeClass::Index(size); _spanLists[index]._mtx.lock(); //遍历list while (start) { void* next = NextObj(start); Span* span = PageCache::GetInstance()->MapObjectToSpan(start); NextObj(start) = span->_freeList; span->_freeList = start; span->_useCount--; // 说明span的切分出去的所有小块内存都回来了 // 这个span就可以再回收给page cache,pagecache可以再尝试去做前后页的合并 if (span->_useCount == 0) { _spanLists[index].Erase(span); //拿出span,此时span里的小块内存都是乱序的 span->_freeList = nullptr; //将span里的小内存地址都置空 span->_next = nullptr; span->_prev = nullptr; // 释放span给page cache时,使用page cache的锁就可以了 // 这时把桶锁解掉 _spanLists[index]._mtx.unlock(); PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); PageCache::GetInstance()->_pageMtx.unlock(); _spanLists[index]._mtx.lock(); }start = next; } _spanLists[index]._mtx.unlock(); }

高并发内存池–page cache 申请内存:
  1. 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页pagespan分裂为一个4页page span和一个6页page span。
  2. 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复步骤1中的过程。
  3. 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
释放内存:
如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
高并发|项目(高并发内存池)
文章图片

page cache代码框架
#pragma once#include "Common.h" #include "ObjectPool.h" #include "PageMap.h"class PageCache { public: static PageCache* GetInstance() { return &_sInst; } // 获取从对象到span的映射 Span* MapObjectToSpan(void* obj); // 释放空闲span回到Pagecache,并合并相邻的span void ReleaseSpanToPageCache(Span* span); // 获取一个K页的span Span* NewSpan(size_t k); std::mutex _pageMtx; private: SpanList _spanLists[NPAGES]; ObjectPool> _spanPool; //std::unordered_map _idSpanMap; //std::map _idSpanMap; TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap; PageCache() {} PageCache(const PageCache&) = delete; static PageCache _sInst; };

    推荐阅读