(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)

(* [Lwt_sequence] is deprecated – we don't want users outside Lwt using it.
However, it is still used internally by Lwt. So, briefly disable warning 3
("deprecated"), and create a local, non-deprecated alias for
[Lwt_sequence] that can be referred to by the rest of the code in this
module without triggering any more warnings. *)

(* Warning 3 ("deprecated") is switched off only around the alias below and
   switched back on immediately afterwards. *)
[@@@ocaml.warning "-3"]
module Lwt_sequence = Lwt_sequence
[@@@ocaml.warning "+3"]

open Lwt.Infix

(* +-----------------------------------------------------------------+
| Parameters |
+-----------------------------------------------------------------+ *)

(* Lower bound on the number of preemptive threads. *)
let min_threads = (ref 0 : int ref)

(* Upper bound on the number of preemptive threads. *)
let max_threads = (ref 0 : int ref)

(* Configured size of the waiting queue. *)
let max_thread_queued = ref 1000

(* Returns the configured queue size; the argument is ignored. *)
let get_max_number_of_threads_queued _ = !max_thread_queued

(* Stores [n] as the new queue size.
   Raises [Invalid_argument] when [n] is negative. *)
let set_max_number_of_threads_queued n =
  if n >= 0 then max_thread_queued := n
  else invalid_arg "Lwt_preemptive.set_max_number_of_threads_queued"

(* Number of preemptive threads currently in existence. *)
let threads_count = ref 0

(* +-----------------------------------------------------------------+
| Preemptive threads management |
+-----------------------------------------------------------------+ *)

(* A one-slot blocking mailbox. [get] blocks the calling system thread until
   a value has been stored with [set], then empties the slot and returns the
   value. The slot is guarded by a mutex and a condition variable. *)
module CELL : sig
  type 'a t

  val make : unit -> 'a t
  val get : 'a t -> 'a
  val set : 'a t -> 'a -> unit
end = struct
  type 'a t = {
    lock : Mutex.t;
    filled : Condition.t;
    mutable slot : 'a option;
  }

  let make () =
    let lock = Mutex.create () in
    let filled = Condition.create () in
    { lock; filled; slot = None }

  let get box =
    Mutex.lock box.lock;
    (* The slot is only ever inspected with the mutex held; the mutex is
       released right before the value is handed back. *)
    let rec wait_until_filled () =
      match box.slot with
      | Some value ->
        box.slot <- None;
        Mutex.unlock box.lock;
        value
      | None ->
        Condition.wait box.filled box.lock;
        wait_until_filled ()
    in
    wait_until_filled ()

  let set box value =
    Mutex.lock box.lock;
    box.slot <- Some value;
    Mutex.unlock box.lock;
    (* The waiter is signalled after the mutex has been released. *)
    Condition.signal box.filled
end

type thread = {
  task_cell : (int * (unit -> unit)) CELL.t;
  (* Channel carrying a notification id together with the task to run to
     the worker thread. *)

  mutable thread : Thread.t;
  (* The worker thread. *)

  mutable reuse : bool;
  (* Whether the thread must be put back into the pool once its work is
     done. *)
}

(* Pool of idle worker threads. *)
let workers : thread Queue.t = Queue.create ()

(* Clients waiting for a worker to become available. *)
let waiters : thread Lwt.u Lwt_sequence.t = Lwt_sequence.create ()

(* Body of a worker thread: repeatedly receive a [(notification id, task)]
   pair, run the task, then send the notification. The loop stops as soon as
   the worker is no longer marked for reuse. *)
let worker_loop worker =
  let keep_running = ref true in
  while !keep_running do
    let notification_id, job = CELL.get worker.task_cell in
    job ();
    (* Too many threads (this can happen if the user lowered the maximum):
       mark this worker so that it exits. *)
    if !max_threads < !threads_count then worker.reuse <- false;
    (* Tell the main thread that the work is done. *)
    Lwt_unix.send_notification notification_id;
    keep_running := worker.reuse
  done

(* Creates a new worker and starts its thread. The [thread] field is first
   filled with [Thread.self ()] and then overwritten with the thread that
   runs [worker_loop]. *)
let make_worker () =
  incr threads_count;
  let task_cell = CELL.make () in
  let placeholder = Thread.self () in
  let worker = { task_cell; thread = placeholder; reuse = true } in
  worker.thread <- Thread.create worker_loop worker;
  worker

(* Makes [worker] available: it is handed to the first waiting client if
   there is one, otherwise it is stored in the pool. *)
let add_worker worker =
  match Lwt_sequence.take_opt_l waiters with
  | Some waiter -> Lwt.wakeup waiter worker
  | None -> Queue.push worker workers

(* Returns a worker: one taken from the pool if the pool is not empty, else a
   freshly created one if the thread count is below the maximum, else a
   promise registered in [waiters]. *)
let get_worker () =
  if Queue.is_empty workers then begin
    if !threads_count < !max_threads then Lwt.return (make_worker ())
    else (Lwt.add_task_r [@ocaml.warning "-3"]) waiters
  end
  else Lwt.return (Queue.pop workers)

(* +-----------------------------------------------------------------+
| Initialisation, and dynamic parameters reset |
+-----------------------------------------------------------------+ *)

(* Current [(minimum, maximum)] number of preemptive threads. *)
let get_bounds () = (!min_threads, !max_threads)

(* Installs new bounds, then creates as many workers as are missing to reach
   the new minimum. The number of workers to create is computed from the
   thread count observed before any of them is started.
   Raises [Invalid_argument] if the minimum is negative or exceeds the
   maximum. *)
let set_bounds (lower, upper) =
  if not (lower >= 0 && upper >= lower) then
    invalid_arg "Lwt_preemptive.set_bounds";
  let missing = lower - !threads_count in
  min_threads := lower;
  max_threads := upper;
  (* Launch the new workers. *)
  let rec spawn remaining =
    if remaining > 0 then begin
      add_worker (make_worker ());
      spawn (remaining - 1)
    end
  in
  spawn missing

(* Set once [init] or [simple_init] has run. *)
let initialized = ref false

(* Marks the module as initialized and applies the given bounds. The third
   argument is ignored. *)
let init min max _errlog =
  initialized := true;
  set_bounds (min, max)

(* Initializes with bounds [(0, 4)] unless initialization already happened. *)
let simple_init () =
  if !initialized then ()
  else begin
    initialized := true;
    set_bounds (0, 4)
  end

(* Number of preemptive threads. *)
let nbthreads () = !threads_count

(* Number of clients waiting for a worker, counted by walking [waiters]. *)
let nbthreadsqueued () =
  Lwt_sequence.fold_l (fun _waiter count -> count + 1) waiters 0

(* Number of threads that are not sitting idle in the pool. *)
let nbthreadsbusy () =
  let idle = Queue.length workers in
  !threads_count - idle

(* +-----------------------------------------------------------------+
| Detaching |
+-----------------------------------------------------------------+ *)

(* Placeholder held by the result reference until the task overwrites it. *)
let init_result = Result.Error (Failure "Lwt_preemptive.detach")

(* [detach f args] runs [f args] on a worker thread and returns a promise
   that is resolved, through a one-shot notification, with the value returned
   by [f] or with the exception it raised (for exceptions accepted by
   [Lwt.Exception_filter.run]). *)
let detach f args =
  simple_init ();
  let outcome = ref init_result in
  (* The task executed by the worker thread. *)
  let task () =
    outcome :=
      (try Result.Ok (f args)
       with exn when Lwt.Exception_filter.run exn -> Result.Error exn)
  in
  get_worker () >>= fun worker ->
  let waiter, wakener = Lwt.wait () in
  let deliver () = Lwt.wakeup_result wakener !outcome in
  let id = Lwt_unix.make_notification ~once:true deliver in
  let run () =
    (* Send the notification id and the task to the worker. *)
    CELL.set worker.task_cell (id, task);
    waiter
  in
  let release () =
    if not worker.reuse then begin
      decr threads_count;
      (* Wait for the thread to terminate, to free its associated
         resources. *)
      Thread.join worker.thread
    end
    else
      (* Put the worker back into the pool. *)
      add_worker worker;
    Lwt.return_unit
  in
  Lwt.finalize run release

(* +-----------------------------------------------------------------+
| Running Lwt threads in the main thread |
+-----------------------------------------------------------------+ *)

(* Queue of [unit -> unit Lwt.t] functions. *)
let jobs = Queue.create ()

(* Mutex to protect access to [jobs]. *)
let jobs_mutex = Mutex.create ()

(* Notification whose handler removes the oldest job from [jobs] under
   [jobs_mutex], then runs it with the mutex released and discards the
   result. *)
let job_notification =
  let run_next_job () =
    (* The queue is never empty at this point. *)
    Mutex.lock jobs_mutex;
    let job = Queue.pop jobs in
    Mutex.unlock jobs_mutex;
    ignore (job ())
  in
  Lwt_unix.make_notification run_next_job

(* Enqueues [f] under [jobs_mutex], then sends [job_notification] so that the
   main thread picks it up. *)
let run_in_main_dont_wait f =
  Mutex.lock jobs_mutex;
  Queue.push f jobs;
  Mutex.unlock jobs_mutex;
  Lwt_unix.send_notification job_notification

(* There is a potential performance issue from creating a cell every time this
function is called. See:
https://github.com/ocsigen/lwt/issues/218
https://github.com/ocsigen/lwt/pull/219
https://github.com/ocaml/ocaml/issues/7158 *)
(* [run_in_main f] queues [f] for execution through [run_in_main_dont_wait],
   blocks the calling thread on a fresh cell until the job has stored its
   outcome there, then returns the value or re-raises the exception. *)
let run_in_main f =
  let reply = CELL.make () in
  (* Both outcomes of [f] are captured as a [Result.t] so that an exception
     can be carried across to the calling thread. *)
  let capture () =
    Lwt.try_bind f
      (fun value -> Lwt.return (Result.Ok value))
      (fun exn -> Lwt.return (Result.Error exn))
  in
  let job () =
    capture () >>= fun outcome ->
    (* Send the outcome back to the blocked caller. *)
    CELL.set reply outcome;
    Lwt.return_unit
  in
  run_in_main_dont_wait job;
  (* Wait for the outcome. *)
  match CELL.get reply with
  | Result.Error exn -> raise exn
  | Result.Ok value -> value

(* This version shadows the one above: exceptions from [f] are passed to
   [handler] before the job completes. *)
let run_in_main_dont_wait f handler =
  let guarded () =
    Lwt.catch f (fun exn -> handler exn; Lwt.return_unit)
  in
  run_in_main_dont_wait guarded