Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Diff for /libnutss/examples/slinks_variable_load.c between versions 1.24 and 1.25

version 1.24, 2007/07/17 02:17:30 version 1.25, 2007/07/18 05:09:39
Line 1 Line 1
 #include <stdio.h>  #include <stdio.h>
 #include <stdlib.h>  #include <stdlib.h>
 #include <string.h>  #include <string.h>
   #include <math.h>
 #ifndef WIN32  #ifndef WIN32
 #include <poll.h>  #include <poll.h>
 #include <sys/socket.h>  #include <sys/socket.h>
Line 41  void *recvthread(void *arg) { Line 42  void *recvthread(void *arg) {
     socklen_t len = sizeof(peer);      socklen_t len = sizeof(peer);
     flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;      flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;
     char msg[64], tmp[128], buf[2048], greeting[1024];      char msg[64], tmp[128], buf[2048], greeting[1024];
   int time = 0, load = 0, exts, delta, expiration, greeted = 0;    int time = 0, load = 0, exts, delta, expiration, greeted = 0, killed = 0, i;
   
     clear(buf);      clear(buf);
     while(time < limit) {      while(time < limit) {
Line 56  void *recvthread(void *arg) { Line 57  void *recvthread(void *arg) {
                 greeted = 1;                  greeted = 1;
                 continue;                  continue;
             }              }
               if(streq(tmp,"SHUTDOWN")) {
                   swaplinks_update_walk_length(swp,1);
                   for(i = 0; i < peers*2; i++)
                       swaplinks_sendtoany(swp,"SHUTDOWN",9,0);
                   printf("%d Killed prematurely...\n");
                   killed = 1;
                   break;
               }
             time++; delta = 0;              time++; delta = 0;
             // parse new flow              // parse new flow
             newflow = (flow_t*)calloc(1,sizeof(flow_t));              newflow = (flow_t*)calloc(1,sizeof(flow_t));
Line 125  void *recvthread(void *arg) { Line 134  void *recvthread(void *arg) {
 //          if(s) fprintf(s,"%s",buf);  //          if(s) fprintf(s,"%s",buf);
         }          }
     }      }
   if(s) fprintf(s,"%d DONE RECEIVING\n",num);    if(s&& !killed) fprintf(s,"%d DONE RECEIVING\n",num);
   
     // clean up flows list      // clean up flows list
     for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);}      for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);}
   
       if(killed) {
           if(f && f != stderr) { fclose(f); f = NULL; }
           if(s && s != stdout) { fclose(s); s = NULL; }
           pthread_exit(NULL);
       }
   
     while(!greeted) {      while(!greeted) {
         memset(tmp,0,128);          memset(tmp,0,128);
         memset(&peer,0,sizeof(peer));          memset(&peer,0,sizeof(peer));
Line 153  void *recvthread(void *arg) { Line 168  void *recvthread(void *arg) {
     for(;;) {      for(;;) {
         memset(tmp,0,128);          memset(tmp,0,128);
         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {          if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
             printf("%d is holding pattern: %s\n",num,tmp);  
             if(streq(tmp,"SHUTDOWN")) { printf("%d Received KILL signal from coordinator. Shutting down...\n",num); break; }              if(streq(tmp,"SHUTDOWN")) { printf("%d Received KILL signal from coordinator. Shutting down...\n",num); break; }
             else if(streq(tmp,"GREETING")) continue;              else if(streq(tmp,"GREETING")) continue;
             newflow = (flow_t*)calloc(1,sizeof(flow_t));              newflow = (flow_t*)calloc(1,sizeof(flow_t));
Line 163  void *recvthread(void *arg) { Line 177  void *recvthread(void *arg) {
             strncpy(newflow->source,peer.user+4,sizeof(newflow->source));              strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
             if(exts < 5) {              if(exts < 5) {
                 swaplinks_update_walk_length(swp,1);                  swaplinks_update_walk_length(swp,1);
               snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);                // dont increase expiration here since it hit a dead node
                 snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts);
                 swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);                  swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);
                 swaplinks_update_walk_length(swp,peers);                  swaplinks_update_walk_length(swp,peers);
             }              }
         }          }
     }      }
   
       swaplinks_update_walk_length(swp,1);
       for(i = 0; i < peers*2; i++)
           swaplinks_sendtoany(swp,"SHUTDOWN",9,0);
   
     if(f && f != stderr) { fclose(f); f = NULL; }      if(f && f != stderr) { fclose(f); f = NULL; }
     if(s && s != stdout) { fclose(s); s = NULL; }      if(s && s != stdout) { fclose(s); s = NULL; }
   
Line 184  int main(int argc, char **argv) { Line 203  int main(int argc, char **argv) {
     pthread_t tid;      pthread_t tid;
   
     if(argc > 1) num = atoi(argv[1]);      if(argc > 1) num = atoi(argv[1]);
   if(argc > 7) {    if(num != 99 && argc > 7) {
         peers = atoi(argv[2]);          peers = atoi(argv[2]);
         limit = atoi(argv[3]);          limit = atoi(argv[3]);
         percent = atoi(argv[4]);          percent = atoi(argv[4]);
Line 192  int main(int argc, char **argv) { Line 211  int main(int argc, char **argv) {
         weight = atoi(argv[6]);          weight = atoi(argv[6]);
         maxload = atoi(argv[7]);          maxload = atoi(argv[7]);
     }      }
       else if(num == 99 && argc > 4) {
           peers = atoi(argv[3]);
           percent = atoi(argv[4]);
       }
     else if(num != 99) {      else if(num != 99) {
       printf("Usage:\n  slinks_variable_load node_number peers_number time_limit percent_to_send flow_duration flow_weight max_load\n");        printf("Usage:\n  slinksvariableload node_number peers_number time_limit percent_to_send flow_duration flow_weight max_load\n");
         exit(1);          exit(1);
     }      }
       else {
          printf("Coordinator Usage:\n  slinksvariableload 99 num_nodes num_peers percent_to_finish\n");
          exit(1);
       }
   
     snprintf(tmp, sizeof(tmp), "test%d", num);      snprintf(tmp, sizeof(tmp), "test%d", num);
     nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlenn(tmp));      nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlenn(tmp));
Line 228  int main(int argc, char **argv) { Line 255  int main(int argc, char **argv) {
   
     // coordinator thread      // coordinator thread
     if(num == 99) {      if(num == 99) {
       int done = 0, nodes = atoi(argv[2]), i;        int done = 0, nodes = atoi(argv[2]), limit = ceil((float)nodes*((float)percent/100)), i;
       struct sockaddr_ns* peer_list = (struct sockaddr_ns*)calloc(nodes,sizeof(struct sockaddr_ns));        struct sockaddr_ns* peer_list = (struct sockaddr_ns*)calloc(limit,sizeof(struct sockaddr_ns));
         socklen_t len = sizeof(struct sockaddr_ns);          socklen_t len = sizeof(struct sockaddr_ns);
   
       swaplinks_update_walk_length(swp,nodes/2);        // let nodes wake up to recieve the greetings
       for(i = 0; i < nodes*REDUNDANCY; i++) {        sleep(15);
           printf("Coordinator sending greeting...\n");
         swaplinks_update_walk_length(swp,peers/2);
         for(i = 0; i < nodes*2; i++) {
             printf("Coordinator sending greeting%d...\n",i);
             swaplinks_sendtoany(swp,"GREETING",9,0);              swaplinks_sendtoany(swp,"GREETING",9,0);
           sleep(1);            sleep(2);
         }          }
   
       while(done <nodes) {        swaplinks_update_walk_length(swp,nodes/2);
         while(done <limit) {
             printf("Coordinator waiting for DONE signal...\n");              printf("Coordinator waiting for DONE signal...\n");
             if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, peer_list+done, &len) > 0 && streq(tmp,"DONE")) {              if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, peer_list+done, &len) > 0 && streq(tmp,"DONE")) {
                 printf("Coordinator received DONE signal from %s. Total: %d finished nodes.\n",peer_list[done].user+4,done+1);                  printf("Coordinator received DONE signal from %s. Total: %d finished nodes.\n",peer_list[done].user+4,done+1);
Line 247  int main(int argc, char **argv) { Line 278  int main(int argc, char **argv) {
                 done++;                  done++;
             }              }
         }          }
       printf("All nodes reported, now sending KILL signals\n");        printf("%d nodes reported, now sending KILL signals\n",limit);
       for(i = 0; i <nodes; i++) {        for(i = 0; i <limit; i++) {
             printf("Sending kill to %s\n",peer_list[i].user+4);              printf("Sending kill to %s\n",peer_list[i].user+4);
             swaplinks_sendto(swp, "SHUTDOWN", 9, 0, peer_list+i, len);              swaplinks_sendto(swp, "SHUTDOWN", 9, 0, peer_list+i, len);
         }          }
   
           // give them time to scramble
           sleep(60);
   
           // kill stragglers more violently
           system("killall lt-slinksvariableload");
           // clean up their mess
           system("find output -size 0c | xargs rm");
   
         return 0;          return 0;
     }      }
   
     pthread_create(&tid, NULL, recvthread, swp);      pthread_create(&tid, NULL, recvthread, swp);
   
       sleep(3);
   
     while (time++ < REDUNDANCY*limit) {      while (time++ < REDUNDANCY*limit) {
         if(randint(100) < percent) snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, 0);          if(randint(100) < percent) snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, 0);
         else snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);          else snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);
Line 266  int main(int argc, char **argv) { Line 307  int main(int argc, char **argv) {
         p = process_neighbor_list(nbrs);          p = process_neighbor_list(nbrs);
         if(g) fprintf(g,"%d %d\n",p.i,p.o);          if(g) fprintf(g,"%d %d\n",p.i,p.o);
         clear(nbrs);          clear(nbrs);
       sleep(1);        sleep(2);
     }      }
   
     if(g) fclose(g);      if(g) fclose(g);

Removed from v.1.24  
changed lines
  Added in v.1.25


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment