Tag Archives: continuations

Rate limiting of asynchronous calls in Vala

Following on from single-thread synchronisation in Vala, I just used a similar technique to easily implement rate limiting of concurrent calls to the same function in Vala. The use case for this was bug #705742, caused by attempting to write out many avatars to libfolks’ on-disk avatar cache simultaneously. So many files were opened simultaneously that the process’ file descriptor limit was hit and subsequent writes failed.

The principle for fixing this is to maintain a counter of the number of ongoing operations. If this hits a limit, operations which are started subsequently are added to a work queue and yielded. Any ongoing operation which finishes will pop a yielded operation off the work queue (if it’s non-empty) and resume that operation. This forms a FIFO queue which is guaranteed to progress due to each completed operation causing the next to resume (which will, in turn, cause the next to resume, etc., until the queue is empty).

A code example:

using GLib;

public class AvatarCache : Object
{
  private uint _n_ongoing_stores = 0;
  private Queue<DelegateWrapper> _pending_stores =
      new Queue<DelegateWrapper> ();

  /* Change this to change the cap on the number of concurrent
   * operations. */
  private const uint _max_n_ongoing_stores = 10;

  public async string store_avatar (string id, LoadableIcon avatar)
      throws GLib.Error
    {
      string retval = "";

      /* If the number of ongoing operations is at the limit,
       * queue this operation and yield. */
      if (this._n_ongoing_stores >
          AvatarCache._max_n_ongoing_stores)
        {
          /* Add to the pending queue. */
          var wrapper = new DelegateWrapper ();
          wrapper.cb = store_avatar.callback;
          this._pending_stores.push_tail ((owned) wrapper);
          yield;
        }

      /* Do the actual store operation. */
      try
        {
          this._n_ongoing_stores++;
          retval = yield this._store_avatar_unlimited (id, avatar);
        }
      finally
        {
          this._n_ongoing_stores--;

          /* If there is a store operation pending, resume it,
           * FIFO-style. */
          var wrapper = this._pending_stores.pop_head ();
          if (wrapper != null)
            {
              wrapper.cb ();
            }
        }

      return retval;
    }

  private async string _store_avatar_unlimited (string id,
                                                LoadableIcon avatar)
      throws GLib.Error
    {
      return /* the actual computation goes here */;
    }
}

/* See:
 * https://mail.gnome.org/archives/vala-list/2011-June/msg00005.html */
[Compact]
private class DelegateWrapper
{
  public SourceFunc cb;
}

This is all done using Vala’s asynchronous operation support, so runs in a single thread using the global default main context, and no locking or thread synchronisation is needed. If you were to use AvatarCache.store_avatar() from multiple threads, locking would have to be added and things would become more complex.

As with the single-thread synchronisation example from before, the key lines are: wrapper.cb = store_avatar.callback; yield, which stores the current function pointer and its closure (in Vala terminology, a delegate with target for the current method); and wrapper.cb (), which calls that function pointer with the stored closure (in Vala terminology, executes the delegate), effectively resuming computation from the yield statement.

So that’s it: a way to rate-limit concurrent method calls in Vala, ensuring they all block correctly (i.e. calls which are waiting for earlier ones to complete continue to block until they themselves are resumed and complete computation). By changing the scheduling function applied to the GQueue, priorities can be applied to queued calls if desired.

Single-thread synchronisation in Vala

Update: You might also be interested in the follow-up article on rate limiting of asynchronous calls in Vala.

In libfolks, there are some asynchronous prepare() methods, which may be called multiple times, but must only change program state when called for the first time — subsequent calls must do nothing. As the prepare() method is asynchronous, and yields on other asynchronous methods, control may be yielded to the main loop part-way through running. At this point, another call to prepare() could happen from the same thread; and without adequate synchronisation the second call would proceed to change program state a second time. This is bad. (Throughout this post I’m only considering calls from a single thread; multi-threaded concurrency is completely different, and can rely on locking for safety.)

What are the possible solutions?

  • Locking.
  • Condition variables.
  • Boolean “prepare() in progress” flag.

Locking and condition variables can’t be used because the prepare() calls may come from any thread. Vala uses recursive locks, so calls from the same thread wouldn’t get blocked at all. If non-recursive locks were used, calls from the same thread would cause a deadlock.

A boolean “prepare() in progress” flag is what folks used to use. This was set to true as soon as the first asynchronous call began, so it prevented subsequent calls from changing program state erroneously, but also meant that subsequent calls returned immediately. Returning from a prepare() call typically implies that preparation has completed — but with this implementation that wasn’t correct for anything except the first call to prepare(). How can a call know that it’s not the first call? It can’t. This solution is unsatisfactory.

The solution which has just been implemented in folks is to use a list of waiting continuations for all calls subsequent to the first. The first call will execute all these continuations once it’s finished. This solution feels quite elegant to me, but suggestions from improvement are more than welcome.

A code example and explanation:

public class Logger : GLib.Object
{
  private static Logger _logger;
  private static DelegateWrapper[] _prepare_waiters = null;

  public async void prepare () throws GLib.Error
    {
      if (Logger._logger == null && Logger._prepare_waiters == null)
        {
          /* If this is the first call to prepare(), start some async calls. We
           * then yield to the main thread. Any subsequent calls to prepare()
           * will have their continuations added to the _prepare_waiters list,
           * and will be signalled once the first call returns. */
          Logger._prepare_waiters = new DelegateWrapper[0];

          /* Insert asynchronous initial preparation code here. */
          Logger._logger = yield new Logger ();

          /* Wake up any waiters. */
          foreach (unowned DelegateWrapper wrapper in Logger._prepare_waiters)
            {
              wrapper.cb ();
            }

          Logger._prepare_waiters = null;
        }
      else if (Logger._logger == null && Logger._prepare_waiters != null)
        {
          /* Yield until the first ongoing prepare() call finishes. */
          var wrapper = new DelegateWrapper ();
          wrapper.cb = prepare.callback;
          Logger._prepare_waiters += (owned) wrapper;
          yield;
        }

      /* Put shared code here which is executed on every call to prepare().
       * For example, it could check for failure in the results of the
       * initial call and throw an error as appropriate. */
      if (Logger._logger == null)
        {
          /* Throw an error or something. */
          return;
        }
    }
}

/* See: https://mail.gnome.org/archives/vala-list/2011-June/msg00005.html */
[Compact]
private class DelegateWrapper
{
  public SourceFunc cb;
}

(For the full code to this example, see folks’ source code.) The first call to prepare() is meant to asynchronously set the static Logger._logger variable. Subsequent calls must not re-initialise this variable, but should block until the variable has been set.

How does this work? The first call to prepare() finds Logger._logger is null and Logger._prepare_waiters is also null, so it falls into the initialisation block. It sets Logger._prepare_waiters to be an empty array (which is distinct from null), then proceeds to initialise Logger._logger asynchronously. Since the initialisation is asynchronous, control returns to the main loop, and a second call to prepare() could be made. Assume it is. The second call finds Logger._logger is still null, but finds that Logger._prepare_waiters is non-null, and so falls into the second branch of the if-statement. This saves its continuation into the array of waiters, and then yields control to the main loop again, indefinitely delaying the second asynchronous call from finishing. The first call returns from its asynchronous initialisation code, and proceeds to “Wake up any waiters”. For each of the saved waiters, it calls the waiter’s continuation (in an idle callback), which resumes execution just after the yield statement. Consequently, each waiter executes the shared code at the bottom of prepare(), then its prepare() call finishes and any asynchronous callbacks are executed in normal GLib fashion. The first call then proceeds to the shared code at the bottom of prepare(). Finally, its asynchronous callback is executed, and everything’s finished.

This all hinges on the fact that Vala supports “delegates with targets” (or continuations): the prepare.callback in the code above. This is a copy of the runtime state of the prepare() call, complete enough that it can later be called (using wrapper.cb()) to continue execution as if it never stopped at the yield. In C terms, a delegate is a function pointer; a delegate with a target is a function pointer with a struct full of the values of local variables at the time the delegate was stored. This is similar to the this pointer in C++, but includes more local state.

Why is DelegateWrapper needed? Why can’t the array just contain the continuations directly? Good question. I’d like to know the answer. Vala throws an error message if one tries to define an array of delegates with targets (“Delegates with target are not supported as array element type”). Other methods of encapsulating the continuations result in just the delegate being stored, and the target (local state) being lost, which is not what we want. I got the tip to use a DelegateWrapper class from Jim Nelson in this mailing list post (thanks!).

So that’s it: one way to synchronise multiple concurrent asynchronous method calls in Vala.