#include #include #include #include #include #include #include #include #include #include #define BUFFER_SIZE 1024 #define EVENT_LENGTH 512 typedef int (*IOCALLBACK)(int fd, int event, void* arg); enum { DEV_TYPE_MASTER = 0, DEV_TYPE_SLAVE, DEV_TYPE_APP }; struct io_conn_s { int fd; int dev_type; struct sockaddr_in sock_addr; char rbuf[BUFFER_SIZE]; int rc; char wbuf[BUFFER_SIZE]; int wc; int new; IOCALLBACK cb; }; typedef struct io_conn_s io_conn_t; struct io_block_s { io_conn_t* conns; struct io_block_s* next; }; typedef struct io_block_s io_block_t; struct io_reactor_s { int epfd; int blkcnt; int conn_cnt; io_block_t* block_head; struct sockaddr_in master_addr; struct sockaddr_in app_addr; int master_connected; int app_connected; }; typedef struct io_reactor_s io_reactor_t; int io_reactor_init(io_reactor_t* reactor) { if (!reactor) { return -1; } reactor->block_head = malloc(sizeof(io_block_t) + EVENT_LENGTH * sizeof(io_conn_t)); if (!reactor->block_head) { return -1; } reactor->block_head->conns = (io_conn_t*)(reactor->block_head + 1); reactor->blkcnt = 1; reactor->conn_cnt = 0; reactor->epfd = epoll_create(1); return 0; } void io_reactor_destroy(io_reactor_t* reactor) { if (!reactor) { return; } if (reactor->block_head) { free(reactor->block_head); } close(reactor->epfd); } int io_reactor_add_block(io_reactor_t* reactor) { if (!reactor) { return -1; } io_block_t* blk = reactor->block_head; while(blk->next) { blk = blk->next; } io_block_t* nblk = malloc(sizeof(io_block_t) + EVENT_LENGTH * sizeof(io_conn_t)); if (!nblk) { return -1; } nblk->conns = (io_conn_t*)(nblk + 1); nblk->next = NULL; blk->next = nblk; reactor->blkcnt++; return 0; } io_conn_t* io_reactor_connidx(io_reactor_t* reactor, int fd) { if (!reactor) { return NULL; } int blk_idx = fd / EVENT_LENGTH; while (reactor->blkcnt <= blk_idx) { io_reactor_add_block(reactor); } int i = 0; io_block_t* blk = reactor->block_head; for (size_t i = 0; i < blk_idx; i++) { blk = blk->next; } return &blk->conns[fd % EVENT_LENGTH]; } io_conn_t* io_reactor_connaddr(io_reactor_t* reactor, struct sockaddr_in addr) { if (!reactor) { return NULL; } size_t i = 0, j = 0; io_block_t* blk = reactor->block_head; for (i = 0; i < reactor->blkcnt; i++) { int cur_conn_nb = reactor->conn_cnt - i * EVENT_LENGTH; cur_conn_nb = (cur_conn_nb > EVENT_LENGTH) ? EVENT_LENGTH : cur_conn_nb; if (i == 0) { for (j = 3; j < cur_conn_nb + 3; j++) { if (!memcmp(&blk->conns[j].sock_addr, &addr, sizeof(struct sockaddr_in))) { return &blk->conns[j]; } } } else { for (j = 0; j < cur_conn_nb; j++) { if (!memcmp(&blk->conns[j].sock_addr, &addr, sizeof(struct sockaddr_in))) { return &blk->conns[j]; } } } blk = blk->next; } return &blk->conns[j]; } int io_init_server(short port) { int sock = socket(AF_INET, SOCK_DGRAM, 0); struct sockaddr_in sockaddr; memset(&sockaddr, 0, sizeof(sockaddr)); sockaddr.sin_port = htons(port); sockaddr.sin_family = AF_INET; sockaddr.sin_addr.s_addr = INADDR_ANY; if (bind(sock, (struct sockaddr*)&sockaddr, sizeof(sockaddr)) < 0) { printf("Bind error, port: %d\n", port); return -1; } // if (listen(sock, 10) < 0) { // printf("Listen error, port: %d\n", port); // return -1; // } return sock; } int io_recv_cb(int fd, int event, void* arg); int io_send_cb(int fd, int event, void* arg) { io_reactor_t* reactor = (io_reactor_t*)arg; io_conn_t* conn = io_reactor_connidx(reactor, fd); if (conn->wc > 0) { int ret = sendto(fd, conn->wbuf, conn->wc, 0, (struct sockaddr*)&conn->sock_addr, sizeof(struct sockaddr_in)); if (ret > 0) { conn->wc = 0; memset(conn->wbuf, 0, BUFFER_SIZE); } } conn->cb = io_recv_cb; struct epoll_event ev; ev.data.fd = fd; ev.events = EPOLLIN; epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev); return 0; } int io_recv_cb(int fd, int event, void* arg) { io_reactor_t* reactor = (io_reactor_t*)arg; io_conn_t* conn = io_reactor_connidx(reactor, fd); struct sockaddr_in clientaddr; socklen_t addrlen = sizeof(clientaddr); memset(&clientaddr, 0, sizeof(clientaddr)); int ret = recvfrom(fd, conn->rbuf, BUFFER_SIZE, 0, (struct sockaddr*)&clientaddr, &addrlen); if (ret > 0) { conn->rc = ret; conn->rbuf[ret] = '\0'; printf("Received from %s:%d: %s\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port), conn->rbuf); // Check message type and handle accordingly if (strncmp(conn->rbuf, "1382,1217,2382", 14) == 0) { // Master registration reactor->master_addr = clientaddr; reactor->master_connected = 1; printf("Master registered at %s:%d\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port)); // If app is connected, forward master's message to app if (reactor->app_connected) { memcpy(conn->wbuf, conn->rbuf, ret); conn->wc = ret; conn->sock_addr = reactor->app_addr; conn->cb = io_send_cb; struct epoll_event ev; ev.data.fd = fd; ev.events = EPOLLOUT; epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev); } } else if (strncmp(conn->rbuf, "1001,125,1382", 13) == 0) { // Slave message if (reactor->master_connected) { memcpy(conn->wbuf, conn->rbuf, ret); conn->wc = ret; conn->sock_addr = reactor->master_addr; conn->cb = io_send_cb; struct epoll_event ev; ev.data.fd = fd; ev.events = EPOLLOUT; epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev); } else { printf("Warning: Master not connected, message not forwarded\n"); } } else if (strncmp(conn->rbuf, "2189,1223,1189", 14) == 0) { // App registration reactor->app_addr = clientaddr; reactor->app_connected = 1; printf("App registered at %s:%d\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port)); // Forward to master if connected if (reactor->master_connected) { memcpy(conn->wbuf, conn->rbuf, ret); conn->wc = ret; conn->sock_addr = reactor->master_addr; conn->cb = io_send_cb; struct epoll_event ev; ev.data.fd = fd; ev.events = EPOLLOUT; epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev); } else { printf("Warning: Master not connected, message not forwarded\n"); } } else { printf("Warning: Unknown message format received\n"); } } else if (ret == 0) { // Connection closed if (clientaddr.sin_addr.s_addr == reactor->master_addr.sin_addr.s_addr && clientaddr.sin_port == reactor->master_addr.sin_port) { reactor->master_connected = 0; printf("Master disconnected\n"); } else if (clientaddr.sin_addr.s_addr == reactor->app_addr.sin_addr.s_addr && clientaddr.sin_port == reactor->app_addr.sin_port) { reactor->app_connected = 0; printf("App disconnected\n"); } } return 0; } int io_set_listen(io_reactor_t* reactor, int fd, IOCALLBACK* cb) { if (!reactor) { return -1; } reactor->block_head->conns[fd].fd = fd; reactor->block_head->conns[fd].cb = cb; struct epoll_event ev; ev.data.fd = fd; ev.events = EPOLLIN; epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, fd, &ev); } #if 1 uint32_t _pow(uint8_t m,uint8_t n) { uint32_t result=1; while(n--)result*=m; return result; } uint16_t get_Char_Position(char *istr, char ichar, uint16_t iposi) { uint16_t posit=0; uint16_t i; if(iposi == 0) return 0; for(i=0;*(istr+i) != '\0';i++) { if(*(istr+i) == ichar) posit++; if(posit == iposi) return (i); } return i; } float str_2_float(char *Instr ,char x1 ,uint16_t x1p ,char x2 ,uint16_t x2p) { uint16_t pi=0,pj=0; uint16_t i , j=0; float OutNum=0; pi = get_Char_Position(Instr, x1, x1p)+1; pj = get_Char_Position(Instr, x2, x2p)+1; for(i=pi;i<(pj-1);i++) { if( *(Instr+i) == '.' ) { if(j == 0) { j = 1; i++; } else return OutNum; } if(*(Instr+i) < '0' || *(Instr+i) > '9') return OutNum; if(j == 0) OutNum = OutNum*10.0 + *(Instr+i) - 48; else { OutNum += (*(Instr+i) -48)/(_pow(10,j)*1.0); j++; } } return OutNum; } #endif int main(int argc, char* argv[]) { if (argc < 2) { printf("Usage: %s \n", argv[0]); return -1; } short port = atoi(argv[1]); io_reactor_t reactor; memset(&reactor, 0, sizeof(reactor)); reactor.master_connected = 0; reactor.app_connected = 0; if (io_reactor_init(&reactor) < 0) { printf("Failed to initialize reactor\n"); return -1; } int sockfd = io_init_server(port); if (sockfd < 0) { printf("Failed to initialize server\n"); return -1; } io_conn_t* conn = io_reactor_connidx(&reactor, sockfd); conn->fd = sockfd; conn->cb = io_recv_cb; struct epoll_event ev; ev.data.fd = sockfd; ev.events = EPOLLIN; epoll_ctl(reactor.epfd, EPOLL_CTL_ADD, sockfd, &ev); printf("UDP server started on port %d\n", port); struct epoll_event evs[1024]; while (1) { int nready = epoll_wait(reactor.epfd, evs, 1024, -1); if (nready < 0) { perror("epoll_wait"); continue; } for (int i = 0; i < nready; i++) { int fd = evs[i].data.fd; io_conn_t* conn = io_reactor_connidx(&reactor, fd); if (evs[i].events & EPOLLIN) { conn->cb(fd, evs[i].events, &reactor); } else if (evs[i].events & EPOLLOUT) { conn->cb(fd, evs[i].events, &reactor); } } } io_reactor_destroy(&reactor); return 0; }