【Linux操作系统】多线程(一)
文章目录
- 1. 线程概念
- 2. 线程控制
- 2.1 创建线程
- 2.2 线程ID
- 2.3 线程等待
- 2.4 线程终止
- 2.5 线程分离
- 3. 线程同互斥与同步
- 3.1 互斥量
- 3.2 死锁
- 3.3 同步-条件变量
- 3.4 生产者消费者模型
- 3.5 POSIX信号量
- 3.6 基于环形队列的生产消费者模型
1. 线程概念
线程:是在进程内部运行的一个执行分支(执行流),属于进程的一部分,粒度要比进程更加细和轻量化
内部:线程在进程的地址空间内运行
执行分支:CPU调度的时候只看PCB,如果PCB曾被指派过指向方法和数据,那么CPU就可以直接调度
线程创建的时候,只创建task_struct,与当前进程共享同一个地址空间,当前进程的资源(代码+数据)被划分为若干份,让每一个PCB使用
一个PCB就是一个需要被调度的执行流
Linux中没有专门为线程设计PCB,而是用进程的PCB来模拟线程,这样就不需要维护复杂的进程与线程之间的关系,不用单独为线程设计任何算法,直接使用进程的一套相关的方法,OS只需要聚焦在线程间的资源分配上就可以了
在没有多线程之前的进程,是内部只有一个执行流的进程,而现在的进程,内部可以具有多个执行流
CPU创建进程的成本(时间+空间)非常高,需要使用到的资源非常多(0 -> 1),因而应该减少进程的创建,去使用多线程
从内核的角度,进程是承担分配系统资源的基本实体!线程是CPU调度的基本单位,承担进程资源的一部分的基本实体,是进程划分资源给线程
什么是线程
- 在一个程序里的一个执行线路就叫做线程(thread),更准确的定义是:线程是一个进程内部的控制序列
- 一切进程至少都有一个执行线程
- 线程在进程内部运行,本质是在进程地址空间内运行
- 在Linux系统中,从CPU视角看到的PCB都要比传统的进程更加轻量化
- 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流
Linux因为使用的是进程模拟的线程,所以Linux下不会给我们提供直接操作线程的接口,而是给我们提供在同一个地址空间内创建PCB的方法,分配资源给指定的PCB接口,这对用户其实并不是很友好,所以就有系统级别的工程师,在用户层对Linux轻量级进程接口进行封装,给我们打包成库,让用户直接使用库接口,这被称为原生线程库
线程的优点
-
创建一个新线程得代价要比创建一个进程小得多
-
与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
-
线程占用的资源要比进程少很多
-
能充分利用多处理器的并行数量
-
在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
-
计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
计算密集型应用:加密、大数据运算等主要使用CPU资源的应用
线程越多越好嘛?并不一定,如果线程太多,会导致线程被过度调度切换,这是有成本的
-
I/O密集型应用,为了提高性能,将I/O操作重叠,线程可以同时等待不同的I/O操作
I/O密集型应用:网络下载、云盘、ssh、在线直播等使用内存和外设的I/O资源
线程越多越好吗?也不一定,但I/O允许多一些线程,因为大部分时间是在等待I/O就绪的,多个线程可以让I/O等待的时间重叠,从而缩短了时间
线程的缺点
-
性能损失
一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变
-
健壮性降低
编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的
-
缺乏访问控制
进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响
-
编程难度提高
编写与调试一个多线程程序比单线程程序困难得多
线程异常
- 单个线程如果出现除零,野指针问题导致线程崩溃,进程也会随着崩溃
- 线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,进程终止,该进程内的所有线程也就随即退出
线程用途
- 合理的使用多线程,能提高CPU密集型程序的执行效率
- 合理的使用多线程,能提高I/O密集型程序的用户体验(如生活中我们一边写代码一边下载开发工具,就是多线程运行的一种表现)
进程 vs 线程
所有的轻量级进程(可能是线程)都是在进程的内部运行(地址空间:标识进程所能看到的大部分资源)
进程:具有独立性,可以共享部分资源(管道、ipc资源)
线程:大部分资源是共享的,可以有部分资源是”私有”的,如下
- 进程和线程
- 进程是资源分配的基本单位
- 线程是调度的基本单位
- 线程共享进程数据,但也拥有自己的一部分数据
- 线程ID
- 一组寄存器
- 栈
- errno
- 信号屏蔽字
- 调度优先级
- 进程的多个线程共享同一地址空间,因此Text Segment、Data Segment都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境
- 文件描述符
- 每种信号的处理方式(SIG_IGN、SIG_DFL或者自定义的信号处理函数)
- 当前工作目录
- 用户id和组id
进程和线程的关系
验证
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
void *thread_run(void *args)
{
const char *id = (const char*)args;
while(1) {
printf("我是%s线程, pid: %d\n", id, getpid());
sleep(1);
}
}
int main()
{
pthread_t tid;
pthread_create(&tid, NULL, thread_run, (void*)"thread 1");
while(1) {
printf("我是main线程,pid: %d\n", getpid());
sleep(1);
}
return 0;
}
使用pthread需要在编译时-l
加上链接库pthread
gcc mythread.c -o mythread -lpthread
我们发现两个执行流的进程pid是一样的
说明此时实际上只有一个进程,但进程内部有两个执行流,使用如下命令查看各个轻量级进程
ps -aL
单独一个进程的时候其PID=LWP,所以在多线程的时候,PID=LWP的那个线程为主线程
Linux操作系统进行线程调度的时候,看的是LWP,这个LWP是内核LWP
一般把属于同一个进程内的一批线程称为同一个线程组,线程组的组ID为当前进程的PID
2. 线程控制
POSIX线程库
- 与线程有关的函数构成了一个完整的系列,绝大多数函数的名字是以"pthread_"打头的
- 要使用这些函数库,要通过引入头文件<pthread.h>
- 链接这些线程函数库时要使用编译器命令的"-lpthread"选项
2.1 创建线程
thread:输出型参数,返回创建出来的线程ID
attr:设置线程的属性,一般不需要设置,置为NULL表示使用默认属性
start_routine:为一个函数地址,线程启动后要执行的函数
arg:传给线程启动函数的参数
返回值:创建成功返回0,创建失败返回一个错误码
注意点
- 传统的一些函数是成功返回0、失败返回1,并且对全局变量errno赋值以指示错误
- pthreads函数出错时不会设置全局变量errno(大部分其他POSIX函数会这样做),而是将·错误代码通过返回值返回
- pthreads同样也提供了线程内的errno变量,以支持其他使用errno的代码,对于pthreads函数的错误,建议通过返回值来判定,因为读取返回值要比读取线程内的errno变量的开销更小
2.2 线程ID
- pthread_create函数会产生一个线程ID,存放在第一个参数指向的地址中,该线程ID与前面的LWP不是一回事
- 前面讲的LWP属于进程调度的范畴,因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要一个数值来唯一表示该线程
- pthread_create函数第一个参数指向一个虚拟内存单元,该内存单元的地址即为新创建线程的线程ID,属于NPTL的范畴。线程库的后续操作,都是根据该线程ID来的,而非LWP
- 线程库NPTL提供了pthread_self函数,可以获得线程自身的ID
获取子线程的ID
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
void *thread_run(void *args)
{
while(1) {
printf("我是新线程[%s],我的线程ID为: %lu\n", (const char*)args, pthread_self());
sleep(1);
}
}
int main()
{
pthread_t tid;
pthread_create(&tid, NULL, thread_run, (void *)"new thread");
while(1) {
printf("我是主线程,我创建的线程ID为: %lu,我的线程ID为: %lu\n", tid, pthread_self());
sleep(1);
}
}
新线程与主线程创建出来的新线程ID是一样的,说明我们创建出来了线程,但是却与查看到的内核LWP不一样
一次性创建一批线程
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
void *thread_run()
{
while(1) {
sleep(3);
}
}
int main()
{
pthread_t tid[5];
for (int i = 0; i < 5; i++) {
pthread_create(tid + i, NULL, thread_run, NULL);
}
while(1) {
printf("我是主线程,我的线程ID为: %lu\n", pthread_self());
printf("##########begin##########");
for (int i = 0; i < 5; i++) {
printf("线程[%d]的ID是: %lu\n", i, tid[i]);
}
printf("##########end##########");
sleep(1);
}
}
在这里thread_run函数就被多次重入了
Linux下进程地址空间布局
对于Linux实现的NPTL,pthread_t类型的线程ID,本质就是一个进程地址空间上的一个地址
2.3 线程等待
一般而言,线程也是需要被等待的,如果不等待,可能会导致类似于“僵尸进程”的问题
thread:线程ID
retval:输出型参数,用来获取新线程退出的时候,函数的返回值,因为返回的是一个一级指针,所以需要用二级指针接收
创建线程时start_routine的类型是(void *),即线程执行的函数的返回值为一个一级指针
返回值:等待成功返回0,失败返回错误码
调用该函数的线程将挂起等待,直到id为thread的线程终止,thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,有如下情况
- 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值
- 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_ CANCELED
- 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数
- 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
void *thread_run()
{
while(1) {
printf("我是子线程,线程ID: %lu\n", pthread_self());
sleep(3);
break;
}
return (void *)1;
}
#define NUM 1
int main()
{
pthread_t tid[NUM];
for (int i = 0; i < NUM; i++) {
pthread_create(tid + i, NULL, thread_run, NULL);
}
void *status = NULL;
for (int i = 0; i < NUM; i++) {
pthread_join(tid[i], &status);
}
printf("ret: %d\n", (int)status);
return 0;
}
这样单纯的使用pthread_join只能处理代码跑完结果不对的情况,但如果代码出现了异常,根本就执行不到pthread_join
2.4 线程终止
如果需要只终止某个线程而不终止整个进程,有三种方法:
-
从线程函数return。这种方法对主线程不适用,从main函数return相当于调用了exit
exit是终止进程,如果你只是想终止一个线程的话不要在该线程中调用
-
线程可以调用pthread_exit终止自己
主线程调用pthread_exit函数也不会使整个进程退出,不影响其他线程的执行
但是其子线程就会成为僵尸进程,这是需要进程等待pthread_join
-
一个线程可以调用pthread_cancel终止同一进程中的另一个线程
pthread_exit
retval:表示线程退出状态,可以传NULL
注意事项
pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
void *thread_run()
{
while(1) {
printf("我是子线程,线程ID: %lu\n", pthread_self());
sleep(3);
break;
}
pthread_exit((void *)1);
}
#define NUM 1
int main()
{
pthread_t tid[NUM];
for (int i = 0; i < NUM; i++) {
pthread_create(tid + i, NULL, thread_run, NULL);
}
void *status = NULL;
for (int i = 0; i < NUM; i++) {
pthread_join(tid[i], &status);
}
printf("ret: %d\n", (int)status);
return 0;
}
pthread_cancel
用pthread_cancel取消目标线程
thread:线程ID
返回值:成功返回0,失败返回错误码
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
void *thread_run()
{
while(1) {
printf("我是子线程,线程ID: %lu\n", pthread_self());
sleep(1);
}
}
#define NUM 1
int main()
{
pthread_t tid[NUM];
for (int i = 0; i < NUM; i++) {
pthread_create(tid + i, NULL, thread_run, NULL);
}
printf("wait sub thread\n");
sleep(5);
printf("sub thread cancelled\n");
pthread_cancel(tid[0]);
void *status = NULL;
for (int i = 0; i < NUM; i++) {
pthread_join(tid[i], &status);
}
printf("ret: %d\n", (int)status);
return 0;
}
取消线程,函数的返回值为-1,为常量PTHREAD_ CANCELED
如果我们在子线程中杀掉主线程,子线程会继续运行,但主线程会进入defunct状态,就类似于僵尸进程(不建议这样)
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
pthread_t g_tid;
void *thread_run()
{
while(1) {
printf("我是子线程,线程ID: %lu\n", pthread_self());
sleep(1);
pthread_cancel(g_tid);
}
}
#define NUM 1
int main()
{
g_tid = pthread_self();
pthread_t tid[NUM];
for (int i = 0; i < NUM; i++) {
pthread_create(tid + i, NULL, thread_run, NULL);
}
printf("wait sub thread\n");
sleep(50);
printf("sub thread cancelled\n");
pthread_cancel(tid[0]);
void *status = NULL;
for (int i = 0; i < NUM; i++) {
pthread_join(tid[i], &status);
}
printf("ret: %d\n", (int)status);
return 0;
}
2.5 线程分离
默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏
如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源,这就是线程分离
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
pthread_t g_tid;
void *thread_run()
{
pthread_detach(pthread_self());
while(1) {
printf("我是子线程,线程ID: %lu\n", pthread_self());
sleep(1);
break;
}
return (void *)1;
}
#define NUM 1
int main()
{
pthread_t tid[NUM];
for (int i = 0; i < NUM; i++) {
pthread_create(tid + i, NULL, thread_run, NULL);
}
printf("wait sub thread\n");
sleep(1); // 重要,一定要让线程先分离再等待
printf("sub thread cancelled\n");
int ret = 0;
void *status = NULL;
for (int i = 0; i < NUM; i++) {
ret = pthread_join(tid[i], &status);
}
printf("ret: %d, status: %d\n", ret, (int)status);
sleep(3);
return 0;
}
一个线程被设置为分离之后,绝对不能再进行join了,主线程不退出,新线程在业务处理完毕后退出
3. 线程同互斥与同步
因为多个线程之间是共享地址空间的,也就是很多资源都是共享的
优点:通信方便 缺点:缺乏访问控制
线程安全问题:因为一个线程的操作问题,给其他线程造成了不可控,或者引起崩溃、异常、逻辑错误等现象
创建一个函数没有线程安全问题的话,就不能使用STL、malloc、new等会在全局内有效的数据
访问控制:因为全都是局部变量,线程有自己独立的栈结构
- 临界资源:凡是被线程共享访问的资源都是临界资源(多线程、多进程打印数据到显示器)
- 临界区:在代码中访问临界资源的代码(代码中并不是所有的代码都会访问临界资源,访问临界资源的代码区域才称为临界区)
- 对临界区进行保护的功能,本质就是对临界资源的保护,方式是互斥或同步
- 互斥:在任意时刻,只允许一个执行流访问某段代码(访问某部分资源),就可以称为互斥
- 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
- 同步:让访问临界资源的过程在安全的前提下(一般都是互斥和原子的),让访问资源有一定的顺序性
互斥量
- 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况变量归属单个线程,其他线程无法获得这种变量
- 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互
- 多个线程并发的操作共享变量,会带来一些问题
一个模拟抢票的例子
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
int tickets = 10000; // 这里的tickets是不安全的
void *thread_run(void *args)
{
int id = *(int *)args;
delete (int *)args;
while (true)
{
if (tickets > 0)
{
usleep(1000);
cout << "[" << id + 1 << "]抢到了第{" << tickets << "}票" << endl;
tickets--;
}
else
{
break;
}
}
}
#define NUM 5
int main()
{
pthread_t tid[NUM];
for (int i = 0; i < NUM; i++)
{
int *id = new int(i);
pthread_create(tid + i, nullptr, thread_run, id);
}
for (int i = 0; i < NUM; i++)
pthread_join(tid[i], nullptr);
return 0;
}
最后抢到的负数票,可以发现这里的tickets是不安全的
tickets是一个全局变量,因此会存在于内存中
tickets–是一个非原子操作
tickets–从汇编角度来看是多行的代码,一个线程A在执行这几行代码是可能只执行了其中的一两行代码就带着自己的上下文被挂起了,换下一个线程B来执行这几行代码,而由于线程A并没有执行完全,等这个线程B执行结束之后,线程A又带着自己的上下文继续执行,完成之前没有完成的事——对tickets做减减操作,而这养的操作会改变会改变被线程B以及操作过的tickets,可能tickets已经被线程B减到了100,而对于线程的A的上下文tickets还是1000,做渐渐操作之后变成了999
总结一下无法获得正确结果的原因
- if语句判断条件为真之后,代码可以并发的切换到其它线程
- usleep是个漫长的业务过程,在这个漫长的业务过程中,可能会有很多个线程进入该代码段
- tickets–操作本身就不是一个原子操作
拿到ticket–的部分汇编代码
g++ -c mutex.cc -o mutex.o -std=c++11
objdump -d mutex.o > mutex.objdump
90: 8b 05 00 00 00 00 mov 0x0(%rip),%eax # 96 <_Z10thread_runPv+0x96>
96: 83 e8 01 sub $0x1,%eax
99: 89 05 00 00 00 00 mov %eax,0x0(%rip) # 9f <_Z10thread_runPv+0x9f>
有汇编指令我们可以进一步发现tickets–不是原子操作,他对应了三条汇编指令
- load:将共享变量tickets从内存加载到寄存器中
- update:更新寄存器里面的值,执行-1操作
- store:将新值从寄存器写回到共享变量tickets的内存地址当中
解决以上问题,需要如下三点
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区
3.1 互斥量
互斥量的接口
-
初始化互斥量
-
静态分配(一般用于定义全局的或者静态的互斥量)
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-
动态分配
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
- mutex:要初始化的互斥量
- attr:置为NULL即可
-
-
销毁互斥量
int pthread_mutex_destroy(pthread_mutex_t *mutex);
- 使用
PTHREAD_ MUTEX_ INITIALIZER
初始化的互斥量不需要销毁 - 不要销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
- 使用
-
互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
- 加锁/解锁成功返回0,失败返回错误码
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁
加锁之后的模拟抢票
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
class Ticket
{
private:
int tickets;
pthread_mutex_t mtx;
public:
Ticket() : tickets(1000)
{
pthread_mutex_init(&mtx, nullptr);
}
bool getTicket()
{
bool flag = true;
pthread_mutex_lock(&mtx);
// 执行这部分代码的执行流就是互斥的,也就是串行执行的
while (true)
{
if (tickets > 0)
{
usleep(1000);
cout << "[" << pthread_self() << "]抢到了第{" << tickets << "}票" << endl;
tickets--;
}
else
{
cout << "票已经被抢完了" << endl;
flag = false;
break;
}
}
pthread_mutex_unlock(&mtx);
return flag;
}
~Ticket()
{
pthread_mutex_destroy(&mtx);
}
};
void *thread_run(void *args)
{
Ticket *ticket = (Ticket *)args;
while (true)
if (!ticket->getTicket())
break;
}
#define NUM 5
int main()
{
pthread_t tid[NUM];
Ticket *ticket = new Ticket();
for (int i = 0; i < NUM; i++)
pthread_create(tid + i, nullptr, thread_run, ticket);
for (int i = 0; i < NUM; i++)
pthread_join(tid[i], nullptr);
return 0;
}
有一个问题:
我们访问临界资源tickets的时候,需要先访问锁mtx,前提是所有的线程都必须先看到它
因此它本身也是临界资源,那么它是如何保证锁本身是安全的?
原理:加锁和解锁都是原子的
一行代码是原子的:只有一行汇编的情况
互斥量的实现原理
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期
al为一个寄存器,将al寄存器置0
当CPU执行线程A的代码的时候,CPU寄存器内的数据是线程A私有的,是这个执行流的上下文
xchgb %al, mutex
就是使用一行汇编,原子性的完成共享的内存数据mutex交换到线程A的寄存器al中,也就是它的上下文中,从而实现私有的过程(mutex的本质就是通过一条汇编,将锁数据交换到自己的上下文中)
在执行临界区的代码时,线程也有可能会被切走,线程被切走的时候,会带走自己的上下文,而锁数据也是在线程的上下文中的,也就是拥有锁的线程被切走时是带着锁走的,因为锁只有一把,所以在此期间,其它线程无法申请锁,也就没有办法进入临界区。站在其它线程的角度,对其他线程有意义的状态,就是线程A要么没有申请,要么就已经使用完锁,这也就是线程A访问临界区的原子性
正是如此,代码是程序员编写的,为了保证临界区的安全,必须让每个线程遵守相同的“编码规范”,加入A申请了锁,其它线程的代码也必须要申请
可重入与线程安全
- 概念
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见于对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其它的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则是不可重入函数
- 常见线程不安全的情况
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
- 常见线程安全的情况
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
- 类或者接口对于线程来说都是原子操作
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
- 常见不可重入的情况
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
- 常见可重入的情况
- 不使用全局变量或静态变量
- 不使用用malloc或者new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
- 可重入与线程安全联系
- 函数是可重入的,那就是线程安全的
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的
- 可重入与线程安全的区别
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的
3.2 死锁
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态
死锁的四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
避免死锁的算法
- 死锁检测算法
- 银行家算法
3.3 同步-条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了
- 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量
两个概念
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
条件变量接口
-
初始化条件变量
-
静态分配
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-
动态分配
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
- cond:要初始化的条件变量
- attr:可置为NULL
-
-
销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);
-
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
- cond:要在这个条件变量上等待
- mutex:互斥量
-
唤醒等待
// 唤醒在条件变量下等待的第一个线程,会有一个等待队列,依次被唤醒,唤醒之后回到队尾 int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒条件变量下的所有线程 int pthread_cond_signal(pthread_cond_t *cond);
例子
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
pthread_mutex_t mtx;
pthread_cond_t cond;
// 用ctrl控制worker线程,让它定期执行
void *ctrl(void *args)
{
while (true)
{
// pthread_cond_signal(&cond);
pthread_cond_broadcast(&cond);
sleep(1);
}
}
void *work(void *args)
{
int id = *(int *)args;
delete (int *)args;
while (true)
{
pthread_cond_wait(&cond, &mtx);
cout << "worker[" << id << "] is working..." << endl;
}
}
int main()
{
#define NUM 3
pthread_mutex_init(&mtx, nullptr);
pthread_cond_init(&cond, nullptr);
pthread_t master;
pthread_t worker[NUM];
pthread_create(&master, nullptr, ctrl, (void*)"master");
for (int i = 0; i < NUM; i++)
{
int *id = new int(i);
pthread_create(worker + i, nullptr, work, id);
}
pthread_join(master, nullptr);
for (int i = 0; i < NUM; i++)
pthread_join(worker[i], nullptr);
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
为什么pthread_cond_wait
需要互斥量
- 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程
- 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据
先上锁,发现条件不满足,解锁,然后再等待条件变量
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {
pthread_mutex_unlock(&mutex);
//解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
- 由于解锁和等待不是原子操作,调用解锁之后,
pthread_cond_wait
之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么pthread_cond_wait
将错过这个信号,可能会导致线程永远阻塞在这个pthread_cond_wait
,所以解锁和等待必须是一个原子操作 int pthread_cond_wait(pthread_cond_ t *cond, pthread_mutex_ t *mutex);
进入该函数后,会去看条件量等于0不?等于,就把互斥量变成1,直到cond_ wait返回,把条件量改成1,把互斥量恢复成原样
条件变量使用规范
-
等待条件代码
pthread_mutex_lock(&mutex); while (条件为假) pthread_cond_wait(cond, mutex); 修改条件 pthread_mutex_unlock(&mutex);
-
给条件发送信号代码
pthread_mutex_lock(&mutex); 设置条件为真 pthread_cond_signal(cond); pthread_mutex_unlock(&mutex);
使用细节
- 调用的时候,会首先自动释放_mtx,然后再挂起自己
- 返回的时候,会首先自动竞争锁,获取到锁之后,才能返回
3.4 生产者消费者模型
为什么要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的
生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
“321原则”
- 三种关系
- 生产商 - 生产者:竞争、互斥
- 消费者 - 消费者:竞争、互斥
- 生产者 - 消费者:互斥、同步
- 两种角色
- n个生产则
- n个消费者
- 一个交易场所
- 一段缓冲区(内存空间,STL容器等)
基于BlockingQueue的生产者消费者模型
BlockingQueue
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
Task.hpp
#pragma once
#include <iostream>
#include <pthread.h>
namespace ns_task
{
class Task
{
private:
int _x;
int _y;
char _op;
public:
Task()
{
}
Task(int x, int y, char op) : _x(x), _y(y), _op(op)
{
}
~Task()
{
}
public:
int run()
{
int res = 0;
switch (this->_op)
{
case '+':
res = this->_x + this->_y;
break;
case '-':
res = this->_x - this->_y;
break;
case '*':
res = this->_x * this->_y;
break;
case '/':
res = this->_x / this->_y;
break;
case '%':
res = this->_x % this->_y;
break;
default:
std::cout << "计算出错" << std::endl;
break;
}
std::cout << "thread[" << pthread_self() << "]: " << this->_x << this->_op << this->_y << "=" << res << std::endl;
return res;
}
};
}
BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
namespace ns_blockqueue
{
const int default_capacity = 5;
template <class T>
class BlockQueue
{
private:
std::queue<T> _bq; // 阻塞队列
int _capacity;
pthread_mutex_t _mtx; // 保护临界资源的锁
// 1.当生产满了的时候,就不应该再生产了,也就是生产者不要再竞争锁了,而应该让消费者来消费
// 2.当消费完了的时候,就不应该再消费了,也就是消费者不要再竞争锁了,而应该让生产者来生产
pthread_cond_t _full; // _bq满了,生产者在该条件下等待
pthread_cond_t _empty; // _bq空了,消费者在该条件下等待
private:
void lockQueue()
{
pthread_mutex_lock(&this->_mtx);
}
void unlockQueue()
{
pthread_mutex_unlock(&this->_mtx);
}
bool isFull()
{
return _bq.size() == this->_capacity;
}
void producerWait()
{
// 1.调用的时候,会首先自动释放_mtx,然后再挂起自己
// 2.返回的时候,会首先自动竞争锁,获取到锁之后,才能返回
pthread_cond_wait(&this->_full, &this->_mtx);
}
void wakeConsumer()
{
pthread_cond_signal(&this->_empty);
}
bool isEmpty()
{
return this->_bq.size() == 0;
}
void consumerWait()
{
pthread_cond_wait(&this->_empty, &this->_mtx);
}
void wakeProducer()
{
pthread_cond_signal(&this->_full);
}
public:
BlockQueue(int capacity = default_capacity) : _capacity(capacity)
{
pthread_mutex_init(&this->_mtx, nullptr);
pthread_cond_init(&this->_full, nullptr);
pthread_cond_init(&this->_empty, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&this->_mtx);
pthread_cond_destroy(&this->_full);
pthread_cond_destroy(&this->_empty);
}
public:
void push(const T &in)
{
this->lockQueue();
// 进行条件检测的时候,需要使用循环的方式,来保证退出循环一定是因为条件不满足导致的
while (this->isFull())
{
// 等待时把线程挂起,当前是持有锁的,会存在如下两种情况不应该让程序继续执行
// 1.挂起失败
// 2.被伪唤醒了
this->producerWait();
}
// 生产者函数,向队列中放数据
_bq.push(in);
this->unlockQueue();
this->wakeConsumer();
}
void pop(T *out)
{
this->lockQueue();
while (this->isEmpty())
{
this->consumerWait();
}
// 消费者函数,从队列中拿数据
*out = _bq.front();
_bq.pop();
this->unlockQueue();
this->wakeProducer();
}
};
}
CpTest.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <cstdlib>
#include <unistd.h>
using namespace ns_blockqueue;
using namespace ns_task;
void *producer(void *args)
{
BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
std::string ops = "+-*/%";
while (true)
{
int x = rand()%20+1;
int y = rand()%10+1;
char op = ops[rand() % ops.size()];
Task task(x, y, op);
bq->push(task);
sleep(1);
}
}
void *consumer(void* args)
{
BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
while (true)
{
// sleep(2);
Task task;
bq->pop(&task);
task.run();
}
}
int main()
{
srand((long long)time(nullptr));
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t p;
pthread_t c1, c2, c3, c4, c5;
pthread_create(&p, nullptr, producer, (void*)bq);
pthread_create(&c1, nullptr, consumer, (void*)bq);
pthread_create(&c2, nullptr, consumer, (void*)bq);
pthread_create(&c3, nullptr, consumer, (void*)bq);
pthread_create(&c4, nullptr, consumer, (void*)bq);
pthread_create(&c5, nullptr, consumer, (void*)bq);
pthread_join(p, nullptr);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(c4, nullptr);
pthread_join(c5, nullptr);
return 0;
}
Makefile
CpTest:CpTest.cc
g++ $^ -o $@ -lpthread -std=c++11
.PHONY:clean
clean:
rm -f CpTest
3.5 POSIX信号量
信号量的概念
- 信号量本质就是一把计数器,描述临界资源中资源数目的大小,即最多能有多少资源分配给线程
- 临界资源如果可以被划分成为一个一个的小资源,如果处理得当,也有可能让多个线程同时访问临界资源的不同区域,从而实现并发
- 信号量如果能被合理使用,可以达到预定资源的目的
- 每个线程想访问临界资源都得先申请信号量资源(有且一定会有线程的小块资源)
信号量操作函数
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX可以用于线程间同步
头文件:#include <semaphore.h>
-
初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
- pshared:0表示线程间共享,非零表示进程间共享
- value:信号量初始值
-
销毁信号量
int sem_destroy(sem_t *sem);
-
等待信号量(P()操作)
int sem_wait(sem_t *sem);
- 等待信号量,会将信号量的值减1
-
发布信号量(V()操作)
int sem_post(sem_t *sem);
- 发布信号量,表示资源使用完毕,可以归还资源,将信号量值加1
3.6 基于环形队列的生产消费者模型
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空;另外也可以预留一个空的位置,作为满的状态
-
生产者消费者不能同时进行(互斥特性+同步特性)
- 生产者和消费者开始的时候,指向的就是同一个位置,即队列为空的时候应该让生产者先生产
- 生产者和消费者在队列为满的时候,也指向的是同一个位置,即应该让消费者先消费
-
生产者消费者可以并发执行
- 队列不为空、不为满的时候,生产者和消费者一定指向的不是同一个位置
-
规则
- 生产者不能把消费者套一个圈
- 消费者不能超过生产者
- 当指向同一个位置的时候,要根据空、满的状态来判定让谁先指执行
- 除以上三种之外,生产和消费都可以并发执行
单生产者单消费者
CircularQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <vector>
namespace ns_circularqueue
{
const int default_capacity = 10;
template <class T>
class CircularQueue
{
private:
std::vector<T> _cq;
int _capacity;
sem_t _blanck_sem;
sem_t _data_sem;
int _cstep;
int _pstep;
public:
CircularQueue(int capacity = default_capacity) : _cq(capacity), _capacity(capacity)
{
sem_init(&this->_blanck_sem, 0, this->_capacity);
sem_init(&this->_data_sem, 0, 0);
this->_cstep = this->_pstep = 0;
}
~CircularQueue()
{
sem_destroy(&this->_blanck_sem);
sem_destroy(&this->_data_sem);
}
public:
void push(const T &in)
{
// 生产接口
sem_wait(&this->_blanck_sem); // P(空位置)
this->_cq[this->_pstep] = in;
sem_post(&this->_data_sem); // V(数据)
this->_pstep++;
this->_pstep %= this->_capacity;
}
void pop(T *out)
{
// 消费接口
sem_wait(&this->_data_sem); //P(数据)
*out = this->_cq[this->_cstep];
sem_post(&this->_blanck_sem); //V(空位置)
this->_cstep++;
this->_cstep %= this->_capacity;
}
};
}
CpTest.cc
#include "CircularQueue.hpp"
#include <ctime>
#include <unistd.h>
using namespace ns_circularqueue;
void *consumer(void *args)
{
CircularQueue<int> *cq = (CircularQueue<int>*)args;
while (true)
{
int *data = new int();
cq->pop(data);
std::cout << "consumer get: " << *data << std::endl;
sleep(1);
}
}
void *producer(void *args)
{
CircularQueue<int> *cq = (CircularQueue<int>*)args;
while (true)
{
int num = rand()%20 + 1;
cq->push(num);
std::cout << "producer push: " << num << std::endl;
}
}
int main()
{
srand((long long)time(nullptr));
CircularQueue<int> *cq = new CircularQueue<int>();
pthread_t c, p;
pthread_create(&c, nullptr, consumer, (void *)cq);
pthread_create(&p, nullptr, producer, (void *)cq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
Makefile
CpTest:CpTest.cc
g++ $^ -o $@ -lpthread -std=c++11
.PHONYE:clean
clean:
rm -f CpTest
多生产者多消费者
CircularQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <vector>
namespace ns_circularqueue
{
const int default_capacity = 10;
template <class T>
class CircularQueue
{
private:
std::vector<T> _cq;
int _capacity;
sem_t _blanck_sem;
sem_t _data_sem;
int _cstep;
int _pstep;
pthread_mutex_t _c_mtx;
pthread_mutex_t _p_mtx;
public:
CircularQueue(int capacity = default_capacity) : _cq(capacity), _capacity(capacity)
{
sem_init(&this->_blanck_sem, 0, this->_capacity);
sem_init(&this->_data_sem, 0, 0);
this->_cstep = this->_pstep = 0;
pthread_mutex_init(&this->_c_mtx, nullptr);
pthread_mutex_init(&this->_p_mtx, nullptr);
}
~CircularQueue()
{
sem_destroy(&this->_blanck_sem);
sem_destroy(&this->_data_sem);
pthread_mutex_destroy(&this->_c_mtx);
pthread_mutex_destroy(&this->_p_mtx);
}
public:
void push(const T &in)
{
// 生产接口
sem_wait(&this->_blanck_sem); // P(空位置)
pthread_mutex_lock(&this->_p_mtx);
this->_cq[this->_pstep] = in;
this->_pstep++;
this->_pstep %= this->_capacity;
pthread_mutex_unlock(&this->_p_mtx);
sem_post(&this->_data_sem); // V(数据)
}
void pop(T *out)
{
// 消费接口
sem_wait(&this->_data_sem); //P(数据)
pthread_mutex_lock(&this->_c_mtx);
*out = this->_cq[this->_cstep];
this->_cstep++;
this->_cstep %= this->_capacity;
pthread_mutex_unlock(&this->_c_mtx);
sem_post(&this->_blanck_sem); //V(空位置)
}
};
}
CpTest.cc
#include "CircularQueue.hpp"
#include <ctime>
#include <unistd.h>
using namespace ns_circularqueue;
void *consumer(void *args)
{
CircularQueue<int> *cq = (CircularQueue<int>*)args;
while (true)
{
int *data = new int();
cq->pop(data);
std::cout << "consumer" << "[" << pthread_self() << "] get: " << *data << std::endl;
sleep(1);
}
}
void *producer(void *args)
{
CircularQueue<int> *cq = (CircularQueue<int>*)args;
while (true)
{
int num = rand()%20 + 1;
cq->push(num);
std::cout << "producer" << "[" << pthread_self() << "] get: " << num << std::endl;
}
}
int main()
{
srand((long long)time(nullptr));
CircularQueue<int> *cq = new CircularQueue<int>();
pthread_t c1, c2, c3, c4;
pthread_t p1, p2, p3;
pthread_create(&c1, nullptr, consumer, (void *)cq);
pthread_create(&c2, nullptr, consumer, (void *)cq);
pthread_create(&c3, nullptr, consumer, (void *)cq);
pthread_create(&c4, nullptr, consumer, (void *)cq);
pthread_create(&p1, nullptr, producer, (void *)cq);
pthread_create(&p2, nullptr, producer, (void *)cq);
pthread_create(&p3, nullptr, producer, (void *)cq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(c4, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}