From 37cee047187c3fef8b3aa5200edbf0dd8eb47bc6 Mon Sep 17 00:00:00 2001 From: "Michael D. Lowis" Date: Thu, 8 Sep 2022 12:58:23 -0400 Subject: [PATCH] working version multithreaded with a single queue --- tasks.c | 268 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 268 insertions(+) create mode 100644 tasks.c diff --git a/tasks.c b/tasks.c new file mode 100644 index 0000000..2cbcfd8 --- /dev/null +++ b/tasks.c @@ -0,0 +1,268 @@ +#include +#include +#include +#include +#include +#include + +/* TODO: + +* Recycle dead tasks +* implement message passing +* implement task-local heap + +*/ + +typedef struct Task_T { + long* stack_top; // top of tasks's stack + long* stack_base; // allocated memory for stack + struct Task_T* next; // pointer to next task +} Task_T; + +typedef struct { + Task_T* head; + Task_T* tail; +} TaskQueue_T; + +typedef struct { + Task_T* task; + Task_T* idle; + pthread_t thread; +} CpuState_T; + +/*************************************** + State Variables +***************************************/ +static pthread_mutex_t ScheduleLock; +static pthread_cond_t ScheduleCond; +static __thread int CpuID = 0; +static TaskQueue_T ReadyQueue = {0}; +static TaskQueue_T DeadQueue = {0}; +static CpuState_T* Running = NULL; +static int CpuCount; + +/*************************************** + Lopck Operations +***************************************/ +static void AcquireLock(void) +{ + pthread_mutex_lock(&ScheduleLock); +} + +static void ReleaseLock(void) +{ + pthread_mutex_unlock(&ScheduleLock); +} + +/*************************************** + Queue Operations +***************************************/ + +static void Enqueue(TaskQueue_T* queue, Task_T* task) +{ + if (task) + { + if (queue->tail) + { + queue->tail->next = task; + task->next = NULL; + } + queue->tail = task; + if (!queue->head) + { + queue->head = queue->tail; + } + } +} + +static Task_T* Dequeue(TaskQueue_T* queue) +{ + Task_T* task = queue->head; + if (task) + { + queue->head = queue->head->next; + if (!queue->head) + { + queue->tail = NULL; + } + } + return task; +} + +/*************************************** + Ready Queue Operations +***************************************/ +// Should eventually handle prioritization + +static void Enter(Task_T* task) +{ + if (task) + { + Enqueue(&ReadyQueue, task); + pthread_cond_signal(&ScheduleCond); + } +} + +static Task_T* Select(void) +{ + pthread_cond_signal(&ScheduleCond); + return Dequeue(&ReadyQueue); +} + +/*************************************** + Task Switching Operations +***************************************/ + +extern void SwitchTo(Task_T* prev, Task_T* next); + +__asm__( +"SwitchTo:\n" +" push %rdi\n" +" push %rbp\n" +" push %rbx\n" +" push %r12\n" +" push %r13\n" +" push %r14\n" +" push %r15\n" +" mov %rsp,(%rdi)\n" +" mov (%rsi),%rsp\n" +" pop %r15\n" +" pop %r14\n" +" pop %r13\n" +" pop %r12\n" +" pop %rbx\n" +" pop %rbp\n" +" pop %rdi\n" +" ret\n" +); + +static void Yield(void) +{ + AcquireLock(); + Task_T* next = Select(); + if (next) + { + puts("found task"); + Task_T* prev = Running[CpuID].task; + Enter(prev); + Running[CpuID].task = next; + prev = (prev ? prev : &(Task_T){0}); + ReleaseLock(); + SwitchTo(prev, next); + } + else + { + ReleaseLock(); + } +} + +void FinalizeTask(void) +{ + /* mark the current task as dead */ + AcquireLock(); + Running[CpuID].task = NULL; + pthread_cond_signal(&ScheduleCond); +// Recycle the task somehow here... + ReleaseLock(); + + /* now we loop until there is a task to jump to */ + while(1) + { + Yield(); + AcquireLock(); + pthread_cond_wait(&ScheduleCond, &ScheduleLock); + ReleaseLock(); + } +} + +void CreateTask(void (*task_fn)(void*), void* arg, int stacksize) +{ + if (stacksize == 0) { stacksize = 32768; } + Task_T* task = calloc(1, sizeof(Task_T)); + task->stack_base = calloc(stacksize/sizeof(long), sizeof(long)); + task->stack_top = &task->stack_base[stacksize/sizeof(long)-1]; // top of stack + *(--task->stack_top) = (long)FinalizeTask; // coroutine cleanup + *(--task->stack_top) = (long)task_fn; // user's function to run (rop style!) + *(--task->stack_top) = (long)arg; // user's function argument (rdi) + for (int saved = 0; saved < 6; saved++) + { + *(--task->stack_top) = 0xdeadbeef; // initial values for saved registers + } + printf("created task %p\n", task); + + /* put task in the ready queue */ + AcquireLock(); + Enter(task); + ReleaseLock(); +} + + + +static void* CpuMain(void* arg) +{ + CpuID = (long int)arg; + FinalizeTask(); + return NULL; +} + +/*************************************** + Main Routine +***************************************/ + +void task1(void* arg) +{ + for (int i = 0; i < 5; i++) + { + printf("%d Hello from task 1\n", CpuID); + Yield(); + } +} + +void task2(void* arg) +{ + for (int i = 0; i < 5; i++) + { + printf("%d Hello from task 2\n", CpuID); + Yield(); + } +} + +int main(int argc, char** argv) +{ + (void)argc; + (void)argv; + + pthread_mutex_init(&ScheduleLock, 0); + pthread_cond_init(&ScheduleCond, NULL); + CpuCount = sysconf(_SC_NPROCESSORS_ONLN); + Running = calloc(CpuCount, sizeof(CpuState_T)); + for (long int i = 0; i < CpuCount; i++) + { + pthread_create(&Running[i].thread, NULL, CpuMain, (void*)i); + } + + CreateTask(task1, 0, 0); + CreateTask(task2, 0, 0); + + /* wait for all jobs to be done */ + while(1) + { + AcquireLock(); + pthread_cond_wait(&ScheduleCond, &ScheduleLock); + if (!ReadyQueue.head) + { + long int val = 0; + for (int i = 0; i < CpuCount; i++) + { + val |= (long int)Running[i].task; + } + if (!val) + { + break; + } + } + ReleaseLock(); + } + puts("done"); + return 0; +} \ No newline at end of file -- 2.52.0