aboutsummaryrefslogtreecommitdiffhomepage
path: root/crates/shirabe/src/util
diff options
context:
space:
mode:
authornsfisis <nsfisis@gmail.com>2026-05-15 01:23:30 +0900
committernsfisis <nsfisis@gmail.com>2026-05-15 19:51:17 +0900
commit2451f800289dbe7019f46a44d001938e410aecd7 (patch)
tree9947ffd6eb400b0b681d80b21949420f8688daeb /crates/shirabe/src/util
parent8a202bc9eb377f672ebed701706048ae8b8d024c (diff)
downloadphp-shirabe-2451f800289dbe7019f46a44d001938e410aecd7.tar.gz
php-shirabe-2451f800289dbe7019f46a44d001938e410aecd7.tar.zst
php-shirabe-2451f800289dbe7019f46a44d001938e410aecd7.zip
feat(port): port Loop.php
Diffstat (limited to 'crates/shirabe/src/util')
-rw-r--r--crates/shirabe/src/util/loop.rs110
1 files changed, 110 insertions, 0 deletions
diff --git a/crates/shirabe/src/util/loop.rs b/crates/shirabe/src/util/loop.rs
index b9fe69b..41e1f25 100644
--- a/crates/shirabe/src/util/loop.rs
+++ b/crates/shirabe/src/util/loop.rs
@@ -1 +1,111 @@
//! ref: composer/src/Composer/Util/Loop.php
+
+use anyhow::Result;
+use indexmap::IndexMap;
+use shirabe_php_shim::microtime;
+use shirabe_external_packages::react::promise::promise_interface::PromiseInterface;
+use shirabe_external_packages::symfony::component::console::helper::progress_bar::ProgressBar;
+use crate::util::http_downloader::HttpDownloader;
+use crate::util::process_executor::ProcessExecutor;
+
+#[derive(Debug)]
+pub struct Loop {
+ http_downloader: HttpDownloader,
+ process_executor: Option<ProcessExecutor>,
+ current_promises: IndexMap<i64, Vec<Box<dyn PromiseInterface>>>,
+ wait_index: i64,
+}
+
+impl Loop {
+ pub fn new(mut http_downloader: HttpDownloader, process_executor: Option<ProcessExecutor>) -> Self {
+ http_downloader.enable_async();
+
+ let process_executor = process_executor.map(|mut pe| {
+ pe.enable_async();
+ pe
+ });
+
+ Self {
+ http_downloader,
+ process_executor,
+ current_promises: IndexMap::new(),
+ wait_index: 0,
+ }
+ }
+
+ pub fn get_http_downloader(&self) -> &HttpDownloader {
+ &self.http_downloader
+ }
+
+ pub fn get_process_executor(&self) -> Option<&ProcessExecutor> {
+ self.process_executor.as_ref()
+ }
+
+ pub fn wait(&mut self, promises: Vec<Box<dyn PromiseInterface>>, progress: Option<&mut ProgressBar>) -> Result<()> {
+ let mut uncaught: Option<anyhow::Error> = None;
+
+ shirabe_external_packages::react::promise::all(&promises).then(
+ || {},
+ |e: anyhow::Error| {
+ uncaught = Some(e);
+ },
+ );
+
+ // keep track of every group of promises that is waited on, so abortJobs can
+ // cancel them all, even if wait() was called within a wait()
+ let wait_index = self.wait_index;
+ self.wait_index += 1;
+ self.current_promises.insert(wait_index, promises);
+
+ if let Some(ref progress) = progress {
+ let mut total_jobs: i64 = 0;
+ total_jobs += self.http_downloader.count_active_jobs();
+ if let Some(ref pe) = self.process_executor {
+ total_jobs += pe.count_active_jobs();
+ }
+ progress.start(total_jobs);
+ }
+
+ let mut last_update: f64 = 0.0;
+ loop {
+ let mut active_jobs: i64 = 0;
+
+ active_jobs += self.http_downloader.count_active_jobs();
+ if let Some(ref pe) = self.process_executor {
+ active_jobs += pe.count_active_jobs();
+ }
+
+ if let Some(ref progress) = progress {
+ if microtime(true) - last_update > 0.1 {
+ last_update = microtime(true);
+ progress.set_progress(progress.get_max_steps() - active_jobs);
+ }
+ }
+
+ if active_jobs == 0 {
+ break;
+ }
+ }
+
+ // as we skip progress updates if they are too quick, make sure we do one last one here at 100%
+ if let Some(ref progress) = progress {
+ progress.finish();
+ }
+
+ self.current_promises.remove(&wait_index);
+ if let Some(e) = uncaught {
+ return Err(e);
+ }
+
+ Ok(())
+ }
+
+ pub fn abort_jobs(&self) {
+ for promise_group in self.current_promises.values() {
+ for promise in promise_group {
+ // to support react/promise 2.x we wrap the promise in a resolve() call for safety
+ shirabe_external_packages::react::promise::resolve(Some(promise)).cancel();
+ }
+ }
+ }
+}