(* Copyright (C) 2019--2025 Petter A. Urkedal <paurkedal@gmail.com>
*
* This library is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version, with the LGPL-3.0 Linking Exception.
*
* This library is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* and the LGPL-3.0 Linking Exception along with this library. If not, see
* <http://www.gnu.org/licenses/> and <https://spdx.org>, respectively.
*)

(* Helpers shared by connection implementations, parametrised over the
   concurrency system. *)
module Make_helpers (System : System_sig.S) = struct
  open System
  open System.Fiber.Infix

  (* [assert_single_use ~what in_use f] guards [f] with the boolean flag
     [in_use].

     - Raises [Failure] if the flag is already set when called; [what] is
       interpolated into the message.
     - Otherwise sets the flag, runs [f ()], and clears the flag before
       passing on the result of [f].

     The second function handed to [Fiber.cleanup] also clears the flag;
     presumably it is invoked when the first one fails with an exception
     (confirm against [System_sig.S]). *)
  let assert_single_use ~what in_use f =
    if !in_use then
      failwith ("Invalid concurrent usage of " ^ what ^ " detected.");
    in_use := true;
    Fiber.cleanup
      (fun () -> f () >|= fun res -> in_use := false; res)
      (fun () -> in_use := false; Fiber.return ())
end

(* Convenience functions derived from the base connection operations
   [C.call], [C.start], [C.commit] and [C.rollback]. *)
module Make_convenience
  (System : System_sig.S)
  (C : Caqti_connection_sig.Base
        with type 'a fiber := 'a System.Fiber.t
         and type ('a, 'err) stream := ('a, 'err) System.Stream.t) =
struct
  open System
  open System.Fiber.Infix

  module Response = C.Response

  (* Bind on a fiber carrying a result: continue with [f] on [Ok], pass an
     [Error] through unchanged. *)
  let (>>=?) m f = m >>= function Ok x -> f x | Error _ as r -> Fiber.return r

  (* Map over the [Ok] case of a fiber carrying a result; an [Error] is
     passed through unchanged. *)
  let (>|=?) m f = m >|= function Ok r -> Ok (f r) | Error _ as r -> r

  (* The following functions each call [C.call] on request [q] with
     parameters [p], using the correspondingly named [Response] function as
     the response handler. *)

  let exec q p = C.call ~f:Response.exec q p

  let find q p = C.call ~f:Response.find q p

  let find_opt q p = C.call ~f:Response.find_opt q p

  let fold q f p acc =
    C.call ~f:(fun resp -> Response.fold f resp acc) q p

  let fold_s q f p acc =
    C.call ~f:(fun resp -> Response.fold_s f resp acc) q p

  let iter_s q f p =
    C.call ~f:(fun resp -> Response.iter_s f resp) q p

  (* Accumulates the rows with [List.cons] and then reverses the
     accumulated list. *)
  let collect_list q p =
    let f resp = Response.fold List.cons resp [] >|= Result.map List.rev in
    C.call ~f q p

  (* Same accumulation as [collect_list] but without the final reversal. *)
  let rev_collect_list q p =
    let f resp = Response.fold List.cons resp [] in
    C.call ~f q p

  (* Runs [Response.exec] and, if that succeeds, returns the result of
     [Response.affected_count] on the same response.  An error from
     [Response.exec] is returned as is. *)
  let exec_with_affected_count q p =
    let f response =
      Response.exec response >>= fun execResult ->
      match execResult with
      | Ok () -> Response.affected_count response
      | Error x -> Fiber.return (Error x)
    in
    C.call ~f q p

  (* [with_transaction f] calls [C.start], then runs [f ()].

     - If [f] returns [Ok y], [C.commit] is called and [Ok y] is returned
       when the commit succeeds; a commit error is returned otherwise.
     - If [f] returns an error, [C.rollback] is called, its outcome is
       discarded, and the error from [f] is returned.
     - The handler passed as second argument to [Fiber.cleanup] also issues
       [C.rollback] and ignores its outcome; presumably this covers the
       case where [f] fails with an exception (confirm against
       [System_sig.S]).

     If [C.start] fails, its error is returned and [f] is not run. *)
  let with_transaction f =
    C.start () >>=? fun () ->
    Fiber.cleanup
      (fun () ->
        f () >>=
        (function
         | Ok y -> C.commit () >|=? fun () -> y
         | Error _ as r -> C.rollback () >|= fun _ -> r))
      (fun () -> C.rollback () >|= ignore)
end

(* A generic implementation of [populate] in terms of one INSERT request
   executed per row inside a transaction. *)
module Make_populate
  (System : System_sig.S)
  (C : Caqti_connection_sig.Base
        with type 'a fiber := 'a System.Fiber.t
         and type ('a, 'e) stream := ('a, 'e) System.Stream.t) =
struct
  open System
  open System.Fiber.Infix

  (* Bind on a fiber carrying a result: continue with [f] on [Ok], pass an
     [Error] through unchanged. *)
  let (>>=?) m f = m >>= function Ok x -> f x | Error _ as r -> Fiber.return r

  (* [populate ~table ~columns row_type] builds an INSERT request for
     [table] listing [columns], with one positional parameter per column,
     and returns a function which consumes a stream of rows.

     For a given stream [data] the returned function:
     - calls [C.start], returning its error if it fails;
     - executes the request once per element of [data] with
       [Stream.iter_s];
     - calls [C.deallocate] on the request, discarding the outcome;
     - commits if all rows were inserted;
     - on a [`Congested] error, rolls back and returns that error, or the
       rollback error if the rollback itself fails;
     - returns any other error as is.

     NOTE(review): [table] and [columns] are spliced into the query text
     with [Q.lit] without quoting, so they are presumably expected to be
     trusted identifiers; verify against callers. *)
  let populate ~table ~columns row_type =
    let request =
      let open Caqti_template.Create in
      dynamic_gen T.(row_type -->. unit) @@ Fun.const @@
      Q.concat [
        Q.lit "INSERT INTO "; Q.lit table; Q.lit "(";
        Q.concat ~sep:", " (List.map Q.lit columns);
        Q.lit ") VALUES (";
        (* One parameter per column, indexed by column position. *)
        Q.concat ~sep:", " (List.mapi (fun i _ -> Q.param i) columns);
        Q.lit ")";
      ]
    in
    fun data ->
      C.start () >>=? fun () ->
      Stream.iter_s ~f:(C.call ~f:C.Response.exec request) data >>= fun res ->
      C.deallocate request >>= fun _ ->
      (match res with
       | Ok () ->
          C.commit ()
       | Error (`Congested err) ->
          C.rollback () >>=? fun () ->
          Fiber.return (Error (`Congested err))
       | Error err ->
          (* NOTE(review): no rollback is issued on this path, unlike the
             congestion case above; confirm this is intended. *)
          Fiber.return (Error err))
end