Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration


File:  [GForge:cvsroot/nutss] / libnutss / examples / slinks_variable_load.c
Revision 1.32: download - view: text, annotated - select for diffs
Wed Oct 10 19:20:58 2007 UTC (9 years, 8 months ago) by ths
Branches: MAIN
CVS tags: HEAD
minimal change to test rig

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <math.h>
#include <limits.h>
#ifndef WIN32
#include <poll.h>
#include <sys/socket.h>
#endif
#include <pthread.h>
#include "../include/swaplinks.h"

#ifdef WIN32
#define sleep(sec) Sleep((sec)*1000)
#define snprintf   _snprintf
#endif
// Small helper macros. Every expansion is fully parenthesized so that the
// macros compose safely inside larger expressions (for example
// strlenn(s)+1 or 2*round(x)). Arguments may be evaluated more than once,
// so do not pass expressions with side effects.
#define clear(a) ((a)[0]=0)                          // make a char buffer the empty string
#define streq(a,b) (!strcmp((a),(b)))                // non-zero when the two strings are equal
#define strlenn(s) ((s) ? strlen(s) : 0)             // strlen that tolerates a NULL pointer
#define randint(N) ((int)(rand() / (((double)RAND_MAX + 1) / (N))))   // integer in [0,N)
#define round(f) ((f)-(int)(f) > 0.5 ? (int)(f)+1 : (int)(f))         // nearest int; exact .5 rounds down
#define min(a,b) ((a)<(b)?(a):(b))
#define max(a,b) ((a)>(b)?(a):(b))
// Local assert: prints the failed expression and terminates the process.
// Wrapped in do/while(0) so it behaves as a single statement after if/else.
#undef assert
#define assert(p) do { if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); } } while(0)
#define REDUNDANCY    5       // how many message repetitions are necessary to reach every node
#define PROPAGATION   600     // how many seconds the coordinator should wait for every node to receive KILL signal
#define EXPIRATION    600     // how many seconds a node should keep a peering report from another node
#define MIN_GRACE     1       // minimum number of times a request can be passed on by a full-capacity node
#define MAX_GRACE     60      // maximum
#define ALLOTMENT     400     // how much load a node will accommodate for each of his neighbors (obsolete w/ auto peering)
#define QUORUM        12      // how many reports a node needs before adjusting his peering level
#define THRESHOLD     0.15    // how much different a newly calculated degree must be from the old one to switch to it
#define ASSURANCE     0.6     // how sure we want to be that we will find the available capacity out there (affects walk length)
#define MAX_REPORTS   30      // stop adding neighbor reports once this many unexpired ones are held
#define WALK_LENGTH   10      // walk length passed to swaplinks for ordinary sends
#define MIN_PEERS     4       // lower bound on the computed peering degree
#define MAX_PEERS     25      // upper bound on the computed peering degree
#define DEFAULT_PEERS 8       // obsolete w/ auto peering
#define MAX_FLOWS     256     // size of the per-node flow and report tables
#define BUFFER_SIZE   128     // size of message buffers
#define URI_LENGTH    256     // size of URI / user-name buffers

// One resource request ("flow"). recvthread fills the integer fields from the
// semicolon-separated fields of a received message and copies the sender's
// identifier into `source`.
typedef struct flow {
    int id;                     // request identifier chosen by the sender
    int duration;               // how long the flow lasts, in seconds
    int weight;                 // load the flow adds to the node that accepts it
    int exts;                   // remaining number of times the request may be passed on
    int capacity;               // capacity reported by the sender
    int available;              // unused capacity reported by the sender
    int degree;                 // peering degree reported by the sender
    char source[URI_LENGTH];    // sender identifier taken from the peer address
    struct flow *next;          // list links; not used by the code in this file
    struct flow *last;
} flow_t;

// Pair of counters: number of incoming (i) and outgoing (o) neighbors.
typedef struct int_pair_t {
    int i;
    int o;
} int_pair;

// Bookkeeping for one accepted flow: the load it contributes and the time
// (seconds since the receiving thread started) at which it ends.
// An expiration of 0 marks a free table slot.
typedef struct flw_spec_t {
    int weight;
    int expiration;
} flw_spec;

// Last report heard from one node: when the report expires, plus the
// capacity, available capacity and peering degree that node announced.
typedef struct nbr_rprt_t {
    int expiration;
    int capacity;
    int available;
    int degree;
} nbr_rprt;

// Forward declarations for the neighbor-list helpers defined after main().
// print_neighbor_list is declared here as well: main() calls it before its
// definition, and without a prototype the call would be an implicit
// declaration that conflicts with the later void definition.
int_pair process_neighbor_list(char*);
void print_neighbor_list(char*);

// Global information, used by both the main (sending) thread and the
// receiving thread. No locking is used around any of it.
int num;            // this node's number (argv[1]); 99 selects coordinator mode
int weight;         // load requested by each flow this node sends
int exts;           // number of times an outgoing request may be passed on
int nodes = 0;      // coordinator only: number of nodes in the experiment
int peers = 0;      // current peering degree
int capacity = 0;   // total load this node can carry
int load = 0;       // load currently carried
int alive = 1;      // set to 0 by the receiving thread when it terminates
swaplinks_p swp;    // swaplinks handle created in main()
FILE *f = NULL;     // per-node data log
FILE *s = NULL;     // human-readable status log
FILE *q = NULL;     // per-node peering-degree log
FILE *r = NULL;     // per-node walk-length log

// Node's receiving thread that decides whether to accept an incoming
// request and records all the necessary information; also carries
// out the gossiping-equilibrium protocol to assign peering level
void *recvthread(void *arg) {
    struct sockaddr_ns peer;
    socklen_t len = sizeof(peer);
    char msg[BUFFER_SIZE], tmp[BUFFER_SIZE];
    int bytes, msg_count, i;
    time_t start,now;
    flow_t newflow;
    flw_spec flows[MAX_FLOWS];
    nbr_rprt reports[MAX_FLOWS];
    nbr_rprt tempbuf[MAX_FLOWS];
    nbr_rprt sorted[MAX_FLOWS];

    memset(flows,0,sizeof(flw_spec)*MAX_FLOWS);
    memset(reports,0,sizeof(nbr_rprt)*MAX_FLOWS);

    start = time(NULL);
    for(;;) {
        memset(&peer,0,sizeof(peer));
        sleep(1);
    // simulate churn by having a small portion of nodes duck out early, and replace them periodically
//        if(randint(1000) < 2) {
//            printf("%d Withdrawing from cloud early\n",num);
//            break;
//        }
    if((bytes=swaplinks_recvfrom(swp, msg, sizeof(msg), 0, &peer, &len)) > 0) {
            msg[bytes] = 0;
            memcpy(tmp,msg,bytes);
            // Flood SHUTDOWN to neighbors, then power down
            if(streq(msg,"SHUTDOWN")) {
                swaplinks_update_walk_length(swp,1);
                for(i = 0; i < peers*2; i++) swaplinks_sendtoany(swp,"SHUTDOWN",9,0);
                printf("%d Recieved KILL signal...\n",num);
                break;
            }
            // parse the new flow
            memcpy(tmp,msg,sizeof(tmp));
            newflow.duration = atoi(strtok(tmp,";"));
            newflow.weight = atoi(strtok(NULL,";"));
            newflow.exts = atoi(strtok(NULL,";"));
            newflow.id = atoi(strtok(NULL,";"));
            newflow.capacity = atoi(strtok(NULL,";"));
            newflow.available = atoi(strtok(NULL,";"));
            newflow.degree = atoi(strtok(NULL,";"));
            strncpy(newflow.source,peer.user+4,sizeof(newflow.source));

            /*******/
            // Adjust peering level based on updated picture of the state of the network
            {
                int alive = 0, livelinks = 0, totallinks = 0, more = 0, max_capacity = 0, min_capacity = INT_MAX, numReports = 0, src, curmax, k;
                float rank, live, newDegree, tempfloat;
                // parse report
                now = difftime(time(NULL),start);
                src = atoi(newflow.source);
                assert(src < MAX_FLOWS);
                // retire any old data and count current reports
                for(i=0;i<MAX_FLOWS;i++) {
                    if(0 < reports[i].expiration && reports[i].expiration > now)
                        numReports++;
                    if(0 < reports[i].expiration && reports[i].expiration < now)
                        memset(reports+i,0,sizeof(nbr_rprt));
                }
                // dont add this report if we have enough already
                if(numReports < MAX_REPORTS) {
                    reports[src].capacity = newflow.capacity;
                    reports[src].available = newflow.available;
                    reports[src].degree = newflow.degree;
                    reports[src].expiration = now+EXPIRATION;
                }
                // gather important data based on known info
                for(i=0;i<MAX_FLOWS;i++) {
                    if(0 < reports[i].capacity) {
                        alive++;
                        totallinks += reports[i].degree;
                        if(weight < reports[i].available) livelinks += reports[i].degree;
                        if(max_capacity < reports[i].capacity) max_capacity = reports[i].capacity;
                        if(min_capacity > reports[i].capacity && 0 < reports[i].capacity) min_capacity = reports[i].capacity;
                        if(capacity < reports[i].capacity) more++;
                    }
                }
                // include this node's capacity in mix
                if(max_capacity < capacity) max_capacity = capacity;
                if(min_capacity > capacity) min_capacity = capacity;
                assert(alive > 0);
                rank = (float)(alive-more)/alive;
                live = (float)livelinks/totallinks;
                tempfloat = min(log(1-ASSURANCE)/log(1-live),MAX_GRACE);
                if(alive > QUORUM && abs(max(tempfloat,MIN_GRACE)-exts) > exts*THRESHOLD)
                    exts = round(max(tempfloat,MIN_GRACE));
                // sort information for viewing purposes
    //                memcpy(tempbuf,reports,MAX_FLOWS*sizeof(nbr_rprt));
    //                for(i=0;i<alive;i++) {
    //                    curmax = 0;
    //                    for(k=0;k<MAX_FLOWS;k++) {
    //                        if(tempbuf[curmax].capacity < tempbuf[k].capacity)
    //                            curmax = k;
    //                    }
    //                    sorted[i] = tempbuf[curmax];
    //                    tempbuf[curmax].capacity = 0;
    //                }

                // compute new degree... still need to think about compressing scale if it gets too big
                newDegree = rank*min(((float)max_capacity/min_capacity)*MIN_PEERS,MAX_PEERS);
                // only alter degree if we have a quorum and the change would be significant
                if(alive > QUORUM && abs(max(newDegree,MIN_PEERS)-peers) > peers*THRESHOLD) {
                    peers = round(max(newDegree,MIN_PEERS));
                    swaplinks_update_degree(swp,peers);
                }
                if(q) fprintf(q,"%ld %d\n",now,peers);
                if(r) fprintf(r,"%ld %d\n",now,exts);

                //                printf("- *%d* -\n",capacity);
                //                for(i=0;i<alive;i++)
                //                    printf(" %d:%4.2f",sorted[i].capacity,(double)(sorted[i].expiration)-difftime(time(NULL),start));
                //                printf("\n  %d peering at %d, %4.2f out of %d, adjmax %3.1f\n",num,peers,rank*100.0,alive+1,adjusted_max_peers);
                //                printf("---------\n");
            }
            /********/

            //if(s) fprintf(s,"    %d recvd request %d from %s - w:%d d:%d e:%d\n",
            //              num,newflow.id,newflow.source,newflow.weight,newflow.duration,newflow.exts);
            // retire any expired flows before processing
            now = difftime(time(NULL),start);
            for(i=0;i<MAX_FLOWS;i++) {
                if(0 < flows[i].expiration && flows[i].expiration < now) {
                    load -= flows[i].weight;
                    flows[i].weight = 0;
                    flows[i].expiration = 0;
                    //if(s) fprintf(s,"        %d finished accomodating flow at %ld: %d\n",num,now,load);
                }
            }
            if(load+newflow.weight < capacity) {
                load += newflow.weight;
                // find an empty spot to record this flow
                for(i=0;i<MAX_FLOWS;i++) {
                    if(flows[i].expiration == 0) {
                        flows[i].weight = newflow.weight;
                        flows[i].expiration = now + newflow.duration;
                        //if(s) fprintf(s,"        %d accepted request at %ld: %d\n",num,now,load);
                        if(f) fprintf(f,"%ld %s\n",now,msg);
                        break;
                    }
                }
                // if we run out of space, i dont want to add more space, just die.
                assert(i != MAX_FLOWS);
            }
            // ADMISSION CONTROL
            // do not accept the flow if it will overload this node.
            // if the request been pushed too far, reject it, otherwise
            // pass it on to a neighbor
            else if(newflow.exts > 0) {
                swaplinks_update_walk_length(swp,1);
                snprintf(tmp,sizeof(tmp),"%d;%d;%d;%d;%d;%d;%d;",newflow.duration,
                                                                 newflow.weight,
                                                                 newflow.exts-1,
                                                                 newflow.id,
                                                                 newflow.capacity,
                                                                 newflow.available,
                                                                 newflow.degree);
                swaplinks_sendtoany(swp,tmp,strlenn(tmp)+1,0);
                swaplinks_update_walk_length(swp,WALK_LENGTH);
                if(s) fprintf(s,"  %d rejected flow %d:%d, load too high: %d <= %d\n",num,newflow.id,newflow.exts,capacity,load+newflow.weight);
                if(s) fprintf(s,"  %d sends requests with exts: %d\n",num,exts);
                if(f) fprintf(f,"%ld %s\n",now,"reject");
            }
            else {
                if(s) fprintf(s,"        %d retired unserviceable request %d\n",num,newflow.id);
                if(f) fprintf(f,"%ld %s\n",now,"retire");
            }
        }
    }
    // close output files to assure data is recorded
    if(f && f != stderr) { fclose(f); f = NULL; }
    if(s && s != stdout) { fclose(s); s = NULL; }

    alive = 0;

    pthread_exit(NULL);
}

// Entry point for both roles of the test rig.
//
// Node:        node_number num_messages percent_load flow_duration flow_weight capacity
// Coordinator: 99 num_nodes num_peers experiment_duration_minutes
//
// A node joins the swaplinks cloud, starts recvthread, and then up to
// num_messages times (every 12 seconds) sends a resource request with
// probability percent_load/100. The coordinator prints its neighbor list once
// a minute for the experiment duration, floods SHUTDOWN, waits PROPAGATION
// seconds and then kills remaining processes.
// Returns 0 on normal completion; exits with status 1 on bad arguments or if
// the receiving thread cannot be started.
int main(int argc, char **argv) {
    struct sockaddr_ns reg;
    char tmp[BUFFER_SIZE], nbrs[4096];
    // tick: minutes remaining (coordinator) or requests rounds done (node)
    int tick = 0, duration = 0, percent = 0, limit = 0, i;
    int_pair p;
    pthread_t tid;

    // parse args
    if(argc > 1) num = atoi(argv[1]);
    if(num != 99 && argc > 6) {
        limit = atoi(argv[2]);
        percent = atoi(argv[3]);
        duration = atoi(argv[4]);
        weight = atoi(argv[5]);
        capacity = atoi(argv[6]);
        peers = DEFAULT_PEERS;
    }
    else if(num == 99 && argc > 4) {
        nodes = atoi(argv[2]);
        // peer coordinator with more nodes
        // to facilitate dispersion of kill signals
        peers = atoi(argv[3]);
        tick = atoi(argv[4]);
    }
    else if(num != 99) {
        printf("Usage:\n  slinksvariableload node_number num_messages percent_load flow_duration flow_weight capacity\n");
        exit(1);
    }
    else {
       printf("Coordinator Usage:\n  slinksvariableload 99 num_nodes num_peers experiment_duration_minutes\n");
       exit(1);
    }

    // set all the important nutss fields
    snprintf(tmp, sizeof(tmp), "test%d", num);
    nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlen(tmp));
    nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlen(tmp));
    nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlen(tmp));
    snprintf(tmp, sizeof(tmp), "%s", "nutss.net");
    nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlen(tmp));
    snprintf(tmp, sizeof(tmp), "%s", "sip.nutss.net:5060");
    nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlen(tmp));

    // open output files (nodes only); a failed q or r stays NULL and is
    // skipped by every writer, f falls back to stderr and s is stdout
    if(num != 99) {
        snprintf(tmp, sizeof(tmp), "output/data_node%d.txt", num); f = fopen(tmp,"w");
        snprintf(tmp, sizeof(tmp), "output/peer_node%d.txt", num); q = fopen(tmp,"w");
        snprintf(tmp, sizeof(tmp), "output/walk_node%d.txt", num); r = fopen(tmp,"w");
        if (f == NULL) f = stderr;
        if (s == NULL) s = stdout;
    }
    if(f) fprintf(f,"%d\n",capacity);

    // build registrar uri; reg is zeroed first and the copies stop one byte
    // short, so every field stays NUL-terminated
    memset(&reg, 0, sizeof(reg));
    reg.family = AF_NUTSS;
    strncpy(reg.user, "ths1", sizeof(reg.user)-1);
    strncpy(reg.domain, "nutss.net", sizeof(reg.domain)-1);
    strncpy(reg.service, "swaplinksd", sizeof(reg.service)-1);
    // start up swaplinks and let it get going
    swaplinks_init();
    swp = swaplinks_new("cloud9", &reg, peers, WALK_LENGTH);
    sleep(5);

    // coordinator thread
    if(num == 99) {
        for(;tick > 0;tick--) {
            printf(" ---> %d MINUTE%s REMAIN%s\n",tick,tick==1?"":"S",tick==1?"S":"");
            clear(nbrs);
            swaplinks_get_neighbors(swp,nbrs,sizeof(nbrs));
            print_neighbor_list(nbrs);
            p = process_neighbor_list(nbrs);
            printf("Coordinator Nbrs: %d %d\n",p.i,p.o);
            sleep(60);
        }
        printf("Now sending KILL signals\n");
        // first one goes to a direct neighbor
        swaplinks_update_walk_length(swp,1);
        swaplinks_sendtoany(swp, "SHUTDOWN", 9, 0);
        swaplinks_update_walk_length(swp,WALK_LENGTH);
        // the rest are spread with the normal walk length
        for(i = 0; i < nodes*REDUNDANCY; i++) swaplinks_sendtoany(swp, "SHUTDOWN", 9, 0);
        // give them time to scramble
        printf("Waiting for KILL signals to propagate\n");
        sleep(PROPAGATION);
        // clean up their mess
        printf("Forcibly removing remaining nodes and associated file\n");
        system("find output -size 0c | xargs rm -rf");
        // then kill stragglers more violently (including the coordinator)
        system("killall lt-nutssd");
        system("killall lt-slinksvariableload");
        return 0;
    }

    exts = MIN_GRACE;
    if(pthread_create(&tid, NULL, recvthread, swp) != 0) {
        fprintf(stderr,"%d could not start receiving thread\n",num);
        exit(1);
    }
    // give recieving threads a chance to wake up
    sleep(5);

    // NOTE(review): rand() is never seeded in the code visible here, so every
    // node draws the same sequence -- confirm whether that is intended.
    while (tick++ < limit && alive) {
        // send a request for resources with probability percent/100;
        // the id field is the node number followed by the round number
        if(randint(100) < percent) {
            snprintf(tmp,sizeof(tmp),"%d;%d;%d;%d%d;%d;%d;%d;",duration,weight,exts,num,tick,capacity,capacity-load,peers);
            // +1 so the terminating NUL travels with the message
            swaplinks_sendtoany(swp,tmp,strlen(tmp)+1, 0);
        }
        // it might make more sense to sleep for more time between requests
        sleep(12);
    }

    if(s) printf("%d DONE SENDING\n",num);
    pthread_join(tid, NULL);
    if(s) fprintf(s,"%d SHUTTING DOWN\n",num);

    return 0;
}

// Returns 1 when `peer` is among the first `count` entries of `list`, else 0.
static int nbr_seen(const int *list, int count, int peer) {
    int k;
    for(k = 0; k < count; k++)
        if(list[k] == peer) return 1;
    return 0;
}

// read the neighbor list and extract the number of unique in and out nbrs
//
// The string is scanned one character at a time. Every occurrence of
// "test<number>" (optionally with '@' between "test" and the number) sets the
// current peer; every "_i" / "_o" marker counts the current peer as an
// incoming / outgoing neighbor unless that peer was already counted in that
// direction. A marker seen before any "test<number>" is attributed to peer 0.
// At most MAX_PEERS distinct peers are remembered per direction; further new
// peers are still counted but cannot be de-duplicated.
//
// s may be NULL, in which case {0,0} is returned. Any other character after
// '_' terminates the process with that character as the exit status.
int_pair process_neighbor_list(char *s) {
    int_pair p = {0,0};
    int inuniq[MAX_PEERS], outuniq[MAX_PEERS];
    int inuniqs = 0, outuniqs = 0, newpeer = 0;

    // avoid null strings altogether
    if(!s) return p;
    // move through the string one character at a time
    while(*s) {
        // "test" followed by at least one more character names a peer;
        // comparing in place avoids the unterminated scratch copies
        if(strncmp(s,"test",4) == 0 && s[4] != 0) {
            const char *digits = s + 4;
            digits += strspn(digits,"@");
            newpeer = (int)strtol(digits,NULL,10);
        }
        if(*s++ == '_') {
            switch(*s++) {
                // only record this nbr if we haven't seen it
                case 'i':
                    if(!nbr_seen(inuniq,inuniqs,newpeer)) {
                        p.i++;
                        if(inuniqs < MAX_PEERS) inuniq[inuniqs++] = newpeer;
                    }
                    break;
                case 'o':
                    if(!nbr_seen(outuniq,outuniqs,newpeer)) {
                        p.o++;
                        if(outuniqs < MAX_PEERS) outuniq[outuniqs++] = newpeer;
                    }
                    break;
                default: exit(s[-1]);
            }
        }
    }
    return p;
}

// Appends "<peer> " to the string in buf, which currently holds `used`
// characters and has room for `cap` bytes in total. An entry that does not
// fit completely is dropped. Returns the new string length.
static size_t nbr_append_peer(char *buf, size_t cap, size_t used, int peer) {
    char item[16];      // large enough for any int plus the trailing space
    int n = snprintf(item,sizeof(item),"%d ",peer);
    if(n <= 0 || (size_t)n >= sizeof(item)) return used;
    if(used + (size_t)n >= cap) return used;
    memcpy(buf + used,item,(size_t)n + 1);
    return used + (size_t)n;
}

// Prints the peer numbers of the incoming and outgoing neighbors found in a
// neighbor-list string as "in: ...\nout: ...\n" on stdout.
//
// The string is scanned one character at a time. Every occurrence of
// "test<number>" sets the current peer; every "_i" / "_o" marker appends the
// current peer to the in / out line (peer 0 if no name was seen yet). Each
// line holds at most 255 characters; entries beyond that are dropped.
//
// Nothing is printed when s is NULL. Any other character after '_'
// terminates the process with that character as the exit status.
void print_neighbor_list(char *s) {
    char inp[256], outp[256];
    size_t inlen = 0, outlen = 0;
    int newpeer = 0;

    inp[0] = 0;
    outp[0] = 0;
    // avoid null strings altogether
    if(!s) return;
    // move through the string one character at a time
    while(*s) {
        // "test" followed by at least one more character names a peer
        if(strncmp(s,"test",4) == 0 && s[4] != 0) {
            const char *digits = s + 4;
            digits += strspn(digits,"@");
            newpeer = (int)strtol(digits,NULL,10);
        }
        if(*s++ == '_') {
            switch(*s++) {
                // appended in place; sprintf with the buffer as both source
                // and destination is undefined behavior
                case 'i': inlen = nbr_append_peer(inp,sizeof(inp),inlen,newpeer); break;
                case 'o': outlen = nbr_append_peer(outp,sizeof(outp),outlen,newpeer); break;
                default: exit(s[-1]);
            }
        }
    }
    printf("in: %s\nout: %s\n",inp,outp);
}



FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment