Re: New objects for GLib



Soeren Sandmann wrote:
> 
> Sebastian Wilhelmi <wilhelmi@ira.uka.de> writes:
> 
> > In my experience you often need to hold the lock even longer than the queue
> > itself (that's why I added the explicit lock/unlock functions and the
> > _unlocked function variants): Look at a part of GThreadPool:
> >
> > void
> > g_thread_pool_push (GThreadPool     *pool,
> >                     gpointer         data)
> > {
> >   GRealThreadPool *real = (GRealThreadPool*) pool;
> >   g_async_queue_lock (real->queue);
> >   if (g_async_queue_length_unlocked (real->queue) >= 0)
> >     {
> >       /* No thread is waiting in the queue */
> >       g_thread_pool_start_thread (real);
> >     }
> >   g_async_queue_push_unlocked (real->queue, data);
> >   g_async_queue_unlock (real->queue);
> > }
> >
> > Here I test the length and add a thread, when there is no thread waiting and I
> > release the lock after that. That makes things slower, ok, but I can now
> > garantee all sorts of properties, that would otherwise not be possible. I
> > know, it's not a very convincing example, but thats what I found to be
> > practical.
> 
> You can guarantee that the queue *is* in fact empty when you start a
> new thread. Without the lock you could only 'guarantee' that the queue
> would be 'almost' empty. I am not convinced that this guarantee is
> worth it. The number of extra threads started would be low. I did
> something similar which when stressed started only very few extra
> threads.
> 
> > Now for your code. It looks, like it only works for exactly one reader and
> > exactly one writer. But my Queue also works for multiple readers/writers and
> > thats also what is needed for a thread pool. A comparing performance test of
> > the two solutions might be interesting for the one-reader/one-writer case,
> > though.
> 
> That is true. I forgot that I wrote the queue to solve a
> producer/consumer problem. It is not very difficult to modify it to
> support more than one reader/writer, just add a reader-mutex and a
> writer-mutex. I have attached such a changed version. This version
> also cleans up a few leftovers from an old version.

I looked at the code. Actually there are so many lock calls here, that I'm not
sure it really is faster anymore. So I tried it. Here are the results. The
program is appended. It should work with CVS GLib. 

And there it turns out, that your solution is (noticably) slower than mine
(which indeed also baffles me a bit, so there might be an error in the
comparision, please check). Also it seems that lac_queue_read(bla, FALSE) will
not work, if another thread is already waiting.

Bye,
Sebastian
-- 
Sebastian Wilhelmi                   |            här ovanför alla molnen
mailto:wilhelmi@ira.uka.de           |     är himmlen så förunderligt blå
http://goethe.ira.uka.de/~wilhelmi   |
#include <glib.h>

/* Compile with latest CVS Glib: e.g.

      gcc -I. -o queue-compare queue-compare.c .libs/libglib.a \
         gthread/.libs/libgthread.a -lpthread

   from within the glib dir. */

/* LacQueue */

#define g_mutex_my_lock(a) if (1) { g_print("%d lock %s, \n", __LINE__, #a); g_mutex_lock (a); } else 
#define g_mutex_my_unlock(a) if (1) { g_print("%d unlock %s, \n", __LINE__, #a); g_mutex_unlock (a); } else

typedef struct _LacQueue LacQueue;
typedef struct _LacQueueNode LacQueueNode;
typedef enum _LacQueueNodeState {
  LAC_QUEUE_NODE_FREE,
  LAC_QUEUE_NODE_DATA
} LacQueueNodeState;

struct _LacQueue {
  LacQueueNode *data_node;
  LacQueueNode *free_node;
  GMutex *read_mutex;
  GMutex *write_mutex;
};

struct _LacQueueNode {
  LacQueueNodeState state;
  GMutex *state_mutex;
  GCond *state_cond;

  gpointer data;
  LacQueueNode *next;
};

G_LOCK_DEFINE (lac_queue_global);
static GMemChunk *node_mem_chunk= NULL;
static LacQueueNode *node_free_list = NULL;

static LacQueueNode     *lac_queue_node_new             ();
static void              lac_queue_node_free_circle     (LacQueueNode *node);

static 
LacQueueNode *
lac_queue_node_new ()
{
  LacQueueNode *node;

  G_LOCK (lac_queue_global);
  if (node_free_list)
    {
      node = node_free_list;
      node_free_list = node->next;
    }
  else
    {
      if (!node_mem_chunk)
        node_mem_chunk = g_mem_chunk_create (LacQueueNode, 256, G_ALLOC_ONLY);
      node = g_chunk_new (LacQueueNode, node_mem_chunk);
    }
  G_UNLOCK (lac_queue_global);
  node->data = NULL;
  node->state = LAC_QUEUE_NODE_FREE;
  node->state_mutex = g_mutex_new ();
  node->state_cond = g_cond_new ();
  return node;
}

static void
lac_queue_node_free_circle (LacQueueNode *node)
{
  LacQueueNode *temp;

  G_LOCK (lac_queue_global);
  temp = node->next;
  node->next = node_free_list;
  node_free_list = temp;
  G_UNLOCK (lac_queue_global);
}

LacQueue *
lac_queue_new ()
{
  LacQueue *queue;
  
  queue = g_new (LacQueue, 1);
  queue->free_node = lac_queue_node_new();
  queue->free_node->next = lac_queue_node_new();
  queue->free_node->next->next = lac_queue_node_new();
  queue->free_node->next->next->next = queue->free_node;
  queue->data_node = queue->free_node;
  queue->read_mutex = g_mutex_new ();
  queue->write_mutex = g_mutex_new ();

  return queue;
}

void
lac_queue_free (LacQueue *queue)
{
  g_return_if_fail (queue != NULL);
  
  lac_queue_node_free_circle (queue->free_node);
  g_mutex_free (queue->read_mutex);
  g_mutex_free (queue->write_mutex);
  g_free (queue);
}

gboolean
lac_queue_empty (LacQueue *queue)
{
  gboolean empty;

  g_return_val_if_fail (queue != NULL, FALSE);

  g_mutex_lock (queue->data_node->state_mutex);
  if (queue->data_node->state == LAC_QUEUE_NODE_DATA)
    empty = FALSE;
  else
    empty = TRUE;
  g_mutex_unlock (queue->data_node->state_mutex);

  return empty;
}

gpointer
lac_queue_read (LacQueue *queue, gboolean may_block)
{
  gpointer result;
  LacQueueNode *tmp;

  g_return_val_if_fail (queue != NULL, NULL);

  g_mutex_lock (queue->read_mutex);
  g_mutex_lock (queue->data_node->state_mutex);
  if (may_block)
    while (queue->data_node->state != LAC_QUEUE_NODE_DATA)
      g_cond_wait (queue->data_node->state_cond, queue->data_node->state_mutex);
  else if (queue->data_node->state != LAC_QUEUE_NODE_DATA)
    {
      g_mutex_unlock (queue->data_node->state_mutex);
      g_mutex_unlock (queue->read_mutex);
      return NULL;
    }
  result = queue->data_node->data;
  queue->data_node->state = LAC_QUEUE_NODE_FREE;
  tmp = queue->data_node;
  queue->data_node = queue->data_node->next;
  g_mutex_unlock (tmp->state_mutex);
  g_mutex_unlock (queue->read_mutex);
  return result;
}

void
lac_queue_write (LacQueue *queue, gpointer data)
{
  LacQueueNode *new;

  g_return_if_fail (queue != NULL);

  g_mutex_lock (queue->write_mutex);
  g_mutex_lock (queue->free_node->next->state_mutex);
  if (queue->free_node->next->state != LAC_QUEUE_NODE_FREE)
    {
      g_mutex_unlock (queue->free_node->next->state_mutex);
      new = lac_queue_node_new ();
      new->next = queue->free_node->next;
      queue->free_node->next = new;
    }
  else
    g_mutex_unlock (queue->free_node->next->state_mutex);

  g_mutex_lock (queue->free_node->state_mutex);
  queue->free_node->data = data;
  queue->free_node->state = LAC_QUEUE_NODE_DATA;
  g_mutex_unlock (queue->free_node->state_mutex);

  g_cond_signal (queue->free_node->state_cond);

  queue->free_node = queue->free_node->next;
  g_mutex_unlock (queue->write_mutex);
}

/* GAsyncQueue */

typedef struct _GAsyncQueue GAsyncQueue;
struct _GAsyncQueue
{
  GMutex *mutex;
  GCond *cond;
  GQueue *queue;
  guint waiting_threads;
};

GAsyncQueue*
g_async_queue_new ()
{
  GAsyncQueue* retval = g_new (GAsyncQueue, 1);
  retval->mutex = g_mutex_new ();
  retval->cond = g_cond_new ();
  retval->queue = g_queue_new ();
  retval->waiting_threads = 0;
  return retval;
}

void 
g_async_queue_free (GAsyncQueue *queue)
{
  g_assert (queue);

  g_mutex_free (queue->mutex);
  g_cond_free (queue->cond);
  g_queue_free (queue->queue);
}

void
g_async_queue_lock (GAsyncQueue *queue)
{
  g_assert (queue);

  g_mutex_lock (queue->mutex);
}

void 
g_async_queue_unlock (GAsyncQueue *queue)
{
  g_assert (queue);

  g_mutex_unlock (queue->mutex);
}

void
g_async_queue_push_unlocked (GAsyncQueue* queue, gpointer data)
{
  g_assert (queue);
  g_return_if_fail (data);

  g_queue_push_head (queue->queue, data);
  g_cond_signal (queue->cond);
}

void
g_async_queue_push (GAsyncQueue* queue, gpointer data)
{
  g_assert (queue);

  g_mutex_lock (queue->mutex);
  g_async_queue_push_unlocked (queue, data);
  g_mutex_unlock (queue->mutex);
}

static gpointer
g_async_queue_pop_intern_unlocked (GAsyncQueue* queue, gboolean try, 
                                   GTimeVal *end_time)
{
  gpointer retval;

  if (!g_queue_peek_tail (queue->queue))
    {
      if (try)
        return NULL;
      if (!end_time)
        {
          queue->waiting_threads++;
          while (!g_queue_peek_tail (queue->queue))
            g_cond_wait(queue->cond, queue->mutex);
          queue->waiting_threads--;
        }
      else
        {
          queue->waiting_threads++;
          while (!g_queue_peek_tail (queue->queue))
            if (!g_cond_timed_wait (queue->cond, queue->mutex, end_time))
              break;
          queue->waiting_threads--;
          if (!g_queue_peek_tail (queue->queue))
            return NULL;
        }
    }

  retval = g_queue_pop_tail (queue->queue);

  g_assert (retval);

  return retval;
}

gpointer
g_async_queue_pop (GAsyncQueue* queue)
{
  gpointer retval;

  g_return_val_if_fail (queue, NULL);

  g_mutex_lock (queue->mutex);
  retval = g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
  g_mutex_unlock (queue->mutex);

  return retval;
}

gpointer
g_async_queue_pop_unlocked (GAsyncQueue* queue)
{
  g_return_val_if_fail (queue, NULL);

  return g_async_queue_pop_intern_unlocked (queue, FALSE, NULL);
}

gpointer
g_async_queue_try_pop (GAsyncQueue* queue)
{
  gpointer retval;

  g_return_val_if_fail (queue, NULL);

  g_mutex_lock (queue->mutex);
  retval = g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
  g_mutex_unlock (queue->mutex);

  return retval;
}

gpointer
g_async_queue_try_pop_unlocked (GAsyncQueue* queue)
{
  g_return_val_if_fail (queue, NULL);

  return g_async_queue_pop_intern_unlocked (queue, TRUE, NULL);
}

gpointer
g_async_queue_timed_pop (GAsyncQueue* queue, GTimeVal *end_time)
{
  gpointer retval;

  g_return_val_if_fail (queue, NULL);

  g_mutex_lock (queue->mutex);
  retval = g_async_queue_pop_intern_unlocked (queue, FALSE, end_time);
  g_mutex_unlock (queue->mutex);

  return retval;  
}

gpointer
g_async_queue_timed_pop_unlocked (GAsyncQueue* queue, GTimeVal *end_time)
{
  g_return_val_if_fail (queue, NULL);

  return g_async_queue_pop_intern_unlocked (queue, FALSE, end_time);
}

gint
g_async_queue_length_unlocked (GAsyncQueue* queue)
{
  g_return_val_if_fail (queue, 0);

  return queue->queue->length - queue->waiting_threads;
}

gint
g_async_queue_length(GAsyncQueue* queue)
{
  glong retval;

  g_return_val_if_fail (queue, 0);

  g_mutex_lock (queue->mutex);
  retval = queue->queue->length - queue->waiting_threads;
  g_mutex_unlock (queue->mutex);

  return retval;
}

/* Main program */

GAsyncQueue *async_queue;
LacQueue *lac_queue;
G_LOCK_DEFINE_STATIC (lock);

#define NUM_OF_ELS 100000

void
first_thread (gpointer async)
{
  GTimer* timer = g_timer_new();
  int i;
  G_LOCK (lock);
  G_UNLOCK (lock);

  g_timer_start (timer);
  for (i = 0; i < NUM_OF_ELS; i++)
    {
      if (async)
	g_async_queue_push (async_queue, GUINT_TO_POINTER (i+1));
      else
	lac_queue_write (lac_queue, GUINT_TO_POINTER (i+1)); 
    }

  g_print ("pushing %d elements into %s: %f seconds\n", 
           NUM_OF_ELS, async ? "GAsyncQueue" : "LacQueue", 
	   g_timer_elapsed (timer, NULL));
}

void
second_thread (gpointer async)
{
  GTimer* timer = g_timer_new();
  int i;
  G_LOCK (lock);
  G_UNLOCK (lock);

  g_timer_start (timer);
  for (i = 0; i < NUM_OF_ELS; i++)
    {
      if (async)      
	g_assert (i+1 == GPOINTER_TO_UINT (g_async_queue_pop (async_queue)));
      else
	g_assert (i+1 == GPOINTER_TO_UINT (lac_queue_read (lac_queue, TRUE)));
	
    }

  g_print ("popping %d elements from %s: %f seconds\n", 
           NUM_OF_ELS, async ? "GAsyncQueue" : "LacQueue", 
	   g_timer_elapsed (timer, NULL));
}

int
main()
{
  GThread *thread1, *thread2;
  g_thread_init (NULL);
  async_queue = g_async_queue_new ();
  lac_queue = lac_queue_new ();
  G_LOCK (lock);
  thread1 = g_thread_create (first_thread, GUINT_TO_POINTER (FALSE), 0, TRUE, 
			     TRUE, G_THREAD_PRIORITY_NORMAL); 
  thread2 = g_thread_create (second_thread, GUINT_TO_POINTER (FALSE), 0, TRUE, 
			     TRUE, G_THREAD_PRIORITY_NORMAL);
  g_usleep (G_MICROSEC);
  G_UNLOCK (lock);
  
  g_thread_join (thread1);
  g_thread_join (thread2);

  G_LOCK (lock);
  thread1 = g_thread_create (first_thread, GUINT_TO_POINTER (TRUE), 0, TRUE, 
			     TRUE, G_THREAD_PRIORITY_NORMAL); 
  thread2 = g_thread_create (second_thread, GUINT_TO_POINTER (TRUE), 0, TRUE,
			     TRUE, G_THREAD_PRIORITY_NORMAL);
  g_usleep (G_MICROSEC);
  G_UNLOCK (lock);
  
  g_thread_join (thread1);
  g_thread_join (thread2);

  return 0;
}


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]