Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Annotation of libnutss/examples/slinks_variable_load.c, revision 1.21

1.15      ths         1: #include <stdio.h>
                      2: #include <stdlib.h>
                      3: #include <string.h>
                      4: #ifndef WIN32
                      5: #include <poll.h>
                      6: #include <sys/socket.h>
                      7: #endif
                      8: #include <pthread.h>
                      9: #include "../include/swaplinks.h"
                     10: 
                     11: #ifdef WIN32
                     12: #define sleep(sec) Sleep((sec)*1000)
                     13: #define snprintf   _snprintf
                     14: #endif
                     15: 
1.21    ! ths        16: #define clear(a) a[0]=0
        !            17: #define strlenn(s) (s) ? strlen(s) : 0
1.19      ths        18: #define randint(N) ((int)(rand() / (((double)RAND_MAX + 1) / (N))))
1.15      ths        19: #define assert(p) if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); }
1.20      ths        20: #define REDUNDANCY (4*(100/percent))
                     21: 
1.15      ths        22: typedef struct flow {
                     23:     int duration;
                     24:     int weight;
                     25:     char source[256];
                     26:     struct flow *next, *last;
                     27: } flow_t;
                     28: 
1.21    ! ths        29: typedef struct int_pair_t { int i; int o; } int_pair;
        !            30: 
        !            31: int_pair process_neighbor_list(char*);
        !            32: 
1.15      ths        33: int num, peers, limit, maxload;
                     34: swaplinks_p swp;
1.18      ths        35: FILE *f = NULL, *s = NULL, *g = NULL;
1.15      ths        36: 
                     37: void *recvthread(void *arg) {
                     38:     swaplinks_p swp = (swaplinks_p)arg;
                     39:     struct sockaddr_ns peer;
                     40:     socklen_t len = sizeof(peer);
                     41:     flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;
                     42:     char msg[64], tmp[128], buf[2048];
1.18      ths        43:     int time = 0, load = 0, exts, delta, expiration;
1.15      ths        44: 
                     45:     buf[0] = 0;
                     46: 
                     47:     while(time < limit) {
                     48:      if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
                     49:     time++; delta = 0;
1.18      ths        50:     // parse new flow
1.15      ths        51:     newflow = (flow_t*)calloc(1,sizeof(flow_t));
                     52:     newflow->duration = atoi(strtok(tmp,";"));
                     53:     newflow->weight = atoi(strtok(NULL,";"));
                     54:     exts = atoi(strtok(NULL,";"));
1.18      ths        55:     strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
1.19      ths        56:     if(s) fprintf(s,"      %d recvd request #%d from %s - w:%d d:%d\n",
                     57:          num,time,newflow->source,newflow->weight,newflow->duration);
1.18      ths        58:     snprintf(buf,2048,"->%d:%d:%s<->",newflow->duration,newflow->weight,newflow->source);
                     59:     // run through flows, decrementing duration, cleaning any expired flows
                     60:     for(ptr = flows; ptr;) {
                     61:    sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);
                     62:    expiration = 0; axe = NULL;
                     63:    if(--ptr->duration == 0) {
                     64:   axe = ptr;
                     65:   load -= ptr->weight;
                     66:   delta -= ptr->weight;
                     67:   if(ptr->last && ptr->next) {
                     68:  assert(flows != ptr);
                     69:  ptr->last->next = ptr->next;
                     70:  ptr->next->last = ptr->last;
                     71:   }
                     72:   else if(ptr->next) {
                     73:  assert(flows == ptr);
                     74:  ptr->next->last = NULL;
                     75:  flows = ptr->next;
                     76:   }
                     77:   else if(ptr->last) {
                     78:  assert(flows != ptr);
                     79:  ptr->last->next = NULL;
                     80:   }
                     81:   else {
                     82:  assert(flows == ptr);
                     83:  flows = NULL;
                     84:   }
1.15      ths        85:      if(s) fprintf(s,"%d EXPIRED FLOW from %s:d%d/w%d\n",num,ptr->source,ptr->duration,ptr->weight);
1.18      ths        86:   expiration = 1;
1.15      ths        87:    }
1.18      ths        88:    ptr = ptr->next;
                     89:    if(expiration) { free(axe); axe = NULL; }
                     90:     }
                     91:     sprintf(buf,"%s||\n",buf);
1.15      ths        92: 
                     93:      //          if(s) fprintf(s,"%d ACCEPTED FLOW from %s:d%d/w%d\n",num,newflow->source,newflow->duration,newflow->weight);
1.18      ths        94:     // if this flow will not put us overbudget, accept
                     95:     if(load+newflow->weight <= maxload) {
1.15      ths        96:    load += newflow->weight;
                     97:    delta += newflow->weight;
                     98:    newflow->next = flows;
1.18      ths        99:    if(flows) flows->last = newflow;
1.15      ths       100:    flows = newflow;
1.18      ths       101:    if(s) fprintf(s,"        %d accepted flow: %d\n",num,load);
1.15      ths       102:     }
                    103:     // ADMISSION CONTROL
                    104:     // if we're overloaded, drop it if its been pushed too far,
1.18      ths       105:     // otherwise pass on to a neighbor and mark it
1.15      ths       106:     else if(exts < 5) {
1.18      ths       107:    fprintf(s,"        %d rejected flow, load too high: %d <= %d\n",num,maxload,load+newflow->weight);
1.15      ths       108:    swaplinks_update_walk_length(swp,1);
                    109:    snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
1.21    ! ths       110:    swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);
1.15      ths       111:    swaplinks_update_walk_length(swp,peers);
                    112:     }
1.19      ths       113:     else
                    114:    fprintf(s,"        %d retired unserviceable request\n",num);
                    115: 
1.18      ths       116:     if(f) fprintf(f,"%d\n",load);
                    117:   if(s) fprintf(s,"%s",buf);
1.15      ths       118:      }
1.18      ths       119:       }
                    120:       if(s) fprintf(s,"%d DONE RECEIVING\n",num);
1.15      ths       121: 
1.18      ths       122:       // clean up
                    123:       if(f && f != stderr) { fclose(f); f = NULL; }
                    124:       if(s && s != stdout) { fclose(s); s = NULL; }
                    125:       for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);}
1.16      ths       126: 
1.18      ths       127:       pthread_exit(NULL);
1.15      ths       128: }
                    129: 
                    130: int main(int argc, char **argv) {
                    131:     struct sockaddr_ns reg;
1.18      ths       132:     char tmp[256], nbrs[4096];
1.20      ths       133:     int time = 0, duration, weight, percent, sendcount = 0;
1.21    ! ths       134:     int_pair p;
1.15      ths       135:     pthread_t tid;
                    136: 
                    137:     if (argc > 6) {
                    138:         num = atoi(argv[1]);
                    139:         peers = atoi(argv[2]);
                    140:         limit = atoi(argv[3]);
1.20      ths       141:       percent = atoi(argv[4]);
                    142:         duration = atoi(argv[5]);
                    143:         weight = atoi(argv[6]);
                    144:         maxload = atoi(argv[7]);
1.15      ths       145:     }
                    146:     else {
1.20      ths       147:         printf("Usage:\n  slinks_variable_load node_number peers_number time_limit percent_to_send flow_duration flow_weight max_load\n");
1.15      ths       148:         exit(1);
                    149:     }
                    150: 
                    151:     snprintf(tmp, sizeof(tmp), "test%d", num);
1.21    ! ths       152:     nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlenn(tmp));
        !           153:     nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlenn(tmp));
        !           154:     nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlenn(tmp));
1.15      ths       155:     strncpy(tmp, "nutss.net", sizeof(tmp));
1.21    ! ths       156:     nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlenn(tmp));
1.15      ths       157:     strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));
1.21    ! ths       158:     nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlenn(tmp));
1.15      ths       159: 
                    160:     sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
                    161: //    sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
1.18      ths       162:     sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");
1.15      ths       163:     if (f == NULL) f = stderr;
                    164:     if (s == NULL) s = stdout;
                    165: 
                    166:     memset(&reg, 0, sizeof(reg));
                    167:     reg.family = AF_NUTSS;
                    168:     strncpy(reg.user, "ths1", sizeof(reg.user));
                    169:     strncpy(reg.domain, "nutss.net", sizeof(reg.domain));
                    170:     strncpy(reg.service, "swaplinksd", sizeof(reg.service));
                    171: 
                    172:     swaplinks_init();
1.19      ths       173: 
1.20      ths       174: //    sleep(randint(5));
1.19      ths       175: 
1.15      ths       176:     swp = swaplinks_new("cloud9", &reg, peers, peers);
                    177: 
                    178:     // let swaplinks get going.
                    179:     sleep(5);
                    180: 
                    181:     pthread_create(&tid, NULL, recvthread, swp);
                    182: 
1.20      ths       183: //    sleep(2);
1.15      ths       184: 
                    185:     snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);
                    186: 
1.21    ! ths       187:     while (time++ < REDUNDANCY*limit) {
        !           188:         if(randint(100) < percent) swaplinks_sendtoany(swp, tmp, strlenn(tmp)+1, 0);
1.20      ths       189: //      if(s) fprintf(s,"   %d sent request %d\n",num,time);
1.18      ths       190:       swaplinks_get_neighbors(swp, nbrs, 4096);
1.21    ! ths       191:       p = process_neighbor_list(nbrs);
        !           192:       if(g) fprintf(g,"%d %d\n",p.i,p.o);
        !           193:       clear(nbrs);
1.20      ths       194:       sleep(1);
1.15      ths       195:     }
                    196: 
1.20      ths       197:     if(g) fclose(g);
                    198:     if(s) fprintf(s,"%d DONE SENDING\n",num);
                    199:     pthread_join(tid, NULL);
1.15      ths       200:     if(s) fprintf(s,"%d SHUTTING DOWN\n",num);
                    201: 
1.20      ths       202: //    if(num == 1) system("echo \" \" | mail -s\"node one finished sending!\" ths22@cs.cornell.edu");
1.15      ths       203: 
                    204:     return 0;
1.20      ths       205: }
                    206: 
1.21    ! ths       207: int_pair process_neighbor_list(char *s) {
        !           208:     int_pair p;
        !           209:     char *t,*v;
        !           210:     t = strtok(s,",");
        !           211:     do {
        !           212:       s += strlenn(t)+1;
        !           213:       v = strtok(t,":");
        !           214:       t += strlenn(v)+1;
        !           215:       if(*t == 'i') p.i++;
        !           216:       else if(t && *t == 'o') p.o++;
        !           217:     } while(t = strtok(s,","));
        !           218:     return p;
1.15      ths       219: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment