This C program implements parallel and sequential run-length encoding (RLE) for compressing text files. In parallel mode (`-j N`), it uses POSIX threads (pthreads) to split the input into fixed-size chunks, encode them concurrently, and merge runs that span chunk boundaries. In sequential mode, it processes the files one at a time and merges runs that continue from one file into the next. The program reads input files using `mmap` and coordinates the worker threads with mutexes and condition variables.
The most challenging parts were merging runs that span chunk boundaries in parallel mode and keeping the shared task and result queues thread-safe. Both were solved by synchronizing access to the queues with mutexes and condition variables.
// This program was created as part of the Operating Systems coursework at New York University
// with project specifications provided by Professor Yang Tang.
// Developed by Travis Perry

#include <stdio.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>

/* Limits taken from the assignment; not referenced by the code in this section. */
#define MAX_FILES 100
#define MAX_FILES_SIZE 1073741824
/* Largest value a one-byte run count can hold. Runs are not split at this limit,
 * so a longer run's count byte wraps modulo 256. */
#define MAX_APPEARANCES 255
/* Number of input bytes in one parallel-mode task. */
#define CHUNK_SIZE 4096

/* Concatenated contents of all input files (allocated by store_files). */
char *data;
/* Task slots filled by main. Slots [taskIndex, taskQueueSize) are still pending.
 * Each entry is a NUL-terminated chunk copy that the worker encoding it frees. */
char **taskQueue;
/* Number of tasks enqueued so far, i.e. the next free slot in taskQueue. */
size_t taskQueueSize = 0;
/* Number of tasks handed to workers so far: the queue head, and also the
 * resultQueue slot that the next dequeued task belongs to. */
size_t taskIndex = 0;
/* Encoded chunk for each task index; NULL until a worker publishes it. */
char **resultQueue;
/* Set to 1 once every task has been enqueued (write it while holding `mutex`).
 * Workers exit when it is set and no task is pending. */
int stopThreads = 0;
/* Total number of tasks and total input size in bytes. */
size_t numTasks;
size_t dataSize = 0;

/* `mutex` + `cond_var` guard the task queue; `result_mutex` + `resultReady` guard resultQueue. */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;
pthread_cond_t resultReady = PTHREAD_COND_INITIALIZER;

/* Terminate the program with exit status 1 (used for unrecoverable errors). */
void handle_error(void) {
    exit(1);
}

/* Write all len bytes of buf to standard output, retrying after short or
 * interrupted writes. Exits via handle_error() if write() fails. */
static void write_all(const char *buf, size_t len) {
    while (len > 0) {
        ssize_t n = write(STDOUT_FILENO, buf, len);
        if (n < 0 && errno == EINTR) {
            continue;
        }
        if (n <= 0) {
            handle_error();
        }
        buf += n;
        len -= (size_t)n;
    }
}

/*
 * Worker thread: repeatedly takes the next chunk from taskQueue, run-length
 * encodes it, and publishes the result in resultQueue at the chunk's task index.
 *
 * An encoded chunk is a NUL-terminated sequence of (byte, count) pairs, with
 * each count stored in one unsigned byte. Chunk length comes from strlen(), so a
 * NUL byte in the input ends the chunk early. An empty chunk encodes to "".
 * The input chunk is freed once encoded.
 *
 * Returns NULL once stopThreads is set and every enqueued task has been taken.
 */
void *encode(void *arg) {
    (void)arg;
    for (;;) {
        pthread_mutex_lock(&mutex);
        while (taskIndex == taskQueueSize && !stopThreads) {
            pthread_cond_wait(&cond_var, &mutex);
        }
        if (taskIndex == taskQueueSize) {
            /* stopThreads is set and the queue is drained. */
            pthread_mutex_unlock(&mutex);
            break;
        }
        /* O(1) dequeue: advance the head instead of shifting the array. */
        size_t slot = taskIndex++;
        char *input = taskQueue[slot];
        pthread_mutex_unlock(&mutex);

        size_t len = strlen(input);
        /* Worst case: every byte starts a new run, giving two output bytes per input byte. */
        unsigned char *out = malloc(len * 2 + 1);
        if (out == NULL) {
            handle_error();
        }
        size_t pos = 0;
        size_t i = 0;
        while (i < len) {
            unsigned char c = (unsigned char)input[i];
            size_t run = 1;
            while (i + run < len && (unsigned char)input[i + run] == c) {
                run++;
            }
            out[pos++] = c;
            out[pos++] = (unsigned char)run; /* wraps modulo 256 above MAX_APPEARANCES */
            i += run;
        }
        out[pos] = '\0';
        free(input);

        pthread_mutex_lock(&result_mutex);
        resultQueue[slot] = (char *)out;
        pthread_cond_signal(&resultReady);
        pthread_mutex_unlock(&result_mutex);
    }
    return NULL;
}

/*
 * Collector thread: consumes the numTasks encoded chunks strictly in task order
 * and writes them to standard output, merging runs that cross chunk boundaries.
 *
 * The last (byte, count) pair seen so far is held back as a pending run, because
 * the next chunk may continue it. The pending run is written once a chunk begins
 * with a different run, or after the final chunk. Each chunk is freed after use,
 * and output is written without holding result_mutex.
 */
void *process_results(void *arg) {
    (void)arg;
    unsigned char pending[2]; /* pending run: byte, count */
    int havePending = 0;

    for (size_t i = 0; i < numTasks; i++) {
        pthread_mutex_lock(&result_mutex);
        while (resultQueue[i] == NULL) {
            pthread_cond_wait(&resultReady, &result_mutex);
        }
        unsigned char *chunk = (unsigned char *)resultQueue[i];
        resultQueue[i] = NULL;
        pthread_mutex_unlock(&result_mutex);

        size_t len = strlen((const char *)chunk);
        size_t start = 0;
        if (havePending && len >= 2 && chunk[0] == pending[0]) {
            /* The pending run continues into this chunk: fold in its first pair. */
            pending[1] = (unsigned char)(pending[1] + chunk[1]);
            start = 2;
        }
        if (len >= start + 2) {
            /* A new run starts in this chunk, so the pending run is complete. */
            if (havePending) {
                write_all((const char *)pending, 2);
            }
            write_all((const char *)chunk + start, len - start - 2);
            pending[0] = chunk[len - 2];
            pending[1] = chunk[len - 1];
            havePending = 1;
        }
        free(chunk);
    }
    if (havePending) {
        write_all((const char *)pending, 2);
    }
    return NULL;
}

/*
 * Loads every file named in argv[3..argc-1] into one contiguous buffer, assigned
 * to the global `data`, and adds the number of bytes loaded to *totalSize.
 *
 * Exits via handle_error() if a file cannot be opened, stat'ed or mapped, or if
 * allocation fails. Empty files contribute nothing and are not mapped, because
 * mmap rejects zero-length mappings.
 */
void store_files(int argc, char *argv[], size_t *totalSize) {
    struct stat sb;
    size_t total = 0;

    /* First pass: measure the files to size the buffer. */
    for (int i = 3; i < argc; i++) {
        int fd = open(argv[i], O_RDONLY);
        if (fd == -1 || fstat(fd, &sb) == -1) {
            handle_error();
        }
        total += (size_t)sb.st_size;
        close(fd);
    }

    /* Allocate at least one byte so that NULL always signals failure. */
    data = malloc(total > 0 ? total : 1);
    if (data == NULL) {
        handle_error();
    }

    /* Second pass: copy each file's bytes in command-line order. */
    size_t filled = 0;
    for (int i = 3; i < argc; i++) {
        int fd = open(argv[i], O_RDONLY);
        if (fd == -1 || fstat(fd, &sb) == -1) {
            handle_error();
        }
        size_t size = (size_t)sb.st_size;
        /* Never copy more than the first pass measured (the file may have grown). */
        if (size > total - filled) {
            size = total - filled;
        }
        if (size > 0) {
            char *file_data = mmap(NULL, size, PROT_READ, MAP_PRIVATE, fd, 0);
            if (file_data == MAP_FAILED) {
                handle_error();
            }
            memcpy(data + filled, file_data, size);
            munmap(file_data, size);
            filled += size;
        }
        close(fd);
    }
    *totalSize += filled;
}
int main(int argc, char* argv[]) { if (strcmp(argv[1], "-j") == 0) { int numThreads = atoi(argv[2]); store_files(argc, argv, &dataSize); numTasks = (dataSize / CHUNK_SIZE) + 1; resultQueue = (char**)calloc(numTasks, sizeof(char *)); taskQueue = (char **)malloc(numTasks * sizeof(char *)); pthread_t threads[numThreads]; for (int i = 0; i < numThreads; i++) { pthread_create(&threads[i], NULL, encode, NULL); } pthread_t processor_thread; pthread_create(&processor_thread, NULL, process_results, NULL); for (size_t i = 0; i < numTasks; i++) { pthread_mutex_lock(&mutex); if (i == numTasks - 1) { size_t chunkSize = dataSize - (i * CHUNK_SIZE); taskQueue[taskQueueSize] = (char *)malloc(chunkSize + 1); memcpy(taskQueue[taskQueueSize], data + (i * CHUNK_SIZE), chunkSize); taskQueue[taskQueueSize][chunkSize] = '\0'; } else { taskQueue[taskQueueSize] = (char *)malloc(CHUNK_SIZE + 1); memcpy(taskQueue[taskQueueSize], data + (i * CHUNK_SIZE), CHUNK_SIZE); taskQueue[taskQueueSize][CHUNK_SIZE] = '\0'; } taskQueueSize++; pthread_cond_signal(&cond_var); pthread_mutex_unlock(&mutex); } pthread_join(processor_thread, NULL); stopThreads = 1; pthread_cond_broadcast(&cond_var); for (int i = 0; i < numThreads; i++) { pthread_join(threads[i], NULL); } free(taskQueue); free(resultQueue); free(data); } else { char lastChar = '\0'; int lastCount = 0; for (int i=1; i < argc; i++) { int fd = open(argv[i], O_RDONLY); if (fd == -1) handle_error(); struct stat sb; if (fstat(fd, &sb) == -1) handle_error(); if (sb.st_size == 0) { close(fd); handle_error(); } char *addr = mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0); if (addr == MAP_FAILED) handle_error(); data = malloc(sb.st_size + 256); char currVal = addr[0]; int currCount = 1; int dataIter = 0; off_t j = 0; if (currVal == lastChar) { while (currVal == addr[j]) { j++; lastCount++; } data[dataIter] = lastChar; dataIter++; data[dataIter] = lastCount; dataIter++; if (addr[j] == '\0') { if (i + 1 == argc) { write(STDOUT_FILENO, data, dataIter); 
} munmap(addr, sb.st_size); close(fd); free(data); continue; } else { currVal = addr[j]; j++; } } else if (lastCount != 0) { data[dataIter] = lastChar; dataIter++; data[dataIter] = lastCount; dataIter++; j = 1; } else { j = 1; } for (; j < sb.st_size; j++) { if (currVal != addr[j]) { data[dataIter] = currVal; dataIter++; data[dataIter] = currCount; dataIter++; currVal = addr[j]; currCount = 1; } else { currCount++; } } if (i + 1 == argc) { data[dataIter] = currVal; dataIter++; data[dataIter] = currCount; dataIter++; } else { lastChar = currVal; lastCount = currCount; } write(STDOUT_FILENO, data, dataIter); munmap(addr, sb.st_size); close(fd); free(data); } } return 0; }