前言
BakaOS 是我在THU教学用操作系统 rCore 基础上捣鼓的个人项目,仅仅是为了训练 Rust 技能和学习 OS 知识。
rCore 的并发模块实现非常简陋,完全是基于 S-Mode 下的中断屏蔽,以至于创建锁、获取锁和释放锁都要通过系统调用实现,频繁的特权级切换大大增加了性能开销,也让 PCB 臃肿不堪(锁、条件变量和信号量都存储在PCB中)。
我基于 RISC-V 原子指令重构了并发模块,将锁、条件变量和信号量移至用户库,增加了系统调用 sys_futex 以支持 Futex 和 Condvar。
RISC-V 架构的原子指令
RISC-V 的 A 拓展提供了两类原子指令。第一类称为 AMO(Atomic Memory Operation),由硬件保证 read-modify-write 三阶段操作的原子性;第二类称为加载保留/条件存储(Load Reserved / Store Conditional),LR 指令读取内存中的一个值到 rd ,SC 指令修改内存中的值为 rs2 的值(地址是 rs1 ,与 LR 指令中的保持一致),前提是:LR 指令和 SC 指令之间的时间里内存中这个值未被修改。
LR 指令格式如下:
lr.{w/d} rd, (rs1)
LR 指令从 rs1 地址处加载 4 字节或 8 字节到 rd 寄存器,并注册一个保留集(reservation set),这个保留集包含 rs1 地址。
SC 指令格式如下:
sc.{w/d} rd, rs2, (rs1)
SC 指令有条件地把 rs2 寄存器的值存储到 rs1 地址中,执行结果反映到 rd 寄存器中。若 rd 寄存器为 0,说明 SC 指令执行成功;不为 0 则失败,需要跳转到 LR 指令处,重新执行原子加载以及原子存储操作。不管 SC 指令执行成功与否,保留集中的数据都会失效。
封装原子指令
#[inline(always)]
pub unsafe fn load_reserved(addr: *const u32) -> u32 {
    let val;
    core::arch::asm!(
        "lr.w {}, ({})",
        out(reg) val,
        in(reg) addr
    );
    val
}
#[inline(always)]
pub unsafe fn store_conditional(addr: *const u32, val: u32) -> bool {
    let res: u32;
    core::arch::asm!(
        "sc.w {}, {}, ({})",
        out(reg) res,
        in(reg) val,
        in(reg) addr
    );
    res == 0
}
互斥锁
为了保证多线程能够正确并发访问共享资源,使用 锁 让线程互斥地访问 临界区 的共享资源。
锁的评价指标是:
自旋锁
自旋锁存在于用户库和内核。自旋锁通过 while 循环等待条件的改变,满足“忙则等待”;如果锁资源被释放,一定有循环等待的线程可以拿到锁,满足”空闲则入“;自旋锁没有公平性,不保证线程可以在一定时间内拿到锁,所以不满足”有界等待“。
数据结构
pub struct MutexSpin {
    locked: UnsafeCell<u32>,
}
unsafe impl Send for MutexSpin {}
unsafe impl Sync for MutexSpin {}
locked:
MutexSpin 的成员实际上是一个 u32 变量,通过 UnsafeCell 获取内部可变性,这样 MutexSpin 不必以 mut 修饰。
对 locked 的修改方法是原子的,所以结构体是线程安全的。
方法
impl MutexSpin {
    pub fn new() -> Self {
        Self {
            locked: UnsafeCell::new(0),
        }
    }
    pub fn lock(&self) {
        let addr = self.locked.get();
        loop {
            while load_reserved(addr) == 1 {}
            if store_conditional(addr, 1) { return; }
        }
    }
    pub fn unlock(&self) {
        let addr = self.locked.get();
        unsafe { *addr = 0; }
    }
}
阻塞锁
我参考了 Linux futex 的实现。Futex(Fast Userspace Mutexes)是一种用于用户空间和内核空间混合的同步机制,旨在提高低竞争情况下的性能。Futex 的设计思想是通过用户态的检查,如果没有竞争发生,就不需要陷入内核,从而大大提高了效率。
前提
Futex 同时涉及用户空间和内核空间,因此设计比自旋锁复杂得多,需要以下基础设施:
- 原子操作
   
- atomic_increment
 
   - atomic_decrement
 
   - atomic_bit_test_and_set
 
   - atomic_add_and_compare
 
 
- 系统调用
   
- sys_futex(uaddr, futex_op, val),将线程加入阻塞队列或唤醒一个线程
 
 
- 内核数据结构
   
 
数据结构
Futex 位于用户库,只允许用户线程使用。
// user/src/sync/mutex.rs
pub struct Futex {
    flag: UnsafeCell<i32>,  
}
unsafe impl Send for Futex {}
unsafe impl Sync for Futex {}
flag:
- flag_{31} ,空闲标志位,为 0 表示锁空闲,为 1 表示锁占用
 
- flag_{[30:0]},等待线程数
 
FutexQ 是内核数据结构,位于进程控制块,存储被阻塞的线程。Futex 成员 flag 的虚拟地址通过 BTreeMap 映射到唯一的 FutexQ。操作队列之前必须先获取自旋锁 guard .
// os/src/sync/mutex.rs
// Futex的等待队列,由位于内核的进程控制块维护
pub struct FutexQ {
    pub guard: MutexSpin,
    queue: UnsafeCell<VecDeque<Arc<TaskControlBlock>>>,
}
// os/src/task/process.rs
pub struct ProcessControlBlockInner {
	...
    pub futex_queues: BTreeMap<usize, FutexQ>,
    ...
}
方法
Futex::lock() 首先检查 flag 最高位是否为 0,为 0 说明锁空闲,原子地设置最高位为 1 并返回。竞争失败则原子地使 flag[30:0] 自加 1,表示多了一位等待者。通过系统调用 sys_futex_wait 陷入阻塞前,会进行最后一次对 flag 的检查。从 sys_futex_wait 返回后,需要重新竞争锁。
Futex::unlock() 原子地使 flag 自加 0x80000000 (释放锁)并比较结果是否等于 0,为 0 说明没有线程在阻塞队列中,直接返回,否则通过系统调用 sys_futex_wake 唤醒队列中的一个线程。
	
impl Futex {
    pub fn new() -> Self {
        Self {
            flag: UnsafeCell::new(0),
        }
    }
    pub fn lock(&self) {
        let addr = self.flag.get();
        if atomic_test_and_set(addr as *mut u32, 31) == 0 {
            // fastpath
            return;
        }
        // one more waiter
        atomic_increment(addr as *mut u32);
        loop {
            if atomic_test_and_set(addr as *mut u32, 31) == 0 {
                atomic_decrement(addr as *mut u32);
                return;
            }
            let flag = unsafe { *addr };
            if flag >= 0 { continue; }
            // we have to wait now
            sys_futex_wait(addr, flag);
        }
    }
    pub fn unlock(&self) {
        let addr = self.flag.get();
        if atomic_add_and_compare(addr as *mut u32, 0x80000000, 0) {
            // no threads are waiting
            return;
        }         
        sys_futex_wake(addr);
    }
}
FutexQ 采取先进先出的方式管理阻塞线程。调用 push_back 和 pop_front 方法前一定要先获取 FutexQ 的自旋锁。
impl FutexQ {
    pub fn new() -> Self {
        Self {
            guard: MutexSpin::new(),
            queue: UnsafeCell::new(VecDeque::new()), 
        }
    }
    /// 外部调用必须确保已拿到 FutexQ 的自旋锁
    pub fn push_back(&self, task: Arc<TaskControlBlock>) {
        unsafe { &mut *self.queue.get() }.push_back(task);        
    } 
    /// 外部调用必须确保已拿到 FutexQ 的自旋锁
    pub fn pop_front(&self) -> Option<Arc<TaskControlBlock>> {
        unsafe { &mut *self.queue.get() }.pop_front()
    }
}
系统调用
sys_futex 用于将线程加入 Futex 的阻塞队列,或是从阻塞队列中唤醒一个线程,具体是哪种操作取决于参数 futex_op .
uaddr 是 Futex.flag 的虚拟地址,以它为 key 可在 PCB 中找到对应的 FutexQ .
val 是 Futex.flag 的期望值,如果陷入内核后 Futex.flag 没有发生变化,则将线程加入阻塞队列(单核CPU上一定不会有变化)
 
/// 封装在 Futex 方法中,不应直接调用
pub fn sys_futex(uaddr: *const i32, futex_op: usize, val: usize) -> isize{
    let process = current_process();
    let mut process_inner = process.inner_exclusive_access();
    let phys_addr = process_inner
        .memory_set
        .translate_va((uaddr as usize).into())
        .unwrap()
        .0 as *const i32;
    // 如果 Futex 第一次调用 sys_futex,则插入新的队列
    let futex_q = process_inner.futex_queues
        .entry(uaddr as usize)
        .or_insert(FutexQ::new());
    match futex_op {
        FUTEX_WAIT => {
            futex_q.guard.lock();
            if unsafe { *phys_addr == (val as i32)} {
                // *addr等于预期值,标记 task 为阻塞,加入 futex 等待队列
                let task = take_current_task().unwrap();
                let mut task_inner = task.inner_exclusive_access();    
                task_inner.task_status = TaskStatus::Blocked;
                let task_cx_ptr = &mut task_inner.task_cx as *mut TaskContext;
                drop(task_inner);
                futex_q.push_back(task);
                futex_q.guard.unlock();
                // switch to other task
                drop(process_inner);
                schedule(task_cx_ptr);
            } else {
                futex_q.guard.unlock();
                return 0;
            }
        },
        FUTEX_WAKE => {
            futex_q.guard.lock();
            if let Some(task) = futex_q.pop_front() {
                wakeup_task(task);
            } 
            futex_q.guard.unlock();
        },
        _ => panic!("Unsupported futex_op: {}", futex_op)
    };
    0
}
条件变量
线程使用 条件变量(condition variable) 来等待一个条件变为真。表面上看,条件变量是一个队列,当某些执行状态(即条件,condition)不满足时,线程将自己加入队列,等待执行状态的改变。条件变量有两种操作:wait() 和 notify_one() 。
数据结构
Condvar 只有一个 i32 成员,不含显示队列。实际上,Condvar 拥有隐式队列的方式和 Futex 是相同的,都是通过系统调用 sys_futex 将自己的 flag 和 PCB 中的一个 FutexQ 联系起来。不同的是,Condvar 仅仅把 flag 当作一把访问 FutexQ 的钥匙,没有赋予其任何含义。
// user/src/sync/condvar.rs
pub struct Condvar {
    flag: UnsafeCell<i32>,
}
unsafe impl Send for Condvar {}
unsafe impl Sync for Condvar {}
方法
wait() 需要传入一把互斥锁。线程陷入睡眠前会释放锁,唤醒后重新竞争锁。
notify_one() 仅唤醒队列中的一个线程。其中 atomic_increment(addr) 的作用聊胜于无,它改变了 flag 的值,这么做的影响是,当另一个线程通过 sys_futex_wait 陷入内核准备睡眠时,会检查 flag 是否与传入的预期值相同,不同则直接返回到用户态。
impl Condvar {
    pub fn new() -> Self {
        Self {
            flag: UnsafeCell::new(0)
        }
    }
    pub fn wait(&self, mutex: &MutexSpin) {
        mutex.unlock();
        let addr = self.flag.get();
        let val = unsafe { *addr }; 
        sys_futex_wait(addr, val);
        mutex.lock();
    }
    pub fn notify_one(&self) {
        let addr = self.flag.get();
        atomic_increment(addr as *mut u32);
        sys_futex_wake(addr);
    }
}
信号量
信号量(Semaphore)是一种同步原语,用于控制多个线程对共享资源的访问。
数据结构
Semaphore 基于底层的同步原语(锁和条件变量)实现,位于 user lib。
pub struct Semaphore {
    count: UnsafeCell<i32>,
    cond: Condvar,
    mutex: MutexSpin,
}
unsafe impl Send for Semaphore {}
unsafe impl Sync for Semaphore {}
count: 大于 0 时表示可用的资源数。与 Dijkstra 定义的信号量略有不同,没有保持当信号量的值为负时,反映出等待线程数。事实上,该值永远不会小于 0。这样的行为更容易实现。
方法
Semaphore::wait() 首先检查信号量的值,如果小于等于 0 则陷入阻塞队列,否则信号量值减一并返回。
Semaphore::post() 将信号量加一,并唤醒队列中的一个线程(如果有的话)。
impl Semaphore {
    pub fn new(count: i32) -> Self {
        Self {
            count: UnsafeCell::new(count),
            cond: Condvar::new(),
            mutex: MutexSpin::new(),
        }
    }
    
    pub fn wait(&self) {
        self.mutex.lock();
        let cnt_p = self.count.get();
        while unsafe { *cnt_p <= 0 } {
            self.cond.wait(&self.mutex);
        }
        unsafe { *cnt_p -= 1; }
        self.mutex.unlock();
    }
    pub fn post(&self) {
        self.mutex.lock();
        let cnt_p = self.count.get();
        unsafe { *cnt_p += 1; }
        self.cond.notify_one();
        self.mutex.unlock();
    }
}
感想
unsafe 应该用在哪里是个问题。实现 load_reserved 和 store_conditional 时,我一开始用 unsafe 修饰这两个函数,所以使用它们时也必须用 unsafe 括起来,这无疑是繁琐的。后来想起这么一个原则:Rust unsafe 应该尽可能少地出现,最好只停留在底层实现中,而不出现在使用时。最后我去掉修饰函数的 unsafe ,并用 unsafe 包裹函数里的内联汇编,这样外部就能清爽地调用函数了。
 
Linux 中自旋锁、Futex 是低级的同步原语,是高级同步原语(如 Mutex, Condvar, Semaphore)的基础。
 
试图创建新的类型 type tid_t = usize ,Rust 编译器警告说要用大驼峰命名法......查了一下,Rust 有命名规范的洁癖:类型名称规定使用大驼峰命名法,变量名称则用下划线命名法
 
一行 Rust 代码太长,转变为垂直排列的点调用更易于阅读:
   let addr = process_inner.memory_set.translate_va((uaddr as usize).into()).unwrap().0 as *const i32;
   
   let addr = process_inner
   	.memory_set
   	.translate_va((uaddr as usize).into())
   	.unwrap()
   	.0 as *const i32
 
Rust 没有稳定的静态断言,头一次碰到我想要使用某个特性却刚好没有的情况。我需要静态断言在编译期来保证类型大小符合约束,而不是在运行时通过 assert 检查!
 
lr/sc 真是非常强大的原子指令组合,基于它俩可以实现各种各样的原子操作,灵活性极强!因为我可以在 lr 和 sc 之间按照需求插入各种运算。