/*
 * A few words up front:
 *
 * thread.c is the thread module. Yet you will notice it contains many
 * functions that look like item operations: item_alloc, item_remove,
 * item_link, and so on. We already have an items module -- isn't that
 * where this work belongs? Why does the thread module have it too?
 * Look closely: for each such function here (e.g. item_remove), the
 * items module has a corresponding do_item_remove, and the item_remove
 * in thread.c merely calls do_item_remove; the only thing it adds is a
 * lock/unlock pair around the call.
 * This is actually a good design:
 * 1) An action like "delete an item" is always carried out by some
 *    thread -- here a worker thread -- so it is a thread-level concern:
 *    "some worker thread deletes an item".
 * 2) More importantly, there are atomicity and consistency issues: the
 *    same item may well be modified by several threads at once, so a
 *    lock is needed. Where should that lock live? Since the problem is
 *    caused by threads, the thread module is the natural place to solve it.
 * 3) So for functions like these, thread.c acts as a shell around the
 *    items module, coordinating access: it exposes the thread-level entry
 *    points to the outside and takes the locks internally. The do_xxx
 *    functions defined in the items module need no extra care -- they
 *    modify items unconditionally, leaving control to the caller. Many
 *    projects that need locking face the same question -- which layer
 *    should hold the lock? -- and memcached's design is a good reference.
 */
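/*
 * A minimal sketch of the pattern described above, with hypothetical
 * names (widget_* is not part of memcached): the thread layer owns the
 * locking, the do_xxx layer mutates unconditionally.
 */
#if 0
/* items.c side: no locking, modify unconditionally */
void do_widget_remove(widget *w);

/* thread.c side: thread-level entry point, lock on the outside */
void widget_remove(widget *w) {
    uint32_t hv = hash(WIDGET_key(w), w->nkey);
    item_lock(hv);            /* serialize concurrent modification */
    do_widget_remove(w);
    item_unlock(hv);
}
#endif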
- #include"memcached.h"
- #include<assert.h>
- #include<stdio.h>
- #include<errno.h>
- #include<stdlib.h>
- #include<errno.h>
- #include<string.h>
- #include<pthread.h>
- #ifdef__sun
- #include<atomic.h>
- #endif
- #defineITEMS_PER_ALLOC 64
/*
 * The CQ_ITEM struct below: when the main thread accepts a connection and
 * hands the client fd to a worker thread, it passes along some information
 * about that client connection, and CQ_ITEM is the object wrapping that
 * information -- think of it as a "parameter object". Remember: it is the
 * main thread that tosses these over.
 * Although the CQ in CQ_ITEM stands for "connection queue", it is a
 * completely different concept from the conn struct defined in memcached.h;
 * the worker thread merely uses the CQ_ITEM to initialize a conn object.
 */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int                    sfd;
    enum conn_states       init_state;
    int                    event_flags;
    int                    read_buffer_size;
    enum network_transport transport;
    CQ_ITEM               *next;
};
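/* Lifecycle of a CQ_ITEM in this file: the main thread fills one in via
 * dispatch_conn_new() and cq_push()es it onto the chosen worker's queue;
 * that worker cq_pop()s it in thread_libevent_process(), builds a conn
 * from it, and returns it to the freelist with cqi_free(). */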
/*
 * A queue of the CQ_ITEMs above. Each worker thread keeps one of these
 * queues and uses it to handle the connection requests dispatched by the
 * main thread.
 */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
};
/* Locks of various kinds below. */

/*
 * My take: this lock guards global structures that exist in fixed
 * numbers, such as the slabclass array and the LRU lists. It differs
 * from the item locks: item objects grow dynamically and are very
 * numerous, so item locking hashes each item into a large table of
 * locks to keep the lock granularity fine.
 */
pthread_mutex_t cache_lock;

/* Connection lock */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;

#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif

/* Lock for global stats */
static pthread_mutex_t stats_lock;

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;

static pthread_mutex_t *item_locks;   /* the item locks */
static uint32_t item_lock_count;      /* total number of item locks */
/* Exponent of the item-lock hash table size: there are
 * 2^item_lock_hashpower locks in total -- see hashsize below. */
static unsigned int item_lock_hashpower;
#define hashsize(n) ((unsigned long int)1 << (n))
#define hashmask(n) (hashsize(n) - 1)
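/*
 * A worked example (illustrative): with item_lock_hashpower = 13,
 * hashsize(13) = (1 << 13) = 8192 and hashmask(13) = 8191 = 0x1fff, so
 * item_lock(hv) below ends up taking item_locks[hv & 0x1fff] -- the low
 * 13 bits of the hash value pick one of 8192 lock buckets.
 */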
static pthread_mutex_t item_global_lock;
static pthread_key_t item_lock_type_key;

static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
static LIBEVENT_THREAD *threads;
static int last_thread = -1;    /* index of the worker picked last, for round-robin dispatch */

static int init_count = 0;           /* number of worker threads initialized so far */
static pthread_mutex_t init_lock;    /* initialization lock */
static pthread_cond_t init_cond;     /* initialization condition variable */
static void thread_libevent_process(int fd, short which, void *arg);
/* Increment the reference count by 1 */
unsigned short refcount_incr(unsigned short *refcount) {
#ifdef HAVE_GCC_ATOMICS
    return __sync_add_and_fetch(refcount, 1);
#elif defined(__sun)
    return atomic_inc_ushort_nv(refcount);
#else
    unsigned short res;
    mutex_lock(&atomics_mutex);
    (*refcount)++;
    res = *refcount;
    mutex_unlock(&atomics_mutex);
    return res;
#endif
}
/* Decrement the reference count by 1 */
unsigned short refcount_decr(unsigned short *refcount) {
#ifdef HAVE_GCC_ATOMICS
    return __sync_sub_and_fetch(refcount, 1);
#elif defined(__sun)
    return atomic_dec_ushort_nv(refcount);
#else
    unsigned short res;
    mutex_lock(&atomics_mutex);
    (*refcount)--;
    res = *refcount;
    mutex_unlock(&atomics_mutex);
    return res;
#endif
}
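/* A usage sketch (illustrative, assuming an item pointer `it` with a
 * refcount field, as the wrappers later in this file use): */
#if 0
refcount_incr(&it->refcount);               /* take a reference */
/* ... use the item ... */
if (refcount_decr(&it->refcount) == 0) {
    /* the last reference is gone; the item may be reclaimed */
}
#endif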
void item_lock_global(void) {
    mutex_lock(&item_global_lock);
}

void item_unlock_global(void) {
    mutex_unlock(&item_global_lock);
}

void item_lock(uint32_t hv) {
    uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
    if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
        mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
    } else {
        mutex_lock(&item_global_lock);
    }
}

void *item_trylock(uint32_t hv) {
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
    if (pthread_mutex_trylock(lock) == 0) {
        return lock;
    }
    return NULL;
}

void item_trylock_unlock(void *lock) {
    mutex_unlock((pthread_mutex_t *) lock);
}

void item_unlock(uint32_t hv) {
    uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
    if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
        mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
    } else {
        mutex_unlock(&item_global_lock);
    }
}
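/* Sketch of the trylock pattern (illustrative): code that must not block,
 * e.g. a pass over many items, can attempt the lock and simply skip an
 * item on contention: */
#if 0
void *hold_lock = item_trylock(hv);
if (hold_lock == NULL) {
    /* lock is busy: skip this item instead of blocking */
} else {
    /* ... work on the item ... */
    item_trylock_unlock(hold_lock);
}
#endif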
/* The main thread waits on the condition variable until every worker
 * thread has started up. Called with init_lock held. */
static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
}

/* Worker-thread registration: counts how many workers have finished
 * initializing. */
static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
}
/* Item locks come in several granularities; this switches the type in
 * every worker by writing 'l' or 'g' down each notify pipe and waiting
 * for all workers to acknowledge. */
void switch_item_lock_type(enum item_lock_types type) {
    char buf[1];
    int i;

    switch (type) {
        case ITEM_LOCK_GRANULAR:
            buf[0] = 'l';
            break;
        case ITEM_LOCK_GLOBAL:
            buf[0] = 'g';
            break;
        default:
            fprintf(stderr, "Unknown lock type: %d\n", type);
            assert(1 == 0);
            break;
    }

    pthread_mutex_lock(&init_lock);
    init_count = 0;
    for (i = 0; i < settings.num_threads; i++) {
        if (write(threads[i].notify_send_fd, buf, 1) != 1) {
            perror("Failed writing to notify pipe");
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
        }
    }
    wait_for_thread_registration(settings.num_threads);
    pthread_mutex_unlock(&init_lock);
}
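/* Context (my understanding, not asserted by this file): the main user of
 * this switch is the hash-table expansion code, which flips every worker
 * to the global item lock while the table is resized, then flips back:
 *
 *   switch_item_lock_type(ITEM_LOCK_GLOBAL);    (before rehashing)
 *   ...grow and rehash the table...
 *   switch_item_lock_type(ITEM_LOCK_GRANULAR);  (when done)
 */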
/*
 * Initializes a connection queue. See the definitions of CQ and CQ_ITEM
 * above for what they are for.
 */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}
/*
 * Pops a CQ_ITEM off a worker thread's connection queue, or returns NULL
 * if the queue is empty.
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/*
 * Pushes a CQ_ITEM onto a worker thread's connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;

    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}
/*
 * Returns a fresh connection queue item: tries the freelist first and
 * allocates a new batch if the freelist is empty.
 */
static CQ_ITEM *cqi_new(void) {
    CQ_ITEM *item = NULL;
    pthread_mutex_lock(&cqi_freelist_lock);
    if (cqi_freelist) {
        item = cqi_freelist;
        cqi_freelist = item->next;
    }
    pthread_mutex_unlock(&cqi_freelist_lock);

    if (NULL == item) {
        int i;

        /* Allocate a bunch of items at once to reduce fragmentation */
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
        if (NULL == item) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            return NULL;
        }

        /* Chain together all the new items except the first one, which
         * is returned to the caller. */
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
            item[i - 1].next = &item[i];

        pthread_mutex_lock(&cqi_freelist_lock);
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
        cqi_freelist = &item[1];
        pthread_mutex_unlock(&cqi_freelist_lock);
    }

    return item;
}
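/* After one batch allocation above (ITEMS_PER_ALLOC = 64): item[0] goes
 * to the caller, while item[1] -> item[2] -> ... -> item[63] -> (old
 * freelist) becomes the new cqi_freelist. */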
/*
 * Frees a connection queue item (adds it to the freelist.)
 */
static void cqi_free(CQ_ITEM *item) {
    pthread_mutex_lock(&cqi_freelist_lock);
    item->next = cqi_freelist;
    cqi_freelist = item;
    pthread_mutex_unlock(&cqi_freelist_lock);
}
/*
 * Creates and starts a worker thread. Called from thread_init during
 * main-thread initialization.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}
/****************************** LIBEVENT THREADS *****************************/

/*
 * Sets up a worker thread; each worker's event_base is created here.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();    /* each worker thread gets its own event_base */
    if (!me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads: watch the receive end
     * of the notify pipe, i.e. messages from the main thread; the event
     * handler is thread_libevent_process. */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));  /* the CQ_ITEM queue */
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);    /* initialize the CQ_ITEM queue */

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char *),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}
/*
 * Worker thread body: mainly enters event_base_loop.
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */

    /* set an indexable thread-specific memory item for the lock type.
     * this could be unnecessary if we pass the conn *c struct through
     * all item_lock calls...
     */
    me->item_lock_type = ITEM_LOCK_GRANULAR;
    pthread_setspecific(item_lock_type_key, &me->item_lock_type);

    /* As each worker enters its loop, the global init_count is bumped
     * (see the last few lines of thread_init and the function
     * wait_for_thread_registration); the main thread uses init_count to
     * confirm that all workers have started. */
    register_thread_initialized();

    event_base_loop(me->base, 0);
    return NULL;
}
/* After dispatching a client fd to a worker, the main thread writes one
 * byte into that worker's notify pipe; this wakes the worker up and
 * libevent invokes this handler. */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    switch (buf[0]) {
    case 'c':
        item = cq_pop(me->new_conn_queue);  /* take the CQ_ITEM the main thread tossed over */

        if (NULL != item) {
            /* The worker creates the conn object here. Note that the
             * init_state handed over by the main thread is conn_new_cmd
             * (in the TCP case). */
            conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                               item->read_buffer_size, item->transport, me->base);
            if (c == NULL) {
                if (IS_UDP(item->transport)) {
                    fprintf(stderr, "Can't listen for events on UDP socket\n");
                    exit(1);
                } else {
                    if (settings.verbose > 0) {
                        fprintf(stderr, "Can't listen for events on fd %d\n",
                                item->sfd);
                    }
                    close(item->sfd);
                }
            } else {
                c->thread = me;  /* this worker thread now owns the connection */
            }
            cqi_free(item);
        }
        break;

    /* we were told to flip the lock type and report in */
    case 'l':
        me->item_lock_type = ITEM_LOCK_GRANULAR;
        register_thread_initialized();
        break;
    case 'g':
        me->item_lock_type = ITEM_LOCK_GLOBAL;
        register_thread_initialized();
        break;
    }
}
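/* Recap of the one-byte notify-pipe protocol handled above:
 *   'c' - a new connection was queued on new_conn_queue; pop it and own it
 *   'l' - switch this thread to granular (hashed per-bucket) item locks
 *   'g' - switch this thread to the single global item lock
 */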
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    /* A CQ_ITEM simply packages up the client-connection information in
     * the parameters above so it can be handed to a worker; see the notes
     * at its definition near the top of this file. */
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return;
    }

    /* Pick the worker for this client fd by simple round-robin. */
    int tid = (last_thread + 1) % settings.num_threads;
    LIBEVENT_THREAD *thread = threads + tid;
    last_thread = tid;

    /* Fill in the CQ_ITEM, i.e. package up the information. */
    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    /* Each worker keeps every CQ_ITEM dispatched to it in new_conn_queue. */
    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    /* The main thread writes a single 'c' into the chosen worker's pipe.
     * Since every worker watches the receive end of its pipe, that worker
     * gets the event notification and its registered handler,
     * thread_libevent_process, fires. */
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}
int is_listen_thread() {
    return pthread_self() == dispatcher_thread.thread_id;
}
/********************************* ITEM ACCESS *******************************/
/*
 * Below is a set of item-operation wrappers. The actual logic lives in the
 * corresponding items::do_xxx functions; as the comment at the top of this
 * file explains, these wrappers mostly just add locking.
 */

/*
 * Allocates a new item.
 */
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    /* This one is special: unlike the other item_xxx wrappers, the locking
     * happens inside do_item_alloc. My guess is that the do_item_alloc
     * logic is simply too complex -- lock/unlock may have to happen inside
     * certain branches, coupling the locking with the logic itself, so it
     * cannot easily be wrapped from the outside and is instead left to
     * do_item_alloc internally. */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes, 0);
    return it;
}
/*
 * Returns an item if it hasn't been marked as expired,
 * lazy-expiring as needed.
 * memcached has no real-time or periodic sweep that removes expired
 * items; expiry is lazy. Only when an item is about to be used is its
 * expiration checked and handled.
 */
item *item_get(const char *key, const size_t nkey) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_get(key, nkey, hv);
    item_unlock(hv);
    return it;
}
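/* A lazy-expiry example (illustrative): set a key with exptime = 60 and
 * leave it alone for ten minutes -- the item still sits in memory. Only
 * when a get arrives does do_item_get notice the item has expired, unlink
 * it, and return NULL as if it had never existed. */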
item *item_touch(const char *key, size_t nkey, uint32_t exptime) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_touch(key, nkey, exptime, hv);
    item_unlock(hv);
    return it;
}
/*
 * Links an item into the LRU and hashtable.
 */
int item_link(item *item) {
    int ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_item_link(item, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Drops a reference to an item; see items::do_item_remove.
 */
void item_remove(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);

    item_lock(hv);
    do_item_remove(item);
    item_unlock(hv);
}

/*
 * Replaces one item with another in the hashtable. Note that, unlike the
 * wrappers around it, this takes no lock itself: the caller already holds
 * the item lock for hv.
 */
int item_replace(item *old_it, item *new_it, const uint32_t hv) {
    return do_item_replace(old_it, new_it, hv);
}
/*
 * Unlinks an item from the LRU and hashtable.
 * See items::do_item_unlink.
 */
void item_unlink(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    do_item_unlink(item, hv);
    item_unlock(hv);
}

/*
 * Mainly resets the item's position in the LRU list and refreshes its
 * last-accessed time; see items::do_item_update.
 */
void item_update(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    do_item_update(item);
    item_unlock(hv);
}
enum delta_result_type add_delta(conn *c, const char *key,
                                 const size_t nkey, int incr,
                                 const int64_t delta, char *buf,
                                 uint64_t *cas) {
    enum delta_result_type ret;
    uint32_t hv;

    hv = hash(key, nkey);
    item_lock(hv);
    ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
    item_unlock(hv);
    return ret;
}
/*
 * Stores an item in the cache (high level, obeys set/add/replace semantics).
 * The real work is items::do_store_item, but since multiple threads run
 * this, locking is needed. Storing an item is a thread-level operation, so
 * it lives in the thread module: exposed here to the outside, locked on
 * the inside. The same goes for all the other item wrappers above.
 */
enum store_item_type store_item(item *item, int comm, conn *c) {
    enum store_item_type ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);  /* lock the item */
    item_lock(hv);
    ret = do_store_item(item, comm, c, hv);
    item_unlock(hv);
    return ret;
}

void item_flush_expired() {
    mutex_lock(&cache_lock);
    do_item_flush_expired();
    mutex_unlock(&cache_lock);
}
char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {
    char *ret;

    mutex_lock(&cache_lock);
    ret = do_item_cachedump(slabs_clsid, limit, bytes);
    mutex_unlock(&cache_lock);
    return ret;
}

void item_stats(ADD_STAT add_stats, void *c) {
    mutex_lock(&cache_lock);
    do_item_stats(add_stats, c);
    mutex_unlock(&cache_lock);
}

void item_stats_totals(ADD_STAT add_stats, void *c) {
    mutex_lock(&cache_lock);
    do_item_stats_totals(add_stats, c);
    mutex_unlock(&cache_lock);
}

void item_stats_sizes(ADD_STAT add_stats, void *c) {
    mutex_lock(&cache_lock);
    do_item_stats_sizes(add_stats, c);
    mutex_unlock(&cache_lock);
}
/******************************* GLOBAL STATS ******************************/

void STATS_LOCK() {
    pthread_mutex_lock(&stats_lock);
}

void STATS_UNLOCK() {
    pthread_mutex_unlock(&stats_lock);
}
void threadlocal_stats_reset(void) {
    int ii, sid;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);

        threads[ii].stats.get_cmds = 0;
        threads[ii].stats.get_misses = 0;
        threads[ii].stats.touch_cmds = 0;
        threads[ii].stats.touch_misses = 0;
        threads[ii].stats.delete_misses = 0;
        threads[ii].stats.incr_misses = 0;
        threads[ii].stats.decr_misses = 0;
        threads[ii].stats.cas_misses = 0;
        threads[ii].stats.bytes_read = 0;
        threads[ii].stats.bytes_written = 0;
        threads[ii].stats.flush_cmds = 0;
        threads[ii].stats.conn_yields = 0;
        threads[ii].stats.auth_cmds = 0;
        threads[ii].stats.auth_errors = 0;

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            threads[ii].stats.slab_stats[sid].set_cmds = 0;
            threads[ii].stats.slab_stats[sid].get_hits = 0;
            threads[ii].stats.slab_stats[sid].touch_hits = 0;
            threads[ii].stats.slab_stats[sid].delete_hits = 0;
            threads[ii].stats.slab_stats[sid].incr_hits = 0;
            threads[ii].stats.slab_stats[sid].decr_hits = 0;
            threads[ii].stats.slab_stats[sid].cas_hits = 0;
            threads[ii].stats.slab_stats[sid].cas_badval = 0;
        }

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}
void threadlocal_stats_aggregate(struct thread_stats *stats) {
    int ii, sid;

    /* The struct has a mutex, but we can safely set the whole thing
     * to zero since it is unused when aggregating. */
    memset(stats, 0, sizeof(*stats));

    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);

        stats->get_cmds += threads[ii].stats.get_cmds;
        stats->get_misses += threads[ii].stats.get_misses;
        stats->touch_cmds += threads[ii].stats.touch_cmds;
        stats->touch_misses += threads[ii].stats.touch_misses;
        stats->delete_misses += threads[ii].stats.delete_misses;
        stats->decr_misses += threads[ii].stats.decr_misses;
        stats->incr_misses += threads[ii].stats.incr_misses;
        stats->cas_misses += threads[ii].stats.cas_misses;
        stats->bytes_read += threads[ii].stats.bytes_read;
        stats->bytes_written += threads[ii].stats.bytes_written;
        stats->flush_cmds += threads[ii].stats.flush_cmds;
        stats->conn_yields += threads[ii].stats.conn_yields;
        stats->auth_cmds += threads[ii].stats.auth_cmds;
        stats->auth_errors += threads[ii].stats.auth_errors;

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            stats->slab_stats[sid].set_cmds +=
                threads[ii].stats.slab_stats[sid].set_cmds;
            stats->slab_stats[sid].get_hits +=
                threads[ii].stats.slab_stats[sid].get_hits;
            stats->slab_stats[sid].touch_hits +=
                threads[ii].stats.slab_stats[sid].touch_hits;
            stats->slab_stats[sid].delete_hits +=
                threads[ii].stats.slab_stats[sid].delete_hits;
            stats->slab_stats[sid].decr_hits +=
                threads[ii].stats.slab_stats[sid].decr_hits;
            stats->slab_stats[sid].incr_hits +=
                threads[ii].stats.slab_stats[sid].incr_hits;
            stats->slab_stats[sid].cas_hits +=
                threads[ii].stats.slab_stats[sid].cas_hits;
            stats->slab_stats[sid].cas_badval +=
                threads[ii].stats.slab_stats[sid].cas_badval;
        }

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}
void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
    int sid;

    out->set_cmds = 0;
    out->get_hits = 0;
    out->touch_hits = 0;
    out->delete_hits = 0;
    out->incr_hits = 0;
    out->decr_hits = 0;
    out->cas_hits = 0;
    out->cas_badval = 0;

    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
        out->set_cmds += stats->slab_stats[sid].set_cmds;
        out->get_hits += stats->slab_stats[sid].get_hits;
        out->touch_hits += stats->slab_stats[sid].touch_hits;
        out->delete_hits += stats->slab_stats[sid].delete_hits;
        out->decr_hits += stats->slab_stats[sid].decr_hits;
        out->incr_hits += stats->slab_stats[sid].incr_hits;
        out->cas_hits += stats->slab_stats[sid].cas_hits;
        out->cas_badval += stats->slab_stats[sid].cas_badval;
    }
}
/*
 * Initializes the thread subsystem, creating the worker threads; called
 * once by the main thread.
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int i;
    int power;

    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* Want a wide lock table, but don't waste memory */
    /* Pick the number of item locks. Locks exist because threads run
     * concurrently, so the lock count is naturally scaled to the number
     * of threads. */
    if (nthreads < 3) {
        power = 10;   /* power is an exponent: 2^power locks */
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }
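    /* Worked example: with 4 worker threads (memcached's default) the
     * branch above picks power = 12, i.e. hashsize(12) = 4096 item locks;
     * with 8 workers it is power = 13, i.e. 8192 locks. */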
    item_lock_count = hashsize(power);
    item_lock_hashpower = power;

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (!item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }
    pthread_key_create(&item_lock_type_key, NULL);
    pthread_mutex_init(&item_global_lock, NULL);

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));  /* the worker-thread descriptors */
    if (!threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;            /* the main thread's event_base */
    dispatcher_thread.thread_id = pthread_self();  /* the main thread's id */

    /* Create a notify pipe for each worker so the main thread can talk to it. */
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];  /* worker's receive end of the pipe */
        threads[i].notify_send_fd = fds[1];     /* send end, written by the main thread */

        setup_thread(&threads[i]);  /* set up the worker's event_base, queue, etc. */
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);  /* start the worker; see worker_libevent */
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);  /* block until every worker has started */
    pthread_mutex_unlock(&init_lock);
}
Reposted from: https://www.cnblogs.com/guolanzhu/p/5850220.html