【Linux】Linux多线程(上)
前言
hi~ 大家好呀,欢迎来到我的Linux学习笔记。本篇笔记将会重点从内核结构引入Linux下的线程,理解Linux下线程和进程的相关性和区别,以及线程相关的操作方法,在到之后的线程互斥和线程同步中的条件变量相关概念哦~
Linux进程控制和这篇很搭哦~【Linux】进程控制_柒海啦的博客-CSDN博客
我的上一篇Linux文章~【Linux】进程信号_柒海啦的博客-CSDN博客
我们一起努力吧~
目录
一、线程概念
1.堆区零散化和页帧、页框
2.理解线程
3.线程和进程的调度层面
二、线程控制
1.线程创建和线程异常
-pthread_create-
2.线程等待
-pthread_join-
-pthread_exit-
-pthread_cancel-
3.线程id
-pthread_self-
4.共享资源和分离线程
__thread
-pthread_detach-
三、线程互斥
1.临界相关概念
2.互斥锁
pthread_mutex_t
锁的初始化:
-pthread_mutex_init-
上锁:
-pthread_mutex_lock-
解锁:
-pthread_mutex_unlock-
释放锁:
-pthread_mutex_destroy-
互斥锁原理:
3.可重入和线程安全
4.死锁问题
死锁的必要条件:
四、线程同步-条件变量
1.线程同步概念
2.条件变量概念与相关接口
条件变量:
pthread_cond_t
条件变量的初始化和摧毁:
-pthread_cond_init&&pthread_cond_destroy-
条件变量等待:
-pthread_cond_wait-
条件发送通知:
-pthread_cond_signal&&pthread_cond_broadcast-
3.生产者与消费者模型
4.基于BlockingQueue的阻塞队列模拟生产消费模型
一、线程概念
1.堆区零散化和页帧、页框
在学习线程前,我们先补充学习几个知识点。
首先,当一个进程在内核被调度后,建立的关系应该如下图所示:
像栈区和代码区就是被整体使用的,但是对于堆区来说实际上是被零散化的,我们可以从平时代码看出来:new、malloc申请出的空间地址比较分散。
而且我们也可以明显的发现在C/C++代码下,我们每次只告诉new 或者malloc要多少什么空间,但是并没有告诉其什么时候结束,又或者申请的一段一段内存如何精确找到并且保存其区域的呢?
实际上,在内核中用vm_area_struct结构体管理申请的每一份堆空间。
vm_area_struct
其中有如下的结构可以帮我们解决上面的管理问题:
vm_start、vm_end 他们都是无符号长整数,分别存储当前申请堆空间的起始和终止地址。而返回的就是申请的堆空间起始地址。
*vm_next; *pre; 这两个是双链表的结构。实际上,在堆区中申请的空间由整个双链表组织起来,每个结点就是申请的一份堆空间。
另外,OS是可以做到让进程进行资源的细粒度划分的。
在虚拟地址与物理地址进行相互转化的过程中,大致的情况可以如下图所示:
因为地址空间大小为4g(以此为标准),对于分出来的4kb的大小,大概有100w(4GB/4KB)个页帧或者页框。那么OS会不会对这些进行管理呢?
先描述在组织。OS必须要对其进行管理。在内核中用page结构体进行管理,并且一定是特别小。在此结构中存在标记位flages用来标识状态,为了组织起来自然就是struct page mem[100w+],然后每次访问页的时候就利用下标进行访问。
所以,在这里我们可以明确的知道io的基本单位就是4kb,无论每次大小是多少。
如果此时对应进程地址空间在页表的右端进行寻址的时候找不到对应的内存,此时就会发生缺页中断。即此时就申请页框,在去磁盘找到对应页帧的地址,加载带入内存,填入物理内存,并且上述操作对用户基本透明。
如上,我们需要知道:操作系统和编译器均为4kb进行划分的,这样io的基本单位就是4kb。
2.理解线程
首先,什么叫做线程?
线程
线程在进程内部执行,是操作系统调度的基本单位。
我们从下面这张图入手:
可以发现,这和我们以前所学的进程结构有所不同。以前的都是一个PCB(task_struct)对应一个进程地址空间,页表和内存。可是现在缺有很多歌PCB对应着右边一个结构(进程地址空间+页表+内存)。
上述通过一定的技术手段,将当前进程的"资源",以一定的方式划分给不同的task_struct。此时每一个task_struct 实际上就代表了一个线程。它和我们之前所学的的进程创建子进程来说区别就是不会申请进程地址空间、页表。所以实际上就可以体现出线程是在进程内部执行,是OS进程调度的基本单位。
因为一个进程只有一张进程地址空间已经页表对应映射内存的一部分区域。所以上图中所有的PCB结构均可代表一个线程,也就是一个执行流,共享此进程的资源。
而第一个线程(实际就是最开始没有创建其他线程的主执行流)就是主线程,其余叫新线程。主线程向操作系统申请资源,然后由主线程分配资源给其余新线程。
那么此时,什么才叫进程呢?
1.从资源角度:
进程就是:内核数据结构+该进程所对应的代码和数据。而这里的内核数据结构可以存在多个PCB。(用户视角)
承担分配系统资源的基本实体。(创建其的第一个PCB -> 向操作系统要资源=进程的身份,线程想要资源就会向此进程要)(内核视角)
2.如何理解曾经写过的代码:
以前所写的,就是只存在一个PCB ,内部只存在一个执行流的进程。现在多线程就是:内部具有多个执行流的进程。一大批的执行流(最少一个)+数据代码 。而一个PCB相当于一个进程中的一个执行流。
3.CPU视角:
CPU其实不怎么关心当前是进程还是线程的概念,只认task_struct,CPU调度的基本单位为“线程”。
在Linux下,一个PCB <= 其他操作系统的PCB,Linux下的进程:轻量级进程(LWP)。(拿到的可能是独立运行的PCB,也可能是多线程的进程中一个执行流)
对于Linux来说,没有真正意义上的线程结构(没有为其专门设计数据结构),Linux是用进程PCB模拟的线程。
Linux并不能直接给我们提供线程相关的接口,只能提供轻量级进程的接口。-> 在用户层实现了一套用户层多线程方案,以库的方式提供给用户进行使用。pthread -- 线程库 原生线程库 用户级线程。(注意此pthread库属于第三方库(不是系统也不是语言提供的),所以在进行编译链接的时候需要加上链接so库选项-l,静态库与动态库相关操作可以看这篇博客哦:(1条消息) 【Linux】静动态库的制作和使用_柒海啦的博客-CSDN博客)
上面说了这么多,我想在正事介绍线程相关接口开始先来进行一段代码演示一遍(接口在后面会详细说明哦~)
我们创建三个线程,每个线程打印不同的名字。主线程是另外一个(一共四个线程)。并且每个线程输出当前进程的pid进行验证是否在同一进程,最后通过PS命令查看进程和轻量级进程,观察现象:
#include <iostream>
#include <pthread.h> // 注意是第三方库
#include <unistd.h>
using namespace std;
void* threadRun(void* argc)
{
while (1)
{
cout << "线程" << (char*)argc << "执行任务,当前进程pid为" << getpid() << endl;
sleep(1);
}
return nullptr;
}
int main()
{
// 主线程
pthread_t tid[3]; // 创建三个线程id变量
char name[48] = {0}; // 传名字的缓冲区
for (int i = 0; i < 3; ++i)
{
snprintf(name, sizeof name, "%s - id:%d", "thread", i + 1);
pthread_create(tid + i, nullptr, threadRun, (void*)name); // id地址 属性 回调函数void* (void*) 传入回调函数的参数
sleep(1); // 缓解传参产生的bug
}
while (true)
{
cout << "main线程执行任务-pid为" << getpid() << endl;
sleep(3);
}
return 0;
}
可以发现,我们可以看的出来主函数(主线程-主执行流)确实创建出了三个新线程,利用ps -aL选项我们可以看到其每个轻量级进程对应的PID和LWP。这四个LWP(轻量级进程)的pid都一样说明是属于一个进程的四个线程。其中第一个线程的LWP和PID一致表示此线程就是主线程,终止其线程整个进程均会退出。
那么,线程是如何看待进程内部的资源的呢?
对于一个进程中的线程来说,绝大多数资源都是共享的。比如文件描述符,每种信号的处理方式,当前工作目录,用户id和组id,进程地址空间的绝大部分(除开栈区(一般认为私有,但是如果想让线程之间进行共享也不是不可以))。
那么对于线程来说还是存在私有的属性:比如线程id,一组寄存器(线程自己维护的上下文),栈(独占结果),errno,信号屏蔽字,调度优先级等等......
3.线程和进程的调度层面
首先我们需要明确:进程是资源分配的基本单位,线程是调度的基本单位。
其实平时我们从各种资料都听说过线程的切换的成本更低,那么在Linux底下为什么呢?CPU不都看见的是轻量级进程么,为什么线程会成本低呢?
- 地址空间不需要切换,页表不需要切换(不切换几个寄存器内存地址的事情)
- 核心原因:CPU内部存在缓存的。硬件级别的L1 ~L3 cache 代码数据缓存,对内存的代码和数据根据局部性原理(一条指令的附近的代码大概率被使用)预读到CPU内部。如果是进程切换,进程具有独立性,缓存cache就会立即失效,新进程过来只能重新缓存。
那么,线程越多越好吗?自然不可,因为有可能本身任务量小于调度线程所消耗的性能了。
一般线程适合作用于计算密集型应用,多处理系统上运行,计算分解到多个线程实现;io密集型应用,为了提高性能,io操作重叠。线程可以同时等待不同的io操作。一般按照约定来说一个进程的线程数=cpu的核心数,最大就不能超过cpu的核心数。
那么线程具有什么缺点么?
1.性能损失(一般用户操作失误,创建过多线程)
2.健壮性降低(同一份代码在多线程环境下就可能存在问题)
3.缺乏访问控制
4.编程难度提高
我们需要合理的使用线程,用于提高CPU效率、io密集型效率,这样才能发挥线程的真正作用。
二、线程控制
现在来学习线程相关的接口(第三方库封装系统调用实现),在Linux下,为了方便的在用户层实现线程,存在一个第三方库即pthread。因为第三方库的使用,所以编译器在最后链接的时候会找不到对应的动态库,所以我们需要使用-l(小写L)选项指定libpthread.so动态库即可。
1.线程创建和线程异常
线程创建在上面其实已经验证过了,现在我们通过学习具体的接口进行验证。
-pthread_create-
man 3 pthread_create
头文件:
#include <pthread.h>
函数原型:
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,\
void *(*start_routine) (void *), void *arg);通过传入输出型参数线程id和属性以及回调函数创建一个执行流去执行回调函数。
函数参数:
thread:pthread_t * 表示传入pthread_t类型变量地址,会返回此线程的id。
attr:线程属性信息,暂时可不用管(置为nullptr)
start_routine:回调函数,返回值为void*,参数为void* 类型的函数
arg:回调函数的参数,表示向此回调函数传入的参数。
返回值:成功的时候返回0,失败的时候返回一个错误码。
我们利用上面的接口验证如下场景:创建一个线程,让这个线程执行任务。线程处理的函数中发生中断(即让其发生空指针引用问题-硬件异常导致发生信号),查看整个进程是否被异常退出。
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
void* threadRun(void* argc)
{
int count = 10;
while (true)
{
cout << "新线程执行自己的任务中......" << endl;
sleep(1);
--count;
if (count == 0)
{
// 执行空指针引用触发信号
int* tmp = nullptr;
*tmp = 6;
}
}
return nullptr;
}
int main()
{
// 1 创建一个线程,线程中发生异常中断
pthread_t tid;
pthread_create(&tid, nullptr, threadRun, nullptr);
// 主线程自己完成自己的事情
while (true)
{
cout << "主线程执行自己的任务中......" << endl;
sleep(1);
}
return 0;
}
运行结果:
可以发现,10s后,虽然主线程执行过程中没有任何问题,但是因为新线程发送了段错误导致发送信号让整个进程终止。并且整个主线程和新线程谁先出先都不一定,因为都属于cpu的调度队列中。
由此,其实我们可以得到两个结论:
1.主线程和新线程创建出来谁先运行也不确定,由机器的调度器决定。
2.线程一旦异常,都可能导致整个进程退出。
线程异常:任何一个线程一旦崩溃,整个进程均会跟着崩溃
2.线程等待
和进程类似的,线程在创建过程中也是需要等待的。如果主线程不等待的话会引起类似进程的僵尸问题,从而导致内存泄漏。(注意线程和进程不同,线程无法观测到其是否为僵尸-未被回收状态,所以在观察角度暂时无法观察,只能用代码进行测试)
-pthread_join-
man 3 pthread_join
头文件:
#include <pthread.h>
函数原型:
int pthread_join(pthread_t thread, void **retval);
通过对应线程id阻塞等待回收对应线程,并且回收其返回值。
函数参数:
thread:线程id。
retval:二级指针,传入接收指针变量的地址,获取对应回调函数(在线程创建加入的回调函数)的返回值。
返回值:成功0,失败返回错误码。
注意:
默认为阻塞等待,不像进程可以非阻塞等待(1)。
线程等待不需要关心什么退出异常或者信号-线程一崩,整个进程就没有了。
注意上面的回收回调函数的返回值,那么我们可以让线程自己终止的办法有吗?
return就是其中一个(函数返回),exit不行(因为是进程终止函数,所以一旦终止整个进程就会终止),但是pthread_exit(void*)就可以进行终止线程。
-pthread_exit-
man 3 pthread_exit
头文件:
#include <pthread.h>
函数原型:
void pthread_exit(void *retval);
终止当前线程,并通过retval返回一个值(如果线程是可连接的-即被等待)
函数参数:
retval 回调函数的返回值
另外除了自己终止自己的方法,还有一个线程取消库函数,可以取消对应线程。(新线程也可以取消主线程哦~但是不推荐这么做)
-pthread_cancel-
man 3 pthread_cancel
头文件:
#include <pthread.h>
函数原型:
int pthread_cancel(pthread_t thread);
向对应线程id的线程发送取消请求,但是取决于两个属性:都在该线程的控制之下:它的取消状态和类型。
函数参数:
thread:线程id
返回值:0成功,错误码失败。
注意:
取消对应线程成功的话对应线程返回码为-1。
利用上面三个接口,我们创建如下场景进行验证:创建两个线程,均循环执行自己的任务。其中一个线程时间到了使用线程exit函数进行退出。这两个线程在主线程均进行等待,当第一个线程等待成功,主线程利用线程取消函数取消另一个线程,等待接收其返回码信息。
void* threadRun1(void* arg)
{
// 线程1调度函数
int count = 5;
while (true)
{
cout << (char*)arg << "执行任务......" << endl;
--count;
sleep(1);
if (0 == count)
{
cout << (char*)arg << "线程退出" << endl;
pthread_exit((void*)1); // 终止此线程,并且返回1
}
}
return nullptr;
}
void* threadRun2(void* arg)
{
// 线程2调度函数
while (true)
{
cout << (char*)arg << "执行任务......" << endl;
sleep(1);
}
return nullptr;
}
int main()
{
// 2 创建两个线程,其中一个线程使用线程exit,等待成功后主线程取消另一个线程,等待接收信息
pthread_t tid1, tid2;
// 创建两个线程
pthread_create(&tid1, nullptr, threadRun1, (void*)"thread-1");
pthread_create(&tid2, nullptr, threadRun2, (void*)"thread-2");
void* flag;
pthread_join(tid1, &flag); // 阻塞等待线程1,flag接收返回值
cout << "线程1退出,返回值为" << (long long)flag << " 等待3秒后取消线程2" << endl; // flag是指针变量,由于是x64-所以指针变量大小为八字节 1
sleep(3);
pthread_cancel(tid2); // 主线程取消线程2
pthread_join(tid2, &flag);
cout << "线程2退出,返回值为" << (long long)flag << endl; // -1
return 0;
}
运行结果:
3.线程id
可以看到上述接口基本都用了线程id-pthread_t。那么用了这么多,线程id究竟代表什么呢?
首先我们执行一个线程就可以获取其id,我们不妨打印出来看看:
可以发现线程id打印出来是一个非常大的整数。其实这个整数的本质就是一个地址。
首先,我们需要明确的是:我们目前用的不是Linux自带的创建线程的接口,我们用的是pthread库的接口。操作系统承担一部分(轻量级进程的调度和管理)库管理一部分(提供属性信息)。
实际上在调度过程中,多个线程看到的也都是同一个栈区,那么每个线程的栈区独立性是如何保证的呢?-既然内核无法提供,那么我们就在用户层提供。
并且在对应的tid区域内存在线程id、线程局部存储、线程栈 -- 用户共享区的用户栈结构(不和单线程进程冲突)。
实际上之所以能够这么设计的原因就是在Linux的系统接口中(clone)里面就有参数可以选择栈结构,所以便就可以通过中间的共享区在库中进行管理。pthread库底层就是调用其clone分配栈空间。
-pthread_self-
pthread_self() 可以获取当前线程的线程id。
当然,上面的线程是可以自己取消自己的(通过self获取本身id),但是并不推荐这么去做。
4.共享资源和分离线程
共享资源的概念我们在上面已经介绍过了,现在具体的利用代码来进行检查线程对于进程地址空间内的共享资源能否查看和修改。
#include <iostream>
#include <pthread.h>
using namespace std;
int val = 6;
void* threadRun(void* arg)
{
cout << pthread_self() << "线程看到的全局变量大小为val:" << val << endl; // 5;
val++; // 线程内进行修改
return nullptr;
}
int main()
{
pthread_t tid;
val--;
pthread_create(&tid, nullptr, threadRun, nullptr);
pthread_join(tid, nullptr); // 线程等待
cout << pthread_self() << "主线程现在看到的全局变量大小为val:" << val << endl; // 6
return 0;
}
上述现象说明了线程之间是能够共享进程中的地址空间资源的。实际上存在一个关键字,可以让每个线程看到不同的全局变量:
__thread
__thread关键字修饰全局变量,能够让一个进程中不同的线程看到不同的全局变量,此时此变量不再是共享资源。
可以看到,在代码保持不变的情况下在全局变量前面加上__thread关键字(第三方库提供的)能够让同一个进程看到不同的全局变量,此时此全局变量val不再被共享。
既然同一个进程中线程之间共享地址空间,那么在之前我们所学的进程替换系列函数还存在作用么?是像子进程那样只是一个新线程还是整个进程呢?我们不妨利用如下的代码进行检测:
void* threadRun(void* arg)
{
sleep(1);
cout << "子线程等待3s执行ls -l 命令" << endl;
sleep(3);
execlp("ls", "ls", "-l", nullptr); // 进程替换函数 从库目录下进行搜索
return nullptr;
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, threadRun, nullptr);
for(int i = 0; i < 12; ++i)
{
cout << "主线程执行任务" << i + 1 << endl;
sleep(1);
}
pthread_join(tid, nullptr); // 线程等待
return 0;
}
可以看到,本来主线程是要执行12次任务的,但是由于新线程在发出等待3s执行进程替换任务后,果然整个进程都被替换了,不与以前所学的进程相关知识相冲突。
一个线程在执行任务,另一个线程突然进行替换- 不会出现问题。只要成功替换了,整个进程就会被替换为其他进程了。主线程外其余线程终止,然后进行进程替换。
当然,像多进程那样,如果主线程不想关心新线程,但是又不想造成内存泄漏问题。并且线程相关的函数可不像进程那样存在非阻塞等待的选项。那么主线程该如何去做呢?
对于进程来说,存在因为子进程退出会发送信号,然后利用此信号进行捕捉进行忽略或者自定义处理即可。对于线程来说存在一个接口叫做分离线程可以帮我们解决类似的问题。分离线程,主线程不需要join,新线程结束库会自动去回收。
-pthread_detach-
man 3 pthread_detach
头文件:
#include <pthread.h>
函数原型:
int pthread_detach(pthread_t thread);
根据传入的线程id对对于线程进行分离。
函数参数:
thread:线程id
返回值:成功返回0,否则返回错误码。
注意:
对于线程进行分离后如果偏要进行等待就会返回cerrno cstring错误码。
线程分离了,如果主线程先退出怎么办--主线程退出进程也就会退出,其余线程也都会跟着退出。多进程还是多线程的场景下(一般都是让父进程或者主线程最后退出)注意是否等待什么的都是和环境和场景相关。
如果线程分离,如果他异常了,那么会影响主线程吗?同样会。(同样的是属于同一个进程)
我们验证一下主线程不用等待新线程,新线程自动被库回收的效果。
void* threadRun(void* arg)
{
pthread_detach(pthread_self()); // 一般是自己分离自己
for (int i = 0; i < 3; ++i)
{
cout << "新线程执行任务j-" << i + 1 << endl;
sleep(1);
}
cout << "子线程退出" << endl;
return nullptr;
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, threadRun, nullptr);
// pthread_detach(tid); // 可以主线程进行分离
for(int i = 0; i < 12; ++i)
{
cout << "主线程执行任务" << i + 1 << endl;
sleep(1);
}
return 0;
}
由于无法验证子线程是否被回收,所以验证只能验证个代码。
三、线程互斥
1.临界相关概念
首先,我们需要明确的两点:线程是cpu调度的基本单位;进程是资源分配的基本单位。
现在,下面我们来具体谈谈如果线程之间进行共享和共同执行共享资源操作的代码的定义:
临界资源:多线程执行流共享的资源叫做临界资源。(同一时刻只能存在一个执行流访问)
临界区:每个线程内部,访问临界资源的代码,叫做临界区。
互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源。通常对临界资源起保护作用。
原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么,未完成。
现在我们通过以下实例来共同探索上述的知识点。
现在我们模拟多线程共同访问一个共享的变量,均对其进行--操作,当减到0的时候就退出线程:
#define THREAD_NUM 3
int n = 1000;
void* threadRun(void* arg)
{
while (true)
{
if (n > 0)
{
usleep(rand() % 1500);
cout << (char*)arg << "线程抢到一张票啦" << n << endl;
n--; // 访问修改共享资源
}
else break;
usleep(rand() % 2000); // 模拟执行其他任务
}
return nullptr;
}
int main()
{
srand((unsigned long)time(nullptr));
pthread_t tid[THREAD_NUM];
string ar[THREAD_NUM];
for (int i = 0; i < THREAD_NUM; ++i)
{
ar[i] = "thread-";
ar[i] += to_string(i + 1);
pthread_create(tid + i, nullptr, threadRun, (void*)ar[i].c_str());
}
// 主线程阻塞等待新线程
for (int i = 0; i < THREAD_NUM; ++i)
{
pthread_join(tid[i], nullptr);
}
return 0;
}
从运行结果看,上述多线程程序访问此函数明显出了问题。我们的逻辑是大于0才可以进行--啊,怎么可能出现0和-1的票号呢?
此时多线程访问的同一个函数就是可重入函数。因为此时并没有对共享资源加以保护,就会存在问题。
针对上述代码访问共享资源的地方,可以发现存在两处是有线程安全问题的(线程安全问题即多线程环境下存在问题),如下图:
接下来对上述代码存在两处问题做出解释:
1.n>0 由于线程调度问题,我们不得而知当其中一个线程访问到n>0(假设此时n = 1),那么条件满足进入,但是此时此线程时间片到了或者优先级问题(因为cpu的调度)导致该线程收回此时的上下文接着到后面调度队列进行等待,而接下来的那个线程却将代码执行完了(n-- n = 0),此时再切回此线程,可以发现虽然进来了,但是并不满足条件,此时n = 0了但是仍然可以--,这也就是-1出现的原因。
2.n--的时候造成并发访问。首先计算过程,对于计算机来说并不是一步完成的,而是分为三步:先写入cpu的寄存器中去,其次进行--(cpu内部进行计算),将结果写回内存中去。可以发现在这三步执行过程中均可以发生线程调度切换的问题,一旦造成并发进行,同样的,在之后线程执行完他们修改好的数据后轮到此线程就会打回原型。
上面因为多线程造成时序或者因为不是原子性的操作导致异常的出现,所以才会出现线程互斥的说法:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源。那么我们就要对共享资源进行保护,使其成为临界资源。
此时我们要引入互斥锁的概念。
2.互斥锁
针对上述因为多线程产生的问题,如果我们能够在特定区域(共享资源-临界资源)限定每次只能由一个线程即一个执行流完成对资源的操作就能解决了。互斥锁就能帮助我们解决此问题,并且其的相关操作均是原子的,下面来看互斥锁的相关接口:
pthread_mutex_t
pthread_mutex_t是原生线程库(pthread)提供的一种数据类型,能够帮助我们定义互斥锁这个锁的变量。之后线程进行上锁和解锁均需要通过此锁变量方可进行。
锁的初始化:
如果锁的变量定义的是全局或者静态的,那么可以直接对其变量进行初始化:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
如果锁的变量是局部定义的,那么需要使用init函数进行初始化:
-pthread_mutex_init-
man 3 pthread_mutex_init
头文件:
#include <pthread.h>
函数原型:
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);对未初始化的锁根据传入的互斥锁属性进行初始化。
函数参数:
mutex:锁变量的地址。
attr:互斥锁相关属性(默认设置为NULL)
返回值:成功返回0,否则返回错误码。
上锁:
-pthread_mutex_lock-
man 3 pthread_mutex_lock
头文件:
#include <pthread.h>
函数原型:
int pthread_mutex_lock(pthread_mutex_t *mutex);
使当前执行流(线程)获取锁。如果当前互斥锁被锁定,那么就会处于阻塞状态(另一个接口trylock类似,只不过不会处于阻塞状态,其会立即返回)
函数参数:
mutex:锁变量的地址。(注意需要初始化)
返回值:成功0,失败返回一个错误编号。
解锁:
-pthread_mutex_unlock-
man 3 pthread_mutex_unlock
头文件:
#include <pthread.h>
函数原型:
int pthread_mutex_unlock(pthread_mutex_t *mutex);
是使当前执行流获取的锁释放,即解锁。
函数参数:
mutex:锁变量的地址。(上锁状态)
返回值:成功0,失败返回一个错误编号。
释放锁:
-pthread_mutex_destroy-
man 3 pthread_mutex_destroy
头文件:
#include <pthread.h>
函数原型:
int pthread_mutex_destroy(pthread_mutex_t *mutex);
将传入的锁变量进行摧毁,使其成为未初始化阶段。
函数参数:
mutex:锁变量的地址。(上锁状态)
返回值:成功0,失败返回一个错误编号。
现在,我们利用上面的互斥锁,对我们的共享资源进行加锁保护,此时能够避免上述多线程访问全局变量出现的问题吗?
我们可以试着对上述代码进行加工:
1.全局锁对象:
2.局部锁对象:
class ThreadArg // 通过自定义类型的方式将局部(主线程)定义的互斥锁对象传给每个新线程
{
public:
ThreadArg(const string& name, pthread_mutex_t* mtx)
:_mtx(mtx), _name(name)
{}
pthread_mutex_t* _mtx;
string _name;
};
注意最后均需要释放锁对象资源:
根据上述代码,实际上我们就能运行出没有上述问题的代码,均是到1就结束了。但是实际上可以观察到时间明显是延长了,这是因为访问临界资源进行串行的原因。但是并不会造成线程安全的问题。所以实际上这里也就提醒了我们加锁的时候,一定保证加锁的力度越小越好。(别把无关紧要的代码放入临界区)
现在我们利用所学的锁的接口解决问题后,尝试解决如下的问题:
加锁就是串行执行了么?加锁了之后,线程在临界区中是否会切换,会存在问题么?原子性的体现?
要解决上述问题,我们首先需要解决的是锁的原理:
互斥锁原理:
要访问临界资源,每一个线程必须先申请锁,每一个线程都必须先看到同一把锁并且去访问它,锁本身是不是就是一种共享资源?谁来保证锁的安全呢?
为了保证锁的安全,锁只能自己保护自己。换句话来说就是锁的操作均为原子性的。
首先我们需要了解到swap和exchange指令:以一条汇编的方式,将内存和cpu内寄存区数据进行交换。
因为一条汇编语句是原子性的(即指定操作系统干什么),所以上述的交换命令自然是原子性的(和平时我们写的函数做为区分(非原子性))
lock:(加锁汇编)
movb $0 %al // 0放入寄存器中:0属于线程的上下文
xchgb %al, mutex // 把寄存器的值和锁的值进行交换 -真正的申请锁 交换-数字1永远只有一个。(对应线程没有申请锁内存中1和寄存器 的0进行交换)锁代表1 - 锁被其余线程占用表示0 --内存mutex中
if (al寄存器的内存 > 0) return 0
else 挂起等待
goto lock
%al代表的是一个寄存器 1表示此线程是否获取锁,0表示没有获取
根据上述汇编,我们可以看到:
交换的现象:内存<->%al做交换
交换的本质:共享<->私有 // 在执行流,如何看待cpu上面的寄存器的?当前执行流的上下文。寄存器们,空间是被所有的空间共享的,但是执行器的内容,是被每一个执行流私有的,上下文。(在此执行流使用期间,一定是自己的数据)
所以在进行上锁的时候,无论怎么切换,最终只会存在一个1,这也就是原子性的体现,要么申请到1了,要么没有,即只存在两种状态。unlock解锁的汇编同理。
此时,可以针对上述问题进行解答了:
1.加锁后,线程同样的会进行切换,但是完全没有影响:
第一次理解:执行到此区域一定是上了锁了的,并没有解锁。所以即便是切换,其余相关线程都需要申请锁,无法申请成功,自然就无法干扰临界资源,即无法执行临界区代码。保证了临界区中数据一致性。
第二次理解:一旦切换了,寄存器中唯一的锁(1)也被此线程保留走了,其余线程不可申请锁。(锁的原理)
2.原子性的理解:
在没有持有锁的线程看来,对其有意义的情况只有两种:1.线程1没有持有锁(什么都不做) 2.线程1释放了锁(做完了)。因为此时才可以申请锁。
3.加锁自然就是串行执行了,即对临界区的代码进行串行执行。
3.可重入和线程安全
我们在此重新理解可重入函数的概念:
如果同一个函数被不同的执行流调用,出现了问题就是不可重入函数,否则就是可重入函数。
对于是否可重入来说并没有对错之分,而是一种函数的特性,这让我们在多线程编程的时候对不可重入函数要多加编写互斥锁进行保护即可。
常见的不可重入函数:malloc、free I/O库函数 标准I/O库都是以不可重入实现的 静态的数据结构
那么对应线程来说,此环境安不安全那么同样的至关重要:
线程安全:对全局或者静态变量进行操作,并且没有锁的保护的情况下,在多线程环境下被称为线程安全问题。 --描述线程的。(多线程上的要求)
线程安全就是对与错之分了。如果你编写的多线程代码不安全那么就是错误代码。
常见的线程不安全:共享变量的函数,状态随着函数调用产生变化, 返回静态变量指针的函数....
区别:如果一个函数是可重入的:那么一定就是线程安全。如果是线程安全的,但是不一定是可重入的。
4.死锁问题
首先我们可以看一下如下的场景:
可以看到线程A先申请了锁1,此时是持有锁1的状态,现在想申请锁2继续执行下面的部分,相反的,线程B先申请了锁2,此时是持有锁2的状态,想申请锁1继续执行下面的部分。此时申请的过程形成死结,谁也不愿放弃之前的那把锁谁也申请不到下一把锁。
同样的,1把锁也可以造成死锁状态,也就是在申请锁之后在释放锁前在此申请,自己造成回路导致阻塞。
可以看到,多线程场景中:自身具备锁的情况下申请其余有锁,互相申请导致不会释放永远等待下去的过程。
死锁的必要条件:
1.互斥条件(一个资源只能被一个执行流使用 - 互斥锁)
2.请求与保持条件(一个执行流因请求资源而阻塞时,对已获得资源保持不放)
3.不剥夺条件(一个执行流已获得的资源,在未使用完前,不可强行剥夺)
4.循环等待条件(若干执行流之间形成一种头尾相连的循环等待资源的关系)
死锁一旦产生,上述四个条件均一定被满足
为了避免死锁,我们可以尝试破坏上述的四个必要条件其中一个或者利用其余解决方法:
1.破坏其中一个必要条件:
能不能不加锁
trylock()-如果申请成功返回(申请不到锁就不会阻塞等待,根据传出信号自己主动释放(申请失败也可以返回,返回错误码--不会阻塞式的等待,连续申请数次不成功,然后释放自己的锁 ))
强制的抢占他人的锁(优先级越高)
所有线程必须按照一定顺序申请锁,不可交叉申请
2.加锁顺序一致
3.避免锁未释放的场景
4.资源一次性分配
四、线程同步-条件变量
1.线程同步概念
对于之前的互斥锁即线程互斥,虽然解决了线程安全问题,但是同样的也衍生出了其他的问题。
1.同一个线程频繁的申请到资源(上述代码例子中票频繁的被同一个线程抢占),那么其余线程只能阻塞等待,没有起到作用,反而还占用了cpu资源。
2.太过于浪费我自己和对方的资源了。即频繁的申请锁,但是资源却没有到就再次释放锁(当前没有实例,可以假设票抢完了,在一定时间后存在一个线程生产票。因为访问票就是访问临界资源,需要受到互斥锁的保护,但是如果其余线程频繁的申请锁和释放锁不但没有申请到票,还让生产票的线程阻塞等待锁,自然造成效率低下的问题)
其实,上面的两个问题都没有错,但是在效率和节省资源方面反而起了副作用,没有让多线程的性能发挥到最大,这自然不是我们想要的。
为了解决上面的问题,我们引入了线程同步的概念,即解决访问临界资源的合理性的问题。线程同步就是线程按照一定的顺序进行临界资源的访问 -- 线程同步 执行过程具备一定的顺序性。竞争条件:因为时序问题,导致程序异常。
2.条件变量概念与相关接口
首先,根据之前我们利用互斥锁的那个例子,分析一下对临界资源的使用时的情况:
当我们申请临界资源前->先要做临界资源是否存在的检测 -> 要做检测的本质:也是访问临界资源!!-> 结论:对临界资源的检测也一定是在加锁和解锁之间的。
如果我们不采取任何线程同步的措施,那么常规方式要检测条件就绪,注定了必须频繁申请和释放锁。那么现在有没有办法让我们的线程检测到资源不就绪的时候:1. 不要让线程频繁自己检测,等待。2.条件就绪的时候通知对应的线程,让他来进行资源申请和访问。
我们的条件变量就能帮助我们解决这个问题。因为检测临界资源时在加锁和解锁之内的,那么条件变量自然也是在里面,也可以说是一种临界资源。
条件变量:
pthread_cond_t
pthread原生线程库提供的条件变量的数据类型。对应的和pthread_mutex_t 类似,并且变量均需要初始化方可使用。
条件变量的初始化和摧毁:
如果条件变量是全局的,那么可以直接利用宏进行初始化即可:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-pthread_cond_init&&pthread_cond_destroy-
man 3 pthread_cond_init
头文件:
#include <pthread.h>
函数原型:
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);int pthread_cond_destroy(pthread_cond_t *cond);
根据传入的未初始化条件变量以及相关属性进行初始化。局部条件变量
释放对应条件变量。
函数参数:
cond:条件变量地址
attr:属性(默认为null)
返回值:成功返回0,否则返回一个错误编号。
条件变量等待:
如果当前线程的临界资源未达到满足状态,可根据对应的条件变量进行阻塞等待。
-pthread_cond_wait-
man 3 pthread_cond_wait
头文件:
#include <pthread.h>
函数原型:
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);对指定的条件变量进行阻塞等待,发信号就会唤醒(阻塞过程中会将传入的mutex锁进行解锁操作,唤醒后会重新进行上锁)(另外如果根不想长时间阻塞等待的话可以利用接口pthread_cond_timedwait接口进行设定时间醒来)
函数参数:
cond:条件变量地址
mutex:互斥锁对象地址
返回值:成功0,失败返回错误码。
条件发送通知:
可以给特定的条件变量发送醒来通知(可以针对特定条件变量下阻塞的任意一个线程或者全体线程)
-pthread_cond_signal&&pthread_cond_broadcast-
man 3 pthread_cond_broadcast
头文件:
#include <pthread.h>
函数原型:
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);对特定条件变量下阻塞的全体线程进行唤醒。
对特定条件变量阻塞下的任意线程进行唤醒
函数参数:
cond:条件变量地址
返回值:成功0,失败返回错误码。
首先,如果我们屏蔽之前代码所写的一行:
可以发现产生了之前我们所说的问题1。那么现在我们能否利用条件变量的相关接口,让我们的主线程能够按照特定的顺序唤醒每个新线程执行抢票任务,不在出现上面的情况呢?
#define THREAD_NUM 3
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 全局或者静态条件变量可以直接初始化
int n = 12;
class ThreadArg // 通过自定义类型的方式将局部(主线程)定义的互斥锁对象传给每个新线程
{
public:
ThreadArg(const string& name, pthread_mutex_t* mtx)
:_mtx(mtx), _name(name)
{}
pthread_mutex_t* _mtx;
string _name;
};
void* threadRun(void* arg)
{
ThreadArg* ta = (ThreadArg*)arg;
while (true)
{
pthread_mutex_lock(ta->_mtx); // 即将进入临界区,进行加锁
pthread_cond_wait(&cond, ta->_mtx); // 进来先阻塞等待,等待主线程发信号进行后序的步骤
if (n > 0)
{
usleep(rand()%1500);
cout << ta->_name << "线程抢到一张票啦" << n << endl;
n--; // 访问修改共享资源
pthread_mutex_unlock(ta->_mtx); // 解锁
}
else
{
pthread_mutex_unlock(ta->_mtx); // 解锁 注意不满足需要到这里进行解锁
break;
}
// usleep(rand() % 2000); // 模拟执行其他任务
}
delete ta; // 别忘了释放空间
cout << "-新线程" << ta->_name << "退出-" << endl;
return nullptr;
}
int main()
{
pthread_mutex_t mtx; // 定义局部锁对象
pthread_mutex_init(&mtx, nullptr); // 局部锁对象需要调用初始化接口
srand((unsigned long)time(nullptr));
pthread_t tid[THREAD_NUM];
string ar[THREAD_NUM];
for (int i = 0; i < THREAD_NUM; ++i)
{
ar[i] = "thread-";
ar[i] += to_string(i + 1);
ThreadArg* ta = new ThreadArg(ar[i], &mtx);
pthread_create(tid + i, nullptr, threadRun, (void*)ta);
}
int num = 0;
while (true)
{
// pthread_cond_signal(&cond); // 指定当前条件变量下任意线程(其不同线程发生竞争)
pthread_cond_broadcast(&cond); // 或者全部 基本上3个3个一起
num++;
sleep(1);
if (num == 5)
{
while (pthread_cond_broadcast(&cond) > 0); // 循环对触发条件线程发送最后的信号,使其线程退出
break;
}
}
// 主线程阻塞等待新线程
for (int i = 0; i < THREAD_NUM; ++i)
{
pthread_join(tid[i], nullptr);
}
pthread_mutex_destroy(&mtx); // 对锁对象进行摧毁,释放
pthread_cond_destroy(&cond); // 摧毁条件变量
cout << "-互斥锁和条件变量均被摧毁,主线程退出-" << endl;
return 0;
}
可以发现,我们确实可以简单的控制一下每个线程的执行顺序。(上面的条件变量代码并没有完善),上述代码用的是全局变量进行控制的,也可以使用局部变量先锋像互斥锁那样去增加成员传参使用。
那么现在我们可以提出如下的两个问题:1.条件满足的时候,我们在唤醒指定的线程 -- 我怎么知道条件是否满足呢?2.mutex的意义?mutex的意义实际上就是一个线程进入条件等待的时候,此时是持有锁的(在临界区),那么此时应该解锁掉,让其余线程获取锁执行其过程,当被唤醒的时候类似重新去获取锁(在临界区),获取锁后才能执行下面的步骤。
但是条件满足线程之间的关系该如何去表示呢?我们下面利用一个模型进行更加具体的说明:
3.生产者与消费者模型
我们首先需要在脑海中建立起如下图的关系:
我们首先发现交易场所是介于消费者和生产者之间的存在。其核心的价值在于提高效率,解耦(生产者和消费者之间)。本质就是一个商品的缓冲区。
其次,针对此模型我们有着如下的研究:
一个交易场所。
二种角色:生产者和消费者。
三种关系:
对于消费者和生产者来说,交易场所是共享的。
生产者和生产者之间:互斥关系(竞争,比如ABC都是同一类型商品的不同厂家,那么它们就需要进行竞争)
消费者和消费者:互斥关系(竞争,假如某种商品特别稀缺,那abc消费者谁都想要就要看谁更快)
生产者和消费者:互斥关系、同步关系(互斥是指如果生产者存在生成前,生成中,生成后的过程,那么如果消费者此时访问生成者生成中就存在问题,为了避免这种中间状态,那么生成者和消费者之间就必须存在互斥;当生成者生产满的时候就需要消费者进行消费放可进行生成,消费者消费完了同样的需要生产者进行生产才能继续消费)
记忆方法可以被123口诀,即1个交易场所,2种角色,3种关系。
根据上述的图例,我们简单的明确了生产者和消费者模型之间的关系,那么这和我们线程间互斥和同步又有什么关系呢?我们不妨利用基本工程师的思维在重新梳理一遍:
1.首先,生成者和消费者就是代表执行不同功能的线程,其中一个可以是获取任务或者从外界获取资源的线程(生产者),另一个则就是从生成者线程中获取资源或者任务进行处理的线程(消费者)。其中交易场所就可以表示某种数据结构所表示的缓冲区,而商品则就是交换的数据了。
2.生产者生成的商品,就可以告诉消费者可以消费了。消费者消费了就可以告诉生产者可以生成了。
现在,我们基于此模型再提出两个问题:
1如果消费者和生产者只存在一个,那么(同类型)之间就不用维护互斥问题了。
2数据生产到仓库,在合适的时候被消费者拿走。但是生产消费的过程不仅仅于此。生产者生产的数据是哪里来的?(不清楚,但是花时间)消费者如何使用发送过来的数据呢?(不知道,但是也要花时间)
接下来,我们基于模型编写出一个数据结构,结合问题去更加深层次理解消费者和生产者模型在条件变量-线程同步中的应用。
4.基于BlockingQueue的阻塞队列模拟生产消费模型
首先明确一下需求:现在我们需要一批多线程处理任务,一部分线程生产任务,另一部分线程处理任务。要求:借助阻塞队列这个缓冲区,在保证线程安全的前提下开辟想要的生产数据总空间,如果空间满了就阻塞生产任务的线程,如果空间空了就阻塞消费任务的线程。
首先确定了需要阻塞队列这个缓冲区,那么我们就要实现此数据结构。利用生产者和消费者模型,生产者需要对缓冲区进行放数据,那么对应接口就是push,消费者需要对缓冲区进行取数据,那么对应接口就是pop。保证两个接口的线程安全(临界资源),并且能够通过条件变量实现线程同步。(缓冲区满了push阻塞,缓冲区空了pop阻塞,消费者消费数据就可以告诉生产者可以生产了,生产者生产了就可以告诉消费者可以消费了)
mymutex.hpp代码:
#pragma once
#include <pthread.h>
class mutexguard
{
public:
// 注意传入的互斥锁对象必须是初始化后的
mutexguard(pthread_mutex_t* mtx)
:_mtx(mtx)
{
pthread_mutex_lock(_mtx); // 创建对象就进行上锁
}
~mutexguard()
{
// 析构函数
pthread_mutex_unlock(_mtx); // 解锁
}
private:
pthread_mutex_t* _mtx;
};
BlockQueue.hpp代码:
// 阻塞队列 模拟实现消费者和生产者模型
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "mymutex.hpp"
template<class T>
class blockqueue
{
public:
blockqueue(const size_t capacity = 1024)
:_capacity(capacity)
{
_bq = new std::queue<T>();
// 初始化互斥锁和条件变量
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_full, nullptr);
pthread_cond_init(&_empty, nullptr);
}
blockqueue(const blockqueue&) = delete; // 禁止拷贝阻塞队列对象
// 生产者写入数据
void push(const T& val)
{
// 上锁和解锁我们可以利用一个优雅的方式 RAII
mutexguard mtx(&_mtx);
// 判断临界资源是否满足条件,满足在进行写入
while (_capacity == _bq->size()) pthread_cond_wait(&_full, &_mtx); // 注意这种方式容易出现问题:如果wait等待失败的话,那么就不会阻塞,继续进行下一步,所以需要while循环等待
_bq->push(val);
// 注意此时生成者写入数据了,就可以告诉消费者可以取数据了
pthread_cond_signal(&_empty);
}
// 消费者获得数据
void pop(T* arg) // 输出型参数
{
mutexguard mtx(&_mtx);
while (0 == _bq->size()) pthread_cond_wait(&_empty, &_mtx);
*arg = _bq->front();
_bq->pop();
pthread_cond_signal(&_full);
}
~blockqueue()
{
// 析构函数释放对应的资源
delete _bq;
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
private:
std::queue<T>* _bq; // 阻塞队列自然用STL库中的队列实现 -注意此队列接口一般是不可重入函数,即存在线程安全问题,需要我们自己去保障
size_t _capacity; // 容量大小
pthread_mutex_t _mtx; // 互斥锁 -互斥
pthread_cond_t _full; // 条件变量 -同步 满
pthread_cond_t _empty; // 空
};
测试代码可以自己写,这里我给出一个简单的两个线程执行过程,其中消费者消费的慢,可以明显看到阻塞过程,当然反着来也可以,这里不再多赘述。
#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include "BlockQueue.hpp"
using namespace std;
const int pNum = 1; // 生产者线程个数
const int cNum = 1; // 消费者线程个数
template<class T>
class threadData
{
public:
threadData(const string& name, blockqueue<T>* bq)
:_name(name), _bq(bq)
{}
string _name;
blockqueue<T>* _bq;
};
// 生产者执行线程
void* producer(void* args)
{
threadData<int>* td = (threadData<int>*)args;
int num = 10;
while (true)
{
cout << td->_name << "线程生产出" << num << endl;
td->_bq->push(num);
sleep(1);
num--;
if (num == 0) break;
}
cout << td->_name << "线程退出" << endl;
delete td;
return nullptr;
}
// 消费者执行线程
void* consumer(void* args)
{
threadData<int>* td = (threadData<int>*)args;
int num = 10;
while (true)
{
int tmp;
td->_bq->pop(&tmp);
cout << td->_name << "线程消费得到" << tmp << endl;
sleep(3);
num--;
if (num == 0) break;
}
cout << td->_name << "线程退出" << endl;
delete td;
return nullptr;
}
int main()
{
blockqueue<int> bq(2); // 阻塞队列
pthread_t tid[pNum + cNum];
string ar1[pNum], ar2[cNum];
for (int i = 0; i < pNum; ++i)
{
// 生产者线程
ar1[i] = "生产者-" + to_string(i + 1);
threadData<int>* td = new threadData<int>(ar1[i], &bq);
pthread_create(tid + i, nullptr, producer, (void*)td);
}
for (int i = 0; i < cNum; ++i)
{
// 消费者线程
ar2[i] = "消费者-" + to_string(i + 1);
threadData<int>* td = new threadData<int>(ar2[i], &bq);
pthread_create(tid + i + pNum, nullptr, consumer, (void*)td);
}
for (int i = 0; i < pNum + cNum; ++i)
{
pthread_join(tid[i], nullptr); // 主线程阻塞等待
}
cout << "-主线程退出-" << endl;
return 0;
}
首先,既然多线程访问临界资源是必须串行执行的,那么多线程的效率体现在哪里呢?
我们需要知道的时候线程并不单单是访问临界资源。比如生产者线程,获取数据的时候可能会通过网络链接进行访问,又或者数据库等等,这些都是需要进行时间的,而这些就是非临界区代码所要实现的,也正是可以并行的,同样的,消费者线程也一样,处理数据同样需要花费时间,自然多线程并行执行效率会更高。
生产者生产数据需要花时间,消费者消费数据需要花时间。实现两种线程的并发过程。--非临界区的执行并发优化。因为临界区是串行执行的,并不能提高效率。
那么多生产和多消费的意义在哪里呢?
可以让生产行为和消费行为让有多个执行流进行并发的处理。
未完待续......