reactor.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <sys/socket.h>
  4. #include <sys/epoll.h>
  5. #include <netinet/in.h>
  6. #include <unistd.h>
  7. #include <memory.h>
  8. #include <errno.h>
  9. #include <string.h>
  10. #include <arpa/inet.h>
  11. #define BUFFER_SIZE 1024
  12. #define EVENT_LENGTH 512
  13. typedef int (*IOCALLBACK)(int fd, int event, void* arg);
  14. enum {
  15. DEV_TYPE_MASTER = 0,
  16. DEV_TYPE_SLAVE,
  17. DEV_TYPE_APP
  18. };
  19. struct io_conn_s {
  20. int fd;
  21. int dev_type;
  22. struct sockaddr_in sock_addr;
  23. char rbuf[BUFFER_SIZE];
  24. int rc;
  25. char wbuf[BUFFER_SIZE];
  26. int wc;
  27. int new;
  28. IOCALLBACK cb;
  29. };
  30. typedef struct io_conn_s io_conn_t;
  31. struct io_block_s {
  32. io_conn_t* conns;
  33. struct io_block_s* next;
  34. };
  35. typedef struct io_block_s io_block_t;
  36. struct io_reactor_s {
  37. int epfd;
  38. int blkcnt;
  39. int conn_cnt;
  40. io_block_t* block_head;
  41. struct sockaddr_in master_addr;
  42. struct sockaddr_in app_addr;
  43. int master_connected;
  44. int app_connected;
  45. };
  46. typedef struct io_reactor_s io_reactor_t;
  47. int io_reactor_init(io_reactor_t* reactor) {
  48. if (!reactor) {
  49. return -1;
  50. }
  51. reactor->block_head = malloc(sizeof(io_block_t) + EVENT_LENGTH * sizeof(io_conn_t));
  52. if (!reactor->block_head) {
  53. return -1;
  54. }
  55. reactor->block_head->conns = (io_conn_t*)(reactor->block_head + 1);
  56. reactor->blkcnt = 1;
  57. reactor->conn_cnt = 0;
  58. reactor->epfd = epoll_create(1);
  59. return 0;
  60. }
  61. void io_reactor_destroy(io_reactor_t* reactor) {
  62. if (!reactor) {
  63. return;
  64. }
  65. if (reactor->block_head) {
  66. free(reactor->block_head);
  67. }
  68. close(reactor->epfd);
  69. }
  70. int io_reactor_add_block(io_reactor_t* reactor) {
  71. if (!reactor) {
  72. return -1;
  73. }
  74. io_block_t* blk = reactor->block_head;
  75. while(blk->next) {
  76. blk = blk->next;
  77. }
  78. io_block_t* nblk = malloc(sizeof(io_block_t) + EVENT_LENGTH * sizeof(io_conn_t));
  79. if (!nblk) {
  80. return -1;
  81. }
  82. nblk->conns = (io_conn_t*)(nblk + 1);
  83. nblk->next = NULL;
  84. blk->next = nblk;
  85. reactor->blkcnt++;
  86. return 0;
  87. }
  88. io_conn_t* io_reactor_connidx(io_reactor_t* reactor, int fd) {
  89. if (!reactor) {
  90. return NULL;
  91. }
  92. int blk_idx = fd / EVENT_LENGTH;
  93. while (reactor->blkcnt <= blk_idx) {
  94. io_reactor_add_block(reactor);
  95. }
  96. int i = 0;
  97. io_block_t* blk = reactor->block_head;
  98. for (size_t i = 0; i < blk_idx; i++) {
  99. blk = blk->next;
  100. }
  101. return &blk->conns[fd % EVENT_LENGTH];
  102. }
  103. io_conn_t* io_reactor_connaddr(io_reactor_t* reactor, struct sockaddr_in addr) {
  104. if (!reactor) {
  105. return NULL;
  106. }
  107. size_t i = 0, j = 0;
  108. io_block_t* blk = reactor->block_head;
  109. for (i = 0; i < reactor->blkcnt; i++) {
  110. int cur_conn_nb = reactor->conn_cnt - i * EVENT_LENGTH;
  111. cur_conn_nb = (cur_conn_nb > EVENT_LENGTH) ? EVENT_LENGTH : cur_conn_nb;
  112. if (i == 0) {
  113. for (j = 3; j < cur_conn_nb + 3; j++) {
  114. if (!memcmp(&blk->conns[j].sock_addr, &addr, sizeof(struct sockaddr_in))) {
  115. return &blk->conns[j];
  116. }
  117. }
  118. }
  119. else {
  120. for (j = 0; j < cur_conn_nb; j++) {
  121. if (!memcmp(&blk->conns[j].sock_addr, &addr, sizeof(struct sockaddr_in))) {
  122. return &blk->conns[j];
  123. }
  124. }
  125. }
  126. blk = blk->next;
  127. }
  128. return &blk->conns[j];
  129. }
  130. int io_init_server(short port) {
  131. int sock = socket(AF_INET, SOCK_DGRAM, 0);
  132. struct sockaddr_in sockaddr;
  133. memset(&sockaddr, 0, sizeof(sockaddr));
  134. sockaddr.sin_port = htons(port);
  135. sockaddr.sin_family = AF_INET;
  136. sockaddr.sin_addr.s_addr = INADDR_ANY;
  137. if (bind(sock, (struct sockaddr*)&sockaddr, sizeof(sockaddr)) < 0) {
  138. printf("Bind error, port: %d\n", port);
  139. return -1;
  140. }
  141. // if (listen(sock, 10) < 0) {
  142. // printf("Listen error, port: %d\n", port);
  143. // return -1;
  144. // }
  145. return sock;
  146. }
  147. int io_recv_cb(int fd, int event, void* arg);
  148. int io_send_cb(int fd, int event, void* arg) {
  149. io_reactor_t* reactor = (io_reactor_t*)arg;
  150. io_conn_t* conn = io_reactor_connidx(reactor, fd);
  151. if (conn->wc > 0) {
  152. int ret = sendto(fd, conn->wbuf, conn->wc, 0,
  153. (struct sockaddr*)&conn->sock_addr, sizeof(struct sockaddr_in));
  154. if (ret > 0) {
  155. conn->wc = 0;
  156. memset(conn->wbuf, 0, BUFFER_SIZE);
  157. }
  158. }
  159. conn->cb = io_recv_cb;
  160. struct epoll_event ev;
  161. ev.data.fd = fd;
  162. ev.events = EPOLLIN;
  163. epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
  164. return 0;
  165. }
  166. int io_recv_cb(int fd, int event, void* arg) {
  167. io_reactor_t* reactor = (io_reactor_t*)arg;
  168. io_conn_t* conn = io_reactor_connidx(reactor, fd);
  169. struct sockaddr_in clientaddr;
  170. socklen_t addrlen = sizeof(clientaddr);
  171. memset(&clientaddr, 0, sizeof(clientaddr));
  172. int ret = recvfrom(fd, conn->rbuf, BUFFER_SIZE, 0,
  173. (struct sockaddr*)&clientaddr, &addrlen);
  174. if (ret > 0) {
  175. conn->rc = ret;
  176. conn->rbuf[ret] = '\0';
  177. printf("Received from %s:%d: %s\n",
  178. inet_ntoa(clientaddr.sin_addr),
  179. ntohs(clientaddr.sin_port),
  180. conn->rbuf);
  181. // Check message type and handle accordingly
  182. if (strncmp(conn->rbuf, "1382,1217,2382", 14) == 0) {
  183. // Master registration
  184. reactor->master_addr = clientaddr;
  185. reactor->master_connected = 1;
  186. printf("Master registered at %s:%d\n",
  187. inet_ntoa(clientaddr.sin_addr),
  188. ntohs(clientaddr.sin_port));
  189. // If app is connected, forward master's message to app
  190. if (reactor->app_connected) {
  191. memcpy(conn->wbuf, conn->rbuf, ret);
  192. conn->wc = ret;
  193. conn->sock_addr = reactor->app_addr;
  194. conn->cb = io_send_cb;
  195. struct epoll_event ev;
  196. ev.data.fd = fd;
  197. ev.events = EPOLLOUT;
  198. epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
  199. }
  200. }
  201. else if (strncmp(conn->rbuf, "1001,125,1382", 13) == 0) {
  202. // Slave message
  203. if (reactor->master_connected) {
  204. memcpy(conn->wbuf, conn->rbuf, ret);
  205. conn->wc = ret;
  206. conn->sock_addr = reactor->master_addr;
  207. conn->cb = io_send_cb;
  208. struct epoll_event ev;
  209. ev.data.fd = fd;
  210. ev.events = EPOLLOUT;
  211. epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
  212. }
  213. else {
  214. printf("Warning: Master not connected, message not forwarded\n");
  215. }
  216. }
  217. else if (strncmp(conn->rbuf, "2189,1223,1189", 14) == 0) {
  218. // App registration
  219. reactor->app_addr = clientaddr;
  220. reactor->app_connected = 1;
  221. printf("App registered at %s:%d\n",
  222. inet_ntoa(clientaddr.sin_addr),
  223. ntohs(clientaddr.sin_port));
  224. // Forward to master if connected
  225. if (reactor->master_connected) {
  226. memcpy(conn->wbuf, conn->rbuf, ret);
  227. conn->wc = ret;
  228. conn->sock_addr = reactor->master_addr;
  229. conn->cb = io_send_cb;
  230. struct epoll_event ev;
  231. ev.data.fd = fd;
  232. ev.events = EPOLLOUT;
  233. epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
  234. }
  235. else {
  236. printf("Warning: Master not connected, message not forwarded\n");
  237. }
  238. }
  239. else {
  240. printf("Warning: Unknown message format received\n");
  241. }
  242. }
  243. else if (ret == 0) {
  244. // Connection closed
  245. if (clientaddr.sin_addr.s_addr == reactor->master_addr.sin_addr.s_addr &&
  246. clientaddr.sin_port == reactor->master_addr.sin_port) {
  247. reactor->master_connected = 0;
  248. printf("Master disconnected\n");
  249. }
  250. else if (clientaddr.sin_addr.s_addr == reactor->app_addr.sin_addr.s_addr &&
  251. clientaddr.sin_port == reactor->app_addr.sin_port) {
  252. reactor->app_connected = 0;
  253. printf("App disconnected\n");
  254. }
  255. }
  256. return 0;
  257. }
  258. int io_set_listen(io_reactor_t* reactor, int fd, IOCALLBACK* cb) {
  259. if (!reactor) {
  260. return -1;
  261. }
  262. reactor->block_head->conns[fd].fd = fd;
  263. reactor->block_head->conns[fd].cb = cb;
  264. struct epoll_event ev;
  265. ev.data.fd = fd;
  266. ev.events = EPOLLIN;
  267. epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, fd, &ev);
  268. }
  269. #if 1
  270. uint32_t _pow(uint8_t m,uint8_t n)
  271. {
  272. uint32_t result=1;
  273. while(n--)result*=m;
  274. return result;
  275. }
  276. uint16_t get_Char_Position(char *istr, char ichar, uint16_t iposi)
  277. {
  278. uint16_t posit=0;
  279. uint16_t i;
  280. if(iposi == 0) return 0;
  281. for(i=0;*(istr+i) != '\0';i++)
  282. {
  283. if(*(istr+i) == ichar) posit++;
  284. if(posit == iposi) return (i);
  285. }
  286. return i;
  287. }
  288. float str_2_float(char *Instr ,char x1 ,uint16_t x1p ,char x2 ,uint16_t x2p)
  289. {
  290. uint16_t pi=0,pj=0;
  291. uint16_t i , j=0;
  292. float OutNum=0;
  293. pi = get_Char_Position(Instr, x1, x1p)+1;
  294. pj = get_Char_Position(Instr, x2, x2p)+1;
  295. for(i=pi;i<(pj-1);i++)
  296. {
  297. if( *(Instr+i) == '.' )
  298. {
  299. if(j == 0)
  300. {
  301. j = 1;
  302. i++;
  303. }
  304. else
  305. return OutNum;
  306. }
  307. if(*(Instr+i) < '0' || *(Instr+i) > '9') return OutNum;
  308. if(j == 0) OutNum = OutNum*10.0 + *(Instr+i) - 48;
  309. else
  310. {
  311. OutNum += (*(Instr+i) -48)/(_pow(10,j)*1.0);
  312. j++;
  313. }
  314. }
  315. return OutNum;
  316. }
  317. #endif
  318. int main(int argc, char* argv[]) {
  319. if (argc < 2) {
  320. printf("Usage: %s <port>\n", argv[0]);
  321. return -1;
  322. }
  323. short port = atoi(argv[1]);
  324. io_reactor_t reactor;
  325. memset(&reactor, 0, sizeof(reactor));
  326. reactor.master_connected = 0;
  327. reactor.app_connected = 0;
  328. if (io_reactor_init(&reactor) < 0) {
  329. printf("Failed to initialize reactor\n");
  330. return -1;
  331. }
  332. int sockfd = io_init_server(port);
  333. if (sockfd < 0) {
  334. printf("Failed to initialize server\n");
  335. return -1;
  336. }
  337. io_conn_t* conn = io_reactor_connidx(&reactor, sockfd);
  338. conn->fd = sockfd;
  339. conn->cb = io_recv_cb;
  340. struct epoll_event ev;
  341. ev.data.fd = sockfd;
  342. ev.events = EPOLLIN;
  343. epoll_ctl(reactor.epfd, EPOLL_CTL_ADD, sockfd, &ev);
  344. printf("UDP server started on port %d\n", port);
  345. struct epoll_event evs[1024];
  346. while (1) {
  347. int nready = epoll_wait(reactor.epfd, evs, 1024, -1);
  348. if (nready < 0) {
  349. perror("epoll_wait");
  350. continue;
  351. }
  352. for (int i = 0; i < nready; i++) {
  353. int fd = evs[i].data.fd;
  354. io_conn_t* conn = io_reactor_connidx(&reactor, fd);
  355. if (evs[i].events & EPOLLIN) {
  356. conn->cb(fd, evs[i].events, &reactor);
  357. }
  358. else if (evs[i].events & EPOLLOUT) {
  359. conn->cb(fd, evs[i].events, &reactor);
  360. }
  361. }
  362. }
  363. io_reactor_destroy(&reactor);
  364. return 0;
  365. }