前言
BakaOS 是我在THU教学用操作系统 rCore 基础上捣鼓的个人项目,仅仅是为了训练 Rust 技能和学习 OS 知识。
rCore 的并发模块实现非常简陋,完全是基于 S-Mode 下的中断屏蔽,以至于创建锁、获取锁和释放锁都要通过系统调用实现,频繁的特权级切换大大增加了性能开销,也让 PCB 臃肿不堪(锁、条件变量和信号量都存储在PCB中)。
我基于 RISC-V 原子指令重构了并发模块,将锁、条件变量和信号量移至用户库,增加了系统调用 sys_futex 以支持 Futex 和 Condvar。
RISC-V 架构的原子指令
RISC-V 的 A 拓展提供了两类原子指令。第一类称为 AMO(Atomic Memory Operation),由硬件保证 read-modify-write 三阶段操作的原子性;第二类称为加载保留/条件存储(Load Reserved / Store Conditional),LR 指令读取内存中的一个值到 rd
,SC 指令修改内存中的值为 rs2
的值(地址是 rs1
,与 LR 指令中的保持一致),前提是:LR 指令和 SC 指令之间的时间里内存中这个值未被修改。
LR 指令格式如下:
lr.{w/d} rd, (rs1)
LR 指令从 rs1 地址处加载 4 字节或 8 字节到 rd 寄存器,并注册一个保留集(reservation set),这个保留集包含 rs1 地址。
SC 指令格式如下:
sc.{w/d} rd, rs2, (rs1)
SC 指令有条件地把 rs2 寄存器的值存储到 rs1 地址中,执行结果反映到 rd 寄存器中。若 rd 寄存器为 0,说明 SC 指令执行成功;不为 0 则失败,需要跳转到 LR 指令处,重新执行原子加载以及原子存储操作。不管 SC 指令执行成功与否,保留集中的数据都会失效。
封装原子指令
#[inline(always)]
pub unsafe fn load_reserved(addr: *const u32) -> u32 {
let val;
core::arch::asm!(
"lr.w {}, ({})",
out(reg) val,
in(reg) addr
);
val
}
#[inline(always)]
pub unsafe fn store_conditional(addr: *const u32, val: u32) -> bool {
let res: u32;
core::arch::asm!(
"sc.w {}, {}, ({})",
out(reg) res,
in(reg) val,
in(reg) addr
);
res == 0
}
互斥锁
为了保证多线程能够正确并发访问共享资源,使用 锁 让线程互斥地访问 临界区 的共享资源。
锁的评价指标是:
自旋锁
自旋锁存在于用户库和内核。自旋锁通过 while 循环等待条件的改变,满足“忙则等待”;如果锁资源被释放,一定有循环等待的线程可以拿到锁,满足”空闲则入“;自旋锁没有公平性,不保证线程可以在一定时间内拿到锁,所以不满足”有界等待“。
数据结构
pub struct MutexSpin {
locked: UnsafeCell<u32>,
}
unsafe impl Send for MutexSpin {}
unsafe impl Sync for MutexSpin {}
locked
:
MutexSpin
的成员实际上是一个 u32
变量,通过 UnsafeCell
获取内部可变性,这样 MutexSpin
不必以 mut
修饰。
对 locked
的修改方法是原子的,所以结构体是线程安全的。
方法
impl MutexSpin {
pub fn new() -> Self {
Self {
locked: UnsafeCell::new(0),
}
}
pub fn lock(&self) {
let addr = self.locked.get();
loop {
while load_reserved(addr) == 1 {}
if store_conditional(addr, 1) { return; }
}
}
pub fn unlock(&self) {
let addr = self.locked.get();
unsafe { *addr = 0; }
}
}
阻塞锁
我参考了 Linux futex 的实现。Futex(Fast Userspace Mutexes)是一种用于用户空间和内核空间混合的同步机制,旨在提高低竞争情况下的性能。Futex 的设计思想是通过用户态的检查,如果没有竞争发生,就不需要陷入内核,从而大大提高了效率。
前提
Futex
同时涉及用户空间和内核空间,因此设计比自旋锁复杂得多,需要以下基础设施:
- 原子操作
- atomic_increment
- atomic_decrement
- atomic_bit_test_and_set
- atomic_add_and_compare
- 系统调用
- sys_futex(uaddr, futex_op, val),将线程加入阻塞队列或唤醒一个线程
- 内核数据结构
数据结构
Futex
位于用户库,只允许用户线程使用。
// user/src/sync/mutex.rs
pub struct Futex {
flag: UnsafeCell<i32>,
}
unsafe impl Send for Futex {}
unsafe impl Sync for Futex {}
flag
:
- flag_{31} ,空闲标志位,为 0 表示锁空闲,为 1 表示锁占用
- flag_{[30:0]},等待线程数
FutexQ
是内核数据结构,位于进程控制块,存储被阻塞的线程。Futex
成员 flag
的虚拟地址通过 BTreeMap
映射到唯一的 FutexQ
。操作队列之前必须先获取自旋锁 guard
.
// os/src/sync/mutex.rs
// Futex的等待队列,由位于内核的进程控制块维护
pub struct FutexQ {
pub guard: MutexSpin,
queue: UnsafeCell<VecDeque<Arc<TaskControlBlock>>>,
}
// os/src/task/process.rs
pub struct ProcessControlBlockInner {
...
pub futex_queues: BTreeMap<usize, FutexQ>,
...
}
方法
Futex::lock()
首先检查 flag
最高位是否为 0,为 0 说明锁空闲,原子地设置最高位为 1 并返回。竞争失败则原子地使 flag[30:0]
自加 1,表示多了一位等待者。通过系统调用 sys_futex_wait
陷入阻塞前,会进行最后一次对 flag
的检查。从 sys_futex_wait
返回后,需要重新竞争锁。
Futex::unlock()
原子地使 flag
自加 0x80000000
(释放锁)并比较结果是否等于 0,为 0 说明没有线程在阻塞队列中,直接返回,否则通过系统调用 sys_futex_wake
唤醒队列中的一个线程。
impl Futex {
pub fn new() -> Self {
Self {
flag: UnsafeCell::new(0),
}
}
pub fn lock(&self) {
let addr = self.flag.get();
if atomic_test_and_set(addr as *mut u32, 31) == 0 {
// fastpath
return;
}
// one more waiter
atomic_increment(addr as *mut u32);
loop {
if atomic_test_and_set(addr as *mut u32, 31) == 0 {
atomic_decrement(addr as *mut u32);
return;
}
let flag = unsafe { *addr };
if flag >= 0 { continue; }
// we have to wait now
sys_futex_wait(addr, flag);
}
}
pub fn unlock(&self) {
let addr = self.flag.get();
if atomic_add_and_compare(addr as *mut u32, 0x80000000, 0) {
// no threads are waiting
return;
}
sys_futex_wake(addr);
}
}
FutexQ
采取先进先出的方式管理阻塞线程。调用 push_back
和 pop_front
方法前一定要先获取 FutexQ
的自旋锁。
impl FutexQ {
pub fn new() -> Self {
Self {
guard: MutexSpin::new(),
queue: UnsafeCell::new(VecDeque::new()),
}
}
/// 外部调用必须确保已拿到 FutexQ 的自旋锁
pub fn push_back(&self, task: Arc<TaskControlBlock>) {
unsafe { &mut *self.queue.get() }.push_back(task);
}
/// 外部调用必须确保已拿到 FutexQ 的自旋锁
pub fn pop_front(&self) -> Option<Arc<TaskControlBlock>> {
unsafe { &mut *self.queue.get() }.pop_front()
}
}
系统调用
sys_futex
用于将线程加入 Futex 的阻塞队列,或是从阻塞队列中唤醒一个线程,具体是哪种操作取决于参数 futex_op
.
uaddr
是 Futex.flag
的虚拟地址,以它为 key 可在 PCB 中找到对应的 FutexQ
.
val
是 Futex.flag
的期望值,如果陷入内核后 Futex.flag
没有发生变化,则将线程加入阻塞队列(单核CPU上一定不会有变化)
/// 封装在 Futex 方法中,不应直接调用
pub fn sys_futex(uaddr: *const i32, futex_op: usize, val: usize) -> isize{
let process = current_process();
let mut process_inner = process.inner_exclusive_access();
let phys_addr = process_inner
.memory_set
.translate_va((uaddr as usize).into())
.unwrap()
.0 as *const i32;
// 如果 Futex 第一次调用 sys_futex,则插入新的队列
let futex_q = process_inner.futex_queues
.entry(uaddr as usize)
.or_insert(FutexQ::new());
match futex_op {
FUTEX_WAIT => {
futex_q.guard.lock();
if unsafe { *phys_addr == (val as i32)} {
// *addr等于预期值,标记 task 为阻塞,加入 futex 等待队列
let task = take_current_task().unwrap();
let mut task_inner = task.inner_exclusive_access();
task_inner.task_status = TaskStatus::Blocked;
let task_cx_ptr = &mut task_inner.task_cx as *mut TaskContext;
drop(task_inner);
futex_q.push_back(task);
futex_q.guard.unlock();
// switch to other task
drop(process_inner);
schedule(task_cx_ptr);
} else {
futex_q.guard.unlock();
return 0;
}
},
FUTEX_WAKE => {
futex_q.guard.lock();
if let Some(task) = futex_q.pop_front() {
wakeup_task(task);
}
futex_q.guard.unlock();
},
_ => panic!("Unsupported futex_op: {}", futex_op)
};
0
}
条件变量
线程使用 条件变量(condition variable) 来等待一个条件变为真。表面上看,条件变量是一个队列,当某些执行状态(即条件,condition)不满足时,线程将自己加入队列,等待执行状态的改变。条件变量有两种操作:wait() 和 notify_one() 。
数据结构
Condvar
只有一个 i32
成员,不含显示队列。实际上,Condvar
拥有隐式队列的方式和 Futex
是相同的,都是通过系统调用 sys_futex
将自己的 flag
和 PCB 中的一个 FutexQ
联系起来。不同的是,Condvar
仅仅把 flag
当作一把访问 FutexQ
的钥匙,没有赋予其任何含义。
// user/src/sync/condvar.rs
pub struct Condvar {
flag: UnsafeCell<i32>,
}
unsafe impl Send for Condvar {}
unsafe impl Sync for Condvar {}
方法
wait()
需要传入一把互斥锁。线程陷入睡眠前会释放锁,唤醒后重新竞争锁。
notify_one()
仅唤醒队列中的一个线程。其中 atomic_increment(addr)
的作用聊胜于无,它改变了 flag
的值,这么做的影响是,当另一个线程通过 sys_futex_wait
陷入内核准备睡眠时,会检查 flag
是否与传入的预期值相同,不同则直接返回到用户态。
impl Condvar {
pub fn new() -> Self {
Self {
flag: UnsafeCell::new(0)
}
}
pub fn wait(&self, mutex: &MutexSpin) {
mutex.unlock();
let addr = self.flag.get();
let val = unsafe { *addr };
sys_futex_wait(addr, val);
mutex.lock();
}
pub fn notify_one(&self) {
let addr = self.flag.get();
atomic_increment(addr as *mut u32);
sys_futex_wake(addr);
}
}
信号量
信号量(Semaphore)是一种同步原语,用于控制多个线程对共享资源的访问。
数据结构
Semaphore
基于底层的同步原语(锁和条件变量)实现,位于 user lib。
pub struct Semaphore {
count: UnsafeCell<i32>,
cond: Condvar,
mutex: MutexSpin,
}
unsafe impl Send for Semaphore {}
unsafe impl Sync for Semaphore {}
count
: 大于 0 时表示可用的资源数。与 Dijkstra 定义的信号量略有不同,没有保持当信号量的值为负时,反映出等待线程数。事实上,该值永远不会小于 0。这样的行为更容易实现。
方法
Semaphore::wait()
首先检查信号量的值,如果小于等于 0 则陷入阻塞队列,否则信号量值减一并返回。
Semaphore::post()
将信号量加一,并唤醒队列中的一个线程(如果有的话)。
impl Semaphore {
pub fn new(count: i32) -> Self {
Self {
count: UnsafeCell::new(count),
cond: Condvar::new(),
mutex: MutexSpin::new(),
}
}
pub fn wait(&self) {
self.mutex.lock();
let cnt_p = self.count.get();
while unsafe { *cnt_p <= 0 } {
self.cond.wait(&self.mutex);
}
unsafe { *cnt_p -= 1; }
self.mutex.unlock();
}
pub fn post(&self) {
self.mutex.lock();
let cnt_p = self.count.get();
unsafe { *cnt_p += 1; }
self.cond.notify_one();
self.mutex.unlock();
}
}
感想
unsafe
应该用在哪里是个问题。实现 load_reserved
和 store_conditional
时,我一开始用 unsafe
修饰这两个函数,所以使用它们时也必须用 unsafe
括起来,这无疑是繁琐的。后来想起这么一个原则:Rust unsafe
应该尽可能少地出现,最好只停留在底层实现中,而不出现在使用时。最后我去掉修饰函数的 unsafe
,并用 unsafe
包裹函数里的内联汇编,这样外部就能清爽地调用函数了。
Linux 中自旋锁、Futex 是低级的同步原语,是高级同步原语(如 Mutex, Condvar, Semaphore)的基础。
试图创建新的类型 type tid_t = usize
,Rust 编译器警告说要用大驼峰命名法......查了一下,Rust 有命名规范的洁癖:类型名称规定使用大驼峰命名法,变量名称则用下划线命名法
一行 Rust 代码太长,转变为垂直排列的点调用更易于阅读:
let addr = process_inner.memory_set.translate_va((uaddr as usize).into()).unwrap().0 as *const i32;
let addr = process_inner
.memory_set
.translate_va((uaddr as usize).into())
.unwrap()
.0 as *const i32
Rust 没有稳定的静态断言,头一次碰到我想要使用某个特性却刚好没有的情况。我需要静态断言在编译期来保证类型大小符合约束,而不是在运行时通过 assert
检查!
lr/sc
真是非常强大的原子指令组合,基于它俩可以实现各种各样的原子操作,灵活性极强!因为我可以在 lr
和 sc
之间按照需求插入各种运算。