diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..afcaefc --- /dev/null +++ b/Makefile @@ -0,0 +1,9 @@ +default: + gcc -g -Wall -o cds_demo memory_pool.c queue.c handler_common.c handler_receive.c handler_resend_epoll.c handler_process.c main.c -lpthread + #gcc -g -Wall -o cds_demo memory_pool.c queue.c handler_common.c handler_receive.c handler_resend.c handler_process.c main.c -lpthread + +memtest: + gcc -o memtest memtest.c -lpthread + +memory_pool: memory_pool.c queue.c + gcc -o memory_pool memory_pool.c queue.c -lpthread -DMEMORY_POOL_TEST diff --git a/handler_common.c b/handler_common.c new file mode 100644 index 0000000..e8b2e03 --- /dev/null +++ b/handler_common.c @@ -0,0 +1,155 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ + +#include +#include +#include +#include +#include +#include "handler_common.h" + + +void* queue_get_block(queue_t *queue, int wait) +{ + uint8_t *queue_buf; + int queue_buf_size; + void *buf; + for (; ;) + { + if (queue_get_readbuf(queue, &queue_buf, &queue_buf_size) != 0) + { + if (wait == 0) + return NULL; + //printf("read block to from FAILED!\n"); + usleep(20000); + continue; + } + + memcpy(&buf, queue_buf, sizeof(void *)); + queue_read_complete(queue); + break; + } + return buf; +} + + +int queue_put_block(queue_t *queue, void *block, int wait) +{ + uint8_t *queue_buf; + int queue_buf_size = sizeof(void *); + for (; ;) + { + if (queue_get_writebuf(queue, &queue_buf, &queue_buf_size) != 0) + { + if (wait == 0) + return -1; + //printf("write block to queue FAILED!\n"); + usleep(20000); + continue; + } + + memcpy(queue_buf, &block, sizeof(void *)); + queue_write_complete(queue, queue_buf, sizeof(void *), 0); + return 0; + } +} + + +void* memory_pool_get_block(memory_pool_t *memory_pool, int wait) +{ + void *block; + for (; ;) + { + block = memory_pool_alloc(memory_pool); + if (block == NULL) + { + if (wait == 0) + return NULL; + usleep(20000); + continue; + } + return block; + } +} + + +int memory_pool_put_block(memory_pool_t *memory_pool, void *block, int wait) +{ + memory_pool_free(memory_pool, block); + return 0; +} + + + +struct _fd_map +{ + int max_fd; + void **fds; +}; + + +fd_map_t* fd_map_create(int max_fd) +{ + fd_map_t *h = (fd_map_t *)malloc(sizeof(fd_map_t)); + if (h == NULL) + { + return NULL; + } + memset(h, 0, sizeof(*h)); + + h->max_fd = max_fd; + h->fds = malloc(sizeof(void *) * (max_fd + 1)); + if (h->fds == NULL) + { + free(h); + return NULL; + } + memset(h->fds, 0, sizeof(void *) * (max_fd + 1)); + return h; +} + + +void fd_map_destroy(fd_map_t *h) +{ + free(h->fds); + free(h); +} + + +int fd_map_add(fd_map_t *h, int fd, void *info) +{ + if (fd < 0 || fd > h->max_fd) + { + printf("[add] fd [%d] exceed max_fd [%d]\n", fd, h->max_fd); + return -1; + } + h->fds[fd] = info; + return 0; +} + + +int fd_map_remove(fd_map_t *h, int fd) +{ + if (fd < 0 || fd > h->max_fd) + { + printf("[remove] fd [%d] exceed max_fd [%d]\n", fd, h->max_fd); + return -1; + } + h->fds[fd] = NULL; + return 0; +} + + +void* fd_map_get(fd_map_t *h, int fd) +{ + if (fd < 0 || fd > h->max_fd) + { + printf("[get] fd [%d] exceed max_fd [%d]\n", fd, h->max_fd); + return NULL; + } + return h->fds[fd]; +} + diff --git a/handler_common.h b/handler_common.h new file mode 100644 index 0000000..aaa92a5 --- /dev/null +++ b/handler_common.h @@ -0,0 +1,63 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ + +#ifndef __HANDLER_COMMON_H__ +#define __HANDLER_COMMON_H__ + +#ifdef __cplusplus +extern "C" { +#endif + + +#include "queue.h" +#include "memory_pool.h" + +typedef void* (*callback_in_t)(void *param, int wait); +typedef int (*callback_out_t)(void *param, void *block, int wait); + +typedef struct _handler handler_t; +struct _handler +{ + int (*start)(handler_t *h); + void (*destroy)(handler_t *h); + + void (*set_callback_in)(handler_t *h, callback_in_t callback, void *param); + void (*set_callback_out)(handler_t *h, callback_out_t callback, void *param); +}; + +void* queue_get_block(queue_t *queue, int wait); +int queue_put_block(queue_t *queue, void *block, int wait); + +void* memory_pool_get_block(memory_pool_t *memory_pool, int wait); +int memory_pool_put_block(memory_pool_t *memory_pool, void *block, int wait); + + +#include +#include + +typedef struct _source_info +{ + int fd; + void *info_resend; + //TODO: add other infos +} source_info_t; + +typedef struct _fd_map fd_map_t; + +fd_map_t* fd_map_create(int max_fd); +void fd_map_destroy(fd_map_t *h); + +int fd_map_add(fd_map_t *h, int fd, void *info); +int fd_map_remove(fd_map_t *h, int fd); +void* fd_map_get(fd_map_t *h, int fd); + + +#ifdef __cplusplus +} +#endif + +#endif // __HANDLER_COMMON_H__ + diff --git a/handler_process.c b/handler_process.c new file mode 100644 index 0000000..3d62f46 --- /dev/null +++ b/handler_process.c @@ -0,0 +1,115 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ +#include +#include +#include +#include +#include +#include +#include "handler_process.h" + +typedef struct _handler_process +{ + handler_t handler; + int block_size; + + callback_in_t cb_in; + callback_out_t cb_out; + void *param_in; + void *param_out; + + pthread_t thrd; + int exit_flag; +} handler_process_t; + + +static void* thread_proc(void *arg) +{ + handler_process_t *h = (handler_process_t *)arg; + void *block; + int32_t socket_fd; + int32_t data_size; + void *p_data; + for (; ;) + { + block = h->cb_in(h->param_in, 1); + socket_fd = *(int32_t *)block; + data_size = *(int32_t *)(block + 4); + //memcpy(&socket_fd, block, 4); + //memcpy(&data_size, block + 4, 4); + p_data = block + 8; + + //TODO: do some process + + h->cb_out(h->param_out, block, 1); + } + return NULL; +} + + +static int process_start(handler_t *h) +{ + handler_process_t *h_process = (handler_process_t *)h; + if (h_process->thrd != 0) + { + printf("[%d] already started\n", __LINE__); + return -1; + } + if (pthread_create(&h_process->thrd, NULL, thread_proc, h) != 0) + { + printf("[%d] pthread_create FAILED!\n", __LINE__); + return -1; + } + return 0; +} + + +static void process_destroy(handler_t *h) +{ + handler_process_t *h_process = (handler_process_t *)h; + if (h_process->thrd != 0) + { + h_process->exit_flag = 1; + while (h_process->exit_flag == 1) + usleep(10000); + } + free(h); +} + + +static void process_set_callback_in(handler_t *h, callback_in_t callback, void *param) +{ + handler_process_t *h_process = (handler_process_t *)h; + h_process->cb_in = callback; + h_process->param_in = param; +} + + +static void process_set_callback_out(handler_t *h, callback_out_t callback, void *param) +{ + handler_process_t *h_process = (handler_process_t *)h; + h_process->cb_out = callback; + h_process->param_out = param; +} + + +handler_t* handler_process_create(int block_size) +{ + handler_process_t *h_process = (handler_process_t *)malloc(sizeof(handler_process_t)); + if (h_process == NULL) + { + printf("malloc FAILED!\n"); + return NULL; + } + memset(h_process, 0, sizeof(*h_process)); + h_process->block_size = block_size; + h_process->handler.start = process_start; + h_process->handler.destroy = process_destroy; + h_process->handler.set_callback_in = process_set_callback_in; + h_process->handler.set_callback_out = process_set_callback_out; + return &h_process->handler; +} + diff --git a/handler_process.h b/handler_process.h new file mode 100644 index 0000000..09dcc5a --- /dev/null +++ b/handler_process.h @@ -0,0 +1,25 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ + +#ifndef __HANDLER_PROCESS_H__ +#define __HANDLER_PROCESS_H__ + +#ifdef __cplusplus +extern "C" { +#endif + + +#include "handler_common.h" + +handler_t* handler_process_create(int block_size); + + +#ifdef __cplusplus +} +#endif + +#endif // __HANDLER_PROCESS_H__ + diff --git a/handler_receive.c b/handler_receive.c new file mode 100644 index 0000000..068e089 --- /dev/null +++ b/handler_receive.c @@ -0,0 +1,421 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ +#include +#include +#include +#include +#include +#include +#include "handler_receive.h" + +#include +#include +#include +#include +#include +#include + +#define MAXEVENTS 64 + +typedef struct _handler_receive +{ + handler_t handler; + int block_size; + fd_map_t *fd_map_source; + handler_receive_event_t on_event; + + callback_in_t cb_in; + callback_out_t cb_out; + void *param_in; + void *param_out; + + pthread_t thrd; + int exit_flag; + + struct epoll_event *events; + int sfd; + int efd; +} handler_receive_t; + + +static int create_and_bind(const char *port) +{ + struct addrinfo hints; + struct addrinfo *result, *rp; + int s, sfd; + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; /* Return IPv4 and IPv6 choices */ + hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */ + hints.ai_flags = AI_PASSIVE; /* All interfaces */ + + s = getaddrinfo(NULL, port, &hints, &result); + if (s != 0) + { + printf("getaddrinfo: %s\n", gai_strerror(s)); + return -1; + } + + for (rp = result; rp != NULL; rp = rp->ai_next) + { + sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sfd == -1) + continue; + + s = bind(sfd, rp->ai_addr, rp->ai_addrlen); + if (s == 0) + { + //We managed to bind successfully! + break; + } + + close(sfd); + } + + if (rp == NULL) + { + printf("Could not bind\n"); + return -1; + } + + freeaddrinfo(result); + return sfd; +} + + +static int make_socket_non_blocking (int sfd) +{ + int flags, s; + + //得到文件状态标志 + flags = fcntl(sfd, F_GETFL, 0); + if (flags == -1) + { + printf("fnctl F_GETFL got error: %s", strerror(errno)); + return -1; + } + + //设置文件状态标志 + flags |= O_NONBLOCK; + s = fcntl(sfd, F_SETFL, flags); + if (s == -1) + { + printf("fnctl F_SETFL got error: %s", strerror(errno)); + return -1; + } + + return 0; +} + + +static void* thread_proc(void *arg) +{ + handler_receive_t *h_receive = (handler_receive_t *)arg; + void *block = NULL; + int32_t read_size; + void *p_data; + struct epoll_event event; + int s; + int n, i; + + for (; ;) + { + n = epoll_wait(h_receive->efd, h_receive->events, MAXEVENTS, -1); + for (i = 0; i < n; i++) + { + if ((h_receive->events[i].events & EPOLLERR) || (h_receive->events[i].events & EPOLLHUP) || + (!(h_receive->events[i].events & EPOLLIN))) + { + //An error has occured on this fd, or the socket is not ready for reading (why were we notified then?) + printf("receive epoll error\n"); + close (h_receive->events[i].data.fd); + continue; + } + else if (h_receive->sfd == h_receive->events[i].data.fd) + { + //We have a notification on the listening socket, which means one or more incoming connections. + for (; ;) + { + struct sockaddr in_addr; + socklen_t in_len; + int infd; + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + + in_len = sizeof(in_addr); + infd = accept(h_receive->sfd, &in_addr, &in_len); + if (infd == -1) + { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) + { + //We have processed all incoming connections. + break; + } + else + { + printf("accept error:%s\n", strerror(errno)); + break; + } + } + + //将地址转化为主机名或者服务名, flag参数:以数字名返回主机地址和服务地址 + s = getnameinfo(&in_addr, in_len, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), + NI_NUMERICHOST | NI_NUMERICSERV); + if (s == 0) + { + //printf("Accepted connection on descriptor %d " "(host=%s, port=%s)\n", infd, hbuf, sbuf); + } + + //Make the incoming socket non-blocking and add it to the list of fds to monitor. + s = make_socket_non_blocking(infd); + if (s == -1) + { + //abort (); + close(infd); + continue; + } + + event.data.fd = infd; + event.events = EPOLLIN | EPOLLET; + s = epoll_ctl(h_receive->efd, EPOLL_CTL_ADD, infd, &event); + if (s == -1) + { + printf("epoll_ctl error:%s\n", strerror(errno)); + //abort (); + close(infd); + continue; + } + + source_info_t *info = (source_info_t *)malloc(sizeof(source_info_t)); + if (info == NULL) + { + close(infd); + continue; + } + memset(info, 0, sizeof(*info)); + info->fd = infd; + if (fd_map_add(h_receive->fd_map_source, infd, info) != 0) + { + free(info); + close(infd); + continue; + } + + h_receive->on_event(0, infd); + } + } + else + { + //We have data on the fd waiting to be read. Read and display it. + //We must read whatever data is available completely, + //as we are running in edge-triggered mode and won't get a notification again for the same data. + int done = 0; + int size; + block = NULL; + + for (; ;) + { + if (block == NULL) + { + block = h_receive->cb_in(h_receive->param_in, 1); + p_data = block + 8; + read_size = 0; + } + + size = read(h_receive->events[i].data.fd, p_data + read_size, h_receive->block_size - read_size - 8); + if (size == -1) + { + //If errno == EAGAIN, that means we have read all data. So go back to the main loop. + if (errno != EAGAIN) + { + perror ("read"); + done = 1; + } + break; + } + else if (size == 0) + { + //End of file. The remote has closed the connection. + done = 1; + break; + } + + read_size += size; + if (read_size >= h_receive->block_size - 8) + { + *(int32_t *)block = h_receive->events[i].data.fd; + *(int32_t *)(block + 4) = read_size; + //memcpy(block, &h_receive->events[i].data.fd, 4); + //memcpy(block + 4, &read_size, 4); + //printf("aaa %d\n", read_size); + h_receive->cb_out(h_receive->param_out, block, 1); + block = NULL; + } + } + + if (block != NULL && read_size > 0) + { + *(int32_t *)block = h_receive->events[i].data.fd; + *(int32_t *)(block + 4) = read_size; + //memcpy(block, &h_receive->events[i].data.fd, 4); + //memcpy(block + 4, &read_size, 4); + //printf("bbb %d\n", read_size); + h_receive->cb_out(h_receive->param_out, block, 1); + } + + if (done) + { + //TODO: 只close就可以吗?????? + + //printf("Closed connection on descriptor %d\n", h_receive->events[i].data.fd); + + //Closing the descriptor will make epoll remove it from the set of descriptors which are monitored. + int fd = h_receive->events[i].data.fd; + + //TODO !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +#if 0 + source_info_t *info = fd_map_get(h_receive->fd_map_source, fd); + if (fd_map_remove(h_receive->fd_map_source, fd) != 0) + { + } + h_receive->on_event(1, fd); + free(info); +#endif + close(fd); + } + } + } + } + + return NULL; +} + + +static int receive_start(handler_t *h) +{ + handler_receive_t *h_receive = (handler_receive_t *)h; + if (h_receive->thrd != 0) + { + printf("[%d] already started\n", __LINE__); + return -1; + } + if (pthread_create(&h_receive->thrd, NULL, thread_proc, h) != 0) + { + printf("[%d] pthread_create FAILED!\n", __LINE__); + return -1; + } + return 0; +} + + +static void receive_destroy(handler_t *h) +{ + handler_receive_t *h_receive = (handler_receive_t *)h; + if (h_receive->thrd != 0) + { + h_receive->exit_flag = 1; + while (h_receive->exit_flag == 1) + usleep(10000); + } + if (h_receive->sfd != 0) + close(h_receive->sfd); + if (h_receive->efd != 0) + close(h_receive->efd); + if (h_receive->events != NULL) + free(h_receive->events); + free(h); +} + + +static void receive_set_callback_in(handler_t *h, callback_in_t callback, void *param) +{ + handler_receive_t *h_receive = (handler_receive_t *)h; + h_receive->cb_in = callback; + h_receive->param_in = param; +} + + +static void receive_set_callback_out(handler_t *h, callback_out_t callback, void *param) +{ + handler_receive_t *h_receive = (handler_receive_t *)h; + h_receive->cb_out = callback; + h_receive->param_out = param; +} + + +handler_t* handler_receive_create(int block_size, int port, fd_map_t *fd_map_source, handler_receive_event_t on_event) +{ + handler_receive_t *h_receive; + char port_str[16]; + struct epoll_event event; + int s; + + h_receive = (handler_receive_t *)malloc(sizeof(handler_receive_t)); + if (h_receive == NULL) + { + printf("malloc FAILED!\n"); + return NULL; + } + memset(h_receive, 0, sizeof(*h_receive)); + h_receive->block_size = block_size; + h_receive->handler.start = receive_start; + h_receive->handler.destroy = receive_destroy; + h_receive->handler.set_callback_in = receive_set_callback_in; + h_receive->handler.set_callback_out = receive_set_callback_out; + h_receive->fd_map_source = fd_map_source; + h_receive->on_event = on_event; + + snprintf(port_str, 16, "%d", port); + h_receive->sfd = create_and_bind(port_str); + if (h_receive->sfd == -1) + { + goto FAIL; + } + + s = make_socket_non_blocking(h_receive->sfd); + if (s == -1) + { + goto FAIL; + } + + s = listen(h_receive->sfd, SOMAXCONN); + if (s == -1) + { + printf("listen error:%s\n", strerror(errno)); + goto FAIL; + } + + //除了参数size被忽略外,此函数和epoll_create完全相同 + h_receive->efd = epoll_create1(0); + if (h_receive->efd == -1) + { + printf("epoll_create1 error:%s\n", strerror(errno)); + goto FAIL; + } + + event.data.fd = h_receive->sfd; + event.events = EPOLLIN | EPOLLET;//读入,边缘触发方式 + s = epoll_ctl(h_receive->efd, EPOLL_CTL_ADD, h_receive->sfd, &event); + if (s == -1) + { + printf("epoll_ctl error:%s\n", strerror(errno)); + goto FAIL; + } + + //Buffer where events are returned + h_receive->events = (struct epoll_event *)malloc(MAXEVENTS * sizeof(event)); + if (h_receive->events == NULL) + { + goto FAIL; + } + //is this needed? memset(h_receive->events, 0, MAXEVENTS * sizeof(event)); + + return &h_receive->handler; + +FAIL: + receive_destroy(&h_receive->handler); + return NULL; +} + diff --git a/handler_receive.h b/handler_receive.h new file mode 100644 index 0000000..5d94feb --- /dev/null +++ b/handler_receive.h @@ -0,0 +1,28 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ + +#ifndef __HANDLER_RECEIVE_H__ +#define __HANDLER_RECEIVE_H__ + +#ifdef __cplusplus +extern "C" { +#endif + + +#include "handler_common.h" + +//type: 0->connected, 1->disconnected +typedef void (*handler_receive_event_t)(int type, int fd); + +handler_t* handler_receive_create(int block_size, int port, fd_map_t *fd_map_source, handler_receive_event_t on_event); + + +#ifdef __cplusplus +} +#endif + +#endif // __HANDLER_RECEIVE_H__ + diff --git a/handler_resend.c b/handler_resend.c new file mode 100644 index 0000000..c8cc002 --- /dev/null +++ b/handler_resend.c @@ -0,0 +1,239 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "handler_resend.h" + +#include +#include +#include +#include +#include +#include +#include + +#define MAXEVENTS 64 +#define MAXCLIENTS 32 + +typedef struct _handler_resend +{ + handler_t handler; + int block_size; + fd_map_t *fd_map_source; + + callback_in_t cb_in; + callback_out_t cb_out; + void *param_in; + void *param_out; + + pthread_t thrd; + int exit_flag; + + int out_count; +} handler_resend_t; + +typedef struct _client_t +{ + struct sockaddr_in send2; + int fdsend2; + void *block_addr; + int block_pos; +} client_t; + +typedef struct _info_resend +{ + //这里简单起见,使用数组,后续应改成链表 + client_t clients[MAXCLIENTS]; + int client_count; +} info_resend_t; + + +static void* thread_proc(void *arg) +{ + handler_resend_t *h_resend = (handler_resend_t *)arg; + void *block = NULL; + int write_size; + source_info_t *info; + void *p_data; + int size; + int32_t socket_fd; + int32_t data_size; + int i; + + for (; ;) + { + block = h_resend->cb_in(h_resend->param_in, 1); + socket_fd = *(int32_t *)block; + data_size = *(int32_t *)(block + 4); + //memcpy(&socket_fd, block, 4); + //memcpy(&data_size, block + 4, 4); + p_data = block + 8; + info = fd_map_get(h_resend->fd_map_source, socket_fd); + + for (i = 0; i < ((info_resend_t *)info->info_resend)->client_count; i++) + { + client_t *client = &((info_resend_t *)info->info_resend)->clients[i]; + write_size = 0; + for (; ;) + { + size = sendto(client->fdsend2, p_data + write_size, data_size, 0, + (struct sockaddr *)&client->send2, sizeof(struct sockaddr_in)); + if (size < 0) + { + break; + } + + write_size += size; + if (write_size >= data_size) + { + break; + } + } + } + + h_resend->cb_out(h_resend->param_out, block, 1); + } + + return NULL; +} + + +static int resend_start(handler_t *h) +{ + handler_resend_t *h_resend = (handler_resend_t *)h; + if (h_resend->thrd != 0) + { + printf("[%d] already started\n", __LINE__); + return -1; + } + if (pthread_create(&h_resend->thrd, NULL, thread_proc, h) != 0) + { + printf("[%d] pthread_create FAILED!\n", __LINE__); + return -1; + } + return 0; +} + + +static void resend_destroy(handler_t *h) +{ + handler_resend_t *h_resend = (handler_resend_t *)h; + if (h_resend->thrd != 0) + { + h_resend->exit_flag = 1; + while (h_resend->exit_flag == 1) + usleep(10000); + } + free(h); +} + + +static void resend_set_callback_in(handler_t *h, callback_in_t callback, void *param) +{ + handler_resend_t *h_resend = (handler_resend_t *)h; + h_resend->cb_in = callback; + h_resend->param_in = param; +} + + +static void resend_set_callback_out(handler_t *h, callback_out_t callback, void *param) +{ + handler_resend_t *h_resend = (handler_resend_t *)h; + h_resend->cb_out = callback; + h_resend->param_out = param; +} + + +handler_t* handler_resend_create(int block_size, fd_map_t *fd_map_source) +{ + handler_resend_t *h_resend; + + h_resend = (handler_resend_t *)malloc(sizeof(handler_resend_t)); + if (h_resend == NULL) + { + printf("malloc FAILED!\n"); + return NULL; + } + memset(h_resend, 0, sizeof(*h_resend)); + + h_resend->block_size = block_size; + h_resend->fd_map_source = fd_map_source; + h_resend->handler.start = resend_start; + h_resend->handler.destroy = resend_destroy; + h_resend->handler.set_callback_in = resend_set_callback_in; + h_resend->handler.set_callback_out = resend_set_callback_out; + + return &h_resend->handler; + +//FAIL: +// resend_destroy(&h_resend->handler); +// return NULL; +} + + +void handler_resend_on_event(handler_t *h, int type, int fd) +{ + handler_resend_t *h_resend = (handler_resend_t *)h; + source_info_t *info; + + info = fd_map_get(h_resend->fd_map_source, fd); + + if (type == 0) + { + info->info_resend = (info_resend_t *)malloc(sizeof(info_resend_t)); + if (info->info_resend == NULL) + { + } + memset(info->info_resend, 0, sizeof(sizeof(info_resend_t))); + + if (((info_resend_t *)info->info_resend)->client_count > MAXCLIENTS) + return; + + client_t *client; + int i; + for (i = 0; i < 2; i++) //添加两个用做测试 + { + uint32_t dest_ip; + uint16_t dest_port; + + //测试方便起见,转发到固定端口. + dest_port = htons(10000 + i * 10000 + fd); + + //这里将少数流转发到外部机器,用来检验正确性,其他发到本机 + if (h_resend->out_count < 2) + { + dest_ip = inet_addr("172.16.15.89"); + printf("=================================> stream send to 172.16.15.89 port:%d\n", 10000 + i * 10000 + fd); + h_resend->out_count += 1; + } + else + dest_ip = inet_addr("127.0.0.1"); + + client = &((info_resend_t *)info->info_resend)->clients[i]; + client->send2.sin_family = AF_INET; + client->send2.sin_addr.s_addr = dest_ip; + client->send2.sin_port = dest_port; + + client->fdsend2 = socket(AF_INET, SOCK_DGRAM, 0); + if (client->fdsend2 < 0) + { + } + ((info_resend_t *)info->info_resend)->client_count += 1; + } + } + else if (type == 1) + { + //TODO !!!!!! + } +} + diff --git a/handler_resend.h b/handler_resend.h new file mode 100644 index 0000000..6622977 --- /dev/null +++ b/handler_resend.h @@ -0,0 +1,27 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ + +#ifndef __HANDLER_RESEND_H__ +#define __HANDLER_RESEND_H__ + +#ifdef __cplusplus +extern "C" { +#endif + + +#include "handler_common.h" + +handler_t* handler_resend_create(int block_size, fd_map_t *fd_map_source); + +void handler_resend_on_event(handler_t *h, int type, int fd); + + +#ifdef __cplusplus +} +#endif + +#endif // __HANDLER_RESEND_H__ + diff --git a/handler_resend_epoll.c b/handler_resend_epoll.c new file mode 100644 index 0000000..fd887a2 --- /dev/null +++ b/handler_resend_epoll.c @@ -0,0 +1,611 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include "handler_resend.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAXEVENTS 64 +#define MAX_FD 65536 +#define MAXCLIENTS 4096 +#define PKG_MAX 8192 + +//#define UDP + +typedef struct _handler_resend +{ + handler_t handler; + int block_size; + fd_map_t *fd_map_in; + fd_map_t *fd_map_out; + + callback_in_t cb_in; + callback_out_t cb_out; + void *param_in; + void *param_out; + + pthread_t thrd; + int exit_flag; + + int out_count; + + struct epoll_event *events; + int sfd; + int efd; + + int fdin_1; + int fdin_2; +} handler_resend_t; + +typedef struct _client_t +{ +#ifdef UDP + struct sockaddr_in send2; + int fdsend2; +#endif + void *block_addr; + int block_index; + int block_size; + int block_pos; + + source_info_t *p_srcinfo; +} client_t; + +typedef struct _info_resend +{ + //这里简单起见,使用数组,后续应改成链表 + client_t clients[MAXCLIENTS]; + int client_count; + + //简单起见使用数组,应改成链表,否则设置小了可能导致所有client被阻塞 + void *pkgs[PKG_MAX]; + int pkg_head; + int pkg_tail; +} info_resend_t; + + +static int create_and_bind(const char *port) +{ + struct addrinfo hints; + struct addrinfo *result, *rp; + int s, sfd; + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; /* Return IPv4 and IPv6 choices */ + hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */ + hints.ai_flags = AI_PASSIVE; /* All interfaces */ + + s = getaddrinfo(NULL, port, &hints, &result); + if (s != 0) + { + printf("getaddrinfo: %s\n", gai_strerror(s)); + return -1; + } + + for (rp = result; rp != NULL; rp = rp->ai_next) + { + sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sfd == -1) + continue; + + s = bind(sfd, rp->ai_addr, rp->ai_addrlen); + if (s == 0) + { + //We managed to bind successfully! + break; + } + + close(sfd); + } + + if (rp == NULL) + { + printf("Could not bind\n"); + return -1; + } + + freeaddrinfo(result); + return sfd; +} + + +static int make_socket_non_blocking (int sfd) +{ + int flags, s; + + //得到文件状态标志 + flags = fcntl(sfd, F_GETFL, 0); + if (flags == -1) + { + printf("fnctl F_GETFL got error: %s", strerror(errno)); + return -1; + } + + //设置文件状态标志 + flags |= O_NONBLOCK; + s = fcntl(sfd, F_SETFL, flags); + if (s == -1) + { + printf("fnctl F_SETFL got error: %s", strerror(errno)); + return -1; + } + + return 0; +} + + +static void* thread_proc(void *arg) +{ + handler_resend_t *h_resend = (handler_resend_t *)arg; + void *block = NULL; + int write_size; + source_info_t *info; + info_resend_t *info_resend; + int n, i, k; + struct epoll_event event; + int s; + int32_t socket_fd; + int32_t data_size; + + for (; ;) + { + n = epoll_wait(h_resend->efd, h_resend->events, MAXEVENTS, -1); + for (i = 0; i < n; i++) + { + if ((h_resend->events[i].events & EPOLLERR) || (h_resend->events[i].events & EPOLLHUP)) + { + //An error has occured on this fd, or the socket is not ready for reading (why were we notified then?) + printf("resend epoll error\n"); + close (h_resend->events[i].data.fd); + fd_map_remove(h_resend->fd_map_out, h_resend->events[i].data.fd); + continue; + } + else if (h_resend->sfd == h_resend->events[i].data.fd) + { + //We have a notification on the listening socket, which means one or more incoming connections. + for (; ;) + { + struct sockaddr in_addr; + socklen_t in_len; + int infd; + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + + in_len = sizeof(in_addr); + infd = accept(h_resend->sfd, &in_addr, &in_len); + if (infd == -1) + { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) + { + //We have processed all incoming connections. + break; + } + else + { + printf("accept error:%s\n", strerror(errno)); + break; + } + } + + //将地址转化为主机名或者服务名, flag参数:以数字名返回主机地址和服务地址 + s = getnameinfo(&in_addr, in_len, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), + NI_NUMERICHOST | NI_NUMERICSERV); + if (s == 0) + { + printf("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA Accepted connection on descriptor %d " "(host=%s, port=%s)\n", infd, hbuf, sbuf); + } + + //Make the incoming socket non-blocking and add it to the list of fds to monitor. + s = make_socket_non_blocking(infd); + if (s == -1) + { + //abort (); + close(infd); + continue; + } + + event.data.fd = infd; + //event.events = EPOLLIN | EPOLLET; + event.events = EPOLLOUT; + s = epoll_ctl(h_resend->efd, EPOLL_CTL_ADD, infd, &event); + if (s == -1) + { + printf("epoll_ctl error:%s\n", strerror(errno)); + //abort (); + close(infd); + continue; + } + + //TODO: 根据请求的信息,将这个client分配到某个source. + //这里测试简单起见, 记录下第一、第二个source, 随机分配到这两个 + source_info_t *info; + if (atoi(sbuf) % 2 == 0) + info = fd_map_get(h_resend->fd_map_in, h_resend->fdin_1); + else + info = fd_map_get(h_resend->fd_map_in, h_resend->fdin_2); + + + if (((info_resend_t *)info->info_resend)->client_count > MAXCLIENTS) + { + close(infd); + continue; + } + client_t *client; + client = &((info_resend_t *)info->info_resend)->clients[((info_resend_t *)info->info_resend)->client_count]; + ((info_resend_t *)info->info_resend)->client_count += 1; + client->block_addr = NULL; + client->block_pos = 0; + client->block_size = 0; + client->block_index = -1; + client->p_srcinfo = info; + + if (fd_map_add(h_resend->fd_map_out, infd, client) != 0) + { + free(info); + close(infd); + continue; + } + } + } + else + { + for (; ;) + { + block = h_resend->cb_in(h_resend->param_in, 0); + if (block == NULL) + break; + socket_fd = *(int32_t *)block; + data_size = *(int32_t *)(block + 4); + info = fd_map_get(h_resend->fd_map_in, socket_fd); + info_resend = (info_resend_t *)info->info_resend; + + if (info_resend->client_count <= 0) + { + h_resend->cb_out(h_resend->param_out, block, 1); + continue; + } + + if ((info_resend->pkg_tail + 1) % PKG_MAX == info_resend->pkg_head) + { + //printf("aaaaaaaaaaaaaaaaaaaaaaa %d, %d\n", info_resend->pkg_head, info_resend->pkg_tail); + break; + } + info_resend->pkgs[info_resend->pkg_tail] = block; + info_resend->pkg_tail = (info_resend->pkg_tail + 1) % PKG_MAX; + //printf("bbbbbb %d, %d, %d\n", info_resend->pkg_head, info_resend->pkg_tail, data_size); + } + + client_t *client = fd_map_get(h_resend->fd_map_out, h_resend->events[i].data.fd); + info = client->p_srcinfo; + info_resend = (info_resend_t *)info->info_resend; + + if (client->block_index < 0) + { + if (info_resend->pkg_head != info_resend->pkg_tail) + { + client->block_index = info_resend->pkg_head; + client->block_addr = info_resend->pkgs[info_resend->pkg_head] + 8; + client->block_size = *(int32_t *)(info_resend->pkgs[info_resend->pkg_head] + 4); + client->block_pos = 0; + //printf("UUUUUUUUUUUUUUUUUUUU %d\n", client->block_size); + } + else + { + //printf("AAAAAAAAAAAAAAAAAAAA\n"); + usleep(100000); + //没有数据可以发送 + continue; + } + } + else if (client->block_pos >= client->block_size) + { + if ((client->block_index + 1) % PKG_MAX != info_resend->pkg_tail) + { + //检查此block是否是head,并且没有其他client在使用,是的话向下移动 + if (client->block_index == info_resend->pkg_head) + { + int used = 0; + for (k = 0; k < info_resend->client_count; k++) + { + client_t *c = &info_resend->clients[k]; + if (c == client) + continue; + if (c->block_index == info_resend->pkg_head) + { + used = 1; + break; + } + } + if (used == 0) + { + info_resend->pkg_head = (info_resend->pkg_head + 1) % PKG_MAX; + //printf("cccccc %d, %d\n", info_resend->pkg_head, info_resend->pkg_tail); + } + } + + client->block_index = (client->block_index + 1) % PKG_MAX; + //printf("UUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUU %d, %d, %d\n", + // client->block_index, info_resend->pkg_head, info_resend->pkg_tail); + client->block_addr = info_resend->pkgs[client->block_index] + 8; + client->block_size = *(int32_t *)(info_resend->pkgs[client->block_index] + 4); + client->block_pos = 0; + } + else + { + //printf("BBBBBBBBBBBBBBBBBBBB\n"); + usleep(100000); + //没有数据可以发送 + continue; + } + } + +#ifdef UDP + write_size = sendto(client->fdsend2, client->block_addr + client->block_pos, + client->block_size - client->block_pos, 0, + (struct sockaddr *)&client->send2, sizeof(struct sockaddr_in)); + //printf("CCCCCCCCCCCCCCCCCCCC %p, %d, %d, %d\n", client, client->block_size, client->block_pos, write_size); +#else + write_size = send(h_resend->events[i].data.fd, client->block_addr + client->block_pos, + client->block_size - client->block_pos, 0); + //printf("DDDDDDDDDDDDDDDDDDDD %p, %d, %d, %d\n", client, client->block_size, client->block_pos, write_size); +#endif + + if (write_size <= 0) + { + continue; + } + + client->block_pos += write_size; + } + } + } + + return NULL; +} + + +static int resend_start(handler_t *h) +{ + handler_resend_t *h_resend = (handler_resend_t *)h; + if (h_resend->thrd != 0) + { + printf("[%d] already started\n", __LINE__); + return -1; + } + if (pthread_create(&h_resend->thrd, NULL, thread_proc, h) != 0) + { + printf("[%d] pthread_create FAILED!\n", __LINE__); + return -1; + } + return 0; +} + + +static void resend_destroy(handler_t *h) +{ + handler_resend_t *h_resend = (handler_resend_t *)h; + if (h_resend->thrd != 0) + { + h_resend->exit_flag = 1; + while (h_resend->exit_flag == 1) + usleep(10000); + } + free(h_resend->events); + close(h_resend->efd); + fd_map_destroy(h_resend->fd_map_out); + free(h); +} + + +static void resend_set_callback_in(handler_t *h, callback_in_t callback, void *param) +{ + handler_resend_t *h_resend = (handler_resend_t *)h; + h_resend->cb_in = callback; + h_resend->param_in = param; +} + + +static void resend_set_callback_out(handler_t *h, callback_out_t callback, void *param) +{ + handler_resend_t *h_resend = (handler_resend_t *)h; + h_resend->cb_out = callback; + h_resend->param_out = param; +} + + +handler_t* handler_resend_create(int block_size, fd_map_t *fd_map_in) +{ + handler_resend_t *h_resend; + char port_str[16]; + struct epoll_event event; + int s; + + h_resend = (handler_resend_t *)malloc(sizeof(handler_resend_t)); + if (h_resend == NULL) + { + printf("malloc FAILED!\n"); + return NULL; + } + memset(h_resend, 0, sizeof(*h_resend)); + + h_resend->block_size = block_size; + h_resend->fd_map_in = fd_map_in; + h_resend->handler.start = resend_start; + h_resend->handler.destroy = resend_destroy; + h_resend->handler.set_callback_in = resend_set_callback_in; + h_resend->handler.set_callback_out = resend_set_callback_out; + + //除了参数size被忽略外,此函数和epoll_create完全相同 + h_resend->efd = epoll_create1(0); + if (h_resend->efd == -1) + { + printf("epoll_create1 error:%s\n", strerror(errno)); + goto FAIL; + } + + + snprintf(port_str, 16, "%d", 7000); //TODO !!!!!! + h_resend->sfd = create_and_bind(port_str); + if (h_resend->sfd == -1) + { + goto FAIL; + } + + s = make_socket_non_blocking(h_resend->sfd); + if (s == -1) + { + goto FAIL; + } + + s = listen(h_resend->sfd, SOMAXCONN); + if (s == -1) + { + printf("listen error:%s\n", strerror(errno)); + goto FAIL; + } + + //除了参数size被忽略外,此函数和epoll_create完全相同 + h_resend->efd = epoll_create1(0); + if (h_resend->efd == -1) + { + printf("epoll_create1 error:%s\n", strerror(errno)); + goto FAIL; + } + + event.data.fd = h_resend->sfd; + event.events = EPOLLIN | EPOLLET;//读入,边缘触发方式 + s = epoll_ctl(h_resend->efd, EPOLL_CTL_ADD, h_resend->sfd, &event); + if (s == -1) + { + printf("epoll_ctl error:%s\n", strerror(errno)); + goto FAIL; + } + + + //Buffer where events are returned + h_resend->events = (struct epoll_event *)malloc(MAXEVENTS * sizeof(struct epoll_event)); + if (h_resend->events == NULL) + { + goto FAIL; + } + + h_resend->fd_map_out = fd_map_create(MAX_FD); + if (h_resend->fd_map_out == NULL) + { + goto FAIL; + } + + return &h_resend->handler; + +FAIL: + resend_destroy(&h_resend->handler); + return NULL; +} + + +void handler_resend_on_event(handler_t *h, int type, int fd) +{ + handler_resend_t *h_resend = (handler_resend_t *)h; + source_info_t *info; + + info = fd_map_get(h_resend->fd_map_in, fd); + + if (type == 0) + { + info->info_resend = (info_resend_t *)malloc(sizeof(info_resend_t)); + if (info->info_resend == NULL) + { + } + memset(info->info_resend, 0, sizeof(sizeof(info_resend_t))); + +#ifdef UDP + if (((info_resend_t *)info->info_resend)->client_count > MAXCLIENTS) + return; + + client_t *client; + int i; + for (i = 0; i < 2; i++) //添加两个用做测试 + { + uint32_t dest_ip; + uint16_t dest_port; + + //测试方便起见,转发到固定端口. + dest_port = htons(10000 + i * 10000 + fd); + + //这里将少数流转发到外部机器,用来检验正确性,其他发到本机 + if (h_resend->out_count < 2) + { + dest_ip = inet_addr("172.16.15.89"); + printf("=================================> stream send to 172.16.15.89 port:%d\n", 10000 + i * 10000 + fd); + h_resend->out_count += 1; + } + else + dest_ip = inet_addr("127.0.0.1"); + + client = &((info_resend_t *)info->info_resend)->clients[i]; + client->send2.sin_family = AF_INET; + client->send2.sin_addr.s_addr = dest_ip; + client->send2.sin_port = dest_port; + client->p_srcinfo = info; + + client->block_addr = NULL; + client->block_pos = 0; + client->block_size = 0; + client->block_index = -1; + + client->fdsend2 = socket(AF_INET, SOCK_DGRAM, 0); + if (client->fdsend2 < 0) + { + } + ((info_resend_t *)info->info_resend)->client_count += 1; + + + struct epoll_event event; + event.data.fd = client->fdsend2; + //event.events = EPOLLOUT | EPOLLET; + event.events = EPOLLOUT; + int s = epoll_ctl(h_resend->efd, EPOLL_CTL_ADD, client->fdsend2, &event); + if (s == -1) + { + //printf("epoll_ctl error:%s\n", strerror(errno)); + ////abort (); + //close(client->fdsend2); + //continue; + } + + if (fd_map_add(h_resend->fd_map_out, client->fdsend2, client) != 0) + { + //free(info); + //close(infd); + //continue; + } + } +#else + if (h_resend->fdin_1 == 0) + h_resend->fdin_1 = fd; + else if (h_resend->fdin_2 == 0) + h_resend->fdin_2 = fd; +#endif + } + else if (type == 1) + { + //TODO !!!!!! + } +} + diff --git a/handler_resend_epoll.h b/handler_resend_epoll.h new file mode 100644 index 0000000..42526ca --- /dev/null +++ b/handler_resend_epoll.h @@ -0,0 +1,27 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ + +#ifndef __HANDLER_RESEND_H__ +#define __HANDLER_RESEND_H__ + +#ifdef __cplusplus +extern "C" { +#endif + + +#include "handler_common.h" + +handler_t* handler_resend_create(int block_size, fd_map_t *fd_map); + +void handler_resend_on_event(handler_t *h, int type, int fd); + + +#ifdef __cplusplus +} +#endif + +#endif // __HANDLER_RESEND_H__ + diff --git a/main.c b/main.c new file mode 100644 index 0000000..b8af7e5 --- /dev/null +++ b/main.c @@ -0,0 +1,131 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ + +#include +#include +#include +#include +#include +#include "handler_common.h" +#include "handler_receive.h" +#include "handler_process.h" +#include "handler_resend.h" +#include "memory_pool.h" +#include "queue.h" + +#define MAX_FD 65536 +//#define BLOCK_SIZE 1536 +#define BLOCK_SIZE 12288 +#define MAX_BLOCKS (1024 * 128) + +static fd_map_t *m_fd_map_source = NULL; +handler_t *handler_receive; +handler_t *handler_process; +handler_t *handler_resend; + + +static void on_receive_event(int type, int fd) +{ + //source_info_t *info; + + //type: 0->connected, 1->disconnected + if (type == 0) + { + //info = (source_info_t *)malloc(sizeof(source_info_t)); + //if (info == NULL) + //{ + // return; + //} + //memset(info, 0, sizeof(*info)); + //info->fd = fd; + + //if (fd_map_add(m_fd_map_source, fd, info) != 0) + //{ + //} + + handler_resend_on_event(handler_resend, type, fd); + } + else if (type == 1) + { + handler_resend_on_event(handler_resend, type, fd); + + //info = fd_map_get(m_fd_map_source, fd); + + //if (fd_map_remove(m_fd_map_source, fd) != 0) + //{ + //} + + //free(info); + } +} + + +int main() +{ + memory_pool_t *memory_pool = NULL; + queue_t *queue_received = NULL; + uint8_t *queue_received_buf = NULL; + queue_t *queue_sended = NULL; + uint8_t *queue_sended_buf = NULL; + int queue_buf_size = MAX_BLOCKS * 24 * 2 + 512; + + m_fd_map_source = fd_map_create(MAX_FD); + if (m_fd_map_source == NULL) + { + } + + queue_received_buf = (uint8_t *)malloc(queue_buf_size); + if (queue_received_buf == NULL) + { + } + queue_received = queue_create(queue_received_buf, queue_buf_size, 0); + if (queue_received == NULL) + { + } + + queue_sended_buf = (uint8_t *)malloc(queue_buf_size); + if (queue_sended_buf == NULL) + { + } + queue_sended = queue_create(queue_sended_buf, queue_buf_size, 0); + if (queue_sended == NULL) + { + } + + memory_pool = memory_pool_create(BLOCK_SIZE, MAX_BLOCKS); + if (memory_pool == NULL) + { + } + + handler_receive = handler_receive_create(BLOCK_SIZE, 6000, m_fd_map_source, on_receive_event); + if (handler_receive == NULL) + { + } + handler_receive->set_callback_in(handler_receive, (callback_in_t)memory_pool_get_block, memory_pool); + handler_receive->set_callback_out(handler_receive, (callback_out_t)queue_put_block, queue_received); + + handler_resend = handler_resend_create(BLOCK_SIZE, m_fd_map_source); + if (handler_resend == NULL) + { + } + handler_resend->set_callback_in(handler_resend, (callback_in_t)queue_get_block, queue_received); + handler_resend->set_callback_out(handler_resend, (callback_out_t)queue_put_block, queue_sended); + + handler_process = handler_process_create(BLOCK_SIZE); + if (handler_process == NULL) + { + } + handler_process->set_callback_in(handler_process, (callback_in_t)queue_get_block, queue_sended); + handler_process->set_callback_out(handler_process, (callback_out_t)memory_pool_put_block, memory_pool); + + handler_receive->start(handler_receive); + handler_resend->start(handler_resend); + handler_process->start(handler_process); + + getchar(); + return 0; +} + diff --git a/memory_pool.c b/memory_pool.c new file mode 100644 index 0000000..d5c720c --- /dev/null +++ b/memory_pool.c @@ -0,0 +1,273 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ +#include +#include +#include +#include +#include +#include +#include "queue.h" +#include "memory_pool.h" + +#define ARRAY_BLOCKS 512 + +struct _memory_pool +{ + int block_size; + int max_blocks; + + int arrays_count; + int arrays_allocated; + void **p_arrays; + + void* unused_blocks[ARRAY_BLOCKS]; + int unused_blocks_count; + + queue_t *queue; + uint8_t *queue_buf; +}; + + +memory_pool_t* memory_pool_create(int block_size, int max_blocks) +{ + memory_pool_t *h = (memory_pool_t *)malloc(sizeof(memory_pool_t)); + if (h == NULL) + { + printf("malloc FAILED! %d\n", (int)sizeof(memory_pool_t)); + return NULL; + } + memset(h, 0, sizeof(*h)); + + h->block_size = block_size; + h->arrays_count = ((max_blocks + ARRAY_BLOCKS - 1) / ARRAY_BLOCKS); + h->max_blocks = h->arrays_count * ARRAY_BLOCKS; + + h->p_arrays = malloc(h->arrays_count * sizeof(void *)); + if (h->p_arrays == NULL) + { + goto FAIL; + } + memset(h->p_arrays, 0, h->arrays_count * sizeof(void *)); + + int queue_buf_size = h->max_blocks * (16 + 8) + 256; + h->queue_buf = (uint8_t *)malloc(queue_buf_size); + if (h->queue_buf == NULL) + { + printf("malloc FAILED! %d\n", queue_buf_size); + goto FAIL; + } + + h->queue = queue_create(h->queue_buf, queue_buf_size, 0); + if (h->queue == NULL) + goto FAIL; + + return h; + +FAIL: + memory_pool_destroy(h); + return NULL; +} + + +void memory_pool_destroy(memory_pool_t *h) +{ + int i; + for (i = 0; i < h->arrays_count; i++) + free(h->p_arrays[i]); + free(h->p_arrays); + + free(h->queue_buf); + free(h); +} + + +void* memory_pool_alloc(memory_pool_t *h) +{ + void *block; + uint8_t *queue_buf; + int queue_buf_size; + + if (queue_get_readbuf(h->queue, &queue_buf, &queue_buf_size) == 0) + { + block = *(void **)queue_buf; + //memcpy(&block, queue_buf, sizeof(void *)); + queue_read_complete(h->queue); + return block; + } + + if (h->unused_blocks_count > 0) + { + h->unused_blocks_count -= 1; + return h->unused_blocks[h->unused_blocks_count]; + } + + if (h->arrays_allocated < h->arrays_count) + { + void *array = malloc(h->block_size * ARRAY_BLOCKS); + int i; + if (array == NULL) + { + printf("malloc FAILED! %d\n", h->block_size * ARRAY_BLOCKS); + return NULL; + } + for (i = 1; i < ARRAY_BLOCKS; i++) + h->unused_blocks[i] = array + i * h->block_size; + h->unused_blocks_count = ARRAY_BLOCKS - 1; + + h->p_arrays[h->arrays_allocated] = array; + h->arrays_allocated += 1; + return array; + } + + //printf("max blocks [%d] reached\n", h->max_blocks); + return NULL; +} + + +int memory_pool_free(memory_pool_t *h, void *block) +{ + uint8_t *queue_buf; + int queue_buf_size = sizeof(void *); + + if (queue_get_writebuf(h->queue, &queue_buf, &queue_buf_size) != 0) + { + printf("write block to queue FAILED!\n"); + return -1; + } + + //memcpy(queue_buf, &block, sizeof(void *)); + *(void **)queue_buf = block; + queue_write_complete(h->queue, queue_buf, sizeof(void *), 0); + return 0; +} + + +#ifdef MEMORY_POOL_TEST +static queue_t *m_queue; +static memory_pool_t *m_pool; +static int64_t m_write_count = 0; +static int64_t m_check_count = 0; + +static void* thread_write(void *arg) +{ + uint8_t *buf; + int rnd; + int i; + + uint8_t *queue_buf; + int queue_buf_size = sizeof(void *); + + for (; ;) + { + buf = memory_pool_alloc(m_pool); + if (buf == NULL) + { + usleep(10); + continue; + } + + rnd = random() % 256; + for (i = 0; i < 1024; i++) + buf[i] = (rnd + i) % 256; + + + for (; ;) + { + if (queue_get_writebuf(m_queue, &queue_buf, &queue_buf_size) != 0) + { + //printf("write block to queue FAILED!\n"); + usleep(10); + continue; + } + + memcpy(queue_buf, &buf, sizeof(void *)); + queue_write_complete(m_queue, queue_buf, sizeof(void *), 0); + break; + } + + m_write_count += 1; + if (m_write_count % 10000 == 0) + printf("write %"PRId64" times\n", m_write_count); + } + + return NULL; +} + +static void* thread_check(void *arg) +{ + uint8_t *buf; + int rnd; + int i; + + uint8_t *queue_buf; + int queue_buf_size; + + for (; ;) + { + for (; ;) + { + if (queue_get_readbuf(m_queue, &queue_buf, &queue_buf_size) != 0) + { + //printf("read block to from FAILED!\n"); + usleep(10); + continue; + } + + memcpy(&buf, queue_buf, sizeof(void *)); + queue_read_complete(m_queue); + break; + } + + rnd = buf[0]; + for (i = 1; i < 1024; i++) + { + if (buf[i] != (rnd + i) % 256) + { + printf("check FAILED!\n"); + printf("write count:%"PRId64", check count:%"PRId64"\n", m_write_count, m_check_count); + exit(-1); + } + + } + + memory_pool_free(m_pool, buf); + + m_check_count += 1; + if (m_check_count % 10000 == 0) + printf("check %"PRId64" times\n", m_check_count); + } + + return NULL; +} + +int main() +{ + pthread_t thrd_write; + pthread_t thrd_check; + uint8_t *queue_buf; + + queue_buf = (uint8_t *)malloc(1024 * 5120); + m_queue = queue_create(queue_buf, 1024 * 5120, 0); + + m_pool = memory_pool_create(1024, 50000); + + if (pthread_create(&thrd_write, NULL, thread_write, 0) != 0) + { + printf("[%d] pthread_create FAILED!\n", __LINE__); + return -1; + } + + if (pthread_create(&thrd_check, NULL, thread_check, 0) != 0) + { + printf("[%d] pthread_create FAILED!\n", __LINE__); + return -1; + } + + getchar(); + printf("write count:%"PRId64", check count:%"PRId64"\n", m_write_count, m_check_count); + return 0; +} +#endif diff --git a/memory_pool.h b/memory_pool.h new file mode 100644 index 0000000..5af876e --- /dev/null +++ b/memory_pool.h @@ -0,0 +1,14 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ + +typedef struct _memory_pool memory_pool_t; + +memory_pool_t* memory_pool_create(int block_size, int max_blocks); +void memory_pool_destroy(memory_pool_t *h); + +void* memory_pool_alloc(memory_pool_t *h); +int memory_pool_free(memory_pool_t *h, void *block); + diff --git a/memtest.c b/memtest.c new file mode 100644 index 0000000..d060a34 --- /dev/null +++ b/memtest.c @@ -0,0 +1,161 @@ +/** + * @brief memory test program for sunniwell streamqueue + * @author Ye Shengnan + * @date 2012-06-01 created + * + * Sunniwell media framework use a none block queue for media stream buffering. + * Generally, the feeding thread write a stream block and set the marker flag. + * When the reading thread found there is a stream block by checking the marker flag, + * it read the stream block content. The marker flag is set to volatile, but the whole + * stream buffer not. On some platform, the reading thread may read garbage data + * immediately after the feeding thread write. This program is designed to check this case. + */ + +#include +#include +#include +#include +#include + +#define TOTAL_SIZE (1024 * 1024 * 20) + +static pthread_t m_thrd_feed; +static pthread_t m_thrd_read; + +static char *m_mem; +static volatile uint32_t m_curr_num = 0; +static volatile uint32_t m_curr_pos = 0; +static volatile int m_feed_size = 0; +static volatile int m_feed_flag; +static volatile int m_read_flag; +static int64_t m_read_count = 0; +static int64_t m_failed_count = 0; + + +static void* thread_feed(void *arg) +{ + int i; + volatile uint32_t *p; + for (; ;) + { + if (m_read_flag != 1) + { + usleep(1); + continue; + } + m_read_flag = 0; + + m_feed_size = 1024 + random() % 16000; + if (m_curr_pos + m_feed_size * 4 > TOTAL_SIZE) + m_curr_pos = 0; + if (m_curr_num >= 0xFFFFFFFF - TOTAL_SIZE) + m_curr_num = 0; + p = (uint32_t *)(m_mem + m_curr_pos); + for (i = 0; i < m_feed_size; i++) + { + *p = m_curr_num + i * m_feed_size; + p++; + } + + m_feed_flag = 1; + } + + return NULL; +} + + +static void* thread_read(void *arg) +{ + int i; + volatile uint32_t *p; + uint32_t readed_data_1, readed_data_2; + for (; ;) + { + if (m_feed_flag != 1) + { + usleep(1); + continue; + } + m_feed_flag = 0; + +#if 0 + p = (uint32_t *)(m_mem + m_curr_pos); + for (i = 0; i < m_feed_size; i++) + { + readed_data_1 = *p; + if (readed_data_1 != m_curr_num + i * m_feed_size) + { + printf("[%d] AAAAAAAAAAAAAAAAAAAAAAAAA\n", __LINE__); + printf("[%d] read data invalid, read_count:%lld, failed_count:%lld\n", __LINE__, m_read_count, m_failed_count); + printf("[%d] curr_pos:%u, curr_num:0x%x, feed_size:%d, i:%d\n", __LINE__, m_curr_pos, m_curr_num, m_feed_size, i); + usleep(100000); + readed_data_2 = *p; + printf("[%d] readed_data_1:0x%x, readed_data_2:0x%x\n", __LINE__, readed_data_1, readed_data_2); + printf("[%d] BBBBBBBBBBBBBBBBBBBBBBBBB\n", __LINE__); + m_failed_count++; + } + p++; + } +#else + //check most recently writed first + p = (uint32_t *)(m_mem + m_curr_pos + (m_feed_size - 1) * 4); + for (i = m_feed_size - 1; i >= 0; i--) + { + readed_data_1 = *p; + if (readed_data_1 != m_curr_num + i * m_feed_size) + { + printf("[%d] AAAAAAAAAAAAAAAAAAAAAAAAA\n", __LINE__); + printf("[%d] read data invalid, read_count:%lld, failed_count:%lld\n", __LINE__, m_read_count, m_failed_count); + printf("[%d] curr_pos:%u, curr_num:0x%x, feed_size:%d, i:%d\n", __LINE__, m_curr_pos, m_curr_num, m_feed_size, i); + usleep(100000); + readed_data_2 = *p; + printf("[%d] readed_data_1:0x%x, readed_data_2:0x%x\n", __LINE__, readed_data_1, readed_data_2); + printf("[%d] BBBBBBBBBBBBBBBBBBBBBBBBB\n", __LINE__); + m_failed_count++; + } + p--; + } +#endif + + m_curr_pos += (m_feed_size * 4); + m_curr_num += m_feed_size; + + m_read_count++; + if (m_read_count % 5000 == 0) + printf("read count:%lld\n", m_read_count); + + m_read_flag = 1; + } + + return NULL; +} + + +int main() +{ + m_mem = (char *)malloc(TOTAL_SIZE); + if (m_mem == NULL) + { + printf("[%d] malloc FAILED!\n", __LINE__); + return -1; + } + + if (pthread_create(&m_thrd_feed, NULL, thread_feed, 0) != 0) + { + printf("[%d] pthread_create FAILED!\n", __LINE__); + return -1; + } + + if (pthread_create(&m_thrd_read, NULL, thread_read, 0) != 0) + { + printf("[%d] pthread_create FAILED!\n", __LINE__); + return -1; + } + + m_read_flag = 1; + m_feed_flag = 0; + + getchar(); + printf("[%d] read_count:%lld, failed_count:%lld\n", __LINE__, m_read_count, m_failed_count); + return 0; +} diff --git a/queue.c b/queue.c new file mode 100644 index 0000000..41b6dde --- /dev/null +++ b/queue.c @@ -0,0 +1,262 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + +|-------- 4 bytes blocksize ---------------|-------------- 4 bytes offset ------------------| +|-------- 4 bytes playloadsize ------------|-------------- 4 bytes revered -----------------| +|-------------------------------------- playload -------------------------------------------| + */ + +#include +#include +#include +#include +#include +#include +#include "queue.h" + +struct _queue +{ + int size; + uint8_t *real_buf; + uint8_t *buf; + uint8_t *buf_end; + + uint8_t *head; + uint8_t *tail; + uint8_t *new_begin; + + int block; //空间不足时是否等待,默认为等待 + sem_t sem; +}; + +#define ALIGN 8 //必须为2^n +#define MEMBLOCK_HEADER_LEN 16 //内存块头信息长度 + +#define BLOCK_SIZE(buf) (*(int32_t *) buf) +#define PAYLOAD_OFFSET(buf) (*(uint32_t*)(buf + 4)) +#define PAYLOAD_SIZE(buf) (*(uint32_t*)(buf + 8)) + + +queue_t* queue_create(uint8_t *buf, int size, int block) +{ + queue_t *qobj; + printf("BEGIN..., size:%dk\n", size / 1024); + + qobj = (queue_t *)malloc(sizeof(queue_t)); + if (qobj == NULL) + goto FAIL; + memset(qobj, 0, sizeof(queue_t)); + + qobj->size = size; + qobj->real_buf = buf; + qobj->buf = (uint8_t *)((uint64_t)(buf + ALIGN - 1) & ~(uint64_t)(ALIGN - 1)); + qobj->buf_end = buf + size; + + qobj->head = qobj->buf; + qobj->tail = qobj->buf; + + qobj->block = block; + + if (sem_init(&qobj->sem, 0, 0) == -1) + { + printf("sem_init FAILED! %s", strerror(errno)); + goto FAIL; + } + + printf("OK!\n"); + return qobj; + +FAIL: + printf("FAILED!\n"); + if (qobj != NULL) + free(qobj); + return NULL; +} + + +void queue_destroy(queue_t *qobj) +{ + printf("qobj:0x%p\n", qobj); + if (qobj != NULL) + { + int val; + sem_getvalue(&qobj->sem, &val); + if (val < 1) + sem_post(&qobj->sem); + sem_destroy(&qobj->sem); + free(qobj); + } +} + + +int queue_record_newbegin(queue_t *qobj) +{ + printf("qobj:0x%p\n", qobj); + qobj->new_begin = qobj->tail; + + //ATTENTION + int val; + sem_getvalue(&qobj->sem, &val); + if (val < 1) + sem_post(&qobj->sem); + return 0; +} + + +int queue_reset(queue_t *qobj) +{ + printf("qobj:0x%p\n", qobj); + + if (qobj->new_begin == NULL) + { + qobj->head = qobj->tail; + } + else + { + qobj->head = qobj->new_begin; + qobj->new_begin = NULL; + } + + //ATTENTION + int val; + sem_getvalue(&qobj->sem, &val); + if (val < 1) + sem_post(&qobj->sem); + return 0; +} + + +int queue_get_writebuf(queue_t *qobj, uint8_t **buf, int *size) +{ + int need_size; + uint8_t *head; + + //ATTENTION + if (*size <= 0 || qobj == NULL) + return -1; + + need_size = MEMBLOCK_HEADER_LEN + *size + ALIGN - 1 + MEMBLOCK_HEADER_LEN/*为尾部预留一个头的位置*/; + + if (need_size > qobj->buf_end - qobj->buf) + { + int newsize; + need_size = qobj->buf_end - qobj->buf; + newsize = need_size - MEMBLOCK_HEADER_LEN - ALIGN + 1 - MEMBLOCK_HEADER_LEN; + printf("require size [%d] too big, truncated to %d!\n", *size, newsize); + *size = newsize; + } + +AGAIN: + head = qobj->head; + //从头部重新开始 + if (qobj->tail >= head && qobj->tail + need_size >= qobj->buf_end) + { + if (head == qobj->buf) + goto WOF; + + BLOCK_SIZE(qobj->tail) = -1; //用-1标记为需跳转至头 + qobj->tail = qobj->buf; + goto AGAIN; + } + + //==时不写入保证满时head != tail + if (qobj->tail < head && (qobj->tail + need_size >= head)) + goto WOF; + + if (qobj->new_begin != NULL) + { + if (qobj->tail < qobj->new_begin && (qobj->tail + need_size >= qobj->new_begin)) + { + printf("EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE\n"); + printf("cycle to new_begin, wait for strmqueue_reset...\n"); + goto WOF; + } + } + + *buf = qobj->tail + MEMBLOCK_HEADER_LEN; + return 0; + +WOF: //wait or fail + if (qobj->block && qobj->new_begin == NULL) + { + sem_wait(&qobj->sem); + goto AGAIN; + } + else + { + *buf = 0; + *size = 0; + return 0; + } +} + + +int queue_write_complete(queue_t *qobj, uint8_t *buf, int size, int skip) +{ + int block_size; + + if ( qobj == NULL ) + { + printf("stream queue is NULL\n"); + return -1; + } + if (buf != qobj->tail + MEMBLOCK_HEADER_LEN) + { + printf("mismatched GET_BUF and PUT_BUF\n"); + return -1; + } + + block_size = (MEMBLOCK_HEADER_LEN + size + skip + ALIGN - 1) & ~(ALIGN - 1); + BLOCK_SIZE(qobj->tail) = block_size; + PAYLOAD_OFFSET(qobj->tail) = MEMBLOCK_HEADER_LEN + skip; + PAYLOAD_SIZE(qobj->tail) = size; + qobj->tail += block_size; + return 0; +} + + +int queue_get_readbuf(queue_t *qobj, uint8_t **buf, int *size) +{ + if (qobj == NULL || qobj->head == qobj->tail) + goto NODATA; + + if (BLOCK_SIZE(qobj->head) == -1) //跳到开头 + { + qobj->head = qobj->buf; + if (qobj->head == qobj->tail) + goto NODATA; + } + + *buf = qobj->head + PAYLOAD_OFFSET(qobj->head); + *size = PAYLOAD_SIZE(qobj->head); + return 0; + +NODATA: + *size = 0; + return -1; +} + + +int queue_read_complete(queue_t *qobj) +{ + if (qobj == NULL) + { + printf("stream queue is null\n"); + return -1; + } + int block_size = BLOCK_SIZE(qobj->head); + qobj->head += block_size; + //TODO 被reset打断导致对tail错误修改的情况? + + if (qobj->block) + { + int val; + sem_getvalue(&qobj->sem, &val); + if (val < 1) + sem_post(&qobj->sem); + } + return 0; +} + diff --git a/queue.h b/queue.h new file mode 100644 index 0000000..8eea083 --- /dev/null +++ b/queue.h @@ -0,0 +1,35 @@ +/** + * @brief + * @author Ye Shengnan + * @date 2015-08-17 created + */ + +#ifndef __QUEUE_H__ +#define __QUEUE_H__ + +#ifdef __cplusplus +extern "C" { +#endif + + +typedef struct _queue queue_t; + +queue_t* queue_create(uint8_t *buf, int size, int block); +void queue_destroy(queue_t *qobj); + +int queue_record_newbegin(queue_t *qobj); +int queue_reset(queue_t *qobj); + +int queue_get_writebuf(queue_t *qobj, uint8_t **buf, int *size); +int queue_write_complete(queue_t *qobj, uint8_t *buf, int size, int skip); + +int queue_get_readbuf(queue_t *qobj, uint8_t **buf, int *size); +int queue_read_complete(queue_t *qobj); + + +#ifdef __cplusplus +} +#endif + +#endif // __QUEUE_H__ +