瀏覽代碼

first commit

Gogs 1 周之前
當前提交
f3b7874782
共有 3 個文件被更改,包括 450 次插入0 次删除
  1. 5 0
      .vscode/settings.json
  2. 4 0
      makefile
  3. 441 0
      reactor.c

+ 5 - 0
.vscode/settings.json

@@ -0,0 +1,5 @@
+{
+    "files.associations": {
+        "stdio.h": "c"
+    }
+}

+ 4 - 0
makefile

@@ -0,0 +1,4 @@
+reactor:reactor.c
+	gcc reactor.c -o reactor
+clean:
+	rm -rf reactor

+ 441 - 0
reactor.c

@@ -0,0 +1,441 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <memory.h>
+#include <errno.h>
+#include <string.h>
+#include <arpa/inet.h>
+
+#define BUFFER_SIZE 1024
+#define EVENT_LENGTH 512
+
+typedef int (*IOCALLBACK)(int fd, int event, void* arg);
+
+enum {
+    DEV_TYPE_MASTER = 0,
+    DEV_TYPE_SLAVE,
+    DEV_TYPE_APP
+};
+
+struct io_conn_s {
+    int fd;
+    int dev_type;
+    struct sockaddr_in sock_addr;
+
+    char rbuf[BUFFER_SIZE];
+    int rc;
+    char wbuf[BUFFER_SIZE];
+    int wc;
+
+    int new;
+
+    IOCALLBACK cb;
+};
+
+typedef struct io_conn_s io_conn_t;
+
+struct io_block_s {
+    io_conn_t* conns;
+    struct io_block_s* next;
+};
+
+typedef struct io_block_s io_block_t;
+
+struct io_reactor_s {
+    int epfd;
+    int blkcnt;
+    int conn_cnt;
+
+    io_block_t* block_head;
+    struct sockaddr_in master_addr;
+    struct sockaddr_in app_addr;
+    int master_connected;
+    int app_connected;
+};
+
+typedef struct io_reactor_s io_reactor_t;
+
+int io_reactor_init(io_reactor_t* reactor) {
+    if (!reactor) {
+        return -1;
+    }
+
+    reactor->block_head = malloc(sizeof(io_block_t) + EVENT_LENGTH * sizeof(io_conn_t));
+    if (!reactor->block_head) {
+        return -1;
+    }
+
+    reactor->block_head->conns = (io_conn_t*)(reactor->block_head + 1);
+    reactor->blkcnt = 1;
+    reactor->conn_cnt = 0;
+    reactor->epfd = epoll_create(1);
+
+    return 0;
+}
+
+void io_reactor_destroy(io_reactor_t* reactor) {
+    if (!reactor) {
+        return;
+    }
+
+    if (reactor->block_head) {
+        free(reactor->block_head);
+    }
+
+    close(reactor->epfd);
+}
+
+int io_reactor_add_block(io_reactor_t* reactor) {
+    if (!reactor) {
+        return -1;
+    }
+
+    io_block_t* blk = reactor->block_head;
+
+    while(blk->next) {
+        blk = blk->next;
+    }
+
+    io_block_t* nblk = malloc(sizeof(io_block_t) + EVENT_LENGTH * sizeof(io_conn_t));
+    if (!nblk) {
+        return -1;
+    }
+
+    nblk->conns = (io_conn_t*)(nblk + 1);
+    nblk->next = NULL;
+    
+    blk->next = nblk;
+    reactor->blkcnt++;
+
+    return 0;
+}
+
+io_conn_t* io_reactor_connidx(io_reactor_t* reactor, int fd) {
+    if (!reactor) {
+        return NULL;
+    }
+
+    int blk_idx = fd / EVENT_LENGTH;
+    while (reactor->blkcnt <= blk_idx) {
+        io_reactor_add_block(reactor);
+    }
+
+    int i = 0;
+    io_block_t* blk = reactor->block_head;
+    for (size_t i = 0; i < blk_idx; i++) {
+        blk = blk->next;
+    }
+
+    return &blk->conns[fd % EVENT_LENGTH];
+}
+
+io_conn_t* io_reactor_connaddr(io_reactor_t* reactor, struct sockaddr_in addr) {
+    if (!reactor) {
+        return NULL;
+    }
+    size_t i = 0, j = 0;
+    io_block_t* blk = reactor->block_head;
+    for (i = 0; i < reactor->blkcnt; i++) {
+        int cur_conn_nb = reactor->conn_cnt - i * EVENT_LENGTH;
+        cur_conn_nb = (cur_conn_nb > EVENT_LENGTH) ? EVENT_LENGTH : cur_conn_nb;
+        if (i == 0) {
+            for (j = 3; j < cur_conn_nb + 3; j++) {
+                if (!memcmp(&blk->conns[j].sock_addr, &addr, sizeof(struct sockaddr_in))) {
+                    return &blk->conns[j];
+                }
+            }
+        }
+        else {
+            for (j = 0; j < cur_conn_nb; j++) {
+                if (!memcmp(&blk->conns[j].sock_addr, &addr, sizeof(struct sockaddr_in))) {
+                    return &blk->conns[j];
+                }
+            }
+        }
+
+        blk = blk->next;
+    }
+
+
+    return &blk->conns[j];
+}
+
+int io_init_server(short port) {
+    int sock = socket(AF_INET, SOCK_DGRAM, 0);
+
+    struct sockaddr_in sockaddr;
+    memset(&sockaddr, 0, sizeof(sockaddr));
+    sockaddr.sin_port = htons(port);
+    sockaddr.sin_family = AF_INET;
+    sockaddr.sin_addr.s_addr = INADDR_ANY;
+
+    if (bind(sock, (struct sockaddr*)&sockaddr, sizeof(sockaddr)) < 0) {
+        printf("Bind error, port: %d\n", port);
+        return -1;
+    }
+
+    // if (listen(sock, 10) < 0) {
+    //     printf("Listen error, port: %d\n", port);
+    //     return -1;
+    // }
+
+    return sock;
+}
+
+int io_recv_cb(int fd, int event, void* arg);
+
+int io_send_cb(int fd, int event, void* arg) {
+    io_reactor_t* reactor = (io_reactor_t*)arg;
+    io_conn_t* conn = io_reactor_connidx(reactor, fd);
+    
+    if (conn->wc > 0) {
+        int ret = sendto(fd, conn->wbuf, conn->wc, 0, 
+                        (struct sockaddr*)&conn->sock_addr, sizeof(struct sockaddr_in));
+        if (ret > 0) {
+            conn->wc = 0;
+            memset(conn->wbuf, 0, BUFFER_SIZE);
+        }
+    }
+    
+    conn->cb = io_recv_cb;
+    struct epoll_event ev;
+    ev.data.fd = fd;
+    ev.events = EPOLLIN;
+    epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
+    
+    return 0;
+}
+
+int io_recv_cb(int fd, int event, void* arg) {
+    io_reactor_t* reactor = (io_reactor_t*)arg;
+    io_conn_t* conn = io_reactor_connidx(reactor, fd);
+    struct sockaddr_in clientaddr;
+    socklen_t addrlen = sizeof(clientaddr);
+    memset(&clientaddr, 0, sizeof(clientaddr));
+
+    int ret = recvfrom(fd, conn->rbuf, BUFFER_SIZE, 0, 
+                      (struct sockaddr*)&clientaddr, &addrlen);
+
+    if (ret > 0) {
+        conn->rc = ret;
+        conn->rbuf[ret] = '\0';
+        
+        printf("Received from %s:%d: %s\n", 
+               inet_ntoa(clientaddr.sin_addr), 
+               ntohs(clientaddr.sin_port),
+               conn->rbuf);
+
+        // Check message type and handle accordingly
+        if (strncmp(conn->rbuf, "1382,1217,2382", 14) == 0) {
+            // Master registration
+            reactor->master_addr = clientaddr;
+            reactor->master_connected = 1;
+            printf("Master registered at %s:%d\n", 
+                   inet_ntoa(clientaddr.sin_addr), 
+                   ntohs(clientaddr.sin_port));
+            
+            // If app is connected, forward master's message to app
+            if (reactor->app_connected) {
+                memcpy(conn->wbuf, conn->rbuf, ret);
+                conn->wc = ret;
+                conn->sock_addr = reactor->app_addr;
+                conn->cb = io_send_cb;
+                
+                struct epoll_event ev;
+                ev.data.fd = fd;
+                ev.events = EPOLLOUT;
+                epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
+            }
+        }
+        else if (strncmp(conn->rbuf, "1001,125,1382", 13) == 0) {
+            // Slave message
+            if (reactor->master_connected) {
+                memcpy(conn->wbuf, conn->rbuf, ret);
+                conn->wc = ret;
+                conn->sock_addr = reactor->master_addr;
+                conn->cb = io_send_cb;
+                
+                struct epoll_event ev;
+                ev.data.fd = fd;
+                ev.events = EPOLLOUT;
+                epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
+            }
+            else {
+                printf("Warning: Master not connected, message not forwarded\n");
+            }
+        }
+        else if (strncmp(conn->rbuf, "2189,1223,1189", 14) == 0) {
+            // App registration
+            reactor->app_addr = clientaddr;
+            reactor->app_connected = 1;
+            printf("App registered at %s:%d\n", 
+                   inet_ntoa(clientaddr.sin_addr), 
+                   ntohs(clientaddr.sin_port));
+            
+            // Forward to master if connected
+            if (reactor->master_connected) {
+                memcpy(conn->wbuf, conn->rbuf, ret);
+                conn->wc = ret;
+                conn->sock_addr = reactor->master_addr;
+                conn->cb = io_send_cb;
+                
+                struct epoll_event ev;
+                ev.data.fd = fd;
+                ev.events = EPOLLOUT;
+                epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
+            }
+            else {
+                printf("Warning: Master not connected, message not forwarded\n");
+            }
+        }
+        else {
+            printf("Warning: Unknown message format received\n");
+        }
+    }
+    else if (ret == 0) {
+        // Connection closed
+        if (clientaddr.sin_addr.s_addr == reactor->master_addr.sin_addr.s_addr &&
+            clientaddr.sin_port == reactor->master_addr.sin_port) {
+            reactor->master_connected = 0;
+            printf("Master disconnected\n");
+        }
+        else if (clientaddr.sin_addr.s_addr == reactor->app_addr.sin_addr.s_addr &&
+                 clientaddr.sin_port == reactor->app_addr.sin_port) {
+            reactor->app_connected = 0;
+            printf("App disconnected\n");
+        }
+    }
+    
+    return 0;
+}
+
+int io_set_listen(io_reactor_t* reactor, int fd, IOCALLBACK* cb) {
+    if (!reactor) {
+        return -1;
+    }
+
+    reactor->block_head->conns[fd].fd = fd;
+    reactor->block_head->conns[fd].cb = cb;
+
+    struct epoll_event ev;
+    ev.data.fd = fd;
+    ev.events = EPOLLIN;
+
+    epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+#if 1
+uint32_t _pow(uint8_t m,uint8_t n)
+{
+    uint32_t result=1;
+    while(n--)result*=m;
+    return result;
+}
+
+uint16_t get_Char_Position(char *istr, char ichar, uint16_t iposi)
+{
+  uint16_t posit=0;
+	uint16_t i;
+	if(iposi == 0)   return 0;
+	for(i=0;*(istr+i) != '\0';i++)
+	{
+	  if(*(istr+i) == ichar) posit++;
+		if(posit == iposi)  return (i);
+	}
+	return i;
+}
+
+float str_2_float(char *Instr ,char x1 ,uint16_t x1p ,char x2 ,uint16_t x2p)
+{
+    uint16_t pi=0,pj=0;
+    uint16_t i , j=0;
+    float OutNum=0;
+    pi = get_Char_Position(Instr, x1, x1p)+1;
+    pj = get_Char_Position(Instr, x2, x2p)+1;
+	  for(i=pi;i<(pj-1);i++)
+		{
+			 if( *(Instr+i) == '.' )  
+				{
+					if(j == 0)
+					{
+				  j = 1;
+				  i++;
+					}
+					else
+						return OutNum;
+				}
+			   
+			 if(*(Instr+i) < '0' || *(Instr+i) > '9')  return OutNum;			 
+			 if(j == 0)    OutNum   = OutNum*10.0 + *(Instr+i) - 48;	
+			 else          
+			 {
+				 OutNum  += (*(Instr+i) -48)/(_pow(10,j)*1.0);
+				 j++;
+			 }
+		}
+		return OutNum;
+}
+#endif
+
+int main(int argc, char* argv[]) {
+    if (argc < 2) {
+        printf("Usage: %s <port>\n", argv[0]);
+        return -1;
+    }
+
+    short port = atoi(argv[1]);
+
+    io_reactor_t reactor;
+    memset(&reactor, 0, sizeof(reactor));
+    reactor.master_connected = 0;
+    reactor.app_connected = 0;
+    
+    if (io_reactor_init(&reactor) < 0) {
+        printf("Failed to initialize reactor\n");
+        return -1;
+    }
+
+    int sockfd = io_init_server(port);
+    if (sockfd < 0) {
+        printf("Failed to initialize server\n");
+        return -1;
+    }
+
+    io_conn_t* conn = io_reactor_connidx(&reactor, sockfd);
+    conn->fd = sockfd;
+    conn->cb = io_recv_cb;
+
+    struct epoll_event ev;
+    ev.data.fd = sockfd;
+    ev.events = EPOLLIN;
+    epoll_ctl(reactor.epfd, EPOLL_CTL_ADD, sockfd, &ev);
+
+    printf("UDP server started on port %d\n", port);
+
+    struct epoll_event evs[1024];
+    while (1) {
+        int nready = epoll_wait(reactor.epfd, evs, 1024, -1);
+        if (nready < 0) {
+            perror("epoll_wait");
+            continue;
+        }
+
+        for (int i = 0; i < nready; i++) {
+            int fd = evs[i].data.fd;
+            io_conn_t* conn = io_reactor_connidx(&reactor, fd);
+            
+            if (evs[i].events & EPOLLIN) {
+                conn->cb(fd, evs[i].events, &reactor);
+            }
+            else if (evs[i].events & EPOLLOUT) {
+                conn->cb(fd, evs[i].events, &reactor);
+            }
+        }
+    }
+
+    io_reactor_destroy(&reactor);
+    return 0;
+}