Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Annotation of libnutss/examples/slinks_variable_load.c, revision 1.17

1.15      ths         1: #include <stdio.h>
                      2: #include <stdlib.h>
                      3: #include <string.h>
                      4: #ifndef WIN32
                      5: #include <poll.h>
                      6: #include <sys/socket.h>
                      7: #endif
                      8: #include <pthread.h>
                      9: #include "../include/swaplinks.h"
                     10: 
                     11: #ifdef WIN32
                     12: #define sleep(sec) Sleep((sec)*1000)
                     13: #define snprintf   _snprintf
                     14: #endif
                     15: 
                     16: #define assert(p) if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); }
                     17: 
                     18: typedef struct flow {
                     19:     int duration;
                     20:     int weight;
                     21:     char source[256];
                     22:     struct flow *next, *last;
                     23: } flow_t;
                     24: 
                     25: int num, peers, limit, maxload;
                     26: swaplinks_p swp;
                     27: FILE *f = NULL, *s = NULL;
                     28: 
                     29: void *recvthread(void *arg) {
                     30:     swaplinks_p swp = (swaplinks_p)arg;
                     31:     struct sockaddr_ns peer;
                     32:     socklen_t len = sizeof(peer);
                     33:     flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;
                     34:     char msg[64], tmp[128], buf[2048];
                     35:     int time = 0, load = 0, busy = 0, exts, delta, expiration;
                     36: 
                     37:     buf[0] = 0;
                     38: 
                     39:     while(time < limit) {
                     40:      sleep(1);
                     41:      if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
                     42:     time++; delta = 0;
1.17    ! ths        43:     if(s) fprintf(s,"      %d recvd request %d\n",num,time);
1.15      ths        44:     newflow = (flow_t*)calloc(1,sizeof(flow_t));
                     45:     newflow->duration = atoi(strtok(tmp,";"));
                     46:     newflow->weight = atoi(strtok(NULL,";"));
                     47:     exts = atoi(strtok(NULL,";"));
                     48:     if(load+newflow->weight <= maxload) {
                     49:    strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
                     50:    snprintf(buf,2048,"->%d:%d<->",newflow->duration,newflow->weight);
                     51:    for(ptr = flows; ptr;) {
                     52:   sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);
                     53:   expiration = 0; axe = NULL;
                     54:   if(--ptr->duration == 0) {
                     55:  axe = ptr;
                     56:  load -= ptr->weight;
                     57:  delta -= ptr->weight;
                     58: 
                     59:  if(ptr->last && ptr->next) {
                     60: assert(flows != ptr);
                     61: ptr->last->next = ptr->next;
                     62: ptr->next->last = ptr->last;
                     63:  }
                     64:  else if(ptr->next) {
                     65: assert(flows == ptr);
1.16      ths        66: ptr->next->last = NULL;
1.15      ths        67: flows = ptr->next;
                     68:  }
                     69:  else if(ptr->last) {
                     70: assert(flows != ptr);
                     71: ptr->last->next = NULL;
                     72:  }
                     73:  else {
                     74: assert(flows == ptr);
                     75: flows = NULL;
                     76:  }
                     77: 
                     78:      if(s) fprintf(s,"%d EXPIRED FLOW from %s:d%d/w%d\n",num,ptr->source,ptr->duration,ptr->weight);
                     79:  expiration = 1;
                     80:   }
                     81:   ptr = ptr->next;
1.16      ths        82:   if(expiration) { free(axe); axe = NULL; }
1.15      ths        83:    }
                     84:    sprintf(buf,"%s||\n",buf);
                     85: 
                     86:      //          if(s) fprintf(s,"%d ACCEPTED FLOW from %s:d%d/w%d\n",num,newflow->source,newflow->duration,newflow->weight);
                     87:       if(s) fprintf(s,"%s",buf);
                     88: 
                     89:    load += newflow->weight;
                     90:    delta += newflow->weight;
                     91:    newflow->next = flows;
                     92:    if(newflow->next) newflow->next->last = newflow;
                     93:    flows = newflow;
1.17    ! ths        94:    if(f) {
        !            95:   fprintf(f,"%d\n",load);
        !            96:   fprintf(s,"        %d accepted flow and recorded load: %d\n",num,load); }
        !            97:    else  fprintf(s,"ERROR RECORDING LOAD!\n");
1.15      ths        98:     }
                     99:     // ADMISSION CONTROL
                    100:     // if we're overloaded, drop it if its been pushed too far,
                    101:     // or pass on to a neighbor and mark it as such if not
                    102:     else if(exts < 5) {
1.17    ! ths       103:    fprintf(s,"        %d rejected flow, load too high: %d >= %d\n",num,load,maxload);
1.15      ths       104:    swaplinks_update_walk_length(swp,1);
                    105:    snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
                    106:    swaplinks_sendtoany(swp,msg,strlen(msg)+1,0);
                    107:    swaplinks_update_walk_length(swp,peers);
                    108:     }
                    109:      }
                    110:     }
                    111:     if(s) fprintf(s,"%d DONE RECEIVING\n",num);
                    112: 
                    113:     if(f && f != stderr) { fclose(f); f = NULL; }
                    114:     if(s && s != stdout) { fclose(s); s = NULL; }
                    115: 
