Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Annotation of libnutss/examples/slinks_variable_load.c, revision 1.27

1.24      ths         1: #include <stdio.h>
                      2: #include <stdlib.h>
                      3: #include <string.h>
1.27    ! ths         4: #include <time.h>
1.24      ths         5: #ifndef WIN32
                      6: #include <poll.h>
                      7: #include <sys/socket.h>
                      8: #endif
                      9: #include <pthread.h>
                     10: #include "../include/swaplinks.h"
                     11:
                     12: #ifdef WIN32
                     13: #define sleep(sec) Sleep((sec)*1000)
                     14: #define snprintf   _snprintf
                     15: #endif
                     16: #define clear(a) a[0]=0
                     17: #define streq(a,b) !strcmp((a),(b))
                     18: #define strlenn(s) (s) ? strlen(s) : 0
                     19: #define randint(N) ((int)(rand() / (((double)RAND_MAX + 1) / (N))))
                     20: #define assert(p) if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); }
                     21: #define REDUNDANCY 5
1.27    ! ths        22: #define PROPAGATION 300
        !            23: #define GRACE 3
        !            24: #define ALLOTMENT 400
        !            25: #define WALK_LENGTH 10
        !            26: #define MAX_FLOWS 256
        !            27: #define BUFFER_SIZE 128
        !            28: #define URI_LENGTH 256
1.24      ths        29:
                     30: typedef struct flow {
1.27    ! ths        31:     int duration, weight, exts;
        !            32:     char source[URI_LENGTH];
1.24      ths        33:     struct flow *next, *last;
                     34: } flow_t;
                     35:
                     36: typedef struct int_pair_t { int i; int o; } int_pair;
1.27    ! ths        37: typedef struct flow_spec_t { int weight; int expiration; } flow_spec;
1.24      ths        38:
                     39: int_pair process_neighbor_list(char*);
                     40:
1.27    ! ths        41: // Gobal information, used by multiple threads
        !            42: int num, nodes = 0, peers, capacity;
1.24      ths        43: swaplinks_p swp;
1.27    ! ths        44: FILE *f = NULL, *s = NULL;
1.24      ths        45:
1.27    ! ths        46: // Node's receiving thread that decides whether to accept an incoming
        !            47: // request and records all the necessary information
        !            48: void *recvthread(void *arg) {
1.26      ths        49:     struct sockaddr_ns peer;
                     50:     socklen_t len = sizeof(peer);
1.27    ! ths        51:     char msg[BUFFER_SIZE], tmp[BUFFER_SIZE];
        !            52:     int load = 0, bytes, msg_count, i;
        !            53:     time_t start,now;
        !            54:     flow_t newflow;
        !            55:     flow_spec flows[MAX_FLOWS];
1.26      ths        56:
1.27    ! ths        57:     start = time(NULL);
        !            58:     for(;;) {
1.24      ths        59:         memset(&peer,0,sizeof(peer));
1.27    ! ths        60:       sleep(1);
        !            61:         if((bytes=swaplinks_recvfrom(swp, msg, sizeof(msg), 0, &peer, &len)) > 0) {
        !            62:             msg[bytes] = 0;
        !            63:             // Flood SHUTDOWN to neighbors, then power down
        !            64:             if(streq(msg,"SHUTDOWN")) {
1.25      ths        65:                 swaplinks_update_walk_length(swp,1);
1.27    ! ths        66:                 for(i = 0; i < peers*2; i++) swaplinks_sendtoany(swp,"SHUTDOWN",9,0);
        !            67:                 printf("%d Recieved KILL signal...\n",num);
1.25      ths        68:                 break;
                     69:             }
1.27    ! ths        70:             // parse the new flow
        !            71:             memcpy(tmp,msg,sizeof(tmp));
        !            72:             newflow.duration = atoi(strtok(tmp,";"));
        !            73:             newflow.weight = atoi(strtok(NULL,";"));
        !            74:             newflow.exts = atoi(strtok(NULL,";"));
        !            75:             strncpy(newflow.source,peer.user+4,sizeof(newflow.source));
        !            76:             if(s) fprintf(s,"    %d recvd request #%d from %s - w:%d d:%d\n",
        !            77:                           num,++msg_count,newflow.source,newflow.weight,newflow.duration);
        !            78:
        !            79:             // retire any expired flows before processing
        !            80:             now = difftime(time(NULL),start);
        !            81:             for(i=0;i < MAX_FLOWS;i++) {
        !            82:                 if(0 < flows[i].expiration && flows[i].expiration < now) {
        !            83:                     load -= flows[i].weight;
        !            84:                     flows[i].weight = 0;
        !            85:                     flows[i].expiration = 0;
        !            86:                     if(s) fprintf(s,"        %d finished accomodating flow at %ld: %d\n",num,now,load);
        !            87:                 }
        !            88:             }
        !            89:             if(load+newflow.weight < capacity) {
        !            90:                 load += newflow.weight;
        !            91:                 for(i=0;i < MAX_FLOWS;i++) {
        !            92:                     if(flows[i].expiration == 0) {
        !            93:                         flows[i].weight = newflow.weight;
        !            94:                         flows[i].expiration = now + newflow.duration;
        !            95:                         if(s) fprintf(s,"        %d accepted request at %ld: %d\n",num,now,load);
        !            96:                         if(f) fprintf(f,"%ld %s\n",now,msg);
        !            97:                         break;
1.24      ths        98:                     }
                     99:                 }
1.27    ! ths       100:                 assert(i != MAX_FLOWS);
1.24      ths       101:             }
                    102:             // ADMISSION CONTROL
1.27    ! ths       103:             // do not accept the flow if it will overload this node.
        !           104:             // if the request been pushed too far, reject it, otherwise
        !           105:             // pass it on to a neighbor
        !           106:             else if(newflow.exts < GRACE) {
1.24      ths       107:                 swaplinks_update_walk_length(swp,1);
1.27    ! ths       108:                 snprintf(tmp,sizeof(tmp),"%d;%d;%d;",newflow.duration,newflow.weight,newflow.exts+1);
        !           109:                 swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1,0);
        !           110:                 swaplinks_update_walk_length(swp,WALK_LENGTH);
        !           111:                 if(s) fprintf(s,"        %d rejected flow, load too high: %d <= %d\n",num,capacity,load+newflow.weight);
        !           112:                 if(f) fprintf(f,"%ld %s\n",now,"reject");
        !           113:             }
        !           114:             else {
        !           115:                 if(s) fprintf(s,"        %d retired unserviceable request\n",num);
        !           116:                 if(f) fprintf(f,"%ld %s\n",now,"retire");
1.24      ths       117:             }
                    118:         }
                    119:     }
1.27    ! ths       120:     // close output files to assure data is recorded
1.26      ths       121:     if(f && f != stderr) { fclose(f); f = NULL; }
                    122:     if(s && s != stdout) { fclose(s); s = NULL; }
1.25      ths       123:
1.24      ths       124:     pthread_exit(NULL);
                    125: }
                    126:
                    127: int main(int argc, char **argv) {
                    128:     struct sockaddr_ns reg;
1.27    ! ths       129:     char tmp[BUFFER_SIZE], nbrs[4096];
        !           130:     int time = 0, duration, weight, percent, limit, i;
        !           131:     FILE *g;
1.24      ths       132:     int_pair p;
1.27    ! ths       133:     pthread_t tid;
1.24      ths       134:
1.27    ! ths       135:     // parse args
1.24      ths       136:     if(argc > 1) num = atoi(argv[1]);
1.27    ! ths       137:     if(num != 99 && argc > 6) {
1.24      ths       138:         peers = atoi(argv[2]);
                    139:         limit = atoi(argv[3]);
                    140:         percent = atoi(argv[4]);
                    141:         duration = atoi(argv[5]);
                    142:         weight = atoi(argv[6]);
1.27    ! ths       143:         capacity = peers*ALLOTMENT;
1.24      ths       144:     }
1.25      ths       145:     else if(num == 99 && argc > 4) {
1.27    ! ths       146:         nodes = atoi(argv[2]);
        !           147:         // peer coordinator with more nodes
        !           148:         // to facilitate dispersion of kill signals
        !           149:         peers = atoi(argv[3]);
        !           150:         time = atoi(argv[4]);
1.25      ths       151:     }
1.24      ths       152:     else if(num != 99) {
1.27    ! ths       153:         printf("Usage:\n  slinksvariableload node_number num_peers num_messages percent_load flow_duration flow_weight capacity\n");
1.24      ths       154:         exit(1);
                    155:     }
1.25      ths       156:     else {
1.27    ! ths       157:        printf("Coordinator Usage:\n  slinksvariableload 99 num_nodes num_peers experiment_duration_minutes\n");
1.25      ths       158:        exit(1);
                    159:     }
1.24      ths       160:
1.27    ! ths       161:     // set all the important nutss fields
1.24      ths       162:     snprintf(tmp, sizeof(tmp), "test%d", num);
                    163:     nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlenn(tmp));
                    164:     nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlenn(tmp));
                    165:     nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlenn(tmp));
                    166:     strncpy(tmp, "nutss.net", sizeof(tmp));
                    167:     nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlenn(tmp));
                    168:     strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));
                    169:     nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlenn(tmp));
                    170:
1.27    ! ths       171:     // open output files
1.24      ths       172:     if(num != 99) {
                    173:         sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
1.27    ! ths       174: //        sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
        !           175: //        sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");
1.24      ths       176:         if (f == NULL) f = stderr;
                    177:         if (s == NULL) s = stdout;
                    178:     }
1.27    ! ths       179:     if(f) fprintf(f,"%d\n",peers);
1.24      ths       180:
1.27    ! ths       181:     // build registrar uri
1.24      ths       182:     memset(&reg, 0, sizeof(reg));
                    183:     reg.family = AF_NUTSS;
                    184:     strncpy(reg.user, "ths1", sizeof(reg.user));
                    185:     strncpy(reg.domain, "nutss.net", sizeof(reg.domain));
                    186:     strncpy(reg.service, "swaplinksd", sizeof(reg.service));
1.27    ! ths       187:     // start up swaplinks and let it get going
        !           188:     swaplinks_init(); swp = swaplinks_new("cloud9", &reg, peers, WALK_LENGTH); sleep(5);
1.24      ths       189:
                    190:     // coordinator thread
                    191:     if(num == 99) {
1.25      ths       192:         // let nodes wake up to recieve the greetings
1.27    ! ths       193:         for(;time > 0;time--) {
        !           194:             printf(" ---> %d MINUTE%s REMAIN%s\n",time,time==1?"":"S",time==1?"S":"");
        !           195:             sleep(60);
1.24      ths       196:         }
1.27    ! ths       197:         printf("Now sending KILL signals\n",limit);
        !           198:         for(i = 0; i < nodes*REDUNDANCY; i++) swaplinks_sendtoany(swp, "SHUTDOWN", 9, 0);
1.25      ths       199:         // give them time to scramble
1.27    ! ths       200:         printf("Waiting for KILL signals to propagate\n");
        !           201:         sleep(PROPAGATION);
1.26      ths       202:         // clean up their mess
1.27    ! ths       203:         printf("Forcibly removing remaining nodes and associated file\n");
1.26      ths       204:         system("find output -size 0c | xargs rm -rf");
1.27    ! ths       205:         // then kill stragglers more violently (including the coordinator and its sending thread as well)
1.25      ths       206:         system("killall lt-slinksvariableload");
1.24      ths       207:         return 0;
                    208:     }
                    209:
                    210:     pthread_create(&tid, NULL, recvthread, swp);
1.27    ! ths       211:     // give recieving threads a chance to wake up
        !           212:     sleep(5);
1.24      ths       213:
1.27    ! ths       214:     while (time++ < limit) {
        !           215:         // send a request for resources with probability percent/100
        !           216:         if(randint(100) < percent) {
        !           217:             snprintf(tmp,sizeof(tmp),"%d;%d;0;",duration,weight);
        !           218:             swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1, 0);
        !           219:         }
        !           220:         // record number and type of neighbors
        !           221:       //  swaplinks_get_neighbors(swp,nbrs,4096);
        !           222:       //  p = process_neighbor_list(nbrs);
        !           223:       //  if(g) fprintf(g,"%d %d\n",p.i,p.o);
        !           224:       //  clear(nbrs);
        !           225:        // it might make more sense to sleep for more time between requests
        !           226:         sleep(3);
1.24      ths       227:     }
                    228:
1.27    ! ths       229:   //  if(g) fclose(g);
1.26      ths       230:     if(s) printf("%d DONE SENDING\n",num);
1.24      ths       231:     pthread_join(tid, NULL);
                    232:     if(s) fprintf(s,"%d SHUTTING DOWN\n",num);
                    233:
                    234:     return 0;
                    235: }
                    236:
                    237: int_pair process_neighbor_list(char *s) {
                    238:     int_pair p = { 0, 0 };
                    239:     char *t,*v;
                    240:
                    241:     if(!s) return p;
                    242:     while(*s) {
                    243:         if(*s++ == '_') {
                    244:             switch(*s++) {
                    245:                 case 'i': p.i++; break;
                    246:                 case 'o': p.o++; break;
                    247:                 default: exit(s[-1]);
                    248:             }
                    249:         }
                    250:     }
                    251:     return p;
                    252: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment