/*
 * Parallel and Sequential Run-Length Encoder
 *
 * Project Overview
 *
 * This C program implements parallel and sequential run-length encoding (RLE) for compressing text files. In parallel mode (the -j option), it uses POSIX threads (pthreads) to split the data into chunks, encode the chunks concurrently, and merge runs that span a chunk boundary. In sequential mode, it processes the files one at a time and carries a run that continues across a file boundary over into the next file. The program reads files efficiently using mmap and synchronizes tasks using mutexes and condition variables for thread-safe operation.
 *
 * Key Features
 *
 * Technologies Used
 *
 * Challenges Faced
 *
 * Merging runs that span chunk boundaries in parallel mode and keeping the shared state thread-safe were among the most challenging aspects. These issues were solved by implementing robust synchronization mechanisms using mutexes and condition variables.
 */

//This program was created as part of the Operating Systems coursework at New York University with project specifications provided by Professor Yang Tang.
//Developed by Travis Perry

#include <stdio.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>

/* NOTE(review): MAX_FILES, MAX_FILES_SIZE and MAX_APPEARANCES are not
 * referenced by any code visible in this file; the meanings given below are
 * inferred from their names and values and should be confirmed. */
#define MAX_FILES 100 /* presumably the maximum number of input files */
#define MAX_FILES_SIZE 1073741824 /* 2^30 bytes (1 GiB); presumably the maximum total input size */
#define MAX_APPEARANCES 255 /* largest value that fits in a one-byte run count; not enforced in this file */
/* Number of input bytes copied into each parallel-mode task by main();
 * the final task may be shorter. */
#define CHUNK_SIZE 4096

/* Parallel mode: buffer that store_files() fills with the concatenated file
 * contents. The sequential branch of main() reuses it as a per-file output
 * buffer. */
char *data;
/* Pending tasks: NUL-terminated chunk copies appended by main() and taken
 * from slot 0 by encode(). */
char **taskQueue;
/* Number of entries currently stored in taskQueue. */
size_t taskQueueSize = 0;
/* Result slot assigned to the next task a worker dequeues; incremented by
 * encode(). */
size_t taskIndex = 0;
/* Encoded output of each task, indexed by task number. A NULL slot is treated
 * as "not produced yet" by process_results() and stitch_chunks(). */
char **resultQueue;
/* Not referenced by the code visible in this file. */
size_t nextResultIndex = 0;
/* Decremented by CHUNK_SIZE in dequeue(); never read by the code visible in
 * this file (store_files() declares its own local named offset). */
size_t offset = 0;
/* Set to 1 by main() after the output thread has been joined; workers exit
 * once it is set and the task queue is empty. */
int stopThreads = 0;
/* Incremented by encode() each time a result is stored; not read by the code
 * visible in this file. */
size_t completedTasks = 0;
/* Total number of tasks, computed in main() from dataSize and CHUNK_SIZE. */
size_t numTasks;
/* Total size in bytes of the input files, accumulated by store_files(). */
size_t dataSize = 0;
/* Held by main() and encode() while they touch taskQueue, taskQueueSize and
 * taskIndex. */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* Held by encode() and process_results() while they touch resultQueue and
 * completedTasks. */
pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by main() for every enqueued task and broadcast at shutdown;
 * encode() waits on it (with mutex). */
pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;
/* Signalled by encode() after it stores a result; process_results() waits on
 * it (with result_mutex). */
pthread_cond_t resultReady = PTHREAD_COND_INITIALIZER;

/*
 * Common failure path: terminate the whole process with exit status 1.
 * No diagnostic is printed. Does not return.
 */
void handle_error(void)
{
    exit(1);
}

/*
 * Merge the run that ends encoded chunk `index` with the run that starts
 * encoded chunk `index + 1` when both runs are for the same character.
 *
 * Encoded chunks are the NUL-terminated strings stored in resultQueue: a
 * sequence of (character, count) byte pairs. On a match the two counts are
 * added into the last pair of chunk `index` and the first pair is removed
 * from chunk `index + 1`, so the run is emitted only once.
 *
 * process_results() calls this with result_mutex held.
 *
 * Returns 0 when either chunk has not been produced yet (its resultQueue slot
 * is NULL), and 1 otherwise, including when there is no following chunk or
 * nothing to merge.
 */
int stitch_chunks(size_t index) {
    /* Bounds check first: resultQueue has numTasks slots, so slot index + 1
     * must not be read unless it exists. */
    if (index + 1 >= numTasks) {
        return 1;
    }

    char *currentChunk = resultQueue[index];
    char *nextChunk = resultQueue[index + 1];
    if (currentChunk == NULL || nextChunk == NULL) {
        return 0;
    }

    size_t currentLen = strlen(currentChunk);
    size_t nextLen = strlen(nextChunk);

    /* A chunk shorter than one (character, count) pair has no run to merge.
     * Without this guard `currentLen - 2` would wrap around. */
    if (currentLen < 2 || nextLen < 2) {
        return 1;
    }

    if (currentChunk[currentLen - 2] == nextChunk[0]) {
        /* Counts are single bytes. A sum that does not fit in one byte is
         * truncated, exactly as when the sum was computed in an int and
         * stored back into a char. */
        unsigned char combinedCount =
            (unsigned char)((unsigned char)currentChunk[currentLen - 1] +
                            (unsigned char)nextChunk[1]);
        currentChunk[currentLen - 1] = (char)combinedCount;

        /* Drop the first pair of the next chunk. nextLen - 1 bytes covers the
         * remaining pairs plus the terminating NUL. */
        memmove(nextChunk, nextChunk + 2, nextLen - 1);
    }
    return 1;
}

/*
 * Remove the entry at position `index` from taskQueue, shifting every later
 * entry down by one slot and decrementing taskQueueSize.
 *
 * encode() calls this with `mutex` held. The removed pointer is not freed
 * here; the caller is expected to have copied it out beforehand.
 *
 * A call with `index` outside the current queue (including any call on an
 * empty queue) is ignored, so taskQueueSize can no longer wrap below zero.
 */
void dequeue(size_t index) {
    if (index >= taskQueueSize) {
        return;
    }

    size_t tail = taskQueueSize - index - 1;
    if (tail > 0) {
        memmove(taskQueue + index, taskQueue + index + 1,
                tail * sizeof *taskQueue);
    }
    taskQueueSize--;

    /* Kept from the previous implementation: the file-scope `offset` is
     * stepped back by one chunk. NOTE(review): nothing visible in this file
     * reads it. */
    offset -= CHUNK_SIZE;
}

/*
 * Worker thread body: repeatedly take the task at the head of taskQueue,
 * run-length encode it, and publish the result in resultQueue.
 *
 * A task is a NUL-terminated, heap-allocated copy of one input chunk. The
 * result is a NUL-terminated, heap-allocated string of (character, count)
 * byte pairs, stored in the resultQueue slot numbered by the order in which
 * tasks were dequeued. The worker frees the task buffer once it is encoded.
 *
 * The loop ends when stopThreads is set and the queue is empty.
 *
 * arg: unused.
 * Returns NULL.
 */
void *encode(void *arg) {
    (void)arg;

    for (;;) {
        pthread_mutex_lock(&mutex);

        while (taskQueueSize == 0 && !stopThreads) {
            pthread_cond_wait(&cond_var, &mutex);
        }

        /* The wait loop only exits with an empty queue when stopThreads is set. */
        if (taskQueueSize == 0) {
            pthread_mutex_unlock(&mutex);
            break;
        }

        /* Taking the head and its index under the same lock keeps the result
         * slot numbering identical to the enqueue order. */
        char *input = taskQueue[0];
        size_t currTaskIndex = taskIndex++;
        dequeue(0);

        pthread_mutex_unlock(&mutex);

        size_t len = strlen(input);

        /* Worst case is one pair per input byte plus the terminator. One
         * extra byte is allocated and zeroed so that stitch_chunks(), which
         * reads byte 1 of the following chunk, stays inside the allocation
         * even when this chunk is empty. */
        char *encodedChunk = malloc(len * 2 + 2);
        if (encodedChunk == NULL) {
            handle_error();
        }

        size_t out = 0;
        size_t pos = 0;
        while (pos < len) {
            char runChar = input[pos];
            int runLen = 0;
            while (pos < len && input[pos] == runChar) {
                runLen++;
                pos++;
            }
            encodedChunk[out++] = runChar;
            /* The count is stored in a single byte; longer runs are truncated. */
            encodedChunk[out++] = (char)runLen;
        }
        /* An empty input produces an empty result instead of a bogus pair
         * written past a one-byte allocation. */
        encodedChunk[out] = '\0';
        encodedChunk[out + 1] = '\0';

        /* The chunk copy was allocated by the producer and is not referenced
         * again after being dequeued. */
        free(input);

        pthread_mutex_lock(&result_mutex);
        resultQueue[currTaskIndex] = encodedChunk;
        completedTasks++;
        pthread_cond_signal(&resultReady);
        pthread_mutex_unlock(&result_mutex);
    }
    return NULL;
}

/*
 * Output thread body: write the encoded chunks to standard output in task
 * order.
 *
 * For each task index it waits until that chunk, and the following one if it
 * exists, has been published in resultQueue. It then calls stitch_chunks() to
 * merge a run that spans the boundary, writes the chunk and frees it.
 *
 * The chunk is detached from resultQueue before result_mutex is released, so
 * the blocking write happens without holding the lock. Short writes are
 * retried; a failing write() ends the process through handle_error().
 *
 * arg: unused.
 * Returns NULL.
 */
void *process_results(void *arg) {
    (void)arg;

    for (size_t current_index = 0; current_index < numTasks; current_index++) {
        pthread_mutex_lock(&result_mutex);

        while (resultQueue[current_index] == NULL) {
            pthread_cond_wait(&resultReady, &result_mutex);
        }

        if (current_index + 1 < numTasks) {
            /* The next chunk must exist before the boundary can be stitched. */
            while (resultQueue[current_index + 1] == NULL) {
                pthread_cond_wait(&resultReady, &result_mutex);
            }
            stitch_chunks(current_index);
        }

        char *chunk = resultQueue[current_index];
        resultQueue[current_index] = NULL;

        pthread_mutex_unlock(&result_mutex);

        const char *cursor = chunk;
        size_t remaining = strlen(chunk);
        while (remaining > 0) {
            ssize_t written = write(STDOUT_FILENO, cursor, remaining);
            if (written <= 0) {
                handle_error();
            }
            cursor += written;
            remaining -= (size_t)written;
        }

        free(chunk);
    }
    return NULL;
}

/*
 * Load the input files named in argv[3] .. argv[argc - 1] back to back into a
 * single heap buffer and store that buffer in the file-scope `data`.
 *
 * argc, argv: the program's command line; the first three entries are skipped.
 * totalSize:  in/out; the combined size of the files is added to the value it
 *             already holds.
 *
 * The files are opened twice: once to add up their sizes and once to copy
 * their contents through a read-only mmap. Empty files contribute nothing and
 * are skipped. Any failure of open, fstat, malloc or mmap, or a file that has
 * grown between the two passes, ends the process through handle_error().
 */
void store_files(int argc, char *argv[], size_t *totalSize) {
    struct stat sb;
    size_t expected = 0;

    for (int i = 3; i < argc; i++) {
        int fd = open(argv[i], O_RDONLY);
        if (fd == -1) {
            handle_error();
        }
        if (fstat(fd, &sb) == -1) {
            handle_error();
        }
        expected += (size_t)sb.st_size;
        close(fd);
    }
    *totalSize += expected;

    /* Never request zero bytes, so that NULL always means allocation failure. */
    data = malloc(*totalSize > 0 ? *totalSize : 1);
    if (data == NULL) {
        handle_error();
    }

    size_t copied = 0;
    for (int i = 3; i < argc; i++) {
        int fd = open(argv[i], O_RDONLY);
        if (fd == -1) {
            handle_error();
        }
        if (fstat(fd, &sb) == -1) {
            handle_error();
        }

        size_t fileSize = (size_t)sb.st_size;
        if (fileSize == 0) {
            /* mmap rejects a zero length; there is nothing to copy anyway. */
            close(fd);
            continue;
        }
        if (fileSize > expected - copied) {
            /* The file grew since the sizing pass; copying it would overrun
             * the buffer. */
            handle_error();
        }

        char *file_data = mmap(NULL, fileSize, PROT_READ, MAP_PRIVATE, fd, 0);
        if (file_data == MAP_FAILED) {
            handle_error();
        }
        memcpy(data + copied, file_data, fileSize);
        copied += fileSize;
        munmap(file_data, fileSize);
        close(fd);
    }

    /* If a file shrank between the passes, report only the bytes that were
     * actually copied. */
    *totalSize -= expected - copied;
}

/*
 * Write exactly `len` bytes from `buf` to standard output, retrying after
 * short writes. A failing write() ends the process through handle_error().
 */
static void emit_bytes(const char *buf, size_t len) {
    while (len > 0) {
        ssize_t n = write(STDOUT_FILENO, buf, len);
        if (n <= 0) {
            handle_error();
        }
        buf += n;
        len -= (size_t)n;
    }
}

/*
 * Parallel mode ("-j N file..."): load all files with store_files(), split
 * the data into CHUNK_SIZE-byte tasks, and let N encode() workers plus one
 * process_results() thread produce the output.
 */
static void run_parallel(int argc, char *argv[]) {
    if (argc < 3) {
        handle_error();
    }

    /* Like atoi(), trailing non-digits are ignored; a count below 1 is
     * rejected because no worker would ever consume the tasks. */
    long requested = strtol(argv[2], NULL, 10);
    if (requested < 1) {
        handle_error();
    }
    size_t numThreads = (size_t)requested;

    store_files(argc, argv, &dataSize);

    /* Round up, so that no empty trailing task is created when dataSize is a
     * multiple of CHUNK_SIZE. */
    numTasks = dataSize / CHUNK_SIZE + (dataSize % CHUNK_SIZE != 0 ? 1 : 0);

    /* Allocate at least one slot so that NULL always means failure. */
    size_t slots = numTasks > 0 ? numTasks : 1;
    resultQueue = calloc(slots, sizeof *resultQueue);
    taskQueue = malloc(slots * sizeof *taskQueue);
    pthread_t *threads = calloc(numThreads, sizeof *threads);
    if (resultQueue == NULL || taskQueue == NULL || threads == NULL) {
        handle_error();
    }

    for (size_t t = 0; t < numThreads; t++) {
        if (pthread_create(&threads[t], NULL, encode, NULL) != 0) {
            handle_error();
        }
    }

    pthread_t processor_thread;
    if (pthread_create(&processor_thread, NULL, process_results, NULL) != 0) {
        handle_error();
    }

    for (size_t i = 0; i < numTasks; i++) {
        size_t start = i * CHUNK_SIZE;
        size_t chunkSize = dataSize - start;
        if (chunkSize > CHUNK_SIZE) {
            chunkSize = CHUNK_SIZE;
        }

        /* Build the NUL-terminated copy before taking the lock, so workers
         * are not blocked while it is being prepared. */
        char *chunk = malloc(chunkSize + 1);
        if (chunk == NULL) {
            handle_error();
        }
        memcpy(chunk, data + start, chunkSize);
        chunk[chunkSize] = '\0';

        pthread_mutex_lock(&mutex);
        taskQueue[taskQueueSize] = chunk;
        taskQueueSize++;
        pthread_cond_signal(&cond_var);
        pthread_mutex_unlock(&mutex);
    }

    /* The output thread returns once every chunk has been written. */
    pthread_join(processor_thread, NULL);

    /* stopThreads is read by the workers under `mutex`, so it is written
     * under the same lock. */
    pthread_mutex_lock(&mutex);
    stopThreads = 1;
    pthread_cond_broadcast(&cond_var);
    pthread_mutex_unlock(&mutex);

    for (size_t t = 0; t < numThreads; t++) {
        pthread_join(threads[t], NULL);
    }

    free(threads);
    free(taskQueue);
    free(resultQueue);
    free(data);
}

/*
 * Sequential mode ("file..."): run-length encode argv[1] .. argv[argc - 1] as
 * if they were one stream, writing the output for each file as soon as that
 * file has been scanned.
 *
 * The run in progress at the end of a file is held back and continued in the
 * next file; it is emitted once a different character is seen, or after the
 * last file. An unreadable or empty file ends the process through
 * handle_error().
 */
static void run_sequential(int argc, char *argv[]) {
    char pendingChar = '\0';
    unsigned int pendingCount = 0; /* 0 means no run is being held back */

    for (int i = 1; i < argc; i++) {
        int fd = open(argv[i], O_RDONLY);
        if (fd == -1) {
            handle_error();
        }

        struct stat sb;
        if (fstat(fd, &sb) == -1) {
            handle_error();
        }
        if (sb.st_size == 0) {
            close(fd);
            handle_error();
        }
        size_t size = (size_t)sb.st_size;

        char *addr = mmap(NULL, size, PROT_READ, MAP_PRIVATE, fd, 0);
        if (addr == MAP_FAILED) {
            handle_error();
        }

        /* Every input byte can close at most one run (two output bytes), and
         * the last file adds one final pair. */
        char *out = malloc(size * 2 + 2);
        if (out == NULL) {
            handle_error();
        }
        size_t outLen = 0;

        /* The scan is bounded by the file size; bytes beyond the mapping are
         * never read. */
        for (size_t j = 0; j < size; j++) {
            if (pendingCount != 0 && addr[j] == pendingChar) {
                pendingCount++;
                continue;
            }
            if (pendingCount != 0) {
                out[outLen++] = pendingChar;
                /* The count is stored in a single byte; longer runs are
                 * truncated. */
                out[outLen++] = (char)pendingCount;
            }
            pendingChar = addr[j];
            pendingCount = 1;
        }

        if (i + 1 == argc) {
            out[outLen++] = pendingChar;
            out[outLen++] = (char)pendingCount;
        }

        emit_bytes(out, outLen);
        munmap(addr, size);
        close(fd);
        free(out);
    }
}

/*
 * Entry point.
 *
 *   prog -j N file...   encode with N worker threads
 *   prog file...        encode sequentially
 *
 * The encoded output goes to standard output. Returns 0 on success; missing
 * arguments and runtime failures end the process through handle_error().
 */
int main(int argc, char *argv[]) {
    if (argc < 2) {
        handle_error();
    }

    if (strcmp(argv[1], "-j") == 0) {
        run_parallel(argc, argv);
    } else {
        run_sequential(argc, argv);
    }
    return 0;
}