Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Annotation of libnutss/examples/slinks_variable_load.c, revision 1.25

1.24      ths         1: #include <stdio.h>
                      2: #include <stdlib.h>
                      3: #include <string.h>
1.25    ! ths         4: #include <math.h>
1.24      ths         5: #ifndef WIN32
                      6: #include <poll.h>
                      7: #include <sys/socket.h>
                      8: #endif
                      9: #include <pthread.h>
                     10: #include "../include/swaplinks.h"
                     11:
                     12: #ifdef WIN32
                     13: #define sleep(sec) Sleep((sec)*1000)
                     14: #define snprintf   _snprintf
                     15: #endif
                     16:
                     17: #define clear(a) a[0]=0
                     18: #define streq(a,b) !strcmp((a),(b))
                     19: #define strlenn(s) (s) ? strlen(s) : 0
                     20: #define randint(N) ((int)(rand() / (((double)RAND_MAX + 1) / (N))))
                     21: #define assert(p) if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); }
                     22: #define REDUNDANCY 5
                     23:
                     24: typedef struct flow {          // one flow accepted by recvthread; node of its doubly linked list
                     25:     int duration;              // first ';' field of the request; decremented per handled request, flow is freed when it hits 0
                     26:     int weight;                // second ';' field; added to the node's load on accept, subtracted on expiry
                     27:     char source[256];          // sender name, copied from peer.user+4 when the request arrives
                     28:     struct flow *next, *last;  // next node / previous node (NULL at the ends); new flows are pushed at the head
                     29: } flow_t;
                     30:
                     31: typedef struct int_pair_t { int i; int o; } int_pair;  // i / o: number of "_i" / "_o" tags counted by process_neighbor_list
                     32:
                     33: int_pair process_neighbor_list(char*);  // defined at the end of the file; used by main
                     34:
                     35: int num, peers, limit, maxload;  // set from the command line in main; num is the node number (99 selects the coordinator role)
                     36: swaplinks_p swp;  // handle returned by swaplinks_new in main
                     37: FILE *f = NULL, *s = NULL, *g = NULL;  // f: load per handled request, s: status messages, g: neighbor tag counts; opened in main
                     38:
                     39: void *recvthread(void *arg) {
                     40:     swaplinks_p swp = (swaplinks_p)arg;
                     41:     struct sockaddr_ns peer, coord;
                     42:     socklen_t len = sizeof(peer);
                     43:     flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;
                     44:     char msg[64], tmp[128], buf[2048], greeting[1024];
1.25    ! ths        45:     int time = 0, load = 0, exts, delta, expiration, greeted = 0, killed = 0, i;
1.24      ths        46:
                     47:     clear(buf);
                     48:     while(time < limit) {
                     49:         memset(tmp,0,128);
                     50:         memset(&peer,0,sizeof(peer));
                     51:         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
                     52:             if(streq(tmp,"GREETING")) {
                     53:                 if(greeted) continue;
                     54:                 printf("%d Received greeting from coordinator\n",num);
                     55:                 memset(&coord, 0, sizeof(coord));
                     56:                 memcpy(&coord,&peer,sizeof(peer));
                     57:                 greeted = 1;
                     58:                 continue;
                     59:             }
1.25    ! ths        60:             if(streq(tmp,"SHUTDOWN")) {
        !            61:                 swaplinks_update_walk_length(swp,1);
        !            62:                 for(i = 0; i < peers*2; i++)
        !            63:                     swaplinks_sendtoany(swp,"SHUTDOWN",9,0);
        !            64:                 printf("%d Killed prematurely...\n");
        !            65:                 killed = 1;
        !            66:                 break;
        !            67:             }
1.24      ths        68:             time++; delta = 0;
                     69:             // parse new flow
                     70:             newflow = (flow_t*)calloc(1,sizeof(flow_t));
                     71:             newflow->duration = atoi(strtok(tmp,";"));
                     72:             newflow->weight = atoi(strtok(NULL,";"));
                     73:             exts = atoi(strtok(NULL,";"));
                     74:             strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
                     75:             if(s) fprintf(s,"      %d recvd request #%d from %s - w:%d d:%d\n",
                     76:                           num,time,newflow->source,newflow->weight,newflow->duration);
                     77:             snprintf(buf,2048,"->%d:%d:%s<->",newflow->duration,newflow->weight,newflow->source);
                     78:             // run through flows, decrementing duration, cleaning any expired flows
                     79:             for(ptr = flows; ptr;) {
                     80:                 sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);
                     81:                 expiration = 0; axe = NULL;
                     82:                 if(--ptr->duration == 0) {
                     83:                     axe = ptr;
                     84:                     load -= ptr->weight;
                     85:                     delta -= ptr->weight;
                     86:                     if(ptr->last && ptr->next) {
                     87:                         assert(flows != ptr);
                     88:                         ptr->last->next = ptr->next;
                     89:                         ptr->next->last = ptr->last;
                     90:                     }
                     91:                     else if(ptr->next) {
                     92:                         assert(flows == ptr);
                     93:                         ptr->next->last = NULL;
                     94:                         flows = ptr->next;
                     95:                     }
                     96:                     else if(ptr->last) {
                     97:                         assert(flows != ptr);
                     98:                         ptr->last->next = NULL;
                     99:                     }
                    100:                     else {
                    101:                         assert(flows == ptr);
                    102:                         flows = NULL;
                    103:                     }
                    104:                     expiration = 1;
                    105:                 }
                    106:                 ptr = ptr->next;
                    107:                 if(expiration) { free(axe); axe = NULL; }
                    108:             }
                    109:             sprintf(buf,"%s||\n",buf);
                    110:
                    111:             // if this flow will not put us overbudget, accept
                    112:             if(load+newflow->weight <= maxload) {
                    113:                 load += newflow->weight;
                    114:                 delta += newflow->weight;
                    115:                 newflow->next = flows;
                    116:                 if(flows) flows->last = newflow;
                    117:                 flows = newflow;
                    118:                 if(s) fprintf(s,"        %d accepted flow: %d\n",num,load);
                    119:             }
                    120:             // ADMISSION CONTROL
                    121:             // if we're overloaded, drop it if its been pushed too far,
                    122:             // otherwise pass on to a neighbor and mark it
                    123:             else if(exts < 5) {
                    124:                 fprintf(s,"        %d rejected flow, load too high: %d <= %d\n",num,maxload,load+newflow->weight);
                    125:                 swaplinks_update_walk_length(swp,1);
                    126:                 snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
                    127:                 swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);
                    128:                 swaplinks_update_walk_length(swp,peers);
                    129:             }
                    130:             else
                    131:                 fprintf(s,"        %d retired unserviceable request\n",num);
                    132:
                    133:             if(f) fprintf(f,"%d\n",load);
                    134: //          if(s) fprintf(s,"%s",buf);
                    135:         }
                    136:     }
1.25    ! ths       137:     if(s && !killed) fprintf(s,"%d DONE RECEIVING\n",num);
1.24      ths       138:
                    139:     // clean up flows list
                    140:     for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);}
                    141:
1.25    ! ths       142:     if(killed) {
        !           143:         if(f && f != stderr) { fclose(f); f = NULL; }
        !           144:         if(s && s != stdout) { fclose(s); s = NULL; }
        !           145:         pthread_exit(NULL);
        !           146:     }
        !           147:
1.24      ths       148:     while(!greeted) {
                    149:         memset(tmp,0,128);
                    150:         memset(&peer,0,sizeof(peer));
                    151:         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
                    152:             if(streq(tmp,"GREETING")) {
                    153:                 printf("%d Received late greeting from coordinator\n",num);
                    154:                 memset(&coord,0,sizeof(coord));
                    155:                 memcpy(&coord,&peer,sizeof(peer));
                    156:                 greeted = 1;
                    157:             }
                    158:         }
                    159:     }
                    160:
                    161:     // tell coordinator that were all done here
                    162:     if(swaplinks_sendto(swp, "DONE", 5, 0, &coord, sizeof(coord)) != 5)
                    163:         printf("%d cannot contact coordinator\n", num);
                    164:     else
                    165:         printf("%d sent alert to coordinator at %s. entering holding pattern...\n",num,coord.user+4);
                    166:
                    167:     // go into holding pattern to let other nodes catch up
                    168:     for(;;) {
                    169:         memset(tmp,0,128);
                    170:         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
                    171:             if(streq(tmp,"SHUTDOWN")) { printf("%d Received KILL signal from coordinator. Shutting down...\n",num); break; }
                    172:             else if(streq(tmp,"GREETING")) continue;
                    173:             newflow = (flow_t*)calloc(1,sizeof(flow_t));
                    174:             newflow->duration = atoi(strtok(tmp,";"));
                    175:             newflow->weight = atoi(strtok(NULL,";"));
                    176:             exts = atoi(strtok(NULL,";"));
                    177:             strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
                    178:             if(exts < 5) {
                    179:                 swaplinks_update_walk_length(swp,1);
1.25    ! ths       180:                 // dont increase expiration here since it hit a dead node
        !           181:                 snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts);
1.24      ths       182:                 swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);
                    183:                 swaplinks_update_walk_length(swp,peers);
                    184:             }
                    185:         }
                    186:     }
                    187:
1.25    ! ths       188:     swaplinks_update_walk_length(swp,1);
        !           189:     for(i = 0; i < peers*2; i++)
        !           190:         swaplinks_sendtoany(swp,"SHUTDOWN",9,0);
        !           191:
1.24      ths       192:     if(f && f != stderr) { fclose(f); f = NULL; }
                    193:     if(s && s != stdout) { fclose(s); s = NULL; }
                    194:
                    195:     pthread_exit(NULL);
                    196: }
                    197:
                    198: int main(int argc, char **argv) {
                    199:     struct sockaddr_ns reg;
                    200:     char tmp[256], nbrs[4096];
                    201:     int time = 0, duration, weight, percent, sendcount = 0;
                    202:     int_pair p;
                    203:     pthread_t tid;
                    204:
                    205:     if(argc > 1) num = atoi(argv[1]);
1.25    ! ths       206:     if(num != 99 && argc > 7) {
1.24      ths       207:         peers = atoi(argv[2]);
                    208:         limit = atoi(argv[3]);
                    209:         percent = atoi(argv[4]);
                    210:         duration = atoi(argv[5]);
                    211:         weight = atoi(argv[6]);
                    212:         maxload = atoi(argv[7]);
                    213:     }
1.25    ! ths       214:     else if(num == 99 && argc > 4) {
        !           215:         peers = atoi(argv[3]);
        !           216:         percent = atoi(argv[4]);
        !           217:     }
1.24      ths       218:     else if(num != 99) {
1.25    ! ths       219:         printf("Usage:\n  slinksvariableload node_number peers_number time_limit percent_to_send flow_duration flow_weight max_load\n");
1.24      ths       220:         exit(1);
                    221:     }
1.25    ! ths       222:     else {
        !           223:        printf("Coordinator Usage:\n  slinksvariableload 99 num_nodes num_peers percent_to_finish\n");
        !           224:        exit(1);
        !           225:     }
1.24      ths       226:
                    227:     snprintf(tmp, sizeof(tmp), "test%d", num);
                    228:     nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlenn(tmp));
                    229:     nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlenn(tmp));
                    230:     nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlenn(tmp));
                    231:     strncpy(tmp, "nutss.net", sizeof(tmp));
                    232:     nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlenn(tmp));
                    233:     strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));
                    234:     nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlenn(tmp));
                    235:
                    236:     if(num != 99) {
                    237:         sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
                    238: //        sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
                    239:         sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");
                    240:         if (f == NULL) f = stderr;
                    241:         if (s == NULL) s = stdout;
                    242:     }
                    243:
                    244:     memset(tmp,0,256);
                    245:     memset(&reg, 0, sizeof(reg));
                    246:     reg.family = AF_NUTSS;
                    247:     strncpy(reg.user, "ths1", sizeof(reg.user));
                    248:     strncpy(reg.domain, "nutss.net", sizeof(reg.domain));
                    249:     strncpy(reg.service, "swaplinksd", sizeof(reg.service));
                    250:
                    251:     swaplinks_init();
                    252:     swp = swaplinks_new("cloud9", &reg, peers, peers+5);
                    253:     // let swaplinks get going.
                    254:     sleep(5);
                    255:
                    256:     // coordinator thread
                    257:     if(num == 99) {
1.25    ! ths       258:         int done = 0, nodes = atoi(argv[2]), limit = ceil((float)nodes*((float)percent/100)), i;
        !           259:         struct sockaddr_ns* peer_list = (struct sockaddr_ns*)calloc(limit,sizeof(struct sockaddr_ns));
1.24      ths       260:         socklen_t len = sizeof(struct sockaddr_ns);
                    261:
1.25    ! ths       262:         // let nodes wake up to recieve the greetings
        !           263:         sleep(15);
        !           264:
        !           265:         swaplinks_update_walk_length(swp,peers/2);
        !           266:       for(i = 0; i < nodes*2; i++) {
        !           267:             printf("Coordinator sending greeting %d...\n",i);
1.24      ths       268:             swaplinks_sendtoany(swp,"GREETING",9,0);
1.25    ! ths       269:             sleep(2);
1.24      ths       270:         }
                    271:
1.25    ! ths       272:         swaplinks_update_walk_length(swp,nodes/2);
        !           273:         while(done < limit) {
1.24      ths       274:             printf("Coordinator waiting for DONE signal...\n");
                    275:             if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, peer_list+done, &len) > 0 && streq(tmp,"DONE")) {
                    276:                 printf("Coordinator received DONE signal from %s. Total: %d finished nodes.\n",peer_list[done].user+4,done+1);
                    277:                 swaplinks_sendtoany(swp,"GREETING",9,0);
                    278:                 done++;
                    279:             }
                    280:         }
1.25    ! ths       281:         printf("%d nodes reported, now sending KILL signals\n",limit);
        !           282:         for(i = 0; i < limit; i++) {
1.24      ths       283:             printf("Sending kill to %s\n",peer_list[i].user+4);
                    284:             swaplinks_sendto(swp, "SHUTDOWN", 9, 0, peer_list+i, len);
                    285:         }
                    286:
1.25    ! ths       287:         // give them time to scramble
        !           288:         sleep(60);
        !           289:
        !           290:         // kill stragglers more violently
        !           291:         system("killall lt-slinksvariableload");
        !           292:         // clean up their mess
        !           293:         system("find output -size 0c | xargs rm");
        !           294:
1.24      ths       295:         return 0;
                    296:     }
                    297:
                    298:     pthread_create(&tid, NULL, recvthread, swp);
                    299:
1.25    ! ths       300:     sleep(3);
        !           301:
1.24      ths       302:     while (time++ < REDUNDANCY*limit) {
                    303:         if(randint(100) < percent) snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, 0);
                    304:         else snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);
                    305:         swaplinks_sendtoany(swp, tmp, strlenn(tmp)+1, 0);
                    306:         swaplinks_get_neighbors(swp, nbrs, 4096);
                    307:         p = process_neighbor_list(nbrs);
                    308:         if(g) fprintf(g,"%d %d\n",p.i,p.o);
                    309:         clear(nbrs);
1.25    ! ths       310:         sleep(2);
1.24      ths       311:     }
                    312:
                    313:     if(g) fclose(g);
                    314:     if(s) fprintf(s,"%d DONE SENDING\n",num);
                    315:     pthread_join(tid, NULL);
                    316:     if(s) fprintf(s,"%d SHUTTING DOWN\n",num);
                    317:
                    318: //    if(num == 1) system("echo \" \" | mail -s\"node one finished sending!\" ths22@cs.cornell.edu");
                    319:
                    320:     return 0;
                    321: }
                    322:
                    323: int_pair process_neighbor_list(char *s) {
                    324:     int_pair p = { 0, 0 };
                    325:     char *t,*v;
                    326:
                    327:     if(!s) return p;
                    328:     while(*s) {
                    329:         if(*s++ == '_') {
                    330:             switch(*s++) {
                    331:                 case 'i': p.i++; break;
                    332:                 case 'o': p.o++; break;
                    333:                 default: exit(s[-1]);
                    334:             }
                    335:         }
                    336:     }
                    337:     return p;
                    338: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment