Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration


File:  [GForge:cvsroot/nutss] / libnutss / examples / slinks_variable_load.c
Revision 1.12: download - view: text, annotated - select for diffs
Fri Jun 29 17:19:44 2007 UTC (10 years, 2 months ago) by ths
Branches: MAIN
CVS tags: HEAD
added admission control to the new swaplinks test as well as a mechanism for churn.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef WIN32
#include <poll.h>
#include <sys/socket.h>
#endif
#include <pthread.h>
#include "../include/swaplinks.h"

#ifdef WIN32
/* Windows shims so the rest of the file can use the POSIX spellings. */
/* sleep() takes seconds; the argument is scaled by 1000 before being handed to Sleep(). */
#define sleep(sec) Sleep((sec)*1000)
/* NOTE(review): _snprintf is not a drop-in snprintf (termination/return value
 * on truncation differ) -- confirm every caller's buffer is large enough. */
#define snprintf   _snprintf
#endif

/*
 * Always-on assertion used by this example: if `p` is false, print a fixed
 * message to stdout and terminate the process with exit status 1.
 *
 * Any earlier definition (for example from <assert.h> pulled in by another
 * header) is dropped first so the redefinition is well-formed.  The body is
 * wrapped in do { } while(0) so that `assert(x);` is exactly one statement
 * and can safely be used as the body of an if/else.
 */
#undef assert
#define assert(p) do { if(!(p)) { printf("Assertion FAILED!\n"); exit(1); } } while(0)

/* One flow admitted by recvthread, kept as a node of a doubly linked list. */
typedef struct flow {
    int duration;               /* remaining lifetime; recvthread counts it down and drops the flow when it runs out */
    int weight;                 /* load this flow contributes while it is in the list */
    char source[256];           /* filled by recvthread from the sender's user name (peer.user + 4) */
    struct flow *next, *last;   /* next: following node; last: preceding node (NULL at the list head) */
} flow_t;

/*
 * Experiment parameters, filled in by main() from the command line:
 *   num     - this node's number (argv[1]); used in user/file names and log lines
 *   peers   - peers count (argv[2]); passed to swaplinks_new and used as walk length
 *   limit   - time limit (argv[3]); bounds both the receive and the send loop
 *   maxload - admission threshold (argv[6]); flows are accepted only while load < maxload
 */
int num, peers, limit, maxload;
/* Swaplinks handle created in main() and handed to the receiver thread. */
swaplinks_p swp;
/* f: per-node data file (one load value per accepted flow); s: per-node event log.
 * main() falls back to stderr / stdout when the files cannot be opened. */
FILE *f = NULL, *s = NULL;

void *recvthread(void *arg) {
    swaplinks_p swp = (swaplinks_p)arg;
    struct sockaddr_ns peer;
    socklen_t len = sizeof(peer);
    flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;
    char msg[64], tmp[128], buf[2048];
    int time = 0, load = 0, busy = 0, exts, delta, expiration;

    buf[0] = 0;

    while(time < limit) {
	sleep(1);
	if(busy++ > limit*3) {
	    if(s) fprintf(s,"%d HALTING RECEPTION. TOO MUCH WAITING.\n",num);
	    pthread_exit(NULL);
	}
        if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {
            time++; delta = 0;
	    if(s) fprintf(s,"\t\tTIME at node %d: %d\n",num,time);
            newflow = (flow_t*)calloc(1,sizeof(flow_t));
            newflow->duration = atoi(strtok(tmp,";"));
            newflow->weight = atoi(strtok(NULL,";"));
	    exts = atoi(strtok(NULL,";"));
	    if(load < maxload) {
		    strncpy(newflow->source,peer.user+4,sizeof(newflow->source));
	            snprintf(buf,2048,"->%d:%d<->",newflow->duration,newflow->weight);
		    for(ptr = flows; ptr;) {
			sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);
			expiration = 0; axe = NULL;
	                if(--ptr->duration == 0) {
			    axe = ptr;
	                    load -= ptr->weight;
	                    delta -= ptr->weight;
			    
			    if(ptr->last && ptr->next) {
				assert(flows != ptr);
				ptr->last->next = ptr->next;
				ptr->next->last = ptr->last;
			    }
			    else if(ptr->next) {
				assert(flows == ptr);			
				flows = ptr->next;
				if(flows) flows->last = NULL;
			    }
			    else if(ptr->last) {
				assert(flows != ptr);	
				ptr->last->next = NULL;
			    }
			    else {
				assert(flows == ptr);
				flows = NULL;
			    }

	//		    if(s) fprintf(s,"%d EXPIRED FLOW from %s:d%d/w%d\n",num,ptr->source,ptr->duration,ptr->weight);
			    expiration = 1;
	                }
			ptr = ptr->next;
			if(expiration) free(axe);
	
	            }
		    sprintf(buf,"%s||\n",buf);

	//          if(s) fprintf(s,"%d ACCEPTED FLOW from %s:d%d/w%d\n",num,newflow->source,newflow->duration,newflow->weight);
	//	    if(s) fprintf(s,"%s",buf);	

	            load += newflow->weight;
		    delta += newflow->weight;
		    newflow->next = flows;
		    if(newflow->next) newflow->next->last = newflow;
		    flows = newflow;
	            if(f) fprintf(f,"%d\n",load);
	        }
		// ADMISSION CONTROL
		// if we're overloaded, drop it or pass this on to a neighbor, mark it as such
		else if(exts < 5) {
			swaplinks_update_walk_length(swp,1);
			snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);
			swaplinks_sendtoany(swp,msg,strlen(msg)+1,0);
			swaplinks_update_walk_length(swp,peers);
		}
	}
    }
    if(s) fprintf(s,"%d DONE RECEIVING\n",num);

    pthread_exit(NULL);
}

/*
 * Entry point for one node of the swaplinks variable-load experiment.
 *
 * Usage: slinks_variable_load node_number peers_number time_limit
 *                             flow_duration flow_weight max_load
 *
 * Configures NUTSS with the user name "test<node_number>", opens the per-node
 * data and log files under output/ (falling back to stderr and stdout when
 * they cannot be opened), joins the "cloud9" swaplinks group, starts the
 * receiver thread and then sends the flow announcement
 * "<flow_duration>;<flow_weight>;0;" to a random peer every 5 seconds,
 * 2*time_limit times.  Node 1 additionally sends a notification mail when it
 * finishes.
 *
 * Returns 0 on success and 1 if the receiver thread cannot be started; exits
 * with status 1 when too few arguments are given.
 */
int main(int argc, char **argv) {
    struct sockaddr_ns reg;
    char tmp[256];
    int rounds = 0, duration, weight, rc;
    pthread_t tid;

    if (argc <= 6) {
        printf("Usage:\n  slinks_variable_load node_number peers_number time_limit flow_duration flow_weight max_load\n");
        exit(1);
    }
    num = atoi(argv[1]);
    peers = atoi(argv[2]);
    limit = atoi(argv[3]);
    duration = atoi(argv[4]);
    weight = atoi(argv[5]);
    maxload = atoi(argv[6]);

    /* The same "test<num>" string serves as user name, proxy user name and
     * proxy password. */
    snprintf(tmp, sizeof(tmp), "test%d", num);
    nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlen(tmp));
    nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlen(tmp));
    nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlen(tmp));
    strncpy(tmp, "nutss.net", sizeof(tmp));
    nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlen(tmp));
    strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));
    nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlen(tmp));

    snprintf(tmp, sizeof(tmp), "output/data_node%d.txt", num);
    f = fopen(tmp,"w");
    snprintf(tmp, sizeof(tmp), "output/output_node%d.txt", num);
    s = fopen(tmp,"w");
    if (f == NULL) f = stderr;
    if (s == NULL) s = stdout;

    memset(&reg, 0, sizeof(reg));
    reg.family = AF_NUTSS;
    strncpy(reg.user, "ths1", sizeof(reg.user));
    strncpy(reg.domain, "nutss.net", sizeof(reg.domain));
    strncpy(reg.service, "swaplinksd", sizeof(reg.service));

    swaplinks_init();
    /* NOTE(review): the result of swaplinks_new is used unchecked -- confirm
     * how the API reports failure and handle it here. */
    swp = swaplinks_new("cloud9", &reg, peers, peers);

    // let swaplinks get going.
    sleep(10);

    /* Without this check a failed pthread_create would leave tid
     * indeterminate and the pthread_join below would be undefined. */
    rc = pthread_create(&tid, NULL, recvthread, swp);
    if (rc != 0) {
        fprintf(stderr, "%d could not start receiver thread (error %d)\n", num, rc);
        if (f != stderr) fclose(f);
        if (s != stdout) fclose(s);
        return 1;
    }

    sleep(1);

    snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);

    while (rounds++ < 2*limit) {
        swaplinks_sendtoany(swp, tmp, strlen(tmp)+1, 0);
        sleep(5);
    }

    if(s) fprintf(s,"%d DONE SENDING\n",num);

    pthread_join(tid,NULL);

    /* Only close streams this program opened; the stderr/stdout fallbacks
     * are left to the C runtime. */
    if (f != stderr) fclose(f);
    if (s != stdout) fclose(s);

    if(num == 1) system("echo \" \" | mail -s\"experiment done!\" ths22@cs.cornell.edu");

    return 0;
}

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment