Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Annotation of libnutss/examples/slinks_variable_load.c, revision 1.24

1.24    ! ths         1: #include <stdio.h>
        !             2: #include <stdlib.h>
        !             3: #include <string.h>
        !             4: #ifndef WIN32
        !             5: #include <poll.h>
        !             6: #include <sys/socket.h>
        !             7: #endif
        !             8: #include <pthread.h>
        !             9: #include "../include/swaplinks.h"
        !            10:
        !            11: #ifdef WIN32
        !            12: #define sleep(sec) Sleep((sec)*1000)
        !            13: #define snprintf   _snprintf
        !            14: #endif
        !            15:
        !            16: #define clear(a) a[0]=0
        !            17: #define streq(a,b) !strcmp((a),(b))
        !            18: #define strlenn(s) (s) ? strlen(s) : 0
        !            19: #define randint(N) ((int)(rand() / (((double)RAND_MAX + 1) / (N))))
        !            20: #define assert(p) if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); }
        !            21: #define REDUNDANCY 5
        !            22:
        !            23: typedef struct flow {
        !            24:     int duration;
        !            25:     int weight;
        !            26:     char source[256];
        !            27:     struct flow *next, *last;
        !            28: } flow_t;
        !            29:
        !            30: typedef struct int_pair_t { int i; int o; } int_pair;
        !            31:
        !            32: int_pair process_neighbor_list(char*);
        !            33:
        !            34: int num, peers, limit, maxload;
        !            35: swaplinks_p swp;
        !            36: FILE *f = NULL, *s = NULL, *g = NULL;
        !            37:
        !            38: void *recvthread(void *arg) {
        !            39:     swaplinks_p swp = (swaplinks_p)arg;
        !            40:     struct sockaddr_ns peer, coord;
        !            41:     socklen_t len = sizeof(peer);
        !            42:     flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;
        !            43:     char msg[64], tmp[128], buf[2048], greeting[1024];
        !            44:     int time = 0, load = 0, exts, delta, expiration, greeted = 0;
        !            45:
        !            46:     clear(buf);
        !            47:     while(time < limit) {
        !            48:         memset(tmp,0,128);
        !            49:         memset(&peer,0,sizeof(peer));
        !            50:         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
        !            51:             if(streq(tmp,"GREETING")) {
        !            52:                 if(greeted) continue;
        !            53:                 printf("%d Received greeting from coordinator\n",num);
        !            54:                 memset(&coord, 0, sizeof(coord));
        !            55:                 memcpy(&coord,&peer,sizeof(peer));
        !            56:                 greeted = 1;
        !            57:                 continue;
        !            58:             }
        !            59:             time++; delta = 0;
        !            60:             // parse new flow
        !            61:             newflow = (flow_t*)calloc(1,sizeof(flow_t));
        !            62:             newflow->duration = atoi(strtok(tmp,";"));
        !            63:             newflow->weight = atoi(strtok(NULL,";"));
        !            64:             exts = atoi(strtok(NULL,";"));
        !            65:             strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
        !            66:             if(s) fprintf(s,"      %d recvd request #%d from %s - w:%d d:%d\n",
        !            67:                           num,time,newflow->source,newflow->weight,newflow->duration);
        !            68:             snprintf(buf,2048,"->%d:%d:%s<->",newflow->duration,newflow->weight,newflow->source);
        !            69:             // run through flows, decrementing duration, cleaning any expired flows
        !            70:             for(ptr = flows; ptr;) {
        !            71:                 sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);
        !            72:                 expiration = 0; axe = NULL;
        !            73:                 if(--ptr->duration == 0) {
        !            74:                     axe = ptr;
        !            75:                     load -= ptr->weight;
        !            76:                     delta -= ptr->weight;
        !            77:                     if(ptr->last && ptr->next) {
        !            78:                         assert(flows != ptr);
        !            79:                         ptr->last->next = ptr->next;
        !            80:                         ptr->next->last = ptr->last;
        !            81:                     }
        !            82:                     else if(ptr->next) {
        !            83:                         assert(flows == ptr);
        !            84:                         ptr->next->last = NULL;
        !            85:                         flows = ptr->next;
        !            86:                     }
        !            87:                     else if(ptr->last) {
        !            88:                         assert(flows != ptr);
        !            89:                         ptr->last->next = NULL;
        !            90:                     }
        !            91:                     else {
        !            92:                         assert(flows == ptr);
        !            93:                         flows = NULL;
        !            94:                     }
        !            95:                     expiration = 1;
        !            96:                 }
        !            97:                 ptr = ptr->next;
        !            98:                 if(expiration) { free(axe); axe = NULL; }
        !            99:             }
        !           100:             sprintf(buf,"%s||\n",buf);
        !           101:
        !           102:             // if this flow will not put us overbudget, accept
        !           103:             if(load+newflow->weight <= maxload) {
        !           104:                 load += newflow->weight;
        !           105:                 delta += newflow->weight;
        !           106:                 newflow->next = flows;
        !           107:                 if(flows) flows->last = newflow;
        !           108:                 flows = newflow;
        !           109:                 if(s) fprintf(s,"        %d accepted flow: %d\n",num,load);
        !           110:             }
        !           111:             // ADMISSION CONTROL
        !           112:             // if we're overloaded, drop it if its been pushed too far,
        !           113:             // otherwise pass on to a neighbor and mark it
        !           114:             else if(exts < 5) {
        !           115:                 fprintf(s,"        %d rejected flow, load too high: %d <= %d\n",num,maxload,load+newflow->weight);
        !           116:                 swaplinks_update_walk_length(swp,1);
        !           117:                 snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
        !           118:                 swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);
        !           119:                 swaplinks_update_walk_length(swp,peers);
        !           120:             }
        !           121:             else
        !           122:                 fprintf(s,"        %d retired unserviceable request\n",num);
        !           123:
        !           124:             if(f) fprintf(f,"%d\n",load);
        !           125: //          if(s) fprintf(s,"%s",buf);
        !           126:         }
        !           127:     }
        !           128:     if(s) fprintf(s,"%d DONE RECEIVING\n",num);
        !           129:
        !           130:     // clean up flows list
        !           131:     for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);}
        !           132:
        !           133:     while(!greeted) {
        !           134:         memset(tmp,0,128);
        !           135:         memset(&peer,0,sizeof(peer));
        !           136:         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
        !           137:             if(streq(tmp,"GREETING")) {
        !           138:                 printf("%d Received late greeting from coordinator\n",num);
        !           139:                 memset(&coord,0,sizeof(coord));
        !           140:                 memcpy(&coord,&peer,sizeof(peer));
        !           141:                 greeted = 1;
        !           142:             }
        !           143:         }
        !           144:     }
        !           145:
        !           146:     // tell coordinator that were all done here
        !           147:     if(swaplinks_sendto(swp, "DONE", 5, 0, &coord, sizeof(coord)) != 5)
        !           148:         printf("%d cannot contact coordinator\n", num);
        !           149:     else
        !           150:         printf("%d sent alert to coordinator at %s. entering holding pattern...\n",num,coord.user+4);
        !           151:
        !           152:     // go into holding pattern to let other nodes catch up
        !           153:     for(;;) {
        !           154:         memset(tmp,0,128);
        !           155:         if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
        !           156:             printf("%d is holding pattern: %s\n",num,tmp);
        !           157:             if(streq(tmp,"SHUTDOWN")) { printf("%d Received KILL signal from coordinator. Shutting down...\n",num); break; }
        !           158:             else if(streq(tmp,"GREETING")) continue;
        !           159:             newflow = (flow_t*)calloc(1,sizeof(flow_t));
        !           160:             newflow->duration = atoi(strtok(tmp,";"));
        !           161:             newflow->weight = atoi(strtok(NULL,";"));
        !           162:             exts = atoi(strtok(NULL,";"));
        !           163:             strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
        !           164:             if(exts < 5) {
        !           165:                 swaplinks_update_walk_length(swp,1);
        !           166:                 snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
        !           167:                 swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);
        !           168:                 swaplinks_update_walk_length(swp,peers);
        !           169:             }
        !           170:         }
        !           171:     }
        !           172:
        !           173:     if(f && f != stderr) { fclose(f); f = NULL; }
        !           174:     if(s && s != stdout) { fclose(s); s = NULL; }
        !           175:
        !           176:     pthread_exit(NULL);
        !           177: }
        !           178:
        !           179: int main(int argc, char **argv) {
        !           180:     struct sockaddr_ns reg;
        !           181:     char tmp[256], nbrs[4096];
        !           182:     int time = 0, duration, weight, percent, sendcount = 0;
        !           183:     int_pair p;
        !           184:     pthread_t tid;
        !           185:
        !           186:     if(argc > 1) num = atoi(argv[1]);
        !           187:     if(argc > 7) {
        !           188:         peers = atoi(argv[2]);
        !           189:         limit = atoi(argv[3]);
        !           190:         percent = atoi(argv[4]);
        !           191:         duration = atoi(argv[5]);
        !           192:         weight = atoi(argv[6]);
        !           193:         maxload = atoi(argv[7]);
        !           194:     }
        !           195:     else if(num != 99) {
        !           196:         printf("Usage:\n  slinks_variable_load node_number peers_number time_limit percent_to_send flow_duration flow_weight max_load\n");
        !           197:         exit(1);
        !           198:     }
        !           199:
        !           200:     snprintf(tmp, sizeof(tmp), "test%d", num);
        !           201:     nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlenn(tmp));
        !           202:     nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlenn(tmp));
        !           203:     nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlenn(tmp));
        !           204:     strncpy(tmp, "nutss.net", sizeof(tmp));
        !           205:     nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlenn(tmp));
        !           206:     strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));
        !           207:     nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlenn(tmp));
        !           208:
        !           209:     if(num != 99) {
        !           210:         sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
        !           211: //        sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
        !           212:         sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");
        !           213:         if (f == NULL) f = stderr;
        !           214:         if (s == NULL) s = stdout;
        !           215:     }
        !           216:
        !           217:     memset(tmp,0,256);
        !           218:     memset(&reg, 0, sizeof(reg));
        !           219:     reg.family = AF_NUTSS;
        !           220:     strncpy(reg.user, "ths1", sizeof(reg.user));
        !           221:     strncpy(reg.domain, "nutss.net", sizeof(reg.domain));
        !           222:     strncpy(reg.service, "swaplinksd", sizeof(reg.service));
        !           223:
        !           224:     swaplinks_init();
        !           225:     swp = swaplinks_new("cloud9", &reg, peers, peers+5);
        !           226:     // let swaplinks get going.
        !           227:     sleep(5);
        !           228:
        !           229:     // coordinator thread
        !           230:     if(num == 99) {
        !           231:         int done = 0, nodes = atoi(argv[2]), i;
        !           232:         struct sockaddr_ns* peer_list = (struct sockaddr_ns*)calloc(nodes,sizeof(struct sockaddr_ns));
        !           233:         socklen_t len = sizeof(struct sockaddr_ns);
        !           234:
        !           235:         swaplinks_update_walk_length(swp,nodes/2);
        !           236:       for(i = 0; i < nodes*REDUNDANCY; i++) {
        !           237:             printf("Coordinator sending greeting...\n");
        !           238:             swaplinks_sendtoany(swp,"GREETING",9,0);
        !           239:             sleep(1);
        !           240:         }
        !           241:
        !           242:         while(done < nodes) {
        !           243:             printf("Coordinator waiting for DONE signal...\n");
        !           244:             if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, peer_list+done, &len) > 0 && streq(tmp,"DONE")) {
        !           245:                 printf("Coordinator received DONE signal from %s. Total: %d finished nodes.\n",peer_list[done].user+4,done+1);
        !           246:                 swaplinks_sendtoany(swp,"GREETING",9,0);
        !           247:                 done++;
        !           248:             }
        !           249:         }
        !           250:         printf("All nodes reported, now sending KILL signals\n");
        !           251:         for(i = 0; i < nodes; i++) {
        !           252:             printf("Sending kill to %s\n",peer_list[i].user+4);
        !           253:             swaplinks_sendto(swp, "SHUTDOWN", 9, 0, peer_list+i, len);
        !           254:         }
        !           255:
        !           256:         return 0;
        !           257:     }
        !           258:
        !           259:     pthread_create(&tid, NULL, recvthread, swp);
        !           260:
        !           261:     while (time++ < REDUNDANCY*limit) {
        !           262:         if(randint(100) < percent) snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, 0);
        !           263:         else snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);
        !           264:         swaplinks_sendtoany(swp, tmp, strlenn(tmp)+1, 0);
        !           265:         swaplinks_get_neighbors(swp, nbrs, 4096);
        !           266:         p = process_neighbor_list(nbrs);
        !           267:         if(g) fprintf(g,"%d %d\n",p.i,p.o);
        !           268:         clear(nbrs);
        !           269:         sleep(1);
        !           270:     }
        !           271:
        !           272:     if(g) fclose(g);
        !           273:     if(s) fprintf(s,"%d DONE SENDING\n",num);
        !           274:     pthread_join(tid, NULL);
        !           275:     if(s) fprintf(s,"%d SHUTTING DOWN\n",num);
        !           276:
        !           277: //    if(num == 1) system("echo \" \" | mail -s\"node one finished sending!\" ths22@cs.cornell.edu");
        !           278:
        !           279:     return 0;
        !           280: }
        !           281:
        !           282: int_pair process_neighbor_list(char *s) {
        !           283:     int_pair p = { 0, 0 };
        !           284:     char *t,*v;
        !           285:
        !           286:     if(!s) return p;
        !           287:     while(*s) {
        !           288:         if(*s++ == '_') {
        !           289:             switch(*s++) {
        !           290:                 case 'i': p.i++; break;
        !           291:                 case 'o': p.o++; break;
        !           292:                 default: exit(s[-1]);
        !           293:             }
        !           294:         }
        !           295:     }
        !           296:     return p;
        !           297: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment