diff options
| author | nsfisis <nsfisis@gmail.com> | 2026-05-23 15:45:33 +0900 |
|---|---|---|
| committer | nsfisis <nsfisis@gmail.com> | 2026-05-23 15:48:00 +0900 |
| commit | bd6d0186d2c01a3e1d6324ad5a0bcdd71de53098 (patch) | |
| tree | 939eb1dccbfb3341a2f618e734ca23ef84a8e5cc /crates/shirabe/src/util/loop.rs | |
| parent | e068a9d644fde6659a88accd55b3f1d0d9d7cf46 (diff) | |
| download | php-shirabe-bd6d0186d2c01a3e1d6324ad5a0bcdd71de53098.tar.gz php-shirabe-bd6d0186d2c01a3e1d6324ad5a0bcdd71de53098.tar.zst php-shirabe-bd6d0186d2c01a3e1d6324ad5a0bcdd71de53098.zip | |
refactor(promise): drop \React\Promise
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Diffstat (limited to 'crates/shirabe/src/util/loop.rs')
| -rw-r--r-- | crates/shirabe/src/util/loop.rs | 90 |
1 files changed, 16 insertions, 74 deletions
diff --git a/crates/shirabe/src/util/loop.rs b/crates/shirabe/src/util/loop.rs index 18f2359..5512211 100644 --- a/crates/shirabe/src/util/loop.rs +++ b/crates/shirabe/src/util/loop.rs @@ -3,25 +3,12 @@ use crate::util::HttpDownloader; use crate::util::ProcessExecutor; use anyhow::Result; -use indexmap::IndexMap; use shirabe_external_packages::symfony::component::console::helper::ProgressBar; -use shirabe_php_shim::microtime; +#[derive(Debug)] pub struct Loop { http_downloader: std::rc::Rc<std::cell::RefCell<HttpDownloader>>, process_executor: Option<std::rc::Rc<std::cell::RefCell<ProcessExecutor>>>, - current_promises: IndexMap<i64, Vec<Box<dyn PromiseInterface>>>, - wait_index: i64, -} - -impl std::fmt::Debug for Loop { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Loop") - .field("http_downloader", &self.http_downloader) - .field("process_executor", &self.process_executor) - .field("wait_index", &self.wait_index) - .finish() - } } impl Loop { @@ -39,8 +26,6 @@ impl Loop { Self { http_downloader, process_executor, - current_promises: IndexMap::new(), - wait_index: 0, } } @@ -54,65 +39,25 @@ impl Loop { self.process_executor.as_ref() } - pub fn wait( + pub async fn wait<'p>( &mut self, - promises: Vec<Box<dyn PromiseInterface>>, - mut progress: Option<&mut ProgressBar>, + promises: Vec<std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + 'p>>>, + _progress: Option<&mut ProgressBar>, ) -> Result<()> { - let uncaught: Option<anyhow::Error> = None; - - // TODO(phase-b): Promise::then captures uncaught by Fn; needs a Cell/RefCell wrapper - // and a thunk that matches FnOnce(Option<PhpMixed>) -> Option<PhpMixed>. - let _ = shirabe_external_packages::react::promise::all( - promises - .iter() - .map(|_| todo!("clone Box<dyn PromiseInterface>")) - .collect(), - ); - - // keep track of every group of promises that is waited on, so abortJobs can - // cancel them all, even if wait() was called within a wait() - let wait_index = self.wait_index; - self.wait_index += 1; - self.current_promises.insert(wait_index, promises); - - if let Some(ref mut progress) = progress { - let mut total_jobs: i64 = 0; - total_jobs += self.http_downloader.borrow_mut().count_active_jobs(None); - if let Some(ref pe) = self.process_executor { - total_jobs += pe.borrow_mut().count_active_jobs(None); - } - progress.start(Some(total_jobs)); - } - - let mut last_update: f64 = 0.0; - loop { - let mut active_jobs: i64 = 0; + let mut uncaught: Option<anyhow::Error> = None; - active_jobs += self.http_downloader.borrow_mut().count_active_jobs(None); - if let Some(ref pe) = self.process_executor { - active_jobs += pe.borrow_mut().count_active_jobs(None); - } - - if let Some(ref mut progress) = progress { - if microtime(true) - last_update > 0.1 { - last_update = microtime(true); - let new_progress = progress.get_max_steps() - active_jobs; - progress.set_progress(new_progress); + // TODO(phase-c-promise): the asynchronous worker classes (HttpDownloader / ProcessExecutor) + // run single-threaded for now, so the promises are consumed serially. Once the workers run + // on a multi-thread runtime these futures should be driven concurrently instead of in order. + // The PHP progress bar is tied to the worker active-job count and is also deferred until then. + for promise in promises { + if let Err(e) = promise.await { + if uncaught.is_none() { + uncaught = Some(e); } } - - if active_jobs == 0 { - break; - } } - // as we skip progress updates if they are too quick, make sure we do one last one here at 100% - if let Some(ref mut progress) = progress { - progress.finish(); - } - - self.current_promises.remove(&wait_index); if let Some(e) = uncaught { return Err(e); } @@ -121,11 +66,8 @@ impl Loop { } pub fn abort_jobs(&self) { - for promise_group in self.current_promises.values() { - for _promise in promise_group { - // TODO(phase-b): cancel requires CancellablePromiseInterface; PromiseInterface trait - // doesn't expose it. Drop the wrap+cancel until we have the right trait. - } - } + // TODO(phase-c-promise): no-op until a cancellation mechanism is introduced. PHP cancels + // every in-flight promise group it tracks in $currentPromises; reintroduce that tracking + // once the asynchronous workers support cancellation on a multi-thread runtime. } } |
