Using Resque with Rust
2015-10-29 14:30:00
Disclaimer: I’m not a Rust expert by any means so please tell me if there is something I can improve in the examples below. I’m not going to explain how Rust works. If you’re not familiar with it I highly encourage you to read the Rust Book
Resque is a popular solution in the Ruby world to process background jobs. The great thing with it is the fact it uses Redis as a backend, making it easy to share jobs with workers written in other languages.
In this article we’ll see how we can write a fully functional Resque worker in Rust. This will allow us to either use Resque entirely with Rust, enqueue a job in Ruby and perform it with Rust or vice versa.
The full code for the following examples can be found here.
How does Resque work?
Resque jobs are enqueued in a Redis list using the RPUSH
command.
A Resque job is represented internally with a JSON string containing two keys, one for the job class name and one for its arguments (or payload in Resque terms).
{
'class': 'JobClass',
'args': [ arg1, arg2 ]
}
Available queues are defined in a resque:queues
Redis SET
whose entries are the queue name like some_queue_name
.
Enqueueing a job is done by issueing a RPUSH
command with a payload to some queue.
Knowing that let’s enqueue our first job from Rust.
Enqueing a Resque job
Let’s pretend we need to enqueue a Resque job in order to send a confirmation email.
First we need to start a new project with:
λ cargo new rust-worker --bin
First we will need to add some crates. In your Cargo.toml add:
[dependencies]
rustc-serialize = "0.3.16"
[dependencies.redis]
git = "https://github.com/mitsuhiko/redis-rs.git"
tag = "0.5.1"
Next let’s use those crates. In our main.rs:
extern crate redis;
extern crate rustc_serialize;
use redis::Commands;
use rustc_serialize::Encodable;
use rustc_serialize::json;
For our example we need to define a Job struct with with two fields: a class that will be a String and args that will be a vector of Strings.
#[derive(RustcEncodable, Debug)]
pub struct Job {
class: String,
args: Vec<String>
}
This struct needs the RustcEncodable
trait so that we can encode it in JSON and the Debug
trait for printing purposes so we’ll derive them.
Next, we will define an enqueue
function that will take a Job
as input and return a Redis Result
:
fn enqueue(job: Job) -> redis::RedisResult<Job> {
// Connect to a local Redis
let client = try!(redis::Client::open("redis://127.0.0.1/"));
let conn = try!(client.get_connection());
// Encode our Job in JSON
let json_job = json::encode(&job).unwrap();
// Add our queue in resque:queues Set
try!(conn.sadd("resque:queues", "rust_test_queue"));
// Push our job in the resque:queue:rust_test_queue list
try!(conn.rpush("resque:queue:rust_test_queue", json_job));
println!("Enqueued job: {:?}", job);
Ok(job)
}
Finally we create and enqueue a job in our main function:
fn main() {
// Create a Job
let job: Job = Job { class: "SignupEmail".to_owned(),
args: vec!["user@example.com".to_owned()] };
// Enqueue our job
match enqueue(job) {
Ok(job) => println!("Enqueued job: {:?}", job),
Err(_) => { /* handle failure here */ }
}
}
Great! We successfully enqueued a job that can be performed through Resque.
Now onto the perform part.
Performing a Resque job
A Resque worker will try to reserve
a job by polling a queue with the LPOP
command until it gets something to perform.
Let’s implement that.
We need a reserve function that will check if a job is present in a queue:
fn reserve() -> redis::RedisResult<()> {
println!("--: Checking rust_test_queue");
// Connect to a local Redis
let client = try!(redis::Client::open("redis://127.0.0.1/"));
let conn = try!(client.get_connection());
// Check if a job is present in the queue
let res = conn.lpop("resque:queue:rust_test_queue").unwrap();
// Perform the job or return
match res {
Some(job) => perform(job),
None => return Ok(()),
}
}
We also need to have a function that waits a few seconds to mimic Resque behaviour:
fn wait_a_bit() {
println!("--: Sleeping for 5.0 seconds");
std::thread::sleep_ms(5000);
}
In order to perform our job, we need to decode the JSON String retrieved from the Resque queue and then do something useful like sending an email in our case.
fn perform(json_job: String) -> redis::RedisResult<()> {
println!("Found job: {:?}", json_job);
// Decode JSON
let job: Job = json::decode(&*json_job).unwrap();
// Send our email with something like:
// send_email(job.args.first());
// not implemented here.
Ok(())
}
Our Job struct must derive the RustcDecodable
trait to be decodable. I’m decoding &*json_job
since json::decode
expects a &str
and not a String
.
Now let’s update our main()
function to enqueue a job, perform it and wait for other jobs to come.
fn main() {
let job: Job = Job { class: "SignupEmail".to_owned(),
args: vec!["user@example.com".to_owned()] };
enqueue(job).unwrap();
loop {
reserve().unwrap();
wait_a_bit();
}
}
Time to try our worker.
λ cargo build
Compiling rust-resque-example v0.1.0 (file:///Users/julien/Code/rust-resque-example)
λ cargo run
Running `target/debug/rust-resque-example`
Enqueued job: Job { class: "SignupEmail", args: ["user@example.com"] }
--: Checking rust_test_queue
Found job: Job { class: "SignupEmail", args: ["user@example.com"] }
--: Checking rust_test_queue
--: Sleeping for 5.0 seconds
--: Checking rust_test_queue
--: Sleeping for 5.0 seconds
And it works! We successfully enqueued and performed a job in Resque from Rust.
What’s missing?
Our implementation still needs to be Resque web compatible (display workers, failed jobs…). It also needs to be deployed alongside our Ruby workers but that’s for another article :)
Thanks a lot to the reviewers: Flavien, Marc, Steve & Yohan.