Skip to content

Commit 64c77f8

Browse files
msiergiej85mjr-blockjoy
authored andcommittedMar 20, 2025·
feat: Apply node params changed on update #1047
1 parent 940195a commit 64c77f8

File tree

13 files changed

+237
-75
lines changed

13 files changed

+237
-75
lines changed
 

‎babel/src/babel_service.rs

+18
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,14 @@ impl<J: JobsManagerClient + Sync + Send + 'static, P: BabelPal + Sync + Send + '
160160
Ok(Response::new(()))
161161
}
162162

163+
async fn stop_all_jobs(&self, _request: Request<()>) -> Result<Response<()>, Status> {
164+
self.jobs_manager
165+
.stop_all()
166+
.await
167+
.map_err(|err| Status::internal(format!("stop_all_jobs failed: {err:#}")))?;
168+
Ok(Response::new(()))
169+
}
170+
163171
async fn skip_job(&self, request: Request<String>) -> Result<Response<()>, Status> {
164172
self.jobs_manager
165173
.skip(&request.into_inner())
@@ -195,6 +203,15 @@ impl<J: JobsManagerClient + Sync + Send + 'static, P: BabelPal + Sync + Send + '
195203
))
196204
}
197205

206+
async fn get_active_jobs_shutdown_timeout(
207+
&self,
208+
_request: Request<()>,
209+
) -> Result<Response<Duration>, Status> {
210+
Ok(Response::new(
211+
self.jobs_manager.get_active_jobs_shutdown_timeout().await,
212+
))
213+
}
214+
198215
async fn get_jobs(&self, _request: Request<()>) -> Result<Response<JobsInfo>, Status> {
199216
let jobs = self
200217
.jobs_manager
@@ -441,6 +458,7 @@ mod tests {
441458
async fn start(&self, name: &str) -> Result<()>;
442459
async fn get_job_shutdown_timeout(&self, name: &str) -> Duration;
443460
async fn stop(&self, name: &str) -> Result<()>;
461+
async fn stop_all(&self) -> Result<()>;
444462
async fn skip(&self, name: &str) -> Result<()>;
445463
async fn cleanup(&self, name: &str) -> Result<()>;
446464
async fn info(&self, name: &str) -> Result<JobInfo>;

‎babel/src/jobs_manager.rs

+31-50
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ pub trait JobsManagerClient {
267267
async fn start(&self, name: &str) -> Result<()>;
268268
async fn get_job_shutdown_timeout(&self, name: &str) -> Duration;
269269
async fn stop(&self, name: &str) -> Result<()>;
270+
async fn stop_all(&self) -> Result<()>;
270271
async fn skip(&self, name: &str) -> Result<()>;
271272
async fn cleanup(&self, name: &str) -> Result<()>;
272273
async fn info(&self, name: &str) -> Result<JobInfo>;
@@ -330,7 +331,7 @@ impl<C: BabelEngineConnector + Send> JobsManagerClient for Client<C> {
330331
PosixSignal::SIGTERM,
331332
);
332333
} else {
333-
terminate_job(name, *pid, &job.config).await?;
334+
terminate_job_process(name, *pid, &job.config).await?;
334335
}
335336
// job_runner process has been stopped, but job should be restarted on next jobs manager startup
336337
job.state = JobState::Inactive(JobStatus::Running);
@@ -356,9 +357,10 @@ impl<C: BabelEngineConnector + Send> JobsManagerClient for Client<C> {
356357
info!("Requested '{name}' job to create: {config:?}",);
357358
let mut jobs_context = self.jobs_registry.lock().await;
358359

359-
if let Some(Job { state, .. }) = jobs_context.jobs.get(name) {
360-
if let JobState::Active(_) = state {
361-
bail!("can't create job '{name}' while it is already running")
360+
if let Some(Job { state, config, .. }) = jobs_context.jobs.get(name) {
361+
if let JobState::Active(pid) = state {
362+
info!("Job '{name}' already running - stop and recreate with new config");
363+
terminate_job_process(name, *pid, config).await?;
362364
}
363365
} else if jobs_context.jobs.len() >= MAX_JOBS {
364366
bail!("Exceeded max number of supported jobs: {MAX_JOBS}");
@@ -423,29 +425,29 @@ impl<C: BabelEngineConnector + Send> JobsManagerClient for Client<C> {
423425
info!("Requested '{name} job to stop'");
424426
let mut jobs_context = self.jobs_registry.lock().await;
425427
if let Some(job) = jobs_context.jobs.get_mut(name) {
426-
match &mut job.state {
427-
JobState::Active(pid) => {
428-
terminate_job(name, *pid, &job.config).await?;
429-
job.state = JobState::Inactive(JobStatus::Stopped);
430-
}
431-
JobState::Inactive(status) => {
432-
*status = JobStatus::Stopped;
433-
}
434-
}
435-
job.save_status()?;
428+
stop_job(name, job).await?;
436429
} else {
437430
bail!("can't stop, job '{name}' not found")
438431
}
439432
Ok(())
440433
}
441434

435+
async fn stop_all(&self) -> Result<()> {
436+
info!("Requested all jobs to stop'");
437+
let mut jobs_context = self.jobs_registry.lock().await;
438+
for (name, job) in &mut jobs_context.jobs {
439+
stop_job(name, job).await?;
440+
}
441+
Ok(())
442+
}
443+
442444
async fn skip(&self, name: &str) -> Result<()> {
443445
info!("Requested '{name} job to stop'");
444446
let mut jobs_context = self.jobs_registry.lock().await;
445447
if let Some(job) = jobs_context.jobs.get_mut(name) {
446448
match &mut job.state {
447449
JobState::Active(pid) => {
448-
terminate_job(name, *pid, &job.config).await?;
450+
terminate_job_process(name, *pid, &job.config).await?;
449451
job.state = JobState::Inactive(JobStatus::Finished {
450452
exit_code: Some(0),
451453
message: "Skipped".to_string(),
@@ -517,7 +519,20 @@ fn build_job_info(job: &Job) -> JobInfo {
517519
}
518520
}
519521

520-
async fn terminate_job(name: &str, pid: Pid, config: &JobConfig) -> Result<()> {
522+
async fn stop_job(name: &str, job: &mut Job) -> Result<()> {
523+
match &mut job.state {
524+
JobState::Active(pid) => {
525+
terminate_job_process(name, *pid, &job.config).await?;
526+
job.state = JobState::Inactive(JobStatus::Stopped);
527+
}
528+
JobState::Inactive(status) => {
529+
*status = JobStatus::Stopped;
530+
}
531+
}
532+
job.save_status()
533+
}
534+
535+
async fn terminate_job_process(name: &str, pid: Pid, config: &JobConfig) -> Result<()> {
521536
let shutdown_timeout = Duration::from_secs(
522537
config
523538
.shutdown_timeout_secs
@@ -1076,40 +1091,6 @@ mod tests {
10761091
test_env.client.info("test_job").await?
10771092
);
10781093

1079-
// start failed
1080-
test_env
1081-
.client
1082-
.jobs_registry
1083-
.lock()
1084-
.await
1085-
.jobs
1086-
.get_mut("test_job")
1087-
.unwrap()
1088-
.state = JobState::Active(Pid::from_u32(0));
1089-
assert_eq!(
1090-
JobStatus::Running,
1091-
test_env.client.info("test_job").await?.status
1092-
);
1093-
let _ = test_env
1094-
.client
1095-
.create(
1096-
"test_job",
1097-
JobConfig {
1098-
job_type: JobType::RunSh("different".to_string()),
1099-
restart: RestartPolicy::Never,
1100-
shutdown_timeout_secs: None,
1101-
shutdown_signal: None,
1102-
needs: Some(vec![]),
1103-
wait_for: None,
1104-
run_as: None,
1105-
log_buffer_capacity_mb: None,
1106-
log_timestamp: None,
1107-
use_protocol_data: None,
1108-
one_time: None,
1109-
},
1110-
)
1111-
.await
1112-
.unwrap_err();
11131094
test_env.server.assert().await;
11141095
Ok(())
11151096
}

‎babel_api/src/babel.rs

+4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ pub trait Babel {
3333
fn start_job(job_name: String);
3434
/// Stop background job with given unique name if running.
3535
fn stop_job(job_name: String);
36+
/// Stop all background jobs.
37+
fn stop_all_jobs();
3638
/// Skip background job with given unique name if running.
3739
fn skip_job(job_name: String);
3840
/// Cleanup background job with given unique name - remove any intermediate files,
@@ -42,6 +44,8 @@ pub trait Babel {
4244
fn job_info(job_name: String) -> JobInfo;
4345
/// Get maximum time it may take to gracefully shutdown job.
4446
fn get_job_shutdown_timeout(job_name: String) -> Duration;
47+
/// Get maximum time it may take to gracefully shutdown all active jobs.
48+
fn get_active_jobs_shutdown_timeout() -> Duration;
4549
/// Get jobs list.
4650
fn get_jobs() -> JobsInfo;
4751

‎babel_api/src/engine.rs

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ pub trait Engine {
2121
fn start_job(&self, job_name: &str) -> Result<()>;
2222
/// Stop background job with given unique name if running.
2323
fn stop_job(&self, job_name: &str) -> Result<()>;
24+
/// Stop all background jobs.
25+
fn stop_all_jobs(&self) -> Result<()>;
2426
/// Cleanup background job with given unique name - remove any intermediate files,
2527
/// so next time it will start from scratch.
2628
fn cleanup_job(&self, job_name: &str) -> Result<()>;

‎babel_api/src/rhai_plugin.rs

+3
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ impl<E: Engine + Sync + Send + 'static> BarePlugin<E> {
512512
let Some(config) = self.plugin_config.clone() else {
513513
bail!("Missing {PLUGIN_CONFIG_FN_NAME} function")
514514
};
515+
self.babel_engine.stop_all_jobs()?;
515516
self.render_configs_files(config.config_files)?;
516517
self.start_aux_services(config.aux_services)?;
517518
let mut services_needs = self.run_actions(config.init, vec![])?;
@@ -968,6 +969,7 @@ mod tests {
968969
fn create_job(&self, job_name: &str, job_config: JobConfig) -> Result<()>;
969970
fn start_job(&self, job_name: &str) -> Result<()>;
970971
fn stop_job(&self, job_name: &str) -> Result<()>;
972+
fn stop_all_jobs(&self) -> Result<()>;
971973
fn cleanup_job(&self, job_name: &str) -> Result<()>;
972974
fn job_info(&self, job_name: &str) -> Result<engine::JobInfo>;
973975
fn get_jobs(&self) -> Result<engine::JobsInfo>;
@@ -1828,6 +1830,7 @@ mod tests {
18281830
node_name: "node name".to_string(),
18291831
..Default::default()
18301832
});
1833+
babel.expect_stop_all_jobs().once().returning(|| Ok(()));
18311834
babel
18321835
.expect_render_template()
18331836
.with(

‎babel_api/src/rhai_plugin_linter.rs

+4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ impl Engine for LinterEngine {
6060
Ok(())
6161
}
6262

63+
fn stop_all_jobs(&self) -> eyre::Result<()> {
64+
Ok(())
65+
}
66+
6367
fn cleanup_job(&self, _job_name: &str) -> eyre::Result<()> {
6468
Ok(())
6569
}

‎babel_api/tests/test_examples.rs

+1
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ fn test_plugin_config() -> eyre::Result<()> {
219219
protocol_data_path: PathBuf::from("/blockjoy/protocol_data"),
220220
..Default::default()
221221
});
222+
babel.expect_stop_all_jobs().times(2).returning(|| Ok(()));
222223
babel
223224
.expect_render_template()
224225
.with(

‎bv/src/babel_engine.rs

+36
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,14 @@ impl<N: NodeConnection, P: Plugin + Clone + Send + 'static> BabelEngine<N, P> {
400400
Err(err) => Err(err),
401401
});
402402
}
403+
EngineRequest::StopAllJobs { response_tx } => {
404+
let _ = response_tx.send(match self.node_connection.babel_client().await {
405+
Ok(babel_client) => stop_all_jobs(babel_client)
406+
.await
407+
.map_err(|err| self.handle_connection_errors(err)),
408+
Err(err) => Err(err),
409+
});
410+
}
403411
EngineRequest::CleanupJob {
404412
job_name,
405413
response_tx,
@@ -597,6 +605,12 @@ async fn stop_job(client: &mut BabelClient, job_name: &str) -> Result<(), tonic:
597605
.map(|v| v.into_inner())
598606
}
599607

608+
async fn stop_all_jobs(client: &mut BabelClient) -> Result<(), tonic::Status> {
609+
let jobs_timeout = with_retry!(client.get_active_jobs_shutdown_timeout(()))?.into_inner();
610+
with_retry!(client.stop_all_jobs(with_timeout((), jobs_timeout + NODE_REQUEST_TIMEOUT)))
611+
.map(|v| v.into_inner())
612+
}
613+
600614
async fn skip_job(client: &mut BabelClient, job_name: &str) -> Result<(), tonic::Status> {
601615
let job_timeout =
602616
with_retry!(client.get_job_shutdown_timeout(job_name.to_string()))?.into_inner();
@@ -635,6 +649,9 @@ enum EngineRequest {
635649
job_name: String,
636650
response_tx: ResponseTx<Result<()>>,
637651
},
652+
StopAllJobs {
653+
response_tx: ResponseTx<Result<()>>,
654+
},
638655
CleanupJob {
639656
job_name: String,
640657
response_tx: ResponseTx<Result<()>>,
@@ -724,6 +741,13 @@ impl babel_api::engine::Engine for Engine {
724741
response_rx.blocking_recv()?
725742
}
726743

744+
fn stop_all_jobs(&self) -> Result<()> {
745+
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
746+
self.tx
747+
.blocking_send(EngineRequest::StopAllJobs { response_tx })?;
748+
response_rx.blocking_recv()?
749+
}
750+
727751
fn cleanup_job(&self, job_name: &str) -> Result<()> {
728752
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
729753
self.tx.blocking_send(EngineRequest::CleanupJob {
@@ -1005,6 +1029,10 @@ mod tests {
10051029
&self,
10061030
request: Request<String>,
10071031
) -> Result<Response<Duration>, Status>;
1032+
async fn get_active_jobs_shutdown_timeout(
1033+
&self,
1034+
request: Request<()>,
1035+
) -> Result<Response<Duration>, Status>;
10081036
async fn create_job(
10091037
&self,
10101038
request: Request<(String, JobConfig)>,
@@ -1014,6 +1042,7 @@ mod tests {
10141042
request: Request<String>,
10151043
) -> Result<Response<()>, Status>;
10161044
async fn stop_job(&self, request: Request<String>) -> Result<Response<()>, Status>;
1045+
async fn stop_all_jobs(&self, request: Request<()>) -> Result<Response<()>, Status>;
10171046
async fn skip_job(&self, request: Request<String>) -> Result<Response<()>, Status>;
10181047
async fn cleanup_job(&self, request: Request<String>) -> Result<Response<()>, Status>;
10191048
async fn job_info(&self, request: Request<String>) -> Result<Response<JobInfo>, Status>;
@@ -1123,6 +1152,7 @@ mod tests {
11231152
)?;
11241153
self.engine.start_job(name)?;
11251154
self.engine.stop_job(name)?;
1155+
self.engine.stop_all_jobs()?;
11261156
self.engine.job_info(name)?;
11271157
self.engine.get_jobs()?;
11281158
self.engine.run_jrpc(
@@ -1347,6 +1377,12 @@ mod tests {
13471377
.expect_stop_job()
13481378
.withf(|req| req.get_ref() == "custom_name")
13491379
.return_once(|_| Ok(Response::new(())));
1380+
babel_mock
1381+
.expect_get_active_jobs_shutdown_timeout()
1382+
.return_once(|_| Ok(Response::new(Duration::from_secs(1))));
1383+
babel_mock
1384+
.expect_stop_all_jobs()
1385+
.return_once(|_| Ok(Response::new(())));
13501386
babel_mock
13511387
.expect_job_info()
13521388
.withf(|req| req.get_ref() == "custom_name")

‎bv/src/node.rs

+133-22
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,28 @@ impl<P: Pal + Debug> Node<P> {
419419
}
420420

421421
pub async fn update(&mut self, config_update: ConfigUpdate) -> commands::Result<()> {
422+
let status = self.status().await;
423+
if status == VmStatus::Failed {
424+
return Err(commands::Error::Internal(anyhow!(
425+
"can't update node in Failed state"
426+
)));
427+
}
428+
let params_changed = !config_update.new_values.is_empty();
429+
if params_changed {
430+
if status == VmStatus::Running
431+
&& self
432+
.babel_engine
433+
.get_jobs()
434+
.await?
435+
.iter()
436+
.any(|(_, job)| job.status == JobStatus::Running && job.upgrade_blocking)
437+
{
438+
command_failed!(commands::Error::BlockingJobRunning {
439+
retry_hint: DEFAULT_UPGRADE_RETRY_HINT,
440+
});
441+
}
442+
self.state.initialized = false;
443+
}
422444
self.state.image.config_id = config_update.config_id;
423445
if let Some(display_name) = config_update.new_display_name {
424446
self.state.display_name = display_name;
@@ -448,10 +470,17 @@ impl<P: Pal + Debug> Node<P> {
448470
self.state.expected_status = VmStatus::Failed;
449471
}
450472
self.save_state().await?;
451-
res?
473+
res?;
474+
} else {
475+
self.save_state().await?;
452476
}
453477
self.machine.update_node_env(&self.state);
454478
self.node_env = self.machine.node_env();
479+
if params_changed && status == VmStatus::Running {
480+
self.babel_engine.init().await?;
481+
self.state.initialized = true;
482+
self.save_state().await?;
483+
}
455484
Ok(())
456485
}
457486

@@ -1102,10 +1131,12 @@ pub mod tests {
11021131
request: Request<String>,
11031132
) -> Result<Response<()>, Status>;
11041133
async fn stop_job(&self, request: Request<String>) -> Result<Response<()>, Status>;
1134+
async fn stop_all_jobs(&self, request: Request<()>) -> Result<Response<()>, Status>;
11051135
async fn skip_job(&self, request: Request<String>) -> Result<Response<()>, Status>;
11061136
async fn cleanup_job(&self, request: Request<String>) -> Result<Response<()>, Status>;
11071137
async fn job_info(&self, request: Request<String>) -> Result<Response<JobInfo>, Status>;
11081138
async fn get_job_shutdown_timeout(&self, request: Request<String>) -> Result<Response<Duration>, Status>;
1139+
async fn get_active_jobs_shutdown_timeout(&self, request: Request<()>) -> Result<Response<Duration>, Status>;
11091140
async fn get_jobs(&self, request: Request<()>) -> Result<Response<JobsInfo>, Status>;
11101141
async fn run_jrpc(
11111142
&self,
@@ -1861,6 +1892,14 @@ pub mod tests {
18611892
})
18621893
.times(3)
18631894
.returning(|_| Ok(Response::new(())));
1895+
babel_mock
1896+
.expect_get_active_jobs_shutdown_timeout()
1897+
.times(2)
1898+
.returning(|_| Ok(Response::new(Duration::from_secs(1))));
1899+
babel_mock
1900+
.expect_stop_all_jobs()
1901+
.times(2)
1902+
.returning(|_| Ok(Response::new(())));
18641903
babel_mock
18651904
.expect_run_sh()
18661905
.withf(|req| req.get_ref() == "echo ok")
@@ -2079,6 +2118,8 @@ pub mod tests {
20792118
pal.expect_create_vm().return_once(move |_, _| {
20802119
let mut mock = MockTestVM::new();
20812120
let plugin_path = plugin_path.clone();
2121+
mock.expect_state().once().returning(|| VmState::SHUTOFF);
2122+
mock.expect_state().times(2).returning(|| VmState::RUNNING);
20822123
mock.expect_plugin_path()
20832124
.once()
20842125
.returning(move || plugin_path.clone());
@@ -2115,29 +2156,60 @@ pub mod tests {
21152156
.await?;
21162157
assert_eq!(VmStatus::Running, node.expected_status());
21172158

2118-
assert_eq!(node.state.firewall, node_state.firewall);
2119-
node.update(ConfigUpdate {
2120-
config_id: "new-cfg_id".to_string(),
2121-
new_display_name: Some("new name".to_string()),
2122-
new_firewall: Some(updated_firewall.clone()),
2123-
new_org_id: Some("new org_id".to_string()),
2124-
new_org_name: Some("org name".to_string()),
2125-
new_values: HashMap::from_iter([
2126-
("new_key1".to_string(), "new value ".to_string()),
2127-
("new_key2".to_string(), "new value 2".to_string()),
2128-
]),
2129-
})
2130-
.await?;
2131-
assert_eq!(node.state.firewall, updated_firewall);
2132-
assert_eq!(node.state.display_name, "new name".to_string());
2133-
assert_eq!(node.state.org_id, "new org_id".to_string());
2134-
assert_eq!(node.state.org_name, "org name".to_string());
21352159
assert_eq!(
2136-
node.state.properties.get("new_key1"),
2137-
Some("new value ".to_string()).as_ref()
2160+
"BV internal error: 'can't update node in Failed state'",
2161+
node.update(ConfigUpdate {
2162+
config_id: "new-cfg_id".to_string(),
2163+
new_display_name: None,
2164+
new_firewall: None,
2165+
new_org_id: None,
2166+
new_org_name: None,
2167+
new_values: Default::default(),
2168+
},)
2169+
.await
2170+
.unwrap_err()
2171+
.to_string()
21382172
);
2139-
test_env.assert_node_state_saved(&node.state).await;
21402173

2174+
let mut babel_mock = MockTestBabelService::new();
2175+
babel_mock
2176+
.expect_get_jobs()
2177+
.times(2)
2178+
.returning(|_| Ok(Response::new(HashMap::default())));
2179+
babel_mock
2180+
.expect_get_active_jobs_shutdown_timeout()
2181+
.once()
2182+
.returning(|_| Ok(Response::new(Duration::from_millis(1))));
2183+
babel_mock
2184+
.expect_stop_all_jobs()
2185+
.once()
2186+
.returning(|_| Ok(Response::new(())));
2187+
babel_mock
2188+
.expect_run_sh()
2189+
.withf(|req| req.get_ref() == "touch /blockjoy/.protocol_data.lock")
2190+
.once()
2191+
.returning(|_| {
2192+
Ok(Response::new(ShResponse {
2193+
exit_code: 0,
2194+
stdout: "".to_string(),
2195+
stderr: "".to_string(),
2196+
}))
2197+
});
2198+
babel_mock
2199+
.expect_protocol_data_stamp()
2200+
.once()
2201+
.returning(|_| Ok(Response::new(Some(SystemTime::now()))));
2202+
babel_mock
2203+
.expect_create_job()
2204+
.times(2)
2205+
.returning(|_| Ok(Response::new(())));
2206+
babel_mock
2207+
.expect_start_job()
2208+
.times(2)
2209+
.returning(|_| Ok(Response::new(())));
2210+
let server = test_env.start_server(babel_mock).await;
2211+
2212+
assert_eq!(node.state.firewall, node_state.firewall);
21412213
assert_eq!(
21422214
"BV internal error: 'failed to apply firewall config'",
21432215
node.update(ConfigUpdate {
@@ -2146,15 +2218,46 @@ pub mod tests {
21462218
new_firewall: Some(firewall::Config::default()),
21472219
new_org_id: Some("failed_org_id".to_string()),
21482220
new_org_name: None,
2149-
new_values: Default::default(),
2221+
new_values: HashMap::from_iter([(
2222+
"new_key0".to_string(),
2223+
"new value 0".to_string()
2224+
),]),
21502225
},)
21512226
.await
21522227
.unwrap_err()
21532228
.to_string()
21542229
);
21552230
assert_eq!(0, node.state.firewall.rules.len());
2231+
assert!(!node.state.initialized);
21562232
test_env.assert_node_state_saved(&node.state).await;
2233+
assert_eq!(VmStatus::Failed, node.expected_status());
21572234

2235+
node.state.initialized = true;
2236+
node.state.expected_status = VmStatus::Running;
2237+
node.update(ConfigUpdate {
2238+
config_id: "new-cfg_id".to_string(),
2239+
new_display_name: Some("new name".to_string()),
2240+
new_firewall: Some(updated_firewall.clone()),
2241+
new_org_id: Some("new org_id".to_string()),
2242+
new_org_name: Some("org name".to_string()),
2243+
new_values: HashMap::from_iter([
2244+
("new_key1".to_string(), "new value 1".to_string()),
2245+
("new_key2".to_string(), "new value 2".to_string()),
2246+
]),
2247+
})
2248+
.await
2249+
.unwrap();
2250+
assert!(node.state.initialized);
2251+
assert_eq!(node.state.firewall, updated_firewall);
2252+
assert_eq!(node.state.display_name, "new name".to_string());
2253+
assert_eq!(node.state.org_id, "new org_id".to_string());
2254+
assert_eq!(node.state.org_name, "org name".to_string());
2255+
assert_eq!(
2256+
node.state.properties.get("new_key1"),
2257+
Some("new value 1".to_string()).as_ref()
2258+
);
2259+
test_env.assert_node_state_saved(&node.state).await;
2260+
server.assert().await;
21582261
Ok(())
21592262
}
21602263

@@ -2429,6 +2532,14 @@ pub mod tests {
24292532
.expect_get_jobs()
24302533
.once()
24312534
.returning(|_| Ok(Response::new(HashMap::default())));
2535+
babel_mock
2536+
.expect_get_active_jobs_shutdown_timeout()
2537+
.once()
2538+
.returning(|_| Ok(Response::new(Duration::from_millis(1))));
2539+
babel_mock
2540+
.expect_stop_all_jobs()
2541+
.once()
2542+
.returning(|_| Ok(Response::new(())));
24322543
let data_dir = test_env.tmp_root.clone();
24332544
babel_mock.expect_run_sh().once().returning(move |_| {
24342545
babel_api::utils::touch_protocol_data(&data_dir).unwrap();

‎bv/src/scheduler.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,10 @@ async fn worker(
160160

161161
fn handle_action(tasks: &mut Vec<(DateTime<TZ>, Scheduled)>, action: Action) {
162162
match action {
163-
Action::Add(task) => tasks.push((TZ::now(), task)),
163+
Action::Add(task) => {
164+
tasks.retain(|(_, existing_task)| existing_task.name != task.name);
165+
tasks.push((TZ::now(), task))
166+
}
164167
Action::Delete(name) => tasks.retain(|(_, task)| task.name != name),
165168
Action::DeleteNode(id) => tasks.retain(|(_, task)| task.node_id != id),
166169
};

‎bv/tests/image_v1/main.rhai

-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
const API_HOST = "http://localhost:4467/";
2-
const TEST_CMD = run_sh("echo ok").unwrap();
32

43
fn plugin_config() {#{
54
init: #{

‎bv/tests/image_v2/main.rhai

-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
const API_HOST = "http://localhost:4467/";
2-
const TEST_CMD = run_sh("echo ok").unwrap();
32

43
fn plugin_config() {#{
54
init: #{

‎bv_tests_utils/src/babel_engine_mock.rs

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ mock! {
2020
fn create_job(&self, job_name: &str, job_config: JobConfig) -> Result<()>;
2121
fn start_job(&self, job_name: &str) -> Result<()>;
2222
fn stop_job(&self, job_name: &str) -> Result<()>;
23+
fn stop_all_jobs(&self) -> Result<()>;
2324
fn cleanup_job(&self, job_name: &str) -> Result<()>;
2425
fn job_info(&self, job_name: &str) -> Result<JobInfo>;
2526
fn get_jobs(&self) -> Result<JobsInfo>;

0 commit comments

Comments
 (0)
Please sign in to comment.