Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration


File:  [GForge:cvsroot/nutss] / libnutss / examples / slinks_variable_load.c
Revision 1.24: download - view: text, annotated - select for diffs
Tue Jul 17 02:17:30 2007 UTC (10 years, 2 months ago) by ths
Branches: MAIN
CVS tags: HEAD
tweaks to test harness to more tightly coordinate the experiment.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef WIN32
#include <poll.h>
#include <sys/socket.h>
#endif
#include <pthread.h>
#include "../include/swaplinks.h"

#ifdef WIN32
#define sleep(sec) Sleep((sec)*1000)
#define snprintf   _snprintf
#endif

/*
 * Small helper macros used throughout this file.
 *
 * Every expansion is fully parenthesized so that it composes safely with
 * surrounding operators.  In particular `strlenn(s) + 1` now really is
 * "length plus terminator"; the unparenthesized form expanded to
 * `(s) ? strlen(s) : 0 + 1`, which silently dropped the `+ 1` for every
 * non-NULL string.
 */

/* Make `a` the empty string. */
#define clear(a) ((a)[0] = 0)
/* Non-zero when the two strings compare equal. */
#define streq(a,b) (strcmp((a),(b)) == 0)
/* strlen() that yields 0 for a NULL pointer. */
#define strlenn(s) ((s) ? strlen(s) : 0)
/* Integer in [0, N) derived from rand(). */
#define randint(N) ((int)(rand() / (((double)RAND_MAX + 1) / (N))))
/*
 * Local assert: prints the failed expression and exits with status 1.
 * The #undef avoids a redefinition if <assert.h> was pulled in by another
 * header; the do/while(0) wrapper makes the macro a single statement so it
 * cannot capture a following `else`.
 */
#undef assert
#define assert(p) do { if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); } } while(0)
/* Multiplier applied to greeting and request counts so that enough messages are sent. */
#define REDUNDANCY 5

/* One admitted flow; recvthread keeps these in a doubly linked list. */
typedef struct flow {
    int duration;               /* remaining lifetime, decremented once per request processed by recvthread */
    int weight;                 /* load this flow adds to the node while it is admitted */
    char source[256];           /* sender identifier copied from the peer address of the request */
    struct flow *next, *last;   /* list links; `last` is the previous element (NULL at the head) */
} flow_t;

/* Pair of counters returned by process_neighbor_list: occurrences of "_i" and "_o".
 * NOTE(review): presumably incoming/outgoing neighbor links - confirm against swaplinks_get_neighbors. */
typedef struct int_pair_t { int i; int o; } int_pair;

/* Counts the "_i" / "_o" tags in a neighbor list string (defined at the end of the file). */
int_pair process_neighbor_list(char*);

/* Run parameters taken from the command line in main():
 *   num     - this node's number (99 selects the coordinator role)
 *   peers   - peer count handed to swaplinks_new and used as the normal walk length
 *   limit   - number of requests recvthread processes before it reports DONE
 *   maxload - largest total flow weight this node will admit */
int num, peers, limit, maxload;
/* Swaplinks handle created in main() and shared with recvthread. */
swaplinks_p swp;
/* Output streams opened in main(): f = per-node load log, s = status log, g = neighbor-count log. */
FILE *f = NULL, *s = NULL, *g = NULL;

void *recvthread(void *arg) {
    swaplinks_p swp = (swaplinks_p)arg;
    struct sockaddr_ns peer, coord;
    socklen_t len = sizeof(peer);
    flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;
    char msg[64], tmp[128], buf[2048], greeting[1024];
    int time = 0, load = 0, exts, delta, expiration, greeted = 0;

    clear(buf);
    while(time < limit) {
        memset(tmp,0,128);
        memset(&peer,0,sizeof(peer));
        if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
            if(streq(tmp,"GREETING")) {
                if(greeted) continue;
                printf("%d Received greeting from coordinator\n",num);
                memset(&coord, 0, sizeof(coord));
                memcpy(&coord,&peer,sizeof(peer));
                greeted = 1;
                continue;
            }
            time++; delta = 0;
            // parse new flow
            newflow = (flow_t*)calloc(1,sizeof(flow_t));
            newflow->duration = atoi(strtok(tmp,";"));
            newflow->weight = atoi(strtok(NULL,";"));
            exts = atoi(strtok(NULL,";"));
            strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
            if(s) fprintf(s,"      %d recvd request #%d from %s - w:%d d:%d\n",
                          num,time,newflow->source,newflow->weight,newflow->duration);
            snprintf(buf,2048,"->%d:%d:%s<->",newflow->duration,newflow->weight,newflow->source);
            // run through flows, decrementing duration, cleaning any expired flows
            for(ptr = flows; ptr;) {
                sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);
                expiration = 0; axe = NULL;
                if(--ptr->duration == 0) {
                    axe = ptr;
                    load -= ptr->weight;
                    delta -= ptr->weight;
                    if(ptr->last && ptr->next) {
                        assert(flows != ptr);
                        ptr->last->next = ptr->next;
                        ptr->next->last = ptr->last;
                    }
                    else if(ptr->next) {
                        assert(flows == ptr);
                        ptr->next->last = NULL;
                        flows = ptr->next;
                    }
                    else if(ptr->last) {
                        assert(flows != ptr);
                        ptr->last->next = NULL;
                    }
                    else {
                        assert(flows == ptr);
                        flows = NULL;
                    }
                    expiration = 1;
                }
                ptr = ptr->next;
                if(expiration) { free(axe); axe = NULL; }
            }
            sprintf(buf,"%s||\n",buf);

            // if this flow will not put us overbudget, accept
            if(load+newflow->weight <= maxload) {
                load += newflow->weight;
                delta += newflow->weight;
                newflow->next = flows;
                if(flows) flows->last = newflow;
                flows = newflow;
                if(s) fprintf(s,"        %d accepted flow: %d\n",num,load);
            }
            // ADMISSION CONTROL
            // if we're overloaded, drop it if its been pushed too far,
            // otherwise pass on to a neighbor and mark it
            else if(exts < 5) {
                fprintf(s,"        %d rejected flow, load too high: %d <= %d\n",num,maxload,load+newflow->weight);
                swaplinks_update_walk_length(swp,1);
                snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
                swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);
                swaplinks_update_walk_length(swp,peers);
            }
            else
                fprintf(s,"        %d retired unserviceable request\n",num);

            if(f) fprintf(f,"%d\n",load);
//          if(s) fprintf(s,"%s",buf);
        }
    }
    if(s) fprintf(s,"%d DONE RECEIVING\n",num);

    // clean up flows list
    for(ptr = flows;ptr;) {axe = ptr; ptr = ptr->next; free(axe);}

    while(!greeted) {
        memset(tmp,0,128);
        memset(&peer,0,sizeof(peer));
        if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
            if(streq(tmp,"GREETING")) {
                printf("%d Received late greeting from coordinator\n",num);
                memset(&coord,0,sizeof(coord));
                memcpy(&coord,&peer,sizeof(peer));
                greeted = 1;
            }
        }
    }

    // tell coordinator that were all done here
    if(swaplinks_sendto(swp, "DONE", 5, 0, &coord, sizeof(coord)) != 5)
        printf("%d cannot contact coordinator\n", num);
    else
        printf("%d sent alert to coordinator at %s. entering holding pattern...\n",num,coord.user+4);

    // go into holding pattern to let other nodes catch up
    for(;;) {
        memset(tmp,0,128);
        if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
            printf("%d is holding pattern: %s\n",num,tmp);
            if(streq(tmp,"SHUTDOWN")) { printf("%d Received KILL signal from coordinator. Shutting down...\n",num); break; }
            else if(streq(tmp,"GREETING")) continue;
            newflow = (flow_t*)calloc(1,sizeof(flow_t));
            newflow->duration = atoi(strtok(tmp,";"));
            newflow->weight = atoi(strtok(NULL,";"));
            exts = atoi(strtok(NULL,";"));
            strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
            if(exts < 5) {
                swaplinks_update_walk_length(swp,1);
                snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
                swaplinks_sendtoany(swp,msg,strlenn(msg)+1,0);
                swaplinks_update_walk_length(swp,peers);
            }
        }
    }

    if(f && f != stderr) { fclose(f); f = NULL; }
    if(s && s != stdout) { fclose(s); s = NULL; }

    pthread_exit(NULL);
}

/*
 * Coordinator role (node number 99).
 *
 * Sends nodes*REDUNDANCY greetings one second apart, waits until `nodes`
 * DONE messages have been received (recording each sender's address), then
 * sends SHUTDOWN to every recorded address.
 * Returns the process exit status: 0 on success, 1 if the address table
 * cannot be allocated.
 */
static int run_coordinator(int nodes) {
    struct sockaddr_ns *peer_list;
    socklen_t len;
    char msg[256];
    int done = 0, i;

    // allocate at least one slot so a non-positive count never turns into a huge request
    peer_list = calloc(nodes > 0 ? nodes : 1, sizeof(*peer_list));
    if(!peer_list) {
        printf("Coordinator cannot allocate peer list for %d nodes\n", nodes);
        return 1;
    }

    swaplinks_update_walk_length(swp,nodes/2);
    for(i = 0; i < nodes*REDUNDANCY; i++) {
        printf("Coordinator sending greeting...\n");
        swaplinks_sendtoany(swp,"GREETING",9,0);
        sleep(1);
    }

    while(done < nodes) {
        printf("Coordinator waiting for DONE signal...\n");
        // clear the buffer and reset the address length before every receive so
        // stale bytes from a previous message can never be compared
        memset(msg, 0, sizeof(msg));
        len = sizeof(*peer_list);
        if(swaplinks_recvfrom(swp, msg, sizeof(msg)-1, 0, peer_list+done, &len) > 0 && strcmp(msg,"DONE") == 0) {
            printf("Coordinator received DONE signal from %s. Total: %d finished nodes.\n",peer_list[done].user+4,done+1);
            swaplinks_sendtoany(swp,"GREETING",9,0);
            done++;
        }
    }
    printf("All nodes reported, now sending KILL signals\n");
    for(i = 0; i < nodes; i++) {
        printf("Sending kill to %s\n",peer_list[i].user+4);
        swaplinks_sendto(swp, "SHUTDOWN", 9, 0, peer_list+i, sizeof(*peer_list));
    }

    free(peer_list);
    return 0;
}

/*
 * Entry point of the variable-load test.
 *
 * Worker:      slinks_variable_load node_number peers_number time_limit
 *              percent_to_send flow_duration flow_weight max_load
 * Coordinator: slinks_variable_load 99 node_count
 *
 * A worker starts recvthread and then sends REDUNDANCY*limit requests, one
 * per second, logging the neighbor counts after each send.  A request is
 * sent with weight 0 when randint(100) < percent and with `weight` otherwise.
 */
int main(int argc, char **argv) {
    struct sockaddr_ns reg;
    char tmp[256], nbrs[4096];
    int tick = 0, duration = 0, weight = 0, percent = 0;
    int_pair p;
    pthread_t tid;

    if(argc > 1) num = atoi(argv[1]);
    if(argc > 7) {
        peers = atoi(argv[2]);
        limit = atoi(argv[3]);
        percent = atoi(argv[4]);
        duration = atoi(argv[5]);
        weight = atoi(argv[6]);
        maxload = atoi(argv[7]);
    }
    // the coordinator reads its node count from argv[2], so it needs at least two arguments
    else if(num != 99 || argc < 3) {
        printf("Usage:\n  slinks_variable_load node_number peers_number time_limit percent_to_send flow_duration flow_weight max_load\n");
        printf("  slinks_variable_load 99 node_count\n");
        exit(1);
    }

    snprintf(tmp, sizeof(tmp), "test%d", num);
    nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlen(tmp));
    nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlen(tmp));
    nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlen(tmp));
    strncpy(tmp, "nutss.net", sizeof(tmp));
    nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlen(tmp));
    strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));
    nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlen(tmp));

    if(num != 99) {
        snprintf(tmp, sizeof(tmp), "output/data_node%d.txt", num); f = fopen(tmp,"w");
        snprintf(tmp, sizeof(tmp), "output/nbrs_node%d.txt", num); g = fopen(tmp,"w");
        // fall back to the standard streams when a log file is unavailable
        if (f == NULL) f = stderr;
        if (s == NULL) s = stdout;
    }

    memset(tmp,0,256);
    memset(&reg, 0, sizeof(reg));
    reg.family = AF_NUTSS;
    strncpy(reg.user, "ths1", sizeof(reg.user));
    strncpy(reg.domain, "nutss.net", sizeof(reg.domain));
    strncpy(reg.service, "swaplinksd", sizeof(reg.service));

    swaplinks_init();
    swp = swaplinks_new("cloud9", &reg, peers, peers+5);
    // let swaplinks get going.
    sleep(5);

    // coordinator role
    if(num == 99) return run_coordinator(atoi(argv[2]));

    if(pthread_create(&tid, NULL, recvthread, swp) != 0) {
        printf("%d cannot start receive thread\n", num);
        exit(1);
    }

    while (tick++ < REDUNDANCY*limit) {
        if(randint(100) < percent) snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, 0);
        else snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);
        // +1 so the terminating NUL travels with the request
        swaplinks_sendtoany(swp, tmp, strlen(tmp)+1, 0);
        // start from an empty list so nothing stale or uninitialized is counted
        // if swaplinks_get_neighbors writes nothing
        nbrs[0] = '\0';
        swaplinks_get_neighbors(swp, nbrs, 4096);
        p = process_neighbor_list(nbrs);
        if(g) fprintf(g,"%d %d\n",p.i,p.o);
        sleep(1);
    }

    if(g) { fclose(g); g = NULL; }
    if(s) fprintf(s,"%d DONE SENDING\n",num);
    pthread_join(tid, NULL);
    if(s) fprintf(s,"%d SHUTTING DOWN\n",num);

    return 0;
}

/*
 * Count the "_i" and "_o" tags in a neighbor list string.
 *
 * Every underscore must be followed by 'i' or 'o'; the matching counter of
 * the returned pair is incremented and scanning resumes after the tag.
 * Any other character after an underscore (including the terminating NUL)
 * ends the whole process via exit(), with that character as the exit status.
 * A NULL string yields { 0, 0 }.
 */
int_pair process_neighbor_list(char *s) {
    int_pair counts = { 0, 0 };
    char tag;

    if(s == NULL) return counts;

    for(; *s != '\0'; ) {
        /* step over ordinary characters until an underscore is consumed */
        if(*s++ != '_') continue;

        /* the character right after the underscore selects the counter */
        tag = *s++;
        if(tag == 'i') counts.i++;
        else if(tag == 'o') counts.o++;
        else exit(tag);
    }
    return counts;
}

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment