Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Annotation of libnutss/examples/slinks_variable_load.c, revision 1.26

1.24      ths         1: #include <stdio.h>
                      2: #include <stdlib.h>
                      3: #include <string.h>
1.25      ths         4: #include <math.h>
1.24      ths         5: #ifndef WIN32
                      6: #include <poll.h>
                      7: #include <sys/socket.h>
                      8: #endif
                      9: #include <pthread.h>
                     10: #include "../include/swaplinks.h"
                     11:
                     #ifdef WIN32
                     /* Windows shims: Sleep() takes milliseconds, hence the *1000.
                      * NOTE(review): _snprintf does not NUL-terminate on truncation - confirm
                      * every buffer formatted below is large enough on this platform. */
                     #define sleep(sec) Sleep((sec)*1000)
                     #define snprintf   _snprintf
                     #endif

                     /* Truncate a char buffer to the empty string. */
                     #define clear(a) ((a)[0] = 0)
                     /* Non-zero when the two C strings are equal. */
                     #define streq(a,b) (strcmp((a),(b)) == 0)
                     /* strlen() that yields 0 for a NULL pointer.  The expansion is fully
                      * parenthesized so that `strlenn(s)+1` really adds one to the length
                      * instead of the `+1` being absorbed into the else-arm of the conditional
                      * (which silently dropped the NUL terminator from outgoing messages). */
                     #define strlenn(s) ((s) ? strlen(s) : 0)
                     /* Integer in [0, N) scaled from rand(). */
                     #define randint(N) ((int)(rand() / (((double)RAND_MAX + 1) / (N))))
                     /* Always-on assertion that prints the failed expression and exits with
                      * status 1.  Any <assert.h> definition is discarded first, and the body is
                      * wrapped in do/while(0) so the macro behaves as a single statement (safe
                      * in an unbraced if/else). */
                     #undef assert
                     #define assert(p) do { if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); } } while(0)
                     /* Each sending node issues REDUNDANCY * limit requests. */
                     #define REDUNDANCY 5
                     23:
                     /* One admitted flow, kept by the receiver thread on a doubly linked list. */
                     typedef struct flow {
                         int duration;       /* remaining lifetime; decremented once per handled request */
                         int weight;         /* load this flow adds while it is alive */
                         char source[256];   /* name of the requesting peer */
                         struct flow *next;  /* following list node, NULL at the tail */
                         struct flow *last;  /* preceding list node, NULL at the head */
                     } flow_t;
                     30:
                     /* Two counters returned by process_neighbor_list(): `i` counts "_i" tags
                      * and `o` counts "_o" tags found in a neighbor list string. */
                     typedef struct int_pair_t {
                         int i;
                         int o;
                     } int_pair;

                     /* Defined at the bottom of the file. */
                     int_pair process_neighbor_list(char *s);
                     34:
1.26    ! ths        35: int num, nodes, peers, limit, maxload;
1.24      ths        36: swaplinks_p swp;
                     37: FILE *f = NULL, *s = NULL, *g = NULL;
                     38:
1.26    ! ths        39: void *coord_recvthread(void *arg) {
        !            40:     int i;
        !            41:     char tmp[32];
        !            42:     struct sockaddr_ns peer;
        !            43:     socklen_t len = sizeof(peer);
        !            44:     swaplinks_p swp = (swaplinks_p)arg;
        !            45:
        !            46:     sleep(20);
        !            47:     for(i=0;;i++) {
        !            48:         if(i % nodes == 0 && (i/nodes+1) < nodes/4) swaplinks_update_walk_length(swp,peers*(i/nodes + 1));
        !            49:         printf("Coordinator sending greeting %d\n",i+1);
        !            50:         swaplinks_sendtoany(swp,"GREETING",9,0);
        !            51:         sleep(2);
        !            52:     }
        !            53:     pthread_exit(NULL);
        !            54: }
        !            55:
1.24      ths        56: void *recvthread(void *arg) {
                     57:     swaplinks_p swp = (swaplinks_p)arg;
                     58:     struct sockaddr_ns peer, coord;
                     59:     socklen_t len = sizeof(peer);
                     60:     flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;
                     61:     char msg[64], tmp[128], buf[2048], greeting[1024];
1.25      ths        62:     int time = 0, load = 0, exts, delta, expiration, greeted = 0, killed = 0, i;
1.24      ths        63:
                     64:     clear(buf);
                     65:     while(time < limit) {
                     66:         memset(tmp,0,128);
                     67:         memset(&peer,0,sizeof(peer));
                     68:         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
                     69:             if(streq(tmp,"GREETING")) {
                     70:                 if(greeted) continue;
                     71:                 printf("%d Received greeting from coordinator\n",num);
                     72:                 memset(&coord, 0, sizeof(coord));
                     73:                 memcpy(&coord,&peer,sizeof(peer));
                     74:                 greeted = 1;
                     75:                 continue;
                     76:             }
1.25      ths        77:             if(streq(tmp,"SHUTDOWN")) {
                     78:                 swaplinks_update_walk_length(swp,1);
                     79:                 for(i = 0; i < peers*2; i++)
                     80:                     swaplinks_sendtoany(swp,"SHUTDOWN",9,0);
                     81:                 printf("%d Killed prematurely...\n");
                     82:                 killed = 1;
                     83:                 break;
                     84:             }
1.24      ths        85:             time++; delta = 0;
                     86:             // parse new flow
                     87:             newflow = (flow_t*)calloc(1,sizeof(flow_t));
                     88:             newflow->duration = atoi(strtok(tmp,";"));
                     89:             newflow->weight = atoi(strtok(NULL,";"));
                     90:             exts = atoi(strtok(NULL,";"));
                     91:             strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
1.26    ! ths        92:             if(num == 1) printf("recvd request %d from %s\n",time,newflow->source+4);
1.24      ths        93:             if(s) fprintf(s,"      %d recvd request #%d from %s - w:%d d:%d\n",
                     94:                           num,time,newflow->source,newflow->weight,newflow->duration);
                     95:             snprintf(buf,2048,"->%d:%d:%s<->",newflow->duration,newflow->weight,newflow->source);
                     96:             // run through flows, decrementing duration, cleaning any expired flows
                     97:             for(ptr = flows; ptr;) {
                     98:                 sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);
                     99:                 expiration = 0; axe = NULL;
                    100:                 if(--ptr->duration == 0) {
                    101:                     axe = ptr;
                    102:                     load -= ptr->weight;
                    103:                     delta -= ptr->weight;
                    104:                     if(ptr->last && ptr->next) {
                    105:                         assert(flows != ptr);
                    106:                         ptr->last->next = ptr->next;
                    107:                         ptr->next->last = ptr->last;
                    108:                     }
                    109:                     else if(ptr->next) {
                    110:                         assert(flows == ptr);
                    111:                         ptr->next->last = NULL;
                    112:                         flows = ptr->next;
                    113:                     }
                    114:                     else if(ptr->last) {
                    115:                         assert(flows != ptr);
                    116:                         ptr->last->next = NULL;
                    117:                     }
                    118:                     else {
                    119:                         assert(flows == ptr);
                    120:                         flows = NULL;
                    121:                     }
                    122:                     expiration = 1;
                    123:                 }
                    124:                 ptr = ptr->next;
                    125:                 if(expiration) { free(axe); axe = NULL; }
                    126:             }
                    127:             sprintf(buf,"%s||\n",buf);
                    128:
                    129:             // if this flow will not put us overbudget, accept
                    130:             if(load+newflow->weight <= maxload) {
                    131:                 load += newflow->weight;
                    132:                 delta += newflow->weight;
                    133:                 newflow->next = flows;
                    134:                 if(flows) flows->last = newflow;
                    135:                 flows = newflow;
                    136:                 if(s) fprintf(s,"        %d accepted flow: %d\n",num,load);
                    137:             }
                    138:             // ADMISSION CONTROL
                    139:             // if we're overloaded, drop it if its been pushed too far,
                    140:             // otherwise pass on to a neighbor and mark it
                    141:             else if(exts < 5) {
                    142:                 fprintf(s,"        %d rejected flow, load too high: %d <= %d\n",num,maxload,load+newflow->weight);
                    143:                 swaplinks_update_walk_length(swp,1);
                    144:                 snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
                    145:                 swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);
                    146:                 swaplinks_update_walk_length(swp,peers);
                    147:             }
                    148:             else
                    149:                 fprintf(s,"        %d retired unserviceable request\n",num);
                    150:
                    151:             if(f) fprintf(f,"%d\n",load);
                    152: //          if(s) fprintf(s,"%s",buf);
                    153:         }
                    154:     }
1.26    ! ths       155:     if(s && !killed) printf("%d DONE RECEIVING\n",num);
1.24      ths       156:
                    157:     // clean up flows list
                    158:     for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);}
                    159:
1.26    ! ths       160:     if(f && f != stderr) { fclose(f); f = NULL; }
        !           161:     if(s && s != stdout) { fclose(s); s = NULL; }
1.25      ths       162:     if(killed) {
1.26    ! ths       163:         swaplinks_update_walk_length(swp,peers*2);
        !           164:         for(i = 0; i < peers*2; i++)
        !           165:             swaplinks_sendtoany(swp,"SHUTDOWN",9,0);
1.25      ths       166:         pthread_exit(NULL);
1.26    ! ths       167:     }
1.25      ths       168:
1.24      ths       169:     while(!greeted) {
                    170:         memset(tmp,0,128);
                    171:         memset(&peer,0,sizeof(peer));
                    172:         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
                    173:             if(streq(tmp,"GREETING")) {
                    174:                 printf("%d Received late greeting from coordinator\n",num);
                    175:                 memset(&coord,0,sizeof(coord));
                    176:                 memcpy(&coord,&peer,sizeof(peer));
                    177:                 greeted = 1;
                    178:             }
                    179:         }
                    180:     }
                    181:
                    182:     // tell coordinator that were all done here
                    183:     if(swaplinks_sendto(swp, "DONE", 5, 0, &coord, sizeof(coord)) != 5)
                    184:         printf("%d cannot contact coordinator\n", num);
                    185:     else
                    186:         printf("%d sent alert to coordinator at %s. entering holding pattern...\n",num,coord.user+4);
                    187:
                    188:     // go into holding pattern to let other nodes catch up
                    189:     for(;;) {
                    190:         memset(tmp,0,128);
                    191:         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
                    192:             if(streq(tmp,"SHUTDOWN")) { printf("%d Received KILL signal from coordinator. Shutting down...\n",num); break; }
                    193:             else if(streq(tmp,"GREETING")) continue;
                    194:             newflow = (flow_t*)calloc(1,sizeof(flow_t));
                    195:             newflow->duration = atoi(strtok(tmp,";"));
                    196:             newflow->weight = atoi(strtok(NULL,";"));
                    197:             exts = atoi(strtok(NULL,";"));
                    198:             strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
                    199:             if(exts < 5) {
                    200:                 swaplinks_update_walk_length(swp,1);
1.25      ths       201:                 // dont increase expiration here since it hit a dead node
                    202:                 snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts);
1.24      ths       203:                 swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);
                    204:                 swaplinks_update_walk_length(swp,peers);
                    205:             }
                    206:         }
                    207:     }
                    208:
1.26    ! ths       209:     swaplinks_update_walk_length(swp,peers*2);
1.25      ths       210:     for(i = 0; i < peers*2; i++)
                    211:         swaplinks_sendtoany(swp,"SHUTDOWN",9,0);
                    212:
1.24      ths       213:     pthread_exit(NULL);
                    214: }
                    215:
                    216: int main(int argc, char **argv) {
                    217:     struct sockaddr_ns reg;
                    218:     char tmp[256], nbrs[4096];
                    219:     int time = 0, duration, weight, percent, sendcount = 0;
                    220:     int_pair p;
1.26    ! ths       221:     pthread_t tid,cid;
        !           222:
        !           223:     nodes = 0;
1.24      ths       224:
                    225:     if(argc > 1) num = atoi(argv[1]);
1.25      ths       226:     if(num != 99 && argc > 7) {
1.24      ths       227:         peers = atoi(argv[2]);
                    228:         limit = atoi(argv[3]);
                    229:         percent = atoi(argv[4]);
                    230:         duration = atoi(argv[5]);
                    231:         weight = atoi(argv[6]);
                    232:         maxload = atoi(argv[7]);
                    233:     }
1.25      ths       234:     else if(num == 99 && argc > 4) {
1.26    ! ths       235:         peers = atoi(argv[3])*2;
1.25      ths       236:         percent = atoi(argv[4]);
                    237:     }
1.24      ths       238:     else if(num != 99) {
1.25      ths       239:         printf("Usage:\n  slinksvariableload node_number peers_number time_limit percent_to_send flow_duration flow_weight max_load\n");
1.24      ths       240:         exit(1);
                    241:     }
1.25      ths       242:     else {
                    243:        printf("Coordinator Usage:\n  slinksvariableload 99 num_nodes num_peers percent_to_finish\n");
                    244:        exit(1);
                    245:     }
1.24      ths       246:
                    247:     snprintf(tmp, sizeof(tmp), "test%d", num);
                    248:     nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlenn(tmp));
                    249:     nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlenn(tmp));
                    250:     nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlenn(tmp));
                    251:     strncpy(tmp, "nutss.net", sizeof(tmp));
                    252:     nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlenn(tmp));
                    253:     strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));
                    254:     nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlenn(tmp));
                    255:
                    256:     if(num != 99) {
                    257:         sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
1.26    ! ths       258:         sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
1.24      ths       259:         sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");
                    260:         if (f == NULL) f = stderr;
                    261:         if (s == NULL) s = stdout;
                    262:     }
                    263:
                    264:     memset(tmp,0,256);
                    265:     memset(&reg, 0, sizeof(reg));
                    266:     reg.family = AF_NUTSS;
                    267:     strncpy(reg.user, "ths1", sizeof(reg.user));
                    268:     strncpy(reg.domain, "nutss.net", sizeof(reg.domain));
                    269:     strncpy(reg.service, "swaplinksd", sizeof(reg.service));
                    270:
                    271:     swaplinks_init();
                    272:     swp = swaplinks_new("cloud9", &reg, peers, peers+5);
                    273:     // let swaplinks get going.
                    274:     sleep(5);
                    275:
                    276:     // coordinator thread
                    277:     if(num == 99) {
1.26    ! ths       278:         int done = 0, limit, i;
        !           279:         struct sockaddr_ns* peer_list;
1.24      ths       280:         socklen_t len = sizeof(struct sockaddr_ns);
                    281:
1.26    ! ths       282:         nodes = atoi(argv[2]);
        !           283:         limit = ceil((float)nodes*((float)percent/100));
        !           284:         peer_list = (struct sockaddr_ns*)calloc(limit,sizeof(struct sockaddr_ns));
        !           285:
1.25      ths       286:         // let nodes wake up to recieve the greetings
1.26    ! ths       287:         sleep(20);
1.25      ths       288:
1.26    ! ths       289:         pthread_create(&cid,NULL,coord_recvthread,swp);
1.24      ths       290:
1.25      ths       291:         swaplinks_update_walk_length(swp,nodes/2);
                    292:         while(done < limit) {
1.24      ths       293:             if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, peer_list+done, &len) > 0 && streq(tmp,"DONE")) {
                    294:                 printf("Coordinator received DONE signal from %s. Total: %d finished nodes.\n",peer_list[done].user+4,done+1);
                    295:                 swaplinks_sendtoany(swp,"GREETING",9,0);
                    296:                 done++;
                    297:             }
                    298:         }
1.25      ths       299:         printf("%d nodes reported, now sending KILL signals\n",limit);
                    300:         for(i = 0; i < limit; i++) {
1.24      ths       301:             printf("Sending kill to %s\n",peer_list[i].user+4);
                    302:             swaplinks_sendto(swp, "SHUTDOWN", 9, 0, peer_list+i, len);
                    303:         }
                    304:
1.25      ths       305:         // give them time to scramble
1.26    ! ths       306:         sleep(180);
1.25      ths       307:
1.26    ! ths       308:         // clean up their mess
        !           309:         system("find output -size 0c | xargs rm -rf");
        !           310:         // then kill stragglers more violently
1.25      ths       311:         system("killall lt-slinksvariableload");
                    312:
1.24      ths       313:         return 0;
                    314:     }
                    315:
                    316:     pthread_create(&tid, NULL, recvthread, swp);
                    317:
1.25      ths       318:     sleep(3);
                    319:
1.24      ths       320:     while (time++ < REDUNDANCY*limit) {
1.26    ! ths       321:         if(randint(100) < percent) snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);
        !           322:         else snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, 0);
1.24      ths       323:         swaplinks_sendtoany(swp, tmp, strlenn(tmp)+1, 0);
                    324:         swaplinks_get_neighbors(swp, nbrs, 4096);
                    325:         p = process_neighbor_list(nbrs);
                    326:         if(g) fprintf(g,"%d %d\n",p.i,p.o);
                    327:         clear(nbrs);
1.25      ths       328:         sleep(2);
1.24      ths       329:     }
                    330:
                    331:     if(g) fclose(g);
1.26    ! ths       332:     if(s) printf("%d DONE SENDING\n",num);
1.24      ths       333:     pthread_join(tid, NULL);
                    334:     if(s) fprintf(s,"%d SHUTTING DOWN\n",num);
                    335:
1.26    ! ths       336:     if(num == 1) system("echo \" \" | mail -s\"node one finished sending!\" ths22@cs.cornell.edu");
1.24      ths       337:
                    338:     return 0;
                    339: }
                    340:
                    341: int_pair process_neighbor_list(char *s) {
                    342:     int_pair p = { 0, 0 };
                    343:     char *t,*v;
                    344:
                    345:     if(!s) return p;
                    346:     while(*s) {
                    347:         if(*s++ == '_') {
                    348:             switch(*s++) {
                    349:                 case 'i': p.i++; break;
                    350:                 case 'o': p.o++; break;
                    351:                 default: exit(s[-1]);
                    352:             }
                    353:         }
                    354:     }
                    355:     return p;
                    356: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment