Using and testing Vapor Queues

When you’re working on a backend application, you want your response time to be as low as possible, which is complicated when the server has to interact with third party services to send emails, process files and so on. Vapor provides a library that allows you to move this work out of the request’s critical path by dispatching jobs into queues and processing them in the background. Let’s see how to integrate it into a simple project and what are our options for testing.

Why this post?

I’ve been working as a contractor for the past few weeks and one of the projects has involved using Vapor Queues to offload the processing of (potentially) large uploaded files. As part of this work, I’ve looked into the source code of the Queues package on GitHub and learned a lot. I even got a very impactful pull request merged into the repository. The official documentation already covers the basics of integrating Vapor Queues in your project, but they don’t say anything about testing, which is what I wanted to cover.

As an example, I’ve created a project similar to the example used in Vapor’s documentation:

  1. It exposes a POST /users endpoint that a client can use to create a bunch of users at once
  2. It also exposes a GET version of that same endpoint to retrieve all the users’ informations
  3. When a user is created, the application dispatches a job that will send a welcome email to the user and store the timestamp when that email was sent in the database

The code is available on GitHub.

Testing

The approach we choose to test our code depends on what part of the feature we want to validate. For example, in UserCreationService we want to make sure that a job is dispatched when we create a new user. That said, if we take a step back, what we actually want to test is that after creating an account using our API, a welcome email is eventually sent to the user. One could argue that the last one is more important, but I personally think that both tests are useful. Let’s setup our test.

As mentioned by the documentation, you need to select a driver to configure how the queues are going to run and where the jobs are going to be stored. The officially supported way is to use Redis, but 3rd party implementations use Fluent or even Mongo. Either one or those are fine, but if you want reproducible tests, it’s better if we can remove the dependencies on specific storages (like requiring a redis or a database server). This is why the Queues package provides an XCTQueue library and an AsyncTestQueuesDriver driver dedicated to testing.

final class UserCreationServiceTests: XCTestCase {
    var app: Application!

    override func setUp() async throws {
        self.app = try await Application.make(.testing)
        try await configure(app)

        // Override the driver being used for testing
        app.queues.use(.asyncTest)
    }

    override func tearDown() async throws {
        try await self.app.asyncShutdown()
        self.app = nil
    }
}

Now, we can write our test. As mentioned earlier, we want to ensure that when n users are created, n jobs are dispatched.

func testCreateUsers() async throws {
    let users = [
        ("Bruce Wayne", "bruce@wayne.co"),
        ("Barry Allen", "nottheflash@hotmail.com"),
    ]

    for (fullname, email) in users {
        _ = try await app.users.create(fullname: fullname, email: email)
    }

    XCTAssertEqual(
        app.queues.asyncTest.jobs.count { $0.value.jobName == String(describing: SendWelcomeEmailJob.self)},
        2,
        "Expected 2 jobs to have been dispatched"
    )
}

Additionally, we want to test our API and make sure that after sending a POST request to the /users endpoint, those users will eventually have received a welcome email. Because the email are sent asynchronously, we need to wait a certain amount of time before fetching our list of users and validate that the welcome_email_sent_at field in the response has been updated. There are multiple ways to achieve this:

  1. We can wait a few seconds using Task.sleep and hope it was enough for all the jobs to have ran.
  2. We can check the response of a GET request to the /users in a loop and use a test expectation to prevent the test from exiting until we’re done
  3. We can actually wait that the jobs have all run

I won’t go into details as to how terrible the first suggestion is. The second one actually makes sense but let’s focus on the third one, because it allows us to dig into the internals of Vapor’s queues mechanism. Specifically, the QueueWorker struct that exposes a run method that runs until there is no more work to be done. Once this method has return, we can fetch our list of users and check that all the welcome_email_sent_at have been updated.

func testAccountCreation() async throws {
    let client = APIClient(tester: app)
    let response = try await client.createUsers([
        ("Bruce Wayne", "bruce@wayne.co"),
        ("Hal Jordan", "hal@gmail.com"),
        ("Barry Allen", "definitelynottheflash@gmail.com"),
    ])
    XCTAssertEqual(response.status, .created)

    try await app.queues.queue.worker.run()

    let users = try await client.listUsers()
    XCTAssertEqual(users.count, 3)
    XCTAssertTrue(users.allSatisfy { $0.welcomeEmailSentAt != nil })
}

Conclusion

Those tools are no silver bullets, as one could point that we need to know a little too much about the internals of the Queues library and that upgrading it to a higher version could mean having to rewrite your tests in the future… but that’s part of the deal when it comes to adding dependencies to your project. That being said, I do think that they are helpful and can play a part in a larger test suite.