Memcached Source Code Analysis: thread.c

/*
 * A few words up front:
 *
 * thread.c is the threading module. But you will notice that it also contains
 * many functions that look like item operations: item_alloc, item_remove,
 * item_link, and so on. We already have an items module; isn't that where
 * these belong? Why does the thread module have them too?
 * Look closely and you will see that for each such function here, e.g.
 * item_remove, the items module has a corresponding do_item_remove. The
 * item_remove in thread.c merely calls do_item_remove from the items module;
 * the only thing it adds is taking a lock before the call and releasing it
 * afterwards.
 * This is actually a very good design:
 * 1) A piece of logic like "delete an item" is always carried out by some
 *    thread (here, a worker thread), so it is a thread-level affair: "some
 *    worker thread deletes an item".
 * 2) More importantly, there are atomicity and consistency concerns: an item
 *    may well be modified by several threads at once, so locking is needed.
 *    Where should the lock go? Since the problem is caused by threads, the
 *    thread module is naturally the one responsible for solving it.
 * 3) So for functions like these, thread.c acts as a shell around items,
 *    coordinating access: it exposes the operation to external callers at the
 *    thread level and takes the lock internally. The do_xxx functions defined
 *    in the items module need no such care: they modify the item
 *    unconditionally and leave synchronization to their callers. Many
 *    projects that need locking face the same question -- which layer should
 *    the lock live in? -- and memcached's design, sketched just below, is a
 *    good reference.
 */
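/*
 * A minimal sketch of that shell/do_xxx split (simplified, hypothetical
 * signatures; the real item_remove appears later in this file):
 *
 *     void item_remove(item *it) {                     // thread.c: the shell
 *         uint32_t hv = hash(ITEM_key(it), it->nkey);  // which lock stripe?
 *         item_lock(hv);                               // locking lives here
 *         do_item_remove(it);                          // items.c: pure logic
 *         item_unlock(hv);
 *     }
 */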
#include "memcached.h"
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#ifdef __sun
#include <atomic.h>
#endif

#define ITEMS_PER_ALLOC 64
/**
 * The CQ_ITEM struct below can be understood like this: when the main thread
 * accepts a connection and hands the client fd to a worker thread, it sends
 * along some information related to that client connection, and CQ_ITEM is
 * the object that packages this information -- a "parameter object" of sorts.
 * Remember: this thing is tossed over from the main thread.
 * The CQ in CQ_ITEM stands for connection queue, but it is a completely
 * different concept from the conn struct defined in memcached.h; the worker
 * thread does, however, use the CQ_ITEM to initialize a conn object.
 */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int sfd;
    enum conn_states init_state;
    int event_flags;
    int read_buffer_size;
    enum network_transport transport;
    CQ_ITEM *next;
};
/*
 * A queue of the CQ_ITEMs above. Each worker thread keeps one of these and
 * uses it when handling connection requests dispatched by the main thread.
 */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
};
/* The various locks follow. */
/**
 * My take: this lock guards objects whose number is globally fixed, such as
 * the slabclass array and the LRU lists. It differs from the item locks:
 * items are created dynamically and are very numerous, so item locking
 * hashes each item into a large table of locks to control lock granularity.
 */
pthread_mutex_t cache_lock;
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;  /* connection lock */
#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif

static pthread_mutex_t stats_lock;  /* stats lock */

static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;
static pthread_mutex_t *item_locks;  /* the item locks */

static uint32_t item_lock_count;  /* total number of item locks */
/* Exponent for the item-lock hash table: there are 2^item_lock_hashpower
 * locks in total; see hashsize below. */
static unsigned int item_lock_hashpower;
#define hashsize(n) ((unsigned long int)1 << (n))
#define hashmask(n) (hashsize(n) - 1)
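/*
 * Worked example of the striping math (my illustration, not from the
 * source): with power == 13 there are hashsize(13) == 8192 locks and
 * hashmask(13) == 8191 (0x1FFF), so any item hash hv maps into the table:
 *
 *     uint32_t hv  = 123456789;
 *     uint32_t idx = hv & hashmask(13);   // 123456789 & 8191 == 3349
 *     pthread_mutex_lock(&item_locks[idx]);
 *
 * Two items contend only when their hashes collide in the low 13 bits.
 */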
static pthread_mutex_t item_global_lock;

static pthread_key_t item_lock_type_key;
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
static LIBEVENT_THREAD *threads;
static int init_count = 0;          /* number of worker threads initialized so far */
static pthread_mutex_t init_lock;   /* initialization lock */
static pthread_cond_t init_cond;    /* initialization condition variable */
static void thread_libevent_process(int fd, short which, void *arg);
/* Increment the reference count by one. */
unsigned short refcount_incr(unsigned short *refcount) {
#ifdef HAVE_GCC_ATOMICS
    return __sync_add_and_fetch(refcount, 1);
#elif defined(__sun)
    return atomic_inc_ushort_nv(refcount);
#else
    unsigned short res;
    mutex_lock(&atomics_mutex);
    (*refcount)++;
    res = *refcount;
    mutex_unlock(&atomics_mutex);
    return res;
#endif
}

/* Decrement the reference count by one. */
unsigned short refcount_decr(unsigned short *refcount) {
#ifdef HAVE_GCC_ATOMICS
    return __sync_sub_and_fetch(refcount, 1);
#elif defined(__sun)
    return atomic_dec_ushort_nv(refcount);
#else
    unsigned short res;
    mutex_lock(&atomics_mutex);
    (*refcount)--;
    res = *refcount;
    mutex_unlock(&atomics_mutex);
    return res;
#endif
}
void item_lock_global(void) {
    mutex_lock(&item_global_lock);
}

void item_unlock_global(void) {
    mutex_unlock(&item_global_lock);
}

void item_lock(uint32_t hv) {
    uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
    if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
        mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
    } else {
        mutex_lock(&item_global_lock);
    }
}

void *item_trylock(uint32_t hv) {
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
    if (pthread_mutex_trylock(lock) == 0) {
        return lock;
    }
    return NULL;
}

void item_trylock_unlock(void *lock) {
    mutex_unlock((pthread_mutex_t *) lock);
}

void item_unlock(uint32_t hv) {
    uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
    if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
        mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
    } else {
        mutex_unlock(&item_global_lock);
    }
}
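/*
 * Note (my understanding, not from the source comments): each worker decides
 * per thread, via item_lock_type_key, whether to take a striped lock or the
 * single item_global_lock. memcached flips every worker to the global lock
 * (see switch_item_lock_type below) while the hash table is being expanded,
 * since items migrate between buckets then and per-stripe locking would be
 * unsafe.
 */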
static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        /* The main thread waits on the condition variable until every worker
         * has started. The caller must already hold init_lock (thread_init
         * does); pthread_cond_wait releases it while waiting. */
        pthread_cond_wait(&init_cond, &init_lock);
    }
}

/* Worker-thread registration: counts how many workers have finished
 * initializing. */
static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
}
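/*
 * The init_count / init_cond handshake above is the classic "wait until N
 * threads have checked in" pattern. A self-contained sketch of just that
 * pattern (a standalone demo with illustrative names, not memcached code):
 */
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t demo_cond = PTHREAD_COND_INITIALIZER;
static int demo_count = 0;

static void *demo_worker(void *arg) {
    /* per-thread setup would happen here */
    pthread_mutex_lock(&demo_lock);
    demo_count++;                      /* cf. register_thread_initialized */
    pthread_cond_signal(&demo_cond);   /* wake the waiting main thread */
    pthread_mutex_unlock(&demo_lock);
    return NULL;
}

int main(void) {
    enum { NTHREADS = 4 };
    pthread_t tids[NTHREADS];
    int i;
    for (i = 0; i < NTHREADS; i++)
        pthread_create(&tids[i], NULL, demo_worker, NULL);
    pthread_mutex_lock(&demo_lock);
    while (demo_count < NTHREADS)      /* cf. wait_for_thread_registration;
                                          the loop also absorbs spurious wakeups */
        pthread_cond_wait(&demo_cond, &demo_lock);
    pthread_mutex_unlock(&demo_lock);
    printf("all %d threads registered\n", demo_count);
    for (i = 0; i < NTHREADS; i++)
        pthread_join(tids[i], NULL);
    return 0;
}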
/* Item locks come in a couple of granularities; this switches the type. */
void switch_item_lock_type(enum item_lock_types type) {
    char buf[1];
    int i;
    switch (type) {
        case ITEM_LOCK_GRANULAR:
            buf[0] = 'l';
            break;
        case ITEM_LOCK_GLOBAL:
            buf[0] = 'g';
            break;
        default:
            fprintf(stderr, "Unknown lock type: %d\n", type);
            assert(1 == 0);
            break;
    }
    pthread_mutex_lock(&init_lock);
    init_count = 0;
    for (i = 0; i < settings.num_threads; i++) {
        if (write(threads[i].notify_send_fd, buf, 1) != 1) {
            perror("Failed writing to notify pipe");
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
        }
    }
    wait_for_thread_registration(settings.num_threads);
    pthread_mutex_unlock(&init_lock);
}
/*
 * Initializes a connection queue. (See the definitions of CQ and CQ_ITEM
 * above for what they are for.)
 */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/*
 * Pops a CQ_ITEM off a worker thread's CQ queue.
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;
    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}

/*
 * Pushes a CQ_ITEM onto a worker thread's CQ queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}
/*
 * Returns a fresh connection queue item.
 * Allocates a CQ_ITEM.
 */
static CQ_ITEM *cqi_new(void) {
    CQ_ITEM *item = NULL;
    pthread_mutex_lock(&cqi_freelist_lock);
    if (cqi_freelist) {
        item = cqi_freelist;
        cqi_freelist = item->next;
    }
    pthread_mutex_unlock(&cqi_freelist_lock);
    if (NULL == item) {
        int i;
        /* Allocate a bunch of items at once to reduce fragmentation */
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
        if (NULL == item) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            return NULL;
        }
        /* Chain item[1] .. item[ITEMS_PER_ALLOC-1] together; item[0] goes
         * straight to the caller. */
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
            item[i - 1].next = &item[i];
        pthread_mutex_lock(&cqi_freelist_lock);
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
        cqi_freelist = &item[1];
        pthread_mutex_unlock(&cqi_freelist_lock);
    }
    return item;
}
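/*
 * After one such batch allocation the layout is (my illustration):
 *
 *     caller gets item[0]
 *     cqi_freelist -> item[1] -> item[2] -> ... -> item[63] -> old freelist
 */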
/*
 * Frees a connection queue item (adds it to the freelist.)
 */
static void cqi_free(CQ_ITEM *item) {
    pthread_mutex_lock(&cqi_freelist_lock);
    item->next = cqi_freelist;
    cqi_freelist = item;
    pthread_mutex_unlock(&cqi_freelist_lock);
}
/*
 * Creates and starts a worker thread; called from thread_init during
 * main-thread initialization.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t thread;
    pthread_attr_t attr;
    int ret;
    pthread_attr_init(&attr);
    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}
void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}
/****************************** LIBEVENT THREADS *****************************/
/*
 * Sets up a worker thread; the worker's event_base is configured here.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();  /* each worker thread gets its own event_base */
    if (!me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }
    /* Listen for notifications from other threads: watch the receive end of
     * the notify pipe, i.e. for messages from the main thread; the event
     * handler is thread_libevent_process. */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);
    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }
    me->new_conn_queue = malloc(sizeof(struct conn_queue));  /* the CQ_ITEM queue */
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);  /* initialize the CQ_ITEM queue */
    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char *),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}
/*
 * Mainly sends the worker thread into event_base_loop.
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;
    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */
    /* set an indexable thread-specific memory item for the lock type.
     * this could be unnecessary if we pass the conn *c struct through
     * all item_lock calls...
     */
    me->item_lock_type = ITEM_LOCK_GRANULAR;
    pthread_setspecific(item_lock_type_key, &me->item_lock_type);
    /* As each worker enters its loop it increments the global init_count;
     * see the last few lines of thread_init and wait_for_thread_registration.
     * The main thread uses init_count to confirm all workers have started. */
    register_thread_initialized();
    event_base_loop(me->base, 0);
    return NULL;
}
/* After the main thread dispatches a client fd to a worker, it writes a byte
 * into the worker's pipe, which wakes the worker and invokes this handler. */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
    switch (buf[0]) {
    case 'c':
        item = cq_pop(me->new_conn_queue);  /* take the CQ_ITEM the main thread tossed over */
        if (NULL != item) {
            /* The worker creates the conn object here. Note that the CQ_ITEM
             * sent by the main thread has init_state conn_new_cmd (for TCP). */
            conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                               item->read_buffer_size, item->transport, me->base);
            if (c == NULL) {
                if (IS_UDP(item->transport)) {
                    fprintf(stderr, "Can't listen for events on UDP socket\n");
                    exit(1);
                } else {
                    if (settings.verbose > 0) {
                        fprintf(stderr, "Can't listen for events on fd %d\n",
                                item->sfd);
                    }
                    close(item->sfd);
                }
            } else {
                c->thread = me;  /* the thread watching this connection is the current worker */
            }
            cqi_free(item);
        }
        break;
    /* we were told to flip the lock type and report in */
    case 'l':
        me->item_lock_type = ITEM_LOCK_GRANULAR;
        register_thread_initialized();
        break;
    case 'g':
        me->item_lock_type = ITEM_LOCK_GLOBAL;
        register_thread_initialized();
        break;
    }
}
static int last_thread = -1;  /* round-robin cursor over the worker threads */

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    /**
     * About the CQ_ITEM below: when the main thread accepts a connection and
     * hands the client fd to a worker, it sends along information about that
     * client connection (the parameters of dispatch_conn_new listed above);
     * CQ_ITEM packages this information into a single object.
     * The CQ in CQ_ITEM stands for connection queue, but it is a completely
     * different concept from the conn struct: a CQ_ITEM merely bundles the
     * client-connection parameters together.
     */
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return;
    }
    int tid = (last_thread + 1) % settings.num_threads;
    LIBEVENT_THREAD *thread = threads + tid;  /* pick the worker for this client fd by simple round-robin */
    last_thread = tid;
    /* fill in the CQ_ITEM, i.e. package up the information */
    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    /* each worker keeps the CQ_ITEMs dispatched to it in new_conn_queue */
    cq_push(thread->new_conn_queue, item);
    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    /* The main thread simply writes a 'c' into the chosen worker's notify
     * pipe. Every worker monitors the receive end of its pipe, so the worker
     * gets the event notification and its registered handler,
     * thread_libevent_process, fires. */
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}
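/*
 * The whole dispatch path thus boils down to: push the CQ_ITEM, then write a
 * single byte to the worker's notify pipe so its event loop wakes up. A
 * self-contained sketch of that wake-up mechanism using only POSIX pipes and
 * threads (a standalone demo with illustrative names, not memcached code; in
 * memcached the read side is driven by libevent, which then calls
 * thread_libevent_process):
 */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static void *pipe_worker(void *arg) {
    int receive_fd = *(int *) arg;
    char cmd;
    if (read(receive_fd, &cmd, 1) == 1)  /* blocks until the dispatcher writes */
        printf("worker woken by command '%c'\n", cmd);
    return NULL;
}

int main(void) {
    int fds[2];
    pthread_t tid;
    if (pipe(fds)) {                 /* fds[0]: receive end, fds[1]: send end */
        perror("pipe");
        return 1;
    }
    pthread_create(&tid, NULL, pipe_worker, &fds[0]);
    char buf[1] = { 'c' };           /* 'c' means "a new connection for you" */
    if (write(fds[1], buf, 1) != 1)
        perror("Writing to thread notify pipe");
    pthread_join(tid, NULL);
    return 0;
}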
int is_listen_thread() {
    return pthread_self() == dispatcher_thread.thread_id;
}
/********************************* ITEM ACCESS *******************************/
/**
 * Below is a pile of item-operation functions. The actual logic lives in the
 * corresponding items::do_xxx functions; as the comment at the top of this
 * file says, the versions here mostly just add locking.
 */
/*
 * Allocates a new item.
 */
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    /**
     * This one is special: unlike the other item_xxx functions, the locking
     * is done inside do_item_alloc. My guess is that the do_item_alloc logic
     * is genuinely complicated -- locking and unlocking may even have to
     * happen inside certain if branches -- so the locking is coupled to the
     * logic itself and cannot easily be done by the caller. Hence the lock
     * is left to do_item_alloc internally.
     */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes, 0);
    return it;
}
/*
 * Returns an item if it hasn't been marked as expired,
 * lazy-expiring as needed.
 * Fetches an item. Per the English comment above: memcached has no real-time
 * or periodic job that purges expired items; instead it expires them lazily,
 * i.e. an item's expiry is handled only when that item is next used.
 */
item *item_get(const char *key, const size_t nkey) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_get(key, nkey, hv);
    item_unlock(hv);
    return it;
}

item *item_touch(const char *key, size_t nkey, uint32_t exptime) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_touch(key, nkey, exptime, hv);
    item_unlock(hv);
    return it;
}
/*
 * Links an item into the LRU and hashtable.
 */
int item_link(item *item) {
    int ret;
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_item_link(item, hv);
    item_unlock(hv);
    return ret;
}

void item_remove(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    do_item_remove(item);
    item_unlock(hv);
}

int item_replace(item *old_it, item *new_it, const uint32_t hv) {
    return do_item_replace(old_it, new_it, hv);
}

/*
 * Unlinks an item from the LRU and hashtable.
 * See items::do_item_unlink.
 */
void item_unlink(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    do_item_unlink(item, hv);
    item_unlock(hv);
}

/**
 * Mainly resets the item's position in the LRU list and refreshes its
 * last-used time; see items::do_item_update.
 */
void item_update(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    do_item_update(item);
    item_unlock(hv);
}
enum delta_result_type add_delta(conn *c, const char *key,
                                 const size_t nkey, int incr,
                                 const int64_t delta, char *buf,
                                 uint64_t *cas) {
    enum delta_result_type ret;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
 * Saves the item, mainly by calling items::do_store_item; since we are
 * multi-threaded, locking is required. Storing an item is a thread-level
 * operation, so store_item lives in the thread module, exposed externally
 * and locked internally. The other item operations here follow the same
 * pattern.
 */
enum store_item_type store_item(item *item, int comm, conn *c) {
    enum store_item_type ret;
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);  /* lock this item */
    item_lock(hv);
    ret = do_store_item(item, comm, c, hv);
    item_unlock(hv);
    return ret;
}
void item_flush_expired() {
    mutex_lock(&cache_lock);
    do_item_flush_expired();
    mutex_unlock(&cache_lock);
}

char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {
    char *ret;
    mutex_lock(&cache_lock);
    ret = do_item_cachedump(slabs_clsid, limit, bytes);
    mutex_unlock(&cache_lock);
    return ret;
}

void item_stats(ADD_STAT add_stats, void *c) {
    mutex_lock(&cache_lock);
    do_item_stats(add_stats, c);
    mutex_unlock(&cache_lock);
}

void item_stats_totals(ADD_STAT add_stats, void *c) {
    mutex_lock(&cache_lock);
    do_item_stats_totals(add_stats, c);
    mutex_unlock(&cache_lock);
}

void item_stats_sizes(ADD_STAT add_stats, void *c) {
    mutex_lock(&cache_lock);
    do_item_stats_sizes(add_stats, c);
    mutex_unlock(&cache_lock);
}
/******************************* GLOBAL STATS ******************************/
void STATS_LOCK() {
    pthread_mutex_lock(&stats_lock);
}

void STATS_UNLOCK() {
    pthread_mutex_unlock(&stats_lock);
}
void threadlocal_stats_reset(void) {
    int ii, sid;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
        threads[ii].stats.get_cmds = 0;
        threads[ii].stats.get_misses = 0;
        threads[ii].stats.touch_cmds = 0;
        threads[ii].stats.touch_misses = 0;
        threads[ii].stats.delete_misses = 0;
        threads[ii].stats.incr_misses = 0;
        threads[ii].stats.decr_misses = 0;
        threads[ii].stats.cas_misses = 0;
        threads[ii].stats.bytes_read = 0;
        threads[ii].stats.bytes_written = 0;
        threads[ii].stats.flush_cmds = 0;
        threads[ii].stats.conn_yields = 0;
        threads[ii].stats.auth_cmds = 0;
        threads[ii].stats.auth_errors = 0;
        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            threads[ii].stats.slab_stats[sid].set_cmds = 0;
            threads[ii].stats.slab_stats[sid].get_hits = 0;
            threads[ii].stats.slab_stats[sid].touch_hits = 0;
            threads[ii].stats.slab_stats[sid].delete_hits = 0;
            threads[ii].stats.slab_stats[sid].incr_hits = 0;
            threads[ii].stats.slab_stats[sid].decr_hits = 0;
            threads[ii].stats.slab_stats[sid].cas_hits = 0;
            threads[ii].stats.slab_stats[sid].cas_badval = 0;
        }
        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}
void threadlocal_stats_aggregate(struct thread_stats *stats) {
    int ii, sid;
    /* The struct has a mutex, but we can safely set the whole thing
     * to zero since it is unused when aggregating. */
    memset(stats, 0, sizeof(*stats));
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
        stats->get_cmds += threads[ii].stats.get_cmds;
        stats->get_misses += threads[ii].stats.get_misses;
        stats->touch_cmds += threads[ii].stats.touch_cmds;
        stats->touch_misses += threads[ii].stats.touch_misses;
        stats->delete_misses += threads[ii].stats.delete_misses;
        stats->decr_misses += threads[ii].stats.decr_misses;
        stats->incr_misses += threads[ii].stats.incr_misses;
        stats->cas_misses += threads[ii].stats.cas_misses;
        stats->bytes_read += threads[ii].stats.bytes_read;
        stats->bytes_written += threads[ii].stats.bytes_written;
        stats->flush_cmds += threads[ii].stats.flush_cmds;
        stats->conn_yields += threads[ii].stats.conn_yields;
        stats->auth_cmds += threads[ii].stats.auth_cmds;
        stats->auth_errors += threads[ii].stats.auth_errors;
        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            stats->slab_stats[sid].set_cmds +=
                threads[ii].stats.slab_stats[sid].set_cmds;
            stats->slab_stats[sid].get_hits +=
                threads[ii].stats.slab_stats[sid].get_hits;
            stats->slab_stats[sid].touch_hits +=
                threads[ii].stats.slab_stats[sid].touch_hits;
            stats->slab_stats[sid].delete_hits +=
                threads[ii].stats.slab_stats[sid].delete_hits;
            stats->slab_stats[sid].decr_hits +=
                threads[ii].stats.slab_stats[sid].decr_hits;
            stats->slab_stats[sid].incr_hits +=
                threads[ii].stats.slab_stats[sid].incr_hits;
            stats->slab_stats[sid].cas_hits +=
                threads[ii].stats.slab_stats[sid].cas_hits;
            stats->slab_stats[sid].cas_badval +=
                threads[ii].stats.slab_stats[sid].cas_badval;
        }
        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}
void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
    int sid;
    out->set_cmds = 0;
    out->get_hits = 0;
    out->touch_hits = 0;
    out->delete_hits = 0;
    out->incr_hits = 0;
    out->decr_hits = 0;
    out->cas_hits = 0;
    out->cas_badval = 0;
    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
        out->set_cmds += stats->slab_stats[sid].set_cmds;
        out->get_hits += stats->slab_stats[sid].get_hits;
        out->touch_hits += stats->slab_stats[sid].touch_hits;
        out->delete_hits += stats->slab_stats[sid].delete_hits;
        out->decr_hits += stats->slab_stats[sid].decr_hits;
        out->incr_hits += stats->slab_stats[sid].incr_hits;
        out->cas_hits += stats->slab_stats[sid].cas_hits;
        out->cas_badval += stats->slab_stats[sid].cas_badval;
    }
}
/* Initialization by the main thread. */
void thread_init(int nthreads, struct event_base *main_base) {
    int i;
    int power;
    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);
    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);
    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;
    /* Want a wide lock table, but don't waste memory */
    /* Initialize the item locks. Locks exist because of concurrency between
     * threads, so the number of item locks is naturally scaled by the number
     * of threads. power is an exponent: e.g. 4 threads gives power 12, i.e.
     * hashsize(12) == 4096 locks. */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }
    item_lock_count = hashsize(power);
    item_lock_hashpower = power;
    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (!item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }
    pthread_key_create(&item_lock_type_key, NULL);
    pthread_mutex_init(&item_global_lock, NULL);
    /* _mark2_1 */
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));  /* the worker thread objects */
    if (!threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }
    /* _mark2_3 */
    dispatcher_thread.base = main_base;            /* the main thread's event_base */
    dispatcher_thread.thread_id = pthread_self();  /* the main thread's id */
    /* _mark2_5 */
    for (i = 0; i < nthreads; i++) {  /* create each worker's pipe to the main thread */
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }
        threads[i].notify_receive_fd = fds[0];  /* the worker's pipe receive fd */
        threads[i].notify_send_fd = fds[1];     /* the worker's pipe send fd */
        /* _mark2_6 */
        setup_thread(&threads[i]);  /* set up the worker thread */
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }
    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        /* _mark2_7 */
        create_worker(worker_libevent, &threads[i]);  /* start the worker; see worker_libevent */
    }
    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);  /* wait until every worker has started */
    pthread_mutex_unlock(&init_lock);
}

Reposted from: https://www.cnblogs.com/guolanzhu/p/5850220.html