123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)(* [Lwt_sequence] is deprecated – we don't want users outside Lwt using it.
However, it is still used internally by Lwt. So, briefly disable warning 3
("deprecated"), and create a local, non-deprecated alias for
[Lwt_sequence] that can be referred to by the rest of the code in this
module without triggering any more warnings. *)[@@@ocaml.warning"-3"]moduleLwt_sequence=Lwt_sequence[@@@ocaml.warning"+3"](* +-----------------------------------------------------------------+
| Events |
+-----------------------------------------------------------------+ *)type_event={stop:unitLazy.t;(* The stop method of the event. *)node:Obj.tLwt_sequence.node;(* The node in the sequence of registered events. *)}typeevent=_eventrefexternalcast_node:'aLwt_sequence.node->Obj.tLwt_sequence.node="%identity"letstop_eventev=letev=!evinLwt_sequence.removeev.node;Lazy.forceev.stoplet_fake_event={stop=lazy();node=Lwt_sequence.add_l(Obj.repr())(Lwt_sequence.create());}letfake_event=ref_fake_event(* +-----------------------------------------------------------------+
| Engines |
+-----------------------------------------------------------------+ *)classvirtualabstract=object(self)methodvirtualiter:bool->unitmethodvirtualprivatecleanup:unitmethodvirtualprivateregister_readable:Unix.file_descr->(unit->unit)->unitLazy.tmethodvirtualprivateregister_writable:Unix.file_descr->(unit->unit)->unitLazy.tmethodvirtualprivateregister_timer:float->bool->(unit->unit)->unitLazy.tvalreadables=Lwt_sequence.create()(* Sequence of callbacks waiting for a file descriptor to become
readable. *)valwritables=Lwt_sequence.create()(* Sequence of callbacks waiting for a file descriptor to become
writable. *)valtimers=Lwt_sequence.create()(* Sequence of timers. *)methoddestroy=Lwt_sequence.iter_l(fun(_fd,_f,_g,ev)->stop_eventev)readables;Lwt_sequence.iter_l(fun(_fd,_f,_g,ev)->stop_eventev)writables;Lwt_sequence.iter_l(fun(_delay,_repeat,_f,_g,ev)->stop_eventev)timers;self#cleanupmethodtransfer(engine:abstract)=Lwt_sequence.iter_l(fun(fd,f,_g,ev)->stop_eventev;ev:=!(engine#on_readablefdf))readables;Lwt_sequence.iter_l(fun(fd,f,_g,ev)->stop_eventev;ev:=!(engine#on_writablefdf))writables;Lwt_sequence.iter_l(fun(delay,repeat,f,_g,ev)->stop_eventev;ev:=!(engine#on_timerdelayrepeatf))timersmethodfake_iofd=Lwt_sequence.iter_l(fun(fd',_f,g,_stop)->iffd=fd'theng())readables;Lwt_sequence.iter_l(fun(fd',_f,g,_stop)->iffd=fd'theng())writablesmethodon_readablefdf=letev=ref_fake_eventinletg()=fevinletstop=self#register_readablefdginev:={stop=stop;node=cast_node(Lwt_sequence.add_r(fd,f,g,ev)readables)};evmethodon_writablefdf=letev=ref_fake_eventinletg()=fevinletstop=self#register_writablefdginev:={stop=stop;node=cast_node(Lwt_sequence.add_r(fd,f,g,ev)writables)};evmethodon_timerdelayrepeatf=letev=ref_fake_eventinletg()=fevinletstop=self#register_timerdelayrepeatginev:={stop=stop;node=cast_node(Lwt_sequence.add_r(delay,repeat,f,g,ev)timers)};evmethodreadable_count=Lwt_sequence.lengthreadablesmethodwritable_count=Lwt_sequence.lengthwritablesmethodtimer_count=Lwt_sequence.lengthtimersmethodfork=()methodforwards_signal(_signum:int)=falseendclasstypet=objectinheritabstractmethoditer:bool->unitmethodprivatecleanup:unitmethodprivateregister_readable:Unix.file_descr->(unit->unit)->unitLazy.tmethodprivateregister_writable:Unix.file_descr->(unit->unit)->unitLazy.tmethodprivateregister_timer:float->bool->(unit->unit)->unitLazy.tend(* +-----------------------------------------------------------------+
| The libev engine |
+-----------------------------------------------------------------+ *)typeev_looptypeev_iotypeev_timermoduleEv_backend=structtypet=|EV_DEFAULT|EV_SELECT|EV_POLL|EV_EPOLL|EV_KQUEUE|EV_DEVPOLL|EV_PORTletdefault=EV_DEFAULTletselect=EV_SELECTletpoll=EV_POLLletepoll=EV_EPOLLletkqueue=EV_KQUEUEletdevpoll=EV_DEVPOLLletport=EV_PORTletequal=(=)letname=function|EV_DEFAULT->"EV_DEFAULT"|EV_SELECT->"EV_SELECT"|EV_POLL->"EV_POLL"|EV_EPOLL->"EV_EPOLL"|EV_KQUEUE->"EV_KQUEUE"|EV_DEVPOLL->"EV_DEVPOLL"|EV_PORT->"EV_PORT"letppfmtt=Format.pp_print_stringfmt(namet)endexternalev_init:Ev_backend.t->ev_loop="lwt_libev_init"externalev_backend:ev_loop->Ev_backend.t="lwt_libev_backend"externalev_stop:ev_loop->unit="lwt_libev_stop"externalev_loop:ev_loop->bool->unit="lwt_libev_loop"externalev_unloop:ev_loop->unit="lwt_libev_unloop"externalev_readable_init:ev_loop->Unix.file_descr->(unit->unit)->ev_io="lwt_libev_readable_init"externalev_writable_init:ev_loop->Unix.file_descr->(unit->unit)->ev_io="lwt_libev_writable_init"externalev_io_stop:ev_loop->ev_io->unit="lwt_libev_io_stop"externalev_timer_init:ev_loop->float->bool->(unit->unit)->ev_timer="lwt_libev_timer_init"externalev_timer_stop:ev_loop->ev_timer->unit="lwt_libev_timer_stop"classlibev?(backend=Ev_backend.default)()=objectinheritabstractvalloop=ev_initbackendmethodloop=loopmethodbackend=ev_backendloopmethodprivatecleanup=ev_stoploopmethoditerblock=tryev_looploopblockwithexn->ev_unlooploop;raiseexnmethodprivateregister_readablefdf=letev=ev_readable_initloopfdfinlazy(ev_io_stoploopev)methodprivateregister_writablefdf=letev=ev_writable_initloopfdfinlazy(ev_io_stoploopev)methodprivateregister_timerdelayrepeatf=letev=ev_timer_initloopdelayrepeatfinlazy(ev_timer_stoploopev)endclasslibev_deprecated=libev()(* +-----------------------------------------------------------------+
| Select/poll based engines |
+-----------------------------------------------------------------+ *)(* Type of a sleeper for the select engine. *)typesleeper={mutabletime:float;(* The time at which the sleeper should be wakeup. *)mutablestopped:bool;(* [true] iff the event has been stopped. *)action:unit->unit;(* The action for the sleeper. *)}moduleSleep_queue=Lwt_pqueue.Make(structtypet=sleeperletcompare{time=t1;_}{time=t2;_}=comparet1t2end)[@@ocaml.warning"-3"]moduleFd_map=Map.Make(structtypet=Unix.file_descrletcompare=compareend)letrecrestart_actionssleep_queuenow=matchSleep_queue.lookup_minsleep_queuewith|Some{stopped=true;_}->restart_actions(Sleep_queue.remove_minsleep_queue)now|Some{time=time;action=action;_}whentime<=now->(* We have to remove the sleeper to the queue before performing
the action. The action can change the sleeper's time, and this
might break the priority queue invariant if the sleeper is
still in the queue. *)letq=Sleep_queue.remove_minsleep_queueinaction();restart_actionsqnow|_->sleep_queueletrecget_next_timeoutsleep_queue=matchSleep_queue.lookup_minsleep_queuewith|Some{stopped=true;_}->get_next_timeout(Sleep_queue.remove_minsleep_queue)|Some{time=time;_}->max0.(time-.Unix.gettimeofday())|None->-1.letbad_fdfd=trylet_=Unix.fstatfdinfalsewithUnix.Unix_error(_,_,_)->trueletinvoke_actionsfdmap=matchFd_map.findfdmapwith|exceptionNot_found->()|actions->Lwt_sequence.iter_l(funf->f())actionsclassvirtualselect_or_poll_based=objectinheritabstractvalmutablesleep_queue=Sleep_queue.empty(* Threads waiting for a timeout to expire. *)valmutablenew_sleeps=[](* Sleepers added since the last iteration of the main loop:
They are not added immediately to the main sleep queue in order
to prevent them from being wakeup immediately. *)valmutablewait_readable=Fd_map.empty(* Sequences of actions waiting for file descriptors to become
readable. *)valmutablewait_writable=Fd_map.empty(* Sequences of actions waiting for file descriptors to become
writable. *)methodprivatecleanup=()methodprivateregister_timerdelayrepeatf=ifrepeatthenbeginletrecsleeper={time=Unix.gettimeofday()+.delay;stopped=false;action=g}andg()=sleeper.time<-Unix.gettimeofday()+.delay;new_sleeps<-sleeper::new_sleeps;f()innew_sleeps<-sleeper::new_sleeps;lazy(sleeper.stopped<-true)endelsebeginletsleeper={time=Unix.gettimeofday()+.delay;stopped=false;action=f}innew_sleeps<-sleeper::new_sleeps;lazy(sleeper.stopped<-true)endmethodprivateregister_readablefdf=letactions=tryFd_map.findfdwait_readablewithNot_found->letactions=Lwt_sequence.create()inwait_readable<-Fd_map.addfdactionswait_readable;actionsinletnode=Lwt_sequence.add_lfactionsinlazy(Lwt_sequence.removenode;ifLwt_sequence.is_emptyactionsthenwait_readable<-Fd_map.removefdwait_readable)methodprivateregister_writablefdf=letactions=tryFd_map.findfdwait_writablewithNot_found->letactions=Lwt_sequence.create()inwait_writable<-Fd_map.addfdactionswait_writable;actionsinletnode=Lwt_sequence.add_lfactionsinlazy(Lwt_sequence.removenode;ifLwt_sequence.is_emptyactionsthenwait_writable<-Fd_map.removefdwait_writable)endclassvirtualselect_based=object(self)inheritselect_or_poll_basedmethodprivatevirtualselect:Unix.file_descrlist->Unix.file_descrlist->float->Unix.file_descrlist*Unix.file_descrlistmethoditerblock=(* Transfer all sleepers added since the last iteration to the
main sleep queue: *)sleep_queue<-List.fold_left(funqe->Sleep_queue.addeq)sleep_queuenew_sleeps;new_sleeps<-[];(* Collect file descriptors. *)letfds_r=Fd_map.fold(funfd_l->fd::l)wait_readable[]inletfds_w=Fd_map.fold(funfd_l->fd::l)wait_writable[]in(* Compute the timeout. *)lettimeout=ifblockthenget_next_timeoutsleep_queueelse0.in(* Do the blocking call *)letfds_r,fds_w=tryself#selectfds_rfds_wtimeoutwith|Unix.Unix_error(Unix.EINTR,_,_)->([],[])|Unix.Unix_error(Unix.EBADF,_,_)->(* Keeps only bad file descriptors. Actions registered on
them have to handle the error: *)(List.filterbad_fdfds_r,List.filterbad_fdfds_w)in(* Restart threads waiting for a timeout: *)sleep_queue<-restart_actionssleep_queue(Unix.gettimeofday());(* Restart threads waiting on a file descriptors: *)List.iter(funfd->invoke_actionsfdwait_readable)fds_r;List.iter(funfd->invoke_actionsfdwait_writable)fds_wendclassvirtualpoll_based=object(self)inheritselect_or_poll_basedmethodprivatevirtualpoll:(Unix.file_descr*bool*bool)list->float->(Unix.file_descr*bool*bool)listmethoditerblock=(* Transfer all sleepers added since the last iteration to the
main sleep queue: *)sleep_queue<-List.fold_left(funqe->Sleep_queue.addeq)sleep_queuenew_sleeps;new_sleeps<-[];(* Collect file descriptors. *)letfds=[]inletfds=Fd_map.fold(funfd_l->(fd,true,false)::l)wait_readablefdsinletfds=Fd_map.fold(funfd_l->(fd,false,true)::l)wait_writablefdsin(* Compute the timeout. *)lettimeout=ifblockthenget_next_timeoutsleep_queueelse0.in(* Do the blocking call *)letfds=tryself#pollfdstimeoutwith|Unix.Unix_error(Unix.EINTR,_,_)->[]|Unix.Unix_error(Unix.EBADF,_,_)->(* Keeps only bad file descriptors. Actions registered on
them have to handle the error: *)List.filter(fun(fd,_,_)->bad_fdfd)fdsin(* Restart threads waiting for a timeout: *)sleep_queue<-restart_actionssleep_queue(Unix.gettimeofday());(* Restart threads waiting on a file descriptors: *)List.iter(fun(fd,readable,writable)->ifreadabletheninvoke_actionsfdwait_readable;ifwritabletheninvoke_actionsfdwait_writable)fdsendclassselect=objectinheritselect_basedmethodprivateselectfds_rfds_wtimeout=letfds_r,fds_w,_=Unix.selectfds_rfds_w[]timeoutin(fds_r,fds_w)end(* +-----------------------------------------------------------------+
| The current engine |
+-----------------------------------------------------------------+ *)letcurrent=ifLwt_config._HAVE_LIBEV&&Lwt_config.libev_defaultthenref(newlibev():>t)elseref(newselect:>t)letget()=!currentletset?(transfer=true)?(destroy=true)engine=iftransferthen!current#transfer(engine:#t:>abstract);ifdestroythen!current#destroy;current:=(engine:#t:>t)letiterblock=!current#iterblockleton_readablefdf=!current#on_readablefdfleton_writablefdf=!current#on_writablefdfleton_timerdelayrepeatf=!current#on_timerdelayrepeatfletfake_iofd=!current#fake_iofdletreadable_count()=!current#readable_countletwritable_count()=!current#writable_countlettimer_count()=!current#timer_countletfork()=!current#forkletforwards_signaln=!current#forwards_signalnmoduleVersioned=structclasslibev_1=libev_deprecatedclasslibev_2=libevend