From a266993808d3f4812e66c622e62ac75d45c54dcc Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 15 Jan 2014 15:32:44 -0800 Subject: [PATCH 01/13] Add an AtomicU64 type to std::sync::atomics This also generalizes all atomic intrinsics over T so we'll be able to add u8 atomics if we really feel the need to (do we really want to?) --- src/librustc/middle/typeck/check/mod.rs | 41 ++-- src/libstd/sync/atomics.rs | 277 +++++++++++++++++++++--- src/libstd/unstable/intrinsics.rs | 110 +++++++++- src/test/auxiliary/cci_intrinsic.rs | 24 +- src/test/run-pass/intrinsic-atomics.rs | 32 +-- 5 files changed, 391 insertions(+), 93 deletions(-) diff --git a/src/librustc/middle/typeck/check/mod.rs b/src/librustc/middle/typeck/check/mod.rs index 5f221994787b8..ece1eb0681cb6 100644 --- a/src/librustc/middle/typeck/check/mod.rs +++ b/src/librustc/middle/typeck/check/mod.rs @@ -4081,29 +4081,32 @@ pub fn check_intrinsic_type(ccx: @CrateCtxt, it: &ast::ForeignItem) { //We only care about the operation here match split[1] { - "cxchg" => (0, ~[ty::mk_mut_rptr(tcx, + "cxchg" => (1, ~[ty::mk_mut_rptr(tcx, ty::ReLateBound(it.id, ty::BrAnon(0)), - ty::mk_int()), - ty::mk_int(), - ty::mk_int() - ], ty::mk_int()), - "load" => (0, + param(ccx, 0)), + param(ccx, 0), + param(ccx, 0), + ], param(ccx, 0)), + "load" => (1, ~[ - ty::mk_imm_rptr(tcx, ty::ReLateBound(it.id, ty::BrAnon(0)), ty::mk_int()) + ty::mk_imm_rptr(tcx, ty::ReLateBound(it.id, ty::BrAnon(0)), + param(ccx, 0)) ], - ty::mk_int()), - "store" => (0, + param(ccx, 0)), + "store" => (1, ~[ - ty::mk_mut_rptr(tcx, ty::ReLateBound(it.id, ty::BrAnon(0)), ty::mk_int()), - ty::mk_int() + ty::mk_mut_rptr(tcx, ty::ReLateBound(it.id, ty::BrAnon(0)), + param(ccx, 0)), + param(ccx, 0) ], ty::mk_nil()), - "xchg" | "xadd" | "xsub" | "and" | "nand" | "or" | "xor" | "max" | + "xchg" | "xadd" | "xsub" | "and" | "nand" | "or" | "xor" | "max" | "min" | "umax" | "umin" => { - (0, ~[ty::mk_mut_rptr(tcx, + (1, ~[ty::mk_mut_rptr(tcx, ty::ReLateBound(it.id, ty::BrAnon(0)), - ty::mk_int()), ty::mk_int() ], ty::mk_int()) + param(ccx, 0)), param(ccx, 0) ], + param(ccx, 0)) } "fence" => { (0, ~[], ty::mk_nil()) @@ -4136,16 +4139,6 @@ pub fn check_intrinsic_type(ccx: @CrateCtxt, it: &ast::ForeignItem) { } "needs_drop" => (1u, ~[], ty::mk_bool()), "owns_managed" => (1u, ~[], ty::mk_bool()), - "atomic_xchg" | "atomic_xadd" | "atomic_xsub" | - "atomic_xchg_acq" | "atomic_xadd_acq" | "atomic_xsub_acq" | - "atomic_xchg_rel" | "atomic_xadd_rel" | "atomic_xsub_rel" => { - (0, - ~[ - ty::mk_mut_rptr(tcx, ty::ReLateBound(it.id, ty::BrAnon(0)), ty::mk_int()), - ty::mk_int() - ], - ty::mk_int()) - } "get_tydesc" => { let tydesc_ty = match ty::get_tydesc_ty(ccx.tcx) { diff --git a/src/libstd/sync/atomics.rs b/src/libstd/sync/atomics.rs index 30d9ede8a433e..6ce285152dea8 100644 --- a/src/libstd/sync/atomics.rs +++ b/src/libstd/sync/atomics.rs @@ -59,9 +59,25 @@ pub struct AtomicUint { priv nocopy: NonCopyable } +/** + * An unsigned atomic integer type that is forced to be 64-bits. This does not + * support all operations. + */ +#[cfg(not(stage0))] +pub struct AtomicU64 { + priv v: u64, + priv nocopy: NonCopyable +} + /** * An unsafe atomic pointer. Only supports basic atomic operations */ +#[cfg(not(stage0))] +pub struct AtomicPtr { + priv p: uint, + priv nocopy: NonCopyable +} +#[cfg(stage0)] pub struct AtomicPtr { priv p: *mut T, priv nocopy: NonCopyable @@ -71,6 +87,12 @@ pub struct AtomicPtr { * An owned atomic pointer. Ensures that only a single reference to the data is held at any time. */ #[unsafe_no_drop_flag] +#[cfg(not(stage0))] +pub struct AtomicOption { + priv p: uint, +} +#[unsafe_no_drop_flag] +#[cfg(stage0)] pub struct AtomicOption { priv p: *mut u8 } @@ -87,6 +109,8 @@ pub static INIT_ATOMIC_FLAG : AtomicFlag = AtomicFlag { v: 0, nocopy: NonCopyabl pub static INIT_ATOMIC_BOOL : AtomicBool = AtomicBool { v: 0, nocopy: NonCopyable }; pub static INIT_ATOMIC_INT : AtomicInt = AtomicInt { v: 0, nocopy: NonCopyable }; pub static INIT_ATOMIC_UINT : AtomicUint = AtomicUint { v: 0, nocopy: NonCopyable }; +#[cfg(not(stage0))] +pub static INIT_ATOMIC_U64 : AtomicU64 = AtomicU64 { v: 0, nocopy: NonCopyable }; impl AtomicFlag { @@ -215,6 +239,43 @@ impl AtomicInt { } } +#[cfg(not(stage0))] +impl AtomicU64 { + pub fn new(v: u64) -> AtomicU64 { + AtomicU64 { v:v, nocopy: NonCopyable } + } + + #[inline] + pub fn load(&self, order: Ordering) -> u64 { + unsafe { atomic_load(&self.v, order) } + } + + #[inline] + pub fn store(&mut self, val: u64, order: Ordering) { + unsafe { atomic_store(&mut self.v, val, order); } + } + + #[inline] + pub fn swap(&mut self, val: u64, order: Ordering) -> u64 { + unsafe { atomic_swap(&mut self.v, val, order) } + } + + #[inline] + pub fn compare_and_swap(&mut self, old: u64, new: u64, order: Ordering) -> u64 { + unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) } + } + + #[inline] + pub fn fetch_add(&mut self, val: u64, order: Ordering) -> u64 { + unsafe { atomic_add(&mut self.v, val, order) } + } + + #[inline] + pub fn fetch_sub(&mut self, val: u64, order: Ordering) -> u64 { + unsafe { atomic_sub(&mut self.v, val, order) } + } +} + impl AtomicUint { pub fn new(v: uint) -> AtomicUint { AtomicUint { v:v, nocopy: NonCopyable } @@ -254,26 +315,64 @@ impl AtomicUint { } impl AtomicPtr { + #[cfg(stage0)] + pub fn new(p: *mut T) -> AtomicPtr { + AtomicPtr { p: p, nocopy: NonCopyable } + } + #[cfg(not(stage0))] pub fn new(p: *mut T) -> AtomicPtr { - AtomicPtr { p:p, nocopy: NonCopyable } + AtomicPtr { p: p as uint, nocopy: NonCopyable } } #[inline] + #[cfg(not(stage0))] + pub fn load(&self, order: Ordering) -> *mut T { + unsafe { + atomic_load(&self.p, order) as *mut T + } + } + + #[inline] + #[cfg(not(stage0))] + pub fn store(&mut self, ptr: *mut T, order: Ordering) { + unsafe { atomic_store(&mut self.p, ptr as uint, order); } + } + + #[inline] + #[cfg(not(stage0))] + pub fn swap(&mut self, ptr: *mut T, order: Ordering) -> *mut T { + unsafe { atomic_swap(&mut self.p, ptr as uint, order) as *mut T } + } + + #[inline] + #[cfg(not(stage0))] + pub fn compare_and_swap(&mut self, old: *mut T, new: *mut T, order: Ordering) -> *mut T { + unsafe { + atomic_compare_and_swap(&mut self.p, old as uint, + new as uint, order) as *mut T + } + } + + #[inline] + #[cfg(stage0)] pub fn load(&self, order: Ordering) -> *mut T { unsafe { atomic_load(&self.p, order) } } #[inline] + #[cfg(stage0)] pub fn store(&mut self, ptr: *mut T, order: Ordering) { unsafe { atomic_store(&mut self.p, ptr, order); } } #[inline] + #[cfg(stage0)] pub fn swap(&mut self, ptr: *mut T, order: Ordering) -> *mut T { unsafe { atomic_swap(&mut self.p, ptr, order) } } #[inline] + #[cfg(stage0)] pub fn compare_and_swap(&mut self, old: *mut T, new: *mut T, order: Ordering) -> *mut T { unsafe { atomic_compare_and_swap(&mut self.p, old, new, order) } } @@ -281,20 +380,13 @@ impl AtomicPtr { impl AtomicOption { pub fn new(p: ~T) -> AtomicOption { - unsafe { - AtomicOption { - p: cast::transmute(p) - } - } + unsafe { AtomicOption { p: cast::transmute(p) } } } - pub fn empty() -> AtomicOption { - unsafe { - AtomicOption { - p: cast::transmute(0) - } - } - } + #[cfg(stage0)] + pub fn empty() -> AtomicOption { AtomicOption { p: 0 as *mut c_void } } + #[cfg(not(stage0))] + pub fn empty() -> AtomicOption { AtomicOption { p: 0 } } #[inline] pub fn swap(&mut self, val: ~T, order: Ordering) -> Option<~T> { @@ -302,9 +394,7 @@ impl AtomicOption { let val = cast::transmute(val); let p = atomic_swap(&mut self.p, val, order); - let pv : &uint = cast::transmute(&p); - - if *pv == 0 { + if p as uint == 0 { None } else { Some(cast::transmute(p)) @@ -314,9 +404,7 @@ impl AtomicOption { #[inline] pub fn take(&mut self, order: Ordering) -> Option<~T> { - unsafe { - self.swap(cast::transmute(0), order) - } + unsafe { self.swap(cast::transmute(0), order) } } /// A compare-and-swap. Succeeds if the option is 'None' and returns 'None' @@ -340,7 +428,7 @@ impl AtomicOption { /// result does not get invalidated by another task after this returns. #[inline] pub fn is_empty(&mut self, order: Ordering) -> bool { - unsafe { atomic_load(&self.p, order) == cast::transmute(0) } + unsafe { atomic_load(&self.p, order) as uint == 0 } } } @@ -351,11 +439,20 @@ impl Drop for AtomicOption { } } +#[cfg(stage0)] #[inline] pub unsafe fn atomic_store(dst: &mut T, val: T, order:Ordering) { let dst = cast::transmute(dst); let val = cast::transmute(val); - + cast::transmute(match order { + Release => intrinsics::atomic_store_rel(dst, val), + Relaxed => intrinsics::atomic_store_relaxed(dst, val), + _ => intrinsics::atomic_store(dst, val) + }) +} +#[cfg(not(stage0))] +#[inline] +pub unsafe fn atomic_store(dst: &mut T, val: T, order:Ordering) { match order { Release => intrinsics::atomic_store_rel(dst, val), Relaxed => intrinsics::atomic_store_relaxed(dst, val), @@ -363,22 +460,31 @@ pub unsafe fn atomic_store(dst: &mut T, val: T, order:Ordering) { } } +#[cfg(stage0)] #[inline] pub unsafe fn atomic_load(dst: &T, order:Ordering) -> T { let dst = cast::transmute(dst); - cast::transmute(match order { Acquire => intrinsics::atomic_load_acq(dst), Relaxed => intrinsics::atomic_load_relaxed(dst), _ => intrinsics::atomic_load(dst) }) } +#[cfg(not(stage0))] +#[inline] +pub unsafe fn atomic_load(dst: &T, order:Ordering) -> T { + match order { + Acquire => intrinsics::atomic_load_acq(dst), + Relaxed => intrinsics::atomic_load_relaxed(dst), + _ => intrinsics::atomic_load(dst) + } +} +#[cfg(stage0)] #[inline] pub unsafe fn atomic_swap(dst: &mut T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); - cast::transmute(match order { Acquire => intrinsics::atomic_xchg_acq(dst, val), Release => intrinsics::atomic_xchg_rel(dst, val), @@ -387,13 +493,24 @@ pub unsafe fn atomic_swap(dst: &mut T, val: T, order: Ordering) -> T { _ => intrinsics::atomic_xchg(dst, val) }) } +#[cfg(not(stage0))] +#[inline] +pub unsafe fn atomic_swap(dst: &mut T, val: T, order: Ordering) -> T { + match order { + Acquire => intrinsics::atomic_xchg_acq(dst, val), + Release => intrinsics::atomic_xchg_rel(dst, val), + AcqRel => intrinsics::atomic_xchg_acqrel(dst, val), + Relaxed => intrinsics::atomic_xchg_relaxed(dst, val), + _ => intrinsics::atomic_xchg(dst, val) + } +} /// Returns the old value (like __sync_fetch_and_add). +#[cfg(stage0)] #[inline] pub unsafe fn atomic_add(dst: &mut T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); - cast::transmute(match order { Acquire => intrinsics::atomic_xadd_acq(dst, val), Release => intrinsics::atomic_xadd_rel(dst, val), @@ -402,13 +519,25 @@ pub unsafe fn atomic_add(dst: &mut T, val: T, order: Ordering) -> T { _ => intrinsics::atomic_xadd(dst, val) }) } +/// Returns the old value (like __sync_fetch_and_add). +#[cfg(not(stage0))] +#[inline] +pub unsafe fn atomic_add(dst: &mut T, val: T, order: Ordering) -> T { + match order { + Acquire => intrinsics::atomic_xadd_acq(dst, val), + Release => intrinsics::atomic_xadd_rel(dst, val), + AcqRel => intrinsics::atomic_xadd_acqrel(dst, val), + Relaxed => intrinsics::atomic_xadd_relaxed(dst, val), + _ => intrinsics::atomic_xadd(dst, val) + } +} /// Returns the old value (like __sync_fetch_and_sub). +#[cfg(stage0)] #[inline] pub unsafe fn atomic_sub(dst: &mut T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); - cast::transmute(match order { Acquire => intrinsics::atomic_xsub_acq(dst, val), Release => intrinsics::atomic_xsub_rel(dst, val), @@ -417,13 +546,25 @@ pub unsafe fn atomic_sub(dst: &mut T, val: T, order: Ordering) -> T { _ => intrinsics::atomic_xsub(dst, val) }) } +/// Returns the old value (like __sync_fetch_and_sub). +#[cfg(not(stage0))] +#[inline] +pub unsafe fn atomic_sub(dst: &mut T, val: T, order: Ordering) -> T { + match order { + Acquire => intrinsics::atomic_xsub_acq(dst, val), + Release => intrinsics::atomic_xsub_rel(dst, val), + AcqRel => intrinsics::atomic_xsub_acqrel(dst, val), + Relaxed => intrinsics::atomic_xsub_relaxed(dst, val), + _ => intrinsics::atomic_xsub(dst, val) + } +} +#[cfg(stage0)] #[inline] pub unsafe fn atomic_compare_and_swap(dst:&mut T, old:T, new:T, order: Ordering) -> T { let dst = cast::transmute(dst); - let old = cast::transmute(old); let new = cast::transmute(new); - + let old = cast::transmute(old); cast::transmute(match order { Acquire => intrinsics::atomic_cxchg_acq(dst, old, new), Release => intrinsics::atomic_cxchg_rel(dst, old, new), @@ -432,12 +573,23 @@ pub unsafe fn atomic_compare_and_swap(dst:&mut T, old:T, new:T, order: Orderi _ => intrinsics::atomic_cxchg(dst, old, new), }) } +#[cfg(not(stage0))] +#[inline] +pub unsafe fn atomic_compare_and_swap(dst:&mut T, old:T, new:T, order: Ordering) -> T { + match order { + Acquire => intrinsics::atomic_cxchg_acq(dst, old, new), + Release => intrinsics::atomic_cxchg_rel(dst, old, new), + AcqRel => intrinsics::atomic_cxchg_acqrel(dst, old, new), + Relaxed => intrinsics::atomic_cxchg_relaxed(dst, old, new), + _ => intrinsics::atomic_cxchg(dst, old, new), + } +} +#[cfg(stage0)] #[inline] pub unsafe fn atomic_and(dst: &mut T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); - cast::transmute(match order { Acquire => intrinsics::atomic_and_acq(dst, val), Release => intrinsics::atomic_and_rel(dst, val), @@ -446,13 +598,23 @@ pub unsafe fn atomic_and(dst: &mut T, val: T, order: Ordering) -> T { _ => intrinsics::atomic_and(dst, val) }) } +#[cfg(not(stage0))] +#[inline] +pub unsafe fn atomic_and(dst: &mut T, val: T, order: Ordering) -> T { + match order { + Acquire => intrinsics::atomic_and_acq(dst, val), + Release => intrinsics::atomic_and_rel(dst, val), + AcqRel => intrinsics::atomic_and_acqrel(dst, val), + Relaxed => intrinsics::atomic_and_relaxed(dst, val), + _ => intrinsics::atomic_and(dst, val) + } +} - +#[cfg(stage0)] #[inline] pub unsafe fn atomic_nand(dst: &mut T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); - cast::transmute(match order { Acquire => intrinsics::atomic_nand_acq(dst, val), Release => intrinsics::atomic_nand_rel(dst, val), @@ -461,13 +623,24 @@ pub unsafe fn atomic_nand(dst: &mut T, val: T, order: Ordering) -> T { _ => intrinsics::atomic_nand(dst, val) }) } +#[cfg(not(stage0))] +#[inline] +pub unsafe fn atomic_nand(dst: &mut T, val: T, order: Ordering) -> T { + match order { + Acquire => intrinsics::atomic_nand_acq(dst, val), + Release => intrinsics::atomic_nand_rel(dst, val), + AcqRel => intrinsics::atomic_nand_acqrel(dst, val), + Relaxed => intrinsics::atomic_nand_relaxed(dst, val), + _ => intrinsics::atomic_nand(dst, val) + } +} +#[cfg(stage0)] #[inline] pub unsafe fn atomic_or(dst: &mut T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); - cast::transmute(match order { Acquire => intrinsics::atomic_or_acq(dst, val), Release => intrinsics::atomic_or_rel(dst, val), @@ -476,13 +649,24 @@ pub unsafe fn atomic_or(dst: &mut T, val: T, order: Ordering) -> T { _ => intrinsics::atomic_or(dst, val) }) } +#[cfg(not(stage0))] +#[inline] +pub unsafe fn atomic_or(dst: &mut T, val: T, order: Ordering) -> T { + match order { + Acquire => intrinsics::atomic_or_acq(dst, val), + Release => intrinsics::atomic_or_rel(dst, val), + AcqRel => intrinsics::atomic_or_acqrel(dst, val), + Relaxed => intrinsics::atomic_or_relaxed(dst, val), + _ => intrinsics::atomic_or(dst, val) + } +} +#[cfg(stage0)] #[inline] pub unsafe fn atomic_xor(dst: &mut T, val: T, order: Ordering) -> T { let dst = cast::transmute(dst); let val = cast::transmute(val); - cast::transmute(match order { Acquire => intrinsics::atomic_xor_acq(dst, val), Release => intrinsics::atomic_xor_rel(dst, val), @@ -491,6 +675,17 @@ pub unsafe fn atomic_xor(dst: &mut T, val: T, order: Ordering) -> T { _ => intrinsics::atomic_xor(dst, val) }) } +#[cfg(not(stage0))] +#[inline] +pub unsafe fn atomic_xor(dst: &mut T, val: T, order: Ordering) -> T { + match order { + Acquire => intrinsics::atomic_xor_acq(dst, val), + Release => intrinsics::atomic_xor_rel(dst, val), + AcqRel => intrinsics::atomic_xor_acqrel(dst, val), + Relaxed => intrinsics::atomic_xor_relaxed(dst, val), + _ => intrinsics::atomic_xor(dst, val) + } +} /** @@ -599,4 +794,22 @@ mod test { assert!(S_UINT.load(SeqCst) == 0); } } + + #[test] + #[cfg(not(stage0))] + fn different_sizes() { + unsafe { + let mut slot = 0u16; + assert_eq!(super::atomic_swap(&mut slot, 1, SeqCst), 0); + + let mut slot = 0u8; + assert_eq!(super::atomic_compare_and_swap(&mut slot, 1, 2, SeqCst), 0); + + let mut slot = 0u32; + assert_eq!(super::atomic_load(&mut slot, SeqCst), 0); + + let mut slot = 0u64; + super::atomic_store(&mut slot, 2, SeqCst); + } + } } diff --git a/src/libstd/unstable/intrinsics.rs b/src/libstd/unstable/intrinsics.rs index 198df3090ee80..7b9733e1fbf8b 100644 --- a/src/libstd/unstable/intrinsics.rs +++ b/src/libstd/unstable/intrinsics.rs @@ -168,16 +168,8 @@ pub trait TyVisitor { fn visit_closure_ptr(&mut self, ck: uint) -> bool; } +#[cfg(stage0)] extern "rust-intrinsic" { - /// Abort the execution of the process. - pub fn abort() -> !; - - /// Execute a breakpoint trap, for inspection by a debugger. - pub fn breakpoint(); - - pub fn volatile_load(src: *T) -> T; - pub fn volatile_store(dst: *mut T, val: T); - /// Atomic compare and exchange, sequentially consistent. pub fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int; /// Atomic compare and exchange, acquire ordering. @@ -282,6 +274,106 @@ extern "rust-intrinsic" { pub fn atomic_fence_acq(); pub fn atomic_fence_rel(); pub fn atomic_fence_acqrel(); +} + +#[cfg(not(stage0))] +extern "rust-intrinsic" { + pub fn atomic_cxchg(dst: &mut T, old: T, src: T) -> T; + pub fn atomic_cxchg_acq(dst: &mut T, old: T, src: T) -> T; + pub fn atomic_cxchg_rel(dst: &mut T, old: T, src: T) -> T; + pub fn atomic_cxchg_acqrel(dst: &mut T, old: T, src: T) -> T; + pub fn atomic_cxchg_relaxed(dst: &mut T, old: T, src: T) -> T; + + pub fn atomic_load(src: &T) -> T; + pub fn atomic_load_acq(src: &T) -> T; + pub fn atomic_load_relaxed(src: &T) -> T; + + pub fn atomic_store(dst: &mut T, val: T); + pub fn atomic_store_rel(dst: &mut T, val: T); + pub fn atomic_store_relaxed(dst: &mut T, val: T); + + pub fn atomic_xchg(dst: &mut T, src: T) -> T; + pub fn atomic_xchg_acq(dst: &mut T, src: T) -> T; + pub fn atomic_xchg_rel(dst: &mut T, src: T) -> T; + pub fn atomic_xchg_acqrel(dst: &mut T, src: T) -> T; + pub fn atomic_xchg_relaxed(dst: &mut T, src: T) -> T; + + pub fn atomic_xadd(dst: &mut T, src: T) -> T; + pub fn atomic_xadd_acq(dst: &mut T, src: T) -> T; + pub fn atomic_xadd_rel(dst: &mut T, src: T) -> T; + pub fn atomic_xadd_acqrel(dst: &mut T, src: T) -> T; + pub fn atomic_xadd_relaxed(dst: &mut T, src: T) -> T; + + pub fn atomic_xsub(dst: &mut T, src: T) -> T; + pub fn atomic_xsub_acq(dst: &mut T, src: T) -> T; + pub fn atomic_xsub_rel(dst: &mut T, src: T) -> T; + pub fn atomic_xsub_acqrel(dst: &mut T, src: T) -> T; + pub fn atomic_xsub_relaxed(dst: &mut T, src: T) -> T; + + pub fn atomic_and(dst: &mut T, src: T) -> T; + pub fn atomic_and_acq(dst: &mut T, src: T) -> T; + pub fn atomic_and_rel(dst: &mut T, src: T) -> T; + pub fn atomic_and_acqrel(dst: &mut T, src: T) -> T; + pub fn atomic_and_relaxed(dst: &mut T, src: T) -> T; + + pub fn atomic_nand(dst: &mut T, src: T) -> T; + pub fn atomic_nand_acq(dst: &mut T, src: T) -> T; + pub fn atomic_nand_rel(dst: &mut T, src: T) -> T; + pub fn atomic_nand_acqrel(dst: &mut T, src: T) -> T; + pub fn atomic_nand_relaxed(dst: &mut T, src: T) -> T; + + pub fn atomic_or(dst: &mut T, src: T) -> T; + pub fn atomic_or_acq(dst: &mut T, src: T) -> T; + pub fn atomic_or_rel(dst: &mut T, src: T) -> T; + pub fn atomic_or_acqrel(dst: &mut T, src: T) -> T; + pub fn atomic_or_relaxed(dst: &mut T, src: T) -> T; + + pub fn atomic_xor(dst: &mut T, src: T) -> T; + pub fn atomic_xor_acq(dst: &mut T, src: T) -> T; + pub fn atomic_xor_rel(dst: &mut T, src: T) -> T; + pub fn atomic_xor_acqrel(dst: &mut T, src: T) -> T; + pub fn atomic_xor_relaxed(dst: &mut T, src: T) -> T; + + pub fn atomic_max(dst: &mut T, src: T) -> T; + pub fn atomic_max_acq(dst: &mut T, src: T) -> T; + pub fn atomic_max_rel(dst: &mut T, src: T) -> T; + pub fn atomic_max_acqrel(dst: &mut T, src: T) -> T; + pub fn atomic_max_relaxed(dst: &mut T, src: T) -> T; + + pub fn atomic_min(dst: &mut T, src: T) -> T; + pub fn atomic_min_acq(dst: &mut T, src: T) -> T; + pub fn atomic_min_rel(dst: &mut T, src: T) -> T; + pub fn atomic_min_acqrel(dst: &mut T, src: T) -> T; + pub fn atomic_min_relaxed(dst: &mut T, src: T) -> T; + + pub fn atomic_umin(dst: &mut T, src: T) -> T; + pub fn atomic_umin_acq(dst: &mut T, src: T) -> T; + pub fn atomic_umin_rel(dst: &mut T, src: T) -> T; + pub fn atomic_umin_acqrel(dst: &mut T, src: T) -> T; + pub fn atomic_umin_relaxed(dst: &mut T, src: T) -> T; + + pub fn atomic_umax(dst: &mut T, src: T) -> T; + pub fn atomic_umax_acq(dst: &mut T, src: T) -> T; + pub fn atomic_umax_rel(dst: &mut T, src: T) -> T; + pub fn atomic_umax_acqrel(dst: &mut T, src: T) -> T; + pub fn atomic_umax_relaxed(dst: &mut T, src: T) -> T; + + pub fn atomic_fence(); + pub fn atomic_fence_acq(); + pub fn atomic_fence_rel(); + pub fn atomic_fence_acqrel(); +} + +extern "rust-intrinsic" { + /// Abort the execution of the process. + pub fn abort() -> !; + + /// Execute a breakpoint trap, for inspection by a debugger. + pub fn breakpoint(); + + pub fn volatile_load(src: *T) -> T; + pub fn volatile_store(dst: *mut T, val: T); + /// The size of a type in bytes. /// diff --git a/src/test/auxiliary/cci_intrinsic.rs b/src/test/auxiliary/cci_intrinsic.rs index 9e69715d1cb21..07d6df89d220c 100644 --- a/src/test/auxiliary/cci_intrinsic.rs +++ b/src/test/auxiliary/cci_intrinsic.rs @@ -10,21 +10,21 @@ pub mod rusti { extern "rust-intrinsic" { - pub fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int; - pub fn atomic_cxchg_acq(dst: &mut int, old: int, src: int) -> int; - pub fn atomic_cxchg_rel(dst: &mut int, old: int, src: int) -> int; + pub fn atomic_cxchg(dst: &mut T, old: T, src: T) -> T; + pub fn atomic_cxchg_acq(dst: &mut T, old: T, src: T) -> T; + pub fn atomic_cxchg_rel(dst: &mut T, old: T, src: T) -> T; - pub fn atomic_xchg(dst: &mut int, src: int) -> int; - pub fn atomic_xchg_acq(dst: &mut int, src: int) -> int; - pub fn atomic_xchg_rel(dst: &mut int, src: int) -> int; + pub fn atomic_xchg(dst: &mut T, src: T) -> T; + pub fn atomic_xchg_acq(dst: &mut T, src: T) -> T; + pub fn atomic_xchg_rel(dst: &mut T, src: T) -> T; - pub fn atomic_xadd(dst: &mut int, src: int) -> int; - pub fn atomic_xadd_acq(dst: &mut int, src: int) -> int; - pub fn atomic_xadd_rel(dst: &mut int, src: int) -> int; + pub fn atomic_xadd(dst: &mut T, src: T) -> T; + pub fn atomic_xadd_acq(dst: &mut T, src: T) -> T; + pub fn atomic_xadd_rel(dst: &mut T, src: T) -> T; - pub fn atomic_xsub(dst: &mut int, src: int) -> int; - pub fn atomic_xsub_acq(dst: &mut int, src: int) -> int; - pub fn atomic_xsub_rel(dst: &mut int, src: int) -> int; + pub fn atomic_xsub(dst: &mut T, src: T) -> T; + pub fn atomic_xsub_acq(dst: &mut T, src: T) -> T; + pub fn atomic_xsub_rel(dst: &mut T, src: T) -> T; } } diff --git a/src/test/run-pass/intrinsic-atomics.rs b/src/test/run-pass/intrinsic-atomics.rs index 2ec91ee440b86..d6e394a345e22 100644 --- a/src/test/run-pass/intrinsic-atomics.rs +++ b/src/test/run-pass/intrinsic-atomics.rs @@ -10,27 +10,27 @@ mod rusti { extern "rust-intrinsic" { - pub fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int; - pub fn atomic_cxchg_acq(dst: &mut int, old: int, src: int) -> int; - pub fn atomic_cxchg_rel(dst: &mut int, old: int, src: int) -> int; + pub fn atomic_cxchg(dst: &mut T, old: T, src: T) -> T; + pub fn atomic_cxchg_acq(dst: &mut T, old: T, src: T) -> T; + pub fn atomic_cxchg_rel(dst: &mut T, old: T, src: T) -> T; - pub fn atomic_load(src: &int) -> int; - pub fn atomic_load_acq(src: &int) -> int; + pub fn atomic_load(src: &T) -> T; + pub fn atomic_load_acq(src: &T) -> T; - pub fn atomic_store(dst: &mut int, val: int); - pub fn atomic_store_rel(dst: &mut int, val: int); + pub fn atomic_store(dst: &mut T, val: T); + pub fn atomic_store_rel(dst: &mut T, val: T); - pub fn atomic_xchg(dst: &mut int, src: int) -> int; - pub fn atomic_xchg_acq(dst: &mut int, src: int) -> int; - pub fn atomic_xchg_rel(dst: &mut int, src: int) -> int; + pub fn atomic_xchg(dst: &mut T, src: T) -> T; + pub fn atomic_xchg_acq(dst: &mut T, src: T) -> T; + pub fn atomic_xchg_rel(dst: &mut T, src: T) -> T; - pub fn atomic_xadd(dst: &mut int, src: int) -> int; - pub fn atomic_xadd_acq(dst: &mut int, src: int) -> int; - pub fn atomic_xadd_rel(dst: &mut int, src: int) -> int; + pub fn atomic_xadd(dst: &mut T, src: T) -> T; + pub fn atomic_xadd_acq(dst: &mut T, src: T) -> T; + pub fn atomic_xadd_rel(dst: &mut T, src: T) -> T; - pub fn atomic_xsub(dst: &mut int, src: int) -> int; - pub fn atomic_xsub_acq(dst: &mut int, src: int) -> int; - pub fn atomic_xsub_rel(dst: &mut int, src: int) -> int; + pub fn atomic_xsub(dst: &mut T, src: T) -> T; + pub fn atomic_xsub_acq(dst: &mut T, src: T) -> T; + pub fn atomic_xsub_rel(dst: &mut T, src: T) -> T; } } From 88e3b67ec4fd620d623cc61121b1c44ad3097c02 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 15 Jan 2014 16:54:11 -0800 Subject: [PATCH 02/13] Enable the +v7 feature on Android by default With the recently added double word CAS functionality on 32-bit ARM (enabled via a 64-bit atomic instruction in LLVM IR), without some extra features enabled LLVM lowers code to function calls which emulate atomic instructions. With the v7 feature enabled, proper 64-bit CAS instructions are used on 32-bit arm. I've been told that v7 for arm is what we should have been doing anyway. This is overridable by providing some other non-empty feature string. --- src/librustc/back/link.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/librustc/back/link.rs b/src/librustc/back/link.rs index a57f1296969f1..a2bfa67b74db0 100644 --- a/src/librustc/back/link.rs +++ b/src/librustc/back/link.rs @@ -97,6 +97,7 @@ pub mod write { use lib::llvm::{ModuleRef, TargetMachineRef, PassManagerRef}; use lib; use util::common::time; + use syntax::abi; use std::c_str::ToCStr; use std::io; @@ -105,6 +106,24 @@ pub mod write { use std::run; use std::str; + // On android, we by default compile for armv7 processors. This enables + // things like double word CAS instructions (rather than emulating them) + // which are *far* more efficient. This is obviously undesirable in some + // cases, so if any sort of target feature is specified we don't append v7 + // to the feature list. + fn target_feature<'a>(sess: &'a Session) -> &'a str { + match sess.targ_cfg.os { + abi::OsAndroid => { + if "" == sess.opts.target_feature { + "+v7" + } else { + sess.opts.target_feature.as_slice() + } + } + _ => sess.opts.target_feature.as_slice() + } + } + pub fn run_passes(sess: Session, trans: &CrateTranslation, output_type: OutputType, @@ -130,7 +149,7 @@ pub mod write { let tm = sess.targ_cfg.target_strs.target_triple.with_c_str(|T| { sess.opts.target_cpu.with_c_str(|CPU| { - sess.opts.target_feature.with_c_str(|Features| { + target_feature(&sess).with_c_str(|Features| { llvm::LLVMRustCreateTargetMachine( T, CPU, Features, lib::llvm::CodeModelDefault, From 92eed090b3fb8da3ec435160a021855eea490b45 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 16 Jan 2014 19:52:47 -0800 Subject: [PATCH 03/13] extra: Make room for more sync primitives --- src/libextra/{sync.rs => sync/mod.rs} | 48 ++++++++++++++++----------- 1 file changed, 28 insertions(+), 20 deletions(-) rename src/libextra/{sync.rs => sync/mod.rs} (98%) diff --git a/src/libextra/sync.rs b/src/libextra/sync/mod.rs similarity index 98% rename from src/libextra/sync.rs rename to src/libextra/sync/mod.rs index 60929b71e5f11..091a4abc02df2 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync/mod.rs @@ -17,10 +17,9 @@ * in std. */ - +use std::cast; use std::borrow; use std::comm; -use std::unstable::sync::Exclusive; use std::sync::arc::UnsafeArc; use std::sync::atomics; use std::unstable::finally::Finally; @@ -33,6 +32,10 @@ use arc::MutexArc; * Internals ****************************************************************************/ +pub mod mutex; +pub mod one; +mod mpsc_intrusive; + // Each waiting task receives on one of these. #[doc(hidden)] type WaitEnd = Port<()>; @@ -55,7 +58,7 @@ impl WaitQueue { comm::Data(ch) => { // Send a wakeup signal. If the waiter was killed, its port will // have closed. Keep trying until we get a live task. - if ch.try_send_deferred(()) { + if ch.try_send(()) { true } else { self.signal() @@ -70,7 +73,7 @@ impl WaitQueue { loop { match self.head.try_recv() { comm::Data(ch) => { - if ch.try_send_deferred(()) { + if ch.try_send(()) { count += 1; } } @@ -82,14 +85,14 @@ impl WaitQueue { fn wait_end(&self) -> WaitEnd { let (wait_end, signal_end) = Chan::new(); - assert!(self.tail.try_send_deferred(signal_end)); + assert!(self.tail.try_send(signal_end)); wait_end } } // The building-block used to make semaphores, mutexes, and rwlocks. -#[doc(hidden)] struct SemInner { + lock: mutex::Mutex, count: int, waiters: WaitQueue, // Can be either unit or another waitqueue. Some sems shouldn't come with @@ -97,21 +100,30 @@ struct SemInner { blocked: Q } -#[doc(hidden)] -struct Sem(Exclusive>); +struct Sem(UnsafeArc>); #[doc(hidden)] impl Sem { fn new(count: int, q: Q) -> Sem { - Sem(Exclusive::new(SemInner { - count: count, waiters: WaitQueue::new(), blocked: q })) + Sem(UnsafeArc::new(SemInner { + count: count, + waiters: WaitQueue::new(), + blocked: q, + lock: mutex::Mutex::new(), + })) + } + + unsafe fn with(&self, f: |&mut SemInner|) { + let Sem(ref arc) = *self; + let state = arc.get(); + let _g = (*state).lock.lock(); + f(cast::transmute(state)); } pub fn acquire(&self) { unsafe { let mut waiter_nobe = None; - let Sem(ref lock) = *self; - lock.with(|state| { + self.with(|state| { state.count -= 1; if state.count < 0 { // Create waiter nobe, enqueue ourself, and tell @@ -130,8 +142,7 @@ impl Sem { pub fn release(&self) { unsafe { - let Sem(ref lock) = *self; - lock.with(|state| { + self.with(|state| { state.count += 1; if state.count <= 0 { state.waiters.signal(); @@ -211,8 +222,7 @@ impl<'a> Condvar<'a> { let mut out_of_bounds = None; // Release lock, 'atomically' enqueuing ourselves in so doing. unsafe { - let Sem(ref queue) = *self.sem; - queue.with(|state| { + self.sem.with(|state| { if condvar_id < state.blocked.len() { // Drop the lock. state.count += 1; @@ -254,8 +264,7 @@ impl<'a> Condvar<'a> { unsafe { let mut out_of_bounds = None; let mut result = false; - let Sem(ref lock) = *self.sem; - lock.with(|state| { + self.sem.with(|state| { if condvar_id < state.blocked.len() { result = state.blocked[condvar_id].signal(); } else { @@ -277,8 +286,7 @@ impl<'a> Condvar<'a> { let mut out_of_bounds = None; let mut queue = None; unsafe { - let Sem(ref lock) = *self.sem; - lock.with(|state| { + self.sem.with(|state| { if condvar_id < state.blocked.len() { // To avoid :broadcast_heavy, we make a new waitqueue, // swap it out with the old one, and broadcast on the From 00493d020de6f59a2ec34a6820a8a223723bf424 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 16 Jan 2014 19:53:42 -0800 Subject: [PATCH 04/13] extra: Add an intrusive MPSC to be used soon --- src/etc/licenseck.py | 1 + src/libextra/sync/mpsc_intrusive.rs | 139 ++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 src/libextra/sync/mpsc_intrusive.rs diff --git a/src/etc/licenseck.py b/src/etc/licenseck.py index b5a721c03ff09..afbf34d07535d 100644 --- a/src/etc/licenseck.py +++ b/src/etc/licenseck.py @@ -41,6 +41,7 @@ "libstd/sync/mpsc_queue.rs", # BSD "libstd/sync/spsc_queue.rs", # BSD "libstd/sync/mpmc_bounded_queue.rs", # BSD + "libextra/sync/mpsc_intrusive.rs", # BSD ] def check_license(name, contents): diff --git a/src/libextra/sync/mpsc_intrusive.rs b/src/libextra/sync/mpsc_intrusive.rs new file mode 100644 index 0000000000000..0f13a4980d919 --- /dev/null +++ b/src/libextra/sync/mpsc_intrusive.rs @@ -0,0 +1,139 @@ +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +//! A mostly lock-free multi-producer, single consumer queue. +//! +//! This module implements an intrusive MPSC queue. This queue is incredibly +//! unsafe (due to use of unsafe pointers for nodes), and hence is not public. + +// http://www.1024cores.net/home/lock-free-algorithms +// /queues/intrusive-mpsc-node-based-queue + +use std::cast; +use std::sync::atomics; + +// NB: all links are done as AtomicUint instead of AtomicPtr to allow for static +// initialization. + +pub struct Node { + next: atomics::AtomicUint, + data: T, +} + +pub struct DummyNode { + next: atomics::AtomicUint, +} + +pub struct Queue { + head: atomics::AtomicUint, + tail: *mut Node, + stub: DummyNode, +} + +impl Queue { + pub fn new() -> Queue { + Queue { + head: atomics::AtomicUint::new(0), + tail: 0 as *mut Node, + stub: DummyNode { + next: atomics::AtomicUint::new(0), + }, + } + } + + pub unsafe fn push(&mut self, node: *mut Node) { + (*node).next.store(0, atomics::Release); + let prev = self.head.swap(node as uint, atomics::AcqRel); + + // Note that this code is slightly modified to allow static + // initialization of these queues with rust's flavor of static + // initialization. + if prev == 0 { + self.stub.next.store(node as uint, atomics::Release); + } else { + let prev = prev as *mut Node; + (*prev).next.store(node as uint, atomics::Release); + } + } + + /// You'll note that the other MPSC queue in std::sync is non-intrusive and + /// returns a `PopResult` here to indicate when the queue is inconsistent. + /// An "inconsistent state" in the other queue means that a pusher has + /// pushed, but it hasn't finished linking the rest of the chain. + /// + /// This queue also suffers from this problem, but I currently haven't been + /// able to detangle when this actually happens. This code is translated + /// verbatim from the website above, and is more complicated than the + /// non-intrusive version. + /// + /// Right now consumers of this queue must be ready for this fact. Just + /// because `pop` returns `None` does not mean that there is not data + /// on the queue. + pub unsafe fn pop(&mut self) -> Option<*mut Node> { + let tail = self.tail; + let mut tail = if !tail.is_null() {tail} else { + cast::transmute(&self.stub) + }; + let mut next = (*tail).next(atomics::Relaxed); + if tail as uint == &self.stub as *DummyNode as uint { + if next.is_null() { + return None; + } + self.tail = next; + tail = next; + next = (*next).next(atomics::Relaxed); + } + if !next.is_null() { + self.tail = next; + return Some(tail); + } + let head = self.head.load(atomics::Acquire) as *mut Node; + if tail != head { + return None; + } + let stub = cast::transmute(&self.stub); + self.push(stub); + next = (*tail).next(atomics::Relaxed); + if !next.is_null() { + self.tail = next; + return Some(tail); + } + return None + } +} + +impl Node { + pub fn new(t: T) -> Node { + Node { + data: t, + next: atomics::AtomicUint::new(0), + } + } + pub unsafe fn next(&mut self, ord: atomics::Ordering) -> *mut Node { + cast::transmute::>(self.next.load(ord)) + } +} From b04194c93b86a672cfecd3febf026ff36f65bc32 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 16 Jan 2014 19:54:24 -0800 Subject: [PATCH 05/13] std: Hardcode pthread constants and structures This allows for easier static initialization of a pthread mutex, although the windows mutexes still sadly suffer. Note that this commit removes the clone() method from a mutex because it no longer makes sense for pthreads mutexes. This also removes the Once type for now, but it'll get added back shortly. --- src/libgreen/sched.rs | 26 +- src/libstd/unstable/mutex.rs | 506 ++++++++++++++--------------------- src/rt/rust_builtin.c | 20 -- 3 files changed, 216 insertions(+), 336 deletions(-) diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index 8fa1e6732dcc8..f7a81c1342f82 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -1418,7 +1418,8 @@ mod test { #[test] fn test_spawn_sched_blocking() { - use std::unstable::mutex::Mutex; + use std::unstable::mutex::{Mutex, MUTEX_INIT}; + static mut LOCK: Mutex = MUTEX_INIT; // Testing that a task in one scheduler can block in foreign code // without affecting other schedulers @@ -1427,19 +1428,15 @@ mod test { let (start_po, start_ch) = Chan::new(); let (fin_po, fin_ch) = Chan::new(); - let lock = unsafe { Mutex::new() }; - let lock2 = unsafe { lock.clone() }; - let mut handle = pool.spawn_sched(); handle.send(PinnedTask(pool.task(TaskOpts::new(), proc() { - let mut lock = lock2; unsafe { - lock.lock(); + LOCK.lock(); start_ch.send(()); - lock.wait(); // block the scheduler thread - lock.signal(); // let them know we have the lock - lock.unlock(); + LOCK.wait(); // block the scheduler thread + LOCK.signal(); // let them know we have the lock + LOCK.unlock(); } fin_ch.send(()); @@ -1471,12 +1468,11 @@ mod test { child_ch.send(20); pingpong(&parent_po, &child_ch); unsafe { - let mut lock = lock; - lock.lock(); - lock.signal(); // wakeup waiting scheduler - lock.wait(); // wait for them to grab the lock - lock.unlock(); - lock.destroy(); // now we're guaranteed they have no locks + LOCK.lock(); + LOCK.signal(); // wakeup waiting scheduler + LOCK.wait(); // wait for them to grab the lock + LOCK.unlock(); + LOCK.destroy(); // now we're guaranteed they have no locks } }))); drop(handle); diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index ba965f2b5c5b2..7d013c5fa7478 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -47,180 +47,173 @@ #[allow(non_camel_case_types)]; -use int; -use sync::atomics; - pub struct Mutex { - // pointers for the lock/cond handles, atomically updated - priv lock: atomics::AtomicUint, - priv cond: atomics::AtomicUint, + priv inner: imp::Mutex, } pub static MUTEX_INIT: Mutex = Mutex { - lock: atomics::INIT_ATOMIC_UINT, - cond: atomics::INIT_ATOMIC_UINT, + inner: imp::MUTEX_INIT, }; impl Mutex { - /// Creates a new mutex, with the lock/condition variable pre-initialized + /// Creates a new mutex pub unsafe fn new() -> Mutex { - Mutex { - lock: atomics::AtomicUint::new(imp::init_lock()), - cond: atomics::AtomicUint::new(imp::init_cond()), - } - } - - /// Creates a new mutex, with the lock/condition variable not initialized. - /// This is the same as initializing from the MUTEX_INIT static. - pub unsafe fn empty() -> Mutex { - Mutex { - lock: atomics::AtomicUint::new(0), - cond: atomics::AtomicUint::new(0), - } - } - - /// Creates a new copy of this mutex. This is an unsafe operation because - /// there is no reference counting performed on this type. - /// - /// This function may only be called on mutexes which have had both the - /// internal condition variable and lock initialized. This means that the - /// mutex must have been created via `new`, or usage of it has already - /// initialized the internal handles. - /// - /// This is a dangerous function to call as both this mutex and the returned - /// mutex will share the same handles to the underlying mutex/condition - /// variable. Care must be taken to ensure that deallocation happens - /// accordingly. - pub unsafe fn clone(&self) -> Mutex { - let lock = self.lock.load(atomics::Relaxed); - let cond = self.cond.load(atomics::Relaxed); - assert!(lock != 0); - assert!(cond != 0); - Mutex { - lock: atomics::AtomicUint::new(lock), - cond: atomics::AtomicUint::new(cond), - } + Mutex { inner: imp::Mutex::new() } } /// Acquires this lock. This assumes that the current thread does not /// already hold the lock. - pub unsafe fn lock(&mut self) { imp::lock(self.getlock()) } + pub unsafe fn lock(&mut self) { self.inner.lock() } /// Attempts to acquire the lock. The value returned is whether the lock was /// acquired or not - pub unsafe fn trylock(&mut self) -> bool { imp::trylock(self.getlock()) } + pub unsafe fn trylock(&mut self) -> bool { self.inner.trylock() } /// Unlocks the lock. This assumes that the current thread already holds the /// lock. - pub unsafe fn unlock(&mut self) { imp::unlock(self.getlock()) } + pub unsafe fn unlock(&mut self) { self.inner.unlock() } /// Block on the internal condition variable. /// /// This function assumes that the lock is already held - pub unsafe fn wait(&mut self) { imp::wait(self.getcond(), self.getlock()) } + pub unsafe fn wait(&mut self) { self.inner.wait() } /// Signals a thread in `wait` to wake up - pub unsafe fn signal(&mut self) { imp::signal(self.getcond()) } + pub unsafe fn signal(&mut self) { self.inner.signal() } /// This function is especially unsafe because there are no guarantees made /// that no other thread is currently holding the lock or waiting on the /// condition variable contained inside. - pub unsafe fn destroy(&mut self) { - let lock = self.lock.swap(0, atomics::Relaxed); - let cond = self.cond.swap(0, atomics::Relaxed); - if lock != 0 { imp::free_lock(lock) } - if cond != 0 { imp::free_cond(cond) } - } - - unsafe fn getlock(&mut self) -> uint{ - match self.lock.load(atomics::Relaxed) { - 0 => {} - n => return n - } - let lock = imp::init_lock(); - match self.lock.compare_and_swap(0, lock, atomics::SeqCst) { - 0 => return lock, - _ => {} - } - imp::free_lock(lock); - self.lock.load(atomics::Relaxed) - } - - unsafe fn getcond(&mut self) -> uint { - match self.cond.load(atomics::Relaxed) { - 0 => {} - n => return n - } - let cond = imp::init_cond(); - match self.cond.compare_and_swap(0, cond, atomics::SeqCst) { - 0 => return cond, - _ => {} - } - imp::free_cond(cond); - self.cond.load(atomics::Relaxed) - } + pub unsafe fn destroy(&mut self) { self.inner.destroy() } } #[cfg(unix)] mod imp { use libc; - use ptr; - use rt::global_heap::malloc_raw; + use self::os::{PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, + pthread_mutex_t, pthread_cond_t}; + use unstable::intrinsics; - type pthread_mutex_t = libc::c_void; type pthread_mutexattr_t = libc::c_void; - type pthread_cond_t = libc::c_void; type pthread_condattr_t = libc::c_void; - pub unsafe fn init_lock() -> uint { - let block = malloc_raw(rust_pthread_mutex_t_size() as uint) as *mut pthread_mutex_t; - let n = pthread_mutex_init(block, ptr::null()); - assert_eq!(n, 0); - return block as uint; - } - - pub unsafe fn init_cond() -> uint { - let block = malloc_raw(rust_pthread_cond_t_size() as uint) as *mut pthread_cond_t; - let n = pthread_cond_init(block, ptr::null()); - assert_eq!(n, 0); - return block as uint; - } + #[cfg(target_os = "freebsd")] + mod os { + use libc; + + pub type pthread_mutex_t = *libc::c_void; + pub type pthread_cond_t = *libc::c_void; + + pub static PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = + 0 as pthread_mutex_t; + pub static PTHREAD_COND_INITIALIZER: pthread_cond_t = + 0 as pthread_cond_t; + } + + #[cfg(target_os = "macos")] + mod os { + use libc; + + #[cfg(target_arch = "x86_64")] + static __PTHREAD_MUTEX_SIZE__: uint = 56; + #[cfg(target_arch = "x86_64")] + static __PTHREAD_COND_SIZE__: uint = 40; + #[cfg(target_arch = "x86")] + static __PTHREAD_MUTEX_SIZE__: uint = 40; + #[cfg(target_arch = "x86")] + static __PTHREAD_COND_SIZE__: uint = 24; + static _PTHREAD_MUTEX_SIG_init: libc::c_long = 0x32AAABA7; + static _PTHREAD_COND_SIG_init: libc::c_long = 0x3CB0B1BB; + + pub struct pthread_mutex_t { + __sig: libc::c_long, + __opaque: [u8, ..__PTHREAD_MUTEX_SIZE__], + } + pub struct pthread_cond_t { + __sig: libc::c_long, + __opaque: [u8, ..__PTHREAD_COND_SIZE__], + } - pub unsafe fn free_lock(h: uint) { - let block = h as *mut libc::c_void; - assert_eq!(pthread_mutex_destroy(block), 0); - libc::free(block); - } + pub static PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = pthread_mutex_t { + __sig: _PTHREAD_MUTEX_SIG_init, + __opaque: [0, ..__PTHREAD_MUTEX_SIZE__], + }; + pub static PTHREAD_COND_INITIALIZER: pthread_cond_t = pthread_cond_t { + __sig: _PTHREAD_COND_SIG_init, + __opaque: [0, ..__PTHREAD_COND_SIZE__], + }; + } + + #[cfg(target_os = "linux")] + #[cfg(target_os = "android")] + mod os { + use libc; + + // minus 8 because we have an 'align' field + #[cfg(target_arch = "x86_64")] + static __SIZEOF_PTHREAD_MUTEX_T: uint = 40 - 8; + #[cfg(target_arch = "x86")] + static __SIZEOF_PTHREAD_MUTEX_T: uint = 24 - 8; + #[cfg(target_arch = "x86_64")] + static __SIZEOF_PTHREAD_COND_T: uint = 48 - 8; + #[cfg(target_arch = "x86")] + static __SIZEOF_PTHREAD_COND_T: uint = 48 - 8; + + pub struct pthread_mutex_t { + __align: libc::c_long, + size: [u8, ..__SIZEOF_PTHREAD_MUTEX_T], + } + pub struct pthread_cond_t { + __align: libc::c_longlong, + size: [u8, ..__SIZEOF_PTHREAD_COND_T], + } - pub unsafe fn free_cond(h: uint) { - let block = h as *mut pthread_cond_t; - assert_eq!(pthread_cond_destroy(block), 0); - libc::free(block); + pub static PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = pthread_mutex_t { + __align: 0, + size: [0, ..__SIZEOF_PTHREAD_MUTEX_T], + }; + pub static PTHREAD_COND_INITIALIZER: pthread_cond_t = pthread_cond_t { + __align: 0, + size: [0, ..__SIZEOF_PTHREAD_COND_T], + }; } - pub unsafe fn lock(l: uint) { - assert_eq!(pthread_mutex_lock(l as *mut pthread_mutex_t), 0); + pub struct Mutex { + priv lock: pthread_mutex_t, + priv cond: pthread_cond_t, } - pub unsafe fn trylock(l: uint) -> bool { - pthread_mutex_trylock(l as *mut pthread_mutex_t) == 0 - } + pub static MUTEX_INIT: Mutex = Mutex { + lock: PTHREAD_MUTEX_INITIALIZER, + cond: PTHREAD_COND_INITIALIZER, + }; - pub unsafe fn unlock(l: uint) { - assert_eq!(pthread_mutex_unlock(l as *mut pthread_mutex_t), 0); - } + impl Mutex { + pub unsafe fn new() -> Mutex { + let m = Mutex { + lock: intrinsics::init(), + cond: intrinsics::init(), + }; - pub unsafe fn wait(cond: uint, m: uint) { - assert_eq!(pthread_cond_wait(cond as *mut pthread_cond_t, m as *mut pthread_mutex_t), 0); - } + pthread_mutex_init(&m.lock, 0 as *libc::c_void); + pthread_cond_init(&m.cond, 0 as *libc::c_void); - pub unsafe fn signal(cond: uint) { - assert_eq!(pthread_cond_signal(cond as *mut pthread_cond_t), 0); - } + return m; + } - extern { - fn rust_pthread_mutex_t_size() -> libc::c_int; - fn rust_pthread_cond_t_size() -> libc::c_int; + pub unsafe fn lock(&mut self) { pthread_mutex_lock(&self.lock); } + pub unsafe fn unlock(&mut self) { pthread_mutex_unlock(&self.lock); } + pub unsafe fn signal(&mut self) { pthread_cond_signal(&self.cond); } + pub unsafe fn wait(&mut self) { + pthread_cond_wait(&self.cond, &self.lock); + } + pub unsafe fn trylock(&mut self) -> bool { + pthread_mutex_trylock(&self.lock) == 0 + } + pub unsafe fn destroy(&mut self) { + pthread_mutex_destroy(&self.lock); + pthread_cond_destroy(&self.cond); + } } extern { @@ -242,16 +235,96 @@ mod imp { #[cfg(windows)] mod imp { - use libc; use libc::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES, c_void, DWORD, LPCSTR}; + use libc; + use ptr::RawPtr; use ptr; - use rt::global_heap::malloc_raw; + use sync::atomics; type LPCRITICAL_SECTION = *c_void; static SPIN_COUNT: DWORD = 4000; + #[cfg(target_arch = "x86")] + static CRIT_SECTION_SIZE: uint = 24; + + pub struct Mutex { + // pointers for the lock/cond handles, atomically updated + priv lock: atomics::AtomicUint, + priv cond: atomics::AtomicUint, + } + + pub static MUTEX_INIT: Mutex = Mutex { + lock: atomics::INIT_ATOMIC_UINT, + cond: atomics::INIT_ATOMIC_UINT, + }; + + impl Mutex { + pub unsafe fn new() -> Mutex { + Mutex { + lock: atomics::AtomicUint::new(init_lock()), + cond: atomics::AtomicUint::new(init_cond()), + } + } + pub unsafe fn lock(&mut self) { + EnterCriticalSection(self.getlock() as LPCRITICAL_SECTION) + } + pub unsafe fn trylock(&mut self) -> bool { + TryEnterCriticalSection(self.getlock() as LPCRITICAL_SECTION) != 0 + } + pub unsafe fn unlock(&mut self) { + LeaveCriticalSection(self.getlock() as LPCRITICAL_SECTION) + } + + pub unsafe fn wait(&mut self) { + self.unlock(); + WaitForSingleObject(self.getcond() as HANDLE, libc::INFINITE); + self.lock(); + } + + pub unsafe fn signal(&mut self) { + assert!(SetEvent(self.getcond() as HANDLE) != 0); + } + + /// This function is especially unsafe because there are no guarantees made + /// that no other thread is currently holding the lock or waiting on the + /// condition variable contained inside. + pub unsafe fn destroy(&mut self) { + let lock = self.lock.swap(0, atomics::Relaxed); + let cond = self.cond.swap(0, atomics::Relaxed); + if lock != 0 { free_lock(lock) } + if cond != 0 { free_cond(cond) } + } + + unsafe fn getlock(&mut self) -> *mut c_void { + match self.lock.load(atomics::Relaxed) { + 0 => {} + n => return n as *mut c_void + } + let lock = init_lock(); + match self.lock.compare_and_swap(0, lock, atomics::SeqCst) { + 0 => return lock as *mut c_void, + _ => {} + } + free_lock(lock); + return self.lock.load(atomics::Relaxed) as *mut c_void; + } + + unsafe fn getcond(&mut self) -> *mut c_void { + match self.cond.load(atomics::Relaxed) { + 0 => {} + n => return n as *mut c_void + } + let cond = init_cond(); + match self.cond.compare_and_swap(0, cond, atomics::SeqCst) { + 0 => return cond as *mut c_void, + _ => {} + } + free_cond(cond); + return self.cond.load(atomics::Relaxed) as *mut c_void; + } + } pub unsafe fn init_lock() -> uint { - let block = malloc_raw(rust_crit_section_size() as uint) as *c_void; + let block = malloc_raw(CRIT_SECTION_SIZE as uint) as *c_void; InitializeCriticalSectionAndSpinCount(block, SPIN_COUNT); return block as uint; } @@ -271,32 +344,6 @@ mod imp { libc::CloseHandle(block); } - pub unsafe fn lock(l: uint) { - EnterCriticalSection(l as LPCRITICAL_SECTION) - } - - pub unsafe fn trylock(l: uint) -> bool { - TryEnterCriticalSection(l as LPCRITICAL_SECTION) != 0 - } - - pub unsafe fn unlock(l: uint) { - LeaveCriticalSection(l as LPCRITICAL_SECTION) - } - - pub unsafe fn wait(cond: uint, m: uint) { - unlock(m); - WaitForSingleObject(cond as HANDLE, libc::INFINITE); - lock(m); - } - - pub unsafe fn signal(cond: uint) { - assert!(SetEvent(cond as HANDLE) != 0); - } - - extern { - fn rust_crit_section_size() -> libc::c_int; - } - extern "system" { fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES, bManualReset: BOOL, @@ -314,157 +361,14 @@ mod imp { } } -/// A type which can be used to run a one-time global initialization. This type -/// is *unsafe* to use because it is built on top of the `Mutex` in this module. -/// It does not know whether the currently running task is in a green or native -/// context, and a blocking mutex should *not* be used under normal -/// circumstances on a green task. -/// -/// Despite its unsafety, it is often useful to have a one-time initialization -/// routine run for FFI bindings or related external functionality. This type -/// can only be statically constructed with the `ONCE_INIT` value. -/// -/// # Example -/// -/// ```rust -/// use std::unstable::mutex::{Once, ONCE_INIT}; -/// -/// static mut START: Once = ONCE_INIT; -/// unsafe { -/// START.doit(|| { -/// // run initialization here -/// }); -/// } -/// ``` -pub struct Once { - priv mutex: Mutex, - priv cnt: atomics::AtomicInt, - priv lock_cnt: atomics::AtomicInt, -} - -/// Initialization value for static `Once` values. -pub static ONCE_INIT: Once = Once { - mutex: MUTEX_INIT, - cnt: atomics::INIT_ATOMIC_INT, - lock_cnt: atomics::INIT_ATOMIC_INT, -}; - -impl Once { - /// Perform an initialization routine once and only once. The given closure - /// will be executed if this is the first time `doit` has been called, and - /// otherwise the routine will *not* be invoked. - /// - /// This method will block the calling *os thread* if another initialization - /// routine is currently running. - /// - /// When this function returns, it is guaranteed that some initialization - /// has run and completed (it may not be the closure specified). - pub fn doit(&mut self, f: ||) { - // Implementation-wise, this would seem like a fairly trivial primitive. - // The stickler part is where our mutexes currently require an - // allocation, and usage of a `Once` should't leak this allocation. - // - // This means that there must be a deterministic destroyer of the mutex - // contained within (because it's not needed after the initialization - // has run). - // - // The general scheme here is to gate all future threads once - // initialization has completed with a "very negative" count, and to - // allow through threads to lock the mutex if they see a non negative - // count. For all threads grabbing the mutex, exactly one of them should - // be responsible for unlocking the mutex, and this should only be done - // once everyone else is done with the mutex. - // - // This atomicity is achieved by swapping a very negative value into the - // shared count when the initialization routine has completed. This will - // read the number of threads which will at some point attempt to - // acquire the mutex. This count is then squirreled away in a separate - // variable, and the last person on the way out of the mutex is then - // responsible for destroying the mutex. - // - // It is crucial that the negative value is swapped in *after* the - // initialization routine has completed because otherwise new threads - // calling `doit` will return immediately before the initialization has - // completed. - - let prev = self.cnt.fetch_add(1, atomics::SeqCst); - if prev < 0 { - // Make sure we never overflow, we'll never have int::MIN - // simultaneous calls to `doit` to make this value go back to 0 - self.cnt.store(int::MIN, atomics::SeqCst); - return - } - - // If the count is negative, then someone else finished the job, - // otherwise we run the job and record how many people will try to grab - // this lock - unsafe { self.mutex.lock() } - if self.cnt.load(atomics::SeqCst) > 0 { - f(); - let prev = self.cnt.swap(int::MIN, atomics::SeqCst); - self.lock_cnt.store(prev, atomics::SeqCst); - } - unsafe { self.mutex.unlock() } - - // Last one out cleans up after everyone else, no leaks! - if self.lock_cnt.fetch_add(-1, atomics::SeqCst) == 1 { - unsafe { self.mutex.destroy() } - } - } -} - #[cfg(test)] mod test { use prelude::*; + use super::{Mutex, MUTEX_INIT}; use rt::thread::Thread; - use super::{ONCE_INIT, Once, Mutex, MUTEX_INIT}; use task; - #[test] - fn smoke_once() { - static mut o: Once = ONCE_INIT; - let mut a = 0; - unsafe { o.doit(|| a += 1); } - assert_eq!(a, 1); - unsafe { o.doit(|| a += 1); } - assert_eq!(a, 1); - } - - #[test] - fn stampede_once() { - static mut o: Once = ONCE_INIT; - static mut run: bool = false; - - let (p, c) = SharedChan::new(); - for _ in range(0, 10) { - let c = c.clone(); - do spawn { - for _ in range(0, 4) { task::deschedule() } - unsafe { - o.doit(|| { - assert!(!run); - run = true; - }); - assert!(run); - } - c.send(()); - } - } - - unsafe { - o.doit(|| { - assert!(!run); - run = true; - }); - assert!(run); - } - - for _ in range(0, 10) { - p.recv(); - } - } - #[test] fn somke_lock() { static mut lock: Mutex = MUTEX_INIT; @@ -493,7 +397,7 @@ mod test { #[test] fn destroy_immediately() { unsafe { - let mut m = Mutex::empty(); + let mut m = Mutex::new(); m.destroy(); } } diff --git a/src/rt/rust_builtin.c b/src/rt/rust_builtin.c index 6de5f80829003..81eba2984dad0 100644 --- a/src/rt/rust_builtin.c +++ b/src/rt/rust_builtin.c @@ -437,26 +437,6 @@ rust_win32_rand_release() { #endif -#if defined(__WIN32__) - -int -rust_crit_section_size() { return sizeof(CRITICAL_SECTION); } -int -rust_pthread_mutex_t_size() { return 0; } -int -rust_pthread_cond_t_size() { return 0; } - -#else - -int -rust_crit_section_size() { return 0; } -int -rust_pthread_mutex_t_size() { return sizeof(pthread_mutex_t); } -int -rust_pthread_cond_t_size() { return sizeof(pthread_cond_t); } - -#endif - // // Local Variables: // mode: C++ From af70d892d2e3169dc8b22f4f2c564079f8ca4775 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 16 Jan 2014 19:57:15 -0800 Subject: [PATCH 06/13] extra: Introduce a new efficient Mutex type There are no current consumers of this mutex, but rather more will be added soon. --- src/libextra/sync/mutex.rs | 492 +++++++++++++++++++++++++++++++++++++ 1 file changed, 492 insertions(+) create mode 100644 src/libextra/sync/mutex.rs diff --git a/src/libextra/sync/mutex.rs b/src/libextra/sync/mutex.rs new file mode 100644 index 0000000000000..3c76b7bb8a2f6 --- /dev/null +++ b/src/libextra/sync/mutex.rs @@ -0,0 +1,492 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A proper mutex implementation regardless of the "flavor of task" which is +//! acquiring the lock. + +// # The implementation of Rust's mutexes +// +// As hinted in the doc-comment above, the fundamental problem of implementing a +// mutex for rust is that you can't "just use pthreads". Green tasks are not +// allowed to block on a pthread mutex, because this can very easily lead to +// deadlock. Otherwise, there are other properties that we would want out of an +// "official mutex": +// +// * Any flavor of task can acquire the mutex, green or native +// * Any mixing of flavors of tasks can acquire the mutex. It should be possible +// for green and native threads to contend over acquiring the mutex +// * This mutex should be "just as fast" as pthreads +// * Mutexes should be statically initializeable +// * Mutexes should really not need to have destructors (see static +// initialization) +// +// Some properties which have been deemed not critical +// +// * Enforcing bounded waiting among all tasks acquiring the mutex. Mixing +// green/native tasks is predicted to be a fairly rare case. +// +// ## Mutexes, take 1 +// +// Within these constraints, the primitives we have available to us for blocking +// a task are the `deschedule` and `reawaken` methods on the `rt::Runtime` +// trait. These are the obvious choices to use first because they're "what we +// havel already" and should certainly be efficient. +// +// The sketch behind this mutex would be to use an intrusive (to avoid +// allocations) MPSC queue (the consumer is the lock holder) with some +// sprinkling of atomics to wake threads up. Each `BlockedTask` would be stored +// in the nodes of the queue. +// +// This implementation is all fine and dandy for green threads (user space +// context switching is fast), but when implemented, it was found that this +// implementation was about 50x slower than pthreads for native threads. +// +// Upon profiling, nearly all time was spent in cvar signal/wait (that's how +// native threads implement deschedule/reawaken). The problem was never tracked +// down with 100% certainty, but it was able discovered that this huge slowdown +// was only on a multicore system, not a single core system. With this knowledge +// in hand, plus some idea of how pthread mutexes are implemented, it was +// deduced that the kernel essentially knows what's going on when everyone's +// contended on the same mutex (as in the pthreads case). The kernel can +// cleverly schedule threads to *not* wake up on remote cores because all the +// work needs to happen on the same core (that's the whole point of a mutex). +// The deschedule/reawaken methods put threads to sleep on localized cvars, so +// the kernel had no idea that all our threads were contending *on the same +// mutex*. +// +// With this information in mind, it was concluded that it's impossible to +// create a pthreads-competitive mutex with the deschedule/reawaken primitives. +// We simply have no way of instructing the kernel that all native threads are +// contended on one object and should therefore *not* be spread out on many +// cores. +// +// ## Mutexes, take 2 +// +// Back do the drawing board, the key idea was to actually have this mutex be a +// wrapper around a pthreads mutex. This would clearly solve the native threads +// problem (we'd be "just as fast" as pthreads), but the green problem comes +// back into play (you can't just grab the lock). +// +// The solution found (and the current implementation) ended up having a hybrid +// solution of queues/mutexes. The key idea is that green threads only ever +// *trylock* and use an internal queue to keep track of who's waiting, and +// native threads will simply just call *lock*. +// +// With this scheme, we get all the benefits of both worlds: +// +// * Any flavor of task (even mixed) can grab a mutex, pthreads arbitrates among +// all native and the first green tasks, and then green tasks use atomics to +// arbitrate among themselves. +// * We're just as fast as pthreads (within a small percentage of course) +// * Native mutexes are statically initializeable, and some clever usage of +// atomics can make the green halves of the mutex also statically +// initializeable. +// * No destructors are necessary (there is no memory allocation). The caveat +// here is that windows doesn't have statically initialized mutexes, but it is +// predicted that statically initialized mutexes won't be *too* common. Plus, +// the "free" happens at program end when cleaning up doesn't matter *that* +// much. +// +// As you'll find out in the implementation, this approach cannot be fair to +// native and green threads. In order to soundly drain the internal queue of +// green threads, they *must* be favored over native threads. It was an explicit +// non-goal of these mutexes to be completely fair to everyone, so this has been +// deemed acceptable. +// +// This is the high-level implementation of the mutexes, but the nitty gritty +// details can be found in the code below. + +use std::rt::local::Local; +use std::rt::task::{BlockedTask, Task}; +use std::rt::thread::Thread; +use std::sync::atomics; +use std::unstable::mutex; + +use q = sync::mpsc_intrusive; + +/// A mutual exclusion primitive useful for protecting shared data +/// +/// This mutex is an implementation of a lock for all flavors of tasks which may +/// be grabbing. A common problem with green threads is that they cannot grab +/// locks (if they reschedule during the lock a contender could deadlock the +/// system), but this mutex does *not* suffer this problem. +/// +/// This mutex will properly block tasks waiting for the lock to become +/// available. The mutex can also be statically initialized or created via a +/// `new` constructor. +/// +/// # Example +/// +/// ```rust +/// use std::sync::Mutex; +/// +/// let mut m = Mutex::new(); +/// let guard = m.lock(); +/// // do some work +/// drop(guard); // unlock the lock +/// +/// { +/// let _g = m.lock(); +/// // do some work in a scope +/// } +/// +/// // now the mutex is unlocked +/// ``` +pub struct Mutex { + priv lock: StaticMutex, +} + +/// The static mutex type is provided to allow for static allocation of mutexes. +/// +/// Note that this is a separate type because using a Mutex correctly means that +/// it needs to have a destructor run. In Rust, statics are not allowed to have +/// destructors. As a result, a `StaticMutex` has one extra method when compared +/// to a `Mutex`, a `destroy` method. This method is unsafe to call, and +/// documentation can be found directly on the method. +/// +/// # Example +/// +/// ```rust +/// use std::sync::{StaticMutex, MUTEX_INIT}; +/// +/// static mut LOCK: StaticMutex = MUTEX_INIT; +/// +/// unsafe { +/// let _g = LOCK.lock(); +/// // do some productive work +/// } +/// // lock is unlocked here. +/// ``` +pub struct StaticMutex { + /// The OS mutex (pthreads/windows equivalent) that we're wrapping. + priv lock: mutex::Mutex, + /// Internal queue that all green threads will be blocked on. + priv q: q::Queue, + /// Dubious flag about whether this mutex is held or not. You might be + /// thinking "this is impossible to manage atomically," and you would be + /// correct! Keep on reading! + priv held: atomics::AtomicBool, +} + +/// An RAII implementation of a "scoped lock" of a mutex. When this structure is +/// dropped (falls out of scope), the lock will be unlocked. +pub struct Guard<'a> { + priv lock: &'a mut StaticMutex, +} + +/// Static initialization of a mutex. This constant can be used to initialize +/// other mutex constants. +pub static MUTEX_INIT: StaticMutex = StaticMutex { + lock: mutex::MUTEX_INIT, + held: atomics::INIT_ATOMIC_BOOL, + q: q::Queue { + head: atomics::INIT_ATOMIC_UINT, + tail: 0 as *mut q::Node, + stub: q::DummyNode { + next: atomics::INIT_ATOMIC_UINT, + } + } +}; + +impl StaticMutex { + /// Attempts to grab this lock, see `Mutex::try_lock` + pub fn try_lock<'a>(&'a mut self) -> Option> { + if unsafe { self.lock.trylock() } { + self.held.store(true, atomics::Release); // see below + Some(Guard{ lock: self }) + } else { + None + } + } + + /// Acquires this lock, see `Mutex::lock` + pub fn lock<'a>(&'a mut self) -> Guard<'a> { + // Remember that an explicit goal of these mutexes is to be "just as + // fast" as pthreads. Note that at some point our implementation + // requires an answer to the question "can we block" and implies a hit + // to OS TLS. In attempt to avoid this hit and to maintain efficiency in + // the uncontended case (very important) we start off by hitting a + // trylock on the OS mutex. If we succeed, then we're lucky! + if unsafe { self.lock.trylock() } { + self.held.store(true, atomics::Release); // see below + return Guard{ lock: self } + } + + let t: ~Task = Local::take(); + if t.can_block() { + // Tasks which can block are super easy. These tasks just accept the + // TLS hit we just made, and then call the blocking `lock()` + // function. Turns out the TLS hit is essentially 0 on contention. + Local::put(t); + unsafe { self.lock.lock(); } + self.held.store(true, atomics::Release); // see below + } else { + // And here's where we come to the "fun part" of this + // implementation. Contention with a green task is fairly difficult + // to resolve. The goal here is to push ourselves onto the internal + // queue, but still be able to "cancel" our enqueue in case the lock + // was dropped while we were doing our business. + // + // The pseudocode for this is: + // + // let mut node = ...; + // push(node) + // if trylock() { + // wakeup(pop()) + // } else { + // node.sleep() + // } + // + // And the pseudocode for the wakeup protocol is: + // + // match pop() { + // Some(node) => node.wakeup(), + // None => lock.unlock() + // } + // + // Note that a contended green thread does *not* re-acquire the + // mutex because ownership was silently transferred to it. You'll + // note a fairly large race condition here, which is that whenever + // the OS mutex is unlocked, "just before" it's unlocked a green + // thread can fly in and block itself. This turns out to be a + // fundamental problem with any sort of attempt to arbitrate among + // the unlocker and a locking green thread. + // + // One possible solution for this is to attempt to re-acquire the + // lock during the unlock procedure. This is less than ideal, + // however, because it means that the memory of a mutex must be + // guaranteed to be valid until *all unlocks* have returned. That's + // normally the job of the mutex itself, so it can be seen that + // touching a mutex again after it has been unlocked is an unwise + // decision. + // + // Another alternative solution (and the one implemented) is more + // distasteful, but functional. You'll notice that the struct + // definition has a `held` flag, which is impossible to maintain + // atomically. For our usage, the flag is set to `true` immediately + // after a mutex is acquired and set to `false` at the *beginning* + // of an unlock. + // + // By doing this, we're essentially instructing green threads to + // "please spin" while another thread is in the middle of performing + // the unlock procedure. Again, this is distasteful, but within the + // constraints that we're working in I found it difficult to think + // of other courses of action. + let mut node = q::Node::new(0); + t.deschedule(1, |task| { + unsafe { + node.data = task.cast_to_uint(); + self.q.push(&mut node); + } + + let mut stolen = false; + // Spinloop attempting to grab a mutex while someone's unlocking + // the mutex. While it's not held and we fail the trylock, the + // best thing we can do is hope that our yield will run the + // unlocker before us (note that bounded waiting is shattered + // here for green threads). + while !self.held.load(atomics::SeqCst) { + if unsafe { self.lock.trylock() } { + self.held.store(true, atomics::Release); + stolen = true; + break + } else { + Thread::yield_now(); + } + } + + // If we managed to steal the lock, then we need to wake up a + // thread. Note that we may not have acquired the mutex for + // ourselves (we're not guaranteed to be the head of the queue). + // The good news is that we *are* guaranteed to have a non-empty + // queue. This is because if we acquired the mutex no one could + // have transferred it to us (hence our own node must still be + // on the queue). + // + // The queue itself can return `None` from a pop when there's + // data on the queue (a known limitation of the queue), so here + // you'll find the second spin loop (which is in theory even + // rarer than the one above). + if stolen { + let locker; + loop { + match unsafe { self.q.pop() } { + Some(t) => { locker = t; break } + None => Thread::yield_now() + } + } + Err(unsafe { BlockedTask::cast_from_uint((*locker).data) }) + } else { + Ok(()) + } + }); + assert!(self.held.load(atomics::SeqCst)); + } + + Guard { lock: self } + } + + fn unlock(&mut self) { + // As documented above, we *initially* flag our mutex as unlocked in + // order to allow green threads just starting to block to realize that + // they shouldn't completely block. + assert!(self.held.load(atomics::SeqCst)); + self.held.store(false, atomics::Release); + + // Remember that the queues we are using may return None when there is + // indeed data on the queue. In this case, we can just safely ignore it. + // The reason for this ignorance is that a value of `None` with data on + // the queue means that the "head popper" hasn't finished yet. We've + // already flagged our mutex as acquire-able, so the "head popper" will + // see this and attempt to grab the mutex (or someone else will steal it + // and this whole process will begin anew). + match unsafe { self.q.pop() } { + Some(t) => { + self.held.store(true, atomics::Release); + let task = unsafe { BlockedTask::cast_from_uint((*t).data) }; + task.wake().map(|t| t.reawaken()); + } + None => unsafe { self.lock.unlock() } + } + } + + /// Deallocates resources associated with this static mutex. + /// + /// This method is unsafe because it provides no guarantees that there are + /// no active users of this mutex, and safety is not guaranteed if there are + /// active users of this mutex. + /// + /// This method is required to ensure that there are no memory leaks on + /// *all* platforms. It may be the case that some platforms do not leak + /// memory if this method is not called, but this is not guaranteed to be + /// true on all platforms. + pub unsafe fn destroy(&mut self) { + self.lock.destroy() + } +} + +impl Mutex { + /// Creates a new mutex in an unlocked state ready for use. + pub fn new() -> Mutex { + Mutex { + lock: StaticMutex { + held: atomics::AtomicBool::new(false), + q: q::Queue::new(), + lock: unsafe { mutex::Mutex::new() }, + } + } + } + + /// Attempts to acquire this lock. + /// + /// If the lock could not be acquired at this time, then `None` is returned. + /// Otherwise, an RAII guard is returned. The lock will be unlocked when the + /// guard is dropped. + /// + /// This function does not block. + pub fn try_lock<'a>(&'a mut self) -> Option> { + self.lock.try_lock() + } + + /// Acquires a mutex, blocking the current task until it is able to do so. + /// + /// This function will block the local task until it is availble to acquire + /// the mutex. Upon returning, the task is the only task with the mutex + /// held. An RAII guard is returned to allow scoped unlock of the lock. When + /// the guard goes out of scope, the mutex will be unlocked. + pub fn lock<'a>(&'a mut self) -> Guard<'a> { self.lock.lock() } +} + +#[unsafe_destructor] +impl<'a> Drop for Guard<'a> { + #[inline] + fn drop(&mut self) { + self.lock.unlock(); + } +} + +impl Drop for Mutex { + fn drop(&mut self) { + // This is actually safe b/c we know that there is no further usage of + // this mutex (it's up to the user to arrange for a mutex to get + // dropped, that's not our job) + unsafe { self.lock.destroy() } + } +} + +#[cfg(test)] +mod test { + extern mod native; + use super::{Mutex, StaticMutex, MUTEX_INIT}; + + #[test] + fn smoke() { + let mut m = Mutex::new(); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn smoke_static() { + static mut m: StaticMutex = MUTEX_INIT; + unsafe { + drop(m.lock()); + drop(m.lock()); + m.destroy(); + } + } + + #[test] + fn lots_and_lots() { + static mut m: StaticMutex = MUTEX_INIT; + static mut CNT: uint = 0; + static M: uint = 1000; + static N: uint = 3; + + fn inc() { + for _ in range(0, M) { + unsafe { + let _g = m.lock(); + CNT += 1; + } + } + } + + let (p, c) = SharedChan::new(); + for _ in range(0, N) { + let c2 = c.clone(); + do native::task::spawn { inc(); c2.send(()); } + let c2 = c.clone(); + do spawn { inc(); c2.send(()); } + } + + drop(c); + for _ in range(0, 2 * N) { + p.recv(); + } + assert_eq!(unsafe {CNT}, M * N * 2); + unsafe { + m.destroy(); + } + } + + #[test] + fn trylock() { + let mut m = Mutex::new(); + assert!(m.try_lock().is_some()); + } + + #[test] #[should_fail] + fn double_lock() { + static mut m: StaticMutex = MUTEX_INIT; + let _g = m.lock(); + m.lock(); + } +} From 3c121a5b9f150d66424be31692fbcd8a3ac5fc29 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 16 Jan 2014 19:57:59 -0800 Subject: [PATCH 07/13] extra: Re-add the Once primitve to extra::sync This originally lived in std::unstable::mutex, but now it has a new home (and a more proper one). --- src/libextra/sync/one.rs | 168 ++++++++++++++++++++++++++++++ src/libnative/io/net.rs | 22 ++-- src/librustc/back/link.rs | 2 +- src/librustc/middle/trans/base.rs | 2 +- 4 files changed, 184 insertions(+), 10 deletions(-) create mode 100644 src/libextra/sync/one.rs diff --git a/src/libextra/sync/one.rs b/src/libextra/sync/one.rs new file mode 100644 index 0000000000000..1335be9b48615 --- /dev/null +++ b/src/libextra/sync/one.rs @@ -0,0 +1,168 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A "once initialization" primitive +//! +//! This primitive is meant to be used to run one-time initialization. An +//! example use case would be for initializing an FFI library. + +use std::int; +use std::sync::atomics; +use sync::mutex::{StaticMutex, MUTEX_INIT}; + +/// A type which can be used to run a one-time global initialization. This type +/// is *unsafe* to use because it is built on top of the `Mutex` in this module. +/// It does not know whether the currently running task is in a green or native +/// context, and a blocking mutex should *not* be used under normal +/// circumstances on a green task. +/// +/// Despite its unsafety, it is often useful to have a one-time initialization +/// routine run for FFI bindings or related external functionality. This type +/// can only be statically constructed with the `ONCE_INIT` value. +/// +/// # Example +/// +/// ```rust +/// use std::unstable::mutex::{Once, ONCE_INIT}; +/// +/// static mut START: Once = ONCE_INIT; +/// unsafe { +/// START.doit(|| { +/// // run initialization here +/// }); +/// } +/// ``` +pub struct Once { + priv mutex: StaticMutex, + priv cnt: atomics::AtomicInt, + priv lock_cnt: atomics::AtomicInt, +} + +/// Initialization value for static `Once` values. +pub static ONCE_INIT: Once = Once { + mutex: MUTEX_INIT, + cnt: atomics::INIT_ATOMIC_INT, + lock_cnt: atomics::INIT_ATOMIC_INT, +}; + +impl Once { + /// Perform an initialization routine once and only once. The given closure + /// will be executed if this is the first time `doit` has been called, and + /// otherwise the routine will *not* be invoked. + /// + /// This method will block the calling *os thread* if another initialization + /// routine is currently running. + /// + /// When this function returns, it is guaranteed that some initialization + /// has run and completed (it may not be the closure specified). + pub fn doit(&mut self, f: ||) { + // Implementation-wise, this would seem like a fairly trivial primitive. + // The stickler part is where our mutexes currently require an + // allocation, and usage of a `Once` should't leak this allocation. + // + // This means that there must be a deterministic destroyer of the mutex + // contained within (because it's not needed after the initialization + // has run). + // + // The general scheme here is to gate all future threads once + // initialization has completed with a "very negative" count, and to + // allow through threads to lock the mutex if they see a non negative + // count. For all threads grabbing the mutex, exactly one of them should + // be responsible for unlocking the mutex, and this should only be done + // once everyone else is done with the mutex. + // + // This atomicity is achieved by swapping a very negative value into the + // shared count when the initialization routine has completed. This will + // read the number of threads which will at some point attempt to + // acquire the mutex. This count is then squirreled away in a separate + // variable, and the last person on the way out of the mutex is then + // responsible for destroying the mutex. + // + // It is crucial that the negative value is swapped in *after* the + // initialization routine has completed because otherwise new threads + // calling `doit` will return immediately before the initialization has + // completed. + + let prev = self.cnt.fetch_add(1, atomics::SeqCst); + if prev < 0 { + // Make sure we never overflow, we'll never have int::min_value + // simultaneous calls to `doit` to make this value go back to 0 + self.cnt.store(int::min_value, atomics::SeqCst); + return + } + + // If the count is negative, then someone else finished the job, + // otherwise we run the job and record how many people will try to grab + // this lock + { + let _guard = self.mutex.lock(); + if self.cnt.load(atomics::SeqCst) > 0 { + f(); + let prev = self.cnt.swap(int::min_value, atomics::SeqCst); + self.lock_cnt.store(prev, atomics::SeqCst); + } + } + + // Last one out cleans up after everyone else, no leaks! + if self.lock_cnt.fetch_add(-1, atomics::SeqCst) == 1 { + unsafe { self.mutex.destroy() } + } + } +} + +#[cfg(test)] +mod test { + use super::{ONCE_INIT, Once}; + use std::task; + + #[test] + fn smoke_once() { + static mut o: Once = ONCE_INIT; + let mut a = 0; + unsafe { o.doit(|| a += 1); } + assert_eq!(a, 1); + unsafe { o.doit(|| a += 1); } + assert_eq!(a, 1); + } + + #[test] + fn stampede_once() { + static mut o: Once = ONCE_INIT; + static mut run: bool = false; + + let (p, c) = SharedChan::new(); + for _ in range(0, 10) { + let c = c.clone(); + do spawn { + for _ in range(0, 4) { task::deschedule() } + unsafe { + o.doit(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + c.send(()); + } + } + + unsafe { + o.doit(|| { + assert!(!run); + run = true; + }); + assert!(run); + } + + for _ in range(0, 10) { + p.recv(); + } + } +} diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 9be4247b05667..8b7559a5ffb93 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -201,14 +201,20 @@ pub fn init() { } unsafe { - use std::unstable::mutex::{Once, ONCE_INIT}; - static mut INIT: Once = ONCE_INIT; - INIT.doit(|| { - let mut data: WSADATA = intrinsics::init(); - let ret = WSAStartup(0x202, // version 2.2 - &mut data); - assert_eq!(ret, 0); - }); + use std::unstable::mutex::{Mutex, MUTEX_INIT}; + static mut INITIALIZED: bool = false; + static mut LOCK: Mutex = MUTEX_INIT; + unsafe { + LOCK.lock(); + if !INITIALIZED { + let mut data: WSADATA = intrinsics::init(); + let ret = WSAStartup(0x202, // version 2.2 + &mut data); + assert_eq!(ret, 0); + INITIALIZED = true; + } + LOCK.unlock(); + } } } diff --git a/src/librustc/back/link.rs b/src/librustc/back/link.rs index a2bfa67b74db0..326d598433f6a 100644 --- a/src/librustc/back/link.rs +++ b/src/librustc/back/link.rs @@ -329,7 +329,7 @@ pub mod write { } unsafe fn configure_llvm(sess: Session) { - use std::unstable::mutex::{Once, ONCE_INIT}; + use extra::sync::one::{Once, ONCE_INIT}; static mut INIT: Once = ONCE_INIT; // Copy what clang does by turning on loop vectorization at O2 and diff --git a/src/librustc/middle/trans/base.rs b/src/librustc/middle/trans/base.rs index 9744c395b7c19..f4e13fb050a68 100644 --- a/src/librustc/middle/trans/base.rs +++ b/src/librustc/middle/trans/base.rs @@ -2721,7 +2721,7 @@ pub fn trans_crate(sess: session::Session, output: &Path) -> CrateTranslation { // Before we touch LLVM, make sure that multithreading is enabled. unsafe { - use std::unstable::mutex::{Once, ONCE_INIT}; + use extra::sync::one::{Once, ONCE_INIT}; static mut INIT: Once = ONCE_INIT; static mut POISONED: bool = false; INIT.doit(|| { From 55111cf1415981470e3dbe4224dfefb5c2debcad Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 16 Jan 2014 19:58:42 -0800 Subject: [PATCH 08/13] std: Remove try_send_deferred plus all fallout Now that extra::sync primitives are built on a proper mutex instead of a pthreads one, there's no longer any use for this function. --- src/libgreen/simple.rs | 3 ++- src/libgreen/task.rs | 13 +++++-------- src/libnative/task.rs | 4 +++- src/librustuv/idle.rs | 2 +- src/librustuv/lib.rs | 2 +- src/librustuv/queue.rs | 2 +- src/librustuv/timer.rs | 2 +- src/libstd/comm/mod.rs | 20 +++++++------------- src/libstd/rt/mod.rs | 3 ++- src/libstd/rt/task.rs | 10 ++++++++-- 10 files changed, 31 insertions(+), 30 deletions(-) diff --git a/src/libgreen/simple.rs b/src/libgreen/simple.rs index 4a0523fe47a7a..8db95f55d18db 100644 --- a/src/libgreen/simple.rs +++ b/src/libgreen/simple.rs @@ -54,7 +54,7 @@ impl Runtime for SimpleTask { } Local::put(cur_task); } - fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) { + fn reawaken(mut ~self, mut to_wake: ~Task) { let me = &mut *self as *mut SimpleTask; to_wake.put_runtime(self as ~Runtime); unsafe { @@ -76,6 +76,7 @@ impl Runtime for SimpleTask { } fn local_io<'a>(&'a mut self) -> Option> { None } fn stack_bounds(&self) -> (uint, uint) { fail!() } + fn can_block(&self) -> bool { true } fn wrap(~self) -> ~Any { fail!() } } diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs index 31752941231cb..1c451435844e6 100644 --- a/src/libgreen/task.rs +++ b/src/libgreen/task.rs @@ -376,7 +376,7 @@ impl Runtime for GreenTask { } } - fn reawaken(mut ~self, to_wake: ~Task, can_resched: bool) { + fn reawaken(mut ~self, to_wake: ~Task) { self.put_task(to_wake); assert!(self.sched.is_none()); @@ -409,15 +409,10 @@ impl Runtime for GreenTask { match running_task.maybe_take_runtime::() { Some(mut running_green_task) => { running_green_task.put_task(running_task); - let mut sched = running_green_task.sched.take_unwrap(); + let sched = running_green_task.sched.take_unwrap(); if sched.pool_id == self.pool_id { - if can_resched { - sched.run_task(running_green_task, self); - } else { - sched.enqueue_task(self); - running_green_task.put_with_sched(sched); - } + sched.run_task(running_green_task, self); } else { self.reawaken_remotely(); @@ -462,6 +457,8 @@ impl Runtime for GreenTask { c.current_stack_segment.end() as uint) } + fn can_block(&self) -> bool { false } + fn wrap(~self) -> ~Any { self as ~Any } } diff --git a/src/libnative/task.rs b/src/libnative/task.rs index d2f68c4ef681d..a3f8b7f84fb5f 100644 --- a/src/libnative/task.rs +++ b/src/libnative/task.rs @@ -142,6 +142,8 @@ impl rt::Runtime for Ops { fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds } + fn can_block(&self) -> bool { true } + // This function gets a little interesting. There are a few safety and // ownership violations going on here, but this is all done in the name of // shared state. Additionally, all of the violations are protected with a @@ -230,7 +232,7 @@ impl rt::Runtime for Ops { // See the comments on `deschedule` for why the task is forgotten here, and // why it's valid to do so. - fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) { + fn reawaken(mut ~self, mut to_wake: ~Task) { unsafe { let me = &mut *self as *mut Ops; to_wake.put_runtime(self as ~rt::Runtime); diff --git a/src/librustuv/idle.rs b/src/librustuv/idle.rs index 3d6e81d0d6fd5..f0691a1cb05f8 100644 --- a/src/librustuv/idle.rs +++ b/src/librustuv/idle.rs @@ -122,7 +122,7 @@ mod test { } } }; - task.wake().map(|t| t.reawaken(true)); + task.wake().map(|t| t.reawaken()); } } diff --git a/src/librustuv/lib.rs b/src/librustuv/lib.rs index 1817be8a5940b..77f4424221735 100644 --- a/src/librustuv/lib.rs +++ b/src/librustuv/lib.rs @@ -207,7 +207,7 @@ fn wait_until_woken_after(slot: *mut Option, f: ||) { fn wakeup(slot: &mut Option) { assert!(slot.is_some()); - slot.take_unwrap().wake().map(|t| t.reawaken(true)); + slot.take_unwrap().wake().map(|t| t.reawaken()); } pub struct Request { diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs index 32f8d8532a209..4eb198340d8f3 100644 --- a/src/librustuv/queue.rs +++ b/src/librustuv/queue.rs @@ -67,7 +67,7 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { loop { match state.consumer.pop() { mpsc::Data(Task(task)) => { - task.wake().map(|t| t.reawaken(true)); + task.wake().map(|t| t.reawaken()); } mpsc::Data(Increment) => unsafe { if state.refcnt == 0 { diff --git a/src/librustuv/timer.rs b/src/librustuv/timer.rs index 4a0ad44d31147..8eda598c0ce2c 100644 --- a/src/librustuv/timer.rs +++ b/src/librustuv/timer.rs @@ -138,7 +138,7 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) { match timer.action.take_unwrap() { WakeTask(task) => { - task.wake().map(|t| t.reawaken(true)); + task.wake().map(|t| t.reawaken()); } SendOnce(chan) => { chan.try_send(()); } SendMany(chan, id) => { diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index dbffb6a0fd9b2..565835f035380 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -435,9 +435,9 @@ impl Packet { // This function must have had at least an acquire fence before it to be // properly called. - fn wakeup(&mut self, can_resched: bool) { + fn wakeup(&mut self) { match self.to_wake.take_unwrap().wake() { - Some(task) => task.reawaken(can_resched), + Some(task) => task.reawaken(), None => {} } self.selecting.store(false, Relaxed); @@ -511,7 +511,7 @@ impl Packet { match self.channels.fetch_sub(1, SeqCst) { 1 => { match self.cnt.swap(DISCONNECTED, SeqCst) { - -1 => { self.wakeup(true); } + -1 => { self.wakeup(); } DISCONNECTED => {} n => { assert!(n >= 0); } } @@ -586,20 +586,14 @@ impl Chan { /// /// Like `send`, this method will never block. If the failure of send cannot /// be tolerated, then this method should be used instead. - pub fn try_send(&self, t: T) -> bool { self.try(t, true) } - - /// This function will not stick around for very long. The purpose of this - /// function is to guarantee that no rescheduling is performed. - pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) } - - fn try(&self, t: T, can_resched: bool) -> bool { + pub fn try_send(&self, t: T) -> bool { unsafe { let this = cast::transmute_mut(self); this.queue.push(t); let packet = this.queue.packet(); match (*packet).increment() { // As described above, -1 == wakeup - -1 => { (*packet).wakeup(can_resched); true } + -1 => { (*packet).wakeup(); true } // Also as above, SPSC queues must be >= -2 -2 => true, // We succeeded if we sent data @@ -614,7 +608,7 @@ impl Chan { // the TLS overhead can be a bit much. n => { assert!(n >= 0); - if can_resched && n > 0 && n % RESCHED_FREQ == 0 { + if n > 0 && n % RESCHED_FREQ == 0 { let task: ~Task = Local::take(); task.maybe_yield(); } @@ -690,7 +684,7 @@ impl SharedChan { match (*packet).increment() { DISCONNECTED => {} // oh well, we tried - -1 => { (*packet).wakeup(true); } + -1 => { (*packet).wakeup(); } n => { if n > 0 && n % RESCHED_FREQ == 0 { let task: ~Task = Local::take(); diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 56f6c8f8e6c5b..6c4730d69b21f 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -146,7 +146,7 @@ pub trait Runtime { fn maybe_yield(~self, cur_task: ~Task); fn deschedule(~self, times: uint, cur_task: ~Task, f: |BlockedTask| -> Result<(), BlockedTask>); - fn reawaken(~self, to_wake: ~Task, can_resched: bool); + fn reawaken(~self, to_wake: ~Task); // Miscellaneous calls which are very different depending on what context // you're in. @@ -154,6 +154,7 @@ pub trait Runtime { fn local_io<'a>(&'a mut self) -> Option>; /// The (low, high) edges of the current stack. fn stack_bounds(&self) -> (uint, uint); // (lo, hi) + fn can_block(&self) -> bool; // FIXME: This is a serious code smell and this should not exist at all. fn wrap(~self) -> ~Any; diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index e99e7fa4edd70..b95175e60dab7 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -250,9 +250,9 @@ impl Task { /// Wakes up a previously blocked task, optionally specifiying whether the /// current task can accept a change in scheduling. This function can only /// be called on tasks that were previously blocked in `deschedule`. - pub fn reawaken(mut ~self, can_resched: bool) { + pub fn reawaken(mut ~self) { let ops = self.imp.take_unwrap(); - ops.reawaken(self, can_resched); + ops.reawaken(self); } /// Yields control of this task to another task. This function will @@ -283,6 +283,12 @@ impl Task { pub fn stack_bounds(&self) -> (uint, uint) { self.imp.get_ref().stack_bounds() } + + /// Returns whether it is legal for this task to block the OS thread that it + /// is running on. + pub fn can_block(&self) -> bool { + self.imp.get_ref().can_block() + } } impl Drop for Task { From 6911128d150c6d92d91fbb5d28be2ed2b16cb229 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 23 Jan 2014 10:15:46 -0800 Subject: [PATCH 09/13] Merge conflicts and test fallout --- src/libnative/io/timer_helper.rs | 12 ++++++++---- src/libstd/sync/atomics.rs | 2 +- src/libstd/unstable/mutex.rs | 21 +++++++++++---------- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/libnative/io/timer_helper.rs b/src/libnative/io/timer_helper.rs index 3c20d073f2913..7a9c591fc3980 100644 --- a/src/libnative/io/timer_helper.rs +++ b/src/libnative/io/timer_helper.rs @@ -22,7 +22,7 @@ use std::cast; use std::rt; -use std::unstable::mutex::{Once, ONCE_INIT}; +use std::unstable::mutex::{Mutex, MUTEX_INIT}; use bookkeeping; use io::timer::{Req, Shutdown}; @@ -37,10 +37,12 @@ static mut HELPER_CHAN: *mut SharedChan = 0 as *mut SharedChan; static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal; pub fn boot(helper: fn(imp::signal, Port)) { - static mut INIT: Once = ONCE_INIT; + static mut LOCK: Mutex = MUTEX_INIT; + static mut INITIALIZED: bool = false; unsafe { - INIT.doit(|| { + LOCK.lock(); + if !INITIALIZED { let (msgp, msgc) = SharedChan::new(); HELPER_CHAN = cast::transmute(~msgc); let (receive, send) = imp::new(); @@ -52,7 +54,9 @@ pub fn boot(helper: fn(imp::signal, Port)) { } rt::at_exit(proc() { shutdown() }); - }) + INITIALIZED = true; + } + LOCK.unlock(); } } diff --git a/src/libstd/sync/atomics.rs b/src/libstd/sync/atomics.rs index 6ce285152dea8..fb62bed9ed0ae 100644 --- a/src/libstd/sync/atomics.rs +++ b/src/libstd/sync/atomics.rs @@ -384,7 +384,7 @@ impl AtomicOption { } #[cfg(stage0)] - pub fn empty() -> AtomicOption { AtomicOption { p: 0 as *mut c_void } } + pub fn empty() -> AtomicOption { AtomicOption { p: 0 as *mut u8 } } #[cfg(not(stage0))] pub fn empty() -> AtomicOption { AtomicOption { p: 0 } } diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 7d013c5fa7478..9f48b61b70a44 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -190,29 +190,29 @@ mod imp { impl Mutex { pub unsafe fn new() -> Mutex { - let m = Mutex { + let mut m = Mutex { lock: intrinsics::init(), cond: intrinsics::init(), }; - pthread_mutex_init(&m.lock, 0 as *libc::c_void); - pthread_cond_init(&m.cond, 0 as *libc::c_void); + pthread_mutex_init(&mut m.lock, 0 as *libc::c_void); + pthread_cond_init(&mut m.cond, 0 as *libc::c_void); return m; } - pub unsafe fn lock(&mut self) { pthread_mutex_lock(&self.lock); } - pub unsafe fn unlock(&mut self) { pthread_mutex_unlock(&self.lock); } - pub unsafe fn signal(&mut self) { pthread_cond_signal(&self.cond); } + pub unsafe fn lock(&mut self) { pthread_mutex_lock(&mut self.lock); } + pub unsafe fn unlock(&mut self) { pthread_mutex_unlock(&mut self.lock); } + pub unsafe fn signal(&mut self) { pthread_cond_signal(&mut self.cond); } pub unsafe fn wait(&mut self) { - pthread_cond_wait(&self.cond, &self.lock); + pthread_cond_wait(&mut self.cond, &mut self.lock); } pub unsafe fn trylock(&mut self) -> bool { - pthread_mutex_trylock(&self.lock) == 0 + pthread_mutex_trylock(&mut self.lock) == 0 } pub unsafe fn destroy(&mut self) { - pthread_mutex_destroy(&self.lock); - pthread_cond_destroy(&self.cond); + pthread_mutex_destroy(&mut self.lock); + pthread_cond_destroy(&mut self.cond); } } @@ -235,6 +235,7 @@ mod imp { #[cfg(windows)] mod imp { + use rt::global_heap::malloc_raw; use libc::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES, c_void, DWORD, LPCSTR}; use libc; use ptr::RawPtr; From 0515d70569bea08f65a84332609cd615d1aa663f Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 24 Jan 2014 09:59:01 -0800 Subject: [PATCH 10/13] Generate properly aligned atomic loads/stores --- src/libextra/sync/mutex.rs | 23 ++++++++--------------- src/libextra/sync/one.rs | 6 +++--- src/libnative/io/net.rs | 19 +++++++++---------- src/librustc/middle/trans/builder.rs | 11 +++++++---- src/libstd/unstable/mutex.rs | 20 ++++++++++++++++---- 5 files changed, 43 insertions(+), 36 deletions(-) diff --git a/src/libextra/sync/mutex.rs b/src/libextra/sync/mutex.rs index 3c76b7bb8a2f6..fe6d2e1b9d0d4 100644 --- a/src/libextra/sync/mutex.rs +++ b/src/libextra/sync/mutex.rs @@ -125,7 +125,7 @@ use q = sync::mpsc_intrusive; /// # Example /// /// ```rust -/// use std::sync::Mutex; +/// use extra::sync::mutex::Mutex; /// /// let mut m = Mutex::new(); /// let guard = m.lock(); @@ -154,7 +154,7 @@ pub struct Mutex { /// # Example /// /// ```rust -/// use std::sync::{StaticMutex, MUTEX_INIT}; +/// use extra::sync::mutex::{StaticMutex, MUTEX_INIT}; /// /// static mut LOCK: StaticMutex = MUTEX_INIT; /// @@ -199,7 +199,7 @@ impl StaticMutex { /// Attempts to grab this lock, see `Mutex::try_lock` pub fn try_lock<'a>(&'a mut self) -> Option> { if unsafe { self.lock.trylock() } { - self.held.store(true, atomics::Release); // see below + self.held.store(true, atomics::SeqCst); // see below Some(Guard{ lock: self }) } else { None @@ -215,7 +215,7 @@ impl StaticMutex { // the uncontended case (very important) we start off by hitting a // trylock on the OS mutex. If we succeed, then we're lucky! if unsafe { self.lock.trylock() } { - self.held.store(true, atomics::Release); // see below + self.held.store(true, atomics::SeqCst); // see below return Guard{ lock: self } } @@ -226,7 +226,7 @@ impl StaticMutex { // function. Turns out the TLS hit is essentially 0 on contention. Local::put(t); unsafe { self.lock.lock(); } - self.held.store(true, atomics::Release); // see below + self.held.store(true, atomics::SeqCst); // see below } else { // And here's where we come to the "fun part" of this // implementation. Contention with a green task is fairly difficult @@ -294,7 +294,7 @@ impl StaticMutex { // here for green threads). while !self.held.load(atomics::SeqCst) { if unsafe { self.lock.trylock() } { - self.held.store(true, atomics::Release); + self.held.store(true, atomics::SeqCst); stolen = true; break } else { @@ -338,7 +338,7 @@ impl StaticMutex { // order to allow green threads just starting to block to realize that // they shouldn't completely block. assert!(self.held.load(atomics::SeqCst)); - self.held.store(false, atomics::Release); + self.held.store(false, atomics::SeqCst); // Remember that the queues we are using may return None when there is // indeed data on the queue. In this case, we can just safely ignore it. @@ -349,7 +349,7 @@ impl StaticMutex { // and this whole process will begin anew). match unsafe { self.q.pop() } { Some(t) => { - self.held.store(true, atomics::Release); + self.held.store(true, atomics::SeqCst); let task = unsafe { BlockedTask::cast_from_uint((*t).data) }; task.wake().map(|t| t.reawaken()); } @@ -482,11 +482,4 @@ mod test { let mut m = Mutex::new(); assert!(m.try_lock().is_some()); } - - #[test] #[should_fail] - fn double_lock() { - static mut m: StaticMutex = MUTEX_INIT; - let _g = m.lock(); - m.lock(); - } } diff --git a/src/libextra/sync/one.rs b/src/libextra/sync/one.rs index 1335be9b48615..6dc1cbee6dc4e 100644 --- a/src/libextra/sync/one.rs +++ b/src/libextra/sync/one.rs @@ -92,9 +92,9 @@ impl Once { let prev = self.cnt.fetch_add(1, atomics::SeqCst); if prev < 0 { - // Make sure we never overflow, we'll never have int::min_value + // Make sure we never overflow, we'll never have int::MIN // simultaneous calls to `doit` to make this value go back to 0 - self.cnt.store(int::min_value, atomics::SeqCst); + self.cnt.store(int::MIN, atomics::SeqCst); return } @@ -105,7 +105,7 @@ impl Once { let _guard = self.mutex.lock(); if self.cnt.load(atomics::SeqCst) > 0 { f(); - let prev = self.cnt.swap(int::min_value, atomics::SeqCst); + let prev = self.cnt.swap(int::MIN, atomics::SeqCst); self.lock_cnt.store(prev, atomics::SeqCst); } } diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 8b7559a5ffb93..e7b38dbd501d9 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -204,17 +204,16 @@ pub fn init() { use std::unstable::mutex::{Mutex, MUTEX_INIT}; static mut INITIALIZED: bool = false; static mut LOCK: Mutex = MUTEX_INIT; - unsafe { - LOCK.lock(); - if !INITIALIZED { - let mut data: WSADATA = intrinsics::init(); - let ret = WSAStartup(0x202, // version 2.2 - &mut data); - assert_eq!(ret, 0); - INITIALIZED = true; - } - LOCK.unlock(); + + LOCK.lock(); + if !INITIALIZED { + let mut data: WSADATA = intrinsics::init(); + let ret = WSAStartup(0x202, // version 2.2 + &mut data); + assert_eq!(ret, 0); + INITIALIZED = true; } + LOCK.unlock(); } } diff --git a/src/librustc/middle/trans/builder.rs b/src/librustc/middle/trans/builder.rs index c4beb935ffecf..67425ef62bdc2 100644 --- a/src/librustc/middle/trans/builder.rs +++ b/src/librustc/middle/trans/builder.rs @@ -15,7 +15,7 @@ use lib::llvm::{Opcode, IntPredicate, RealPredicate, False}; use lib::llvm::{ValueRef, BasicBlockRef, BuilderRef, ModuleRef}; use middle::trans::base; use middle::trans::common::*; -use middle::trans::machine::llalign_of_min; +use middle::trans::machine::llalign_of_pref; use middle::trans::type_::Type; use std::cast; use std::hashmap::HashMap; @@ -461,8 +461,10 @@ impl Builder { pub fn atomic_load(&self, ptr: ValueRef, order: AtomicOrdering) -> ValueRef { self.count_insn("load.atomic"); unsafe { - let align = llalign_of_min(self.ccx, self.ccx.int_type); - llvm::LLVMBuildAtomicLoad(self.llbuilder, ptr, noname(), order, align as c_uint) + let ty = Type::from_ref(llvm::LLVMTypeOf(ptr)); + let align = llalign_of_pref(self.ccx, ty.element_type()); + llvm::LLVMBuildAtomicLoad(self.llbuilder, ptr, noname(), order, + align as c_uint) } } @@ -514,8 +516,9 @@ impl Builder { self.ccx.tn.val_to_str(val), self.ccx.tn.val_to_str(ptr)); self.count_insn("store.atomic"); - let align = llalign_of_min(self.ccx, self.ccx.int_type); unsafe { + let ty = Type::from_ref(llvm::LLVMTypeOf(ptr)); + let align = llalign_of_pref(self.ccx, ty.element_type()); llvm::LLVMBuildAtomicStore(self.llbuilder, val, ptr, order, align as c_uint); } } diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 9f48b61b70a44..8bb4dc7f3041d 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -145,7 +145,6 @@ mod imp { } #[cfg(target_os = "linux")] - #[cfg(target_os = "android")] mod os { use libc; @@ -177,6 +176,20 @@ mod imp { size: [0, ..__SIZEOF_PTHREAD_COND_T], }; } + #[cfg(target_os = "android")] + mod os { + use libc; + + pub struct pthread_mutex_t { value: libc::c_int } + pub struct pthread_cond_t { value: libc::c_int } + + pub static PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = pthread_mutex_t { + value: 0, + }; + pub static PTHREAD_COND_INITIALIZER: pthread_cond_t = pthread_cond_t { + value: 0, + }; + } pub struct Mutex { priv lock: pthread_mutex_t, @@ -238,11 +251,10 @@ mod imp { use rt::global_heap::malloc_raw; use libc::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES, c_void, DWORD, LPCSTR}; use libc; - use ptr::RawPtr; use ptr; use sync::atomics; - type LPCRITICAL_SECTION = *c_void; + type LPCRITICAL_SECTION = *mut c_void; static SPIN_COUNT: DWORD = 4000; #[cfg(target_arch = "x86")] static CRIT_SECTION_SIZE: uint = 24; @@ -325,7 +337,7 @@ mod imp { } pub unsafe fn init_lock() -> uint { - let block = malloc_raw(CRIT_SECTION_SIZE as uint) as *c_void; + let block = malloc_raw(CRIT_SECTION_SIZE as uint) as *mut c_void; InitializeCriticalSectionAndSpinCount(block, SPIN_COUNT); return block as uint; } From e51f673b5a3eb3f3976f47a0fbd53c249c6f3414 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Tue, 21 Jan 2014 23:15:43 -0800 Subject: [PATCH 11/13] extra: Make the mutex a little more fair The comments are in the commit itself, but the general idea is to have green threads relinquish control to native blockers and to have native blockers relinquish control to green threads. This should help keep things a little more fair instead of just always favoring green threads. --- src/libextra/sync/mutex.rs | 43 +++++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/src/libextra/sync/mutex.rs b/src/libextra/sync/mutex.rs index fe6d2e1b9d0d4..79113896abfb1 100644 --- a/src/libextra/sync/mutex.rs +++ b/src/libextra/sync/mutex.rs @@ -96,9 +96,12 @@ // // As you'll find out in the implementation, this approach cannot be fair to // native and green threads. In order to soundly drain the internal queue of -// green threads, they *must* be favored over native threads. It was an explicit -// non-goal of these mutexes to be completely fair to everyone, so this has been -// deemed acceptable. +// green threads, they *must* be favored over native threads. In order to +// provide a little bit of fairness, green threads locking the mutex will +// attempt to relinquish the mutex to native lockers. This should have a "ping +// pong" kind of effect where blocking threads prioritize relinquishing to green +// lockers and green lockers prioritize relinquishing the lock to native +// lockers. // // This is the high-level implementation of the mutexes, but the nitty gritty // details can be found in the code below. @@ -173,6 +176,13 @@ pub struct StaticMutex { /// thinking "this is impossible to manage atomically," and you would be /// correct! Keep on reading! priv held: atomics::AtomicBool, + /// Flag as to whether the current locker of the mutex was a blocking locker + /// or a nonblocking locker. This is used to determine who should get woken + /// up next. + priv is_blocking_locker: bool, + /// Speculative count of the number of threads which are blocked waiting for + /// the mutex. + priv blocking_cnt: atomics::AtomicUint, } /// An RAII implementation of a "scoped lock" of a mutex. When this structure is @@ -186,6 +196,8 @@ pub struct Guard<'a> { pub static MUTEX_INIT: StaticMutex = StaticMutex { lock: mutex::MUTEX_INIT, held: atomics::INIT_ATOMIC_BOOL, + is_blocking_locker: false, + blocking_cnt: atomics::INIT_ATOMIC_UINT, q: q::Queue { head: atomics::INIT_ATOMIC_UINT, tail: 0 as *mut q::Node, @@ -224,9 +236,16 @@ impl StaticMutex { // Tasks which can block are super easy. These tasks just accept the // TLS hit we just made, and then call the blocking `lock()` // function. Turns out the TLS hit is essentially 0 on contention. + // + // We keep a count of tasks blocked in 'lock' to help provide a + // little fairness on unlocking by having nonblocking lockers + // relinquish the mutex to blocking lockers. Local::put(t); + self.blocking_cnt.fetch_add(1, atomics::SeqCst); unsafe { self.lock.lock(); } self.held.store(true, atomics::SeqCst); // see below + self.blocking_cnt.fetch_sub(1, atomics::SeqCst); + self.is_blocking_locker = true; } else { // And here's where we come to the "fun part" of this // implementation. Contention with a green task is fairly difficult @@ -328,12 +347,28 @@ impl StaticMutex { } }); assert!(self.held.load(atomics::SeqCst)); + self.is_blocking_locker = false; } Guard { lock: self } } fn unlock(&mut self) { + // First, if we were *not* a blocking locker, and there are some + // blocking threads waiting, then we attempt to relinquish the lock to a + // blocking locker. This helps provide a little fairness to those who + // are blocking. + // + // Note that we do *not* set 'held' to false in order to prevent green + // threads from spinning unnecessarily. We're also guaranteed that a + // native thread will grab this lock and then later attempt to empty the + // green queue (which is why we don't check the queue). + if !self.is_blocking_locker && + self.blocking_cnt.load(atomics::SeqCst) > 0 { + unsafe { self.lock.unlock() } + return; + } + // As documented above, we *initially* flag our mutex as unlocked in // order to allow green threads just starting to block to realize that // they shouldn't completely block. @@ -380,6 +415,8 @@ impl Mutex { held: atomics::AtomicBool::new(false), q: q::Queue::new(), lock: unsafe { mutex::Mutex::new() }, + is_blocking_locker: false, + blocking_cnt: atomics::AtomicUint::new(0), } } } From 8c275b100ff4e7f38b5869f1356526fd8da73e13 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 27 Jan 2014 09:37:02 -0800 Subject: [PATCH 12/13] Change Relaxed operations to SeqCst on an os mutex This is becoming our general policy until we have a better understanding of these orderings. --- src/libstd/unstable/mutex.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/libstd/unstable/mutex.rs b/src/libstd/unstable/mutex.rs index 8bb4dc7f3041d..e36049cd3c1f8 100644 --- a/src/libstd/unstable/mutex.rs +++ b/src/libstd/unstable/mutex.rs @@ -301,14 +301,14 @@ mod imp { /// that no other thread is currently holding the lock or waiting on the /// condition variable contained inside. pub unsafe fn destroy(&mut self) { - let lock = self.lock.swap(0, atomics::Relaxed); - let cond = self.cond.swap(0, atomics::Relaxed); + let lock = self.lock.swap(0, atomics::SeqCst); + let cond = self.cond.swap(0, atomics::SeqCst); if lock != 0 { free_lock(lock) } if cond != 0 { free_cond(cond) } } unsafe fn getlock(&mut self) -> *mut c_void { - match self.lock.load(atomics::Relaxed) { + match self.lock.load(atomics::SeqCst) { 0 => {} n => return n as *mut c_void } @@ -318,11 +318,11 @@ mod imp { _ => {} } free_lock(lock); - return self.lock.load(atomics::Relaxed) as *mut c_void; + return self.lock.load(atomics::SeqCst) as *mut c_void; } unsafe fn getcond(&mut self) -> *mut c_void { - match self.cond.load(atomics::Relaxed) { + match self.cond.load(atomics::SeqCst) { 0 => {} n => return n as *mut c_void } @@ -332,7 +332,7 @@ mod imp { _ => {} } free_cond(cond); - return self.cond.load(atomics::Relaxed) as *mut c_void; + return self.cond.load(atomics::SeqCst) as *mut c_void; } } From f4f371363a52f9d109b95a740932975c21187433 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 27 Jan 2014 09:43:01 -0800 Subject: [PATCH 13/13] Work around recursive locking on windows mutexes It turns out that windows cricital sections will succeed in EnterCriticalSection or TryEnterCriticalSection if the calling thread already owns the mutex, but this is obviously bad for us because there could be two contenting green threads on the same os thread. With some extra checks of `self.held` in try_lock(), I believe that we can circumvent this limitation with minimal impact to the mutex. --- src/libextra/sync/mutex.rs | 43 +++++++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/src/libextra/sync/mutex.rs b/src/libextra/sync/mutex.rs index 79113896abfb1..4dbe3c67de83e 100644 --- a/src/libextra/sync/mutex.rs +++ b/src/libextra/sync/mutex.rs @@ -210,9 +210,19 @@ pub static MUTEX_INIT: StaticMutex = StaticMutex { impl StaticMutex { /// Attempts to grab this lock, see `Mutex::try_lock` pub fn try_lock<'a>(&'a mut self) -> Option> { + // Turns out windows mutexes allow for recursive locking, meaning that + // the same thread will succeed in the trylock if it previously had the + // mutex. This is obviously bad for green threads, so we're forced to do + // a check after the trylock succeeds to whether we should have actually + // grabbed the lock. if unsafe { self.lock.trylock() } { - self.held.store(true, atomics::SeqCst); // see below - Some(Guard{ lock: self }) + if cfg!(windows) && self.held.load(atomics::SeqCst) { + unsafe { self.lock.unlock(); } + None + } else { + self.held.store(true, atomics::SeqCst); + Some(Guard{ lock: self }) + } } else { None } @@ -226,9 +236,16 @@ impl StaticMutex { // to OS TLS. In attempt to avoid this hit and to maintain efficiency in // the uncontended case (very important) we start off by hitting a // trylock on the OS mutex. If we succeed, then we're lucky! + // + // Also, see the comment above in try_lock for why we check `self.held` + // after we grab the lock, but only on windows. if unsafe { self.lock.trylock() } { - self.held.store(true, atomics::SeqCst); // see below - return Guard{ lock: self } + if cfg!(windows) && self.held.load(atomics::SeqCst) { + unsafe { self.lock.unlock(); } + } else { + self.held.store(true, atomics::SeqCst); + return Guard{ lock: self } + } } let t: ~Task = Local::take(); @@ -313,6 +330,7 @@ impl StaticMutex { // here for green threads). while !self.held.load(atomics::SeqCst) { if unsafe { self.lock.trylock() } { + assert!(!self.held.load(atomics::SeqCst)); self.held.store(true, atomics::SeqCst); stolen = true; break @@ -354,27 +372,22 @@ impl StaticMutex { } fn unlock(&mut self) { + // As documented above, we *initially* flag our mutex as unlocked in + // order to allow green threads just starting to block to realize that + // they shouldn't completely block. + assert!(self.held.load(atomics::SeqCst)); + self.held.store(false, atomics::SeqCst); + // First, if we were *not* a blocking locker, and there are some // blocking threads waiting, then we attempt to relinquish the lock to a // blocking locker. This helps provide a little fairness to those who // are blocking. - // - // Note that we do *not* set 'held' to false in order to prevent green - // threads from spinning unnecessarily. We're also guaranteed that a - // native thread will grab this lock and then later attempt to empty the - // green queue (which is why we don't check the queue). if !self.is_blocking_locker && self.blocking_cnt.load(atomics::SeqCst) > 0 { unsafe { self.lock.unlock() } return; } - // As documented above, we *initially* flag our mutex as unlocked in - // order to allow green threads just starting to block to realize that - // they shouldn't completely block. - assert!(self.held.load(atomics::SeqCst)); - self.held.store(false, atomics::SeqCst); - // Remember that the queues we are using may return None when there is // indeed data on the queue. In this case, we can just safely ignore it. // The reason for this ignorance is that a value of `None` with data on