-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathshmque.cpp
1 lines (1 loc) · 22.4 KB
/
shmque.cpp
1
#include <sys/types.h>#include <sys/ipc.h>#include <sys/sem.h>#include <sys/stat.h>#include <sys/mman.h>#include <unistd.h>#include <fcntl.h>#include <errno.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <assert.h>#include "shmque.h"#define DHDR_LEN sizeof(short) /* 数据头长度 */#define SHMQ_MAGIC 0x543210 /* 队列魔数 */#define SVSEM_MODE 0666 /* SV信号灯默认权限 */// 信号灯联合结构union semun{ int val; /* used for SETVAL only */ struct semid_ds *buf; /* used for IPC_SET and IPC_STAT */ unsigned short *array; /* used for GETALL and SETALL */};// 共享内存头结构typedef struct _CQHdr{ int data_num; /* 数据个数 */ int data_size; /* 数据区域大小 */ int phead; /* 队列头 */ int ptail; /* 队列尾 */ volatile int pread; /* 读偏移量 *///modify by sunwake volatile int pwrite; /* 写偏移量 *///modify by sunwake int semid; /* SV信号灯 */}CQHdr;//table_head//current_seq// 共享内存循环队列struct _ShmQueue{ long sq_magic; /* 队列魔数 */ int sq_flags; /* 初始化标志 */ CQHdr *sq_hdr; /* 循环队列头 */ char *sq_data; /* 循环队列体 */};// 创建共享内存static char *map_init(const char *file_name,const int file_size);// 销毁共享内存static int map_exit(char *shm_addr,int shm_size);// 初始化信号灯static int sem_init(const key_t semkey,const int nsems);// 加锁static int lock(const int semid,const int sem_num);// 解锁static int unlock(const int semid,const int sem_num);/************************************************* * Function : shmq_create() * Parameter : * in : file_name,file_size; * out : none * Return : * failed : none * success : none * Description : 初始化循环队列**************************************************/ShmQueue *shmq_create(const char *file_name,const int file_size){ int shm_size = 0; char *shm_addr = NULL; ShmQueue *shmque = NULL; // 共享内存长度//should be modified shm_size = sizeof(CQHdr) + file_size; // 创建共享内存//should be modified ? shm_addr = map_init(file_name,shm_size); if(shm_addr == NULL) { printf("map_file(E): file_name = %s; file_size = %d;\n",file_name,file_size); return NULL; } // 初始化循环队列信息 shmque = (ShmQueue *)malloc(sizeof(ShmQueue)); if(shmque != NULL) { shmque->sq_flags = 0; shmque->sq_magic = SHMQ_MAGIC; shmque->sq_hdr = (CQHdr *)shm_addr; shmque->sq_data = shm_addr + sizeof(CQHdr); return shmque; } // 销毁共享内存 if(map_exit(shm_addr,shm_size) != 0) { printf("map_exit(E): shm_addr = 0x%x; shm_size = %d;\n",shm_addr,shm_size); } return NULL;}/************************************************* * Function : shmq_destroy() * Parameter : * in : none * out : none * Return : * failed : none * success : none * Description : 销毁循环队列**************************************************/int shmq_destroy(ShmQueue *shmque){ int shm_size; CQHdr *phdr = NULL; if(shmque->sq_magic != SHMQ_MAGIC) { printf("Error: EBDF\n"); return -1; } // 队列头 phdr = shmque->sq_hdr; // 共享内存长度 shm_size = sizeof(CQHdr) + phdr->data_size; // 销毁共享内存 if(map_exit((char *)phdr,shm_size) != 0) { printf("map_exit(E): shm_addr = 0x%x; shm_size = %d;\n",shmque->sq_hdr,shm_size); } if(shmque != NULL) { free(shmque); /* 释放内存 */ } return 0;}/************************************************* * Function : shmq_size() * Parameter : * in : none * out : none * Return : * failed : none * success : none * Description : 循环队列数据个数**************************************************/int shmq_size(ShmQueue *shmque){ int size; CQHdr *phdr = NULL; if(shmque->sq_magic != SHMQ_MAGIC) { printf("Error: EBDF\n"); return -1; } phdr = shmque->sq_hdr; /* 队列头 */ if(lock(phdr->semid,0) < 0) { printf("lock: semid = %d\n",phdr->semid); return -1; } size = phdr->data_num; /* 队列当前数据个数 */ if(unlock(phdr->semid,0) < 0) { printf("unlock: semid = %d\n",phdr->semid); return -1; } return size;}/************************************************* * Function : shmq_read() * Parameter : * in : none * out : none * Return : * failed : none * success : none * Description : 循环队列读数据**************************************************/int shmq_read(ShmQueue *shmque,char **ptr,short *len){ int read_tail = 0; short data_len = 0; char *pdata = NULL; CQHdr *phdr = NULL; // modify by sunwake int pos_read =0; int pos_write =0; if(shmque->sq_magic != SHMQ_MAGIC) { printf("Error: EBDF\n"); return -1; } phdr = shmque->sq_hdr; /* 队列头 */ pdata = shmque->sq_data; /* 队列体 */ if(lock(phdr->semid,0) < 0) { printf("lock: semid = %d\n",phdr->semid); return -1; } //modify by suwake pos_read = phdr->pread; pos_write = phdr->pwrite; unlock(phdr->semid,0); printf("Pread: %d\tPwrite: %d\n",pos_read, pos_write); // 直接拷贝数据 if(pos_read < pos_write) { // 复制数据长度 memcpy(&data_len,pdata + pos_read,DHDR_LEN); *ptr = (char *)malloc(data_len); if(*ptr == NULL) { perror("malloc error"); goto Err; } // 复制数据内容 memcpy(*ptr,pdata + pos_read + DHDR_LEN,data_len); // need lock if(lock(phdr->semid,0) < 0) { printf("lock: semid = %d\n",phdr->semid); return -1; } phdr->pread = (pos_read + DHDR_LEN + data_len) % phdr->data_size; /* 队列读指针偏移 */ phdr->data_num -= 1; /* 队列数据个数减1 */ unlock(phdr->semid,0); } else if(pos_read > pos_write) { read_tail = phdr->ptail - pos_read; printf("剩余%d个空间\n", read_tail); // 数据长度分布在循环队列两端 if(read_tail < DHDR_LEN) { // 复制数据长度 char* temp_data_len = NULL; temp_data_len = (char*) malloc(DHDR_LEN); if (temp_data_len == NULL) { printf("The malloc is error.\n"); goto Err; } memset(temp_data_len, 0 , DHDR_LEN); memcpy(temp_data_len,pdata + pos_read,read_tail); memcpy(temp_data_len + read_tail,pdata + phdr->phead,DHDR_LEN - read_tail); //memcpy(&data_len,pdata + pos_read,read_tail); //memcpy(&data_len + read_tail,pdata + phdr->phead,DHDR_LEN - read_tail); memcpy(&data_len,temp_data_len,DHDR_LEN); free(temp_data_len); temp_data_len = NULL; *ptr = (char *)malloc(data_len); if(*ptr == NULL) { perror("malloc error"); goto Err; } // 复制数据内容 memcpy(*ptr,pdata + phdr->phead + DHDR_LEN - read_tail,data_len); } else // 数据长度在循环队列尾端 { // 复制数据长度 memcpy(&data_len,pdata + pos_read,DHDR_LEN); *ptr = (char *)malloc(data_len); if(*ptr == NULL) { perror("malloc error"); goto Err; } // 数据分布在循环队列两端 if((read_tail - DHDR_LEN) < data_len) { // 复制数据 memcpy(*ptr,pdata + pos_read + DHDR_LEN,read_tail - DHDR_LEN); memcpy(*ptr + read_tail - DHDR_LEN,pdata + phdr->phead,data_len - read_tail + DHDR_LEN); } else // 数据在循环队列尾端 { // 复制数据 memcpy(*ptr,pdata + pos_read + DHDR_LEN,data_len); } } //need lock if(lock(phdr->semid,0) < 0) { printf("lock: semid = %d\n",phdr->semid); return -1; } phdr->pread = (pos_read + DHDR_LEN + data_len) % phdr->data_size; /* 队列读指针偏移 */ printf("Pread = %d\n",phdr->pread); phdr->data_num -= 1; /* 队列数据个数减1 */ unlock(phdr->semid,0); }else{ printf("Pread == Pwrite:\t%d\n",pos_read); } //unlock(phdr->semid,0); *len = data_len; return data_len;Err: //unlock(phdr->semid,0); return -1;}/************************************************* * Function : shmq_read_all() * Parameter : * in : none * out : none * Return : * failed : none * success : none * Description : 循环队列读所有数据**************************************************/int shmq_read_all(ShmQueue *shmque,char **ptr,int *len){ int read_tail = 0; int data_len = 0; char *pdata = NULL; CQHdr *phdr = NULL; // modify by sunwake int pos_read =0; int pos_write =0; int read_data_num =0; *ptr = NULL; if(shmque->sq_magic != SHMQ_MAGIC) { printf("Error: EBDF\n"); return -1; } phdr = shmque->sq_hdr; /* 队列头 */ pdata = shmque->sq_data; /* 队列体 */ if(lock(phdr->semid,0) < 0) { printf("lock: semid = %d\n",phdr->semid); return -1; } //modify by sunwake pos_read = phdr->pread; pos_write = phdr->pwrite; read_data_num = phdr->data_num; unlock(phdr->semid,0); ////printf("Pread: %d\tPwrite: %d\n",pos_read, pos_write); // 直接拷贝数据 if(pos_read < pos_write) { data_len = pos_write - pos_read;//糙数据,每条记录前包含长度 //debug //printf("pos_read(%d)===(%d/%d)===>>>pos_write(%d);\n",pos_read,\ data_len,read_data_num,pos_write); *ptr = (char *)malloc(data_len); if(*ptr == NULL) { perror("malloc error"); goto Err; } // 复制数据内容 memcpy(*ptr,pdata + pos_read,data_len); // need lock if(lock(phdr->semid,0) < 0) { printf("lock: semid = %d\n",phdr->semid); return -1; } phdr->pread = pos_write; /* 队列读指针偏移 */ phdr->data_num -= read_data_num; /* 队列数据个数减去已读个数 */ unlock(phdr->semid,0); } else if(pos_read > pos_write) { read_tail = phdr->ptail - pos_read; ////printf("剩余%d个空间\n", read_tail); data_len = phdr->data_size + pos_write - pos_read;//糙数据,每条记录前包含长度 //debug //printf("pos_read(%d)<<<===(%d/%d)===pos_write(%d);\n",pos_read,\ data_len,read_data_num,pos_write); *ptr = (char *)malloc(data_len); if(*ptr == NULL) { perror("malloc error"); goto Err; } // 复制数据内容 memcpy(*ptr,pdata + pos_read,read_tail); memcpy(*ptr+read_tail,pdata + phdr->phead,data_len - read_tail); //need lock if(lock(phdr->semid,0) < 0) { printf("lock: semid = %d\n",phdr->semid); return -1; } phdr->pread = pos_write; /* 队列读指针偏移 */ ////printf("Pread = %d\n",read_data_num); phdr->data_num -= read_data_num; /* 队列数据个数减read_data_num */ unlock(phdr->semid,0); }else{ ////printf("Pread == Pwrite:\t%d\n",pos_read); //debug //printf("pos_read(%d)===(%d/%d)===pos_write(%d);\n",pos_read,\ data_len,read_data_num,pos_write); } if(*ptr){//整合糙数据 size_t elem_len =0; char *pdata_len = *ptr; //char *pdata_end = pdata_end; char *pdata_end = pdata_len; assert( DHDR_LEN <= sizeof(size_t)); while(pdata_len < (*ptr + data_len) ){ memcpy(&elem_len,pdata_len,DHDR_LEN);// must be small endian //memcpy(pdata_end,pdata_end+DHDR_LEN,elem_len);//it doesn`t matter that the dest and the src is overlap memcpy(pdata_end,pdata_len+DHDR_LEN,elem_len);//it doesn`t matter that the dest and the src is overlap pdata_end += elem_len; pdata_len += (elem_len+DHDR_LEN); } data_len = pdata_end - *ptr; } //unlock(phdr->semid,0); *len = data_len; return data_len;Err: //unlock(phdr->semid,0); return -1;}/************************************************* * Function : shmq_write() * Parameter : * in : none * out : none * Return : * failed : none * success : none * Description : 循环队列写数据**************************************************/int shmq_write(ShmQueue *shmque,const char *ptr,short len){ int space_len = 0; int write_tail = 0; char *pdata = NULL; CQHdr *phdr = NULL; if(shmque->sq_magic != SHMQ_MAGIC) { printf("Error: EBDF\n"); return -1; } if(len < 0) { printf("Error: len = %d\n",len); return -1; } phdr = shmque->sq_hdr; /* 队列头 */ pdata = shmque->sq_data; /* 队列体 */ if(lock(phdr->semid,0) < 0) { printf("lock: semid = %d\n",phdr->semid); return -1; } // 计算循环队列剩余长度 if(phdr->pread > phdr->pwrite) { space_len = phdr->pread - phdr->pwrite; } else { space_len = phdr->data_size - (phdr->pwrite - phdr->pread); } // 空1个字节作为循环队列满的标志 if(space_len < (DHDR_LEN + len + 1)) { printf("The queue is full.\n"); goto Err; } // 直接拷贝数据 if((DHDR_LEN + len) <= (phdr->ptail - phdr->pwrite)) { // 复制数据长度 memcpy(pdata + phdr->pwrite,&len,DHDR_LEN); // 复制数据 memcpy(pdata + phdr->pwrite + DHDR_LEN,ptr,len); } else { write_tail = phdr->ptail - phdr->pwrite; // 数据长度拷贝到循环队列两端 if(write_tail < DHDR_LEN) { char* temp_data_len = NULL; temp_data_len = (char*) malloc(DHDR_LEN); if (temp_data_len == NULL) { printf("The malloc is error.\n"); goto Err; } memset(temp_data_len, 0 , DHDR_LEN); memcpy(temp_data_len, &len, DHDR_LEN); memcpy(pdata + phdr->pwrite,temp_data_len,write_tail); memcpy(pdata + phdr->phead,temp_data_len + write_tail,DHDR_LEN - write_tail); free(temp_data_len); temp_data_len = NULL; // 复制数据长度 // memcpy(pdata + phdr->pwrite,&len,write_tail); // memcpy(pdata + phdr->phead,&len + write_tail,DHDR_LEN - write_tail); // 复制数据 memcpy(pdata + phdr->phead + DHDR_LEN - write_tail,ptr,len); } else // 数据内容拷贝到循环队列两端 { // 复制数据长度 memcpy(pdata + phdr->pwrite,&len,DHDR_LEN); // 复制数据 memcpy(pdata + phdr->pwrite + DHDR_LEN,ptr,write_tail - DHDR_LEN); // memcpy(pdata + phdr->phead,ptr + len + DHDR_LEN - write_tail,len + DHDR_LEN - write_tail); memcpy(pdata + phdr->phead,ptr - DHDR_LEN + write_tail,len + DHDR_LEN - write_tail); } } phdr->pwrite = (phdr->pwrite + DHDR_LEN + len) % phdr->data_size; /* 偏移循环队列写指针 */ phdr->data_num += 1; /* 循环队列数据个数增1 */ unlock(phdr->semid,0); return 0;Err: unlock(phdr->semid,0); return -1;}/************************************************* * Function : map_init() * Parameter : * in : file_name,file_size; * out : none * Return : * failed : none * success : none * Description : 初始化共享内存**************************************************/char *map_init(const char *file_name,const int file_size){ int fd; key_t semkey; char *shm_addr = NULL; bool init_flag = true; fd = open(file_name,O_RDWR); if(fd < 0) { printf("打开文件失败,创建文件\n"); if(errno != ENOENT) { perror("open error"); return NULL; } // 文件不存在则创建 fd = open(file_name,O_RDWR | O_CREAT,0777); if(fd < 0) { perror("open error"); return NULL; } // 截断文件长度 if(ftruncate(fd,file_size) < 0) { perror("ftruncate error"); return NULL; } init_flag = false; /* 共享内存未初始化 */ } // 映射共享内存 shm_addr = (char *)mmap(NULL,file_size,PROT_READ | PROT_WRITE,MAP_SHARED,fd,0); if(shm_addr == MAP_FAILED) { close(fd); perror("mmap error"); return NULL; } close(fd); // 初始化共享内存头结构 if(!init_flag) { CQHdr *cqhdr = (CQHdr *)shm_addr; cqhdr->data_num = 0; cqhdr->data_size = file_size - sizeof(CQHdr); cqhdr->pread = 0; cqhdr->pwrite = 0; cqhdr->phead = 0; cqhdr->ptail = cqhdr->data_size; // 初始化键值 if((semkey = ftok(file_name,0)) == -1) { perror("ftok error"); return NULL; } // 初始化信号灯 if((cqhdr->semid = sem_init(semkey,1)) < 0) { printf("sem_init: semkey = 0x%x\n",semkey); return NULL; } } else { printf("没有初始化共享内存头结构,没有初始化信号灯\n"); } return shm_addr; }/************************************************* * Function : map_exit() * Parameter : * in : none * out : none * Return : * failed : none * success : none * Description : 销毁共享内存**************************************************/int map_exit(char *shm_addr,int shm_size){ if(munmap(shm_addr,shm_size) != 0) { return -1; } return 0;}/************************************************* * Function : lock() * Parameter : * in : none * out : none * Return : * failed : none * success : none * Description : 加锁**************************************************/int sem_init(const key_t semkey,const int nsems){ int i,semid; // 打开已有信号灯 semid = semget(semkey,0,0); if(semid < 0) { // 创建新信号灯 semid = semget(semkey,nsems,IPC_CREAT | SVSEM_MODE); if(semid < 0) { perror("semget error"); return -1; } union semun init_value; init_value.val = 1; // 初始化信号灯值 for(i = 0; i < nsems; i++) { if(semctl(semid,i,SETVAL,init_value) < 0) { perror("semctl error"); return -1; } } } return semid;}/************************************************* * Function : lock() * Parameter : * in : none * out : none * Return : * failed : none * success : none * Description : 加锁**************************************************/int lock(const int semid,const int sem_num){ struct sembuf sbuf; if(semid < 0 || sem_num < 0) { printf("Error: semid = %d; sem_num = %d;\n",semid,sem_num); return -1; } sbuf.sem_num = sem_num; sbuf.sem_op = -1; sbuf.sem_flg = SEM_UNDO; if(semop(semid,&sbuf,1) == -1) { perror("semop error"); return -1; } return 0;}/************************************************* * Function : unlock() * Parameter : * in : none * out : none * Return : * failed : none * success : none * Description : 解锁**************************************************/int unlock(const int semid,const int sem_num){ struct sembuf sbuf; if(semid < 0 || sem_num < 0) { printf("Error: semid = %d; sem_num = %d;\n",semid,sem_num); return -1; } sbuf.sem_num = sem_num; sbuf.sem_op = 1; sbuf.sem_flg = SEM_UNDO; if(semop(semid,&sbuf,1) == -1) { perror("semop error"); return -1; } return 0;}