// This C program implements parallel and sequential run-length encoding (RLE) for compressing text files.
// In parallel mode (the -j option), it uses POSIX threads (pthreads) to divide the data into chunks, encode them concurrently, and merge runs that overlap chunk boundaries.
// In sequential mode, it processes the files one at a time, handling the edge case where a run of the same character continues from one file into the next.
// The program reads files efficiently using mmap and synchronizes tasks with mutexes and condition variables for thread-safe operation.
//
// Handling overlapping results in parallel mode and ensuring thread-safe operation were among the most challenging aspects.
// Both were solved with synchronization built on mutexes and condition variables.
//This program was created as part of the Operating Systems coursework at New York University with project specifications provided by Professor Yang Tang.
//Developed by Travis Perry
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <fcntl.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
/* ---- Compile-time constants ---- */
#define CHUNK_SIZE 4096            /* bytes of input handed to a worker per task */
#define MAX_APPEARANCES 255        /* not referenced by the code visible in this file */
#define MAX_FILES 100              /* not referenced by the code visible in this file */
#define MAX_FILES_SIZE 1073741824  /* 1 GiB; not referenced by the code visible in this file */

/* ---- Input ---- */
char *data;            /* concatenated contents of the input files (filled by store_files) */
size_t dataSize = 0;   /* number of bytes in `data` */
size_t numTasks;       /* number of chunks the input is split into */

/* ---- Task queue (producer: main, consumers: encode workers) ---- */
char **taskQueue;           /* pending input chunks, oldest first */
size_t taskQueueSize = 0;   /* number of chunks currently queued */
size_t taskIndex = 0;       /* result slot assigned to the next dequeued chunk */
size_t offset = 0;          /* stepped back by CHUNK_SIZE in dequeue() */
int stopThreads = 0;        /* non-zero tells idle workers to exit */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;     /* held around task-queue accesses */
pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;    /* signalled when a chunk is queued */

/* ---- Result queue (producers: encode workers, consumer: process_results) ---- */
char **resultQueue;           /* encoded chunk per task index; NULL until ready */
size_t nextResultIndex = 0;   /* not referenced by the code visible in this file */
size_t completedTasks = 0;    /* incremented by encode() for each finished chunk */
pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER;  /* held around resultQueue accesses */
pthread_cond_t resultReady = PTHREAD_COND_INITIALIZER;     /* signalled when a result is stored */
/* Common failure path: terminate the whole process with exit status 1. */
void handle_error(void)
{
    exit(1);
}
/*
 * Merge a run that straddles the boundary between two adjacent encoded chunks.
 *
 * resultQueue[index] and resultQueue[index + 1] are NUL-terminated sequences
 * of (character, count) byte pairs. When the last pair of the first chunk and
 * the first pair of the second chunk describe the same character, the second
 * count is added to the first and the pair is removed from the second chunk.
 *
 * index: position of the earlier chunk in resultQueue.
 *
 * Returns 0 when either chunk is not available yet, and 1 otherwise
 * (including when `index` is the last chunk and there is nothing to merge).
 *
 * The caller is expected to hold whatever lock protects resultQueue.
 */
int stitch_chunks(size_t index) {
    /* Range check first: the last chunk has no successor slot to look at. */
    if (index + 1 >= numTasks) {
        return 1;
    }
    char *currentChunk = resultQueue[index];
    char *nextChunk = resultQueue[index + 1];
    if (currentChunk == NULL || nextChunk == NULL) {
        return 0;
    }

    size_t currentLen = strlen(currentChunk);
    size_t nextLen = strlen(nextChunk);
    /* A chunk shorter than one pair (e.g. empty) has nothing to merge, and
       indexing it with len - 2 would wrap around. */
    if (currentLen < 2 || nextLen < 2) {
        return 1;
    }
    if (currentChunk[currentLen - 2] != nextChunk[0]) {
        return 1;
    }

    /* Counts are raw bytes: read them as unsigned so values above 127 are
       not mistaken for negative numbers. */
    unsigned int combinedCount = (unsigned char)currentChunk[currentLen - 1]
                               + (unsigned char)nextChunk[1];
    if (combinedCount > UCHAR_MAX) {
        /* The sum does not fit in one count byte; keeping the two runs
           separate is still a valid encoding, a wrapped count is not. */
        return 1;
    }
    currentChunk[currentLen - 1] = (char)combinedCount;
    /* Drop the first pair of the next chunk; the +1 carries the terminator. */
    memmove(nextChunk, nextChunk + 2, nextLen - 2 + 1);
    return 1;
}
/*
 * Remove the entry at position `index` from taskQueue.
 *
 * Every later entry is shifted one slot toward the front, the queue length is
 * reduced by one, and the global `offset` is stepped back by CHUNK_SIZE.
 * The removed pointer itself is not freed here.
 */
void dequeue(size_t index) {
    size_t firstMoved = index + 1;
    if (firstMoved < taskQueueSize) {
        memmove(&taskQueue[index], &taskQueue[firstMoved],
                (taskQueueSize - firstMoved) * sizeof taskQueue[0]);
    }
    taskQueueSize -= 1;
    offset -= CHUNK_SIZE;
}
/*
 * Worker thread body: repeatedly take the oldest chunk from taskQueue,
 * run-length encode it, and publish the result in resultQueue.
 *
 * Each input chunk is a NUL-terminated string; the result is a NUL-terminated
 * sequence of (character, count) byte pairs stored at the slot matching the
 * order in which the chunk was dequeued. Because chunks are handled as C
 * strings, bytes after an embedded '\0' in a chunk are not encoded.
 *
 * Ownership: the worker takes ownership of the dequeued chunk and frees it;
 * the published result is owned by whoever consumes resultQueue.
 *
 * arg: unused.
 * Returns NULL once stopThreads is set and the task queue is empty.
 * Exits the process with status 1 (the same status handle_error() uses) if
 * the output buffer cannot be allocated.
 */
void *encode(void *arg) {
    (void)arg;
    for (;;) {
        pthread_mutex_lock(&mutex);
        while (taskQueueSize == 0 && !stopThreads) {
            pthread_cond_wait(&cond_var, &mutex);
        }
        if (stopThreads && taskQueueSize == 0) {
            pthread_mutex_unlock(&mutex);
            break;
        }
        char *input = taskQueue[0];
        size_t currTaskIndex = taskIndex;
        taskIndex++;
        dequeue(0);
        pthread_mutex_unlock(&mutex);

        size_t len = strlen(input);
        /* Worst case is one pair per input byte plus the terminator. One
           spare zeroed byte keeps even an empty result two bytes long, so
           code that peeks at the first pair of a chunk stays in bounds. */
        char *encodedChunk = calloc(len * 2 + 2, sizeof(char));
        if (encodedChunk == NULL) {
            exit(1);
        }

        size_t outLen = 0;
        size_t pos = 0;
        /* An empty chunk skips this loop and yields an empty result. */
        while (pos < len) {
            char runChar = input[pos];
            unsigned int runLen = 0;
            /* Cap each run at what one count byte can hold; a longer run is
               emitted as several pairs instead of wrapping the count (a
               wrapped count of 0 would also terminate the string early). */
            while (pos < len && input[pos] == runChar && runLen < UCHAR_MAX) {
                pos++;
                runLen++;
            }
            encodedChunk[outLen++] = runChar;
            encodedChunk[outLen++] = (char)runLen;
        }
        encodedChunk[outLen] = '\0';

        /* The chunk was heap-allocated by the producer and is not referenced
           anywhere else once it has been dequeued. */
        free(input);

        pthread_mutex_lock(&result_mutex);
        resultQueue[currTaskIndex] = encodedChunk;
        completedTasks++;
        pthread_cond_signal(&resultReady);
        pthread_mutex_unlock(&result_mutex);
    }
    return NULL;
}
/*
 * Write `len` bytes from `buf` to standard output, continuing after short
 * writes and retrying when interrupted by a signal.
 * Returns 0 on success, -1 if the write fails for any other reason.
 */
static int write_chunk(const char *buf, size_t len) {
    while (len > 0) {
        ssize_t written = write(STDOUT_FILENO, buf, len);
        if (written <= 0) {
            if (written < 0 && errno == EINTR) {
                continue;
            }
            return -1;
        }
        buf += written;
        len -= (size_t)written;
    }
    return 0;
}

/*
 * Consumer thread body: emit the encoded chunks to standard output in task
 * order.
 *
 * For each index it waits until that chunk (and, unless it is the last one,
 * its successor) has been published in resultQueue, merges a run shared by
 * the two via stitch_chunks(), then writes and frees the chunk.
 *
 * arg: unused.
 * Returns NULL after all numTasks chunks have been written.
 * Exits the process with status 1 if standard output cannot be written.
 */
void *process_results(void *arg) {
    (void)arg;
    for (size_t current_index = 0; current_index < numTasks; current_index++) {
        pthread_mutex_lock(&result_mutex);
        while (resultQueue[current_index] == NULL) {
            pthread_cond_wait(&resultReady, &result_mutex);
        }
        if (current_index + 1 < numTasks) {
            /* The successor must exist before the boundary can be merged. */
            while (resultQueue[current_index + 1] == NULL) {
                pthread_cond_wait(&resultReady, &result_mutex);
            }
            stitch_chunks(current_index);
        }
        /* Detach the finished chunk so the lock can be released before the
           (potentially slow) write; workers only store into their own slots. */
        char *chunk = resultQueue[current_index];
        resultQueue[current_index] = NULL;
        pthread_mutex_unlock(&result_mutex);

        if (write_chunk(chunk, strlen(chunk)) != 0) {
            exit(1);
        }
        free(chunk);
    }
    return NULL;
}
/*
 * Load the files named in argv[3] .. argv[argc - 1] back to back into the
 * global `data` buffer.
 *
 * argc, argv: the program's command line; the first three entries (program
 *             name and two option words) are skipped.
 * totalSize:  incremented by the number of bytes stored in `data`.
 *
 * `data` is allocated here with malloc and must be freed by the caller.
 * Zero-length files contribute nothing. Any failure to stat, open, map or
 * allocate terminates the process with status 1 (the same status
 * handle_error() uses).
 */
void store_files(int argc, char *argv[], size_t *totalSize) {
    struct stat sb;
    size_t capacity = 0;

    /* Pass 1: add up the sizes so the buffer can be allocated in one go. */
    for (int i = 3; i < argc; i++) {
        if (stat(argv[i], &sb) == -1) {
            exit(1);
        }
        capacity += (size_t)sb.st_size;
    }

    /* Request at least one byte so a NULL return always means failure. */
    data = malloc(capacity > 0 ? capacity : 1);
    if (data == NULL) {
        exit(1);
    }

    /* Pass 2: map each file and copy its bytes into place. */
    size_t copied = 0;
    for (int i = 3; i < argc; i++) {
        int fd = open(argv[i], O_RDONLY);
        if (fd == -1) {
            exit(1);
        }
        if (fstat(fd, &sb) == -1) {
            close(fd);
            exit(1);
        }
        size_t len = (size_t)sb.st_size;
        /* A file may have grown since pass 1; never copy past the buffer. */
        if (len > capacity - copied) {
            len = capacity - copied;
        }
        /* mmap rejects a length of 0, so skip empty files. */
        if (len > 0) {
            char *file_data = mmap(NULL, len, PROT_READ, MAP_PRIVATE, fd, 0);
            if (file_data == MAP_FAILED) {
                close(fd);
                exit(1);
            }
            memcpy(data + copied, file_data, len);
            munmap(file_data, len);
            copied += len;
        }
        close(fd);
    }

    /* Report what was actually stored (equals the pass-1 total unless a
       file shrank in between). */
    *totalSize += copied;
}
/*
 * Write `len` bytes from `buf` to standard output, continuing after short
 * writes and retrying when interrupted by a signal. Any other failure ends
 * the program through handle_error().
 */
static void flush_stdout(const char *buf, size_t len) {
    while (len > 0) {
        ssize_t written = write(STDOUT_FILENO, buf, len);
        if (written <= 0) {
            if (written < 0 && errno == EINTR) {
                continue;
            }
            handle_error();
        }
        buf += written;
        len -= (size_t)written;
    }
}

/*
 * Parallel mode ("-j <threads> file..."): load all files, split the data into
 * CHUNK_SIZE pieces, and let worker threads encode them while a separate
 * thread writes the results in order.
 */
static void run_parallel(int argc, char *argv[]) {
    if (argc < 3) {
        handle_error();
    }
    /* strtol instead of atoi so that a missing or non-positive thread count
       is rejected rather than leaving the program with no workers. */
    char *end = NULL;
    long requested = strtol(argv[2], &end, 10);
    if (end == argv[2] || requested < 1) {
        handle_error();
    }
    size_t numThreads = (size_t)requested;

    store_files(argc, argv, &dataSize);

    /* Round up, so that data whose size is an exact multiple of CHUNK_SIZE
       does not produce an extra, empty trailing task. */
    numTasks = (dataSize + (size_t)CHUNK_SIZE - 1) / (size_t)CHUNK_SIZE;

    /* Ask for at least one element so NULL always means allocation failure. */
    size_t slots = numTasks > 0 ? numTasks : 1;
    resultQueue = calloc(slots, sizeof *resultQueue);
    taskQueue = calloc(slots, sizeof *taskQueue);
    pthread_t *threads = calloc(numThreads, sizeof *threads);
    if (resultQueue == NULL || taskQueue == NULL || threads == NULL) {
        handle_error();
    }

    for (size_t t = 0; t < numThreads; t++) {
        if (pthread_create(&threads[t], NULL, encode, NULL) != 0) {
            handle_error();
        }
    }
    pthread_t processor_thread;
    if (pthread_create(&processor_thread, NULL, process_results, NULL) != 0) {
        handle_error();
    }

    for (size_t i = 0; i < numTasks; i++) {
        size_t start = i * (size_t)CHUNK_SIZE;
        size_t chunkSize = dataSize - start;
        if (chunkSize > (size_t)CHUNK_SIZE) {
            chunkSize = (size_t)CHUNK_SIZE;
        }
        /* Tasks are passed as NUL-terminated copies of their slice; the copy
           is prepared before taking the lock to keep the critical section
           short. */
        char *chunk = malloc(chunkSize + 1);
        if (chunk == NULL) {
            handle_error();
        }
        memcpy(chunk, data + start, chunkSize);
        chunk[chunkSize] = '\0';

        pthread_mutex_lock(&mutex);
        taskQueue[taskQueueSize] = chunk;
        taskQueueSize++;
        pthread_cond_signal(&cond_var);
        pthread_mutex_unlock(&mutex);
    }

    pthread_join(processor_thread, NULL);

    /* The flag is part of the predicate the workers test under `mutex`, so
       it must be changed under the same mutex; otherwise a worker could
       check the flag, miss the broadcast, and sleep forever. */
    pthread_mutex_lock(&mutex);
    stopThreads = 1;
    pthread_cond_broadcast(&cond_var);
    pthread_mutex_unlock(&mutex);

    for (size_t t = 0; t < numThreads; t++) {
        pthread_join(threads[t], NULL);
    }
    free(threads);
    free(taskQueue);
    free(resultQueue);
    free(data);
}

/*
 * Sequential mode ("file..."): encode the files one after another as if they
 * were a single stream, so a run that continues from the end of one file
 * into the start of the next is emitted as one pair. An empty or unreadable
 * file ends the program through handle_error().
 */
static void run_sequential(int argc, char *argv[]) {
    char runChar = '\0';
    unsigned int runLen = 0;  /* 0 means no run is pending */

    for (int i = 1; i < argc; i++) {
        int fd = open(argv[i], O_RDONLY);
        if (fd == -1) {
            handle_error();
        }
        struct stat sb;
        if (fstat(fd, &sb) == -1) {
            handle_error();
        }
        if (sb.st_size == 0) {
            close(fd);
            handle_error();
        }
        size_t size = (size_t)sb.st_size;
        char *addr = mmap(NULL, size, PROT_READ, MAP_PRIVATE, fd, 0);
        if (addr == MAP_FAILED) {
            handle_error();
        }

        /* Worst case: every byte closes a run (two output bytes each), plus
           the final run flushed after the last file. */
        char *out = malloc(size * 2 + 2);
        if (out == NULL) {
            handle_error();
        }
        size_t outLen = 0;

        /* Only indices below `size` are read; the end of the file is
           detected by the loop bound, not by looking for a zero byte. */
        for (size_t j = 0; j < size; j++) {
            char c = addr[j];
            /* A run is also closed when its count byte is full, so the
               count never wraps. */
            if (runLen > 0 && c == runChar && runLen < UCHAR_MAX) {
                runLen++;
                continue;
            }
            if (runLen > 0) {
                out[outLen++] = runChar;
                out[outLen++] = (char)runLen;
            }
            runChar = c;
            runLen = 1;
        }

        /* The run still open at the end of a file is carried into the next
           file; only after the last file is it written out. */
        if (i + 1 == argc) {
            out[outLen++] = runChar;
            out[outLen++] = (char)runLen;
            runLen = 0;
        }

        flush_stdout(out, outLen);
        munmap(addr, size);
        close(fd);
        free(out);
    }
}

/*
 * Entry point.
 *
 *   prog -j <threads> file...   encode with <threads> worker threads
 *   prog file...                encode sequentially
 *
 * The encoded stream is written to standard output. Returns 0 on success;
 * failures terminate the process with status 1.
 */
int main(int argc, char* argv[]) {
    if (argc < 2) {
        fprintf(stderr, "usage: %s [-j threads] file...\n",
                argc > 0 ? argv[0] : "rle");
        handle_error();
    }
    if (strcmp(argv[1], "-j") == 0) {
        run_parallel(argc, argv);
    } else {
        run_sequential(argc, argv);
    }
    return 0;
}