Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration

Diff for /libnutss/examples/slinks_variable_load.c between versions 1.12 and 1.13

version 1.12, 2007/06/29 17:19:44 version 1.13, 2007/06/30 20:57:28
Line 1 Line 1
#include <stdio.h>#include <stdio.h>
#include <stdlib.h>#include <stdlib.h>
#include <string.h>#include <string.h>
#ifndef WIN32#ifndef WIN32
#include <poll.h>#include <poll.h>
#include <sys/socket.h>#include <sys/socket.h>
#endif#endif
#include <pthread.h>#include <pthread.h>
#include "../include/swaplinks.h"#include "../include/swaplinks.h"
#ifdef WIN32#ifdef WIN32
#define sleep(sec) Sleep((sec)*1000)#define sleep(sec) Sleep((sec)*1000)
#define snprintf   _snprintf#define snprintf   _snprintf
#endif#endif
#define assert(p) if(!(p)) { printf("Assertion FAILED!\n"); exit(1); }#define assert(p) if(!(p)) { printf("Assertion FAILED!\n"); exit(1); }
typedef struct flow {typedef struct flow {
   int duration;    int duration;
   int weight;    int weight;
   char source[256];    char source[256];
   struct flow *next, *last;    struct flow *next, *last;
} flow_t;} flow_t;
int num, peers, limit, maxload;int num, peers, limit, maxload;
swaplinks_p swp;swaplinks_p swp;
FILE *f = NULL, *s = NULL;FILE *f = NULL, *s = NULL;
void *recvthread(void *arg) {void *recvthread(void *arg) {
   swaplinks_p swp = (swaplinks_p)arg;    swaplinks_p swp = (swaplinks_p)arg;
   struct sockaddr_ns peer;    struct sockaddr_ns peer;
   socklen_t len = sizeof(peer);    socklen_t len = sizeof(peer);
   flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;    flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;
   char msg[64], tmp[128], buf[2048];    char msg[64], tmp[128], buf[2048];
   int time = 0, load = 0, busy = 0, exts, delta, expiration;    int time = 0, load = 0, busy = 0, exts, delta, expiration;
   buf[0] = 0;    buf[0] = 0;
   while(time < limit) {    while(time < limit) {
       sleep(1);       sleep(1);
       if(busy++ > limit*3) {       if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
           if(s) fprintf(s,"%d HALTING RECEPTION. TOO MUCH WAITING.\n",num);      time++; delta = 0;
           pthread_exit(NULL);      if(s) fprintf(s,"%d recvd request %d\n",num,time);
       }      newflow = (flow_t*)calloc(1,sizeof(flow_t));
       if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {      newflow->duration = atoi(strtok(tmp,";"));
           time++; delta = 0;      newflow->weight = atoi(strtok(NULL,";"));
           if(s) fprintf(s,"\t\tTIME at node %d: %d\n",num,time);      exts = atoi(strtok(NULL,";"));
           newflow = (flow_t*)calloc(1,sizeof(flow_t));      if(load < maxload) {
           newflow->duration = atoi(strtok(tmp,";"));     strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
           newflow->weight = atoi(strtok(NULL,";"));     snprintf(buf,2048,"->%d:%d<->",newflow->duration,newflow->weight);
           exts = atoi(strtok(NULL,";"));     for(ptr = flows; ptr;) {
           if(load < maxload) {    sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);
          strncpy(newflow->source,peer.user+4,sizeof(newflow->source));    expiration = 0; axe = NULL;
                   snprintf(buf,2048,"->%d:%d<->",newflow->duration,newflow->weight);    if(--ptr->duration == 0) {
          for(ptr = flows; ptr;) {   axe = ptr;
     sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);   load -= ptr->weight;
     expiration = 0; axe = NULL;   delta -= ptr->weight;
                       if(--ptr->duration == 0) {
         axe = ptr;   if(ptr->last && ptr->next) {
                           load -= ptr->weight;  assert(flows != ptr);
                           delta -= ptr->weight;  ptr->last->next = ptr->next;
 ptr->next->last = ptr->last;
         if(ptr->last && ptr->next) {   }
    assert(flows != ptr);   else if(ptr->next) {
    ptr->last->next = ptr->next;  assert(flows == ptr);
    ptr->next->last = ptr->last;  flows = ptr->next;
         }}
         else if(ptr->next) {
    assert(flows == ptr);   if(flows) flows->last = NULL;
    flows = ptr->next;else if(ptr->last) {
if(flows) flows->last = NULL;  assert(flows != ptr);
}  ptr->last->next = NULL;
         else if(ptr->last) {   }
    assert(flows != ptr);   else {
    ptr->last->next = NULL;  assert(flows == ptr);
         }  flows = NULL;
         else {   }
    assert(flows == ptr);
    flows = NULL;       if(s) fprintf(s,"%d EXPIRED FLOW from %s:d%d/w%d\n",num,ptr->source,ptr->duration,ptr->weight);
         }   expiration = 1;
   }
       if(s) fprintf(s,"%d EXPIRED FLOW from %s:d%d/w%d\n",num,ptr->source,ptr->duration,ptr->weight);    ptr = ptr->next;
         expiration = 1;    if(expiration) free(axe);
                       }     }
     ptr = ptr->next;     sprintf(buf,"%s||\n",buf);
     if(expiration) free(axe);
      //          if(s) fprintf(s,"%d ACCEPTED FLOW from %s:d%d/w%d\n",num,newflow->source,newflow->duration,newflow->weight);
                   }        if(s) fprintf(s,"%s",buf);
          sprintf(buf,"%s||\n",buf);
    load += newflow->weight;
       //          if(s) fprintf(s,"%d ACCEPTED FLOW from %s:d%d/w%d\n",num,newflow->source,newflow->duration,newflow->weight);     delta += newflow->weight;
        if(s) fprintf(s,"%s",buf);     newflow->next = flows;
    if(newflow->next) newflow->next->last = newflow;
                   load += newflow->weight;     flows = newflow;
          delta += newflow->weight;     if(f) fprintf(f,"%d\n",load);
          newflow->next = flows;      }
          if(newflow->next) newflow->next->last = newflow;      // ADMISSION CONTROL
          flows = newflow;      // if we're overloaded, drop itif its been pushed too far,
                   if(f) fprintf(f,"%d\n",load);      // or pass on to a neighborand mark it as suchif not
               }      else if(exts < 5) {
      // ADMISSION CONTROL     swaplinks_update_walk_length(swp,1);
      // if we're overloaded, drop itor pass this on to a neighbor, mark it as such     snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
else if(exts < 5) {     swaplinks_sendtoany(swp,msg,strlen(msg)+1,0);
     swaplinks_update_walk_length(swp,1);     swaplinks_update_walk_length(swp,peers);
     snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);      }
     swaplinks_sendtoany(swp,msg,strlen(msg)+1,0);       }
     swaplinks_update_walk_length(swp,peers);    }
      }    if(s) fprintf(s,"%d DONE RECEIVING\n",num);
       }
   }if(f) { fclose(f); f = NULL; }
   if(s) fprintf(s,"%d DONE RECEIVING\n",num);    if(s) { fclose(s); s = NULL; }
pthread_exit(NULL);    pthread_exit(NULL);
}}
int main(int argc, char **argv) {int main(int argc, char **argv) {
   struct sockaddr_ns reg;    struct sockaddr_ns reg;
   char tmp[256];    char tmp[256];
   int time = 0, duration, weight, sendcount = 0;    int time = 0, duration, weight, sendcount = 0;
   pthread_t tid;    pthread_t tid;
   if (argc > 6) {    if (argc > 6) {
       num = atoi(argv[1]);        num = atoi(argv[1]);
       peers = atoi(argv[2]);        peers = atoi(argv[2]);
       limit = atoi(argv[3]);        limit = atoi(argv[3]);
       duration = atoi(argv[4]);        duration = atoi(argv[4]);
       weight = atoi(argv[5]);        weight = atoi(argv[5]);
       maxload = atoi(argv[6]);       maxload = atoi(argv[6]);
   }    }
   else {    else {
       printf("Usage:\n  slinks_variable_load node_number peers_number time_limit flow_duration flow_weight max_load\n");        printf("Usage:\n  slinks_variable_load node_number peers_number time_limit flow_duration flow_weight max_load\n");
       exit(1);        exit(1);
   }    }
   snprintf(tmp, sizeof(tmp), "test%d", num);    snprintf(tmp, sizeof(tmp), "test%d", num);
   nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlen(tmp));    nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlen(tmp));
   nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlen(tmp));    nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlen(tmp));
   nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlen(tmp));    nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlen(tmp));
   strncpy(tmp, "nutss.net", sizeof(tmp));    strncpy(tmp, "nutss.net", sizeof(tmp));
   nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlen(tmp));    nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlen(tmp));
   strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));    strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));
   nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlen(tmp));    nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlen(tmp));
   sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");    sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");
sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");//    sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");
   if (f == NULL) f = stderr;    if (f == NULL) f = stderr;
   if (s == NULL) s = stdout;    if (s == NULL) s = stdout;
   memset(&reg, 0, sizeof(reg));    memset(&reg, 0, sizeof(reg));
   reg.family = AF_NUTSS;    reg.family = AF_NUTSS;
   strncpy(reg.user, "ths1", sizeof(reg.user));    strncpy(reg.user, "ths1", sizeof(reg.user));
   strncpy(reg.domain, "nutss.net", sizeof(reg.domain));    strncpy(reg.domain, "nutss.net", sizeof(reg.domain));
   strncpy(reg.service, "swaplinksd", sizeof(reg.service));    strncpy(reg.service, "swaplinksd", sizeof(reg.service));
   swaplinks_init();    swaplinks_init();
   swp = swaplinks_new("cloud9", &reg, peers, peers);    swp = swaplinks_new("cloud9", &reg, peers, peers);
   // let swaplinks get going.    // let swaplinks get going.
   sleep(10);    sleep(10);
   pthread_create(&tid, NULL, recvthread, swp);    pthread_create(&tid, NULL, recvthread, swp);
   sleep(1);    sleep(1);
   snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);    snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);
   while (time++ <2*limit) { // (i-- != 0) {    while (time++ <3*limit) { // (i-- != 0) {
       swaplinks_sendtoany(swp, tmp, strlen(tmp)+1, 0);        swaplinks_sendtoany(swp, tmp, strlen(tmp)+1, 0);
sleep(5);if(s) fprintf(s,"   %d sent request %d\n",num,time);
   }        else printf("   %d sent request %d\n",num,time);
       sleep(10);
   if(s) fprintf(s,"%d DONE SENDING\n",num);    }
pthread_join(tid,NULL);        pthread_join(tid, NULL);
   if(f) fclose(f);    if(s) fprintf(s,"%d DONE SENDING\n",num);
   if(s) fclose(s);
//    if(num == 1) system("echo \" \" | mail -s\"node one finished sending!\" ths22@cs.cornell.edu");
   if(num == 1) system("echo \" \" | mail -s\"experiment done!\" ths22@cs.cornell.edu");
   return 0;
   return 0;}
} 

Removed from v.1.12  
changed lines
  Added in v.1.13


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment