(* The operations a cell type must provide to be stored in the segmented
   list built by [Make]. *)
module type CELL = sig
  type 'a t

  val init : 'a t
  (* The value every cell holds when its segment is first allocated. *)

  val segment_order : int
  (* Each segment holds [1 lsl segment_order] cells. *)

  val dump : _ t Fmt.t
  (* Formats a cell for debugging. *)
end

(* To avoid worrying about wrapping on 32-bit platforms,
   we use 63-bit integers for indexes in all cases.
   On 64-bit platforms, this is just [int]. *)
module Int63 = struct
  include Optint.Int63

  (* Fallback for 32-bit platforms. *)
  let rec fetch_and_add_fallback t delta =
    let old = Atomic.get t in
    if Atomic.compare_and_set t old (add old (of_int delta)) then old
    else fetch_and_add_fallback t delta

  (* [fetch_and_add t delta] atomically adds [delta] to [t] and returns the
     previous value. Uses the native operation when [Int63.t] is an immediate
     [int], and a compare-and-set loop otherwise. *)
  let fetch_and_add : t Atomic.t -> int -> t =
    match is_immediate with
    | True -> Atomic.fetch_and_add
    | False -> fetch_and_add_fallback
end

module Make (Cell : CELL) = struct
  let cells_per_segment = 1 lsl Cell.segment_order
  let segment_mask = cells_per_segment - 1

  (* An index identifies a cell. It is a pair of the segment ID and the offset
     within the segment, packed into a single integer so we can increment it
     atomically. *)
  module Index : sig
    type t
    type segment_id = Int63.t

    val of_segment : segment_id -> t
    (* [of_segment x] is the index of the first cell in segment [x]. *)

    val segment : t -> segment_id
    val offset : t -> int

    val zero : t
    val succ : t -> t
    val pred : t -> t

    val next : t Atomic.t -> t

    (* val pp : t Fmt.t *)
  end = struct
    type t = Int63.t
    type segment_id = Int63.t

    let segment t = Int63.shift_right_logical t Cell.segment_order
    let of_segment id = Int63.shift_left id Cell.segment_order

    let offset t = Int63.to_int t land segment_mask

    let zero = Int63.zero
    let succ = Int63.succ
    let pred = Int63.pred

    (* Atomically increments the index and returns its previous value. *)
    let next t_atomic =
      Int63.fetch_and_add t_atomic (+1)

    (* let pp f t = Fmt.pf f "%d:%d" (segment t) (offset t) *)
  end

  (* A pair with counts for the number of cancelled cells in a segment and the
     number of pointers to it, packed as an integer so it can be adjusted atomically. *)
  module Count : sig
    type t

    val create : pointers:int -> t
    (* [create ~pointers] is a new counter for a segment.
       Initially there are no cancelled cells. *)

    val removed : t -> bool
    (* [removed t] is true if a segment with this count should be removed
       (i.e. all cells are cancelled and it has no pointers).
       Once this returns [true], it will always return [true] in future. *)

    val incr_cancelled : t -> bool
    (* Increment the count of cancelled cells, then return [removed t] for the new state. *)

    val try_inc_pointers : t -> bool
    (* Atomically increment the pointers count, unless [removed t].
       Returns [true] on success. *)

    val dec_pointers : t -> bool
    (* Decrement the pointers count, then return [removed t] for the new state. *)

    val validate : expected_pointers:int -> t -> unit
    (* [validate ~expected_pointers t] check that [t] is a valid count for a non-removed segment. *)

    val dump : t Fmt.t
  end = struct
    type t = int Atomic.t

    (* We use 16 bits for the cancelled count, which should be plenty.
       The remaining bits (at least 15) are used for the pointer count,
       which normally doesn't go above 2 (except temporarily, and limited
       by the number of domains). *)
    let () = assert (cells_per_segment < 0x10000)

    (* Pack a pointer count and a cancelled count into one integer. *)
    let v ~pointers ~cancelled = (pointers lsl 16) lor cancelled

    (* The packed value of a removed segment: no pointers, every cell cancelled. *)
    let v_removed = v ~pointers:0 ~cancelled:cells_per_segment

    let pointers v = v lsr 16
    let cancelled v = v land 0xffff

    let create ~pointers = Atomic.make (v ~pointers ~cancelled:0)

    let dump f t =
      let v = Atomic.get t in
      Fmt.pf f "pointers=%d, cancelled=%d" (pointers v) (cancelled v)

    (* [fetch_and_add] returns the old value, so compare against the state
       just before the removed one. *)
    let incr_cancelled t =
      Atomic.fetch_and_add t 1 = v_removed - 1

    let rec try_inc_pointers t =
      let v = Atomic.get t in
      if v = v_removed then false
      else (
        if Atomic.compare_and_set t v (v + (1 lsl 16)) then true
        else try_inc_pointers t
      )

    (* As for [incr_cancelled], the comparison is against the old value:
       removed state plus the one pointer being dropped. *)
    let dec_pointers t =
      Atomic.fetch_and_add t (-1 lsl 16) = v_removed + (1 lsl 16)

    let removed t =
      Atomic.get t = v_removed

    let validate ~expected_pointers t =
      let v = Atomic.get t in
      assert (cancelled v >= 0 && cancelled v <= cells_per_segment);
      if cancelled v = cells_per_segment then assert (pointers v > 0);
      if pointers v <> expected_pointers then
        Fmt.failwith "Bad pointer count!"
  end

  (* A segment is a node in a linked list containing an array of [cells_per_segment] cells. *)
  module Segment : sig
    type 'a t

    val make_init : unit -> 'a t
    (* [make_init ()] is a new initial segment. *)

    val id : _ t -> Index.segment_id

    val get : 'a t -> int -> 'a Cell.t Atomic.t
    (* [get t offset] is the cell at [offset] within [t]. *)

    val try_inc_pointers : _ t -> bool
    (* Atomically increment the pointers count if the segment isn't removed.
       Returns [true] on success, or [false] if the segment was removed first. *)

    val dec_pointers : _ t -> unit
    (* Decrement the pointers count, removing the segment if it is no longer
       needed. *)

    val find : 'a t -> Index.segment_id -> 'a t
    (* [find t id] finds the segment [id] searching forwards from [t].
       If the target segment has not yet been created, this creates it (atomically).
       If the target segment has been removed, this returns the next non-removed segment. *)

    val clear_prev : _ t -> unit
    (* Called when the resumer has reached this segment,
       so it will never need to skip over any previous segments.
       Therefore, the previous pointer is no longer required and
       previous segments can be GC'd. *)

    val cancel_cell : _ t -> unit
    (* Increment the cancelled-cells counter, and remove the segment if it is no longer useful. *)

    val validate : 'a t -> suspend:'a t -> resume:'a t -> unit
    (* [validate t ~suspend ~resume] checks that [t] is in a valid state,
       assuming there are no operations currently in progress.
       [suspend] and [resume] are the segments of the suspend and resume pointers.
       It checks that both are reachable from [t]. *)

    val dump_list : label:Index.t Fmt.t -> 'a t Fmt.t
    (* [dump_list] formats this segment and all following ones for debugging.
       @param label Used to annotate indexes. *)
  end = struct
    type 'a t = {
      id : Index.segment_id;
      count : Count.t;
      cells : 'a Cell.t Atomic.t array;
      prev : 'a t option Atomic.t;      (* None if first, or [prev] is no longer needed *)
      next : 'a t option Atomic.t;      (* None if not yet created *)
    }

    let id t = t.id

    let get t i = Array.get t.cells i

    let pp_id f t = Int63.pp f t.id

    (* Print every cell of [t], each followed by the label for its index. *)
    let dump_cells ~label f t =
      let idx = ref (Index.of_segment t.id) in
      for i = 0 to Array.length t.cells - 1 do
        Fmt.pf f "@,%a" Cell.dump (Atomic.get t.cells.(i));
        label f !idx;
        idx := Index.succ !idx
      done

    let rec dump_list ~label f t =
      Fmt.pf f "@[<v2>Segment %a (prev=%a, %a):%a@]"
        pp_id t
        (Fmt.Dump.option pp_id) (Atomic.get t.prev)
        Count.dump t.count
        (dump_cells ~label) t;
      let next = Atomic.get t.next in
      begin match next with
        | Some next when next.id = Int63.succ t.id ->
          ()    (* We'll show the labels at the start of the next segment *)
        | _ ->
          Fmt.pf f "@,End%a"
            label (Index.of_segment (Int63.succ t.id))
      end;
      Option.iter (fun next -> Fmt.cut f (); dump_list ~label f next) next

    (* Get the segment following [t], creating it if it doesn't exist yet.
       If another domain wins the race to install it, use theirs. *)
    let next t =
      match Atomic.get t.next with
      | Some s -> s
      | None ->
        let next = {
          id = Int63.succ t.id;
          count = Count.create ~pointers:0;
          cells = Array.init cells_per_segment (fun (_ : int) -> Atomic.make Cell.init);
          next = Atomic.make None;
          prev = Atomic.make (Some t);
        } in
        if Atomic.compare_and_set t.next None (Some next) then next
        else Atomic.get t.next |> Option.get

    let removed t =
      Count.removed t.count

    (* Get the previous non-removed segment, if any. *)
    let rec alive_prev t =
      match Atomic.get t.prev with
      | Some prev when removed prev -> alive_prev prev
      | x -> x

    (* Get the next non-removed segment. *)
    let alive_next t =
      let next = Atomic.get t.next |> Option.get in
      let rec live x =
        if removed x then (
          match Atomic.get x.next with
          | Some next -> live next
          | None -> x           (* The paper says to return "tail if all are removed", but can that ever happen? *)
        ) else x
      in
      live next

    (* Remove [t] from the linked-list by splicing together
       the previous live segment before us to the next live one afterwards.
       The tricky case is when two adjacent segments get removed at the same time.
       If that happens, the next and prev lists will still always be valid
       (i.e. will include all live segments, in the correct order), but may not be optimal.
       However, we will detect that case when it happens and fix it up immediately. *)
    let rec remove t =
      if Atomic.get t.next = None then ()       (* Can't remove tail. This shouldn't happen anyway. *)
      else (
        let prev = alive_prev t
        and next = alive_next t in
        (* [prev] might have been removed by the time we do this, but it doesn't matter,
           we're still only skipping removed segments (just not as many as desired).
           We'll fix it up afterwards in that case. *)
        Atomic.set next.prev prev;
        (* Likewise [next] might have been removed too by now, but we'll correct later. *)
        Option.iter (fun prev -> Atomic.set prev.next (Some next)) prev;
        (* If either got removed by now, start again. *)
        if removed next && Atomic.get next.next <> None then remove t
        else match prev with
          | Some prev when removed prev -> remove t
          | _ -> ()
      )

    let try_inc_pointers t =
      Count.try_inc_pointers t.count

    let dec_pointers t =
      if Count.dec_pointers t.count then remove t

    let cancel_cell t =
      if Count.incr_cancelled t.count then remove t

    (* Walk forwards (creating segments as needed) until we reach a
       non-removed segment whose ID is at least [id]. *)
    let rec find start id =
      if start.id >= id && not (removed start) then start
      else find (next start) id

    (* The initial segment starts with two pointers, matching the two
       positions (suspend and resume) that [make] creates for it. *)
    let make_init () =
      {
        id = Int63.zero;
        count = Count.create ~pointers:2;
        cells = Array.init cells_per_segment (fun (_ : int) -> Atomic.make Cell.init);
        next = Atomic.make None;
        prev = Atomic.make None;
      }

    (* Note: this assumes the system is at rest (no operations in progress). *)
    let rec validate t ~suspend ~resume ~seen_pointers =
      let expected_pointers =
        (if t == suspend then 1 else 0) +
        (if t == resume then 1 else 0)
      in
      Count.validate ~expected_pointers t.count;
      let seen_pointers = seen_pointers + expected_pointers in
      match Atomic.get t.next with
      | None -> assert (seen_pointers = 2)
      | Some next ->
        begin match Atomic.get next.prev with
          | None -> assert (resume.id >= next.id)
          | Some t2 -> assert (resume.id < next.id && t == t2)
        end;
        validate next ~suspend ~resume ~seen_pointers

    let validate = validate ~seen_pointers:0

    let clear_prev t =
      Atomic.set t.prev None
  end

  (* A mutable pointer into the list of cells. *)
  module Position : sig
    type 'a t

    val of_segment : 'a Segment.t -> 'a t
    (* [of_segment x] is a pointer to the first cell in [x]. *)

    val next : clear_prev:bool -> 'a t -> 'a Segment.t * 'a Cell.t Atomic.t
    (* [next t ~clear_prev] returns the segment and cell of [t], and atomically increments it.
       If [t]'s segment is all cancelled and no longer exists it will skip it and retry.
       If [clear_prev] then the previous pointer is no longer required. *)

    val resume_all : 'a t -> stop:Index.t -> ('a Cell.t Atomic.t -> unit) -> unit
    (* [resume_all t ~stop f] advances [t] to [stop], then calls [f cell] on each cell advanced over. *)

    val index : _ t -> Index.t
    (* [index t] is the index of the cell currently pointed-to by [t]. *)

    val segment : 'a t -> 'a Segment.t
    (* For debugging only. The segment containing the previously-returned cell (or the initial segment),
       when the system is at rest. *)
  end = struct
    type 'a t = {
      segment : 'a Segment.t Atomic.t;  (* Note: can lag [idx] *)
      idx : Index.t Atomic.t;
    }

    let segment t = Atomic.get t.segment
    let index t = Atomic.get t.idx

    (* Note: the index always starts at [Index.zero]; in this file it is only
       called with the initial segment. *)
    let of_segment segment =
      {
        segment = Atomic.make segment;
        idx = Atomic.make Index.zero;
      }

    (* Set [t.segment] to [target] if [target] is ahead of us.
       Returns [false] if [target] gets removed first. *)
    let rec move_forward t (target : _ Segment.t) =
      let cur = Atomic.get t.segment in
      if Segment.id cur >= Segment.id target then true
      else (
        if not (Segment.try_inc_pointers target) then false     (* target already removed *)
        else (
          if Atomic.compare_and_set t.segment cur target then (
            Segment.dec_pointers cur;
            true
          ) else (
            (* Concurrent update of [t]. Undo ref-count changes and retry. *)
            Segment.dec_pointers target;
            move_forward t target
          )
        )
      )

    (* Update [t] to the segment [id] (or the next non-removed segment after it). *)
    let rec find_and_move_forward t start id =
      let target = Segment.find start id in
      if move_forward t target then target
      else find_and_move_forward t start id      (* Removed before we could increase the ref-count; retry *)

    let rec next ~clear_prev t =
      (* Get the segment first before the index. Even if [idx] moves forwards after this,
         we'll still be able to reach it from [r]. *)
      let r = Atomic.get t.segment in
      let i = Index.next t.idx in
      let id = Index.segment i in
      let s = find_and_move_forward t r id in
      if clear_prev then Segment.clear_prev s;
      if Segment.id s = id then (
        (s, Segment.get s (Index.offset i))
      ) else (
        (* The segment we wanted contains only cancelled cells.
           Try to update the index to jump over those cells, then retry. *)
        let s_index = Index.of_segment (Segment.id s) in
        ignore (Atomic.compare_and_set t.idx (Index.succ i) s_index : bool);
        next ~clear_prev t
      )

    let rec resume_all t ~stop f =
      (* Get the segment first before the index. Even if [idx] moves forwards after this,
         we'll still be able to reach it from [start_seg]. *)
      let start_seg = Atomic.get t.segment in
      let start = Atomic.get t.idx in
      if start >= stop then ()
      else if not (Atomic.compare_and_set t.idx start stop) then (
        resume_all t ~stop f
      ) else (
        (* We are now responsible for resuming all cells from [start] to [stop]. *)
        (* Move [t.segment] forward so we can free older segments now. *)
        ignore (find_and_move_forward t start_seg (Index.segment (Index.pred stop)) : _ Segment.t);
        (* Resume all cells from [i] to [stop] (reachable via [seg]): *)
        let rec aux seg i =
          if i < stop then (
            let seg = Segment.find seg (Index.segment i) in
            Segment.clear_prev seg;
            let seg_start = Index.of_segment (Segment.id seg) in
            if seg_start < stop then (
              (* If [find] skipped removed segments, jump to the start of the one it found. *)
              let i = max i seg_start in
              f (Segment.get seg (Index.offset i));
              aux seg (Index.succ i)
            )
          )
        in
        aux start_seg start
      )
  end

  (* A cell list with two positions: one for suspenders and one for resumers. *)
  type 'a t = {
    resume : 'a Position.t;
    suspend : 'a Position.t;
  }

  type 'a segment = 'a Segment.t

  (* Take the next cell at the suspend position, returning its segment too. *)
  let next_suspend t =
    Position.next t.suspend ~clear_prev:false

  (* Take the next cell at the resume position. The segment's [prev] link is
     cleared, as the resumer never needs to go backwards. *)
  let next_resume t =
    snd @@ Position.next t.resume ~clear_prev:true

  (* Advance the resume position up to the current suspend position,
     calling [f] on each cell passed over. *)
  let resume_all t f =
    Position.resume_all t.resume ~stop:(Position.index t.suspend) f

  let cancel_cell = Segment.cancel_cell

  (* Both positions start at the same initial segment. *)
  let make () =
    let init = Segment.make_init () in
    {
      resume = Position.of_segment init;
      suspend = Position.of_segment init;
    }

  (* Check internal invariants, starting from whichever position's segment
     comes first. Assumes no operations are in progress. *)
  let validate t =
    let suspend = Position.segment t.suspend in
    let resume = Position.segment t.resume in
    let start =
      if Segment.id suspend < Segment.id resume then suspend
      else resume
    in
    Segment.validate start ~suspend ~resume

  (* Format the list for debugging, starting from the earlier of the two
     positions and marking the suspend and resume indexes. *)
  let dump f t =
    let suspend = Position.index t.suspend in
    let resume = Position.index t.resume in
    let start =
      if suspend < resume then t.suspend
      else t.resume
    in
    let label f i =
      if i = suspend then Format.pp_print_string f " (suspend)";
      if i = resume then Format.pp_print_string f " (resume)";
    in
    Format.fprintf f "@[<v>%a@]" (Segment.dump_list ~label) (Position.segment start)
end