Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Diff for /libnutss/examples/slinks_variable_load.c between versions 1.17 and 1.18

version 1.17, 2007/07/03 04:24:53 version 1.18, 2007/07/03 16:45:11
Line 24  typedef struct flow {  Line 24  typedef struct flow { 
   
 int num, peers, limit, maxload;  int num, peers, limit, maxload;
 swaplinks_p swp;  swaplinks_p swp;
FILE *f = NULL, *s = NULL; FILE *f = NULL, *s = NULL, *g = NULL;
   
 void *recvthread(void *arg) {  void *recvthread(void *arg) {
     swaplinks_p swp = (swaplinks_p)arg;      swaplinks_p swp = (swaplinks_p)arg;
Line 32  void *recvthread(void *arg) {  Line 32  void *recvthread(void *arg) { 
     socklen_t len = sizeof(peer);      socklen_t len = sizeof(peer);
     flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;      flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;
     char msg[64], tmp[128], buf[2048];      char msg[64], tmp[128], buf[2048];
   int time = 0, load= 0, busy = 0, exts, delta, expiration;    int time = 0, load= 0, exts, delta, expiration;
   
     buf[0] = 0;      buf[0] = 0;
   
Line 41  void *recvthread(void *arg) {  Line 41  void *recvthread(void *arg) { 
        if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
       time++; delta = 0;        time++; delta = 0;
       if(s) fprintf(s,"      %d recvd request %d\n",num,time);        if(s) fprintf(s,"      %d recvd request %d\n",num,time);
         // parse new flow
       newflow = (flow_t*)calloc(1,sizeof(flow_t));        newflow = (flow_t*)calloc(1,sizeof(flow_t));
       newflow->duration = atoi(strtok(tmp,";"));        newflow->duration = atoi(strtok(tmp,";"));
       newflow->weight = atoi(strtok(NULL,";"));        newflow->weight = atoi(strtok(NULL,";"));
       exts = atoi(strtok(NULL,";"));        exts = atoi(strtok(NULL,";"));
     if(load+newflow->weight <= maxload) {      strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
    strncpy(newflow->source,peer.user+4,sizeof(newflow->source));      snprintf(buf,2048,"->%d:%d:%s<->",newflow->duration,newflow->weight,newflow->source);
    snprintf(buf,2048,"->%d:%d<->",newflow->duration,newflow->weight);      // run through flows, decrementing duration, cleaning any expired flows
    for(ptr = flows; ptr;) {      for(ptr = flows; ptr;) {
   sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);     sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);
   expiration = 0; axe = NULL;     expiration = 0; axe = NULL;
   if(--ptr->duration == 0) {     if(--ptr->duration == 0) {
  axe = ptr;    axe = ptr;
  load -= ptr->weight;    load -= ptr->weight;
  delta -= ptr->weight;    delta -= ptr->weight;
   if(ptr->last && ptr->next) {
  if(ptr->last && ptr->next) {   assert(flows != ptr);
 assert(flows != ptr);   ptr->last->next = ptr->next;
 ptr->last->next = ptr->next;   ptr->next->last = ptr->last;
 ptr->next->last = ptr->last;    }
  }    else if(ptr->next) {
  else if(ptr->next) {   assert(flows == ptr);
 assert(flows == ptr);   ptr->next->last = NULL;
 ptr->next->last = NULL;   flows = ptr->next;
 flows = ptr->next;    }
  }    else if(ptr->last) {
  else if(ptr->last) {   assert(flows != ptr);
 assert(flows != ptr);   ptr->last->next = NULL;
 ptr->last->next = NULL;    }
  }    else {
  else {   assert(flows == ptr);
 assert(flows == ptr);   flows = NULL;
 flows = NULL;  
}  
 
      if(s) fprintf(s,"%d EXPIRED FLOW from %s:d%d/w%d\n",num,ptr->source,ptr->duration,ptr->weight);  
  expiration = 1;  
     }      }
   ptr = ptr->next;       if(s) fprintf(s,"%d EXPIRED FLOW from %s:d%d/w%d\n",num,ptr->source,ptr->duration,ptr->weight);
   if(expiration) { free(axe); axe = NULL; }    expiration= 1;
      }       }
    sprintf(buf,"%s||\n",buf);     ptr = ptr->next;
      if(expiration) { free(axe); axe = NULL; }
       }
       sprintf(buf,"%s||\n",buf);
   
        //          if(s) fprintf(s,"%d ACCEPTED FLOW from %s:d%d/w%d\n",num,newflow->source,newflow->duration,newflow->weight);         //          if(s) fprintf(s,"%d ACCEPTED FLOW from %s:d%d/w%d\n",num,newflow->source,newflow->duration,newflow->weight);
       if(s) fprintf(s,"%s",buf);      // if this flow will not put us overbudget, accept
     if(load+newflow->weight <= maxload) {
      load += newflow->weight;       load += newflow->weight;
      delta += newflow->weight;       delta += newflow->weight;
      newflow->next = flows;       newflow->next = flows;
    if(newflow->next) newflow->next->last = newflow;     if(flows) flows->last = newflow;
      flows = newflow;       flows = newflow;
    if(f) {     if(s) fprintf(s,"        %d accepted flow: %d\n",num,load);
   fprintf(f,"%d\n",load);  
   fprintf(s,"        %d accepted flowand recorded load: %d\n",num,load);}  
    else  fprintf(s,"ERROR RECORDING LOAD!\n");  
       }        }
       // ADMISSION CONTROL        // ADMISSION CONTROL
       // if we're overloaded, drop it if its been pushed too far,        // if we're overloaded, drop it if its been pushed too far,
     // or pass on to a neighbor and mark itas such if not      // otherwise pass on to a neighbor and mark it
       else if(exts < 5) {        else if(exts < 5) {
    fprintf(s,"        %d rejected flow, load too high: %d>= %d\n",num,load,maxload);     fprintf(s,"        %d rejected flow, load too high: %d<= %d\n",num,maxload,load+newflow->weight);
      swaplinks_update_walk_length(swp,1);       swaplinks_update_walk_length(swp,1);
      snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);       snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
      swaplinks_sendtoany(swp,msg,strlen(msg)+1,0);       swaplinks_sendtoany(swp,msg,strlen(msg)+1,0);
      swaplinks_update_walk_length(swp,peers);       swaplinks_update_walk_length(swp,peers);
       }        }
         if(f) fprintf(f,"%d\n",load);
       if(s) fprintf(s,"%s",buf);
        }         }
   }        }
   if(s) fprintf(s,"%d DONE RECEIVING\n",num);        if(s) fprintf(s,"%d DONE RECEIVING\n",num);
 
if(f && f != stderr) { fclose(f); f = NULL; }  
   if(s && s != stdout) { fclose(s); s = NULL; }  
   
   for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);}        // clean up
         if(f && f != stderr) { fclose(f); f = NULL; }
         if(s && s != stdout) { fclose(s); s = NULL; }
         for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);}
   
   pthread_exit(NULL);        pthread_exit(NULL);
 }  }
   
 int main(int argc, char **argv) {  int main(int argc, char **argv) {
     struct sockaddr_ns reg;      struct sockaddr_ns reg;
   char tmp[256];    char tmp[256], nbrs[4096];
     int time = 0, duration, weight, sendcount = 0;      int time = 0, duration, weight, sendcount = 0;
     pthread_t tid;      pthread_t tid;
   
Line 148  int main(int argc, char **argv) {  Line 146  int main(int argc, char **argv) { 
   
     sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");      sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
 //    sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");  //    sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
       sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");
     if (f == NULL) f = stderr;      if (f == NULL) f = stderr;
     if (s == NULL) s = stdout;      if (s == NULL) s = stdout;
   
Line 169  int main(int argc, char **argv) {  Line 168  int main(int argc, char **argv) { 
   
     snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);      snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);
   
   while (time++ <3*limit) { // (i-- != 0) {    while (time++ <2.5*limit) { // (i-- != 0) {
         swaplinks_sendtoany(swp, tmp, strlen(tmp)+1, 0);          swaplinks_sendtoany(swp, tmp, strlen(tmp)+1, 0);
       if(s) fprintf(s,"   %d sent request %d\n",num,time); //        if(s) fprintf(s,"   %d sent request %d\n",num,time);
 swaplinks_get_neighbors(swp, nbrs, 4096);
         if(g) fprintf(g,"%s\n --- \n",nbrs);
         sleep(2);          sleep(2);
     }      }
   

Removed from v.1.17  
changed lines
  Added in v.1.18


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment