From 2451f800289dbe7019f46a44d001938e410aecd7 Mon Sep 17 00:00:00 2001 From: nsfisis Date: Fri, 15 May 2026 01:23:30 +0900 Subject: feat(port): port Loop.php --- crates/shirabe/src/util/loop.rs | 110 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) (limited to 'crates/shirabe/src/util/loop.rs') diff --git a/crates/shirabe/src/util/loop.rs b/crates/shirabe/src/util/loop.rs index b9fe69b..41e1f25 100644 --- a/crates/shirabe/src/util/loop.rs +++ b/crates/shirabe/src/util/loop.rs @@ -1 +1,111 @@ //! ref: composer/src/Composer/Util/Loop.php + +use anyhow::Result; +use indexmap::IndexMap; +use shirabe_php_shim::microtime; +use shirabe_external_packages::react::promise::promise_interface::PromiseInterface; +use shirabe_external_packages::symfony::component::console::helper::progress_bar::ProgressBar; +use crate::util::http_downloader::HttpDownloader; +use crate::util::process_executor::ProcessExecutor; + +#[derive(Debug)] +pub struct Loop { + http_downloader: HttpDownloader, + process_executor: Option, + current_promises: IndexMap>>, + wait_index: i64, +} + +impl Loop { + pub fn new(mut http_downloader: HttpDownloader, process_executor: Option) -> Self { + http_downloader.enable_async(); + + let process_executor = process_executor.map(|mut pe| { + pe.enable_async(); + pe + }); + + Self { + http_downloader, + process_executor, + current_promises: IndexMap::new(), + wait_index: 0, + } + } + + pub fn get_http_downloader(&self) -> &HttpDownloader { + &self.http_downloader + } + + pub fn get_process_executor(&self) -> Option<&ProcessExecutor> { + self.process_executor.as_ref() + } + + pub fn wait(&mut self, promises: Vec>, progress: Option<&mut ProgressBar>) -> Result<()> { + let mut uncaught: Option = None; + + shirabe_external_packages::react::promise::all(&promises).then( + || {}, + |e: anyhow::Error| { + uncaught = Some(e); + }, + ); + + // keep track of every group of promises that is waited on, so abortJobs can + // cancel them all, even if wait() was called within a wait() + let wait_index = self.wait_index; + self.wait_index += 1; + self.current_promises.insert(wait_index, promises); + + if let Some(ref progress) = progress { + let mut total_jobs: i64 = 0; + total_jobs += self.http_downloader.count_active_jobs(); + if let Some(ref pe) = self.process_executor { + total_jobs += pe.count_active_jobs(); + } + progress.start(total_jobs); + } + + let mut last_update: f64 = 0.0; + loop { + let mut active_jobs: i64 = 0; + + active_jobs += self.http_downloader.count_active_jobs(); + if let Some(ref pe) = self.process_executor { + active_jobs += pe.count_active_jobs(); + } + + if let Some(ref progress) = progress { + if microtime(true) - last_update > 0.1 { + last_update = microtime(true); + progress.set_progress(progress.get_max_steps() - active_jobs); + } + } + + if active_jobs == 0 { + break; + } + } + + // as we skip progress updates if they are too quick, make sure we do one last one here at 100% + if let Some(ref progress) = progress { + progress.finish(); + } + + self.current_promises.remove(&wait_index); + if let Some(e) = uncaught { + return Err(e); + } + + Ok(()) + } + + pub fn abort_jobs(&self) { + for promise_group in self.current_promises.values() { + for promise in promise_group { + // to support react/promise 2.x we wrap the promise in a resolve() call for safety + shirabe_external_packages::react::promise::resolve(Some(promise)).cancel(); + } + } + } +} -- cgit v1.3.1