Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Annotation of libnutss/examples/slinks_variable_load.c, revision 1.13

1.13    ! ths         1: #include <stdio.h>
        !             2: #include <stdlib.h>
        !             3: #include <string.h>
        !             4: #ifndef WIN32
        !             5: #include <poll.h>
        !             6: #include <sys/socket.h>
        !             7: #endif
        !             8: #include <pthread.h>
        !             9: #include "../include/swaplinks.h"
        !            10: 
        !            11: #ifdef WIN32
        !            12: #define sleep(sec) Sleep((sec)*1000)
        !            13: #define snprintf   _snprintf
        !            14: #endif
        !            15: 
        !            16: #define assert(p) if(!(p)) { printf("Assertion FAILED!\n"); exit(1); }
        !            17: 
        !            18: typedef struct flow {
        !            19:     int duration;
        !            20:     int weight;
        !            21:     char source[256];
        !            22:     struct flow *next, *last;
        !            23: } flow_t;
        !            24: 
        !            25: int num, peers, limit, maxload;
        !            26: swaplinks_p swp;
        !            27: FILE *f = NULL, *s = NULL;
        !            28: 
        !            29: void *recvthread(void *arg) {
        !            30:     swaplinks_p swp = (swaplinks_p)arg;
        !            31:     struct sockaddr_ns peer;
        !            32:     socklen_t len = sizeof(peer);
        !            33:     flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;
        !            34:     char msg[64], tmp[128], buf[2048];
        !            35:     int time = 0, load = 0, busy = 0, exts, delta, expiration;
        !            36: 
        !            37:     buf[0] = 0;
        !            38: 
        !            39:     while(time < limit) {
        !            40:      sleep(1);
        !            41:      if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
        !            42:     time++; delta = 0;
        !            43:     if(s) fprintf(s,"      %d recvd request %d\n",num,time);
        !            44:     newflow = (flow_t*)calloc(1,sizeof(flow_t));
        !            45:     newflow->duration = atoi(strtok(tmp,";"));
        !            46:     newflow->weight = atoi(strtok(NULL,";"));
        !            47:     exts = atoi(strtok(NULL,";"));
        !            48:     if(load < maxload) {
        !            49:    strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
        !            50:    snprintf(buf,2048,"->%d:%d<->",newflow->duration,newflow->weight);
        !            51:    for(ptr = flows; ptr;) {
        !            52:   sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);
        !            53:   expiration = 0; axe = NULL;
        !            54:   if(--ptr->duration == 0) {
        !            55:  axe = ptr;
        !            56:  load -= ptr->weight;
        !            57:  delta -= ptr->weight;
        !            58: 
        !            59:  if(ptr->last && ptr->next) {
        !            60: assert(flows != ptr);
        !            61: ptr->last->next = ptr->next;
        !            62: ptr->next->last = ptr->last;
        !            63:  }
        !            64:  else if(ptr->next) {
        !            65: assert(flows == ptr);
        !            66: flows = ptr->next;
        !            67:  }
        !            68: 
        !            69:  if(flows) flows->last = NULL;
        !            70:  else if(ptr->last) {
        !            71: assert(flows != ptr);
        !            72: ptr->last->next = NULL;
        !            73:  }
        !            74:  else {
        !            75: assert(flows == ptr);
        !            76: flows = NULL;
        !            77:  }
        !            78: 
        !            79:      if(s) fprintf(s,"%d EXPIRED FLOW from %s:d%d/w%d\n",num,ptr->source,ptr->duration,ptr->weight);
        !            80:  expiration = 1;
        !            81:   }
        !            82:   ptr = ptr->next;
        !            83:   if(expiration) free(axe);
        !            84:    }
        !            85:    sprintf(buf,"%s||\n",buf);
        !            86: 
        !            87:      //          if(s) fprintf(s,"%d ACCEPTED FLOW from %s:d%d/w%d\n",num,newflow->source,newflow->duration,newflow->weight);
        !            88:       if(s) fprintf(s,"%s",buf);
        !            89: 
        !            90:    load += newflow->weight;
        !            91:    delta += newflow->weight;
        !            92:    newflow->next = flows;
        !            93:    if(newflow->next) newflow->next->last = newflow;
        !            94:    flows = newflow;
        !            95:    if(f) fprintf(f,"%d\n",load);
        !            96:     }
        !            97:     // ADMISSION CONTROL
        !            98:     // if we're overloaded, drop it if its been pushed too far,
        !            99:     // or pass on to a neighbor and mark it as such if not
        !           100:     else if(exts < 5) {
        !           101:    swaplinks_update_walk_length(swp,1);
        !           102:    snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
        !           103:    swaplinks_sendtoany(swp,msg,strlen(msg)+1,0);
        !           104:    swaplinks_update_walk_length(swp,peers);
        !           105:     }
        !           106:      }
        !           107:     }
        !           108:     if(s) fprintf(s,"%d DONE RECEIVING\n",num);
        !           109: 
        !           110:     if(f) { fclose(f); f = NULL; }
        !           111:     if(s) { fclose(s); s = NULL; }
        !           112: 
        !           113:     pthread_exit(NULL);
        !           114: }
        !           115: 
        !           116: int main(int argc, char **argv) {
        !           117:     struct sockaddr_ns reg;
        !           118:     char tmp[256];
        !           119:     int time = 0, duration, weight, sendcount = 0;
        !           120:     pthread_t tid;
        !           121: 
        !           122:     if (argc > 6) {
        !           123:         num = atoi(argv[1]);
        !           124:         peers = atoi(argv[2]);
        !           125:         limit = atoi(argv[3]);
        !           126:         duration = atoi(argv[4]);
        !           127:         weight = atoi(argv[5]);
        !           128:      maxload = atoi(argv[6]);
        !           129:     }
        !           130:     else {
        !           131:         printf("Usage:\n  slinks_variable_load node_number peers_number time_limit flow_duration flow_weight max_load\n");
        !           132:         exit(1);
        !           133:     }
        !           134: 
        !           135:     snprintf(tmp, sizeof(tmp), "test%d", num);
        !           136:     nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlen(tmp));
        !           137:     nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlen(tmp));
        !           138:     nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlen(tmp));
        !           139:     strncpy(tmp, "nutss.net", sizeof(tmp));
        !           140:     nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlen(tmp));
        !           141:     strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));
        !           142:     nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlen(tmp));
        !           143: 
        !           144:     sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
        !           145: //    sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
        !           146:     if (f == NULL) f = stderr;
        !           147:     if (s == NULL) s = stdout;
        !           148: 
        !           149:     memset(&reg, 0, sizeof(reg));
        !           150:     reg.family = AF_NUTSS;
        !           151:     strncpy(reg.user, "ths1", sizeof(reg.user));
        !           152:     strncpy(reg.domain, "nutss.net", sizeof(reg.domain));
        !           153:     strncpy(reg.service, "swaplinksd", sizeof(reg.service));
        !           154: 
        !           155:     swaplinks_init();
        !           156:     swp = swaplinks_new("cloud9", &reg, peers, peers);
        !           157: 
        !           158:     // let swaplinks get going.
        !           159:     sleep(10);
        !           160: 
        !           161:     pthread_create(&tid, NULL, recvthread, swp);
        !           162: 
        !           163:     sleep(1);
        !           164: 
        !           165:     snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);
        !           166: 
        !           167:     while (time++ < 3*limit) { // (i-- != 0) {
        !           168:         swaplinks_sendtoany(swp, tmp, strlen(tmp)+1, 0);
        !           169:         if(s) fprintf(s,"   %d sent request %d\n",num,time);
        !           170:         else printf("   %d sent request %d\n",num,time);
        !           171:         sleep(10);
        !           172:     }
        !           173: 
        !           174:       pthread_join(tid, NULL);
        !           175: 
        !           176:     if(s) fprintf(s,"%d DONE SENDING\n",num);
        !           177: 
        !           178: //    if(num == 1) system("echo \" \" | mail -s\"node one finished sending!\" ths22@cs.cornell.edu");
        !           179: 
        !           180:     return 0;
        !           181: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment