Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Diff for /libnutss/examples/slinks_variable_load.c between versions 1.25 and 1.26

version 1.25, 2007/07/18 05:09:39 version 1.26, 2007/07/20 17:00:22
Line 32  typedef struct int_pair_t { int i; int o Line 32  typedef struct int_pair_t { int i; int o
   
 int_pair process_neighbor_list(char*);  int_pair process_neighbor_list(char*);
   
int num, peers, limit, maxload;int num, nodes, peers, limit, maxload;
 swaplinks_p swp;  swaplinks_p swp;
 FILE *f = NULL, *s = NULL, *g = NULL;  FILE *f = NULL, *s = NULL, *g = NULL;
   
   void *coord_recvthread(void *arg) {
       int i;
       char tmp[32];
       struct sockaddr_ns peer;
       socklen_t len = sizeof(peer);
       swaplinks_p swp = (swaplinks_p)arg;
   
       sleep(20);
       for(i=0;;i++) {
           if(i % nodes == 0 && (i/nodes+1) < nodes/4) swaplinks_update_walk_length(swp,peers*(i/nodes + 1));
           printf("Coordinator sending greeting %d\n",i+1);
           swaplinks_sendtoany(swp,"GREETING",9,0);
           sleep(2);
       }
       pthread_exit(NULL);
   }
   
 void *recvthread(void *arg) {  void *recvthread(void *arg) {
     swaplinks_p swp = (swaplinks_p)arg;      swaplinks_p swp = (swaplinks_p)arg;
     struct sockaddr_ns peer, coord;      struct sockaddr_ns peer, coord;
Line 72  void *recvthread(void *arg) { Line 89  void *recvthread(void *arg) {
             newflow->weight = atoi(strtok(NULL,";"));              newflow->weight = atoi(strtok(NULL,";"));
             exts = atoi(strtok(NULL,";"));              exts = atoi(strtok(NULL,";"));
             strncpy(newflow->source,peer.user+4,sizeof(newflow->source));              strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
               if(num == 1) printf("recvd request %d from %s\n",time,newflow->source+4);
             if(s) fprintf(s,"      %d recvd request #%d from %s - w:%d d:%d\n",              if(s) fprintf(s,"      %d recvd request #%d from %s - w:%d d:%d\n",
                           num,time,newflow->source,newflow->weight,newflow->duration);                            num,time,newflow->source,newflow->weight,newflow->duration);
             snprintf(buf,2048,"->%d:%d:%s<->",newflow->duration,newflow->weight,newflow->source);              snprintf(buf,2048,"->%d:%d:%s<->",newflow->duration,newflow->weight,newflow->source);
Line 134  void *recvthread(void *arg) { Line 152  void *recvthread(void *arg) {
 //          if(s) fprintf(s,"%s",buf);  //          if(s) fprintf(s,"%s",buf);
         }          }
     }      }
   if(s && !killed)fprintf(s,"%d DONE RECEIVING\n",num);    if(s && !killed)printf("%d DONE RECEIVING\n",num);
   
     // clean up flows list      // clean up flows list
     for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);}      for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);}
   
       if(f && f != stderr) { fclose(f); f = NULL; }
       if(s && s != stdout) { fclose(s); s = NULL; }
     if(killed) {      if(killed) {
       if(f && f != stderr) { fclose(f); f = NULL; }        swaplinks_update_walk_length(swp,peers*2);
       if(s && s != stdout) { fclose(s); s = NULL; }        for(i = 0; i < peers*2; i++)
             swaplinks_sendtoany(swp,"SHUTDOWN",9,0);
         pthread_exit(NULL);          pthread_exit(NULL);
   }    }
   
     while(!greeted) {      while(!greeted) {
         memset(tmp,0,128);          memset(tmp,0,128);
Line 185  void *recvthread(void *arg) { Line 206  void *recvthread(void *arg) {
         }          }
     }      }
   
   swaplinks_update_walk_length(swp,1);    swaplinks_update_walk_length(swp,peers*2);
     for(i = 0; i < peers*2; i++)      for(i = 0; i < peers*2; i++)
         swaplinks_sendtoany(swp,"SHUTDOWN",9,0);          swaplinks_sendtoany(swp,"SHUTDOWN",9,0);
   
     if(f && f != stderr) { fclose(f); f = NULL; }  
     if(s && s != stdout) { fclose(s); s = NULL; }  
   
     pthread_exit(NULL);      pthread_exit(NULL);
 }  }
   
Line 200  int main(int argc, char **argv) { Line 218  int main(int argc, char **argv) {
     char tmp[256], nbrs[4096];      char tmp[256], nbrs[4096];
     int time = 0, duration, weight, percent, sendcount = 0;      int time = 0, duration, weight, percent, sendcount = 0;
     int_pair p;      int_pair p;
   pthread_t tid;    pthread_t tid,cid;
 
     nodes = 0;
   
     if(argc > 1) num = atoi(argv[1]);      if(argc > 1) num = atoi(argv[1]);
     if(num != 99 && argc > 7) {      if(num != 99 && argc > 7) {
Line 212  int main(int argc, char **argv) { Line 232  int main(int argc, char **argv) {
         maxload = atoi(argv[7]);          maxload = atoi(argv[7]);
     }      }
     else if(num == 99 && argc > 4) {      else if(num == 99 && argc > 4) {
       peers = atoi(argv[3]);        peers = atoi(argv[3])*2;
         percent = atoi(argv[4]);          percent = atoi(argv[4]);
     }      }
     else if(num != 99) {      else if(num != 99) {
Line 235  int main(int argc, char **argv) { Line 255  int main(int argc, char **argv) {
   
     if(num != 99) {      if(num != 99) {
         sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");          sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
//        sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");        sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
         sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");          sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");
         if (f == NULL) f = stderr;          if (f == NULL) f = stderr;
         if (s == NULL) s = stdout;          if (s == NULL) s = stdout;
Line 255  int main(int argc, char **argv) { Line 275  int main(int argc, char **argv) {
   
     // coordinator thread      // coordinator thread
     if(num == 99) {      if(num == 99) {
       int done = 0,nodes = atoi(argv[2]), limit= ceil((float)nodes*((float)percent/100)), i;        int done = 0,limit, i;
       struct sockaddr_ns* peer_list= (struct sockaddr_ns*)calloc(limit,sizeof(struct sockaddr_ns));        struct sockaddr_ns* peer_list;
         socklen_t len = sizeof(struct sockaddr_ns);          socklen_t len = sizeof(struct sockaddr_ns);
   
           nodes = atoi(argv[2]);
           limit = ceil((float)nodes*((float)percent/100));
           peer_list = (struct sockaddr_ns*)calloc(limit,sizeof(struct sockaddr_ns));
   
         // let nodes wake up to recieve the greetings          // let nodes wake up to recieve the greetings
       sleep(15);        sleep(20);
   
       swaplinks_update_walk_length(swp,peers/2);        pthread_create(&cid,NULL,coord_recvthread,swp);
       for(i = 0; i < nodes*2; i++) { 
           printf("Coordinator sending greeting %d...\n",i); 
           swaplinks_sendtoany(swp,"GREETING",9,0); 
           sleep(2); 
       } 
   
         swaplinks_update_walk_length(swp,nodes/2);          swaplinks_update_walk_length(swp,nodes/2);
         while(done < limit) {          while(done < limit) {
             printf("Coordinator waiting for DONE signal...\n");  
             if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, peer_list+done, &len) > 0 && streq(tmp,"DONE")) {              if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, peer_list+done, &len) > 0 && streq(tmp,"DONE")) {
                 printf("Coordinator received DONE signal from %s. Total: %d finished nodes.\n",peer_list[done].user+4,done+1);                  printf("Coordinator received DONE signal from %s. Total: %d finished nodes.\n",peer_list[done].user+4,done+1);
                 swaplinks_sendtoany(swp,"GREETING",9,0);                  swaplinks_sendtoany(swp,"GREETING",9,0);
Line 285  int main(int argc, char **argv) { Line 303  int main(int argc, char **argv) {
         }          }
   
         // give them time to scramble          // give them time to scramble
       sleep(60);        sleep(180);
   
         // kill stragglers more violently  
         system("killall lt-slinksvariableload");  
         // clean up their mess          // clean up their mess
       system("find output -size 0c | xargs rm");        system("find output -size 0c | xargs rm-rf");
         // then kill stragglers more violently
         system("killall lt-slinksvariableload");
   
         return 0;          return 0;
     }      }
Line 300  int main(int argc, char **argv) { Line 318  int main(int argc, char **argv) {
     sleep(3);      sleep(3);
   
     while (time++ < REDUNDANCY*limit) {      while (time++ < REDUNDANCY*limit) {
       if(randint(100) < percent) snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration,0);        if(randint(100) < percent) snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration,weight);
       else snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration,weight);        else snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration,0);
         swaplinks_sendtoany(swp, tmp, strlenn(tmp)+1, 0);          swaplinks_sendtoany(swp, tmp, strlenn(tmp)+1, 0);
         swaplinks_get_neighbors(swp, nbrs, 4096);          swaplinks_get_neighbors(swp, nbrs, 4096);
         p = process_neighbor_list(nbrs);          p = process_neighbor_list(nbrs);
Line 311  int main(int argc, char **argv) { Line 329  int main(int argc, char **argv) {
     }      }
   
     if(g) fclose(g);      if(g) fclose(g);
   if(s)fprintf(s,"%d DONE SENDING\n",num);    if(s)printf("%d DONE SENDING\n",num);
     pthread_join(tid, NULL);      pthread_join(tid, NULL);
     if(s) fprintf(s,"%d SHUTTING DOWN\n",num);      if(s) fprintf(s,"%d SHUTTING DOWN\n",num);
   
//    if(num == 1) system("echo \" \" | mail -s\"node one finished sending!\" ths22@cs.cornell.edu");    if(num == 1) system("echo \" \" | mail -s\"node one finished sending!\" ths22@cs.cornell.edu");
   
     return 0;      return 0;
 }  }

Removed from v.1.25  
changed lines
  Added in v.1.26


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment