Linux 多核 cp 线程:Linux 如何实现多线程进行 cp 复制

这个问题的实际意义有限:一般来说,复制文件时真正的瓶颈在于 I/O,不管开启多少个线程,速度实际上都不会快多少。但是为了练习多线程编程,这里给出了一种 C++ 代码实现的方式,代码附在最后。

实际上就是将一个文件分割为多个片段,开启多个线程同时进行复制。如果用户指定的并行度大于服务器实际的 CPU 核数,程序会自动将并行度降级为 CPU 核数;如果文件小于 100M(代码中的阈值为 100000000 字节),则并行度始终为 1。

root@bogon:/home/gaopeng/mmm# ./parcp log.log log10.log 2

set parallel:2

Your cpu core is:4

real parallel:2

Will Create 2 Threads

140677902710528:0:174522367:3:4

140677894317824:174522368:349044736:3:4

Copy Thread:140677902710528 work 25%

Copy Thread:140677894317824 work 25%

Copy Thread:140677902710528 work 50%

Copy Thread:140677902710528 work 75%

Copy Thread:140677902710528 work 100%

Copy Thread:140677902710528 work Ok!!

Copy Thread:140677894317824 work 50%

Copy Thread:140677894317824 work 75%

Copy Thread:140677894317824 work Ok!!

复制完成后进行md5验证

root@bogon:/home/gaopeng/mmm# md5sum log.log

f64acc21f7187a865938b340b3eda198  log.log

root@bogon:/home/gaopeng/mmm# md5sum log10.log

f64acc21f7187a865938b340b3eda198  log10.log

可以看出校验是通过的

代码如下:

点击(此处)折叠或打开

// NOTE(review): the header names on these lines were lost when the article was
// extracted; the list below is reconstructed from the identifiers the program
// uses (open/lseek/pread/pwrite, pthread_*, get_nprocs, cout, multimap, ...).
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/sysinfo.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#include <iostream>
#include <map>
#include <utility>

// Size in bytes of one pread()/pwrite() transfer in do_work().
#define MAX_BUFFER 65536

using namespace std;

// Held by do_work() while it reads a thread_info slot and increments
// thread_info::do_id.
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

/**
 * Describes one copy job: a byte range of the source file plus the source and
 * destination file descriptors, together with the id of the thread created
 * for it.
 *
 * dispatch::call_thread() fills an array of these; do_work() reads them back.
 */
class thread_info
{
private:
    uint64_t start_pos;   // first byte offset of the range
    uint64_t end_pos;     // end offset of the range as set by the dispatcher
    int fdr;              // descriptor the data is read from
    int fdw;              // descriptor the data is written to

public:
    pthread_t t_id;       // written by pthread_create() in call_thread()
    static int do_id;     // index of the next slot a worker will pick up

public:
    /** Creates an empty job: offsets, descriptors and thread id all zero. */
    thread_info()
        : start_pos(0), end_pos(0), fdr(0), fdw(0), t_id(0)
    {
    }

    /** Sets the start offset of the range. */
    void set_start(uint64_t a)
    {
        start_pos = a;
    }

    /** Sets the end offset of the range. */
    void set_end(uint64_t a)
    {
        end_pos = a;
    }

    /** Sets the read descriptor (a) and the write descriptor (b). */
    void set_fd(int a,int b)
    {
        fdr = a;
        fdw = b;
    }

    /** Returns the start offset of the range. */
    uint64_t get_start(void) const
    {
        return start_pos;
    }

    /** Returns the end offset of the range. */
    uint64_t get_stop(void) const
    {
        return end_pos;
    }

    /** Returns the read descriptor. */
    int get_fdr(void) const
    {
        return fdr;
    }

    /** Returns the write descriptor. */
    int get_fdw(void) const
    {
        return fdw;
    }

    /**
     * Debug helper: writes "start:end:fdr:fdw" to stdout.
     * NOTE(review): the original statement was truncated to `cout<`; the
     * fields printed here are a reconstruction - confirm against the
     * author's source if it is available.
     */
    void print(void) const
    {
        std::cout << start_pos << ":" << end_pos << ":" << fdr << ":" << fdw << std::endl;
    }
};

// Shared across all workers; do_work() increments it under counter_mutex.
int thread_info::do_id = 0;

/**
 * Abstract dispatcher interface.
 *
 * Holds the requested degree of parallelism (par_thr) and the CPU count
 * (max_cpu); concrete dispatchers decide how the two are filled in and how
 * the worker threads are started.
 */
class ABS_dispatch
{
protected:
    uint32_t par_thr;   // number of worker threads to use
    uint32_t max_cpu;   // processor count as reported by getcpu()

public:
    /** Starts with both counters at zero. */
    ABS_dispatch()
        : par_thr(0), max_cpu(0)
    {
    }

    /** Starts the workers and returns the array describing them. */
    virtual thread_info* call_thread(void) = 0;

    /** Initialises the dispatcher for a requested parallelism of t_n. */
    virtual void make_init(uint32_t t_n) = 0;

    /** Returns the value of get_nprocs(). */
    uint32_t getcpu(void)
    {
        const int processors = get_nprocs();
        return processors;
    }

    virtual ~ABS_dispatch()
    {
    }
};

/**
 * Concrete dispatcher for the parallel file copy.
 *
 * Usage order (as in main): make_init() to pick the thread count, set_init()
 * to register the two descriptors and measure the source file, then
 * call_thread() to start the workers.
 *
 * Owns the thread_info array returned by call_thread(); it is released in the
 * destructor, so the object must outlive every use of that array.
 */
class dispatch:public ABS_dispatch
{
public:
    typedef std::multimap<uint64_t,uint64_t>::iterator pair_piece_iterator;

    dispatch():ABS_dispatch()
    {
        file_idr = 0;
        file_idw = 0;
        file_size = 0;
        tread_arr_p = NULL;
    }

    virtual thread_info* call_thread(void);

    /**
     * Chooses the thread count: t_n, capped at the CPU count and never less
     * than 1 (a count of 0 would make call_thread() divide by zero).
     */
    virtual void make_init(uint32_t t_n)
    {
        max_cpu = getcpu();
        std::cout << "Your cpu core is:" << max_cpu << std::endl;

        if(t_n > max_cpu)
        {
            // NOTE(review): original message text was lost; wording is a reconstruction.
            std::cout << "Warning: parallel > cpu core, parallel is set to " << max_cpu << std::endl;
            par_thr = max_cpu;
        }
        else
        {
            par_thr = t_n;
        }

        if(par_thr == 0)
        {
            par_thr = 1;
        }
    }

    /**
     * Records the source/destination descriptors and the source file size.
     * Files smaller than 100000000 bytes are always copied by one thread.
     * If the size cannot be determined it is treated as 0.
     */
    void set_init(int file_idr,int file_idw)
    {
        const off_t end_offset = lseek(file_idr,0,SEEK_END);
        if(end_offset == static_cast<off_t>(-1))
        {
            // Without this check the -1 would be stored as a huge unsigned size.
            perror("lseek:");
            file_size = 0;
        }
        else
        {
            file_size = static_cast<uint64_t>(end_offset);
        }

        if(file_size<100000000)
        {
            // NOTE(review): original message text was lost; wording is a reconstruction.
            std::cout << "Warning: file size < 100M, parallel is set to 1" << std::endl;
            par_thr = 1;
        }

        this->file_idr = file_idr;
        this->file_idw = file_idw;
    }

    /** Returns the thread count that will actually be used. */
    uint32_t real_par()
    {
        return par_thr;
    }

    virtual ~dispatch()
    {
        pair_piece.clear();
        delete [] tread_arr_p;
    }

private:
    // Not copyable: a copy would delete[] tread_arr_p a second time.
    dispatch(const dispatch&);
    dispatch& operator=(const dispatch&);

    int file_idr;                                // source descriptor
    int file_idw;                                // destination descriptor
    std::multimap<uint64_t,uint64_t> pair_piece; // start offset -> end offset of each piece
    uint64_t file_size;                          // source size in bytes

public:
    thread_info* tread_arr_p;                    // one entry per worker, owned by this object
};

static void* do_work(void* argc)

{

uint64_t b;

uint64_t e;

int fdr;

int fdw;

char* buffer[MAX_BUFFER]={0};

thread_info* tread_arr_p;

uint64_t loopc = 0;

uint64_t loopc25 = 0;

uint64_t i = 0;

int m = 1;

pthread_t t_id;

tread_arr_p = static_cast(argc);

//临界区 MUTEX

pthread_mutex_lock(&counter_mutex);

b = (tread_arr_p+ tread_arr_p–>do_id)–>get_start();

e = (tread_arr_p+ tread_arr_p–>do_id)–>get_stop();

fdr = (tread_arr_p+ tread_arr_p–>do_id)–>get_fdr();

fdw = (tread_arr_p+ tread_arr_p–>do_id)–>get_fdw();

t_id = (tread_arr_p+ tread_arr_p–>do_id)–>t_id ;

cout<< t_id <

tread_arr_p–>do_id++;

pthread_mutex_unlock(&counter_mutex);

//临界区

loopc = e/uint64_t(MAX_BUFFER);

loopc25 = loopc/(uint64_t)4;

while(i

{

if(i == loopc25*m )

{

cout<< “Copy Thread:”<

m++;

}

memset(buffer,0,MAX_BUFFER);

pread(fdr,buffer,MAX_BUFFER,uint64_t(i*MAX_BUFFER));

pwrite(fdw,buffer,MAX_BUFFER,uint64_t(i*MAX_BUFFER));

i++;

}

memset(buffer,0,MAX_BUFFER);

pread(fdr,buffer,(e–uint64_t(i*MAX_BUFFER)),uint64_t(i*MAX_BUFFER));

pwrite(fdw,buffer,(e–uint64_t(i*MAX_BUFFER)),uint64_t(i*MAX_BUFFER));

cout<< “Copy Thread:”<

return NULL;

}

/**
 * Splits the source file into par_thr pieces and starts one do_work() thread
 * per piece.
 *
 * Piece i covers [piece_size*i, piece_size*(i+1)-1]; the last piece runs to
 * file_size so the remainder of the division is not lost.
 *
 * @return the thread_info array (par_thr entries). It stays owned by this
 *         object; the caller joins the t_id of every entry.
 *
 * Any threads from a previous call must have been joined before calling this
 * again, because the previous array is released here.
 */
thread_info* dispatch::call_thread()
{
    // Zero threads would divide by zero; more threads than bytes would make
    // piece_size 0 and the "- 1" below wrap around.
    if(par_thr == 0 || file_size < par_thr)
    {
        par_thr = 1;
    }

    const uint64_t piece_size = file_size/par_thr;

    delete [] tread_arr_p;
    tread_arr_p = new thread_info[par_thr];
    thread_info::do_id = 0;     // workers claim slots starting from the first one

    std::cout << "Will Create " << par_thr << " Threads" << std::endl;

    for(uint32_t i = 0;i < par_thr;i++)
    {
        thread_info* const slot = tread_arr_p + i;
        const bool is_last = (i + 1 == par_thr);

        slot->set_start(piece_size*i);
        slot->set_end(is_last ? file_size : piece_size*(i+1)-1);
        slot->set_fd(file_idr,file_idw);

        const int rc = pthread_create(&(slot->t_id),NULL,do_work,static_cast<void*>(tread_arr_p));
        if(rc != 0)
        {
            // No thread was started, so copy one piece on the calling thread
            // instead of leaving it uncopied.
            std::cerr << "pthread_create:" << strerror(rc) << std::endl;
            // Keep t_id a valid handle for the caller's pthread_join().
            // NOTE(review): relies on self-join failing with EDEADLK (as glibc
            // does) rather than blocking - confirm on other platforms.
            slot->t_id = pthread_self();
            do_work(static_cast<void*>(tread_arr_p));
        }
    }

    return tread_arr_p;
}

int main(int argc,char** argv)

{

dispatch test;

thread_info* thread_info_p = NULL;

uint32_t real_par;

void *tret;

int fdr = open(argv[1],O_RDONLY|O_NOFOLLOW);

int fdw = open(argv[2],O_RDWR|O_CREAT|O_EXCL,0755);

cout<

if(argc<4)

{

cout<

return –1;

}

if(fdr == –1 || fdw == –1)

{

perror(“open readfile:”);

return –1;

}

if(fdw == –1)

{

perror(“open wirtefile:”);

return –1;

}

if(sscanf(argv[3],“%u”,&real_par) == EOF)

{

perror(“sscanf:”);

return –1;

}

cout<

//cout<

test.make_init(real_par);

test.set_init(fdr,fdw);

real_par = test.real_par();

cout<

thread_info_p = test.call_thread();

for(int i = 0 ;i

{

//cout<t_id<

pthread_join((thread_info_p+i)–>t_id,&tret);

//cout<(tret)<

}

close(fdw);

close(fdr);

}

如果觉得不错 您可以考虑请作者喝杯茶(微信支付):                                作者微信号:

48daf30f2b261ad7ab0837998adc003d.png

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/7728585/viewspace-2137204/,如需转载,请注明出处,否则将追究法律责任。