Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Diff for /libnutss/examples/slinks_variable_load.c between versions 1.23 and 1.24

version 1.23, 2007/07/15 14:47:20 version 1.24, 2007/07/17 02:17:30
Line 1 Line 1
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#ifndef WIN32 #ifndef WIN32
#include <poll.h> #include <poll.h>
#include <sys/socket.h> #include <sys/socket.h>
#endif #endif
#include <pthread.h> #include <pthread.h>
#include "../include/swaplinks.h" #include "../include/swaplinks.h"
#ifdef WIN32 #ifdef WIN32
#define sleep(sec) Sleep((sec)*1000) #define sleep(sec) Sleep((sec)*1000)
#define snprintf   _snprintf #define snprintf   _snprintf
#endif #endif
#define clear(a) a[0]=0 #define clear(a) a[0]=0
#define strlenn(s) (s) ? strlen(s) : 0 #define streq(a,b) !strcmp((a),(b))
#define randint(N) ((int)(rand() / (((double)RAND_MAX + 1) / (N)))) #define strlenn(s) (s) ? strlen(s) : 0
#define assert(p) if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); } #define randint(N) ((int)(rand() / (((double)RAND_MAX + 1) / (N))))
#define REDUNDANCY7 #define assert(p) if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); }
#define REDUNDANCY5
typedef struct flow {
   int duration; typedef struct flow {
   int weight;    int duration;
   char source[256];    int weight;
   struct flow *next, *last;    char source[256];
} flow_t;    struct flow *next, *last;
} flow_t;
typedef struct int_pair_t { int i; int o; } int_pair;
typedef struct int_pair_t { int i; int o; } int_pair;
int_pair process_neighbor_list(char*);
int_pair process_neighbor_list(char*);
int num, peers, limit, maxload;
swaplinks_p swp; int num, peers, limit, maxload;
FILE *f = NULL, *s = NULL, *g = NULL; swaplinks_p swp;
FILE *f = NULL, *s = NULL, *g = NULL;
void *recvthread(void *arg) {
   swaplinks_p swp = (swaplinks_p)arg; void *recvthread(void *arg) {
   struct sockaddr_ns peer;    swaplinks_p swp = (swaplinks_p)arg;
   socklen_t len = sizeof(peer);    struct sockaddr_ns peer, coord;
   flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;    socklen_t len = sizeof(peer);
   char msg[64], tmp[128], buf[2048];    flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;
   int time = 0, load = 0, exts, delta, expiration;    char msg[64], tmp[128], buf[2048], greeting[1024];
   int time = 0, load = 0, exts, delta, expiration, greeted = 0;
   clear(buf);
   clear(buf);
   while(time < limit) {    while(time < limit) {
if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) { memset(tmp,0,128);
time++; delta = 0;        memset(&peer,0,sizeof(peer));
     // parse new flow        if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
     newflow = (flow_t*)calloc(1,sizeof(flow_t)); if(streq(tmp,"GREETING")) {
     newflow->duration = atoi(strtok(tmp,";"));                if(greeted) continue;
     newflow->weight = atoi(strtok(NULL,";"));                printf("%d Received greeting from coordinator\n",num);
     exts = atoi(strtok(NULL,";"));                memset(&coord, 0, sizeof(coord));
     strncpy(newflow->source,peer.user+4,sizeof(newflow->source));                memcpy(&coord,&peer,sizeof(peer));
     if(s) fprintf(s,"      %d recvd request #%d from %s - w:%d d:%d\n",                greeted = 1;
          num,time,newflow->source,newflow->weight,newflow->duration);                continue;
     snprintf(buf,2048,"->%d:%d:%s<->",newflow->duration,newflow->weight,newflow->source);            }
     // run through flows, decrementing duration, cleaning any expired flows            time++; delta = 0;
     for(ptr = flows; ptr;) {            // parse new flow
    sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);            newflow = (flow_t*)calloc(1,sizeof(flow_t));
    expiration = 0; axe = NULL;            newflow->duration = atoi(strtok(tmp,";"));
    if(--ptr->duration == 0) {            newflow->weight = atoi(strtok(NULL,";"));
   axe = ptr;            exts = atoi(strtok(NULL,";"));
   load -= ptr->weight;            strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
   delta -= ptr->weight;            if(s) fprintf(s,"      %d recvd request #%d from %s - w:%d d:%d\n",
   if(ptr->last && ptr->next) {                          num,time,newflow->source,newflow->weight,newflow->duration);
  assert(flows != ptr);            snprintf(buf,2048,"->%d:%d:%s<->",newflow->duration,newflow->weight,newflow->source);
  ptr->last->next = ptr->next;            // run through flows, decrementing duration, cleaning any expired flows
  ptr->next->last = ptr->last;            for(ptr = flows; ptr;) {
   }                sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);
   else if(ptr->next) {                expiration = 0; axe = NULL;
  assert(flows == ptr);                if(--ptr->duration == 0) {
  ptr->next->last = NULL;                    axe = ptr;
  flows = ptr->next;                    load -= ptr->weight;
   }                    delta -= ptr->weight;
   else if(ptr->last) {                    if(ptr->last && ptr->next) {
  assert(flows != ptr);                        assert(flows != ptr);
  ptr->last->next = NULL;                        ptr->last->next = ptr->next;
   }                        ptr->next->last = ptr->last;
   else {                    }
  assert(flows == ptr);                    else if(ptr->next) {
  flows = NULL;                        assert(flows == ptr);
}                        ptr->next->last = NULL;
          if(s) fprintf(s,"%d EXPIRED FLOW from %s:d%d/w%d\n",num,ptr->source,ptr->duration,ptr->weight);                        flows = ptr->next;
   expiration = 1;                    }
    }                    else if(ptr->last) {
    ptr = ptr->next;                        assert(flows != ptr);
    if(expiration) { free(axe); axe = NULL; }                        ptr->last->next = NULL;
     }                    }
     sprintf(buf,"%s||\n",buf);                    else {
                       assert(flows == ptr);
            if(s) fprintf(s,"%d ACCEPTED FLOW from %s:d%d/w%d\n",num,newflow->source,newflow->duration,newflow->weight);                        flows = NULL;
     // if this flow will not put us overbudget, accept                    }
     if(load+newflow->weight <= maxload) { expiration = 1;
    load += newflow->weight;                }
    delta += newflow->weight;                ptr = ptr->next;
    newflow->next = flows;                if(expiration) { free(axe); axe = NULL; }
    if(flows) flows->last = newflow;            }
    flows = newflow;            sprintf(buf,"%s||\n",buf);
    if(s) fprintf(s,"        %d accepted flow: %d\n",num,load);
     }            // if this flow will not put us overbudget, accept
     // ADMISSION CONTROL            if(load+newflow->weight <= maxload) {
     // if we're overloaded, drop it if its been pushed too far,                load += newflow->weight;
     // otherwise pass on to a neighbor and mark it                delta += newflow->weight;
     else if(exts < 5) {                newflow->next = flows;
    fprintf(s,"        %d rejected flow, load too high: %d <= %d\n",num,maxload,load+newflow->weight);                if(flows) flows->last = newflow;
    swaplinks_update_walk_length(swp,1);                flows = newflow;
    snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);                if(s) fprintf(s,"        %d accepted flow: %d\n",num,load);
    swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);            }
    swaplinks_update_walk_length(swp,peers);            // ADMISSION CONTROL
     }            // if we're overloaded, drop it if its been pushed too far,
     else            // otherwise pass on to a neighbor and mark it
    fprintf(s,"        %d retired unserviceable request\n",num);            else if(exts < 5) {
               fprintf(s,"        %d rejected flow, load too high: %d <= %d\n",num,maxload,load+newflow->weight);
     if(f) fprintf(f,"%d\n",load);                swaplinks_update_walk_length(swp,1);
//if(s) fprintf(s,"%s",buf);                snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
      }                swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);
       }                swaplinks_update_walk_length(swp,peers);
       if(s) fprintf(s,"%d DONE RECEIVING\n",num);            }
           else
       // clean up                fprintf(s,"        %d retired unserviceable request\n",num);
if(f && f != stderr) { fclose(f); f = NULL; }
       if(s && s != stdout) { fclose(s); s = NULL;}            if(f) fprintf(f,"%d\n",load);
       for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);} //if(s) fprintf(s,"%s",buf);
       }
       pthread_exit(NULL);    }
}    if(s) fprintf(s,"%d DONE RECEIVING\n",num);
int main(int argc, char **argv) {    // clean upflows list
   struct sockaddr_ns reg;    for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);}
   char tmp[256], nbrs[4096];
   int time = 0, duration, weight, percent, sendcount = 0;    while(!greeted) {
   int_pair p;        memset(tmp,0,128);
   pthread_t tid;        memset(&peer,0,sizeof(peer));
       if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
   if(argc >7) {            if(streq(tmp,"GREETING")) {
       num = atoi(argv[1]);                printf("%d Received late greeting from coordinator\n",num);
peers = atoi(argv[2]);                memset(&coord,0,sizeof(coord));
       limit = atoi(argv[3]);                memcpy(&coord,&peer,sizeof(peer));
       percent = atoi(argv[4]);                greeted = 1;
       duration = atoi(argv[5]);            }
       weight = atoi(argv[6]);        }
       maxload = atoi(argv[7]);    }
   }
   else{    // tell coordinator that were all done here
       printf("Usage:\n  slinks_variable_load node_number peers_number time_limit percent_to_send flow_duration flow_weight max_load\n");    if(swaplinks_sendto(swp, "DONE", 5, 0, &coord, sizeof(coord)) != 5)
       exit(1);        printf("%d cannot contact coordinator\n", num);
   }    else
       printf("%d sent alert to coordinator at %s. entering holding pattern...\n",num,coord.user+4);
   snprintf(tmp, sizeof(tmp), "test%d", num);
   nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlenn(tmp));    // go into holding pattern to let other nodes catch up
   nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlenn(tmp));    for(;;) {
   nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlenn(tmp));        memset(tmp,0,128);
   strncpy(tmp, "nutss.net", sizeof(tmp));        if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
   nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlenn(tmp));            printf("%d is holding pattern: %s\n",num,tmp);
   strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));            if(streq(tmp,"SHUTDOWN")) { printf("%d Received KILL signal from coordinator. Shutting down...\n",num); break; }
   nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlenn(tmp));            else if(streq(tmp,"GREETING")) continue;
           newflow = (flow_t*)calloc(1,sizeof(flow_t));
sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");            newflow->duration = atoi(strtok(tmp,";"));
//sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");            newflow->weight = atoi(strtok(NULL,";"));
   sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");            exts = atoi(strtok(NULL,";"));
   if (f == NULL) f = stderr;            strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
   if (s == NULL) s = stdout;            if(exts < 5) {
               swaplinks_update_walk_length(swp,1);
   memset(&reg, 0, sizeof(reg));                snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
   reg.family = AF_NUTSS;                swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);
   strncpy(reg.user, "ths1", sizeof(reg.user));                swaplinks_update_walk_length(swp,peers);
   strncpy(reg.domain, "nutss.net", sizeof(reg.domain));            }
   strncpy(reg.service, "swaplinksd", sizeof(reg.service));        }
   }
   swaplinks_init();
   if(f && f != stderr) { fclose(f); f = NULL; }
//    sleep(randint(5));    if(s && s != stdout) { fclose(s); s = NULL;}
   swp = swaplinks_new("cloud9", &reg, peers, peers);    pthread_exit(NULL);
}
   // let swaplinks get going.
   sleep(10); int main(int argc, char **argv) {
   struct sockaddr_ns reg;
   pthread_create(&tid, NULL, recvthread, swp);    char tmp[256], nbrs[4096];
   int time = 0, duration, weight, percent, sendcount = 0;
//    sleep(2);    int_pair p;
   pthread_t tid;
   while (time++ < REDUNDANCY*limit) {
       if(randint(100) < percent) snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, 0);    if(argc >1) num = atoi(argv[1]);
       else snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight); if(argc > 7) {
       swaplinks_sendtoany(swp, tmp, strlenn(tmp)+1, 0);        peers = atoi(argv[2]);
//      if(s) fprintf(s,"   %d sent request %d\n",num,time);        limit = atoi(argv[3]);
       swaplinks_get_neighbors(swp, nbrs, 4096);        percent = atoi(argv[4]);
       p = process_neighbor_list(nbrs);        duration = atoi(argv[5]);
       if(g) fprintf(g,"%d %d\n",p.i,p.o);        weight = atoi(argv[6]);
       clear(nbrs);        maxload = atoi(argv[7]);
       sleep(1);    }
   }    elseif(num != 99) {
       printf("Usage:\n  slinks_variable_load node_number peers_number time_limit percent_to_send flow_duration flow_weight max_load\n");
   if(g) fclose(g);        exit(1);
   if(s) fprintf(s,"%d DONE SENDING\n",num);    }
   pthread_join(tid, NULL);
   if(s) fprintf(s,"%d SHUTTING DOWN\n",num);    snprintf(tmp, sizeof(tmp), "test%d", num);
   nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlenn(tmp));
//    if(num == 1) system("echo \" \" | mail -s\"node one finished sending!\" ths22@cs.cornell.edu");    nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlenn(tmp));
   nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlenn(tmp));
   return 0;    strncpy(tmp, "nutss.net", sizeof(tmp));
}    nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlenn(tmp));
   strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));
int_pair process_neighbor_list(char *s) {    nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlenn(tmp));
   int_pair p = { 0, 0 };
   char *t,*v; if(num != 99) {
       sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
   if(!s) return p; //sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
   while(*s) {        sprintf(tmp, "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");
       if(*s++ == '_') {        if (f == NULL) f = stderr;
           switch(*s++) {        if (s == NULL) s = stdout;
               case 'i': p.i++; break;    }
               case 'o': p.o++; break;
               default: exit(s[-1]);    memset(tmp,0,256);
           }    memset(&reg, 0, sizeof(reg));
       }    reg.family = AF_NUTSS;
   }    strncpy(reg.user, "ths1", sizeof(reg.user));
   return p;    strncpy(reg.domain, "nutss.net", sizeof(reg.domain));
/*    strncpy(reg.service, "swaplinksd", sizeof(reg.service));
   if(s) {
       t = strtok(s,",");    swaplinks_init();
       do { swp = swaplinks_new("cloud9", &reg, peers, peers+5);
           s += strlenn(t)+1;    // let swaplinks get going.
           v = strtok(t,":");    sleep(5);
           t += strlenn(v)+1;
           if(t && *t == 'i') p.i++;    // coordinator thread
           else if(t && *t == 'o') p.o++;    if(num == 99) {
       } while(t = strtok(s,","));        int done = 0, nodes = atoi(argv[2]), i;
   }        struct sockaddr_ns* peer_list = (struct sockaddr_ns*)calloc(nodes,sizeof(struct sockaddr_ns));
*/        socklen_t len = sizeof(struct sockaddr_ns);
}
         swaplinks_update_walk_length(swp,nodes/2);
         for(i = 0; i < nodes*REDUNDANCY; i++) {
             printf("Coordinator sending greeting...\n");
             swaplinks_sendtoany(swp,"GREETING",9,0);
             sleep(1);
         }
 
         while(done < nodes) {
             printf("Coordinator waiting for DONE signal...\n");
             if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, peer_list+done, &len) > 0 && streq(tmp,"DONE")) {
                 printf("Coordinator received DONE signal from %s. Total: %d finished nodes.\n",peer_list[done].user+4,done+1);
                 swaplinks_sendtoany(swp,"GREETING",9,0);
                 done++;
             }
         }
         printf("All nodes reported, now sending KILL signals\n");
         for(i = 0; i < nodes; i++) {
             printf("Sending kill to %s\n",peer_list[i].user+4);
             swaplinks_sendto(swp, "SHUTDOWN", 9, 0, peer_list+i, len);
         }
 
         return 0;
     }
 
     pthread_create(&tid, NULL, recvthread, swp);
 
     while (time++ < REDUNDANCY*limit) {
         if(randint(100) < percent) snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, 0);
         else snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);
         swaplinks_sendtoany(swp, tmp, strlenn(tmp)+1, 0);
         swaplinks_get_neighbors(swp, nbrs, 4096);
         p = process_neighbor_list(nbrs);
         if(g) fprintf(g,"%d %d\n",p.i,p.o);
         clear(nbrs);
         sleep(1);
     }
 
     if(g) fclose(g);
     if(s) fprintf(s,"%d DONE SENDING\n",num);
     pthread_join(tid, NULL);
     if(s) fprintf(s,"%d SHUTTING DOWN\n",num);
 
 //    if(num == 1) system("echo \" \" | mail -s\"node one finished sending!\" ths22@cs.cornell.edu");
 
     return 0;
 }
 
 int_pair process_neighbor_list(char *s) {
     int_pair p = { 0, 0 };
     char *t,*v;
 
     if(!s) return p;
     while(*s) {
         if(*s++ == '_') {
             switch(*s++) {
                 case 'i': p.i++; break;
                 case 'o': p.o++; break;
                 default: exit(s[-1]);
             }
         }
     }
     return p;
 }

Removed from v.1.23  
changed lines
  Added in v.1.24


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment