Rust UO server project pt2: Basic Timer Logic Implementation
I’ve finished v1 of the Rust UO server timer logic. The logic covers:
- registering new timers
- calculating their next run time
- checking registered timers for those due to be run
- removing timers that are due to be run and “running” them
- re-registering timers for next repetitions
At the moment there are no callbacks attached to the timers so when they are “run” nothing actually happens beyond dequeing the timer (and re-registering a repeat of it if there are repetitions remaining).
To test the logic I added a CLI module which allows starting timers from the command line and monitoring their repetitions being “run” via progress bars:
Code walkthrough
Timer structs
First I created the Timer
struct in a new timer module:
// src/timer.rs
pub struct Timer {
name: String,
repetitions: isize,
interval: i64,
next: i64,
}
repetitions
are the number of repetitions of the timer left to run. interval
is the time in ms between each repetition. next
is the next server tick that the timer is due to run at.
next
will be calculated by the server: when a timer is registered for the first time it will be current ticks + timer interval. When timer repetitions are registered it will be calculated as previous next + interval.
Because next
will be calculated by the server as above, it won’t be provided to the registration logic when code elsewhere requests a timer be registered for the first time. I therefore created a second TimerArgs
struct to represent the subset of timer properties that will be provided to the registration thread by code elsewhere:
// src/timer.rs
pub struct TimerArgs {
pub name: String,
pub repetitions: isize,
pub interval: i64,
}
In the future this will also allow properties to be optional when passed to the thread, with the thread then being responsible for setting defaults e.g. TimerArgs
provided with Option::None
for repetitions
would have repetitions
set to 1 when the Timer
struct was created by the registration thread.
Server ticks
Knowing that calculating next
for a timer required the current server ticks, I next set up a function to return them:
// src/timer.rs
use chrono::prelude::*;
fn current_ticks() -> i64 {
let utc_now = Utc::now();
utc_now.timestamp_millis()
}
I opted to go with a tick equalling a millisecond to begin with, but the resolution will need to be increased later.
Registration thread
Next I set up the registration thread:
// src/timer.rs
use std::sync::{Arc, Mutex, mpsc};
mod registration_thread;
// snip
fn start() -> mpsc::Sender<TimerArgs> {
let (register_tx, register_rx) = mpsc::channel::<TimerArgs>();
let new_timers: Vec<Timer> = vec![];
let new_timers = Arc::new(Mutex::new(new_timers));
registration_thread::spawn(register_rx, Arc::clone(&new_timers));
return register_tx
}
In the timer module I added a start()
function which will be called by main()
in src/main.rs
. This function returns an mpsc transmitter which the main()
function will use to send TimerArgs
to the registration thread. In the future it won’t be main()
that uses this transmitter, but other parts of the code responsible for requesting timer registration.
The start()
function also sets up a new_timers
vec. Ownership of this vec will be shared between the registration thread and the prioritisation thread. The registration thread will push new timers into it, the prioritisation thread will remove them from it and prioritise them. To allow shared ownership, the vec is wrapped in a Mutex
to allow each thread to modify it after acquiring a lock. Because the mutex needs to be passed to two threads, it is wrapped in an Arc
.
start()
calls the spawn()
function from the registration_thread submodule, passing it the mpsc receiver so it can receive TimerArgs
and the wrapped new_timers
vec, so it can add Timers
to it.
The spawn()
function was declared in src/timer/registration_thread.rs
:
// src/timer/registration_thread.rs
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use super::{Timer, TimerArgs, current_ticks};
pub fn spawn(register_rx: mpsc::Receiver<TimerArgs>, new_timers: Arc<Mutex<Vec<Timer>>>) {
thread::spawn(move || {
for timer_args in register_rx {
let TimerArgs { name, repetitions, interval } = timer_args;
let next = current_ticks() + interval;
let timer = Timer { name, repetitions, interval, next };
let mut new_timers = new_timers.lock().unwrap();
new_timers.push(timer);
}
});
}
spawn()
moves the register receiver and the Arc<Mutex<>>
new_timers into a newly spawned thread. It treats the register receiver as an iterator by using a for loop on it. This executes the for loop block each time TimerArgs
are sent to the mpsc channel.
When TimerArgs
are received, the thread calculates next
, constructs a Timer
struct using it and other properties from TimerArgs
and then pushes it into new_timers
after acquiring a lock.
At the moment I’ve left the unwrap()
call when acquiring the lock. This will need to be replaced with better error handling in the future.
Prioritisation thread
Next I set up the thread that will periodically check the new_timers
vec under lock and prioritise the Timer
structs it finds there:
// src/timer.rs
mod prioritisation_thread;
pub fn start() -> mpsc::Sender<TimerArgs> {
// snip
let (execute_tx, execute_rx) = mpsc::channel::<Timer>();
let new_timers: Vec<Timer> = vec![];
let new_timers = Arc::new(Mutex::new(new_timers));
// snip
prioritisation_thread::spawn(execute_tx, new_timers);
// snip
}
Similar to with the registration thread setup, start()
now also calls the spawn()
function from the prioritisation_thread submodule. It passes it an mpsc transmitter the prioritisation thread will use to send timers to be executed. It also passes the wrapped new_timers
vec, so the prioritisation thread can pull Timer
structs from it and prioritise them.
Nothing is done with the corresponding mpsc receiver yet - this will be given to the execution thread so it can receive the Timer
structs to be executed.
The new spawn()
function was declared in src/timer/prioritisation_thread.rs
. Like the spawn()
function in the registration thread module it spawns a new thread:
// src/timer/prioritisation_thread.rs
use super::Timer;
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
pub fn spawn(execute_tx: mpsc::Sender<Timer>, new_timers: Arc<Mutex<Vec<Timer>>>) {
thread::spawn(move || {});
}
In the new thread, I first implemented acquiring a lock on new_timers
every millisecond and emptying it into a new vec that is owned solely by the thread:
// src/timer/prioritisation_thread.rs
// snip
pub fn spawn(execute_tx: mpsc::Sender<Timer>, new_timers: Arc<Mutex<Vec<Timer>>>) {
thread::spawn(move || {
let mut timers = vec![];
loop {
thread::sleep(Duration::from_millis(1));
{
let mut new_timers = new_timers.lock().unwrap();
while let Some(timer) = new_timers.pop() {
timers.push(timer);
}
}
}
});
}
This means that the thread only locks new_timers
once a millisecond and only for as long as it takes to remove all elements from it and push them to the new vec. This minimises the time that the registration thread is unable to acquire a lock and so can’t process timer registrations.
Next I added logic to go through the timers
vec and send any that are due to be run to the channel transmitter for the execution thread to pick up:
// src/timer/prioritisation_thread.rs
use super::{Timer, current_ticks};
// snip
pub fn spawn(execute_tx: mpsc::Sender<Timer>, new_timers: Arc<Mutex<Vec<Timer>>>) {
thread::spawn(move || {
let mut timers = vec![];
loop {
// snip
let now = current_ticks();
for timer in timers {
if timer.next <= now {
execute_tx.send(timer).unwrap();
}
}
}
});
}
This won’t compile because after the first iteration of the outer loop the timers
vec has been moved by the for loop and the variable is invalid. timers
needs to be reset at the end of the outer loop:
// src/timer/prioritisation_thread.rs
// snip
pub fn spawn(execute_tx: mpsc::Sender<Timer>, new_timers: Arc<Mutex<Vec<Timer>>>) {
thread::spawn(move || {
let mut timers = vec![];
loop {
// snip
let mut not_due = vec![];
let now = current_ticks();
for timer in timers {
if timer.next <= now {
execute_tx.send(timer).unwrap();
} else {
not_due.push(timer);
}
}
timers = not_due;
}
});
}
The last thing needed was to schedule repetitions of timers that had been sent for execution:
// src/timer/prioritisation_thread.rs
// snip
pub fn spawn(execute_tx: mpsc::Sender<Timer>, new_timers: Arc<Mutex<Vec<Timer>>>) {
thread::spawn(move || {
let mut timers = vec![];
loop {
// snip
let mut not_due = vec![];
let now = current_ticks();
for timer in timers {
if timer.next <= now {
if timer.repetitions > 1 {
let next_repetition = Timer {
name: String::from(&timer.name),
repetitions: timer.repetitions - 1,
interval: timer.interval,
next: timer.next + timer.interval,
};
not_due.push(next_repetition);
}
// snip
} else {
// snip
}
}
timers = not_due;
}
});
}
Now new Timer
structs are constructed for repetitions, with the repetitions
property decremented and next
incremented by the interval
. The repetition instances are also pushed to the not_due
vec which is then used as the timers
vec for the next iteration of the outer loop.
See the bottom of this post for the finished code.
Execution thread
The execution thread follows the same module pattern to the other two timer threads:
// src/timer.rs
mod execution_thread;
pub fn start() -> mpsc::Sender<TimerArgs> {
// snip
let (execute_tx, execute_rx) = mpsc::channel::<Timer>();
execution_thread::spawn(execute_rx);
// snip
}
// src/timer/execution_thread.rs
use std::collections::HashMap;
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use super::Timer;
pub fn spawn(execute_rx: mpsc::Receiver<Timer>) {
thread::spawn(move || {
for timer in execute_rx {
println!("{}", timer.name);
}
});
}
The execution thread module spawns a new thread, moves the execute receiver into it, treats it as an iterator and prints the name of each Timer
received. In the future it will call a callback on each Timer
.
Starting the threads from main.rs
At the moment nothing happens when the program runs because the main()
is empty. Next I updated it to call the start()
function from the timer module:
// src/main.rs
mod timer;
fn main() {
let timer_register_tx = timer::start();
}
So far so good, but nothing happens when the program runs because nothing is sending TimerArgs
to the registration thread. I hardcoded this in the main()
function to test it worked:
// src/main.rs
use crate::timer::TimerArgs;
// snip
fn main() {
let timer_register_tx = timer::start();
let timer_args = TimerArgs {
name: String::from("A Timer!"), repetitions: 1000, interval: 50
};
timer_register_tx.send(timer_args).unwrap();
}
Creating timers from the command line
The code written so far isn’t much good with only a single hard coded timer being registered. Next I updated the code to allow starting timers from the command line as the program is running:
// src/main.rs
mod timer;
mod cli;
fn main() {
let timer_register_tx = timer::start();
cli::start(timer_register_tx);
}
main()
now calls a start()
function from a new cli module:
// src/cli.rs
use crate::timer::TimerArgs;
use std::io;
use std::sync::mpsc;
pub fn start(timer_register_tx: mpsc::Sender<TimerArgs>) {
loop {
println!("Provide stdin with a string in the following format to register a new timer:");
println!("name repetitions interval(ms)");
println!("e.g.: \"timer0 100 50\"");
let mut input = String::new();
io::stdin().read_line(&mut input).unwrap();
let mut split_input = input.split_whitespace();
let name = split_input.next().unwrap();
let repetitions = split_input.next().unwrap();
let repetitions: isize = repetitions.parse().expect("Failed to parse numeric string");
let interval = split_input.next().unwrap();
let interval: i64 = interval.parse().expect("Failed to parse numeric string");
let timer_args = TimerArgs {
name: String::from(name), repetitions, interval
};
timer_register_tx.send(timer_args).unwrap();
}
}
The cli module’s start()
function starts a loop which waits for input from stdin. On the command line I could now enter arguments for a new timer in a space separated format of “name repetitions interval” (e.g. timer0 1000 50
) and the module will parse the string, construct TimerArgs
and send them to the registration thread.
Progress bars
The last change I made wasn’t strictly necessary but the println!
of each timer’s name when it was executied was bugging me because it would get in the way of trying to enter another timer on the command line. I decided it would be fun to display progress bars instead so went ahead and opened up that can of worms.
I pulled in the indicatif library and used it in the timer execution thread module to display a progress bar for each timer created and increment them on each repetition:
/// src/timer/execution_thread.rs
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use std::collections::HashMap;
// snip
pub fn spawn(execute_rx: mpsc::Receiver<Timer>) {
thread::spawn(move || {
let progress_bars = MultiProgress::new();
let sty = ProgressStyle::with_template(
"[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
).unwrap().progress_chars("##-");
let mut progress_bars_lookup: HashMap<String, ProgressBar> = HashMap::new();
for timer in execute_rx {
let progress_bar = progress_bars_lookup.get(&timer.name);
match progress_bar {
Some(pb) => {
pb.inc(1);
}
None => {
let total: u64 = timer.repetitions.try_into().unwrap();
let pb = progress_bars.add(ProgressBar::new(total));
pb.set_style(sty.clone());
pb.set_message(String::from(&timer.name));
pb.inc(1);
progress_bars_lookup.insert(String::from(&timer.name), pb);
}
}
}
});
}
Instead of printing the timer’s name on execution, the thread now tries to find an existing progress bar for the timer’s name from a hash map and increments it if one is found. If one doesn’t exist, it is created and added to the hash map.
This worked okay, but whenever the progress bars were incremented it would still disrupt the input of a new timer on the command line. I found that indicatif provided a suspend()
method that could be called to temporarily pause the display of the progress bars. The method needed to be called on progress_bars
:
let progress_bars = MultiProgress::new();
// ...
progress_bars.suspend(|| {
// do something
});
// progress bars reappear
The problem here is that the progress bars were created in the execution thread, but it would be the cli module (running in the main thread) that would know when the progress bars needed to be suspended. The progress_bars
needed to have ownership shared between the main thread and the execution thread - the main thread so it could suspend them and the execution thread so it could create new bars and increment them.
I did the quickest thing I could think of and moved creation of progress_bars
into the main()
function and wrapped it in Arc
and Mutex
before giving it to both the execution thread and the cli module:
// src/main.rs
use indicatif::MultiProgress;
use std::sync::{Arc, Mutex};
mod timer;
mod cli;
fn main() {
let progress_bars = Arc::new(Mutex::new(MultiProgress::new()));
let timer_register_tx = timer::start(Arc::clone(&progress_bars));
cli::start(progress_bars, timer_register_tx);
}
// src/timer.rs
// snip
pub fn start(progress_bars: Arc<Mutex<MultiProgress>>) -> mpsc::Sender<TimerArgs> {
// snip
execution_thread::spawn(execute_rx, progress_bars);
// snip
}
// src/timer/execution_thread.rs
// snip
pub fn spawn(execute_rx: mpsc::Receiver<Timer>, progress_bars: Arc<Mutex<MultiProgress>>) {
thread::spawn(move || {
// snip
for timer in execute_rx {
// snip
match progress_bar {
Some(pb) => {
// snip
}
None => {
let progress_bars = progress_bars.lock().unwrap();
// snip
}
}
}
});
}
And in the cli module I added the call to suspend()
when a return key was sent to stdin:
// src/cli.rs
// snip
pub fn start(progress_bars: Arc<Mutex<MultiProgress>>, timer_register_tx: mpsc::Sender<TimerArgs>) {
loop {
println!("Press return to start adding a new timer");
let mut input = String::new();
io::stdin().read_line(&mut input).unwrap();
let progress_bars = progress_bars.lock().unwrap();
progress_bars.suspend(|| {
println!("Provide stdin with a string in the following format to register a new timer:");
println!("name repetitions interval(ms)");
println!("e.g.: \"timer0 100 50\"");
// snip
})
}
}