当前现代处理器的发展在性能上已经很难跟得上摩尔定律了,各大处理器设计商都在多核心上做文章,从而多核多线程编程则成为了高性能计算所必要的手段。那么在多核多线程环境下与经典的单处理器多线程环境下的多线程同步有何区别呢?
传统单处理器多线程的多线程同步
在传统单处理器环境下,由于处理器就一个核心在运转,因此一次只能执行一个线程,然后通过时间片轮询(Round-Robin)与优先级抢占(Priority Preemption)方式做线程调度。无论是哪种方式,操作系统对多线程的调度都是基于ntent="mp" data-source="innerlink" href="https://www.baike.com/wiki/%E4%B8%AD%E6%96%AD/20089690?view_id=4s5nsdc85t0000" rel="noopener noreferrer noopener noreferrer" target="_blank">中断机制执行上下文切换,然后选择当前优先级高的、排在就绪队列中最前面的线程调度执行。因此,我们看到经典的操作系统ntent="mp" data-source="innerlink" href="https://www.baike.com/wiki/%E5%90%8C%E6%AD%A5%E5%8E%9F%E8%AF%AD/5429882?view_id=5ldwszd3724000" rel="noopener noreferrer noopener noreferrer" target="_blank">同步原语(Synchronization Primitive),诸如ntent="mp" data-source="innerlink" href="https://www.baike.com/wiki/%E4%BA%92%E6%96%A5%E4%BD%93/5050286?view_id=4202cchml9s000" rel="noopener noreferrer noopener noreferrer" target="_blank">互斥体(Mutex)、ntent="mp" data-source="innerlink" href="https://www.baike.com/wikiid/1730889235234559166?prd=home_search&search_id=7qe67y3kzgw00&view_id=zkiuitkjwi800" rel="noopener noreferrer noopener noreferrer" target="_blank">信号量(Semaphore)、ntent="mp" data-source="innerlink" href="https://www.baike.com/wiki/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/2432790?view_id=ewo0w46w96g00" rel="noopener noreferrer noopener noreferrer" target="_blank">消息队列(Message Queue) 等等均是基于抑制软硬件ntent="mp" data-source="innerlink" href="https://www.baike.com/wiki/%E4%B8%AD%E6%96%AD/20089690?view_id=29hrftztm66800" rel="noopener noreferrer noopener noreferrer" target="_blank">中断或ntent="mp" data-source="innerlink" href="https://www.baike.com/wiki/%E4%B8%8A%E4%B8%8B%E6%96%87%E5%88%87%E6%8D%A2/3073529?view_id=2kqt4dyutym000" rel="noopener noreferrer noopener noreferrer" target="_blank">上下文切换来做到多线程同步的。比如,在上锁前先屏蔽CPU中断,禁止当前执行线程被切换;等上完锁之后再恢复中断。这使得整个上锁过程是原子的,而不会被外部打断。
基于多核处理器的多线程同步
在多核处理器或多个处理器的环境下,多线程同步则更为复杂。因为对于一般通用操作系统而言,核心A是不知道核心B正在处理啥的,尽管对于某些嵌入式ntent="mp" data-source="innerlink" href="https://www.baike.com/wiki/%E5%AE%9E%E6%97%B6%E6%93%8D%E4%BD%9C%E7%B3%BB%E7%BB%9F/1057980?view_id=3c20zpwbvby000" rel="noopener noreferrer noopener noreferrer" target="_blank">实时操作系统(RTOS)而言,一个核心可以给另一个核心发送信号以通知某些事件,但这些均用于特定应用场景,而通用应用级操作系统不会采用这些同步手段,代价太高。正因为如此,在多核多处理器环境下,如果我们要对多线程所共享的一个对象的修改做到数据一致性,那么只能通过原子操作指令!
为何原子操作能起作用?因为原子操作指令通过锁总线等手段,确保多线程对同一共享对象的读-修改-写是原子的。所谓原子(atomic)就是指,一组操作在执行时作为一个整体进行,而不会被打断。这里举一个简单的例子来说明对某一共享对象的修改采用原子操作与不采用原子操作的差别。
上图中,假定线程A与线程B在不同的核心上并行执行。上半部分采用的是普通读写方式对共享存储单元进行修改的,而下半部分则使用了原子的“读-修改-写”操作。第一段所采用的普通读写方式会出什么问题呢?从时序图上我们可以看到,假定线程A与线程B几乎同时先对共享存储单元进行加载(读取),然后再几乎同时进行计算。由于线程A计算速度比较快,它先将计算好的结果存储(写入)到了共享存储单元中;而此后,线程B才计算好,再将它计算好的结果存储到该共享单元中,这就导致了数据不一致!因为线程B最终所写回的结果是基于一开始线程A对该共享存储单元修改之前的,它这么一改就把线程A对共享存储单元的操作给抹掉了。因此这里正确的做法应该是线程B必须基于线程A所修改完的结果再对共享存储单元进行操作。
而下半部分采用原子操作则不会有数据不一致问题,无论是线程A先对共享存储单元修改完还是线程B先修改完,最终结果都是经过这两个线程的计算操作的。CPU系统总线仲裁器会去裁决哪个先执行,哪个后执行,而后执行的那个会被阻塞(等待)直到先前的操作完成。
因此,对于基于多核多处理器的多线程同步而言,原子操作不是有没有必要用的问题,而是必须得用!上述所提到的那些个经典的同步原语在多核多处理器环境下都用到了原子操作。
原子操作的种类
原子操作有许多种,有纯粹用于做同步的(即作为锁的用途),有用于做基本运算操作的,也有可将这两者相结合的。
对于早期的多核处理器,有不少提供了数据交换(swap)、标志测试与设置(flag test and set)等基于“锁”的原子操作。比如8086上的XCHG指令,ARMv7架构之前的SWP指令,这些都属于SWAP类原子操作;而Blackfin 561 Duo-Core DSP上则提供了flag test and set原子操作……
这些原子操作的实现比较简单,不过都是基于“锁”,也就是说如果你要用原子操作来同步某一共享存储对象,那么必须先针对它定义一个原子变量作为锁去同步。这些原子操作所引发的最大问题就是如果某一线程在利用这些锁做“自旋等待”,而另一个线程在释放该原子锁之前就被销毁了,那么等待该锁的线程就倒霉成为僵尸线程了~尽管一般对于应用层来说,我们不会轻易自己去杀线程,但对于操作系统层还有学术界而言,这是一个必须解决的问题。
目前一般常见的解决方案就是添加一个重试次数,如果重试了比如1000次,这个原子锁还没有被释放,那么就强行解锁,或者抛出异常等。所以这里要提醒各位的是,用了原子锁之后,后续相关的操作得尽量快,然后马上释放锁,否则的话宁可直接调用系统所提供的同步原语API。下面给出一份使用SWAP原子操作流程的伪代码,当然flag test and set跟这个流程其实也差不多~
// 在执行多任务前,原子锁对象初始值为0volatile int g_atomic_lock = 0;// 这里是多个线程可能共同的代码void AtomicModify(void){ // SWAP的第一个参数指向某一原子对象的地址。 // SWAP操作一般是将第二个实参的值写入到原子对象中去, // 然后返回该原子对象在SWAP操作之前的值。 while(SWAP(&g_atomic_lock, 1) == 1) { // 如果SWAP操作返回1,说明之前已经有线程对该对象上了锁, // 此时只能等待该原子对象重新变为0。 CPU_PAUSE(); // 这里暗示CPU可以做对其他线程的调度 } // SWAP操作成功之后,g_atomic_lock的值变为1了, // 此时对多个线程所共享的对象进行操作 DoModificationToSharedObject(); // 对共享对象操作完之后,释放原子锁 SWAP(&g_atomic_lock, 0);}随着科技的进步,现代多核处理器纷纷引入了无锁(Lock-Free)原子操作。比如:比较与交换(Compare and Swap,简称CAS),加载时锁定/有条件存储(Load-locked Store-conditional,简称LL-SC)。x86处理器使用前者;而Alpha、Power、MIPS、RISC-V、ARMv7、ARMv8等则使用后者,而从ARMv8.1开始也引入了CAS原子操作。LL-SC形式上虽然与CAS有些不同,但逻辑都是互通的。其主要思路就是在当前线程先加载共享对象的值,然后对它做任意修改操作,最后写的时候是原子的——先判定之前所加载的共享对象的值与当前共享对象的值是否完全相同,如果相同,则把新修改的值写回去;否则交换失败,程序可以做循环从而再次从该共享对象中加载值。这种无锁机制可以把之前的原子锁去掉,而直接对目标共享对象进行操作。其实践用法如以下代码所示:
void AtomicSumOfSquares(volatile int *atom, int b){ int orgValue, dstValue; do { // 先加载原子对象的当前值 orgValue = *atom; // 再对原始值做平方和计算,作为最终写入的结果 dstValue = orgValue * orgValue + b * b; } // 这里的CAS函数,第一个参数指向一个原子对象; // 第二个参数指向之前加载该原子对象的变量; // 第三个参数则是比较成功后,最后写入到该原子对象的值。 // CAS先将原子对象的内容与之前加载它的变量的值进行比较,两者是否相同; // 如果相同,说明在此操作期间没有其他线程对该原子进行修改, // 则将第三个参数的值写到该原子对象中去,然后返回true; // 如果两者不同,说明这期间已经有某个线程对该原子对象进行了修改,返回false。 while(!CAS(atom, &orgValue, dstValue); return dstValue;}我们看到,使用“无锁”的CAS原子操作也用了一个do-while循环,它跟之前的AtomicModify中的SWAP过程相比,到底无锁在什么地方呢?不都有循环吗?
各位同学请注意!CAS是直接对一个原子对象进行修改,而不是将它作为一个ntent="mp" data-source="innerlink" href="https://www.baike.com/wiki/%E8%87%AA%E6%97%8B%E9%94%81/1741898?view_id=5mdw8vrpbgk000" rel="noopener noreferrer noopener noreferrer" target="_blank">自旋锁(Spin-Lock)那样使用。SWAP原子操作需要等待其操作的原子对象处于一个“可用”的状态,如果它一直不处于可用状态,则会一直等下去,或者采用有限次尝试机制而做“异常处理”。而CAS对它所操作的原子对象是“瞬时”的,只要没有其他线程对该原子对象进行修改,那么CAS操作就一定成功,而CAS操作一成功,整个流程结束!因此前者SWAP机制是需要等原子对象的某一状态;而后者CAS机制则是去避免“数据冲突”情况的发生,所以它是无锁的。
我们理解了有锁的SWAP原子操作与无锁的CAS原子操作之后,下面将正式进入正题,我们来谈谈ntent="mp" data-source="innerlink" href="https://www.baike.com/wiki/C11/19942474?view_id=3iqiyccojva000" rel="noopener noreferrer noopener noreferrer" target="_blank">C11标准(ISO/IEC 9899:2011)中的原子操作标准库。
C11标准中的原子操作标准库
首先,原子操作库在C11标准中属于可选(Optional)的库,对于一些低端的处理器,尤其像8位的单片机MCU那种完全不具备原子操作指令的系统环境,则可以不提供此库。因此C11标准引入了__STDC_NO_ATOMICS__这个预定义宏来指示当前系统环境下的C11编译器实现是否提供了原子操作标准库。如果定义了这个宏,则说明当前环境下的C11编译器实现没有支持原子操作的标准库。如果支持原子操作标准库,我们就能引入<stdatomic.h>这个头文件,这里面声明了所以能被当前C11编译器支持的原子类型以及原子操作函数接口。
C11中能支持的原子操作类型
上面已经提到过,由于当前处理器技术非常成熟,各种架构、各种配置的处理器百花齐放,有专门用于低功耗、高续航领域的,也有追求高性能领域的……因此这些处理器硬件本身对原子操作指令的支持也可能ntent="mp" data-source="innerlink" href="https://www.baike.com/wiki/%E5%A4%A7%E7%9B%B8%E5%BE%84%E5%BA%AD/240201?view_id=3pp2gtpbsxk000" rel="noopener noreferrer noopener noreferrer" target="_blank">大相径庭!比如,有些处理器只支持SWAP或flag test and set等基于锁的原子操作;而有些能支持基于无锁版本的原子操作;还有一些能支持基于64位整数的无锁原子操作……为了能给程序员提供当前编译环境能支持何种原子操作,C11标准列出了以下这些与定义宏:
// 指示当前编译器能支持 _Bool 类型的无锁原子操作ATOMIC_BOOL_LOCK_FREE// 指示当前编译器能支持 signed char、unsigned char、以及char类型的无锁原子操作ATOMIC_CHAR_LOCK_FREE// 指示当前编译器能支持 char16_t 类型的无锁原子操作ATOMIC_CHAR16_T_LOCK_FREE// 指示当前编译器能支持 char32_t 类型的无锁原子操作ATOMIC_CHAR32_T_LOCK_FREE// 指示当前编译器能支持 wchar_t 类型的无锁原子操作ATOMIC_WCHAR_T_LOCK_FREE// 指示当前编译器能支持 short、unsigned short 类型的无锁原子操作ATOMIC_SHORT_LOCK_FREE// 指示当前编译器能支持 int、unsigned int 类型的无锁原子操作ATOMIC_INT_LOCK_FREE// 指示当前编译器能支持 long、unsigned long 类型的无锁原子操作ATOMIC_LONG_LOCK_FREE// 指示当前编译器能支持 long long、unsigned long long 类型的无锁原子操作ATOMIC_LLONG_LOCK_FREE// 指示当前编译器能支持 ptrdiff_t、intptr_t、uintptr_t 类型的无锁原子操作ATOMIC_POINTER_LOCK_FREE有了这些预定义宏之后,我们就能判定在当前编译环境下可用那些原子操作了。当然,如果当前编译环境能支持原子操作的话,那么它至少应该能支持atomic_flag类型。该类型是一个原子标志对象,用于flag test and set原子操作。如果当前的CPU不支持flag test and set,但支持SWAP,那么也可以用SWAP来实现该操作。此外,其他lock-free的原子操作也都能实现flag test and set,包括原子加法、原子逻辑操作、CAS等。因此原子标志操作属于整个原子操作中最最基本的操作方式。我们稍后将会对这些原子操作做具体介绍。
对于其他整数类型,如果当前编译环境能支持的话(可根据上面列出的预定义宏来判断),都有其相对应的原子类型。尽管C11标准引入了_Atomic宏,我们可以用它将一个基本整数类型转换为对应的原子类型,比如:
char => _Atomic(char)short => _Atomic(short)int => _Atomic(int)这里的 ( ) 可省略,但类型与_Atomic之间必须至少有一个空白符(比如,_Atomic(int) 也可被写作为:_Atomic int);不过笔者仍然建议各位在使用原子类型的时候,先包含<stdatomic.h>,然后使用该头文件所列出的原子类型。当前C11标准所提供支持的整数原子类型如下表所示:
typedef _Atomic(_Bool) atomic_bool;typedef _Atomic(char) atomic_char;typedef _Atomic(signed char) atomic_schar;typedef _Atomic(unsigned char) atomic_uchar;typedef _Atomic(short) atomic_short;typedef _Atomic(unsigned short) atomic_ushort;typedef _Atomic(int) atomic_int;typedef _Atomic(unsigned int) atomic_uint;typedef _Atomic(long) atomic_long;typedef _Atomic(unsigned long) atomic_ulong;typedef _Atomic(long long) atomic_llong;typedef _Atomic(unsigned long long) atomic_ullong;// 对于没有定义过char16_t的编译环境,// 也可能会用 _Atomic(uint_least16_t) 类型来定义其相应的原子类型typedef _Atomic(char16_t) atomic_char16_t;// 对于没有定义过char32_t的编译环境,// 也可能会用 _Atomic(uint_least32_t) 类型来定义其相应的原子类型typedef _Atomic(char32_t) atomic_char32_t;typedef _Atomic(wchar_t) atomic_wchar_t;typedef _Atomic(int_least8_t) atomic_int_least8_t;typedef _Atomic(uint_least8_t) atomic_uint_least8_t;typedef _Atomic(int_least16_t) atomic_int_least16_t;typedef _Atomic(uint_least16_t) atomic_uint_least16_t;typedef _Atomic(int_least32_t) atomic_int_least32_t;typedef _Atomic(uint_least32_t) atomic_uint_least32_t;typedef _Atomic(int_least64_t) atomic_int_least64_t;typedef _Atomic(uint_least64_t) atomic_uint_least64_t;typedef _Atomic(int_fast8_t) atomic_int_fast8_t;typedef _Atomic(uint_fast8_t) atomic_uint_fast8_t;typedef _Atomic(int_fast16_t) atomic_int_fast16_t;typedef _Atomic(uint_fast16_t) atomic_uint_fast16_t;typedef _Atomic(int_fast32_t) atomic_int_fast32_t;typedef _Atomic(uint_fast32_t) atomic_uint_fast32_t;typedef _Atomic(int_fast64_t) atomic_int_fast64_t;typedef _Atomic(uint_fast64_t) atomic_uint_fast64_t;typedef _Atomic(intptr_t) atomic_intptr_t;typedef _Atomic(uintptr_t) atomic_uintptr_t;typedef _Atomic(size_t) atomic_size_t;typedef _Atomic(ptrdiff_t) atomic_ptrdiff_t;typedef _Atomic(intmax_t) atomic_intmax_t;typedef _Atomic(uintmax_t) atomic_uintmax_t;由于绝大部分处理器都没有提供针对浮点数的原子操作指令,因此C11标准也没有提供任何针对浮点数的原子类型!这里请各位务必注意。(这里顺带一提,nVidia从SM35版本的CUDA架构起可支持浮点数的原子操作)
C11中所提供的原子操作函数接口
C11所提供的原子操作函数接口一般都有两个版本。第一个版本是不带有存储器次序参数的,第二个是带有存储器次序参数的,并且函数名也以 _explicit 结尾。我们再下一章将会详细描述存储器次序,这是一个比较复杂也比较高级的话题,对于初学者而言,我们先把不带存储器次序的搞明白就哦了~
一、原子标志相关操作:
对于原子标志的相关操作,C11标准提供了初始化、标志测试与设置、标志清除这三个接口。原子标志 atomic_flag 对象本身只有两种状态(即只有两种取值),设置状态(编译器实现一般用 1 或 true 来表示)以及清零状态(编译器实现一般用 0 或 false 来表示)。
C11标准为原子标志类型提供了一个用于初始化的宏——ATOMIC_FLAG_INIT,我们应该用这个宏对一个原子标志对象进行初始化,如以下代码所示。
volatile atomic_flag g_flag = ATOMIC_FLAG_INIT;int main(void){ // 这里展示了如何在函数内对已声明的g_flag进行初始化。 // 由于atomic_flag通常被定义为一种结构体形式, // 而ATOMIC_FLAG_INIT则通常被定义为针对结构体atomic_flag的初始化器, // 即:{ ... } 的形式。 // 因此我们这里使用C99标准所引入的匿名结构体对象的表示语法 // 对g_flag进行初始化 g_flag = (atomic_flag)ATOMIC_FLAG_INIT;}初始化之后,原子标志对象即处于“清零状态”。
原子标志的测试与设置操作函数有explicit版本,其原型为:
_Bool atomic_flag_test_and_set(volatile atomic_flag *object);_Bool atomic_flag_test_and_set_explicit(volatile atomic_flag *object, memory_order order);该操作函数的语义为:对原子标志对象 object 进行设置,并返回该原子标志对象做设置操作之前的值。也就是说,如果原子标志对象 object 之前为“清零状态”,那么经过此操作之后,该原子标志对象的状态变为了“设置状态”,并且返回 false。如果原子标志对象 object 之前为“设置状态”,那么经过此操作之后,它仍然为“设置状态”,并且返回 true。所以,当我们将原子标志对象作为一个“锁”来用的话,就看这个函数接口返回的是啥,如果返回 false,则说明锁成功,可以对多线程共享对象做相关的修改操作;如果返回的是 true,则说明该原子标志已经被其他线程占用了,需要等待释放。
最后再谈谈原子标志的清除操作函数接口。它也有explicit版本,其原型为:
void atomic_flag_clear(volatile atomic_flag *object); void atomic_flag_clear_explicit(volatile atomic_flag *object, memory_order order);该操作函数的语义为:对指定的原子标志对象 object 进行清零操作。如果我们将原子标志对象用作“锁”的话,那么执行此操作就相当于释放锁。
作为C11标准里最最基本的原子类型,原子标志的适用性还是非常广的。尤其要对一些较大的共享资源进行操作的话,无锁原子操作也难以胜任,此时就可以用原子标志或甚至是系统自带的同步原语进行同步操作了。下面笔者将用原子标志操作来举一个例子,描述如何对一个共享浮点数对象做递增的原子操作。
#include <stdio.h>#include <stdbool.h>#include <stdint.h>#include <stddef.h>#include <stdalign.h>#include <stdatomic.h>#include <pthread.h>// 为了避免CPU直接死等,// 这里定义了常用处理器架构的释放当前线程的暗示指令操作#if defined(__x86__) || defined(__x86_64__) || defined(__i386__)#define CPU_PAUSE() asm("pause")#elif defined(__arm__) || defined(__aarch64__)#define CPU_PAUSE() asm("yield")#else#define CPU_PAUSE()#endif/// 为了通用性,这里用一个结构体封装了原子标志与普通的浮点数对象struct MyAtomicFloat{ volatile atomic_flag atomFlag; // 出于性能上考虑,我们应该尽量让原子对象与普通对象之间留有些空间 int padding; // 如果当前编译器能支持C11的alignas的话, // 那么我们也能使用alignas来做字节对齐 alignas(16) float value;};/// 定义一个将被多线程共享的原子浮点数对象static volatile struct MyAtomicFloat sAtomicFLoatObject;/// 对多线程共享的原子对象进行求递增操作/// @param nLoops 指定对共享原子对象操作几次static void AtomicValueInc(int nLoops){ // 这里对共享原子对象操作nLoops次 for(int loop = 0; loop < nLoops; loop++) { // 先进行上锁 while(atomic_flag_test_and_set(&sAtomicFLoatObject.atomFlag)) CPU_PAUSE(); // 对共享数据做递增操作 sAtomicFLoatObject.value += 1.0f; // 最后释放锁 atomic_flag_clear(&sAtomicFLoatObject.atomFlag); }}/// 线程处理函数static void* ThreadProc(void *args){ // 在用户线程中执行10000次 AtomicValueInc(10000); return NULL;}int main(int argc, const char * argv[]){ printf("The size is: %zu, `value` offset is: %zu\n", sizeof(sAtomicFLoatObject), offsetof(struct MyAtomicFloat, value)); // 对原子浮点数对象先进行初始化 sAtomicFLoatObject.atomFlag = (atomic_flag)ATOMIC_FLAG_INIT; sAtomicFLoatObject.value = 0.0f; pthread_t threadID; // 创建线程并调度执行 if(pthread_create(&threadID, NULL, ThreadProc, NULL) != 0) { puts("Failed to create a thread!"); return 0; } // 在主线程中也执行10000次 AtomicValueInc(10000); // 等待线程执行完毕 pthread_join(threadID, NULL); // 输出最终结果 printf("The final result is: %f\n", sAtomicFLoatObject.value);}上述代码以及后面的代码出于可跨平台考虑吧,用到了pthread库,因此如果各位在Linux环境下编译运行的话需要添加 -pthread 编译选项,macOS、iOS等Apple系统环境则不需要,pthread是被默认连接的。此外,上述代码以及后续代码都要用到C11标准,所以各位所使用的编译器如果稍旧的话(比如GCC 4.8,Clang 3.6),那么必须显式地加上 -std=gnu11 编译选项。
Windows系统下MSVC没有提供原子操作的库,笔者这里为Windows平台的开发者封装了一个,可供使用:https://github.com/zenny-chen/simple-stdatomic-for-VS-Clang
上述代码配合注释讲解后,各位应该能很容易地看明白。各位如果感兴趣的话,可以尝试一下将 while(atomic_flag_test_and_set(&sAtomicFLoatObject.atomFlag)) 这条语句注释掉,然后观察一下输出结果是否正确。
以上描述的是关于 atomic_flag 类型的相关原子操作描述。下面要介绍的原子操作函数都是基于整数原子类型的(包括布尔与字符类型)。
此外,C11标准为了简化针对整数原子类型的原子操作接口,将所有针对整数原子类型的操作函数以泛型的形式给出,因此我们在某些编译器实现中查看其自带的 <stdatomic.h> 头文件时,能发现里面的许多原子操作函数都用宏定义成了编译器自带的内建函数(intrinsic functions)形式了。这样能简化函数接口,毕竟要为每种整数原子类型写一种原子操作,太过冗长了。我们可以参考上面列出来的那么多整数原子类型。而且,要让程序员再自己去判定用哪种类型的原子操作,也将是一件很麻烦的事情。
