diff --git a/central-manager/central-manager.c b/central-manager/central-manager.c index 28c4e3f..752abf3 100644 --- a/central-manager/central-manager.c +++ b/central-manager/central-manager.c @@ -164,7 +164,7 @@ void* LISTEN_TCP(void* THREADABLE_ARGS){ if( index != -1 ){ // Construction arguments thread - struct handler_arg thread_args = { TCPManagers, activeTCPManagers, CLIENT_SOCKET, &sgca }; + struct handler_arg thread_args = { TCPManagers, UDPManagers, activeTCPManagers, activeUDPManagers, CLIENT_SOCKET, &sgca }; /* 5. On lance un thread pour le traitement de ce client */ pthread_create(&TCPManagers[index], NULL, arg->handler, (void*) &thread_args); @@ -315,7 +315,8 @@ void* LISTEN_UDP(void* THREADABLE_ARGS){ if( index != -1 ){ // Construction arguments thread - struct handler_arg thread_args = { UDPManagers, activeUDPManagers, CLIENT_SOCKET, &sgca }; + + struct handler_arg thread_args = { TCPManagers, UDPManagers, activeTCPManagers, activeUDPManagers, CLIENT_SOCKET, &sgca }; /* 2.1. On lance un thread pour le traitement de ce client */ pthread_create(&UDPManagers[index], NULL, arg->handler, (void*) &thread_args); diff --git a/central-manager/lib/header.h b/central-manager/lib/header.h index 55c16cb..38c36cc 100644 --- a/central-manager/lib/header.h +++ b/central-manager/lib/header.h @@ -20,6 +20,7 @@ #include #include #include + #include /* sys */ #include @@ -58,8 +59,8 @@ struct context_unit{ int socket; // socket associée à un avion struct plane data; // données d'un avion - pthread_mutex_t mutex; // socket mutual exclusion - char token; // if want to have mutex + char thrId; // TCP thread index + char flags; // persistent flags }; struct context{ @@ -74,8 +75,10 @@ }; struct handler_arg{ - pthread_t* managers; - int* activeManagers; + pthread_t* TCPManagers; + pthread_t* UDPManagers; + int* activeTCPManagers; + int* activeUDPManagers; int socket; struct context* sgca; }; diff --git a/central-manager/lib/local/handler.c b/central-manager/lib/local/handler.c index fea6c7e..270ba3e 100644 --- a/central-manager/lib/local/handler.c +++ b/central-manager/lib/local/handler.c @@ -41,18 +41,19 @@ void* managePlane(void* THREADABLE_ARGS){ char buffer[MAX_BUF_LEN]; // buffer struct plane data; // données de l'avion int SOCKET; // Copie de la socket (évite les conflits de références) + char tmpFlags; /* 2. On récupère les arguments */ struct handler_arg* arg = THREADABLE_ARGS; memcpy(&SOCKET, &arg->socket, sizeof(int)); - if( DEBUGMOD&HDR ) printf("===== managePlane(%p, %p, %d, %p) =====\n", (void*) arg->managers, (void*) arg->activeManagers, arg->socket, (void*) arg->sgca); + if( DEBUGMOD&HDR ) printf("===== managePlane(%p, %p, %d, %p) =====\n", (void*) arg->TCPManagers, (void*) arg->activeTCPManagers, arg->socket, (void*) arg->sgca); /* 3. On récupère le rang du thread parmi les "managers" */ index = -1; for( i = 0 ; i < MAX_TCP_THR ; i++ ) - if( arg->managers[i] == pthread_self() ){ index = i; break; } + if( arg->TCPManagers[i] == pthread_self() ){ index = i; break; } // Erreur de thread if( index == -1 ){ @@ -69,13 +70,8 @@ void* managePlane(void* THREADABLE_ARGS){ /* (2) Récupération de la requête ---------------------------------------------------------*/ - /* 1. On lit sur la socket */ - if( pindex > -1 && arg->sgca->unit[pindex].token != 0 ) - sleep(1); - if( pindex > -1 ) pthread_mutex_lock(&arg->sgca->unit[pindex].mutex); read = recv(SOCKET, buffer, MAX_BUF_LEN, 0); - if( pindex > -1 ) pthread_mutex_unlock(&arg->sgca->unit[pindex].mutex); /* 2.1. Si erreur reception (-1:erreur, 0:fermeture client propre) */ if( read <= 0 ){ @@ -84,19 +80,20 @@ void* managePlane(void* THREADABLE_ARGS){ } /* 2.2. Si message trop court */ - if( read != PLANE_LEN ){ - if( DEBUGMOD&BUF ) printf("{tcp_com}(%d) read: %d (expected: %d)\n", index, read, (int) PLANE_LEN); + if( read != PLANE_LEN+1 ){ + if( DEBUGMOD&BUF ) printf("{tcp_com}(%d) read: %d (expected: %d)\n", index, read, (int) (PLANE_LEN+1) ); continue; } /* 3. On parse la requête (indianness: network order) */ - memcpy(&data.code, buffer+sizeof(char)*0+sizeof(int)*0, sizeof(char)*6); - memcpy(&data.x, buffer+sizeof(char)*6+sizeof(int)*0, sizeof(int)); - memcpy(&data.y, buffer+sizeof(char)*6+sizeof(int)*1, sizeof(int)); - memcpy(&data.z, buffer+sizeof(char)*6+sizeof(int)*2, sizeof(int)); - memcpy(&data.spd, buffer+sizeof(char)*6+sizeof(int)*4, sizeof(int)); - memcpy(&data.cap, buffer+sizeof(char)*6+sizeof(int)*3, sizeof(int)); + memcpy(&tmpFlags, buffer+sizeof(char)*0+sizeof(int)*0, sizeof(char) ); + memcpy(&data.code, buffer+sizeof(char)*1+sizeof(int)*0, sizeof(char)*6 ); + memcpy(&data.x, buffer+sizeof(char)*7+sizeof(int)*0, sizeof(int) ); + memcpy(&data.y, buffer+sizeof(char)*7+sizeof(int)*1, sizeof(int) ); + memcpy(&data.z, buffer+sizeof(char)*7+sizeof(int)*2, sizeof(int) ); + memcpy(&data.spd, buffer+sizeof(char)*7+sizeof(int)*4, sizeof(int) ); + memcpy(&data.cap, buffer+sizeof(char)*7+sizeof(int)*3, sizeof(int) ); /* 4. Gestion de l'indianness */ data.x = ntohl(data.x); @@ -134,13 +131,18 @@ void* managePlane(void* THREADABLE_ARGS){ arg->sgca->n++; arg->sgca->unit = (struct context_unit*) realloc(arg->sgca->unit, sizeof(struct context_unit)*arg->sgca->n + 1); arg->sgca->unit[pindex].socket = SOCKET; - arg->sgca->unit[pindex].token = 0; - pthread_mutex_init(&arg->sgca->unit[pindex].mutex, NULL); + arg->sgca->unit[pindex].thrId = 0xf0; printf("{tcp_com}(%d) plane '%s' (#%d) created\n", index, data.code, pindex); } /* 4. On copie/met à jour les valeurs */ + if( tmpFlags != 0x10 ){ // gestion réponse ctrlTerm + arg->sgca->unit[pindex].flags = tmpFlags; // on met à jour les flags + while( arg->sgca->unit[pindex].thrId == 0xf0 ); // on attend le numéro de thread + pthread_kill(arg->UDPManagers[arg->sgca->unit[pindex].thrId], SIGCONT); // on réveille le thread + arg->sgca->unit[pindex].thrId = 0xf0; // on laisse la place pour qqn d'autre + } memcpy(&arg->sgca->unit[pindex].data, &data, sizeof(struct plane)); if( DEBUGMOD&COM ) printf("{tcp_com}(%d) stored (%d)'%s': {x = %d; y = %d; z = %d; cap = %d; spd = %d}\n", index, pindex, arg->sgca->unit[pindex].data.code, arg->sgca->unit[pindex].data.x, arg->sgca->unit[pindex].data.y, arg->sgca->unit[pindex].data.z, arg->sgca->unit[pindex].data.cap, arg->sgca->unit[pindex].data.spd); @@ -174,7 +176,7 @@ void* managePlane(void* THREADABLE_ARGS){ ============================================================================*/ /* 1. On met à jour "activeManagers" */ if( index != -1 ) - arg->activeManagers[index] = 0; + arg->activeTCPManagers[index] = 0; /* 2. On arrête le THREAD */ if( DEBUGMOD&THR ) printf("{tcp_com}(%d) freed\n", index); @@ -223,7 +225,7 @@ void* manageViewTerm(void* THREADABLE_ARGS){ /* 3. On récupère le rang dans les "managers" */ for( i = 0 ; i < MAX_UDP_THR ; i++ ) - if( arg->managers[i] == pthread_self() ){ index = i; break; } + if( arg->UDPManagers[i] == pthread_self() ){ index = i; break; } printf("{udp_vterm}{com}(%d) starting terminal routine (rate: %d sec)\n", index, PUBL_TIMEOUT); @@ -270,7 +272,7 @@ void* manageViewTerm(void* THREADABLE_ARGS){ ============================================================================*/ /* 1. On met à jour "activeManagers" */ if( index != -1 ) - arg->activeManagers[index] = 0; + arg->activeUDPManagers[index] = 0; /* 2. On ferme la socket + libère la mémoire */ close(arg->socket); @@ -319,6 +321,7 @@ void* manageCtrlTerm(void* THREADABLE_ARGS){ char buffer[MAX_BUF_LEN]; // Buffer d'envoi char* dataBuffer = malloc(1); struct term_req request; // Requête + char flags; /* 2. On récupère les arguments */ @@ -326,7 +329,7 @@ void* manageCtrlTerm(void* THREADABLE_ARGS){ /* 3. On récupère le rang dans les "managers" */ for( i = 0 ; i < MAX_UDP_THR ; i++ ) - if( arg->managers[i] == pthread_self() ){ index = i; break; } + if( arg->UDPManagers[i] == pthread_self() ){ index = i; break; } printf("{udp_cterm}{com}(%d) starting terminal routine (rate: %d sec)\n", index, PUBL_TIMEOUT); @@ -422,10 +425,6 @@ void* manageCtrlTerm(void* THREADABLE_ARGS){ count += last; last = sizeof(int); memcpy(buffer+count, &request.update.spd, last ); count += last; - // utilisation socket - arg->sgca->unit[pindex].token = 1; - pthread_mutex_lock(&arg->sgca->unit[pindex].mutex); - /* 3. On envoie la requête à l'avion */ if( send(arg->sgca->unit[pindex].socket, buffer, count/sizeof(char), 0) <= 0 ){ printf("{udp_cterm}{com}(%d) Cannot send request to plane\n", index); @@ -435,29 +434,30 @@ void* manageCtrlTerm(void* THREADABLE_ARGS){ /* (3) Gestion de la réponse de l'avion ---------------------------------------------------------*/ - /* 1. Réception de la réponse */ - count = recv(arg->sgca->unit[pindex].socket, buffer, sizeof(char), 0); + // Attente du flag de réponse + arg->sgca->unit[pindex].thrId = index; + sleep(2); + flags = arg->sgca->unit[pindex].flags; // on récupère les flags + arg->sgca->unit[pindex].flags = 0x10; // on reset les flags - // libération socket - pthread_mutex_unlock(&arg->sgca->unit[pindex].mutex); - - if( count != sizeof(char) ){ + if( flags == 0x10 ){ printf("{udp_cterm}{com}(%d) Cannot get response from plane %d (%d)\n", index, count, (int) PLANE_LEN); break; } - printf("{udp_cterm}{com}(%d) Plane res { flags: %d }\n", index, buffer[0]); + + printf("{udp_cterm}{com}(%d) Plane res { flags: %d }\n", index, flags); /* 2. Gestion de la validation de l'update */ - if( !(buffer[0]&TERMREQ_ALT) && request.flags&TERMREQ_ALT ) + if( !(flags&TERMREQ_ALT) && request.flags&TERMREQ_ALT ) request.flags -= TERMREQ_ALT; - if( !(buffer[0]&TERMREQ_CAP) && request.flags&TERMREQ_CAP ) + if( !(flags&TERMREQ_CAP) && request.flags&TERMREQ_CAP ) request.flags -= TERMREQ_CAP; - if( !(buffer[0]&TERMREQ_SPD) && request.flags&TERMREQ_SPD ) + if( !(flags&TERMREQ_SPD) && request.flags&TERMREQ_SPD ) request.flags -= TERMREQ_SPD; - /* 3. Si pas de demande des données -> on*/ + /* 3. Si pas de demande des données -> on pré-remplie la requête */ if( !fbk ){ dataBuffer = realloc(dataBuffer, sizeof(char)*2 ); dataBuffer[0] = request.flags; @@ -497,7 +497,7 @@ void* manageCtrlTerm(void* THREADABLE_ARGS){ ============================================================================*/ /* 1. On met à jour "activeManagers" */ if( index != -1 ) - arg->activeManagers[index] = 0; + arg->activeUDPManagers[index] = 0; /* 2. On ferme la socket */ close(arg->socket); diff --git a/command-terminal/c-emul/main.c b/command-terminal/c-emul/main.c index 21284dc..bedfb5d 100644 --- a/command-terminal/c-emul/main.c +++ b/command-terminal/c-emul/main.c @@ -167,6 +167,7 @@ int main(int argc, char* argv[]){ } + sleep(1); @@ -256,6 +257,7 @@ int main(int argc, char* argv[]){ } + sleep(1); /************************************************ **** DEMANDE MISE A JOUR SIMPLE **** @@ -344,6 +346,7 @@ int main(int argc, char* argv[]){ } + sleep(1); /************************************************ **** DEMANDE MISE A JOUR SIMPLE + DATA **** @@ -427,9 +430,91 @@ int main(int argc, char* argv[]){ } + sleep(1); + /************************************************ **** DEMANDE MISE A JOUR COMBINEE **** ************************************************/ + if( 1 ){ + printf("\n\nDEMANDE D'UPDATE MULTIPLE + DATA\n================================\n"); + + + + /* [1] Preparing request + =========================================================*/ + tr.flags = TERMREQ_FBK|TERMREQ_ALT|TERMREQ_CAP|TERMREQ_SPD; + memcpy(&tr.update, &tmp, sizeof(struct plane)); + tr.update.z = 2001; + tr.update.cap = 21; + tr.update.spd = 201; + + printf("Update req { flags: %d; plane: #%s { z=%d; cap=%d; speed=%d} }\n", tr.flags, tr.update.code, tr.update.z, tr.update.cap, tr.update.spd); + tr.update.x = htonl(tr.update.x); + tr.update.y = htonl(tr.update.y); + tr.update.z = htonl(tr.update.z); + tr.update.cap = htonl(tr.update.cap); + tr.update.spd = htonl(tr.update.spd); + + /* [2] Filling buffer + =========================================================*/ + bzero(buffer, 512); + memcpy(buffer+sizeof(char)*0+sizeof(int)*0, &tr.flags, sizeof(char)); + memcpy(buffer+sizeof(char)*1+sizeof(int)*0, &tr.update.code, sizeof(char)*6); + memcpy(buffer+sizeof(char)*7+sizeof(int)*0, &tr.update.x, sizeof(int)); + memcpy(buffer+sizeof(char)*7+sizeof(int)*1, &tr.update.y, sizeof(int)); + memcpy(buffer+sizeof(char)*7+sizeof(int)*2, &tr.update.z, sizeof(int)); + memcpy(buffer+sizeof(char)*7+sizeof(int)*3, &tr.update.cap, sizeof(int)); + memcpy(buffer+sizeof(char)*7+sizeof(int)*4, &tr.update.spd, sizeof(int)); + + + /* [3] Sending request + =========================================================*/ + if( sendto(csocket, buffer, 7+sizeof(int)*5+1, 0, (struct sockaddr*) &server, len) < 0 ){ + printf("Cannot ask for simple update.\n"); + return EXIT_FAILURE; + } + + /* [4] Fetch & print data + =========================================================*/ + /* 1. On récupère la réponse */ + len = sizeof(struct sockaddr_in); + nb = recvfrom(csocket, buffer, 512, 0, (struct sockaddr*) &server, &len); + + printf("%d bytes received\n", nb); + + if( nb < 1 ){ + printf("Error receiving simple update.\n"); + return EXIT_FAILURE; + } + + + /* 2. On parse la réponse */ + count = 0; last = sizeof(char); memcpy(&trs.flags, buffer+count, last); + count += last; last = sizeof(char); memcpy(&trs.n, buffer+count, last); + printf("Flag received : %d\n", trs.flags); + + trs.data = malloc( sizeof(struct plane) * trs.n ); + + + for( i = 0 ; i < trs.n ; i++ ){ + count += last; last = sizeof(char)*6; memcpy(&trs.data[i].code, buffer+count, last); + count += last; last = sizeof(int); memcpy(&trs.data[i].x, buffer+count, last); + count += last; last = sizeof(int); memcpy(&trs.data[i].y, buffer+count, last); + count += last; last = sizeof(int); memcpy(&trs.data[i].z, buffer+count, last); + count += last; last = sizeof(int); memcpy(&trs.data[i].cap, buffer+count, last); + count += last; last = sizeof(int); memcpy(&trs.data[i].spd, buffer+count, last); + + trs.data[i].x = ntohl(trs.data[i].x); + trs.data[i].y = ntohl(trs.data[i].y); + trs.data[i].z = ntohl(trs.data[i].z); + trs.data[i].cap = ntohl(trs.data[i].cap); + trs.data[i].spd = ntohl(trs.data[i].spd); + + printf("Plane[%s@%d] { (%d,%d,%d), cap: %d, spd: %d}\n", trs.data[i].code, i, trs.data[i].x, trs.data[i].y, trs.data[i].z, trs.data[i].cap, trs.data[i].spd); + } + + + } diff --git a/plane/plane.c b/plane/plane.c index 2220bc5..be37107 100644 --- a/plane/plane.c +++ b/plane/plane.c @@ -118,7 +118,7 @@ void close_communication(){ -void send_data(){ +void send_data(char flags){ /* [0] Initialisation des variables =========================================================*/ @@ -126,7 +126,6 @@ void send_data(){ struct plane_data request; int read; - /* [1] Envoi des caractéristiques =========================================================*/ /* 1. Vérification socket */ @@ -142,14 +141,15 @@ void send_data(){ request.spd = htonl( ctrl.speed ); /* 3. Copie buffer */ - memcpy(buffer+sizeof(char)*0+sizeof(int)*0, &request.code, sizeof(char)*6 ); - memcpy(buffer+sizeof(char)*6+sizeof(int)*0, &request.x, sizeof(int) ); - memcpy(buffer+sizeof(char)*6+sizeof(int)*1, &request.y, sizeof(int) ); - memcpy(buffer+sizeof(char)*6+sizeof(int)*2, &request.z, sizeof(int) ); - memcpy(buffer+sizeof(char)*6+sizeof(int)*3, &request.cap, sizeof(int) ); - memcpy(buffer+sizeof(char)*6+sizeof(int)*4, &request.spd, sizeof(int) ); + memcpy(buffer+sizeof(char)*0+sizeof(int)*0, &flags, sizeof(char) ); + memcpy(buffer+sizeof(char)*1+sizeof(int)*0, &request.code, sizeof(char)*6 ); + memcpy(buffer+sizeof(char)*7+sizeof(int)*0, &request.x, sizeof(int) ); + memcpy(buffer+sizeof(char)*7+sizeof(int)*1, &request.y, sizeof(int) ); + memcpy(buffer+sizeof(char)*7+sizeof(int)*2, &request.z, sizeof(int) ); + memcpy(buffer+sizeof(char)*7+sizeof(int)*3, &request.cap, sizeof(int) ); + memcpy(buffer+sizeof(char)*7+sizeof(int)*4, &request.spd, sizeof(int) ); - read = send(commu_socket, buffer, PLANE_DATA_LEN/sizeof(char), 0); + read = send(commu_socket, buffer, (1+PLANE_DATA_LEN)/sizeof(char), 0); if( read <= 0 ){ printf("Cannot send\n"); @@ -288,11 +288,13 @@ void update(){ int received; struct sgca_req request; char buffer[MAX_BUF_LEN] = {0}; + char persistentFlags = 0x10; while( 1 ){ calc_ctrl(); - send_data(); + send_data(persistentFlags); + persistentFlags = 0x10; /* [1] Gestion réception avec timeout =========================================================*/ @@ -334,17 +336,12 @@ void update(){ request.flags -= REQ_CAP; - /* [3] Gestion de la réponse + /* [3] Gestion des flags persistents =========================================================*/ - /* 1. On prépare la réponse */ - buffer[0] = request.flags; + /* 1. On remplit le champ */ + persistentFlags = request.flags; printf("Response flags : %d\n", request.flags); - /* 2. Envoi de la réponse */ - if( send(commu_socket, buffer, sizeof(char), 0) <= 0 ){ - printf("Cannot send response\n"); - } - } } diff --git a/plane/plane.h b/plane/plane.h index 7a6228d..dd88ffb 100644 --- a/plane/plane.h +++ b/plane/plane.h @@ -49,6 +49,7 @@ int spd; }; + #define REQ_FBK 0x01 #define REQ_CAP 0x02 #define REQ_SPD 0x04 #define REQ_ALT 0x08