(* A lock-free multi-producer, single-consumer, thread-safe queue without support for cancellation.
   This makes a good data structure for a scheduler's run queue.

   Based on Vesa Karvonen's example at:
   https://github.com/ocaml-multicore/picos/blob/07d6c2d391e076b490098c0379d01208b3a9cc96/test/lib/foundation/mpsc_queue.ml
*)

(* Raised by [push], [push_head], [pop] and [is_empty] once they observe
   that the queue has been closed. *)
exception Closed

(* A list where the end indicates whether the queue is closed.
   Note: the [Closed] constructor below shares its name with the exception
   above; [raise Closed] still picks the exception because the expected
   type there is [exn]. *)
type 'a clist =
  | (::) of 'a * 'a clist
  | Open
  | Closed

(* [rev_append l1 l2] is like [rev l1 @ l2].
   [l1] must end in [Open]; a [Closed] terminator is an assertion failure. *)
let rec rev_append l1 l2 =
  match l1 with
  | a :: l -> rev_append l (a :: l2)
  | Open -> l2
  | Closed -> assert false

(* [l1 @ l2] replaces the [Open] terminator of [l1] with [l2].
   [l1] must end in [Open]; a [Closed] terminator is an assertion failure. *)
let[@tail_mod_cons] rec ( @ ) l1 l2 =
  match l1 with
  | h1 :: tl -> h1 :: (tl @ l2)
  | Open -> l2
  | Closed -> assert false

(* The queue contains [head @ rev tail].
   If [tail] is non-empty then it ends in [Open]. *)
type 'a t = {
  mutable head : 'a clist;
  tail : 'a clist Atomic.t;
}

(* [push t x] adds [x] to the tail of the queue.
   The new cell is installed with a compare-and-set on [t.tail]; if another
   update got in first, the whole operation is retried.
   Raises [Closed] if the tail is observed to be closed. *)
let rec push t x =
  match Atomic.get t.tail with
  | Closed -> raise Closed
  | before ->
    let after = x :: before in
    if not (Atomic.compare_and_set t.tail before after) then
      push t x

(* [push_head t x] puts [x] at the front of the queue, so it is the next item
   returned by [pop].
   Raises [Closed] if [t.head] is exactly [Closed] (queue closed and drained).
   NOTE(review): [head] is a plain mutable field updated without any atomic
   operation, so presumably only the single consumer may call this - confirm
   against callers. *)
let push_head t x =
  match t.head with
  | Closed -> raise Closed
  | before -> t.head <- x :: before

(* [pop t] removes and returns the item at the front of the queue, or returns
   [None] if there is none.
   When [head] is exhausted, everything pushed so far is taken from [tail]
   in one atomic exchange, reversed into [head], and the pop is retried.
   Raises [Closed] if the queue is closed and fully drained. *)
let rec pop t =
  match t.head with
  | x :: xs -> t.head <- xs; Some x
  | Closed -> raise Closed
  | Open ->
    (* We know the tail is open because we just saw the head was open
       and we don't run concurrently with [close]. *)
    ( match Atomic.exchange t.tail Open with
      | Closed -> assert false
      | Open -> None (* Optimise the common case *)
      | tail ->
        t.head <- rev_append tail Open;
        pop t )

(* [close t] marks the queue as closed.
   The tail is atomically replaced by [Closed], and any items it held are
   moved to the end of [head], which from then on ends in [Closed]. Items
   already queued can therefore still be popped; after the last one, [pop]
   raises [Closed].
   Raises [Invalid_argument] if the queue was already closed. *)
let close t =
  match Atomic.exchange t.tail Closed with
  | Closed -> invalid_arg "Lf_queue already closed!"
  | xs -> t.head <- t.head @ rev_append xs Closed

(* [is_empty t] is [true] if neither [head] nor [tail] currently holds an item.
   Raises [Closed] if [t.head] is exactly [Closed] (queue closed and drained). *)
let is_empty t =
  match t.head with
  | _ :: _ -> false
  | Closed -> raise Closed
  | Open ->
    ( match Atomic.get t.tail with
      | _ :: _ -> false
      | _ -> true )

(* [create ()] is a fresh, empty, open queue. *)
let create () = {
  head = Open;
  tail = Atomic.make Open;
}