Tag Archives: concurrency

Rate limiting of asynchronous calls in Vala

Following on from single-thread synchronisation in Vala, I just used a similar technique to easily implement rate limiting of concurrent calls to the same function in Vala. The use case for this was bug #705742, caused by attempting to write out many avatars to libfolks’ on-disk avatar cache simultaneously. So many files were opened simultaneously that the process’ file descriptor limit was hit and subsequent writes failed.

The principle for fixing this is to maintain a counter of the number of ongoing operations. If this hits a limit, operations which are started subsequently are added to a work queue and yielded. Any ongoing operation which finishes will pop a yielded operation off the work queue (if it’s non-empty) and resume that operation. This forms a FIFO queue which is guaranteed to progress due to each completed operation causing the next to resume (which will, in turn, cause the next to resume, etc., until the queue is empty).

A code example:

using GLib;

public class AvatarCache : Object
{
  private uint _n_ongoing_stores = 0;
  private Queue<DelegateWrapper> _pending_stores =
      new Queue<DelegateWrapper> ();

  /* Change this to change the cap on the number of concurrent
   * operations. */
  private const uint _max_n_ongoing_stores = 10;

  public async string store_avatar (string id, LoadableIcon avatar)
      throws GLib.Error
    {
      string retval = "";

      /* If the number of ongoing operations is at the limit,
       * queue this operation and yield. */
      if (this._n_ongoing_stores >
          AvatarCache._max_n_ongoing_stores)
        {
          /* Add to the pending queue. */
          var wrapper = new DelegateWrapper ();
          wrapper.cb = store_avatar.callback;
          this._pending_stores.push_tail ((owned) wrapper);
          yield;
        }

      /* Do the actual store operation. */
      try
        {
          this._n_ongoing_stores++;
          retval = yield this._store_avatar_unlimited (id, avatar);
        }
      finally
        {
          this._n_ongoing_stores--;

          /* If there is a store operation pending, resume it,
           * FIFO-style. */
          var wrapper = this._pending_stores.pop_head ();
          if (wrapper != null)
            {
              wrapper.cb ();
            }
        }

      return retval;
    }

  private async string _store_avatar_unlimited (string id,
                                                LoadableIcon avatar)
      throws GLib.Error
    {
      return /* the actual computation goes here */;
    }
}

/* See:
 * https://mail.gnome.org/archives/vala-list/2011-June/msg00005.html */
[Compact]
private class DelegateWrapper
{
  public SourceFunc cb;
}

This is all done using Vala’s asynchronous operation support, so runs in a single thread using the global default main context, and no locking or thread synchronisation is needed. If you were to use AvatarCache.store_avatar() from multiple threads, locking would have to be added and things would become more complex.

As with the single-thread synchronisation example from before, the key lines are: wrapper.cb = store_avatar.callback; yield, which stores the current function pointer and its closure (in Vala terminology, a delegate with target for the current method); and wrapper.cb (), which calls that function pointer with the stored closure (in Vala terminology, executes the delegate), effectively resuming computation from the yield statement.

So that’s it: a way to rate-limit concurrent method calls in Vala, ensuring they all block correctly (i.e. calls which are waiting for earlier ones to complete continue to block until they themselves are resumed and complete computation). By changing the scheduling function applied to the GQueue, priorities can be applied to queued calls if desired.