1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
//! ref: composer/src/Composer/Util/Loop.php
use crate::util::HttpDownloader;
use crate::util::ProcessExecutor;
use anyhow::Result;
use indexmap::IndexMap;
use shirabe_external_packages::symfony::component::console::helper::ProgressBar;
use shirabe_php_shim::microtime;
/// Waits on groups of promises by polling the HTTP downloader and the optional
/// process executor until neither reports any active job.
pub struct Loop {
// Shared handle to the HTTP downloader; switched to async mode in `new` and
// polled for its active job count in `wait`.
http_downloader: std::rc::Rc<std::cell::RefCell<HttpDownloader>>,
// Optional shared handle to the process executor; treated the same way as the
// downloader whenever it is present.
process_executor: Option<std::rc::Rc<std::cell::RefCell<ProcessExecutor>>>,
// Promise groups currently being waited on, keyed by the wait index that was
// assigned when the corresponding `wait` call started.
current_promises: IndexMap<i64, Vec<Box<dyn PromiseInterface>>>,
// Key handed to the next `wait` call; incremented every time `wait` is entered.
wait_index: i64,
}
impl std::fmt::Debug for Loop {
    /// Formats the loop as a `Loop { .. }` struct showing the downloader, the process
    /// executor and the wait index. `current_promises` is deliberately left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure once so the list of printed fields is visible in one place.
        let Self {
            http_downloader,
            process_executor,
            wait_index,
            ..
        } = self;

        let mut out = f.debug_struct("Loop");
        out.field("http_downloader", http_downloader);
        out.field("process_executor", process_executor);
        out.field("wait_index", wait_index);
        out.finish()
    }
}
impl Loop {
    /// Creates a loop over the given HTTP downloader and optional process executor.
    ///
    /// `enable_async()` is called on the downloader, and on the executor when one is
    /// supplied, before they are stored.
    pub fn new(
        http_downloader: std::rc::Rc<std::cell::RefCell<HttpDownloader>>,
        process_executor: Option<std::rc::Rc<std::cell::RefCell<ProcessExecutor>>>,
    ) -> Self {
        http_downloader.borrow_mut().enable_async();
        if let Some(executor) = process_executor.as_ref() {
            executor.borrow_mut().enable_async();
        }

        Self {
            http_downloader,
            process_executor,
            current_promises: IndexMap::new(),
            wait_index: 0,
        }
    }

    /// Returns the shared handle to the HTTP downloader.
    pub fn get_http_downloader(&self) -> &std::rc::Rc<std::cell::RefCell<HttpDownloader>> {
        &self.http_downloader
    }

    /// Returns the shared handle to the process executor, if one was supplied.
    pub fn get_process_executor(
        &self,
    ) -> Option<&std::rc::Rc<std::cell::RefCell<ProcessExecutor>>> {
        self.process_executor.as_ref()
    }

    /// Sums the active job counts reported by the downloader and, when present, the
    /// process executor.
    fn active_job_count(&self) -> i64 {
        let mut jobs: i64 = 0;
        jobs += self.http_downloader.borrow_mut().count_active_jobs(None);
        if let Some(executor) = self.process_executor.as_ref() {
            jobs += executor.borrow_mut().count_active_jobs(None);
        }
        jobs
    }

    /// Registers `promises` as a waited-on group and polls until the downloader and
    /// the process executor report no active jobs.
    ///
    /// When `progress` is given it is started with the initial job count, refreshed
    /// whenever `microtime(true)` has advanced by more than `0.1` since the previous
    /// refresh, and finished once polling ends.
    ///
    /// # Errors
    ///
    /// Returns the captured rejection if one was recorded. Rejection capture is not
    /// wired up yet (see the TODO below), so this currently always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// As written, the placeholder closure passed to `all` hits `todo!` for the first
    /// element, so a non-empty `promises` panics until phase-b is implemented.
    pub fn wait(
        &mut self,
        promises: Vec<Box<dyn PromiseInterface>>,
        mut progress: Option<&mut ProgressBar>,
    ) -> Result<()> {
        let uncaught: Option<anyhow::Error> = None;
        // TODO(phase-b): Promise::then captures uncaught by Fn; needs a Cell/RefCell wrapper
        // and a thunk that matches FnOnce(Option<PhpMixed>) -> Option<PhpMixed>.
        let _ = shirabe_external_packages::react::promise::all(
            promises
                .iter()
                .map(|_| todo!("clone Box<dyn PromiseInterface>"))
                .collect(),
        );

        // keep track of every group of promises that is waited on, so abortJobs can
        // cancel them all, even if wait() was called within a wait()
        let wait_index = self.wait_index;
        self.wait_index += 1;
        self.current_promises.insert(wait_index, promises);

        if let Some(bar) = progress.as_deref_mut() {
            let total_jobs = self.active_job_count();
            bar.start(Some(total_jobs));
        }

        let mut last_update: f64 = 0.0;
        loop {
            let active_jobs = self.active_job_count();

            if let Some(bar) = progress.as_deref_mut() {
                // Throttle redraws: only refresh when enough time has passed.
                if microtime(true) - last_update > 0.1 {
                    last_update = microtime(true);
                    let new_progress = bar.get_max_steps() - active_jobs;
                    bar.set_progress(new_progress);
                }
            }

            if active_jobs == 0 {
                break;
            }
        }

        // as we skip progress updates if they are too quick, make sure we do one last one here at 100%
        if let Some(bar) = progress.as_deref_mut() {
            bar.finish();
        }

        // `shift_remove` keeps the remaining groups in insertion order (like PHP's
        // `unset`); the deprecated `remove` swaps the last entry into the freed slot.
        self.current_promises.shift_remove(&wait_index);

        match uncaught {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }

    /// Walks every promise currently being waited on.
    ///
    /// This is a no-op for now: cancellation is not implemented (see the TODO).
    pub fn abort_jobs(&self) {
        for promise_group in self.current_promises.values() {
            for _promise in promise_group {
                // TODO(phase-b): cancel requires CancellablePromiseInterface; PromiseInterface trait
                // doesn't expose it. Drop the wrap+cancel until we have the right trait.
            }
        }
    }
}
|