1.16      ths       116:     for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);}
                    117: 
1.15      ths       118:     pthread_exit(NULL);
                    119: }
                    120: 
                    121: int main(int argc, char **argv) {
                    122:     struct sockaddr_ns reg;
                    123:     char tmp[256];
                    124:     int time = 0, duration, weight, sendcount = 0;
                    125:     pthread_t tid;
                    126: 
                    127:     if (argc > 6) {
                    128:         num = atoi(argv[1]);
                    129:         peers = atoi(argv[2]);
                    130:         limit = atoi(argv[3]);
                    131:         duration = atoi(argv[4]);
                    132:         weight = atoi(argv[5]);
1.16      ths       133:         maxload = atoi(argv[6]);
1.15      ths       134:     }
                    135:     else {
                    136:         printf("Usage:\n  slinks_variable_load node_number peers_number time_limit flow_duration flow_weight max_load\n");
                    137:         exit(1);
                    138:     }
                    139: 
                    140:     snprintf(tmp, sizeof(tmp), "test%d", num);
                    141:     nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlen(tmp));
                    142:     nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlen(tmp));
                    143:     nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlen(tmp));
                    144:     strncpy(tmp, "nutss.net", sizeof(tmp));
                    145:     nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlen(tmp));
                    146:     strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));
                    147:     nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlen(tmp));
                    148: 
                    149:     sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
                    150: //    sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
                    151:     if (f == NULL) f = stderr;
                    152:     if (s == NULL) s = stdout;
                    153: 
                    154:     memset(&reg, 0, sizeof(reg));
                    155:     reg.family = AF_NUTSS;
                    156:     strncpy(reg.user, "ths1", sizeof(reg.user));
                    157:     strncpy(reg.domain, "nutss.net", sizeof(reg.domain));
                    158:     strncpy(reg.service, "swaplinksd", sizeof(reg.service));
                    159: 
                    160:     swaplinks_init();
                    161:     swp = swaplinks_new("cloud9", &reg, peers, peers);
                    162: 
                    163:     // let swaplinks get going.
                    164:     sleep(5);
                    165: 
                    166:     pthread_create(&tid, NULL, recvthread, swp);
                    167: 
                    168:     sleep(1);
                    169: 
                    170:     snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);
                    171: 
1.16      ths       172:     while (time++ < 3*limit) { // (i-- != 0) {
1.15      ths       173:         swaplinks_sendtoany(swp, tmp, strlen(tmp)+1, 0);
                    174:         if(s) fprintf(s,"   %d sent request %d\n",num,time);
                    175:         sleep(2);
                    176:     }
                    177: 
                    178:       if(s) fprintf(s,"%d DONE SENDING\n",num);
                    179: 
                    180:       pthread_join(tid, NULL);
                    181: 
                    182:     if(s) fprintf(s,"%d SHUTTING DOWN\n",num);
                    183: 
                    184:     if(num == 1) system("echo \" \" | mail -s\"node one finished sending!\" ths22@cs.cornell.edu");
                    185: 
                    186:     return 0;
                    187: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment