OS Signaling

talhabeg95
Lab06.zip

list.c

#include <stdio.h> #include "list.h" /* list helper functions */ int list_size(thread_info_list *list) { int cnt = 0; if (!list) return -1; pthread_mutex_lock(&list->lock); list_elem *le = list->head; while (le) { cnt++; le = le->next; } pthread_mutex_unlock(&list->lock); return cnt; } int list_insert_head(thread_info_list *list, list_elem *new) { if (!list || !new) return -1; pthread_mutex_lock(&list->lock); new->next = list->head; new->prev = 0; if (new->next) { new->next->prev = new; } list->head = new; if (list->tail == 0) { list->tail = new; } pthread_mutex_unlock(&list->lock); return 0; } int list_insert_tail(thread_info_list *list, list_elem *new) { if (!list || !new) return -1; pthread_mutex_lock(&list->lock); new->prev = list->tail; new->next = 0; if (new->prev) { new->prev->next = new; } list->tail = new; if (list->head == 0) { list->head = new; } pthread_mutex_unlock(&list->lock); return 0; } int list_remove(thread_info_list *list, list_elem *old) { if (!old || !list) return -1; pthread_mutex_lock(&list->lock); if (old->next) { old->next->prev = old->prev; } if (old->prev) { old->prev->next = old->next; } if (list->tail == old) { list->tail = old->prev; } if (list->head == old) { list->head = old->next; } old->next = old->prev = 0; pthread_mutex_unlock(&list->lock); return 0; } void print_list(thread_info_list *list) { pthread_mutex_lock(&list->lock); list_elem *le = list->head; while (le) { printf("0x%X,", (unsigned int)le->info); le = le->next; } pthread_mutex_unlock(&list->lock); printf("\n"); }

__MACOSX/._list.c

list.h

#ifndef __LIST_H_ #define __LIST_H_ #include <pthread.h> typedef struct list_elem { struct list_elem *prev; struct list_elem *next; void *info; } list_elem; typedef struct thread_info_list { list_elem *head; list_elem *tail; pthread_mutex_t lock; } thread_info_list; int list_size(thread_info_list *list); int list_insert_head(thread_info_list *list, list_elem *new); int list_insert_tail(thread_info_list *list, list_elem *new); int list_remove(thread_info_list *list, list_elem *new); void print_list(thread_info_list *list); #endif /* __LIST_H_ */

__MACOSX/._list.h

Makefile

CC = gcc CCOPTS = -c -g -Wall LINKOPTS = -g -lpthread -lrt -Wall EXEC=scheduler OBJECTS=scheduler.o worker.o list.o all: $(EXEC) $(EXEC): $(OBJECTS) $(CC) $(LINKOPTS) -o $@ $^ %.o:%.c $(CC) $(CCOPTS) -o $@ $^ clean: - $(RM) $(EXEC) - $(RM) $(OBJECTS) - $(RM) *~ - $(RM) core.* test: scheduler - ./scheduler -test -f0 rr - killall -q -KILL scheduler; true pretty: indent *.c *.h -kr

__MACOSX/._Makefile

scheduler.c

#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <errno.h> #include <time.h> #include <signal.h> #include "scheduler.h" #include "worker.h" /* * define the extern global variables here. */ sem_t queue_sem; /* semaphore for scheduler queue */ thread_info_list sched_queue; /* list of current workers */ static int quit = 0; static timer_t timer; static thread_info_t *currentThread= 0; static long wait_times; static long run_times; static int completed = 0; static int thread_count = 0; static void exit_error(int); /* helper function. */ static void wait_for_queue(); /******************************************************************************* * * Implement these functions. * ******************************************************************************/ /* * This function intializes the queue semaphore and the queue itself. */ /* * Update the worker's current running time. * This function is called every time the thread is suspended. */ void update_run_time(thread_info_t *info) { /* TODO: implement this function */ } /* * Update the worker's current waiting time. * This function is called every time the thread resumes. */ void update_wait_time(thread_info_t *info) { /* TODO: implement this function */ } static void init_sched_queue(int queue_size) { /* set up a semaphore to restrict access to the queue */ sem_init(&queue_sem, 0, queue_size); /* initialize the scheduler queue */ sched_queue.head = sched_queue.tail = 0; pthread_mutex_init(&sched_queue.lock, NULL); /* initialize the timer */ if(timer_create(CLOCK_REALTIME,NULL,&timer)==-1) { perror("Can't create timer."); exit(-1); } } /* * signal a worker thread that it can resume. */ static void resume_worker(thread_info_t *info) { printf("Scheduler: resuming %lu.\n", info->thrid); /* * TODO: signal the worker thread that it can resume */ /* update the wait time for the thread */ update_wait_time(info); } /*send a signal to the thread, telling it to kill itself*/ void cancel_worker(thread_info_t *info) { /* TODO: send a signal to the thread, telling it to kill itself*/ /* Update global wait and run time info */ wait_times += info->wait_time; run_times += info->run_time; completed++; /* Update schedule queue */ leave_scheduler_queue(info); if (completed >= thread_count) { sched_yield(); /* Let other threads terminate. */ printf("The total wait time is %f seconds.\n", (float)wait_times / 1000000); printf("The total run time is %f seconds.\n", (float)run_times / 1000000); printf("The average wait time is %f seconds.\n", (float)wait_times / 1000000 / thread_count); printf("The average run time is %f seconds.\n", (float)run_times / 1000000 / thread_count); } } /* * signals a worker thread that it should suspend. */ static void suspend_worker(thread_info_t *info) { int whatgoeshere = 0; printf("Scheduler: suspending %lu.\n", info->thrid); /*update the run time for the thread*/ update_run_time(info); /* TODO: Update quanta remaining. */ /* TODO: decide whether to cancel or suspend thread */ if(whatgoeshere) { /* * Thread still running: suspend. * TODO: Signal the worker thread that it should suspend. */ /* Update Schedule queue */ list_remove(&sched_queue,info->le); list_insert_tail(&sched_queue,info->le); } else { /* Thread done: cancel */ cancel_worker(info); } } /* * this is the scheduling algorithm * pick the next worker thread from the available list * you may need to add new information to the thread_info struct */ static thread_info_t *next_worker() { if (completed >= thread_count) return 0; wait_for_queue(); printf("Scheduler: scheduling.\n"); /* return the thread_info_t for the next thread to run */ return sched_queue.head->info; } void timer_handler() { thread_info_t *info = 0; /* once the last worker has been removed, we're done. */ if (list_size(&sched_queue) == 0) { quit = 1; return; } /*suspend the current worker*/ if (currentThread) suspend_worker(currentThread); //resume the next worker info = next_worker(); /* Update currentThread */ currentThread = info; if (info) resume_worker(info); else quit = 1; } /* * Set up the signal handlers for SIGALRM, SIGUSR1, and SIGTERM. * TODO: Implement this function. */ void setup_sig_handlers() { struct sigaction act; act.sa_flags = 0; /* Setup timer handler for SIGALRM signal in threads */ act.sa_handler = timer_handler; act.sa_flags = 0; if (sigemptyset(&act.sa_mask) || sigaction(SIGALRM, &act, NULL)) { fprintf(stderr,"Failed to set up SIGALRM handler.\n"); exit(-1); } /* Setup cancel handler for SIGTERM signal in threads */ act.sa_handler = cancel_thread; if (sigaction(SIGTERM, &act, NULL)) { fprintf(stderr,"Failed to set up SIGTERM handler.\n"); exit(-1); } /* Setup suspend handler for SIGUSR1 signal in threads */ act.sa_handler = suspend_thread; if (sigaction(SIGUSR1, &act, NULL)) { fprintf(stderr,"Failed to set up SIGUSR1 handler.\n"); exit(-1); } } /******************************************************************************* * * * ******************************************************************************/ /* * waits until there are workers in the scheduling queue. */ static void wait_for_queue() { while(!list_size(&sched_queue)) { printf("Scheduler: waiting for workers.\n"); sched_yield(); } } /* * runs at the end of the program just before exit. */ static void clean_up() { /* * destroy any mutexes/condition variables/semaphores that were created. * free any malloc'd memory not already free'd */ sem_destroy(&queue_sem); pthread_mutex_destroy(&sched_queue.lock); } /* * prints an error summary and exits. */ static void exit_error(int err_num) { fprintf(stderr, "failure: %s\n", strerror(err_num)); exit(1); } /* * creates the worker threads. */ static void create_workers(int thread_count, int *quanta) { int i = 0; int err = 0; for (i = 0; i < thread_count; i++) { thread_info_t *info = (thread_info_t *) malloc(sizeof(thread_info_t)); info->quanta = quanta[i]; if ((err = pthread_create(&info->thrid, NULL, start_worker, (void *)info)) != 0) { exit_error(err); } printf("Main: detaching worker thread %lu.\n", info->thrid); pthread_detach(info->thrid); /* TODO: initialize the time variables for each thread for performance evalution*/ } } /* * runs the scheduler. */ static void *scheduler_run(void *unused) { wait_for_queue(); /* TODO: start the timer */ /*keep the scheduler thread alive*/ while( !quit ) sched_yield(); return NULL; } /* * starts the scheduler. * returns 0 on success or exits program on failure. */ static int start_scheduler(pthread_t *thrid) { int err = 0; if ((err = pthread_create(thrid, NULL, scheduler_run, 0)) != 0) { exit_error(err); } return err; } /* * reads the command line arguments and starts the scheduler & worker threads. */ int main(int argc, const char **argv) { int queue_size = 0; int ret_val = 0; int *quanta,i; pthread_t sched_thread; /* check the arguments. */ if (argc < 3) { print_help(argv[0]); exit(0); } thread_count = atoi(argv[1]); queue_size = atoi(argv[2]); quanta = (int*)malloc(sizeof(int)*thread_count); if (argc != 3 + thread_count) { print_help(argv[0]); exit(0); } for ( i = 0; i < thread_count; i++) quanta[i] = atoi(argv[i+3]); printf("Main: running %d workers with queue size %d for quanta:\n", thread_count, queue_size); for ( i = 0; i < thread_count; i++) printf(" %d", quanta[i]); printf("\n"); /*setup the sig handlers for scheduler and workers*/ setup_sig_handlers(); /* initialize anything that needs to be done for the scheduler queue. */ init_sched_queue(queue_size); /* creates a thread for the scheduler. */ start_scheduler(&sched_thread); /* creates the worker threads and returns. */ create_workers(thread_count, quanta); /* wait for scheduler to finish */ printf("Main: waiting for scheduler %lu.\n", sched_thread); pthread_join(sched_thread, (void **) &ret_val); /* clean up our resources */ clean_up(); /* this will wait for all other threads */ pthread_exit(0); } long time_difference(const struct timespec *time1, const struct timespec *time2) { return (time1->tv_sec - time2->tv_sec) * 1000000 + (time1->tv_nsec - time2->tv_nsec) / 1000; }

__MACOSX/._scheduler.c

scheduler.h

#ifndef __SCHEDULER_H_ #define __SCHEDULER_H_ #include <pthread.h> #include <semaphore.h> #include "list.h" #define QUANTUM 2 /* typedefs */ typedef struct thread_info { pthread_t thrid; int quanta; list_elem* le; /*added for evalution bookkeeping*/ struct timespec suspend_time; struct timespec resume_time; long wait_time; long run_time; } thread_info_t; /* functions */ void *start_worker(void *); long time_difference(const struct timespec*, const struct timespec*); int smp5_main(int argc, const char** argv); /* shared variables */ extern sem_t queue_sem; /* semaphore for scheduler queue */ extern thread_info_list sched_queue; /* list of current workers */ #endif /* __SCHEDULER_H_ */

__MACOSX/._scheduler.h

worker.c

#include <stdio.h> #include <errno.h> #include <stdlib.h> #include <string.h> #include <signal.h> #include "scheduler.h" /* Handler for SIGTERM signal */ void cancel_thread() { printf("Thread %u: terminating.\n",(unsigned int) pthread_self()); /* signal that done in queue */ sem_post(&queue_sem); pthread_exit(NULL); } /* Handle the SIGUSR1 signal */ void suspend_thread() { sigset_t mask; int signal; printf("Thread %u: suspending.\n", (unsigned int)pthread_self()); /*wait for a resume signal from the scheduler*/ if(sigemptyset(&mask) || sigaddset(&mask,SIGUSR2)) { perror("Can't create signal mask."); pthread_exit(NULL); } sigwait(&mask, &signal); printf("Thread %u: resuming.\n",(unsigned int) pthread_self()); } /* * waits to gain access to the scheduler queue. */ static int enter_scheduler_queue(thread_info_t *info) { /* * wait for available room in queue. * create a new list entry for this thread * store this thread info in the new entry. */ sem_wait(&queue_sem); list_elem *item = (list_elem*)malloc(sizeof(list_elem)); info->le = item; item->info = info; item->prev = 0; item->next = 0; list_insert_tail(&sched_queue, item); return 0; } /* * leaves the scheduler queue */ void leave_scheduler_queue(thread_info_t *info) { printf("Thread %lu: leaving scheduler queue.\n", info->thrid); /* * remove the given worker from queue * clean up the memory that we malloc'd for the list * clean up the memory that was passed to us */ list_remove(&sched_queue, info->le); free(info->le); free(info); } /* * Initialize thread, enter scheduling queue, and execute instructions. * arg is a pointer to thread_info_t */ void *start_worker(void *arg) { thread_info_t *info = (thread_info_t *) arg; float calc = 0.8; int j = 0; sigset_t sigset; /* TODO: Block SIGALRM and SIGUSR2. */ if(sigemptyset(&sigset) || sigaddset(&sigset,SIGALRM) || sigaddset(&sigset,SIGUSR2)) { perror("Can't initialize blocking signal mask."); } if(pthread_sigmask(SIG_BLOCK,&sigset,NULL) == -1) { perror("Can't block signals."); } /* Unblock SIGUSR1 and SIGTERM. */ if(sigemptyset(&sigset) || sigaddset(&sigset,SIGTERM) || sigaddset(&sigset,SIGUSR1)) { perror("Can't initialize unblocking signal mask."); } if(pthread_sigmask(SIG_UNBLOCK,&sigset,NULL) == -1) { perror("Can't unblock signals."); } /* compete with other threads to enter queue. */ if (enter_scheduler_queue(info)) { printf("Thread %lu: failure entering scheduler queue - %s\n", info->thrid, strerror(errno)); free (info); pthread_exit(0); } printf("Thread %lu: in scheduler queue.\n", info->thrid); suspend_thread(); while (1) { /* do some meaningless work... */ for (j = 0; j < 10000000; j++) { calc = 4.0 * calc * (1.0 - calc); } } }

__MACOSX/._worker.c

worker.h

#ifndef __WORKER_H_ #define __WORKER_H_ void cancel_thread(); void suspend_thread(); void leave_scheduler_queue(thread_info_t*); #endif

__MACOSX/._worker.h