#include <stdio.h>
#include <stdlib.h>#include <deque>#include <string>#include <string.h>#include <pthread.h>#include <signal.h>#include <assert.h>#include "clock.h"std::deque<std::string> task_queue;pthread_mutex_t g_mutex;pthread_cond_t g_cond;FILE* g_fp = NULL;pthread_mutex_t g_fp_mutex;bool g_stop = false;bool g_read_th_run = true;bool g_proc_th_run1 = true;bool g_proc_th_run2 = true;bool g_proc_th_run3 = true;bool g_proc_th_run4 = true;void* read_func(void* obj);void* proc_func(void* obj);void cleanup_handler(void* obj);void int_handler(int no){ fprintf(stdout, "Bye!\n"); g_stop = true;}int main(int argc, char* argv[]){ if (argc < 2) { abort(); return -1; } signal(SIGINT, int_handler); g_fp = fopen("result", "w"); pthread_mutex_init(&g_fp_mutex, NULL); pthread_mutex_init(&g_mutex, NULL); pthread_cond_init(&g_cond, NULL); pthread_t tid = 0; pthread_t tid1 = 0; pthread_t tid2 = 0; pthread_t tid3 = 0; pthread_t tid4 = 0; pthread_create(&tid, NULL, read_func, argv[1]); pthread_create(&tid1, NULL, proc_func, &g_proc_th_run1); pthread_create(&tid2, NULL, proc_func, &g_proc_th_run2); pthread_create(&tid3, NULL, proc_func, &g_proc_th_run3); pthread_create(&tid4, NULL, proc_func, &g_proc_th_run4); while (!g_stop) { sleep(30); }; g_read_th_run = false; g_proc_th_run1 = false; g_proc_th_run2 = false; g_proc_th_run3 = false; g_proc_th_run4 = false; pthread_mutex_lock(&g_mutex); pthread_cond_broadcast(&g_cond); pthread_mutex_unlock(&g_mutex); //assert(!pthread_join(tid, NULL)); assert(!pthread_join(tid1, NULL)); assert(!pthread_join(tid2, NULL)); assert(!pthread_join(tid3, NULL)); assert(!pthread_join(tid4, NULL)); fclose(g_fp); return 0;}void* read_func(void* obj){ pthread_detach(pthread_self()); //pthread_cleanup_push(cleanup_handler, NULL); const char* file = (const char*)(obj); FILE* fp = fopen(file, "r"); if (!fp) return NULL; char* line = new char [1024]; size_t len = 1024; while (!feof(fp)) { getline(&line, &len, fp); if (len) { pthread_mutex_lock(&g_mutex); task_queue.push_back(line); pthread_cond_signal(&g_cond); pthread_mutex_unlock(&g_mutex); memset(line, 0, 1024); } }; fprintf(stdout, "read thread quit!\n"); free(line); fclose(fp); return NULL;}void* proc_func(void* obj){ //pthread_cleanup_push(cleanup_handler, NULL); bool& running = *(bool*)(obj); while (running) { pthread_mutex_lock(&g_mutex); if (task_queue.empty()) { pthread_cond_wait(&g_cond, &g_mutex); } if (task_queue.size()) { pthread_mutex_lock(&g_fp_mutex); std::string& s = task_queue.front(); fwrite(s.c_str(), s.size(), 1, g_fp); task_queue.pop_front(); pthread_mutex_unlock(&g_fp_mutex); } pthread_mutex_unlock(&g_mutex); } fprintf(stdout, "proc thread quit!\n"); return NULL;}void cleanup_handler(void* obj){ fprintf(stdout, "%d: call cleanup_handler\n", (int)pthread_self());}