[update][fix] fixed c-emul test + redesigned ctrlTerm update management without <Critical Section>

This commit is contained in:
xdrm-brackets 2017-05-03 15:45:19 +02:00
parent 7b3f8c101c
commit 1501181420
6 changed files with 148 additions and 61 deletions

View File

@ -164,7 +164,7 @@ void* LISTEN_TCP(void* THREADABLE_ARGS){
if( index != -1 ){
// Construction arguments thread
struct handler_arg thread_args = { TCPManagers, activeTCPManagers, CLIENT_SOCKET, &sgca };
struct handler_arg thread_args = { TCPManagers, UDPManagers, activeTCPManagers, activeUDPManagers, CLIENT_SOCKET, &sgca };
/* 5. On lance un thread pour le traitement de ce client */
pthread_create(&TCPManagers[index], NULL, arg->handler, (void*) &thread_args);
@ -315,7 +315,8 @@ void* LISTEN_UDP(void* THREADABLE_ARGS){
if( index != -1 ){
// Construction arguments thread
struct handler_arg thread_args = { UDPManagers, activeUDPManagers, CLIENT_SOCKET, &sgca };
struct handler_arg thread_args = { TCPManagers, UDPManagers, activeTCPManagers, activeUDPManagers, CLIENT_SOCKET, &sgca };
/* 2.1. On lance un thread pour le traitement de ce client */
pthread_create(&UDPManagers[index], NULL, arg->handler, (void*) &thread_args);

View File

@ -20,6 +20,7 @@
#include <pthread.h>
#include <stdint.h>
#include <errno.h>
#include <signal.h>
/* sys */
#include <sys/types.h>
@ -58,8 +59,8 @@
struct context_unit{
int socket; // socket associée à un avion
struct plane data; // données d'un avion
pthread_mutex_t mutex; // socket mutual exclusion
char token; // if want to have mutex
char thrId; // TCP thread index
char flags; // persistent flags
};
struct context{
@ -74,8 +75,10 @@
};
struct handler_arg{
pthread_t* managers;
int* activeManagers;
pthread_t* TCPManagers;
pthread_t* UDPManagers;
int* activeTCPManagers;
int* activeUDPManagers;
int socket;
struct context* sgca;
};

View File

@ -41,18 +41,19 @@ void* managePlane(void* THREADABLE_ARGS){
char buffer[MAX_BUF_LEN]; // buffer
struct plane data; // données de l'avion
int SOCKET; // Copie de la socket (évite les conflits de références)
char tmpFlags;
/* 2. On récupère les arguments */
struct handler_arg* arg = THREADABLE_ARGS;
memcpy(&SOCKET, &arg->socket, sizeof(int));
if( DEBUGMOD&HDR ) printf("===== managePlane(%p, %p, %d, %p) =====\n", (void*) arg->managers, (void*) arg->activeManagers, arg->socket, (void*) arg->sgca);
if( DEBUGMOD&HDR ) printf("===== managePlane(%p, %p, %d, %p) =====\n", (void*) arg->TCPManagers, (void*) arg->activeTCPManagers, arg->socket, (void*) arg->sgca);
/* 3. On récupère le rang du thread parmi les "managers" */
index = -1;
for( i = 0 ; i < MAX_TCP_THR ; i++ )
if( arg->managers[i] == pthread_self() ){ index = i; break; }
if( arg->TCPManagers[i] == pthread_self() ){ index = i; break; }
// Erreur de thread
if( index == -1 ){
@ -69,13 +70,8 @@ void* managePlane(void* THREADABLE_ARGS){
/* (2) Récupération de la requête
---------------------------------------------------------*/
/* 1. On lit sur la socket */
if( pindex > -1 && arg->sgca->unit[pindex].token != 0 )
sleep(1);
if( pindex > -1 ) pthread_mutex_lock(&arg->sgca->unit[pindex].mutex);
read = recv(SOCKET, buffer, MAX_BUF_LEN, 0);
if( pindex > -1 ) pthread_mutex_unlock(&arg->sgca->unit[pindex].mutex);
/* 2.1. Si erreur reception (-1:erreur, 0:fermeture client propre) */
if( read <= 0 ){
@ -84,19 +80,20 @@ void* managePlane(void* THREADABLE_ARGS){
}
/* 2.2. Si message trop court */
if( read != PLANE_LEN ){
if( DEBUGMOD&BUF ) printf("{tcp_com}(%d) read: %d (expected: %d)\n", index, read, (int) PLANE_LEN);
if( read != PLANE_LEN+1 ){
if( DEBUGMOD&BUF ) printf("{tcp_com}(%d) read: %d (expected: %d)\n", index, read, (int) (PLANE_LEN+1) );
continue;
}
/* 3. On parse la requête (indianness: network order) */
memcpy(&data.code, buffer+sizeof(char)*0+sizeof(int)*0, sizeof(char)*6);
memcpy(&data.x, buffer+sizeof(char)*6+sizeof(int)*0, sizeof(int));
memcpy(&data.y, buffer+sizeof(char)*6+sizeof(int)*1, sizeof(int));
memcpy(&data.z, buffer+sizeof(char)*6+sizeof(int)*2, sizeof(int));
memcpy(&data.spd, buffer+sizeof(char)*6+sizeof(int)*4, sizeof(int));
memcpy(&data.cap, buffer+sizeof(char)*6+sizeof(int)*3, sizeof(int));
memcpy(&tmpFlags, buffer+sizeof(char)*0+sizeof(int)*0, sizeof(char) );
memcpy(&data.code, buffer+sizeof(char)*1+sizeof(int)*0, sizeof(char)*6 );
memcpy(&data.x, buffer+sizeof(char)*7+sizeof(int)*0, sizeof(int) );
memcpy(&data.y, buffer+sizeof(char)*7+sizeof(int)*1, sizeof(int) );
memcpy(&data.z, buffer+sizeof(char)*7+sizeof(int)*2, sizeof(int) );
memcpy(&data.spd, buffer+sizeof(char)*7+sizeof(int)*4, sizeof(int) );
memcpy(&data.cap, buffer+sizeof(char)*7+sizeof(int)*3, sizeof(int) );
/* 4. Gestion de l'indianness */
data.x = ntohl(data.x);
@ -134,13 +131,18 @@ void* managePlane(void* THREADABLE_ARGS){
arg->sgca->n++;
arg->sgca->unit = (struct context_unit*) realloc(arg->sgca->unit, sizeof(struct context_unit)*arg->sgca->n + 1);
arg->sgca->unit[pindex].socket = SOCKET;
arg->sgca->unit[pindex].token = 0;
pthread_mutex_init(&arg->sgca->unit[pindex].mutex, NULL);
arg->sgca->unit[pindex].thrId = 0xf0;
printf("{tcp_com}(%d) plane '%s' (#%d) created\n", index, data.code, pindex);
}
/* 4. On copie/met à jour les valeurs */
if( tmpFlags != 0x10 ){ // gestion réponse ctrlTerm
arg->sgca->unit[pindex].flags = tmpFlags; // on met à jour les flags
while( arg->sgca->unit[pindex].thrId == 0xf0 ); // on attend le numéro de thread
pthread_kill(arg->UDPManagers[arg->sgca->unit[pindex].thrId], SIGCONT); // on réveille le thread
arg->sgca->unit[pindex].thrId = 0xf0; // on laisse la place pour qqn d'autre
}
memcpy(&arg->sgca->unit[pindex].data, &data, sizeof(struct plane));
if( DEBUGMOD&COM ) printf("{tcp_com}(%d) stored (%d)'%s': {x = %d; y = %d; z = %d; cap = %d; spd = %d}\n", index, pindex, arg->sgca->unit[pindex].data.code, arg->sgca->unit[pindex].data.x, arg->sgca->unit[pindex].data.y, arg->sgca->unit[pindex].data.z, arg->sgca->unit[pindex].data.cap, arg->sgca->unit[pindex].data.spd);
@ -174,7 +176,7 @@ void* managePlane(void* THREADABLE_ARGS){
============================================================================*/
/* 1. On met à jour "activeManagers" */
if( index != -1 )
arg->activeManagers[index] = 0;
arg->activeTCPManagers[index] = 0;
/* 2. On arrête le THREAD */
if( DEBUGMOD&THR ) printf("{tcp_com}(%d) freed\n", index);
@ -223,7 +225,7 @@ void* manageViewTerm(void* THREADABLE_ARGS){
/* 3. On récupère le rang dans les "managers" */
for( i = 0 ; i < MAX_UDP_THR ; i++ )
if( arg->managers[i] == pthread_self() ){ index = i; break; }
if( arg->UDPManagers[i] == pthread_self() ){ index = i; break; }
printf("{udp_vterm}{com}(%d) starting terminal routine (rate: %d sec)\n", index, PUBL_TIMEOUT);
@ -270,7 +272,7 @@ void* manageViewTerm(void* THREADABLE_ARGS){
============================================================================*/
/* 1. On met à jour "activeManagers" */
if( index != -1 )
arg->activeManagers[index] = 0;
arg->activeUDPManagers[index] = 0;
/* 2. On ferme la socket + libère la mémoire */
close(arg->socket);
@ -319,6 +321,7 @@ void* manageCtrlTerm(void* THREADABLE_ARGS){
char buffer[MAX_BUF_LEN]; // Buffer d'envoi
char* dataBuffer = malloc(1);
struct term_req request; // Requête
char flags;
/* 2. On récupère les arguments */
@ -326,7 +329,7 @@ void* manageCtrlTerm(void* THREADABLE_ARGS){
/* 3. On récupère le rang dans les "managers" */
for( i = 0 ; i < MAX_UDP_THR ; i++ )
if( arg->managers[i] == pthread_self() ){ index = i; break; }
if( arg->UDPManagers[i] == pthread_self() ){ index = i; break; }
printf("{udp_cterm}{com}(%d) starting terminal routine (rate: %d sec)\n", index, PUBL_TIMEOUT);
@ -422,10 +425,6 @@ void* manageCtrlTerm(void* THREADABLE_ARGS){
count += last; last = sizeof(int); memcpy(buffer+count, &request.update.spd, last );
count += last;
// utilisation socket
arg->sgca->unit[pindex].token = 1;
pthread_mutex_lock(&arg->sgca->unit[pindex].mutex);
/* 3. On envoie la requête à l'avion */
if( send(arg->sgca->unit[pindex].socket, buffer, count/sizeof(char), 0) <= 0 ){
printf("{udp_cterm}{com}(%d) Cannot send request to plane\n", index);
@ -435,29 +434,30 @@ void* manageCtrlTerm(void* THREADABLE_ARGS){
/* (3) Gestion de la réponse de l'avion
---------------------------------------------------------*/
/* 1. Réception de la réponse */
count = recv(arg->sgca->unit[pindex].socket, buffer, sizeof(char), 0);
// Attente du flag de réponse
arg->sgca->unit[pindex].thrId = index;
sleep(2);
flags = arg->sgca->unit[pindex].flags; // on récupère les flags
arg->sgca->unit[pindex].flags = 0x10; // on reset les flags
// libération socket
pthread_mutex_unlock(&arg->sgca->unit[pindex].mutex);
if( count != sizeof(char) ){
if( flags == 0x10 ){
printf("{udp_cterm}{com}(%d) Cannot get response from plane %d (%d)\n", index, count, (int) PLANE_LEN);
break;
}
printf("{udp_cterm}{com}(%d) Plane res { flags: %d }\n", index, buffer[0]);
printf("{udp_cterm}{com}(%d) Plane res { flags: %d }\n", index, flags);
/* 2. Gestion de la validation de l'update */
if( !(buffer[0]&TERMREQ_ALT) && request.flags&TERMREQ_ALT )
if( !(flags&TERMREQ_ALT) && request.flags&TERMREQ_ALT )
request.flags -= TERMREQ_ALT;
if( !(buffer[0]&TERMREQ_CAP) && request.flags&TERMREQ_CAP )
if( !(flags&TERMREQ_CAP) && request.flags&TERMREQ_CAP )
request.flags -= TERMREQ_CAP;
if( !(buffer[0]&TERMREQ_SPD) && request.flags&TERMREQ_SPD )
if( !(flags&TERMREQ_SPD) && request.flags&TERMREQ_SPD )
request.flags -= TERMREQ_SPD;
/* 3. Si pas de demande des données -> on*/
/* 3. Si pas de demande des données -> on pré-remplie la requête */
if( !fbk ){
dataBuffer = realloc(dataBuffer, sizeof(char)*2 );
dataBuffer[0] = request.flags;
@ -497,7 +497,7 @@ void* manageCtrlTerm(void* THREADABLE_ARGS){
============================================================================*/
/* 1. On met à jour "activeManagers" */
if( index != -1 )
arg->activeManagers[index] = 0;
arg->activeUDPManagers[index] = 0;
/* 2. On ferme la socket */
close(arg->socket);

View File

@ -167,6 +167,7 @@ int main(int argc, char* argv[]){
}
sleep(1);
@ -256,6 +257,7 @@ int main(int argc, char* argv[]){
}
sleep(1);
/************************************************
**** DEMANDE MISE A JOUR SIMPLE ****
@ -344,6 +346,7 @@ int main(int argc, char* argv[]){
}
sleep(1);
/************************************************
**** DEMANDE MISE A JOUR SIMPLE + DATA ****
@ -427,9 +430,91 @@ int main(int argc, char* argv[]){
}
sleep(1);
/************************************************
**** DEMANDE MISE A JOUR COMBINEE ****
************************************************/
if( 1 ){
printf("\n\nDEMANDE D'UPDATE MULTIPLE + DATA\n================================\n");
/* [1] Preparing request
=========================================================*/
tr.flags = TERMREQ_FBK|TERMREQ_ALT|TERMREQ_CAP|TERMREQ_SPD;
memcpy(&tr.update, &tmp, sizeof(struct plane));
tr.update.z = 2001;
tr.update.cap = 21;
tr.update.spd = 201;
printf("Update req { flags: %d; plane: #%s { z=%d; cap=%d; speed=%d} }\n", tr.flags, tr.update.code, tr.update.z, tr.update.cap, tr.update.spd);
tr.update.x = htonl(tr.update.x);
tr.update.y = htonl(tr.update.y);
tr.update.z = htonl(tr.update.z);
tr.update.cap = htonl(tr.update.cap);
tr.update.spd = htonl(tr.update.spd);
/* [2] Filling buffer
=========================================================*/
bzero(buffer, 512);
memcpy(buffer+sizeof(char)*0+sizeof(int)*0, &tr.flags, sizeof(char));
memcpy(buffer+sizeof(char)*1+sizeof(int)*0, &tr.update.code, sizeof(char)*6);
memcpy(buffer+sizeof(char)*7+sizeof(int)*0, &tr.update.x, sizeof(int));
memcpy(buffer+sizeof(char)*7+sizeof(int)*1, &tr.update.y, sizeof(int));
memcpy(buffer+sizeof(char)*7+sizeof(int)*2, &tr.update.z, sizeof(int));
memcpy(buffer+sizeof(char)*7+sizeof(int)*3, &tr.update.cap, sizeof(int));
memcpy(buffer+sizeof(char)*7+sizeof(int)*4, &tr.update.spd, sizeof(int));
/* [3] Sending request
=========================================================*/
if( sendto(csocket, buffer, 7+sizeof(int)*5+1, 0, (struct sockaddr*) &server, len) < 0 ){
printf("Cannot ask for simple update.\n");
return EXIT_FAILURE;
}
/* [4] Fetch & print data
=========================================================*/
/* 1. On récupère la réponse */
len = sizeof(struct sockaddr_in);
nb = recvfrom(csocket, buffer, 512, 0, (struct sockaddr*) &server, &len);
printf("%d bytes received\n", nb);
if( nb < 1 ){
printf("Error receiving simple update.\n");
return EXIT_FAILURE;
}
/* 2. On parse la réponse */
count = 0; last = sizeof(char); memcpy(&trs.flags, buffer+count, last);
count += last; last = sizeof(char); memcpy(&trs.n, buffer+count, last);
printf("Flag received : %d\n", trs.flags);
trs.data = malloc( sizeof(struct plane) * trs.n );
for( i = 0 ; i < trs.n ; i++ ){
count += last; last = sizeof(char)*6; memcpy(&trs.data[i].code, buffer+count, last);
count += last; last = sizeof(int); memcpy(&trs.data[i].x, buffer+count, last);
count += last; last = sizeof(int); memcpy(&trs.data[i].y, buffer+count, last);
count += last; last = sizeof(int); memcpy(&trs.data[i].z, buffer+count, last);
count += last; last = sizeof(int); memcpy(&trs.data[i].cap, buffer+count, last);
count += last; last = sizeof(int); memcpy(&trs.data[i].spd, buffer+count, last);
trs.data[i].x = ntohl(trs.data[i].x);
trs.data[i].y = ntohl(trs.data[i].y);
trs.data[i].z = ntohl(trs.data[i].z);
trs.data[i].cap = ntohl(trs.data[i].cap);
trs.data[i].spd = ntohl(trs.data[i].spd);
printf("Plane[%s@%d] { (%d,%d,%d), cap: %d, spd: %d}\n", trs.data[i].code, i, trs.data[i].x, trs.data[i].y, trs.data[i].z, trs.data[i].cap, trs.data[i].spd);
}
}

View File

@ -118,7 +118,7 @@ void close_communication(){
void send_data(){
void send_data(char flags){
/* [0] Initialisation des variables
=========================================================*/
@ -126,7 +126,6 @@ void send_data(){
struct plane_data request;
int read;
/* [1] Envoi des caractéristiques
=========================================================*/
/* 1. Vérification socket */
@ -142,14 +141,15 @@ void send_data(){
request.spd = htonl( ctrl.speed );
/* 3. Copie buffer */
memcpy(buffer+sizeof(char)*0+sizeof(int)*0, &request.code, sizeof(char)*6 );
memcpy(buffer+sizeof(char)*6+sizeof(int)*0, &request.x, sizeof(int) );
memcpy(buffer+sizeof(char)*6+sizeof(int)*1, &request.y, sizeof(int) );
memcpy(buffer+sizeof(char)*6+sizeof(int)*2, &request.z, sizeof(int) );
memcpy(buffer+sizeof(char)*6+sizeof(int)*3, &request.cap, sizeof(int) );
memcpy(buffer+sizeof(char)*6+sizeof(int)*4, &request.spd, sizeof(int) );
memcpy(buffer+sizeof(char)*0+sizeof(int)*0, &flags, sizeof(char) );
memcpy(buffer+sizeof(char)*1+sizeof(int)*0, &request.code, sizeof(char)*6 );
memcpy(buffer+sizeof(char)*7+sizeof(int)*0, &request.x, sizeof(int) );
memcpy(buffer+sizeof(char)*7+sizeof(int)*1, &request.y, sizeof(int) );
memcpy(buffer+sizeof(char)*7+sizeof(int)*2, &request.z, sizeof(int) );
memcpy(buffer+sizeof(char)*7+sizeof(int)*3, &request.cap, sizeof(int) );
memcpy(buffer+sizeof(char)*7+sizeof(int)*4, &request.spd, sizeof(int) );
read = send(commu_socket, buffer, PLANE_DATA_LEN/sizeof(char), 0);
read = send(commu_socket, buffer, (1+PLANE_DATA_LEN)/sizeof(char), 0);
if( read <= 0 ){
printf("Cannot send\n");
@ -288,11 +288,13 @@ void update(){
int received;
struct sgca_req request;
char buffer[MAX_BUF_LEN] = {0};
char persistentFlags = 0x10;
while( 1 ){
calc_ctrl();
send_data();
send_data(persistentFlags);
persistentFlags = 0x10;
/* [1] Gestion réception avec timeout
=========================================================*/
@ -334,17 +336,12 @@ void update(){
request.flags -= REQ_CAP;
/* [3] Gestion de la réponse
/* [3] Gestion des flags persistents
=========================================================*/
/* 1. On prépare la réponse */
buffer[0] = request.flags;
/* 1. On remplit le champ */
persistentFlags = request.flags;
printf("Response flags : %d\n", request.flags);
/* 2. Envoi de la réponse */
if( send(commu_socket, buffer, sizeof(char), 0) <= 0 ){
printf("Cannot send response\n");
}
}
}

View File

@ -49,6 +49,7 @@
int spd;
};
#define REQ_FBK 0x01
#define REQ_CAP 0x02
#define REQ_SPD 0x04
#define REQ_ALT 0x08