The async events queue is not a queue?!

The async events API can be used to process events in the background. Although these events are pushed to a “queue,” they are processed immediately and in parallel instead of being queued and processed one after the other.

For example, this code pushes four events to a queue with two push calls:

import { Queue } from "@forge/events";

const queue = new Queue({ key: "events" });

await queue.push([{ id: 1 }, { id: 2 }]);
await queue.push([{ id: 3 }, { id: 4 }]);

The event handler function outputs the event id and waits for five seconds:

import Resolver from "@forge/resolver";

const resolver = new Resolver();

resolver.define("processEvent", async ({ payload }) => {
  console.log(`Processing event: ${payload.id}`);
  await new Promise((resolve) => setTimeout(resolve, 5_000));
});

As you can see in the logs, the events are not queued, but processed in parallel:

INFO    18:57:12.368  b933d06e47ac448d  Processing event: 2
INFO    18:57:12.368  b933d06e47ac448d  Processing event: 1
INFO    18:57:12.956  b933d06e47ac448d  Processing event: 3
INFO    18:57:13.047  b933d06e47ac448d  Processing event: 4

I expected the output to look like this (correct order and about five seconds between every event):

INFO    18:57:12.000  b933d06e47ac448d  Processing event: 1
INFO    18:57:17.000  b933d06e47ac448d  Processing event: 2
INFO    18:57:22.000  b933d06e47ac448d  Processing event: 3
INFO    18:57:27.000  b933d06e47ac448d  Processing event: 4

Is that a bug or expected behavior? Is there a way to process events sequentially? If you want to try the example yourself, you can find the code in this repository.

Hi,

This is expected behaviour. It works similarly to an SQS queue without FIFO enabled.

There are a few ways to run a list of long-running actions sequentially.

If the number of items is small enough and you have them all available at the start, pass the whole list as a single event to the queue. Then pop items from that list yourself, one at a time, inside the handler.
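
A minimal sketch of that single-event approach, assuming the list arrives as payload.items (a hypothetical field name) and that the whole list can be worked through within one invocation's time limit. processSequentially and handleItem are illustrative names, not part of the Forge API:

```javascript
// Hypothetical helper: works through a list one item at a time, in order.
// Each item is awaited before the next one starts, so nothing overlaps.
async function processSequentially(items, handleItem) {
  const pending = [...items]; // copy so the caller's array is not mutated
  const processed = [];
  while (pending.length > 0) {
    const item = pending.shift(); // pop the next item from the front
    await handleItem(item);
    processed.push(item.id);
  }
  return processed;
}

// Possible wiring, shown as comments because it needs the Forge runtime:
//   await queue.push({ items: [{ id: 1 }, { id: 2 }, { id: 3 }, { id: 4 }] });
//   resolver.define("processEvent", async ({ payload }) =>
//     processSequentially(payload.items, async (item) => {
//       console.log(`Processing event: ${item.id}`);
//     })
//   );
```

Because the whole list travels in one event, only one handler invocation touches it, which is what keeps the order intact.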

If the number of items is large, or the items are being streamed to you over time, then storing them in Forge’s app storage would be another solution.

I also ran into this last week, and it had me scratching my head for a while. I was sending multiple events to the queue, and each one called a REST API and stored the data using the Forge Storage API. I ran into race conditions where not everything was saved properly.

It would be good to have a way to specify a param like sequential: true to make sure that events are processed one at a time rather than in parallel.

Any chance we can get this implemented, @JoshuaHwang? :grinning::pray:

Thanks for the clarification, @JoshuaHwang! I vaguely remember that @iragudo held a workshop about this topic during DevDay this year, but I think the session wasn’t recorded. Ian, do you still have the code for the demo that you built?

Thanks for remembering, @klaussner!

Yes, I discussed async events during DevDay but was not able to touch on their non-FIFO nature. I still have the final solution. Are there any specific areas you are interested in?

If you’re more interested in the pushing and consuming part, I crafted this documentation based on the DevDay session. I added a bit extra to the docs (as part of the DevDay homework :slight_smile:), which is to incorporate a reasonable retry backoff mechanism.
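
To give a rough idea of what a retry backoff can look like, here is a small sketch of a capped exponential delay. It is not the code from the documentation; backoffSeconds, baseSeconds, and capSeconds are hypothetical names and values, and how the delay is handed back to the platform is left out:

```javascript
// Hypothetical helper: doubles the wait with every retry, up to a cap.
// retryCount is 0 for the first retry, 1 for the second, and so on.
function backoffSeconds(retryCount, baseSeconds = 5, capSeconds = 900) {
  const delay = baseSeconds * 2 ** retryCount;
  return Math.min(capSeconds, delay);
}

// Example: delays for the first four retries.
const delays = [0, 1, 2, 3].map((n) => backoffSeconds(n));
console.log(delays); // [ 5, 10, 20, 40 ]
```

The cap keeps a long run of failures from pushing the delay out indefinitely.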

Hope this helps.
Ian

@iragudo How you dealt with the storage rate limits is exactly what I was looking for. Thank you for writing the article. :slightly_smiling_face:
