Tag Archives: asynchronous

Rate limiting of asynchronous calls in Vala

Following on from single-thread synchronisation in Vala, I just used a similar technique to easily implement rate limiting of concurrent calls to the same function in Vala. The use case for this was bug #705742, caused by attempting to write out many avatars to libfolks’ on-disk avatar cache simultaneously. So many files were opened simultaneously that the process’ file descriptor limit was hit and subsequent writes failed.

The principle for fixing this is to maintain a counter of the number of ongoing operations. If this hits a limit, operations which are started subsequently are added to a work queue and yielded. Any ongoing operation which finishes will pop a yielded operation off the work queue (if it’s non-empty) and resume that operation. This forms a FIFO queue which is guaranteed to progress due to each completed operation causing the next to resume (which will, in turn, cause the next to resume, etc., until the queue is empty).

A code example:

using GLib;

public class AvatarCache : Object
{
  private uint _n_ongoing_stores = 0;
  private Queue<DelegateWrapper> _pending_stores =
      new Queue<DelegateWrapper> ();

  /* Change this to change the cap on the number of concurrent
   * operations. */
  private const uint _max_n_ongoing_stores = 10;

  public async string store_avatar (string id, LoadableIcon avatar)
      throws GLib.Error
    {
      string retval = "";

      /* If the number of ongoing operations is at the limit,
       * queue this operation and yield. */
      if (this._n_ongoing_stores >
          AvatarCache._max_n_ongoing_stores)
        {
          /* Add to the pending queue. */
          var wrapper = new DelegateWrapper ();
          wrapper.cb = store_avatar.callback;
          this._pending_stores.push_tail ((owned) wrapper);
          yield;
        }

      /* Do the actual store operation. */
      try
        {
          this._n_ongoing_stores++;
          retval = yield this._store_avatar_unlimited (id, avatar);
        }
      finally
        {
          this._n_ongoing_stores--;

          /* If there is a store operation pending, resume it,
           * FIFO-style. */
          var wrapper = this._pending_stores.pop_head ();
          if (wrapper != null)
            {
              wrapper.cb ();
            }
        }

      return retval;
    }

  private async string _store_avatar_unlimited (string id,
                                                LoadableIcon avatar)
      throws GLib.Error
    {
      return /* the actual computation goes here */;
    }
}

/* See:
 * https://mail.gnome.org/archives/vala-list/2011-June/msg00005.html */
[Compact]
private class DelegateWrapper
{
  public SourceFunc cb;
}

This is all done using Vala’s asynchronous operation support, so runs in a single thread using the global default main context, and no locking or thread synchronisation is needed. If you were to use AvatarCache.store_avatar() from multiple threads, locking would have to be added and things would become more complex.

As with the single-thread synchronisation example from before, the key lines are: wrapper.cb = store_avatar.callback; yield, which stores the current function pointer and its closure (in Vala terminology, a delegate with target for the current method); and wrapper.cb (), which calls that function pointer with the stored closure (in Vala terminology, executes the delegate), effectively resuming computation from the yield statement.

So that’s it: a way to rate-limit concurrent method calls in Vala, ensuring they all block correctly (i.e. calls which are waiting for earlier ones to complete continue to block until they themselves are resumed and complete computation). By changing the scheduling function applied to the GQueue, priorities can be applied to queued calls if desired.

What is GMainContext?

This article has been tweaked and upstreamed to developer.gnome.org. The original is kept below, but future updates will be made there. If you find a problem, please file a bug.

GMainContext is at the core of almost every GLib application, yet it was only recently that I took the time to fully explore it — the details of it have always been a mystery. Doing some I/O work required me to look a little closer and try to get my head around the ins and outs of GMainContext, GMainLoop and GSources. Here I’ll try and write down a bit of what I’ve learned. If you want to skip to the conclusion, there’s a list of key points for using GMainContexts in libraries at the bottom of the post.

What is GMainContext? It’s a generalised implementation of an event loop, useful for implementing polled file I/O or event-based widget systems (i.e. GTK+). If you don’t know what poll() does, read about that first, since GMainContext can’t be properly understood without understanding polled I/O. A GMainContext has a set of GSources which are ‘attached’ to it, each of which can be thought of as an expected event with an associated callback function which will be invoked when that event is received; or equivalently as a set of file descriptors (FDs) to check. An event could be a timeout or data being received on a socket, for example. One iteration of the event loop will:

  1. Prepare sources, determining if any of them are ready to dispatch immediately.
  2. Poll the sources, blocking the current thread until an event is received for one of the sources.
  3. Check which of the sources received an event (several could have).
  4. Dispatch callbacks from those sources.

This is explained very well in the GLib documentation.

At its core, GMainContext is just a poll() loop, with the preparation, check and dispatch stages of the loop corresponding to the normal preamble and postamble in a typical poll() loop implementation, such as listing 1 from http://www.linux-mag.com/id/357/. Typically, some complexity is needed in non-trivial poll()-using applications to track the lists of FDs which are being polled. Additionally, GMainContext adds a lot of useful functionality which vanilla poll() doesn’t support. Most importantly, it adds thread safety.

GMainContext is completely thread safe, meaning that a GSource can be created in one thread and attached to a GMainContext running in another thread. A typical use for this might be to allow worker threads to control which sockets are being listened to by a GMainContext in a central I/O thread. Each GMainContext is ‘acquired’ by a thread for each iteration it’s put through. Other threads cannot iterate a GMainContext without acquiring it, which guarantees that a GSource and its FDs will only be polled by one thread at once (since each GSource is attached to at most one GMainContext). A GMainContext can be swapped between threads across iterations, but this is expensive.

Why use GMainContext instead of poll()? Mostly for convenience, as it takes all the grunt work out of dynamically managing the array of FDs to pass to poll(), especially when operating over multiple threads. This is done by encapsulating FDs in GSources, which decide whether those FDs should be passed to the poll() call on each ‘prepare’ stage of the main context iteration.

So if that’s GMainContext, what’s GMainLoop? Ignoring reference counting and locking gubbins, it is essentially just the following three lines of code (in g_main_loop_run()):

loop-&amp;gt;is_running = TRUE;
while (loop-&amp;gt;is_running)
	g_main_context_iteration (context, TRUE);

Plus a fourth line in g_main_loop_quit() which sets loop->is_running = FALSE and which will cause the loop to terminate once the current main context iteration ends. i.e. GMainLoop is a convenient, thread-safe way of running a GMainContext to process events until a desired exit condition is met, at which point you call g_main_loop_quit(). Typically, in a UI program, this will be the user clicking ‘exit’. In a socket handling program, this might be the final socket closing.

It is important not to confuse main contexts with main loops. Main contexts do the bulk of the work: preparing source lists, waiting for events, and dispatching callbacks. A main loop just iterates a context.

One of the important features of GMainContext is its support for ‘default’ contexts. There are two levels of default context: the thread-default, and the global-default. The global-default (accessed using g_main_context_default()) is what’s run by GTK+ when you call gtk_main(). It’s also used for timeouts (g_timeout_add()) and idle callbacks (g_idle_add()) — these won’t be dispatched unless the default context is running!

What are the thread-default contexts then? These are a later addition to GLib (since version 2.22), and are generally used for I/O operations which need to run and dispatch callbacks in a thread. By calling g_main_context_push_thread_default() before starting an I/O operation, the thread-default context has been set, and the I/O operation can add its sources to that context. The context can then be run in a new main loop in an I/O thread, causing the callbacks to be dispatched on that thread’s stack rather than on the stack of the thread running the global-default main context. This allows I/O operations to be run entirely in a separate thread without explicitly passing a specific GMainContext pointer around everywhere.

Conversely, by starting a long-running operation with a specific thread-default context set, your code can guarantee that the operation’s callbacks will be emitted in that context, even if the operation itself runs in a worker thread. This is the principle behind GTask: when a new GTask is created, it stores a reference to the current thread-default context, and dispatches its completion callback in that context, even if the task itself is run using g_task_run_in_thread().

For example, the code below will run a GTask which performs two writes in parallel from a thread. The callbacks for the writes will be dispatched in the worker thread, whereas the callback from the task as a whole will be dispatched in the interesting context.

typedef struct {
	GMainLoop *main_loop;
	guint n_remaining;
} WriteData;

/* This is always called in the same thread as thread_cb() because
 * it’s always dispatched in the @worker_context. */
static void
write_cb (GObject *source_object, GAsyncResult *result,
          gpointer user_data)
{
	WriteData *data = user_data;
	GOutputStream *stream = G_OUTPUT_STREAM (source_object);
	GError *error = NULL;
	gssize len;

	/* Finish the write. */
	len = g_output_stream_write_finish (stream, result, &amp;amp;error);
	if (error != NULL) {
		g_error ("Error: %s", error-&amp;gt;message);
		g_error_free (error);
	}

	/* Check whether all parallel operations have finished. */
	write_data-&amp;gt;n_remaining--;

	if (write_data-&amp;gt;n_remaining == 0) {
		g_main_loop_quit (write_data-&amp;gt;main_loop);
	}
}

/* This is called in a new thread. */
static void
thread_cb (GTask *task, gpointer source_object, gpointer task_data,
           GCancellable *cancellable)
{
	/* These streams come from somewhere else in the program: */
	GOutputStream *output_stream1, *output_stream;
	GMainContext *worker_context;
	GBytes *data;
	const guint8 *buf;
	gsize len;

	/* Set up a worker context for the writes’ callbacks. */
	worker_context = g_main_context_new ();
	g_main_context_push_thread_default (worker_context);

	/* Set up the writes. */
	write_data.n_remaining = 2;
	write_data.main_loop = g_main_loop_new (worker_context, FALSE);

	data = g_task_get_task_data (task);
	buf = g_bytes_get_data (data, &amp;amp;len);

	g_output_stream_write_async (output_stream1, buf, len,
	                             G_PRIORITY_DEFAULT, NULL, write_cb,
	                             &amp;amp;write_data);
	g_output_stream_write_async (output_stream2, buf, len,
	                             G_PRIORITY_DEFAULT, NULL, write_cb,
	                             &amp;amp;write_data);

	/* Run the main loop until both writes have finished. */
	g_main_loop_run (write_data.main_loop);
	g_task_return_boolean (task, TRUE);  /* ignore errors */

	g_main_loop_unref (write_data.main_loop);

	g_main_context_pop_thread_default (worker_context);
	g_main_context_unref (worker_context);
}

/* This can be called from any thread. Its @callback will always be
 * dispatched in the thread which currently owns
 * @interesting_context. */
void
parallel_writes_async (GBytes *data,
                       GMainContext *interesting_context,
                       GCancellable *cancellable,
                       GAsyncReadyCallback callback,
                       gpointer user_data)
{
	GTask *task;

	g_main_context_push_thread_default (interesting_context);

	task = g_task_new (NULL, cancellable, callback, user_data);
	g_task_set_task_data (task, data,
	                      (GDestroyNotify) g_bytes_unref);
	g_task_run_in_thread (task, thread_cb);
	g_object_unref (task);

	g_main_context_pop_thread_default (interesting_context);
}

From the work I’ve been doing recently with GMainContext, here are a few rules of thumb for using main contexts in libraries which I’m going to follow in future:

  • Never iterate a context you don’t own, including the global-default or thread-default contexts, or you can cause the user’s sources to be dispatched unexpectedly and cause re-entrancy problems.
  • Always remove GSources from a main context once you’re done with them, especially if that context may have been exposed to the user (e.g. as a thread-default). Otherwise the user may keep a reference to the main context and continue iterating it after your code expects it to have been destroyed, potentially causing unexpected source dispatches in your code.
  • If your API is designed to be used in threads, or in a context-aware fashion, always document which context callbacks will be dispatched in. For example, “callbacks will always be dispatched in the context which is the thread-default at the time of the object’s construction”. Users of your API need to know this information.
  • Use g_main_context_invoke() to ensure callbacks are dispatched in the right context. It’s much easier than manually using g_idle_source_new().
  • Libraries should never use g_main_context_default() (or, equivalently, pass NULL to a GMainContext-typed parameter). Always store and explicitly use a specific GMainContext, even if that reduces to being some default context. This makes your code easier to split out into threads in future, if needed, without causing hard-to-debug problems with callbacks being invoked in the wrong context.
  • Always write things asynchronously internally (using the amazing GTask where appropriate), and keep synchronous wrappers to the very top level, where they can be implemented by calling g_main_context_iteration() on a specific GMainContext. Again, this makes future refactoring easier. You can see it in the above example: the thread uses g_output_stream_write_async() rather than g_output_stream_write().
  • Always match pushes and pops of the thread-default main context.

In a future post, I hope to explain in detail what’s in a GSource, and how to implement one, plus do some more in-depth comparison of poll() and GMainContext. Any feedback or corrections are gratefully received!

Single-thread synchronisation in Vala

Update: You might also be interested in the follow-up article on rate limiting of asynchronous calls in Vala.

In libfolks, there are some asynchronous prepare() methods, which may be called multiple times, but must only change program state when called for the first time — subsequent calls must do nothing. As the prepare() method is asynchronous, and yields on other asynchronous methods, control may be yielded to the main loop part-way through running. At this point, another call to prepare() could happen from the same thread; and without adequate synchronisation the second call would proceed to change program state a second time. This is bad. (Throughout this post I’m only considering calls from a single thread; multi-threaded concurrency is completely different, and can rely on locking for safety.)

What are the possible solutions?

  • Locking.
  • Condition variables.
  • Boolean “prepare() in progress” flag.

