Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Annotation of libnutss/examples/slinks_variable_load.c, revision 1.12

1.1       ths         1: #include <stdio.h>
                      2: #include <stdlib.h>
                      3: #include <string.h>
                      4: #ifndef WIN32
                      5: #include <poll.h>
                      6: #include <sys/socket.h>
                      7: #endif
                      8: #include <pthread.h>
                      9: #include "../include/swaplinks.h"
                     10:
                     11: #ifdef WIN32
                     12: #define sleep(sec) Sleep((sec)*1000)
                     13: #define snprintf   _snprintf
                     14: #endif
                     15:
1.11      ths        16: #define assert(p) if(!(p)) { printf("Assertion FAILED!\n"); exit(1); }
1.10      ths        17:
1.1       ths        18: typedef struct flow {
                     19:     int duration;
                     20:     int weight;
1.4       ths        21:     char source[256];
1.1       ths        22:     struct flow *next, *last;
                     23: } flow_t;
                     24:
1.12    ! ths        25: int num, peers, limit, maxload;
        !            26: swaplinks_p swp;
1.1       ths        27: FILE *f = NULL, *s = NULL;
                     28:
                     29: void *recvthread(void *arg) {
                     30:     swaplinks_p swp = (swaplinks_p)arg;
                     31:     struct sockaddr_ns peer;
                     32:     socklen_t len = sizeof(peer);
1.10      ths        33:     flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;
1.12    ! ths        34:     char msg[64], tmp[128], buf[2048];
        !            35:     int time = 0, load = 0, busy = 0, exts, delta, expiration;
1.1       ths        36:
1.7       ths        37:     buf[0] = 0;
                     38:
1.1       ths        39:     while(time < limit) {
1.11      ths        40:       sleep(1);
                     41:       if(busy++ > limit*3) {
1.12    ! ths        42:           if(s) fprintf(s,"%d HALTING RECEPTION. TOO MUCH WAITING.\n",num);
1.11      ths        43:           pthread_exit(NULL);
                     44:       }
1.1       ths        45:         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
1.4       ths        46:             time++; delta = 0;
1.12    ! ths        47:           if(s) fprintf(s,"\t\tTIME at node %d: %d\n",num,time);
1.2       ths        48:             newflow = (flow_t*)calloc(1,sizeof(flow_t));
1.1       ths        49:             newflow->duration = atoi(strtok(tmp,";"));
                     50:             newflow->weight = atoi(strtok(NULL,";"));
1.12    ! ths        51:           exts = atoi(strtok(NULL,";"));
        !            52:           if(load < maxload) {
        !            53:          strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
        !            54:                   snprintf(buf,2048,"->%d:%d<->",newflow->duration,newflow->weight);
        !            55:          for(ptr = flows; ptr;) {
        !            56:     sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);
        !            57:     expiration = 0; axe = NULL;
        !            58:                       if(--ptr->duration == 0) {
        !            59:         axe = ptr;
        !            60:                           load -= ptr->weight;
        !            61:                           delta -= ptr->weight;
        !            62:
        !            63:         if(ptr->last && ptr->next) {
        !            64:    assert(flows != ptr);
        !            65:    ptr->last->next = ptr->next;
        !            66:    ptr->next->last = ptr->last;
        !            67:         }
        !            68:         else if(ptr->next) {
        !            69:    assert(flows == ptr);
        !            70:    flows = ptr->next;
        !            71:    if(flows) flows->last = NULL;
        !            72:         }
        !            73:         else if(ptr->last) {
        !            74:    assert(flows != ptr);
        !            75:    ptr->last->next = NULL;
        !            76:         }
        !            77:         else {
        !            78:    assert(flows == ptr);
        !            79:    flows = NULL;
        !            80:         }
        !            81:
        !            82:       if(s) fprintf(s,"%d EXPIRED FLOW from %s:d%d/w%d\n",num,ptr->source,ptr->duration,ptr->weight);
        !            83:         expiration = 1;
        !            84:                       }
        !            85:     ptr = ptr->next;
        !            86:     if(expiration) free(axe);
        !            87:
        !            88:                   }
        !            89:          sprintf(buf,"%s||\n",buf);
        !            90:
        !            91:       //          if(s) fprintf(s,"%d ACCEPTED FLOW from %s:d%d/w%d\n",num,newflow->source,newflow->duration,newflow->weight);
        !            92:        if(s) fprintf(s,"%s",buf);
        !            93:
        !            94:                   load += newflow->weight;
        !            95:          delta += newflow->weight;
        !            96:          newflow->next = flows;
        !            97:          if(newflow->next) newflow->next->last = newflow;
        !            98:          flows = newflow;
        !            99:                   if(f) fprintf(f,"%d\n",load);
        !           100:               }
        !           101:      // ADMISSION CONTROL
        !           102:      // if we're overloaded, drop it or pass this on to a neighbor, mark it as such
        !           103:      else if(exts < 5) {
        !           104:     swaplinks_update_walk_length(swp,1);
        !           105:     snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
        !           106:     swaplinks_sendtoany(swp,msg,strlen(msg)+1,0);
        !           107:     swaplinks_update_walk_length(swp,peers);
        !           108:      }
        !           109:       }
1.1       ths       110:     }
1.5       ths       111:     if(s) fprintf(s,"%d DONE RECEIVING\n",num);
1.8       ths       112:
                    113:     pthread_exit(NULL);
1.1       ths       114: }
                    115:
                    116: int main(int argc, char **argv) {
                    117:     struct sockaddr_ns reg;
                    118:     char tmp[256];
1.12    ! ths       119:     int time = 0, duration, weight, sendcount = 0;
1.1       ths       120:     pthread_t tid;
                    121:
1.12    ! ths       122:     if (argc > 6) {
1.1       ths       123:         num = atoi(argv[1]);
                    124:         peers = atoi(argv[2]);
                    125:         limit = atoi(argv[3]);
                    126:         duration = atoi(argv[4]);
                    127:         weight = atoi(argv[5]);
1.12    ! ths       128:       maxload = atoi(argv[6]);
1.1       ths       129:     }
                    130:     else {
1.12    ! ths       131:         printf("Usage:\n  slinks_variable_load node_number peers_number time_limit flow_duration flow_weight max_load\n");
1.1       ths       132:         exit(1);
                    133:     }
                    134:
                    135:     snprintf(tmp, sizeof(tmp), "test%d", num);
                    136:     nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlen(tmp));
                    137:     nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlen(tmp));
                    138:     nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlen(tmp));
                    139:     strncpy(tmp, "nutss.net", sizeof(tmp));
                    140:     nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlen(tmp));
                    141:     strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));
                    142:     nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlen(tmp));
                    143:
                    144:     sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
1.9       ths       145:     sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
1.1       ths       146:     if (f == NULL) f = stderr;
                    147:     if (s == NULL) s = stdout;
                    148:
                    149:     memset(&reg, 0, sizeof(reg));
                    150:     reg.family = AF_NUTSS;
                    151:     strncpy(reg.user, "ths1", sizeof(reg.user));
                    152:     strncpy(reg.domain, "nutss.net", sizeof(reg.domain));
                    153:     strncpy(reg.service, "swaplinksd", sizeof(reg.service));
                    154:
                    155:     swaplinks_init();
                    156:     swp = swaplinks_new("cloud9", &reg, peers, peers);
                    157:
                    158:     // let swaplinks get going.
                    159:     sleep(10);
                    160:
                    161:     pthread_create(&tid, NULL, recvthread, swp);
                    162:
                    163:     sleep(1);
                    164:
1.12    ! ths       165:     snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);
1.5       ths       166:
1.9       ths       167:     while (time++ < 2*limit) { // (i-- != 0) {
1.5       ths       168:         swaplinks_sendtoany(swp, tmp, strlen(tmp)+1, 0);
1.9       ths       169:         sleep(5);
1.1       ths       170:     }
1.5       ths       171:
                    172:     if(s) fprintf(s,"%d DONE SENDING\n",num);
1.1       ths       173:
                    174:     pthread_join(tid,NULL);
                    175:
                    176:     if(f) fclose(f);
                    177:     if(s) fclose(s);
1.6       ths       178:
1.9       ths       179:     if(num == 1) system("echo \" \" | mail -s\"experiment done!\" ths22@cs.cornell.edu");
1.6       ths       180:
1.1       ths       181:     return 0;
                    182: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment