读者-写者问题
一个缓冲区,有些进程只读取里面的内容,另外有的进程会修改里面的内容。为了保持数据的一致性,如果没有进程修改内容时,任意个读进程可以同时访问缓冲区,但是同一时间只能有一个写进程可以访问缓冲区,其它写进程和读进程都不能对缓冲区进行操作。
读者-写者问题和生产者-消费者问题不同的是,后者的每个线程都要修改缓冲区的内容,所以不得不使用互斥锁来保证数据一致性,而前者有些线程是只读的,多个只读线程同时访问并不会出现数据不一致的情况,所以在实现上不必为每个线程都加一个互斥锁,而是让多个读线程可以同时访问,只有写进程的访问是互斥的。
使用互斥锁实现
下面是利用pthread_mutex系列函数的实现。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 | #include <stdio.h>#include <pthread.h> struct pool { int nr_reader; unsigned long long value; pthread_mutex_t may_write, rd_count_mutex;}; static void* writer(void* arg){ struct pool* p = arg; while (1) { pthread_mutex_lock(&p->may_write); ++p->value; printf("writer: value = %llu\n", p->value); pthread_mutex_unlock(&p->may_write); } return NULL;} static void* reader(void* arg){ struct pool* p = arg; while (1) { pthread_mutex_lock(&p->rd_count_mutex); ++p->nr_reader; if (p->nr_reader == 1) pthread_mutex_lock(&p->may_write); pthread_mutex_unlock(&p->rd_count_mutex); printf("%d reader(s), value = %llu\n", p->nr_reader, p->value); pthread_mutex_lock(&p->rd_count_mutex); --p->nr_reader; if (p->nr_reader == 0) pthread_mutex_unlock(&p->may_write); pthread_mutex_unlock(&p->rd_count_mutex); } return NULL;} static inline void pool_init(struct pool* p){ p->nr_reader = 0; p->value = 0; pthread_mutex_init(&p->may_write, NULL); pthread_mutex_init(&p->rd_count_mutex, NULL);} #define NR_READER 5 int main(void){ int i, status; struct pool pool; pthread_t reader_pid[NR_READER], writer_pid; pool_init(&pool); status = pthread_create(&writer_pid, NULL, writer, &pool); if (status != 0) fprintf(stderr, "create writer failed.\n"); for (i = 0; i < NR_READER; ++i) { status = pthread_create(&reader_pid[i], NULL, reader, &pool); if (status != 0) fprintf(stderr, "create reader %d failed.\n", i); } pthread_join(writer_pid, NULL); for (i = 0; i < NR_READER; ++i) pthread_join(reader_pid[i], NULL); return 0;} |
struct pool中的变量may_write是保护value的锁,rd_count_mutex是nr_reader的锁。
在reader线程中,如果是第一个读者获得pool的访问权,则对may_write加锁(第31-32行),阻止其它写线程访问。使用完nr_reader后及时对mutex解锁(第33行),这样其它读线程就可以同时访问value。在读线程访问value期间如果有其它读线程要访问value则不受影响,只需将nr_reader加1即可。在读线程中直到所有线程退出访问时才会解锁may_write(第39-40行),之后的写线程才能访问。
使用读写锁实现
pthread还提供了另一种锁——读写锁(rwlock)来方便这种既有读又有写的问题的处理。读写锁的好处是用户不用自己维护读者的计数,还有减少进出函数的切换。
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 | #include <stdio.h>#include <pthread.h> struct pool { unsigned long long value; pthread_rwlock_t rwlock;}; static void* writer(void* arg){ struct pool* p = arg; while (1) { pthread_rwlock_wrlock(&p->rwlock); ++p->value; printf("writer: value = %llu\n", p->value); pthread_rwlock_unlock(&p->rwlock); } return NULL;} static void* reader(void* arg){ struct pool* p = arg; while (1) { pthread_rwlock_rdlock(&p->rwlock); printf("reader: value = %llu\n", p->value); pthread_rwlock_unlock(&p->rwlock); } return NULL;} static inline void pool_init(struct pool* p){ p->value = 0; pthread_rwlock_init(&p->rwlock, NULL);} #define NR_READER 5 int main(void){ int i, status; struct pool pool; pthread_t reader_pid[NR_READER], writer_pid; pool_init(&pool); status = pthread_create(&writer_pid, NULL, writer, &pool); if (status != 0) fprintf(stderr, "create writer failed.\n"); for (i = 0; i < NR_READER; ++i) { status = pthread_create(&reader_pid[i], NULL, reader, &pool); if (status != 0) fprintf(stderr, "create reader %d failed.\n", i); } pthread_join(writer_pid, NULL); for (i = 0; i < NR_READER; ++i) pthread_join(reader_pid[i], NULL); return 0;} |
类似于mutex,rwlock的构造和析构函数分别为
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr); int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
第一个参数是读写锁指针。第二个参数是锁的属性设置,如果为NULL则使用默认设置。
读线程获取读写锁的函数是
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
如果有写线程占有锁时读线程会阻塞,直到获取为止。
写线程获取锁的函数是
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
如果有读线程或写线程占有锁时同样会阻塞。如果希望不阻塞在函数中则使用
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
如果获取成功则返回0,否则返回错误信息。
使用完后释放锁使用函数
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
无论读线程还是写线程都是用这个函数。(疑问:怎么识别读线程和写线程呢?)
读者优先还是写者优先
上面的程序实现的都是读者优先的原则,即如果有读线程正在使用缓冲区,那么后续到来的读线程可以不必等待,直接访问缓冲区,写线程则需等待全部读线程完成后才能访问。如果不停地有读线程到达,那么写线程可能永远也不能访问缓冲区。而写者优先的规则是,当读线程正在访问缓冲区时有写线程到达,那么后续的读线程都必须等待,直到写线程修改完缓冲区后才能进入,而不能像读者优先那样直接访问缓冲区。
pthread中提供的rwlock是读者优先的。下面的程序提升了写线程的优先级,但是还没完全做到写者优先,这与线程的调度算法有关。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 | #include <stdio.h>#include <pthread.h> struct pool { int nr_reader, nr_writer; unsigned long long value; pthread_mutex_t may_read, may_write, wr_count_mutex, rd_count_mutex;}; static void* writer(void* arg){ struct pool* p = arg; while (1) { pthread_mutex_lock(&p->wr_count_mutex); ++p->nr_writer; if (p->nr_writer == 1) pthread_mutex_lock(&p->may_read); pthread_mutex_unlock(&p->wr_count_mutex); pthread_mutex_lock(&p->may_write); ++p->value; printf("writer: value = %llu\n", p->value); pthread_mutex_unlock(&p->may_write); pthread_mutex_lock(&p->wr_count_mutex); --p->nr_writer; if (p->nr_writer == 0) pthread_mutex_unlock(&p->may_read); pthread_mutex_unlock(&p->wr_count_mutex); } return NULL;} static void* reader(void* arg){ struct pool* p = arg; while (1) { pthread_mutex_lock(&p->may_read); pthread_mutex_lock(&p->rd_count_mutex); ++p->nr_reader; if (p->nr_reader == 1) pthread_mutex_lock(&p->may_write); pthread_mutex_unlock(&p->rd_count_mutex); pthread_mutex_unlock(&p->may_read); printf("%d reader(s), value = %llu\n", p->nr_reader, p->value); pthread_mutex_lock(&p->rd_count_mutex); --p->nr_reader; if (p->nr_reader == 0) pthread_mutex_unlock(&p->may_write); pthread_mutex_unlock(&p->rd_count_mutex); } return NULL;} static inline void pool_init(struct pool* p){ p->nr_reader = 0; p->value = 0; pthread_mutex_init(&p->may_read, NULL); pthread_mutex_init(&p->may_write, NULL); pthread_mutex_init(&p->rd_count_mutex, NULL); pthread_mutex_init(&p->wr_count_mutex, NULL);} #define NR_READER 5#define NR_WRITER 5 int main(void){ int i, status; struct pool pool; pthread_t reader_pid[NR_READER], writer_pid[NR_WRITER]; pool_init(&pool); for (i = 0; i < NR_WRITER; ++i) { status = pthread_create(&writer_pid[i], NULL, writer, &pool); if (status != 0) fprintf(stderr, "create writer %d failed.\n", i); } for (i = 0; i < NR_READER; ++i) { status = pthread_create(&reader_pid[i], NULL, reader, &pool); if (status != 0) fprintf(stderr, "create reader %d failed.\n", i); } for (i = 0; i < NR_WRITER; ++i) pthread_join(writer_pid[i], NULL); for (i = 0; i < NR_READER; ++i) pthread_join(reader_pid[i], NULL); return 0;} |
写线程通过一个互斥锁may_read来阻止读线程,只要有一个写线程等待,读线程都拿不到may_read,从而无法读取。但是如果调度的时候读线程有较高的优先级,这也就成了读者优先。
联系客服