123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108(* See the Cells module for an overview of this system.
Each new waiter atomically increments the "suspend" pointer and writes
a callback there. The waking fiber removes all the callbacks and calls them.
In this version, "resume" never gets ahead of "suspend" (broadcasting just
brings it up-to-date with the "suspend" pointer).
When the resume fiber runs, some of the cells reserved for callbacks might
not yet have been filled. In this case, the resuming fiber just marks them
as needing to be resumed. When the suspending fiber continues, it will
notice this and continue immediately. *)moduleCell=struct(* For any given cell, there are two actors running in parallel: the
suspender and the resumer.
The resumer only performs a single operation (resume).
The consumer waits to be resumed and then, optionally, cancels.
This means we only have three cases to think about:
1. Consumer adds request (Empty -> Request).
1a. Provider fulfills it (Request -> Resumed).
1b. Consumer cancels it (Request -> Cancelled).
2. Provider gets to cell first (Empty -> Resumed).
When the consumer tries to wait, it resumes immediately.
The Resumed state should never been seen. It exists only to allow the
request to be GC'd promptly. We could replace it with Empty, but having
separate states is clearer for debugging. *)type_t=|Requestof(unit->unit)|Cancelled|Resumed|Emptyletinit=Emptyletsegment_order=2letdumpf=function|Request_->Fmt.stringf"Request"|Empty->Fmt.stringf"Empty"|Resumed->Fmt.stringf"Resumed"|Cancelled->Fmt.stringf"Cancelled"endmoduleCells=Cells.Make(Cell)typecell=unitCell.ttypet=unitCells.ttyperequest=unitCells.segment*cellAtomic.tletrecresumecell=match(Atomic.getcell:cell)with|Requestrascur->(* The common case: we have a waiter for the value *)ifAtomic.compare_and_setcellcurResumedthenr();(* else it was cancelled at the same time; ignore *)|Empty->(* The consumer has reserved this cell but not yet stored the request.
We place Resumed there and it will handle it soon. *)ifAtomic.compare_and_setcellEmptyResumedthen()(* The consumer will deal with it *)elseresumecell(* The Request was added concurrently; use it *)|Cancelled->()|Resumed->(* This state is unreachable because we (the provider) haven't set this yet *)assertfalseletcancel(segment,cell)=match(Atomic.getcell:cell)with|Request_asold->ifAtomic.compare_and_setcelloldCancelledthen(Cells.cancel_cellsegment;true)elsefalse(* We got resumed first *)|Resumed->false(* We got resumed first *)|Cancelled->invalid_arg"Already cancelled!"|Empty->(* To call [cancel] the user needs a [request] value,
which they only get once we've reached the [Request] state.
[Empty] is unreachable from [Request]. *)assertfalseletsuspendtk=let(_,cell)asrequest=Cells.next_suspendtinifAtomic.compare_and_setcellEmpty(Requestk)thenSomerequestelsematchAtomic.getcellwith|Resumed->(* Resumed before we could add the waiter *)k();None|Cancelled|Request_|Empty->(* These are unreachable from the previously-observed non-Empty state
without us taking some action first *)assertfalseletresume_allt=Cells.resume_alltresumeletcreate=Cells.makeletdumpft=Cells.dumpft