(**************************************************************************)
(*                                                                        *)
(*                                 OCaml                                  *)
(*                                                                        *)
(*      KC Sivaramakrishnan, Indian Institute of Technology, Madras       *)
(*                 Stephen Dolan, University of Cambridge                 *)
(*                   Tom Kelly, OCaml Labs Consultancy                    *)
(*                                                                        *)
(*   Copyright 2019 Indian Institute of Technology, Madras                *)
(*   Copyright 2014 University of Cambridge                               *)
(*   Copyright 2021 OCaml Labs Consultancy Ltd                            *)
(*                                                                        *)
(*   All rights reserved.  This file is distributed under the terms of    *)
(*   the GNU Lesser General Public License version 2.1, with the          *)
(*   special exception on linking described in the file LICENSE.          *)
(*                                                                        *)
(**************************************************************************)

module Raw = struct
  (* Low-level primitives provided by the runtime *)
  type t = private int

  external spawn : (unit -> unit) -> Mutex.t -> t
    = "caml_domain_spawn"

  external self : unit -> t
    = "caml_ml_domain_id"

  external cpu_relax : unit -> unit
    = "caml_ml_domain_cpu_relax"

  external get_recommended_domain_count : unit -> int
    = "caml_recommended_domain_count" [@@noalloc]
end

(* Thin wrapper over the runtime's [cpu_relax] primitive. *)
let cpu_relax () = Raw.cpu_relax ()

type id = Raw.t

(* Termination state of a domain: either still running, or finished with the
   value returned (or the exception raised) by its computation. *)
type 'a state =
  | Running
  | Finished of ('a, exn) result

(* Handle on a spawned domain. *)
type 'a t = {
  domain : Raw.t;
  term_mutex : Mutex.t;
  term_condition : Condition.t;
  term_state : 'a state ref (* protected by [term_mutex] *)
}

module DLS = struct

  (* The domain-local state is an array of untyped slots indexed by key. *)
  type dls_state = Obj.t array

  (* Sentinel marking a slot that has not been initialised yet. It is compared
     with physical equality ([==]), so a freshly allocated block is used. *)
  let unique_value = Obj.repr (ref 0)

  external get_dls_state : unit -> dls_state = "%dls_get"

  external set_dls_state : dls_state -> unit =
    "caml_domain_dls_set" [@@noalloc]

  (* Install a fresh state array (8 uninitialised slots) for the current
     domain. *)
  let create_dls () =
    let st = Array.make 8 unique_value in
    set_dls_state st

  let _ = create_dls ()

  (* A key is its slot index paired with the initialiser used when the slot is
     read before having been written. *)
  type 'a key = int * (unit -> 'a)

  (* Next free slot index; shared by all domains. *)
  let key_counter = Atomic.make 0

  (* A key together with the function computing a child domain's initial value
     from the parent's current value. *)
  type key_initializer =
    KI : 'a key * ('a -> 'a) -> key_initializer

  let parent_keys = Atomic.make ([] : key_initializer list)

  (* Push [ki] on [parent_keys], retrying until the compare-and-set
     succeeds. *)
  let rec add_parent_key ki =
    let l = Atomic.get parent_keys in
    if not (Atomic.compare_and_set parent_keys l (ki :: l))
    then add_parent_key ki

  (* Allocate a new key whose slot is lazily initialised with
     [init_orphan ()]. When [split_from_parent] is given, the key is also
     recorded in [parent_keys] so that [get_initial_keys] can derive a child
     domain's initial value from the parent's one. *)
  let new_key ?split_from_parent init_orphan =
    let idx = Atomic.fetch_and_add key_counter 1 in
    let k = (idx, init_orphan) in
    begin match split_from_parent with
    | None -> ()
    | Some split -> add_parent_key (KI (k, split))
    end;
    k

  (* If necessary, grow the current domain's local state array such that [idx]
   * is a valid index in the array. Returns the (possibly new) array. The size
   * is doubled until it exceeds [idx]; existing slots are copied over. *)
  let maybe_grow idx =
    let st = get_dls_state () in
    let sz = Array.length st in
    if idx < sz then st
    else begin
      let rec compute_new_size s =
        if idx < s then s else compute_new_size (2 * s)
      in
      let new_sz = compute_new_size sz in
      let new_st = Array.make new_sz unique_value in
      Array.blit st 0 new_st 0 sz;
      set_dls_state new_st;
      new_st
    end

  (* Store [x] in the current domain's slot for the given key. *)
  let set (idx, _init) x =
    let st = maybe_grow idx in
    (* [Sys.opaque_identity] ensures that flambda does not look at the type of
     * [x], which may be a [float] and conclude that the [st] is a float array.
     * We do not want OCaml's float array optimisation kicking in here. *)
    st.(idx) <- Obj.repr (Sys.opaque_identity x)

  (* Read the current domain's slot for the given key, running the key's
     initialiser and storing its result first if the slot is still
     uninitialised. *)
  let get (idx, init) =
    let st = maybe_grow idx in
    let v = st.(idx) in
    if v == unique_value then begin
      let v' = Obj.repr (init ()) in
      (* [init] may itself have accessed keys with larger indices, in which
         case [maybe_grow] has replaced the domain's state array and [st] is
         stale. Fetch the array again so that the value is stored in the
         array that later accesses will read, instead of being lost (which
         would make the next [get] run [init] a second time). *)
      let st = maybe_grow idx in
      st.(idx) <- (Sys.opaque_identity v');
      Obj.magic v'
    end
    else Obj.magic v

  (* For every key registered with [split_from_parent], compute the value a
     child domain should start with from the calling domain's current value.
     Returns the list of (slot index, value) pairs. *)
  let get_initial_keys () : (int * Obj.t) list =
    List.map
      (fun (KI ((idx, _) as k, split)) ->
         (idx, Obj.repr (split (get k))))
      (Atomic.get parent_keys)

  (* Write the given (slot index, value) pairs into the current domain's
     state array, growing it as needed. *)
  let set_initial_keys (l : (int * Obj.t) list) =
    List.iter
      (fun (idx, v) ->
         let st = maybe_grow idx in
         st.(idx) <- v)
      l

end

(******** Identity **********)

let get_id { domain; _ } = domain

let self () = Raw.self ()

(* The main domain is the one whose identifier is 0. *)
let is_main_domain () = (self () :> int) = 0

(******** Callbacks **********)

(* first spawn, domain startup and at exit functionality *)
let first_domain_spawned = Atomic.make false

let first_spawn_function = ref (fun () -> ())

(* Register [f] to run just before the first domain is spawned. Callbacks run
   in registration order.
   Raises [Invalid_argument] if the first domain has already been spawned. *)
let before_first_spawn f =
  if Atomic.get first_domain_spawned then
    raise (Invalid_argument "first domain already spawned")
  else begin
    let old_f = !first_spawn_function in
    let new_f () = old_f (); f () in
    first_spawn_function := new_f
  end

(* Run the registered [before_first_spawn] callbacks if no domain has been
   spawned yet. The flag is set before the callbacks are run. *)
let do_before_first_spawn () =
  if not (Atomic.get first_domain_spawned) then begin
    Atomic.set first_domain_spawned true;
    !first_spawn_function ();
    (* Release the old function *)
    first_spawn_function := (fun () -> ())
  end

(* Domain-local chain of termination callbacks; initially a no-op. *)
let at_exit_key = DLS.new_key (fun () -> (fun () -> ()))

(* Register [f] to run when the current domain terminates. *)
let at_exit f =
  let old_exit : unit -> unit = DLS.get at_exit_key in
  let new_exit () =
    (* The domain termination callbacks ([at_exit]) are run in
       last-in-first-out (LIFO) order in order to be symmetric with the domain
       creation callbacks ([at_each_spawn]) which run in first-in-first-out
       (FIFO) order. *)
    f (); old_exit ()
  in
  DLS.set at_exit_key new_exit

(* Run the current domain's registered [at_exit] callbacks. *)
let do_at_exit () =
  let f : unit -> unit = DLS.get at_exit_key in
  f ()

let _ = Stdlib.do_domain_local_at_exit := do_at_exit

(******* Creation and Termination ********)

(* Spawn a new domain running [f ()] and return a handle that can be passed to
   [join]. Keys created with [split_from_parent] are evaluated in the calling
   domain before the spawn and installed in the new domain before [f] runs. *)
let spawn f =
  do_before_first_spawn ();
  let pk = DLS.get_initial_keys () in

  (* The [term_mutex] and [term_condition] are used to
     synchronize with the joining domains *)
  let term_mutex = Mutex.create () in
  let term_condition = Condition.create () in
  let term_state = ref Running in

  let body () =
    let result =
      match
        DLS.create_dls ();
        DLS.set_initial_keys pk;
        let res = f () in
        res
      with
      | x -> Ok x
      | exception ex -> Error ex
    in

    let result' =
      (* Run the [at_exit] callbacks when the domain computation either
         terminates normally or exceptionally. *)
      match do_at_exit () with
      | () -> result
      | exception ex ->
          begin match result with
          | Ok _ ->
              (* If the domain computation terminated normally, but the
                 [at_exit] callbacks raised an exception, then return the
                 exception. *)
              Error ex
          | Error _ ->
              (* If both the domain computation and the [at_exit] callbacks
                 raised exceptions, then ignore the exception from the
                 [at_exit] callbacks and return the original exception. *)
              result
          end
    in

    (* Synchronize with joining domains *)
    Mutex.lock term_mutex;
    match !term_state with
    | Running ->
        term_state := Finished result';
        Condition.broadcast term_condition
    | Finished _ ->
        failwith "internal error: Am I already finished?"
    (* [term_mutex] is unlocked in the runtime after the cleanup functions on
       the C side are finished. *)
  in
  { domain = Raw.spawn body term_mutex;
    term_mutex;
    term_condition;
    term_state }

(* Wait until the given domain has finished, then return the result of its
   computation, or re-raise the exception it terminated with. *)
let join { term_mutex; term_condition; term_state; _ } =
  Mutex.lock term_mutex;
  let rec loop () =
    match !term_state with
    | Running ->
        Condition.wait term_condition term_mutex;
        loop ()
    | Finished res ->
        Mutex.unlock term_mutex;
        res
  in
  match loop () with
  | Ok x -> x
  | Error ex -> raise ex

let recommended_domain_count = Raw.get_recommended_domain_count