/******************************************************************************/ /********************************* MM-Server **********************************/ /******************************************************************************/ #include #include #include #include #include #include #include #include "mm_prot.h" #define SERVER_PORT 6669 int init(); //############################################################################ inline long time_diff(struct timeval *tv_old) { struct timeval tv_now; gettimeofday(&tv_now,0); return (tv_now.tv_sec-tv_old->tv_sec)*1000000L +tv_now.tv_usec - tv_old->tv_usec; } //############################################################################ int main() { int sock; struct sockaddr_in client; int length; char buf[4096]; long rval,i,j ; fd_set sock_list; struct timeval tv,tv_start_loop,tv_serve; struct timespec nano_tv; mm_client *root,*mmc, *mmtmp; mm_stats stats; hello_pkt *hello; accept_pkt accept; reject_pkt reject; end_pkt end; sock=init(); //get a listening server socket strcpy(accept.tag,"ACCPT"); strcpy(reject.tag,"REJCT"); strcpy(end.tag,"END"); stats.avg_bytes_per_usec=1.0; //is about 8MBit/sec stats.remaining_time=1000000L; //one second stats.algo_time=20000; //estimated for loop code for (;;) { //everlasting server loop gettimeofday(&tv_start_loop,0); //when did the loop start? FD_ZERO(&sock_list); FD_SET(sock, &sock_list); tv.tv_sec = 0; tv.tv_usec = 1; rval = select(sock+1, &sock_list, NULL, NULL, &tv); //check for incoming packets //############## LOOK FOR NEW CLIENTS ##################################### if (FD_ISSET(sock,&sock_list)) { //is there new data? length=sizeof(client); bzero(&buf,sizeof(buf)); if ((rval = recvfrom(sock,&buf,sizeof(buf),0 ,(struct sockaddr*)&client,&length))<0) { perror("recvfrom client message"); } //********** NEW CLIENT *********************************************** if (strcmp(buf,"HELLO")==0) { //there is a hello-packet hello=(hello_pkt*)buf; //treat the buf as a hello_pkt struct //admission control: can the wanted rate be guaranteed? if (hello->rate/stats.avg_bytes_per_usec < (stats.remaining_time - stats.algo_time + stats.avg_adjust)) { //add new node to client list if (!(mmc=(mm_client*)malloc(sizeof(mm_client)) )) perror("new client node"); mmc->prev=NULL; mmc->next=root; if (root) root->prev=mmc; root=mmc; //and fill node struct i=ntohl(client.sin_addr.s_addr); sprintf(mmc->hostname,"%d.%d.%d.%d" ,(unsigned long)i>>24,(unsigned long)i<<8>>24 ,(unsigned long)i<<16>>24,(unsigned long)i<<24>>24); mmc->port=ntohs(client.sin_port); mmc->client=client; strncpy(mmc->file,hello->file,STRLEN); mmc->fpos=hello->fpos; mmc->rate=hello->rate; //get a filehandle on the wanted file at the wanted offset if ((mmc->fp = fopen(mmc->file,"r")) == NULL) perror("Opening client file"); fseek(mmc->fp,mmc->fpos,SEEK_SET); stats.num_clients++; //printf("### New client %s:%d wants %s with %i byte/sec\n" // ,mmc->hostname,mmc->port,mmc->file,mmc->rate); //send the accept packet to the client sendto(sock,&accept,sizeof(accept),0,&client,sizeof(client)); } else { //printf("Client %i rejected.\n",ntohs(client.sin_port)); //send the reject packet to the client sendto(sock,&reject,sizeof(reject),0,&client,sizeof(client)); }//else } }//if new connection //################ SERVE OLD CLIENTS ###################################### stats.sum_bytes=stats.sum_usecs=0; mmc=root; while(mmc) { //ruhn through the list gettimeofday(&tv_serve,0); //serving for client X starts now //get "rate" bytes from disk and send it to client i=mmc->rate; do { rval = fread(&buf,1,MIN(4096,i),mmc->fp); //fill a buffer if (ferror(mmc->fp)) perror("streaming client file"); if (rval>=0) i-=rval; sendto(sock,&buf,rval,0,&(mmc->client),sizeof(mmc->client)); } while ( (i>0) && (rval!=0) ); //either the full rate served, or EOF or err stats.sum_bytes+=mmc->rate-i; //**************** DEQUEUE OLD CLIENT ************************************ if (feof(mmc->fp)) { printf("dequeuing %s:%i | AVG_USECS: %5i SUM_AVG_USECS: %5i SUM_AVG B/us: %2.2f\n" ,mmc->hostname,mmc->port,mmc->avg_usecs,stats.avg_sum_usecs,stats.avg_bytes_per_usec); fflush(NULL); //send end packet to client sendto(sock,&end,sizeof(end),0,&(mmc->client),sizeof(mmc->client)); //delete node from list stats.num_clients--; if (mmc->prev) mmc->prev->next = mmc->next; else { root=mmc->next; if (mmc->next) mmc->next->prev=NULL; } mmtmp=mmc->next; free(mmc); mmc=mmtmp; //**************** SERVE CYCLE FOR THIS CLIENT DONE ********************** } else { mmc->usecs=time_diff(&tv_serve); //how long did this serving take? //average the last servings if (mmc->avg_usecs>0) mmc->avg_usecs = (long)(mmc->avg_usecs*(1-ALPHA) + mmc->usecs*ALPHA); else mmc->avg_usecs = mmc->usecs ; // printf("%s(%i b/s) for %s:%i done in %ld microsecs (AVG: %ld)\n" // ,mmc->file,mmc->rate,mmc->hostname,mmc->port,mmc->usecs,mmc->avg_usecs); mmc=mmc->next; }//else stats.sum_usecs+=time_diff(&tv_serve); //sum of spent time in this cycle }//while further clients in list //################ CLIENTS DONE, SLEEP REMAINING TIME ##################### stats.remaining_time=1000000L - time_diff(&tv_start_loop); //one sec - time spent //time spent on code not concerning client serves stats.algo_time=1000000L - (stats.remaining_time+stats.sum_usecs); if (!root) //only be exact, if there are jobs pending sleep(1); else { //***************** THERE WERE CLIENTS ***************************** stats.avg_bytes_per_usec = (stats.avg_bytes_per_usec*(1.0-ALPHA) + ((float)stats.sum_bytes/(float)stats.sum_usecs)*ALPHA); //printf("### spent %ld to: %ld\n",time_diff(&tv_start_loop) // ,(1000000L + stats.adjust) ); while (( time_diff(&tv_start_loop) < (1000000L + stats.adjust) ) ) { // spend rest of the time i=(1000000L + stats.adjust) - time_diff(&tv_start_loop); nano_tv.tv_sec=0; nano_tv.tv_nsec=i * 1000 ; nanosleep(&nano_tv,NULL); // why does nanosleep return after 20000usecs? linux-timeslice ? // printf("%ld.",nano_tv.tv_nsec); }//while stats.sum_working_time+=(stats.working_time=time_diff(&tv_start_loop)); //get new adjust value to compensate unexact sleeps in the next cycle stats.adjust=(1000000L+stats.adjust) - stats.working_time; //calc average adjust for admission control stats.avg_adjust = (long)(stats.avg_adjust*(1-ALPHA) + stats.adjust*ALPHA); //calc average usecs spent for all clients stats.avg_sum_usecs = (stats.avg_sum_usecs*(1-ALPHA) + stats.sum_usecs*ALPHA); stats.cycles++; /* printf("(%i) COMP.TIME %6ld USECS: %6ld BYTES: %8ld AVG BYTES/USEC %2.2f\n" ,stats.num_clients,stats.algo_time ,stats.sum_usecs,stats.sum_bytes,stats.avg_bytes_per_usec); printf(" THIS: %8ld usecs ADJUST: %6ld AVG_ALL: %10ld usecs\n" ,stats.working_time, stats.adjust, stats.sum_working_time/stats.cycles); */ }//else }//for ever return 0; } //main //############################################################################ int init() { int sock; struct sockaddr_in server; int length; FILE *fp; //writing the server-pid to a file fp=fopen("mm_server.pid","w"); fprintf(fp,"%i\n",getpid()); fclose(fp); /*---------------------------- Create socket -------------------------------*/ sock = socket(AF_INET, SOCK_DGRAM, 0); if (sock < 0) { perror("opening dgram socket"); exit(1); } /*---------------------- Name socket using wildcards -----------------------*/ server.sin_family = AF_INET; server.sin_addr.s_addr = INADDR_ANY; server.sin_port = ntohs(SERVER_PORT); if (bind(sock, (struct sockaddr *) &server, sizeof(server))) { perror("binding stream socket"); exit(1); } /*--------------- Find out assigned port number and print it out -----------*/ length = sizeof(server); if (getsockname(sock, (struct sockaddr *) &server, &length)) { perror("getting socket name"); exit(1); } //printf("Socket has port #%d\n", ntohs(server.sin_port)); /*------------------------- Start accepting connections --------------------*/ listen(sock, 25); return sock; } //init