Locking and condition variables can’t be used because the prepare() calls may come from any thread. Vala uses recursive locks, so calls from the same thread wouldn’t get blocked at all. If non-recursive locks were used, calls from the same thread would cause a deadlock.

A boolean “prepare() in progress” flag is what folks used to use. This was set to true as soon as the first asynchronous call began, so it prevented subsequent calls from changing program state erroneously, but also meant that subsequent calls returned immediately. Returning from a prepare() call typically implies that preparation has completed — but with this implementation that wasn’t correct for anything except the first call to prepare(). How can a call know that it’s not the first call? It can’t. This solution is unsatisfactory.

The solution which has just been implemented in folks is to use a list of waiting continuations for all calls subsequent to the first. The first call will execute all these continuations once it’s finished. This solution feels quite elegant to me, but suggestions from improvement are more than welcome.

A code example and explanation:

public class Logger : GLib.Object
{
  private static Logger _logger;
  private static DelegateWrapper[] _prepare_waiters = null;

  public async void prepare () throws GLib.Error
    {
      if (Logger._logger == null && Logger._prepare_waiters == null)
        {
          /* If this is the first call to prepare(), start some async calls. We
           * then yield to the main thread. Any subsequent calls to prepare()
           * will have their continuations added to the _prepare_waiters list,
           * and will be signalled once the first call returns. */
          Logger._prepare_waiters = new DelegateWrapper[0];

          /* Insert asynchronous initial preparation code here. */
          Logger._logger = yield new Logger ();

          /* Wake up any waiters. */
          foreach (unowned DelegateWrapper wrapper in Logger._prepare_waiters)
            {
              wrapper.cb ();
            }

          Logger._prepare_waiters = null;
        }
      else if (Logger._logger == null && Logger._prepare_waiters != null)
        {
          /* Yield until the first ongoing prepare() call finishes. */
          var wrapper = new DelegateWrapper ();
          wrapper.cb = prepare.callback;
          Logger._prepare_waiters += (owned) wrapper;
          yield;
        }

      /* Put shared code here which is executed on every call to prepare().
       * For example, it could check for failure in the results of the
       * initial call and throw an error as appropriate. */
      if (Logger._logger == null)
        {
          /* Throw an error or something. */
          return;
        }
    }
}

/* See: https://mail.gnome.org/archives/vala-list/2011-June/msg00005.html */
[Compact]
private class DelegateWrapper
{
  public SourceFunc cb;
}

(For the full code to this example, see folks’ source code.) The first call to prepare() is meant to asynchronously set the static Logger._logger variable. Subsequent calls must not re-initialise this variable, but should block until the variable has been set.

How does this work? The first call to prepare() finds Logger._logger is null and Logger._prepare_waiters is also null, so it falls into the initialisation block. It sets Logger._prepare_waiters to be an empty array (which is distinct from null), then proceeds to initialise Logger._logger asynchronously. Since the initialisation is asynchronous, control returns to the main loop, and a second call to prepare() could be made. Assume it is. The second call finds Logger._logger is still null, but finds that Logger._prepare_waiters is non-null, and so falls into the second branch of the if-statement. This saves its continuation into the array of waiters, and then yields control to the main loop again, indefinitely delaying the second asynchronous call from finishing. The first call returns from its asynchronous initialisation code, and proceeds to “Wake up any waiters”. For each of the saved waiters, it calls the waiter’s continuation (in an idle callback), which resumes execution just after the yield statement. Consequently, each waiter executes the shared code at the bottom of prepare(), then its prepare() call finishes and any asynchronous callbacks are executed in normal GLib fashion. The first call then proceeds to the shared code at the bottom of prepare(). Finally, its asynchronous callback is executed, and everything’s finished.

This all hinges on the fact that Vala supports “delegates with targets” (or continuations): the prepare.callback in the code above. This is a copy of the runtime state of the prepare() call, complete enough that it can later be called (using wrapper.cb()) to continue execution as if it never stopped at the yield. In C terms, a delegate is a function pointer; a delegate with a target is a function pointer with a struct full of the values of local variables at the time the delegate was stored. This is similar to the this pointer in C++, but includes more local state.

Why is DelegateWrapper needed? Why can’t the array just contain the continuations directly? Good question. I’d like to know the answer. Vala throws an error message if one tries to define an array of delegates with targets (“Delegates with target are not supported as array element type”). Other methods of encapsulating the continuations result in just the delegate being stored, and the target (local state) being lost, which is not what we want. I got the tip to use a DelegateWrapper class from Jim Nelson in this mailing list post (thanks!).

So that’s it: one way to synchronise multiple concurrent asynchronous method calls in Vala.