1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
//! ref: composer/src/Composer/Util/Loop.php
use crate::util::http_downloader::HttpDownloader;
use crate::util::process_executor::ProcessExecutor;
use anyhow::Result;
use indexmap::IndexMap;
use shirabe_external_packages::react::promise::promise_interface::PromiseInterface;
use shirabe_external_packages::symfony::component::console::helper::progress_bar::ProgressBar;
use shirabe_php_shim::microtime;
#[derive(Debug)]
pub struct Loop {
http_downloader: HttpDownloader,
process_executor: Option<ProcessExecutor>,
current_promises: IndexMap<i64, Vec<Box<dyn PromiseInterface>>>,
wait_index: i64,
}
impl Loop {
pub fn new(
mut http_downloader: HttpDownloader,
process_executor: Option<ProcessExecutor>,
) -> Self {
http_downloader.enable_async();
let process_executor = process_executor.map(|mut pe| {
pe.enable_async();
pe
});
Self {
http_downloader,
process_executor,
current_promises: IndexMap::new(),
wait_index: 0,
}
}
pub fn get_http_downloader(&self) -> &HttpDownloader {
&self.http_downloader
}
pub fn get_process_executor(&self) -> Option<&ProcessExecutor> {
self.process_executor.as_ref()
}
pub fn wait(
&mut self,
promises: Vec<Box<dyn PromiseInterface>>,
progress: Option<&mut ProgressBar>,
) -> Result<()> {
let mut uncaught: Option<anyhow::Error> = None;
shirabe_external_packages::react::promise::all(&promises).then(
|| {},
|e: anyhow::Error| {
uncaught = Some(e);
},
);
// keep track of every group of promises that is waited on, so abortJobs can
// cancel them all, even if wait() was called within a wait()
let wait_index = self.wait_index;
self.wait_index += 1;
self.current_promises.insert(wait_index, promises);
if let Some(ref progress) = progress {
let mut total_jobs: i64 = 0;
total_jobs += self.http_downloader.count_active_jobs();
if let Some(ref pe) = self.process_executor {
total_jobs += pe.count_active_jobs();
}
progress.start(total_jobs);
}
let mut last_update: f64 = 0.0;
loop {
let mut active_jobs: i64 = 0;
active_jobs += self.http_downloader.count_active_jobs();
if let Some(ref pe) = self.process_executor {
active_jobs += pe.count_active_jobs();
}
if let Some(ref progress) = progress {
if microtime(true) - last_update > 0.1 {
last_update = microtime(true);
progress.set_progress(progress.get_max_steps() - active_jobs);
}
}
if active_jobs == 0 {
break;
}
}
// as we skip progress updates if they are too quick, make sure we do one last one here at 100%
if let Some(ref progress) = progress {
progress.finish();
}
self.current_promises.remove(&wait_index);
if let Some(e) = uncaught {
return Err(e);
}
Ok(())
}
pub fn abort_jobs(&self) {
for promise_group in self.current_promises.values() {
for promise in promise_group {
// to support react/promise 2.x we wrap the promise in a resolve() call for safety
shirabe_external_packages::react::promise::resolve(Some(promise)).cancel();
}
}
}
}
|