Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Diff for /libnutss/examples/slinks_variable_load.c between versions 1.27 and 1.28

version 1.27, 2007/07/28 00:41:57 version 1.28, 2007/08/23 22:13:44
Line 2 Line 2
 #include <stdlib.h>  #include <stdlib.h>
 #include <string.h>  #include <string.h>
 #include <time.h>  #include <time.h>
   #include <math.h>
   #include <limits.h>
 #ifndef WIN32  #ifndef WIN32
 #include <poll.h>  #include <poll.h>
 #include <sys/socket.h>  #include <sys/socket.h>
Line 17 Line 19
 #define streq(a,b) !strcmp((a),(b))  #define streq(a,b) !strcmp((a),(b))
 #define strlenn(s) (s) ? strlen(s) : 0  #define strlenn(s) (s) ? strlen(s) : 0
 #define randint(N) ((int)(rand() / (((double)RAND_MAX + 1) / (N))))  #define randint(N) ((int)(rand() / (((double)RAND_MAX + 1) / (N))))
   #define round(f) (f)-(int)(f) > 0.5 ? (int)(f)+1 : (int)(f)
   #define min(a,b) ((a)<(b)?(a):(b))
   #define max(a,b) ((a)>(b)?(a):(b))
 #define assert(p) if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); }  #define assert(p) if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); }
#define REDUNDANCY5#define REDUNDANCY5       // how many message repetitions are necessary to reach every node
#define PROPAGATION300#define PROPAGATION600     // how many seconds the coordinator should wait for every node to recieve KILL signal
#define GRACE3#define EXPIRATION    600     // how many seconds a node should keep a peering report fron another node
#define ALLOTMENT400#define GRACE7       // how many times a request can be passed on by a full-capacity node
#define WALK_LENGTH10#define ALLOTMENT400     // how much load a node will accomadate for each of his neighbors (obsolete w/ auto peering)
#define MAX_FLOWS256#define QUORUM        12      // how many reports a node needs before adjusting his peering level
#define BUFFER_SIZE128#define THRESHOLD     0.15    // how much different a newly calculated degree must be from the old one to switch to it
#define URI_LENGTH256#define MAX_REPORTS   30
 #define WALK_LENGTH10
 #define MIN_PEERS     3
 #define DEFAULT_PEERS 8       // obsolete w/ auto peering
 #define MAX_PEERS     40
 #define MAX_FLOWS256
 #define BUFFER_SIZE128
 #define URI_LENGTH256
   
 typedef struct flow {  typedef struct flow {
   int duration, weight, exts;    int id, duration, weight, exts, capacity;
     char source[URI_LENGTH];      char source[URI_LENGTH];
     struct flow *next, *last;      struct flow *next, *last;
 } flow_t;  } flow_t;
   
typedef struct int_pair_t { int i; int o; }int_pair;typedef struct int_pair_t { int i, o; }int_pair;
typedef struct flow_spec_t { int weight; int expiration; }flow_spec;typedef struct flw_spec_t { int weight, expiration; }flw_spec;
 typedef struct nbr_rprt_t { int expiration, capacity; }  nbr_rprt;
   
 int_pair process_neighbor_list(char*);  int_pair process_neighbor_list(char*);
   
 // Gobal information, used by multiple threads  // Gobal information, used by multiple threads
int num, nodes = 0, peers, capacity;int num, nodes = 0, peers, capacity, alive = 1;
 swaplinks_p swp;  swaplinks_p swp;
FILE *f = NULL, *s = NULL;FILE *f = NULL, *s = NULL, *q = NULL;
   
 // Node's receiving thread that decides whether to accept an incoming  // Node's receiving thread that decides whether to accept an incoming
// request and records all the necessary information// request and records all the necessary information; also carries
 // out the gossiping-equilibrium protocol to assign peering level
 void *recvthread(void *arg) {  void *recvthread(void *arg) {
     struct sockaddr_ns peer;      struct sockaddr_ns peer;
     socklen_t len = sizeof(peer);      socklen_t len = sizeof(peer);
Line 52  void *recvthread(void *arg) { Line 66  void *recvthread(void *arg) {
     int load = 0, bytes, msg_count, i;      int load = 0, bytes, msg_count, i;
     time_t start,now;      time_t start,now;
     flow_t newflow;      flow_t newflow;
   flow_spec flows[MAX_FLOWS];    flw_spec flows[MAX_FLOWS];
 nbr_rprt reports[MAX_FLOWS];
     nbr_rprt tempbuf[MAX_FLOWS];
     nbr_rprt sorted[MAX_FLOWS];
 
     memset(flows,0,sizeof(flw_spec)*MAX_FLOWS);
     memset(reports,0,sizeof(nbr_rprt)*MAX_FLOWS);
   
     start = time(NULL);      start = time(NULL);
     for(;;) {      for(;;) {
         memset(&peer,0,sizeof(peer));          memset(&peer,0,sizeof(peer));
       sleep(1);        sleep(1);
if((bytes=swaplinks_recvfrom(swp, msg, sizeof(msg), 0, &peer, &len)) > 0) {// simulate churn by having a small portion of nodes duck out early, and replace them periodically
 //        if(randint(1000) < 2) {
 //            printf("%d Withdrawing from cloud early\n",num);
 //            break;
 //        }
         if((bytes=swaplinks_recvfrom(swp, msg, sizeof(msg), 0, &peer, &len)) > 0) {
             msg[bytes] = 0;              msg[bytes] = 0;
               memcpy(tmp,msg,bytes);
             // Flood SHUTDOWN to neighbors, then power down              // Flood SHUTDOWN to neighbors, then power down
             if(streq(msg,"SHUTDOWN")) {              if(streq(msg,"SHUTDOWN")) {
                 swaplinks_update_walk_length(swp,1);                  swaplinks_update_walk_length(swp,1);
Line 72  void *recvthread(void *arg) { Line 98  void *recvthread(void *arg) {
             newflow.duration = atoi(strtok(tmp,";"));              newflow.duration = atoi(strtok(tmp,";"));
             newflow.weight = atoi(strtok(NULL,";"));              newflow.weight = atoi(strtok(NULL,";"));
             newflow.exts = atoi(strtok(NULL,";"));              newflow.exts = atoi(strtok(NULL,";"));
               newflow.id = atoi(strtok(NULL,";"));
               newflow.capacity = atoi(strtok(NULL,";"));
             strncpy(newflow.source,peer.user+4,sizeof(newflow.source));              strncpy(newflow.source,peer.user+4,sizeof(newflow.source));
             if(s) fprintf(s,"    %d recvd request #%d from %s - w:%d d:%d\n",  
                           num,++msg_count,newflow.source,newflow.weight,newflow.duration);  
   
               /*******/
               // Adjust peering level based on updated picture of the state of the network
               {
                   int alive = 0, more = 0, max_capacity = 0, min_capacity = INT_MAX, numReports = 0, src, curmax, k;
                   float rank, newDegree, tempfloat;
                   // parse report
                   now = difftime(time(NULL),start);
                   src = atoi(newflow.source);
                   assert(src < MAX_FLOWS);
                   // retire any old data and count current reports
                   for(i=0;i<MAX_FLOWS;i++) {
                       if(0 < reports[i].expiration && reports[i].expiration > now)
                           numReports++;
                       if(0 < reports[i].expiration && reports[i].expiration < now)
                           memset(reports+i,0,sizeof(nbr_rprt));
                   }
                   // dont add this report if we have enough already
                   if(numReports < MAX_REPORTS) {
                       reports[src].capacity = newflow.capacity;
                       reports[src].expiration = now+EXPIRATION;
                   }
                   // gather important data based on known info
                   for(i=0;i<MAX_FLOWS;i++) {
                       if(0 < reports[i].capacity) alive++;
                       if(max_capacity < reports[i].capacity) max_capacity = reports[i].capacity;
                       if(min_capacity > reports[i].capacity && 0 < reports[i].capacity) min_capacity = reports[i].capacity;
                       if(capacity < reports[i].capacity) more++;
                   }
                   // include this node's capacity in mix
                   if(max_capacity < capacity) max_capacity = capacity;
                   if(min_capacity > capacity) min_capacity = capacity;
                   assert(alive > 0);
                   rank = ((float)(alive-more))/alive;
   
                   // sort information for viewing purposes
       //                memcpy(tempbuf,reports,MAX_FLOWS*sizeof(nbr_rprt));
       //                for(i=0;i<alive;i++) {
       //                    curmax = 0;
       //                    for(k=0;k<MAX_FLOWS;k++) {
       //                        if(tempbuf[curmax].capacity < tempbuf[k].capacity)
       //                            curmax = k;
       //                    }
       //                    sorted[i] = tempbuf[curmax];
       //                    tempbuf[curmax].capacity = 0;
       //                }
   
                   // compute new degree... still need to think about compressing scale if it gets too big
                   newDegree = rank*min(((float)max_capacity/min_capacity)*MIN_PEERS,MAX_PEERS);
                   // only alter degree if we have a quorum and the change would be significant
                   if(alive > QUORUM && abs(max(newDegree,MIN_PEERS)-peers) > peers*THRESHOLD)
                       peers = round(max(newDegree,MIN_PEERS));
                   if(q) fprintf(q,"%ld %d\n",now,peers);
   
                   //                printf("- *%d* -\n",capacity);
                   //                for(i=0;i<alive;i++)
                   //                    printf(" %d:%4.2f",sorted[i].capacity,(double)(sorted[i].expiration)-difftime(time(NULL),start));
                   //                printf("\n  %d peering at %d, %4.2f out of %d, adjmax %3.1f\n",num,peers,rank*100.0,alive+1,adjusted_max_peers);
                   //                printf("---------\n");
               }
               /********/
   
               if(s) fprintf(s,"    %d recvd request %d from %s - w:%d d:%d e:%d\n",
                             num,newflow.id,newflow.source,newflow.weight,newflow.duration,newflow.exts);
             // retire any expired flows before processing              // retire any expired flows before processing
             now = difftime(time(NULL),start);              now = difftime(time(NULL),start);
           for(i=0;i< MAX_FLOWS;i++) {            for(i=0;i<MAX_FLOWS;i++) {
                 if(0 < flows[i].expiration && flows[i].expiration < now) {                  if(0 < flows[i].expiration && flows[i].expiration < now) {
                     load -= flows[i].weight;                      load -= flows[i].weight;
                     flows[i].weight = 0;                      flows[i].weight = 0;
Line 88  void *recvthread(void *arg) { Line 177  void *recvthread(void *arg) {
             }              }
             if(load+newflow.weight < capacity) {              if(load+newflow.weight < capacity) {
                 load += newflow.weight;                  load += newflow.weight;
               for(i=0;i< MAX_FLOWS;i++) {                // find an empty spot to record this flow
                 for(i=0;i<MAX_FLOWS;i++) {
                     if(flows[i].expiration == 0) {                      if(flows[i].expiration == 0) {
                         flows[i].weight = newflow.weight;                          flows[i].weight = newflow.weight;
                         flows[i].expiration = now + newflow.duration;                          flows[i].expiration = now + newflow.duration;
Line 97  void *recvthread(void *arg) { Line 187  void *recvthread(void *arg) {
                         break;                          break;
                     }                      }
                 }                  }
                   // if we run out of space, i dont want to add more space, just die.
                 assert(i != MAX_FLOWS);                  assert(i != MAX_FLOWS);
             }              }
             // ADMISSION CONTROL              // ADMISSION CONTROL
Line 105  void *recvthread(void *arg) { Line 196  void *recvthread(void *arg) {
             // pass it on to a neighbor              // pass it on to a neighbor
             else if(newflow.exts < GRACE) {              else if(newflow.exts < GRACE) {
                 swaplinks_update_walk_length(swp,1);                  swaplinks_update_walk_length(swp,1);
               snprintf(tmp,sizeof(tmp),"%d;%d;%d;",newflow.duration,newflow.weight,newflow.exts+1);                snprintf(tmp,sizeof(tmp),"%d;%d;%d;%d;%d;",newflow.duration,newflow.weight,newflow.exts+1,newflow.id,newflow.capacity);
                 swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1,0);                  swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1,0);
                 swaplinks_update_walk_length(swp,WALK_LENGTH);                  swaplinks_update_walk_length(swp,WALK_LENGTH);
                 if(s) fprintf(s,"        %d rejected flow, load too high: %d <= %d\n",num,capacity,load+newflow.weight);                  if(s) fprintf(s,"        %d rejected flow, load too high: %d <= %d\n",num,capacity,load+newflow.weight);
                 if(f) fprintf(f,"%ld %s\n",now,"reject");                  if(f) fprintf(f,"%ld %s\n",now,"reject");
             }              }
             else {              else {
               if(s) fprintf(s,"        %d retired unserviceable request\n",num);                if(s) fprintf(s,"        %d retired unserviceable request%d\n",num,newflow.id);
                 if(f) fprintf(f,"%ld %s\n",now,"retire");                  if(f) fprintf(f,"%ld %s\n",now,"retire");
             }              }
         }          }
Line 121  void *recvthread(void *arg) { Line 212  void *recvthread(void *arg) {
     if(f && f != stderr) { fclose(f); f = NULL; }      if(f && f != stderr) { fclose(f); f = NULL; }
     if(s && s != stdout) { fclose(s); s = NULL; }      if(s && s != stdout) { fclose(s); s = NULL; }
   
       alive = 0;
   
     pthread_exit(NULL);      pthread_exit(NULL);
 }  }
   
Line 135  int main(int argc, char **argv) { Line 228  int main(int argc, char **argv) {
     // parse args      // parse args
     if(argc > 1) num = atoi(argv[1]);      if(argc > 1) num = atoi(argv[1]);
     if(num != 99 && argc > 6) {      if(num != 99 && argc > 6) {
       peers = atoi(argv[2]);        limit = atoi(argv[2]);
       limit = atoi(argv[3]);        percent = atoi(argv[3]);
       percent = atoi(argv[4]);        duration = atoi(argv[4]);
       duration = atoi(argv[5]);        weight = atoi(argv[5]);
       weight = atoi(argv[6]);        capacity =atoi(argv[6]);
       capacity =peers*ALLOTMENT;        peers= DEFAULT_PEERS;
     }      }
     else if(num == 99 && argc > 4) {      else if(num == 99 && argc > 4) {
         nodes = atoi(argv[2]);          nodes = atoi(argv[2]);
Line 150  int main(int argc, char **argv) { Line 243  int main(int argc, char **argv) {
         time = atoi(argv[4]);          time = atoi(argv[4]);
     }      }
     else if(num != 99) {      else if(num != 99) {
       printf("Usage:\n  slinksvariableload node_numbernum_peers num_messages percent_load flow_duration flow_weight capacity\n");        printf("Usage:\n  slinksvariableload node_numbernum_messages percent_load flow_duration flow_weight capacity\n");
         exit(1);          exit(1);
     }      }
     else {      else {
Line 171  int main(int argc, char **argv) { Line 264  int main(int argc, char **argv) {
     // open output files      // open output files
     if(num != 99) {      if(num != 99) {
         sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");          sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
           sprintf(tmp, "output/peer_node%d.txt", num); q = fopen(tmp,"w");
 //        sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");  //        sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
 //        sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");  //        sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");
         if (f == NULL) f = stderr;          if (f == NULL) f = stderr;
         if (s == NULL) s = stdout;          if (s == NULL) s = stdout;
     }      }
   if(f) fprintf(f,"%d\n",peers);    if(f) fprintf(f,"%d\n",capacity);
   
     // build registrar uri      // build registrar uri
     memset(&reg, 0, sizeof(reg));      memset(&reg, 0, sizeof(reg));
Line 192  int main(int argc, char **argv) { Line 286  int main(int argc, char **argv) {
         // let nodes wake up to recieve the greetings          // let nodes wake up to recieve the greetings
         for(;time > 0;time--) {          for(;time > 0;time--) {
             printf(" ---> %d MINUTE%s REMAIN%s\n",time,time==1?"":"S",time==1?"S":"");              printf(" ---> %d MINUTE%s REMAIN%s\n",time,time==1?"":"S",time==1?"S":"");
               clear(nbrs);
               swaplinks_get_neighbors(swp,nbrs,4096);
               p = process_neighbor_list(nbrs);
               printf("%d %d\n",p.i,p.o);
             sleep(60);              sleep(60);
         }          }
         printf("Now sending KILL signals\n",limit);          printf("Now sending KILL signals\n",limit);
Line 204  int main(int argc, char **argv) { Line 302  int main(int argc, char **argv) {
         system("find output -size 0c | xargs rm -rf");          system("find output -size 0c | xargs rm -rf");
         // then kill stragglers more violently (including the coordinator and its sending thread as well)          // then kill stragglers more violently (including the coordinator and its sending thread as well)
         system("killall lt-slinksvariableload");          system("killall lt-slinksvariableload");
           printf("Coordinator shutting down.\n");
         return 0;          return 0;
     }      }
   
Line 211  int main(int argc, char **argv) { Line 310  int main(int argc, char **argv) {
     // give recieving threads a chance to wake up      // give recieving threads a chance to wake up
     sleep(5);      sleep(5);
   
   while (time++ < limit) {    while (time++ < limit&& alive) {
         // send a request for resources with probability percent/100          // send a request for resources with probability percent/100
         if(randint(100) < percent) {          if(randint(100) < percent) {
           snprintf(tmp,sizeof(tmp),"%d;%d;0;",duration,weight);            snprintf(tmp,sizeof(tmp),"%d;%d;0;%d%d;%d;",duration,weight,num,time,capacity);
             swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1, 0);              swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1, 0);
         }          }
         // record number and type of neighbors          // record number and type of neighbors
Line 234  int main(int argc, char **argv) { Line 333  int main(int argc, char **argv) {
     return 0;      return 0;
 }  }
   
   // read the neighbor list and extract the number of in and out nbrs
   // would also like to alter this to give the number of unique nbrs
   // instead of the raw number
 int_pair process_neighbor_list(char *s) {  int_pair process_neighbor_list(char *s) {
     int_pair p = { 0, 0 };      int_pair p = { 0, 0 };
     char *t,*v;      char *t,*v;

Removed from v.1.27  
changed lines
  Added in v.1.28


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment