Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Diff for /libnutss/examples/slinks_variable_load.c between versions 1.26 and 1.27

version 1.26, 2007/07/20 17:00:22 version 1.27, 2007/07/28 00:41:57
Line 1 Line 1
 #include <stdio.h>  #include <stdio.h>
 #include <stdlib.h>  #include <stdlib.h>
 #include <string.h>  #include <string.h>
#include <math.h>#include <time.h>
 #ifndef WIN32  #ifndef WIN32
 #include <poll.h>  #include <poll.h>
 #include <sys/socket.h>  #include <sys/socket.h>
Line 13 Line 13
 #define sleep(sec) Sleep((sec)*1000)  #define sleep(sec) Sleep((sec)*1000)
 #define snprintf   _snprintf  #define snprintf   _snprintf
 #endif  #endif
   
 #define clear(a) a[0]=0  #define clear(a) a[0]=0
 #define streq(a,b) !strcmp((a),(b))  #define streq(a,b) !strcmp((a),(b))
 #define strlenn(s) (s) ? strlen(s) : 0  #define strlenn(s) (s) ? strlen(s) : 0
 #define randint(N) ((int)(rand() / (((double)RAND_MAX + 1) / (N))))  #define randint(N) ((int)(rand() / (((double)RAND_MAX + 1) / (N))))
 #define assert(p) if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); }  #define assert(p) if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); }
 #define REDUNDANCY 5  #define REDUNDANCY 5
   #define PROPAGATION 300
   #define GRACE 3
   #define ALLOTMENT 400
   #define WALK_LENGTH 10
   #define MAX_FLOWS 256
   #define BUFFER_SIZE 128
   #define URI_LENGTH 256
   
 typedef struct flow {  typedef struct flow {
   int duration;    int duration, weight, exts;
   int weight;    char source[URI_LENGTH];
   char source[256]; 
     struct flow *next, *last;      struct flow *next, *last;
 } flow_t;  } flow_t;
   
 typedef struct int_pair_t { int i; int o; } int_pair;  typedef struct int_pair_t { int i; int o; } int_pair;
   typedef struct flow_spec_t { int weight; int expiration; } flow_spec;
   
 int_pair process_neighbor_list(char*);  int_pair process_neighbor_list(char*);
   
int num, nodes, peers,limit, maxload;// Gobal information, used by multiple threads
 int num, nodes= 0, peers,capacity;
 swaplinks_p swp;  swaplinks_p swp;
FILE *f = NULL, *s = NULL, *g = NULL;FILE *f = NULL, *s = NULL;
   
void *coord_recvthread(void *arg) {// Node's receiving thread that decides whether to accept an incoming
int i;// request and records all the necessary information
   char tmp[32];void *recvthread(void *arg) {
     struct sockaddr_ns peer;      struct sockaddr_ns peer;
     socklen_t len = sizeof(peer);      socklen_t len = sizeof(peer);
   swaplinks_p swp = (swaplinks_p)arg;    char msg[BUFFER_SIZE], tmp[BUFFER_SIZE];
     int load = 0, bytes, msg_count, i;
     time_t start,now;
     flow_t newflow;
     flow_spec flows[MAX_FLOWS];
   
   sleep(20);    start = time(NULL);
   for(i=0;;i++) {for(;;) {
       if(i % nodes == 0 && (i/nodes+1) < nodes/4) swaplinks_update_walk_length(swp,peers*(i/nodes + 1)); 
       printf("Coordinator sending greeting %d\n",i+1); 
       swaplinks_sendtoany(swp,"GREETING",9,0); 
       sleep(2); 
   } 
   pthread_exit(NULL); 
} 
 
void *recvthread(void *arg) { 
   swaplinks_p swp = (swaplinks_p)arg; 
   struct sockaddr_ns peer, coord; 
   socklen_t len = sizeof(peer); 
   flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL; 
   char msg[64], tmp[128], buf[2048], greeting[1024]; 
   int time = 0, load = 0, exts, delta, expiration, greeted = 0, killed = 0, i; 
 
   clear(buf); 
   while(time < limit) { 
       memset(tmp,0,128); 
         memset(&peer,0,sizeof(peer));          memset(&peer,0,sizeof(peer));
       if(swaplinks_recvfrom(swp,tmp, sizeof(tmp), 0, &peer, &len)> 0) {        sleep(1);
if(streq(tmp,"GREETING")) {        if((bytes=swaplinks_recvfrom(swp,msg, sizeof(msg), 0, &peer, &len)) > 0) {
               if(greeted) continue;msg[bytes] = 0;
               printf("%d Received greeting from coordinator\n",num);            // Flood SHUTDOWN to neighbors, then power down
               memset(&coord, 0, sizeof(coord));            if(streq(msg,"SHUTDOWN")) {
               memcpy(&coord,&peer,sizeof(peer)); 
               greeted = 1; 
               continue; 
           } 
           if(streq(tmp,"SHUTDOWN")) { 
                 swaplinks_update_walk_length(swp,1);                  swaplinks_update_walk_length(swp,1);
               for(i = 0; i < peers*2; i++)                for(i = 0; i < peers*2; i++)swaplinks_sendtoany(swp,"SHUTDOWN",9,0);
swaplinks_sendtoany(swp,"SHUTDOWN",9,0);                printf("%dRecieved KILL signal...\n",num);
               printf("%dKilled prematurely...\n"); 
               killed = 1; 
                 break;                  break;
             }              }
           time++; delta = 0;            // parse the new flow
           // parse new flowmemcpy(tmp,msg,sizeof(tmp));
newflow = (flow_t*)calloc(1,sizeof(flow_t));            newflow.duration = atoi(strtok(tmp,";"));
           newflow->duration = atoi(strtok(tmp,";"));            newflow.weight = atoi(strtok(NULL,";"));
           newflow->weight = atoi(strtok(NULL,";"));newflow.exts = atoi(strtok(NULL,";"));
exts = atoi(strtok(NULL,";"));            strncpy(newflow.source,peer.user+4,sizeof(newflow.source));
           strncpy(newflow->source,peer.user+4,sizeof(newflow->source));            if(s) fprintf(s,"%d recvd request #%d from %s - w:%d d:%d\n",
           if(num == 1) printf("recvd request %d from %s\n",time,newflow->source+4);                          num,++msg_count,newflow.source,newflow.weight,newflow.duration);
           if(s) fprintf(s,"%d recvd request #%d from %s - w:%d d:%d\n",
                         num,time,newflow->source,newflow->weight,newflow->duration);// retire any expired flowsbefore processing
snprintf(buf,2048,"->%d:%d:%s<->",newflow->duration,newflow->weight,newflow->source);            now = difftime(time(NULL),start);
           // run through flows, decrementing duration, cleaning any expired flows            for(i=0;i < MAX_FLOWS;i++) {
for(ptr = flows; ptr;) {                if(0 < flows[i].expiration && flows[i].expiration < now) {
               sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);                    load -= flows[i].weight;
               expiration = 0;axe = NULL;                    flows[i].weight = 0;
               if(--ptr->duration == 0) {                    flows[i].expiration = 0;
                   axe = ptr;if(s) fprintf(s,"        %d finished accomodating flowat %ld: %d\n",num,now,load);
                   load -= ptr->weight; 
                   delta -= ptr->weight; 
                   if(ptr->last && ptr->next) { 
                       assert(flows != ptr); 
                       ptr->last->next = ptr->next; 
                       ptr->next->last = ptr->last; 
                   } 
                   else if(ptr->next) { 
                       assert(flows == ptr); 
                       ptr->next->last = NULL; 
                       flows = ptr->next; 
                   } 
                   else if(ptr->last) { 
                       assert(flows != ptr); 
                       ptr->last->next = NULL; 
                   } 
                   else { 
                       assert(flows == ptr); 
                       flows = NULL; 
                   } 
                   expiration = 1; 
                 }                  }
                 ptr = ptr->next;  
                 if(expiration) { free(axe); axe = NULL; }  
             }              }
           sprintf(buf,"%s||\n",buf);            if(load+newflow.weight <capacity) {
               load += newflow.weight;
           // if this flow will not put us overbudget, acceptfor(i=0;i < MAX_FLOWS;i++) {
           if(load+newflow->weight <= maxload) {                    if(flows[i].expiration == 0) {
               load += newflow->weight;                        flows[i].weight = newflow.weight;
delta += newflow->weight;                        flows[i].expiration = now + newflow.duration;
               newflow->next = flows;                        if(s) fprintf(s,"        %d acceptedrequest at %ld: %d\n",num,now,load);
               if(flows) flows->last = newflow;if(f) fprintf(f,"%ld %s\n",now,msg);
               flows= newflow;                        break;
               if(s) fprintf(s,"        %d acceptedflow: %d\n",num,load);                    }
                 }
                 assert(i != MAX_FLOWS);
             }              }
             // ADMISSION CONTROL              // ADMISSION CONTROL
           // ifwe're overloaded, drop it ifits been pushed too far,            // do not accept the flow ifit will overloadthis node.
// otherwisepasson to a neighborand mark it            // ifthe request been pushed too far,reject it, otherwise
           else if(exts < 5) {// passit on to a neighbor
               fprintf(s,"        %d rejected flow, load too high: %d <= %d\n",num,maxload,load+newflow->weight);else if(newflow.exts < GRACE) {
                 swaplinks_update_walk_length(swp,1);                  swaplinks_update_walk_length(swp,1);
               snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);                snprintf(tmp,sizeof(tmp),"%d;%d;%d;",newflow.duration,newflow.weight,newflow.exts+1);
               swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);                swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1,0);
               swaplinks_update_walk_length(swp,peers);                swaplinks_update_walk_length(swp,WALK_LENGTH);
                 if(s) fprintf(s,"        %d rejected flow, load too high: %d <= %d\n",num,capacity,load+newflow.weight);
                 if(f) fprintf(f,"%ld %s\n",now,"reject");
             }
             else {
                 if(s) fprintf(s,"        %d retired unserviceable request\n",num);
                 if(f) fprintf(f,"%ld %s\n",now,"retire");
             }              }
             else  
                 fprintf(s,"        %d retired unserviceable request\n",num);  
   
             if(f) fprintf(f,"%d\n",load);  
 //          if(s) fprintf(s,"%s",buf);  
         }          }
     }      }
   if(s && !killed) printf("%d DONE RECEIVING\n",num);    // close output files to assure data is recorded
 
   // clean up flows list 
   for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);} 
 
     if(f && f != stderr) { fclose(f); f = NULL; }      if(f && f != stderr) { fclose(f); f = NULL; }
     if(s && s != stdout) { fclose(s); s = NULL; }      if(s && s != stdout) { fclose(s); s = NULL; }
     if(killed) {  
         swaplinks_update_walk_length(swp,peers*2);  
         for(i = 0; i < peers*2; i++)  
             swaplinks_sendtoany(swp,"SHUTDOWN",9,0);  
         pthread_exit(NULL);  
     }  
   
     while(!greeted) {  
         memset(tmp,0,128);  
         memset(&peer,0,sizeof(peer));  
         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {  
             if(streq(tmp,"GREETING")) {  
                 printf("%d Received late greeting from coordinator\n",num);  
                 memset(&coord,0,sizeof(coord));  
                 memcpy(&coord,&peer,sizeof(peer));  
                 greeted = 1;  
             }  
         }  
     }  
   
     // tell coordinator that were all done here  
     if(swaplinks_sendto(swp, "DONE", 5, 0, &coord, sizeof(coord)) != 5)  
         printf("%d cannot contact coordinator\n", num);  
     else  
         printf("%d sent alert to coordinator at %s. entering holding pattern...\n",num,coord.user+4);  
   
     // go into holding pattern to let other nodes catch up  
     for(;;) {  
         memset(tmp,0,128);  
         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {  
             if(streq(tmp,"SHUTDOWN")) { printf("%d Received KILL signal from coordinator. Shutting down...\n",num); break; }  
             else if(streq(tmp,"GREETING")) continue;  
             newflow = (flow_t*)calloc(1,sizeof(flow_t));  
             newflow->duration = atoi(strtok(tmp,";"));  
             newflow->weight = atoi(strtok(NULL,";"));  
             exts = atoi(strtok(NULL,";"));  
             strncpy(newflow->source,peer.user+4,sizeof(newflow->source));  
             if(exts < 5) {  
                 swaplinks_update_walk_length(swp,1);  
                 // dont increase expiration here since it hit a dead node  
                 snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts);  
                 swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);  
                 swaplinks_update_walk_length(swp,peers);  
             }  
         }  
     }  
   
     swaplinks_update_walk_length(swp,peers*2);  
     for(i = 0; i < peers*2; i++)  
         swaplinks_sendtoany(swp,"SHUTDOWN",9,0);  
   
     pthread_exit(NULL);      pthread_exit(NULL);
 }  }
   
 int main(int argc, char **argv) {  int main(int argc, char **argv) {
     struct sockaddr_ns reg;      struct sockaddr_ns reg;
   char tmp[256], nbrs[4096];    char tmp[BUFFER_SIZE], nbrs[4096];
   int time = 0, duration, weight, percent,sendcount = 0;    int time = 0, duration, weight, percent,limit, i;
     FILE *g;
     int_pair p;      int_pair p;
   pthread_t tid,cid;    pthread_t tid;
 
   nodes = 0; 
   
       // parse args
     if(argc > 1) num = atoi(argv[1]);      if(argc > 1) num = atoi(argv[1]);
   if(num != 99 && argc >7) {    if(num != 99 && argc >6) {
         peers = atoi(argv[2]);          peers = atoi(argv[2]);
         limit = atoi(argv[3]);          limit = atoi(argv[3]);
         percent = atoi(argv[4]);          percent = atoi(argv[4]);
         duration = atoi(argv[5]);          duration = atoi(argv[5]);
         weight = atoi(argv[6]);          weight = atoi(argv[6]);
       maxload = atoi(argv[7]);        capacity = peers*ALLOTMENT;
     }      }
     else if(num == 99 && argc > 4) {      else if(num == 99 && argc > 4) {
       peers = atoi(argv[3])*2;        nodes = atoi(argv[2]);
       percent = atoi(argv[4]);        // peer coordinator with more nodes
         // to facilitate dispersion of kill signals
         peers = atoi(argv[3]);
         time = atoi(argv[4]);
     }      }
     else if(num != 99) {      else if(num != 99) {
       printf("Usage:\n  slinksvariableload node_numberpeers_number time_limit percent_to_send flow_duration flow_weightmax_load\n");        printf("Usage:\n  slinksvariableload node_numbernum_peersnum_messages percent_load flow_duration flow_weightcapacity\n");
         exit(1);          exit(1);
     }      }
     else {      else {
      printf("Coordinator Usage:\n  slinksvariableload 99 num_nodes num_peerspercent_to_finish\n");       printf("Coordinator Usage:\n  slinksvariableload 99 num_nodes num_peersexperiment_duration_minutes\n");
        exit(1);         exit(1);
     }      }
   
       // set all the important nutss fields
     snprintf(tmp, sizeof(tmp), "test%d", num);      snprintf(tmp, sizeof(tmp), "test%d", num);
     nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlenn(tmp));      nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlenn(tmp));
     nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlenn(tmp));      nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlenn(tmp));
Line 253  int main(int argc, char **argv) { Line 168  int main(int argc, char **argv) {
     strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));      strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));
     nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlenn(tmp));      nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlenn(tmp));
   
       // open output files
     if(num != 99) {      if(num != 99) {
         sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");          sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
       sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");//        sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");//        sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");
         if (f == NULL) f = stderr;          if (f == NULL) f = stderr;
         if (s == NULL) s = stdout;          if (s == NULL) s = stdout;
     }      }
       if(f) fprintf(f,"%d\n",peers);
   
   memset(tmp,0,256);    // build registrar uri
     memset(&reg, 0, sizeof(reg));      memset(&reg, 0, sizeof(reg));
     reg.family = AF_NUTSS;      reg.family = AF_NUTSS;
     strncpy(reg.user, "ths1", sizeof(reg.user));      strncpy(reg.user, "ths1", sizeof(reg.user));
     strncpy(reg.domain, "nutss.net", sizeof(reg.domain));      strncpy(reg.domain, "nutss.net", sizeof(reg.domain));
     strncpy(reg.service, "swaplinksd", sizeof(reg.service));      strncpy(reg.service, "swaplinksd", sizeof(reg.service));
   // start up swaplinks and let it get going
   swaplinks_init();    swaplinks_init();swp = swaplinks_new("cloud9", &reg, peers,WALK_LENGTH); sleep(5);
swp = swaplinks_new("cloud9", &reg, peers,peers+5); 
   // let swaplinks get going. 
   sleep(5); 
   
     // coordinator thread      // coordinator thread
     if(num == 99) {      if(num == 99) {
         int done = 0, limit, i;  
         struct sockaddr_ns* peer_list;  
         socklen_t len = sizeof(struct sockaddr_ns);  
   
         nodes = atoi(argv[2]);  
         limit = ceil((float)nodes*((float)percent/100));  
         peer_list = (struct sockaddr_ns*)calloc(limit,sizeof(struct sockaddr_ns));  
   
         // let nodes wake up to recieve the greetings          // let nodes wake up to recieve the greetings
       sleep(20);        for(;time > 0;time--) {
           printf("---> %dMINUTE%s REMAIN%s\n",time,time==1?"":"S",time==1?"S":"");
       pthread_create(&cid,NULL,coord_recvthread,swp);            sleep(60);
 
       swaplinks_update_walk_length(swp,nodes/2); 
       while(done < limit) { 
           if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, peer_list+done, &len) > 0&& streq(tmp,"DONE")) { 
printf("Coordinator received DONE signal from %s. Total: %dfinished nodes.\n",peer_list[done].user+4,done+1); 
               swaplinks_sendtoany(swp,"GREETING",9,0); 
               done++; 
           } 
       } 
       printf("%d nodes reported, now sending KILL signals\n",limit); 
       for(i = 0; i < limit; i++) { 
           printf("Sending kill to %s\n",peer_list[i].user+4); 
           swaplinks_sendto(swp, "SHUTDOWN", 9, 0, peer_list+i, len); 
         }          }
       printf("Now sending KILL signals\n",limit);
         for(i = 0; i < nodes*REDUNDANCY; i++) swaplinks_sendtoany(swp, "SHUTDOWN", 9, 0);
         // give them time to scramble          // give them time to scramble
       sleep(180);        printf("Waiting for KILL signals to propagate\n");
       sleep(PROPAGATION);
         // clean up their mess          // clean up their mess
           printf("Forcibly removing remaining nodes and associated file\n");
         system("find output -size 0c | xargs rm -rf");          system("find output -size 0c | xargs rm -rf");
       // then kill stragglers more violently        // then kill stragglers more violently(including the coordinator and its sending thread as well)
         system("killall lt-slinksvariableload");          system("killall lt-slinksvariableload");
   
         return 0;          return 0;
     }      }
   
     pthread_create(&tid, NULL, recvthread, swp);      pthread_create(&tid, NULL, recvthread, swp);
       // give recieving threads a chance to wake up
       sleep(5);
   
   sleep(3);    while (time++ <limit) {
// send a request for resources with probability percent/100
   while (time++ <REDUNDANCY*limit) {        if(randint(100) < percent){
if(randint(100) < percent)snprintf(tmp,sizeof(tmp),"%d;%d;0;",duration,weight);            snprintf(tmp,sizeof(tmp),"%d;%d;0;",duration,weight);
       else snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, 0);            swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1, 0);
       swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1, 0);}
swaplinks_get_neighbors(swp,nbrs,4096);        // record number and type of neighbors
p = process_neighbor_list(nbrs);      //  swaplinks_get_neighbors(swp,nbrs,4096);
if(g) fprintf(g,"%d %d\n",p.i,p.o);//  p = process_neighbor_list(nbrs);
clear(nbrs);//  if(g) fprintf(g,"%d %d\n",p.i,p.o);
sleep(2);//  clear(nbrs);
 // it might make more sense to sleep for more time between requests
         sleep(3);
     }      }
   
   if(g) fclose(g);  //  if(g) fclose(g);
     if(s) printf("%d DONE SENDING\n",num);      if(s) printf("%d DONE SENDING\n",num);
     pthread_join(tid, NULL);      pthread_join(tid, NULL);
     if(s) fprintf(s,"%d SHUTTING DOWN\n",num);      if(s) fprintf(s,"%d SHUTTING DOWN\n",num);
   
     if(num == 1) system("echo \" \" | mail -s\"node one finished sending!\" ths22@cs.cornell.edu");  
   
     return 0;      return 0;
 }  }
   

Removed from v.1.26  
changed lines
  Added in v.1.27


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment