189 8069 5689

多线程编程—线程池的实现

多线程编程—线程池的实现

在集安等地区,都构建了全面的区域性战略布局,加强发展的系统性、市场前瞻性、产品创新能力,以专注、极致的服务理念,为客户提供做网站、成都做网站 网站设计制作按需定制,公司网站建设,企业网站建设,成都品牌网站建设,全网营销推广,成都外贸网站制作,集安网站建设费用合理。

 

执行与任务分离的组件— 线程池

https://github.com/wangbojing/threadpool

 

多线程技术主要解决了处理器单元内多个线程执行的问题,它可以显著的减少处理器单元的闲置时间,增加处理器单元的吞吐能力。线程池是多线程编程的一个必要组件,并且对于很多编程人员都是透明的,更是神秘的。有幸能为大家解析其中缘由,尚有不妥之处,欢迎大家抛砖。

 

线程池的概念,是一个用来管理一组执行任务线程的工具。既然是管理工具,那么该工具管理是用来管理任务与执行的。如图一线程池组件拓扑图,执行队列(Workers),任务队列(Jobs)和池管理(Pool Manager)三部分组成。

执行队列(Workers)是用来存放运行线程的队列。

任务队列(Jobs)是用来存放需要被执行的任务队列。

池管理(Pool Manager)主要是管理执行队列的执行顺序,执行任务的时间长短,对长时间没有使用的执行单元进行释放,执行单元满负荷运行的时及时添加执行单元;记录未执行的任务数量,对新任务入队,即将执行的任务出队等等。

多线程编程—线程池的实现

图一 线程池组件拓扑图


执行队列(Workers)中的每一个执行单元(Worker)由哪些元素组成?线程ID,退出标志。

 

任务队列(Jobs)中的每一个任务(Jobs)的组成元素?执行每一个任务的具体执行函数,每一个任务的执行参数。

 

池管理(Pool Manager)由哪些元素组成?每一个新任务添加与执行时的移除用的互斥锁,每一个线程挂起的时所等待的条件变量。

 

根据分析如图二线程池的类图。

多线程编程—线程池的实现

图二线程池的类图


到这里一个简单的线程池就已经可以呼之欲出了。以下为实现代码


/*
 * Author: WangBoJing
 * email: 1989wangbojing@gmail.com 
 * github: https://github.com/wangbojing
 */
#include 
#include 
#include 
#include 

#define LL_ADD(item, list) do { \
 item->prev = NULL;   \
 item->next = list;   \
 list = item;    \
} while(0)

#define LL_REMOVE(item, list) do {        \
 if (item->prev != NULL) item->prev->next = item->next; \
 if (item->next != NULL) item->next->prev = item->prev; \
 if (list == item) list = item->next;     \
 item->prev = item->next = NULL;       \
} while(0)


typedef void (*JOB_CALLBACK)(void *);
struct NTHREADPOOL;

typedef struct NWORKER {
 pthread_t thread;
 int terminate;
 struct NTHREADPOOL *pool;
 struct NWORKER *next;
 struct NWORKER *prev;
} nWorker;

typedef struct NJOB {
 JOB_CALLBACK job_func;
 void *arg;
 struct NJOB *next;
 struct NJOB *prev;
} nJob;

typedef struct NTHREADPOOL {
 struct NWORKER *workers;
 struct NJOB *jobs;
 pthread_mutex_t jobs_mtx;
 pthread_cond_t jobs_cond;
} nThreadPool;

void *ntyWorkerThread(void *arg) {

 nWorker *worker = (nWorker*)arg;
 
 while (1) {
  pthread_mutex_lock(&worker->pool->jobs_mtx);
  while (worker->pool->jobs == NULL) {
   if (worker->terminate) break;
   pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mtx);
  }
  
  if (worker->terminate) {
   pthread_mutex_unlock(&worker->pool->jobs_mtx);
   break;
  }
 
  nJob *job = worker->pool->jobs;
  if (job != NULL) {
   LL_REMOVE(job, worker->pool->jobs);
  }
  
  pthread_mutex_unlock(&worker->pool->jobs_mtx);
  if (job == NULL) continue;
  job->job_func(job);
  
  usleep(1);
 }
 
 free(worker);
 pthread_exit(NULL);
 
}

int ntyThreadPoolCreate(nThreadPool *pool, int numWorkers) {

 if (pool == NULL) return 1;
 if (numWorkers < 1) numWorkers = 1;
 
 memset(pool, 0, sizeof(nThreadPool));
 
 pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
 memcpy(&pool->jobs_cond, &blank_cond, sizeof(pool->jobs_cond));
 
 pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
 memcpy(&pool->jobs_mtx, &blank_mutex, sizeof(pool->jobs_mtx));
 
 int i = 0;
 for (i = 0;i < numWorkers;i ++) {
 
  nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
  if (worker == NULL) {
   perror("malloc");
   return 1;
  }
  
  memset(worker, 0, sizeof(nWorker));
  worker->pool = pool;
  
  int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void*)worker);
  if (ret) {
   perror("pthread_create");
   free(worker);
   return 1;
  }
  
  LL_ADD(worker, worker->pool->workers);
 }
}


void ntyThreadPoolShutdown(nThreadPool *pool) {

 nWorker *worker = NULL;
 for (worker = pool->workers;worker != NULL;worker = worker->next) {
  worker->terminate = 1;
 }
 
 pthread_mutex_lock(&pool->jobs_mtx);
 
 pool->workers = NULL;
 pool->jobs = NULL;
 pthread_cond_broadcast(&pool->jobs_cond);
 
 pthread_mutex_unlock(&pool->jobs_mtx);
 
}

void ntyThreadPoolPush(nThreadPool *pool, nJob *job) {

 pthread_mutex_lock(&pool->jobs_mtx);
 LL_ADD(job, pool->jobs);
 
 pthread_cond_signal(&pool->jobs_cond);
 pthread_mutex_unlock(&pool->jobs_mtx);
 
}

/********************************* debug thread pool *********************************/

#define KING_MAX_THREADS  80
#define KING_COUNTER_SIZE 1000

void king_counter(void *arg) {
 nJob *job = (nJob*)arg;
 
 int index = *(int *)job->arg;
 printf("index: %d, selfid:%lu\n", index, pthread_self());
 free(job->arg);
 free(job);
}

int main(int argc, char *argv[]) {

 nThreadPool pool;
 ntyThreadPoolCreate(&pool, KING_MAX_THREADS);
 
 int i = 0;
 for (i = 0;i < KING_COUNTER_SIZE;i ++) {
 
  nJob *job = (nJob*)malloc(sizeof(nJob));
  if (job == NULL) {
   perror("malloc");
   exit(1);
  }
  
  job->job_func = king_counter;
  job->arg = malloc(sizeof(int));
  
  *(int*)job->arg = i;
  ntyThreadPoolPush(&pool, job);
  
 }
 getchar();
 printf("You are very good !!!!\n");
 
}



这样的线程池还是只是一个Demo,原因有如下几点需要我们值得改进的。

  1. 线程池的线程数量是确定的,不能随着系统任务请求数量放缩线程池的大小。

  2. 任务数量的统计,并没有对任务队列进行统计

  3. 执行任务中的线程数量,等待执行的任务数量进行统计

  4. 每一个执行任务的时间没有做限制,

  5. IO密集型与计算密集型区分,线程池非常常用,但是根据不同的业务场景需要设置不同配置

  6. 在用户任务执行函数里,用户主动的调用了pthread_exit退出线程的保护机制

针对于以上几点问题,改进了一版线程池

/*
 * Author: WangBoJing
 * email: 1989wangbojing@gmail.com 
 * github: https://github.com/wangbojing
 */
 
 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

typedef void (*JOB_CALLBACK)(void *);

typedef struct NJOB {
 struct NJOB *next;
 JOB_CALLBACK func;
 void *arg;
} nJob;

typedef struct NWORKER {
 struct NWORKER *active_next;
 pthread_t active_tid;
} nWorker;

typedef struct NTHREADPOOL {
 struct NTHREADPOOL *forw;
 struct NTHREADPOOL *back;
 pthread_mutex_t mtx;
 
 pthread_cond_t busycv;
 pthread_cond_t workcv;
 pthread_cond_t waitcv;
 
 nWorker *active;
 nJob *head;
 nJob *tail;
 pthread_attr_t attr;
 
 int flags;
 unsigned int linger;
 int minimum;
 int maximum;
 int nthreads;
 int idle;
 
} nThreadPool;

static void* ntyWorkerThread(void *arg);
#define NTY_POOL_WAIT   0x01
#define NTY_POOL_DESTROY  0x02

static pthread_mutex_t nty_pool_lock = PTHREAD_MUTEX_INITIALIZER;
static sigset_t fillset;
nThreadPool *thread_pool = NULL;

static int ntyWorkerCreate(nThreadPool *pool) {
 sigset_t oset;
 pthread_t thread_id;
 pthread_sigmask(SIG_SETMASK, &fillset, &oset);
 int error = pthread_create(&thread_id, &pool->attr, ntyWorkerThread, pool);
 pthread_sigmask(SIG_SETMASK, &oset, NULL);
 return error;
}

static void ntyWorkerCleanup(nThreadPool * pool) {

 --pool->nthreads;
 if (pool->flags & NTY_POOL_DESTROY) {
  if (pool->nthreads == 0) {
   pthread_cond_broadcast(&pool->busycv);
  }
 } else if (pool->head != NULL && pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {
  pool->nthreads ++;
 }
 pthread_mutex_unlock(&pool->mtx);
 
}

static void ntyNotifyWaiters(nThreadPool *pool) {
 
 if (pool->head == NULL && pool->active == NULL) {
  pool->flags &= ~NTY_POOL_WAIT;
  pthread_cond_broadcast(&pool->waitcv);
 }
 
}

static void ntyJobCleanup(nThreadPool *pool) {
 
 pthread_t tid = pthread_self();
 nWorker *activep;
 nWorker **activepp;
 
 pthread_mutex_lock(&pool->mtx);
 for (activepp = &pool->active;(activep = *activepp) != NULL;activepp = &activep->active_next) {
  *activepp = activep->active_next;
  break;
 }
 if (pool->flags & NTY_POOL_WAIT) ntyNotifyWaiters(pool);
 
}

static void* ntyWorkerThread(void *arg) {
 nThreadPool *pool = (nThreadPool*)arg;
 nWorker active;
 
 int timeout;
 struct timespec ts;
 JOB_CALLBACK func;
 
 pthread_mutex_lock(&pool->mtx);
 pthread_cleanup_push(ntyWorkerCleanup, pool);
 active.active_tid = pthread_self();
 
 while (1) {
 
  pthread_sigmask(SIG_SETMASK, &fillset, NULL);
  pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
  
  timeout = 0;
  pool->idle ++;
  
  if (pool->flags & NTY_POOL_WAIT) {
   ntyNotifyWaiters(pool);
  }
  
  while (pool->head == NULL && !(pool->flags & NTY_POOL_DESTROY)) {
   if (pool->nthreads <= pool->minimum) {
    
    pthread_cond_wait(&pool->workcv, &pool->mtx);
    
   } else {
   
    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec += pool->linger;
    if (pool->linger == 0 || pthread_cond_timedwait(&pool->workcv, &pool->mtx, &ts) == ETIMEDOUT) {
    
     timeout = 1;
     break;
    }
   }
  }
  pool->idle --;
  
  if (pool->flags & NTY_POOL_DESTROY) break;
  
  nJob *job = pool->head;  
  if (job != NULL) {
  
   timeout = 0;
   func = job->func;
   
   void *job_arg = job->arg;
   pool->head = job->next;
   
   if (job == pool->tail) {
    pool->tail == NULL;
   }
   
   active.active_next = pool->active;
   pool->active = &active;
   
   pthread_mutex_unlock(&pool->mtx);
   pthread_cleanup_push(ntyJobCleanup, pool);
   
   free(job);
   func(job_arg);
   
   pthread_cleanup_pop(1);
  }
  
  if (timeout && (pool->nthreads > pool->minimum)) {
   break;
  }
 }
 pthread_cleanup_pop(1);
 
 return NULL;
 
}

static void ntyCloneAttributes(pthread_attr_t *new_attr, pthread_attr_t *old_attr) {

 struct sched_param param;
 void *addr;
 size_t size;
 int value;
 
 pthread_attr_init(new_attr);
 
 if (old_attr != NULL) {
 
  pthread_attr_getstack(old_attr, &addr, &size);
  pthread_attr_setstack(new_attr, NULL, size);
  
  pthread_attr_getscope(old_attr, &value);
  pthread_attr_setscope(new_attr, value);
  
  pthread_attr_getinheritsched(old_attr, &value);
  pthread_attr_setinheritsched(new_attr, value);
  
  pthread_attr_getschedpolicy(old_attr, &value);
  pthread_attr_setschedpolicy(new_attr, value);
  
  pthread_attr_getschedparam(old_attr, ¶m);
  pthread_attr_setschedparam(new_attr, ¶m);
  
  pthread_attr_getguardsize(old_attr, &size);
  pthread_attr_setguardsize(new_attr, size);
  
 }
 pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED);
 
}

nThreadPool *ntyThreadPoolCreate(int min_threads, int max_threads, int linger, pthread_attr_t *attr) {

 sigfillset(&fillset);
 if (min_threads > max_threads || max_threads < 1) {
  errno = EINVAL;
  return NULL;
 }
 
 nThreadPool *pool = (nThreadPool*)malloc(sizeof(nThreadPool));
 if (pool == NULL) {
  errno = ENOMEM;
  return NULL;
 }
 
 pthread_mutex_init(&pool->mtx, NULL);
 pthread_cond_init(&pool->busycv, NULL);
 pthread_cond_init(&pool->workcv, NULL);
 pthread_cond_init(&pool->waitcv, NULL);
 
 pool->active = NULL;
 pool->head = NULL;
 pool->tail = NULL;
 pool->flags = 0;
 pool->linger = linger;
 pool->minimum = min_threads;
 pool->maximum = max_threads;
 pool->nthreads = 0;
 pool->idle = 0;
 
 ntyCloneAttributes(&pool->attr, attr);
 pthread_mutex_lock(&nty_pool_lock);
 
 if (thread_pool == NULL) {
  pool->forw = pool;
  pool->back = pool;
  
  thread_pool = pool;
  
 } else {
 
  thread_pool->back->forw = pool;
  pool->forw = thread_pool;
  pool->back = pool->back;
  thread_pool->back = pool;
  
 }
 
 pthread_mutex_unlock(&nty_pool_lock);
 return pool;
 
}

int ntyThreadPoolQueue(nThreadPool *pool, JOB_CALLBACK func, void *arg) {

 nJob *job = (nJob*)malloc(sizeof(nJob));
 if (job == NULL) {
  errno = ENOMEM;
  return -1;
 }
 job->next = NULL;
 job->func = func;
 job->arg = arg;
 
 pthread_mutex_lock(&pool->mtx);
 if (pool->head == NULL) {
  pool->head = job;
 } else {
  pool->tail->next = job;
 }
 pool->tail = job;
 
 if (pool->idle > 0) {
  pthread_cond_signal(&pool->workcv);
 } else if (pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {
  pool->nthreads ++;
 }
 
 pthread_mutex_unlock(&pool->mtx);
}

void nThreadPoolWait(nThreadPool *pool) {

 pthread_mutex_lock(&pool->mtx);
 pthread_cleanup_push(pthread_mutex_unlock, &pool->mtx);
 
 while (pool->head != NULL || pool->active != NULL) {
  pool->flags |= NTY_POOL_WAIT;
  pthread_cond_wait(&pool->waitcv, &pool->mtx);
 }
 
 pthread_cleanup_pop(1);
}

void nThreadPoolDestroy(nThreadPool *pool) {

 nWorker *activep;
 nJob *job;
 
 pthread_mutex_lock(&pool->mtx);
 pthread_cleanup_push(pthread_mutex_unlock, &pool->mtx);
 
 pool->flags |= NTY_POOL_DESTROY;
 pthread_cond_broadcast(&pool->workcv);
 
 for (activep = pool->active;activep != NULL;activep = activep->active_next) {
  pthread_cancel(activep->active_tid);
 }
 
 while (pool->nthreads != 0) {
  pthread_cond_wait(&pool->busycv, &pool->mtx);
 }
 
 pthread_cleanup_pop(1);
 pthread_mutex_lock(&nty_pool_lock);
 
 if (thread_pool == pool) {
  thread_pool = pool->forw;
 }
 
 if (thread_pool == pool) {
  thread_pool = NULL;
 } else {
  pool->back->forw = pool->forw;
  pool->forw->back = pool->back;
 }
 
 pthread_mutex_unlock(&nty_pool_lock);
 
 for (job = pool->head;job != NULL;job = pool->head) {
  pool->head = job->next;
  free(job);
 }
 pthread_attr_destroy(&pool->attr);
 free(pool);
 
}

/********************************* debug thread pool *********************************/

void king_counter(void *arg) {
 int index = *(int*)arg;
 printf("index : %d, selfid : %lu\n", index, pthread_self());
 
 free(arg);
 usleep(1);
}


#define KING_COUNTER_SIZE 1000


int main(int argc, char *argv[]) {

 nThreadPool *pool = ntyThreadPoolCreate(10, 20, 15, NULL);
 
 int i = 0;
 for (i = 0;i < KING_COUNTER_SIZE;i ++) {
 
  int *index = (int*)malloc(sizeof(int));
  
  memset(index, 0, sizeof(int));
  memcpy(index, &i, sizeof(int));
  
  ntyThreadPoolQueue(pool, king_counter, index);
  
 }
 
 
 getchar();
 printf("You are very good !!!!\n");
}

多线程编程—线程池的实现


当前名称:多线程编程—线程池的实现
标题链接:http://cdxtjz.cn/article/jopgjs.html

其他资讯