/******************************************************************************/
/********************************* MM-Server **********************************/ 
/******************************************************************************/ 

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <errno.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "mm_prot.h"

#define SERVER_PORT 6669


 int init();

//############################################################################
// time_diff(): microseconds elapsed between *tv_old and "now".
//
//   tv_old  a timestamp previously obtained with gettimeofday()
//   returns (now - *tv_old) in microseconds; negative if *tv_old lies in the
//           future
//
// Declared `static inline`: a plain `inline` definition provides no external
// definition in C99/C11, so a non-inlined call could fail to link.
// The result is computed in `long`, so where `long` is 32 bits it overflows
// for differences of more than roughly 35 minutes.
static inline long time_diff(const struct timeval *tv_old) {
  struct timeval tv_now;

  gettimeofday(&tv_now, NULL);
  return (long)(tv_now.tv_sec - tv_old->tv_sec) * 1000000L
         + (long)tv_now.tv_usec - (long)tv_old->tv_usec;
}


//############################################################################
 int main() {
  int                sock;
  struct sockaddr_in client;
  int                length;
  char               buf[4096];
  long               rval,i,j ;
  fd_set             sock_list;
  struct timeval     tv,tv_start_loop,tv_serve;
  struct timespec    nano_tv;
  mm_client          *root,*mmc, *mmtmp;
  mm_stats           stats;
  hello_pkt          *hello;
  accept_pkt         accept;
  reject_pkt         reject;
  end_pkt            end;


  sock=init();                    //get a listening server socket
  strcpy(accept.tag,"ACCPT");
  strcpy(reject.tag,"REJCT");
  strcpy(end.tag,"END");
  

  stats.avg_bytes_per_usec=1.0;   //is about 8MBit/sec
  stats.remaining_time=1000000L;  //one second
  stats.algo_time=20000;          //estimated for loop code

  for (;;) { //everlasting server loop
     gettimeofday(&tv_start_loop,0);  //when did the loop start?

    FD_ZERO(&sock_list);
    FD_SET(sock, &sock_list);
    tv.tv_sec = 0;
    tv.tv_usec = 1; 
    rval = select(sock+1, &sock_list, NULL, NULL, &tv);  //check for incoming packets

    //############## LOOK FOR NEW CLIENTS #####################################
    if (FD_ISSET(sock,&sock_list)) { //is there new data?
        length=sizeof(client);
        bzero(&buf,sizeof(buf));
        if ((rval = recvfrom(sock,&buf,sizeof(buf),0
			     ,(struct sockaddr*)&client,&length))<0) {
           perror("recvfrom client message");
	}


        //********** NEW CLIENT ***********************************************
        if (strcmp(buf,"HELLO")==0) {   //there is a hello-packet
          hello=(hello_pkt*)buf; //treat the buf as a hello_pkt struct
          //admission control: can the wanted rate be guaranteed?
          if (hello->rate/stats.avg_bytes_per_usec < (stats.remaining_time 
                                      - stats.algo_time + stats.avg_adjust)) {

            //add new node to client list
            if (!(mmc=(mm_client*)malloc(sizeof(mm_client)) ))
              perror("new client node");
            mmc->prev=NULL;
            mmc->next=root;
            if (root) root->prev=mmc;
            root=mmc;		

            //and fill node struct
            i=ntohl(client.sin_addr.s_addr);
            sprintf(mmc->hostname,"%d.%d.%d.%d"
                     ,(unsigned long)i>>24,(unsigned long)i<<8>>24
                     ,(unsigned long)i<<16>>24,(unsigned long)i<<24>>24);
            mmc->port=ntohs(client.sin_port);
            mmc->client=client;
            strncpy(mmc->file,hello->file,STRLEN);
            mmc->fpos=hello->fpos;
            mmc->rate=hello->rate;

            //get a filehandle on the wanted file at the wanted offset
            if ((mmc->fp = fopen(mmc->file,"r")) == NULL)
              perror("Opening client file");
            fseek(mmc->fp,mmc->fpos,SEEK_SET);

            stats.num_clients++;
	    //printf("### New client %s:%d wants %s with %i byte/sec\n"
	    //  ,mmc->hostname,mmc->port,mmc->file,mmc->rate);

            //send the accept packet to the client
            sendto(sock,&accept,sizeof(accept),0,&client,sizeof(client));
 	  } else {
            //printf("Client %i rejected.\n",ntohs(client.sin_port));
           
            //send the reject packet to the client
            sendto(sock,&reject,sizeof(reject),0,&client,sizeof(client));
	  }//else
	}
    }//if new connection

 

    //################ SERVE OLD CLIENTS ######################################

    stats.sum_bytes=stats.sum_usecs=0;
    mmc=root;
    while(mmc) { //ruhn through the list
     gettimeofday(&tv_serve,0); //serving for client X starts now
 
     //get "rate" bytes from disk and send it to client
     i=mmc->rate;
     do {
       rval = fread(&buf,1,MIN(4096,i),mmc->fp); //fill a buffer
      if (ferror(mmc->fp)) perror("streaming client file");
      if (rval>=0) i-=rval;
      sendto(sock,&buf,rval,0,&(mmc->client),sizeof(mmc->client));
     } while ( (i>0) && (rval!=0) ); //either the full rate served, or EOF or err
     stats.sum_bytes+=mmc->rate-i;

     //**************** DEQUEUE OLD CLIENT ************************************
     if (feof(mmc->fp)) {
       printf("dequeuing %s:%i  | AVG_USECS: %5i  SUM_AVG_USECS: %5i SUM_AVG B/us: %2.2f\n"
      ,mmc->hostname,mmc->port,mmc->avg_usecs,stats.avg_sum_usecs,stats.avg_bytes_per_usec);
       fflush(NULL);
       //send end packet to client
       sendto(sock,&end,sizeof(end),0,&(mmc->client),sizeof(mmc->client));

       //delete node from list
       stats.num_clients--;
       if (mmc->prev)
         mmc->prev->next = mmc->next;
       else {
         root=mmc->next;
         if (mmc->next) mmc->next->prev=NULL;
       }
       mmtmp=mmc->next;
       free(mmc);
       mmc=mmtmp;

     //**************** SERVE CYCLE FOR THIS CLIENT DONE **********************
     } else {
       mmc->usecs=time_diff(&tv_serve);  //how long did this serving take?
       //average the last servings
       if (mmc->avg_usecs>0) 
          mmc->avg_usecs = (long)(mmc->avg_usecs*(1-ALPHA) + mmc->usecs*ALPHA);
       else 
          mmc->avg_usecs = mmc->usecs ;

       //       printf("%s(%i b/s) for %s:%i done in %ld microsecs (AVG: %ld)\n"
       //   ,mmc->file,mmc->rate,mmc->hostname,mmc->port,mmc->usecs,mmc->avg_usecs);
       mmc=mmc->next;
     }//else
 
     stats.sum_usecs+=time_diff(&tv_serve); //sum of spent time in this cycle
    }//while further clients in list



    //################ CLIENTS DONE, SLEEP REMAINING TIME #####################

    stats.remaining_time=1000000L - time_diff(&tv_start_loop); //one sec - time spent
    //time spent on code not concerning client serves
    stats.algo_time=1000000L - (stats.remaining_time+stats.sum_usecs); 

    if (!root) //only be exact, if there are jobs pending
      sleep(1);
    else { //***************** THERE WERE CLIENTS *****************************
      stats.avg_bytes_per_usec = (stats.avg_bytes_per_usec*(1.0-ALPHA) 
                    + ((float)stats.sum_bytes/(float)stats.sum_usecs)*ALPHA);

      //printf("### spent %ld   to: %ld\n",time_diff(&tv_start_loop)
      //                                  ,(1000000L + stats.adjust) );
      while (( time_diff(&tv_start_loop) < (1000000L + stats.adjust)  ) ) {
        // spend rest of the time
        i=(1000000L  + stats.adjust) - time_diff(&tv_start_loop);
        nano_tv.tv_sec=0;
        nano_tv.tv_nsec=i * 1000 ;
        nanosleep(&nano_tv,NULL);
        // why does nanosleep return after 20000usecs? linux-timeslice ?
        // printf("%ld.",nano_tv.tv_nsec);
      }//while
     
       
      stats.sum_working_time+=(stats.working_time=time_diff(&tv_start_loop));
      //get new adjust value to compensate unexact sleeps in the next cycle
      stats.adjust=(1000000L+stats.adjust) - stats.working_time;
      //calc average adjust for admission control
      stats.avg_adjust = (long)(stats.avg_adjust*(1-ALPHA) + stats.adjust*ALPHA);
      //calc average usecs spent for all clients
      stats.avg_sum_usecs = (stats.avg_sum_usecs*(1-ALPHA) + stats.sum_usecs*ALPHA);
      stats.cycles++;
      /*
      printf("(%i) COMP.TIME %6ld  USECS: %6ld  BYTES: %8ld  AVG BYTES/USEC %2.2f\n"
           ,stats.num_clients,stats.algo_time
           ,stats.sum_usecs,stats.sum_bytes,stats.avg_bytes_per_usec);
      printf("     THIS: %8ld usecs  ADJUST: %6ld  AVG_ALL: %10ld usecs\n"
         ,stats.working_time, stats.adjust, stats.sum_working_time/stats.cycles);
      */
    }//else
    
   }//for ever

   return 0;
} //main



//############################################################################
// init(): create the server's UDP socket and bind it to SERVER_PORT on all
// local addresses. Also writes the process id to "mm_server.pid" in the
// current directory (best effort: a failure is reported but not fatal).
//
//   returns the bound datagram socket descriptor; the process exits with
//           status 1 if the socket cannot be created, bound or queried.
int init() {
  int                sock;
  struct sockaddr_in server;
  socklen_t          length;
  FILE               *fp;


  //writing the server-pid to a file
  if ((fp = fopen("mm_server.pid", "w")) != NULL) {
    fprintf(fp, "%ld\n", (long)getpid());
    fclose(fp);
  } else {
    perror("writing mm_server.pid");
  }

  /*---------------------------- Create socket -------------------------------*/
  sock = socket(AF_INET, SOCK_DGRAM, 0);
  if (sock < 0) {
    perror("opening dgram socket");
    exit(1);
  }

  /*---------------------- Name socket using wildcards -----------------------*/
  memset(&server, 0, sizeof(server));  // no stray bytes in padding/sin_zero
  server.sin_family      = AF_INET;
  server.sin_addr.s_addr = htonl(INADDR_ANY);
  server.sin_port        = htons(SERVER_PORT);  // host -> network byte order
  if (bind(sock, (struct sockaddr *) &server, sizeof(server))) {
    perror("binding dgram socket");
    exit(1);
  }

  /*--------------- Find out the address the socket was bound to -------------*/
  length = sizeof(server);
  if (getsockname(sock, (struct sockaddr *) &server, &length)) {
    perror("getting socket name");
    exit(1);
  }

  //printf("Socket has port #%d\n", ntohs(server.sin_port));

  // No listen(): it applies to connection-mode sockets only and always
  // fails on a SOCK_DGRAM socket, so the former call had no effect.
  return sock;
} //init


