Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Diff for /libnutss/examples/slinks_variable_load.c between versions 1.28 and 1.29

version 1.28, 2007/08/23 22:13:44 version 1.29, 2007/09/04 17:24:45
Line 26 Line 26
 #define REDUNDANCY    5       // how many message repetitions are necessary to reach every node  #define REDUNDANCY    5       // how many message repetitions are necessary to reach every node
 #define PROPAGATION   600     // how many seconds the coordinator should wait for every node to recieve KILL signal  #define PROPAGATION   600     // how many seconds the coordinator should wait for every node to recieve KILL signal
 #define EXPIRATION    600     // how many seconds a node should keep a peering report fron another node  #define EXPIRATION    600     // how many seconds a node should keep a peering report fron another node
#defineGRACE7       //how many times a request can be passed on by a full-capacity node#defineMIN_GRACE1       //minimum number if times a request can be passed on by a full-capacity node
 #define MAX_GRACE     20      // maximum
 #define ALLOTMENT     400     // how much load a node will accomadate for each of his neighbors (obsolete w/ auto peering)  #define ALLOTMENT     400     // how much load a node will accomadate for each of his neighbors (obsolete w/ auto peering)
 #define QUORUM        12      // how many reports a node needs before adjusting his peering level  #define QUORUM        12      // how many reports a node needs before adjusting his peering level
 #define THRESHOLD     0.15    // how much different a newly calculated degree must be from the old one to switch to it  #define THRESHOLD     0.15    // how much different a newly calculated degree must be from the old one to switch to it
   #define ASSURANCE     0.8     // how sure we want to be that we will find the available capacity out there (affects walk length)
 #define MAX_REPORTS   30  #define MAX_REPORTS   30
 #define WALK_LENGTH   10  #define WALK_LENGTH   10
#define MIN_PEERS3#define MIN_PEERS4
 #define MAX_PEERS     20
 #define DEFAULT_PEERS 8       // obsolete w/ auto peering  #define DEFAULT_PEERS 8       // obsolete w/ auto peering
 #define MAX_PEERS     40  
 #define MAX_FLOWS     256  #define MAX_FLOWS     256
 #define BUFFER_SIZE   128  #define BUFFER_SIZE   128
 #define URI_LENGTH    256  #define URI_LENGTH    256
   
 typedef struct flow {  typedef struct flow {
   int id, duration, weight, exts, capacity;    int id, duration, weight, exts, capacity, available, degree;
     char source[URI_LENGTH];      char source[URI_LENGTH];
     struct flow *next, *last;      struct flow *next, *last;
 } flow_t;  } flow_t;
   
typedef struct int_pair_t { int i, o; }int_pair;typedef struct int_pair_t { int i, o; }int_pair;
typedef struct flw_spec_t { int weight, expiration; }flw_spec;typedef struct flw_spec_t { int weight, expiration; }flw_spec;
typedef struct nbr_rprt_t { int expiration, capacity; }  nbr_rprt;typedef struct nbr_rprt_t { int expiration, capacity, available, degree; }  nbr_rprt;
   
 int_pair process_neighbor_list(char*);  int_pair process_neighbor_list(char*);
   
// Gobal information, used bymultiple threads// Gobal information, used byboth threads
int num, nodes = 0, peers, capacity, alive = 1;int num, weight, exts, nodes = 0, peers= 0, capacity= 0, load = 0, alive = 1;
 swaplinks_p swp;  swaplinks_p swp;
FILE *f = NULL, *s = NULL, *q = NULL;FILE *f = NULL, *s = NULL, *q = NULL, *r = NULL;
   
 // Node's receiving thread that decides whether to accept an incoming  // Node's receiving thread that decides whether to accept an incoming
 // request and records all the necessary information; also carries  // request and records all the necessary information; also carries
Line 63  void *recvthread(void *arg) { Line 65  void *recvthread(void *arg) {
     struct sockaddr_ns peer;      struct sockaddr_ns peer;
     socklen_t len = sizeof(peer);      socklen_t len = sizeof(peer);
     char msg[BUFFER_SIZE], tmp[BUFFER_SIZE];      char msg[BUFFER_SIZE], tmp[BUFFER_SIZE];
   int load = 0, bytes, msg_count, i;    int bytes, msg_count, i;
     time_t start,now;      time_t start,now;
     flow_t newflow;      flow_t newflow;
     flw_spec flows[MAX_FLOWS];      flw_spec flows[MAX_FLOWS];
Line 78  void *recvthread(void *arg) { Line 80  void *recvthread(void *arg) {
     for(;;) {      for(;;) {
         memset(&peer,0,sizeof(peer));          memset(&peer,0,sizeof(peer));
         sleep(1);          sleep(1);
       // simulate churn by having a small portion of nodes duck out early, and replace them periodically    // simulate churn by having a small portion of nodes duck out early, and replace them periodically
 //        if(randint(1000) < 2) {  //        if(randint(1000) < 2) {
 //            printf("%d Withdrawing from cloud early\n",num);  //            printf("%d Withdrawing from cloud early\n",num);
 //            break;  //            break;
 //        }  //        }
       if((bytes=swaplinks_recvfrom(swp, msg, sizeof(msg), 0, &peer, &len)) > 0) {    if((bytes=swaplinks_recvfrom(swp, msg, sizeof(msg), 0, &peer, &len)) > 0) {
             msg[bytes] = 0;              msg[bytes] = 0;
             memcpy(tmp,msg,bytes);              memcpy(tmp,msg,bytes);
             // Flood SHUTDOWN to neighbors, then power down              // Flood SHUTDOWN to neighbors, then power down
Line 100  void *recvthread(void *arg) { Line 102  void *recvthread(void *arg) {
             newflow.exts = atoi(strtok(NULL,";"));              newflow.exts = atoi(strtok(NULL,";"));
             newflow.id = atoi(strtok(NULL,";"));              newflow.id = atoi(strtok(NULL,";"));
             newflow.capacity = atoi(strtok(NULL,";"));              newflow.capacity = atoi(strtok(NULL,";"));
               newflow.available = atoi(strtok(NULL,";"));
               newflow.degree = atoi(strtok(NULL,";"));
             strncpy(newflow.source,peer.user+4,sizeof(newflow.source));              strncpy(newflow.source,peer.user+4,sizeof(newflow.source));
   
             /*******/              /*******/
             // Adjust peering level based on updated picture of the state of the network              // Adjust peering level based on updated picture of the state of the network
             {              {
               int alive= 0, more = 0, max_capacity = 0, min_capacity = INT_MAX, numReports = 0, src, curmax, k;                int alive= 0, livelinks = 0, totallinks = 0, more = 0, max_capacity = 0, min_capacity = INT_MAX, numReports = 0, src, curmax, k;
               float rank,newDegree, tempfloat;                float rank,live, newDegree, tempfloat;
                 // parse report                  // parse report
                 now = difftime(time(NULL),start);                  now = difftime(time(NULL),start);
                 src = atoi(newflow.source);                  src = atoi(newflow.source);
Line 121  void *recvthread(void *arg) { Line 125  void *recvthread(void *arg) {
                 // dont add this report if we have enough already                  // dont add this report if we have enough already
                 if(numReports < MAX_REPORTS) {                  if(numReports < MAX_REPORTS) {
                     reports[src].capacity = newflow.capacity;                      reports[src].capacity = newflow.capacity;
                       reports[src].available = newflow.available;
                       reports[src].degree = newflow.degree;
                     reports[src].expiration = now+EXPIRATION;                      reports[src].expiration = now+EXPIRATION;
                 }                  }
                 // gather important data based on known info                  // gather important data based on known info
                 for(i=0;i<MAX_FLOWS;i++) {                  for(i=0;i<MAX_FLOWS;i++) {
                   if(0 < reports[i].capacity)alive++;                    if(0 < reports[i].capacity){
                   if(max_capacity < reports[i].capacity) max_capacity = reports[i].capacity;                        alive++;
                   if(min_capacity > reports[i].capacity && 0 < reports[i].capacity) min_capacity = reports[i].capacity;                        totallinks += reports[i].degree;
                   if(capacity < reports[i].capacity) more++;                        if(weight < reports[i].available) livelinks += reports[i].degree;
                         if(max_capacity < reports[i].capacity) max_capacity = reports[i].capacity;
                         if(min_capacity > reports[i].capacity && 0 < reports[i].capacity) min_capacity = reports[i].capacity;
                         if(capacity < reports[i].capacity) more++;
 }
                 }                  }
                 // include this node's capacity in mix                  // include this node's capacity in mix
                 if(max_capacity < capacity) max_capacity = capacity;                  if(max_capacity < capacity) max_capacity = capacity;
                 if(min_capacity > capacity) min_capacity = capacity;                  if(min_capacity > capacity) min_capacity = capacity;
                 assert(alive > 0);                  assert(alive > 0);
               rank =((float)(alive-more))/alive;                rank =(float)(alive-more)/alive;
live = (float)livelinks/totallinks;
                 tempfloat = min(log(1-ASSURANCE)/log(1-live),MAX_GRACE);
                 if(alive > QUORUM && abs(max(tempfloat,MIN_GRACE)-exts) > exts*THRESHOLD)
                     exts = round(max(tempfloat,MIN_GRACE));
                 // sort information for viewing purposes                  // sort information for viewing purposes
     //                memcpy(tempbuf,reports,MAX_FLOWS*sizeof(nbr_rprt));      //                memcpy(tempbuf,reports,MAX_FLOWS*sizeof(nbr_rprt));
     //                for(i=0;i<alive;i++) {      //                for(i=0;i<alive;i++) {
Line 151  void *recvthread(void *arg) { Line 164  void *recvthread(void *arg) {
                 // compute new degree... still need to think about compressing scale if it gets too big                  // compute new degree... still need to think about compressing scale if it gets too big
                 newDegree = rank*min(((float)max_capacity/min_capacity)*MIN_PEERS,MAX_PEERS);                  newDegree = rank*min(((float)max_capacity/min_capacity)*MIN_PEERS,MAX_PEERS);
                 // only alter degree if we have a quorum and the change would be significant                  // only alter degree if we have a quorum and the change would be significant
               if(alive > QUORUM && abs(max(newDegree,MIN_PEERS)-peers) > peers*THRESHOLD)                if(alive > QUORUM && abs(max(newDegree,MIN_PEERS)-peers) > peers*THRESHOLD){
                     peers = round(max(newDegree,MIN_PEERS));                      peers = round(max(newDegree,MIN_PEERS));
                       swaplinks_update_degree(swp,peers);
                   }
                 if(q) fprintf(q,"%ld %d\n",now,peers);                  if(q) fprintf(q,"%ld %d\n",now,peers);
                   if(r) fprintf(r,"%ld %d\n",now,exts);
   
                 //                printf("- *%d* -\n",capacity);                  //                printf("- *%d* -\n",capacity);
                 //                for(i=0;i<alive;i++)                  //                for(i=0;i<alive;i++)
Line 163  void *recvthread(void *arg) { Line 179  void *recvthread(void *arg) {
             }              }
             /********/              /********/
   
           if(s) fprintf(s,"    %d recvd request %d from %s - w:%d d:%d e:%d\n",            //if(s) fprintf(s,"    %d recvd request %d from %s - w:%d d:%d e:%d\n",
num,newflow.id,newflow.source,newflow.weight,newflow.duration,newflow.exts);//              num,newflow.id,newflow.source,newflow.weight,newflow.duration,newflow.exts);
             // retire any expired flows before processing              // retire any expired flows before processing
             now = difftime(time(NULL),start);              now = difftime(time(NULL),start);
             for(i=0;i<MAX_FLOWS;i++) {              for(i=0;i<MAX_FLOWS;i++) {
Line 172  void *recvthread(void *arg) { Line 188  void *recvthread(void *arg) {
                     load -= flows[i].weight;                      load -= flows[i].weight;
                     flows[i].weight = 0;                      flows[i].weight = 0;
                     flows[i].expiration = 0;                      flows[i].expiration = 0;
                   if(s) fprintf(s,"        %d finished accomodating flow at %ld: %d\n",num,now,load);                    //if(s) fprintf(s,"        %d finished accomodating flow at %ld: %d\n",num,now,load);
                 }                  }
             }              }
             if(load+newflow.weight < capacity) {              if(load+newflow.weight < capacity) {
Line 182  void *recvthread(void *arg) { Line 198  void *recvthread(void *arg) {
                     if(flows[i].expiration == 0) {                      if(flows[i].expiration == 0) {
                         flows[i].weight = newflow.weight;                          flows[i].weight = newflow.weight;
                         flows[i].expiration = now + newflow.duration;                          flows[i].expiration = now + newflow.duration;
                       if(s) fprintf(s,"        %d accepted request at %ld: %d\n",num,now,load);                        //if(s) fprintf(s,"        %d accepted request at %ld: %d\n",num,now,load);
                         if(f) fprintf(f,"%ld %s\n",now,msg);                          if(f) fprintf(f,"%ld %s\n",now,msg);
                         break;                          break;
                     }                      }
Line 194  void *recvthread(void *arg) { Line 210  void *recvthread(void *arg) {
             // do not accept the flow if it will overload this node.              // do not accept the flow if it will overload this node.
             // if the request been pushed too far, reject it, otherwise              // if the request been pushed too far, reject it, otherwise
             // pass it on to a neighbor              // pass it on to a neighbor
           else if(newflow.exts< GRACE) {            else if(newflow.exts> 0) {
                 swaplinks_update_walk_length(swp,1);                  swaplinks_update_walk_length(swp,1);
               snprintf(tmp,sizeof(tmp),"%d;%d;%d;%d;%d;",newflow.duration,newflow.weight,newflow.exts+1,newflow.id,newflow.capacity);                snprintf(tmp,sizeof(tmp),"%d;%d;%d;%d;%d;%d;%d;",newflow.duration,
                                                                  newflow.weight,
                                                                  newflow.exts-1,
                                                                  newflow.id,
                                                                  newflow.capacity,
                                                                  newflow.available,
                                                                  newflow.degree);
                 swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1,0);                  swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1,0);
                 swaplinks_update_walk_length(swp,WALK_LENGTH);                  swaplinks_update_walk_length(swp,WALK_LENGTH);
               if(s) fprintf(s,"%d rejected flow, load too high: %d <= %d\n",num,capacity,load+newflow.weight);                if(s) fprintf(s,"%d rejected flow%d:%d, load too high: %d <= %d\n",num,newflow.id,newflow.exts,capacity,load+newflow.weight);
 if(s) fprintf(s,"  %d sends requests with exts: %d\n",num,exts);
                 if(f) fprintf(f,"%ld %s\n",now,"reject");                  if(f) fprintf(f,"%ld %s\n",now,"reject");
             }              }
             else {              else {
Line 220  void *recvthread(void *arg) { Line 243  void *recvthread(void *arg) {
 int main(int argc, char **argv) {  int main(int argc, char **argv) {
     struct sockaddr_ns reg;      struct sockaddr_ns reg;
     char tmp[BUFFER_SIZE], nbrs[4096];      char tmp[BUFFER_SIZE], nbrs[4096];
   int time = 0, duration,weight, percent, limit, i;    int time = 0, duration,percent, limit, i;
     FILE *g;      FILE *g;
     int_pair p;      int_pair p;
     pthread_t tid;      pthread_t tid;
Line 265  int main(int argc, char **argv) { Line 288  int main(int argc, char **argv) {
     if(num != 99) {      if(num != 99) {
         sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");          sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
         sprintf(tmp, "output/peer_node%d.txt", num); q = fopen(tmp,"w");          sprintf(tmp, "output/peer_node%d.txt", num); q = fopen(tmp,"w");
           sprintf(tmp, "output/walk_node%d.txt", num); r = fopen(tmp,"w");
 //        sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");  //        sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
 //        sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");  //        sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");
         if (f == NULL) f = stderr;          if (f == NULL) f = stderr;
Line 283  int main(int argc, char **argv) { Line 307  int main(int argc, char **argv) {
   
     // coordinator thread      // coordinator thread
     if(num == 99) {      if(num == 99) {
         // let nodes wake up to recieve the greetings  
         for(;time > 0;time--) {          for(;time > 0;time--) {
             printf(" ---> %d MINUTE%s REMAIN%s\n",time,time==1?"":"S",time==1?"S":"");              printf(" ---> %d MINUTE%s REMAIN%s\n",time,time==1?"":"S",time==1?"S":"");
             clear(nbrs);              clear(nbrs);
             swaplinks_get_neighbors(swp,nbrs,4096);              swaplinks_get_neighbors(swp,nbrs,4096);
               print_neighbor_list(nbrs);
             p = process_neighbor_list(nbrs);              p = process_neighbor_list(nbrs);
           printf("%d %d\n",p.i,p.o);            printf("Coordinator Nbrs: %d %d\n",p.i,p.o);
             sleep(60);              sleep(60);
         }          }
         printf("Now sending KILL signals\n",limit);          printf("Now sending KILL signals\n",limit);
   //
           swaplinks_update_walk_length(swp,1);
           swaplinks_sendtoany(swp, "SHUTDOWN", 9, 0);
           swaplinks_update_walk_length(swp,WALK_LENGTH);
   //
         for(i = 0; i < nodes*REDUNDANCY; i++) swaplinks_sendtoany(swp, "SHUTDOWN", 9, 0);          for(i = 0; i < nodes*REDUNDANCY; i++) swaplinks_sendtoany(swp, "SHUTDOWN", 9, 0);
         // give them time to scramble          // give them time to scramble
         printf("Waiting for KILL signals to propagate\n");          printf("Waiting for KILL signals to propagate\n");
Line 300  int main(int argc, char **argv) { Line 329  int main(int argc, char **argv) {
         // clean up their mess          // clean up their mess
         printf("Forcibly removing remaining nodes and associated file\n");          printf("Forcibly removing remaining nodes and associated file\n");
         system("find output -size 0c | xargs rm -rf");          system("find output -size 0c | xargs rm -rf");
       // then kill stragglers more violently (including the coordinatorand its sending thread as well)        // then kill stragglers more violently (including the coordinator)
         system("killall lt-slinksvariableload");          system("killall lt-slinksvariableload");
         printf("Coordinator shutting down.\n");  
         return 0;          return 0;
     }      }
   
       exts = MIN_GRACE;
     pthread_create(&tid, NULL, recvthread, swp);      pthread_create(&tid, NULL, recvthread, swp);
     // give recieving threads a chance to wake up      // give recieving threads a chance to wake up
     sleep(5);      sleep(5);
   
     while (time++ < limit && alive) {      while (time++ < limit && alive) {
         // send a request for resources with probability percent/100          // send a request for resources with probability percent/100
       if(randint(100) < percent) {        if(randint(100) < percent) {
           snprintf(tmp,sizeof(tmp),"%d;%d;0;%d%d;%d;",duration,weight,num,time,capacity);            snprintf(tmp,sizeof(tmp),"%d;%d;%d;%d%d;%d;%d;%d;",duration,weight,exts,num,time,capacity,capacity-load,peers);
             swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1, 0);              swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1, 0);
         }          }
         // record number and type of neighbors          // record number and type of neighbors
Line 322  int main(int argc, char **argv) { Line 351  int main(int argc, char **argv) {
       //  if(g) fprintf(g,"%d %d\n",p.i,p.o);        //  if(g) fprintf(g,"%d %d\n",p.i,p.o);
       //  clear(nbrs);        //  clear(nbrs);
        // it might make more sense to sleep for more time between requests         // it might make more sense to sleep for more time between requests
       sleep(3);        sleep(12);
     }      }
   
   //  if(g) fclose(g);    //  if(g) fclose(g);
Line 337  int main(int argc, char **argv) { Line 366  int main(int argc, char **argv) {
 // would also like to alter this to give the number of unique nbrs  // would also like to alter this to give the number of unique nbrs
 // instead of the raw number  // instead of the raw number
 int_pair process_neighbor_list(char *s) {  int_pair process_neighbor_list(char *s) {
   int_pair p = {0, 0 };    int_pair p = {0,0};
   char *t,*v;    int inuniq[MAX_PEERS],outuniq[MAX_PEERS],inuniqs=0,outuniqs=0,newpeer,i;
     char temp[5],copy[URI_LENGTH];
     char *t,inseenit=0,outseenit;
   
       memset(inuniq,0,MAX_PEERS*sizeof(int));
       memset(outuniq,0,MAX_PEERS*sizeof(int));
   
       // avoid null strings altogether
     if(!s) return p;      if(!s) return p;
       // move through the string one character at a time
     while(*s) {      while(*s) {
           // if we have some characters left
           if(strlen(s) > 4) {
               // check if the next four chars are "test"
               strncpy(copy,s,URI_LENGTH);
               strncpy(temp,copy,4);
               if(streq(temp,"test")) {
                   // get number of peer and save it
                   newpeer = atoi(strtok(copy+4,"@"));
                   inseenit = outseenit = 0;
                   // check if we've seen this peer before
                   for(i=0;i<inuniqs;i++) {
                       if(inuniq[i] == newpeer) {
                           inseenit = 1;
                           break;
                       }
                   }
                   for(i=0;i<outuniqs;i++) {
                       if(outuniq[i] == newpeer) {
                           outseenit = 1;
                           break;
                       }
                   }
               }
           }
         if(*s++ == '_') {          if(*s++ == '_') {
             switch(*s++) {              switch(*s++) {
               case 'i':p.i++;break;                // only record this nbr if we haven't seen it
               case 'o':p.o++;break;                case 'i':if(!inseenit) { p.i++;inuniq[inuniqs++] = newpeer; } break;
                 case 'o':if(!outseenit) { p.o++;outuniq[outuniqs++] = newpeer; } break;
                 default: exit(s[-1]);                  default: exit(s[-1]);
             }              }
         }          }
     }      }
     return p;      return p;
 }  }
   
   void print_neighbor_list(char *s) {
       int newpeer;
       char copy[URI_LENGTH],temp[5];
       char inp[256], outp[256];
       clear(inp); clear(outp);
       // avoid null strings altogether
       if(!s) return;
       // move through the string one character at a time
       while(*s) {
           // if we have some characters left
           if(strlen(s) > 4) {
               // check if the next four chars are "test"
               strncpy(copy,s,URI_LENGTH);
               strncpy(temp,copy,4);
               if(streq(temp,"test"))
                   newpeer = atoi(strtok(copy+4,"@"));
           }
           if(*s++ == '_') {
               switch(*s++) {
                   // only record this nbr if we haven't seen it
                   case 'i': sprintf(inp,"%s%d ",inp,newpeer); break;
                   case 'o': sprintf(outp,"%s%d ",outp,newpeer); break;
                   default: exit(s[-1]);
               }
           }
       }
       printf("in: %s\nout: %s\n",inp,outp);
   }
   
   

Removed from v.1.28  
changed lines
  Added in v.1.29


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment