Log In
New Account
  
 
Home My Page Project Tree Code Snippets Project Openings NUTSS
 
 
Summary Tracker Lists CVS Files
 

CVS | Administration


File:  [GForge:cvsroot/nutss] / libnutss / examples / slinks_variable_load.c
Revision 1.14: download - view: text, annotated - select for diffs
Sat Jun 30 21:57:25 2007 UTC (10 years, 2 months ago) by ths
Branches: MAIN
CVS tags: HEAD
more tweaks to the experiment

#include <stdio.h>

#include <stdlib.h>

#include <string.h>

#ifndef WIN32

#include <poll.h>

#include <sys/socket.h>

#endif

#include <pthread.h>

#include "../include/swaplinks.h"



#ifdef WIN32

#define sleep(sec) Sleep((sec)*1000)

#define snprintf   _snprintf

#endif



#define assert(p) if(!(p)) { printf("Assertion '%s' FAILED!\n", #p); exit(1); }



typedef struct flow {

    int duration;

    int weight;

    char source[256];

    struct flow *next, *last;

} flow_t;



int num, peers, limit, maxload;

swaplinks_p swp;

FILE *f = NULL, *s = NULL;



void *recvthread(void *arg) {

    swaplinks_p swp = (swaplinks_p)arg;

    struct sockaddr_ns peer;

    socklen_t len = sizeof(peer);

    flow_t *flows = NULL, *newflow = NULL, *ptr = NULL, *axe = NULL;

    char msg[64], tmp[128], buf[2048];

    int time = 0, load = 0, busy = 0, exts, delta, expiration;



    buf[0] = 0;



    while(time < limit) {

		sleep(1);

		if(swaplinks_recvfrom(swp, tmp, sizeof(tmp), 0, &peer, &len) > 0) {

			time++; delta = 0;

			if(s) fprintf(s,"      %d recvd request %d\n",num,time);

			newflow = (flow_t*)calloc(1,sizeof(flow_t));

			newflow->duration = atoi(strtok(tmp,";"));

			newflow->weight = atoi(strtok(NULL,";"));

			exts = atoi(strtok(NULL,";"));

			if(load < maxload) {

				strncpy(newflow->source,peer.user+4,sizeof(newflow->source));

				snprintf(buf,2048,"->%d:%d<->",newflow->duration,newflow->weight);

				for(ptr = flows; ptr;) {

					sprintf(buf,"%s%d:%d<->",buf,ptr->duration,ptr->weight);

					expiration = 0; axe = NULL;

					if(--ptr->duration == 0) {

						axe = ptr;

						load -= ptr->weight;

						delta -= ptr->weight;



						if(ptr->last && ptr->next) {

							assert(flows != ptr);

							ptr->last->next = ptr->next;

							ptr->next->last = ptr->last;

						}

						else if(ptr->next) {

							assert(flows == ptr);

							flows = ptr->next;

						}



						if(flows) flows->last = NULL;

						else if(ptr->last) {

							assert(flows != ptr);

							ptr->last->next = NULL;

						}

						else {

							assert(flows == ptr);

							flows = NULL;

						}



		//		    if(s) fprintf(s,"%d EXPIRED FLOW from %s:d%d/w%d\n",num,ptr->source,ptr->duration,ptr->weight);

						expiration = 1;

					}

					ptr = ptr->next;

					if(expiration) free(axe);

				}

				sprintf(buf,"%s||\n",buf);



		//          if(s) fprintf(s,"%d ACCEPTED FLOW from %s:d%d/w%d\n",num,newflow->source,newflow->duration,newflow->weight);

		//	    if(s) fprintf(s,"%s",buf);



				load += newflow->weight;

				delta += newflow->weight;

				newflow->next = flows;

				if(newflow->next) newflow->next->last = newflow;

				flows = newflow;

				if(f) fprintf(f,"%d\n",load);

			}

			// ADMISSION CONTROL

			// if we're overloaded, drop it if its been pushed too far,

			// or pass on to a neighbor and mark it as such if not

			else if(exts < 5) {

				swaplinks_update_walk_length(swp,1);

				snprintf(msg,64,"%d;%d;%d;",newflow->duration,newflow->weight,exts+1);

				swaplinks_sendtoany(swp,msg,strlen(msg)+1,0);

				swaplinks_update_walk_length(swp,peers);

			}

		}

    }

    if(s) fprintf(s,"%d DONE RECEIVING\n",num);



    if(f && f != stderr) { fclose(f); f = NULL; }

    if(s && s != stdout) { fclose(s); s = NULL; }



    pthread_exit(NULL);

}



int main(int argc, char **argv) {

    struct sockaddr_ns reg;

    char tmp[256];

    int time = 0, duration, weight, sendcount = 0;

    pthread_t tid;



    if (argc > 6) {

        num = atoi(argv[1]);

        peers = atoi(argv[2]);

        limit = atoi(argv[3]);

        duration = atoi(argv[4]);

        weight = atoi(argv[5]);

		maxload = atoi(argv[6]);

    }

    else {

        printf("Usage:\n  slinks_variable_load node_number peers_number time_limit flow_duration flow_weight max_load\n");

        exit(1);

    }



    snprintf(tmp, sizeof(tmp), "test%d", num);

    nutss_config_set(NUTSS_CONFIG_USERNAME, tmp, strlen(tmp));

    nutss_config_set(NUTSS_CONFIG_PROXYUSERNAME, tmp, strlen(tmp));

    nutss_config_set(NUTSS_CONFIG_PROXYPASSWORD, tmp, strlen(tmp));

    strncpy(tmp, "nutss.net", sizeof(tmp));

    nutss_config_set(NUTSS_CONFIG_DOMAINNAME, tmp, strlen(tmp));

    strncpy(tmp, "sip.nutss.net:5060", sizeof(tmp));

    nutss_config_set(NUTSS_CONFIG_SIGPROXY, tmp, strlen(tmp));



    sprintf(tmp, "output/data_node%d.txt", num); f = fopen(tmp,"w");

//    sprintf(tmp, "output/output_node%d.txt", num); s = fopen(tmp,"w");

    if (f == NULL) f = stderr;

    if (s == NULL) s = stdout;



    memset(&reg, 0, sizeof(reg));

    reg.family = AF_NUTSS;

    strncpy(reg.user, "ths1", sizeof(reg.user));

    strncpy(reg.domain, "nutss.net", sizeof(reg.domain));

    strncpy(reg.service, "swaplinksd", sizeof(reg.service));



    swaplinks_init();

    swp = swaplinks_new("cloud9", &reg, peers, peers);



    // let swaplinks get going.

    sleep(10);



    pthread_create(&tid, NULL, recvthread, swp);



    sleep(1);



    snprintf(tmp, sizeof(tmp), "%d;%d;0;", duration, weight);



    while (time++ < 4*limit) { // (i-- != 0) {

        swaplinks_sendtoany(swp, tmp, strlen(tmp)+1, 0);

        if(s) fprintf(s,"   %d sent request %d\n",num,time);

        sleep(1);

    }



	if(s) fprintf(s,"%d DONE SENDING\n",num);



	pthread_join(tid, NULL);



    if(s) fprintf(s,"%d SHUTTING DOWN\n",num);



    if(num == 1) system("echo \" \" | mail -s\"node one finished sending!\" ths22@cs.cornell.edu");



    return 0;

}


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>
 

GForge.cis.cornell.edu is brought to you by

Cornell Computing and Information Science


Powered By GForge Collaborative Development Environment