Re: New objects for GLib



Sebastian Wilhelmi <wilhelmi@ira.uka.de> writes:

> Hi everyone,
> 
> In the course of developing the multithreaded ORBit I wrote two GLib objects,
> that might be of general interest. The first one should in fact go into GLib,
> as it offeres asyncronous communication between threads, which makes life much
> more easy (and prevents everyone from reinventing the wheel). It is just a
> small wrapper aroung GQueue, so it shouldn't be a size problem. The stripped
> .o file is about 4K.
> 
> Here are the details for GAsyncQueue: (opaque datatype)

I implemented such a data structure a while back. It is attached to
this mail.

Instead of locking the entire queue for each lookup, it locks only a
single item. This should make the queue faster because a producer and
a consumer can then in most cases access it at the same time without
blocking.


This is the interface:

/* Protected queue
 */
typedef struct _LacQueue LacQueue;

LacQueue	*lac_queue_new			(void);
void		 lac_queue_free			(LacQueue	 *queue);
gboolean	 lac_queue_empty		(LacQueue	 *queue);
gpointer	 lac_queue_read			(LacQueue	 *queue,
						 gboolean	  may_block);
void		 lac_queue_write		(LacQueue	 *queue,
						 gpointer	  data);

#include "lac.h"

typedef struct _LacQueueNode LacQueueNode;
typedef enum _LacQueueNodeState {
  LAC_QUEUE_NODE_FREE,
  LAC_QUEUE_NODE_DATA
} LacQueueNodeState;

struct _LacQueue {
  LacQueueNode *data;
  LacQueueNode *free;
};

struct _LacQueueNode {
  LacQueueNodeState state;
  GMutex *state_mutex;
  GCond *state_cond;

  gpointer data;
  LacQueueNode *next;
};

G_LOCK_DEFINE_STATIC (lac_queue_global);
static GMemChunk *node_mem_chunk = NULL;
static LacQueueNode *node_free_list = NULL;

static void			 lac_queue_node_set_state	(LacQueueNode *node,
								 LacQueueNodeState state);
static LacQueueNodeState	 lac_queue_node_get_state	(LacQueueNode *node);
static LacQueueNode		*lac_queue_node_new		();
static void			 lac_queue_node_free_circle	(LacQueueNode *node);

static inline void
lac_queue_node_set_state (LacQueueNode *node, LacQueueNodeState state)
{
  g_mutex_lock (node->state_mutex);
  node->state = state;
  g_cond_signal (node->state_cond);
  g_mutex_unlock (node->state_mutex);
}

static inline LacQueueNodeState
lac_queue_node_get_state (LacQueueNode *node)
{
  LacQueueNodeState result;
  g_mutex_lock (node->state_mutex);
  result = node->state;
  g_mutex_unlock (node->state_mutex);
  return result;
}

static inline void
lac_queue_node_wait (LacQueueNode *node, LacQueueNodeState state)
{
  g_mutex_lock (node->state_mutex);
  while (node->state != state)
    g_cond_wait (node->state_cond, node->state_mutex);
  g_mutex_unlock (node->state_mutex);
}

static LacQueueNode *
lac_queue_node_new ()
{
  LacQueueNode *node;

  G_LOCK (lac_queue_global);
  if (node_free_list)
    {
      node = node_free_list;
      node_free_list = node->next;
    }
  else
    {
      if (!node_mem_chunk)
	node_mem_chunk = g_mem_chunk_create (LacQueueNode, 256, G_ALLOC_ONLY);
      node = g_chunk_new (LacQueueNode, node_mem_chunk);
    }
  G_UNLOCK (lac_queue_global);
  node->data = NULL;
  node->state = LAC_QUEUE_NODE_FREE;
  node->state_mutex = g_mutex_new ();
  node->state_cond = g_cond_new ();
  return node;
}

static void
lac_queue_node_free_circle (LacQueueNode *node)
{
  LacQueueNode *temp;

  G_LOCK (lac_queue_global);
  temp = node->next;
  node->next = node_free_list;
  node_free_list = temp;
  G_UNLOCK (lac_queue_global);
}

LacQueue *
lac_queue_new ()
{
  LacQueue *queue;
  
  queue = g_new (LacQueue, 1);
  queue->free = lac_queue_node_new();
  queue->free->next = lac_queue_node_new();
  queue->free->next->next = lac_queue_node_new();
  queue->free->next->next->next = queue->free;
  queue->data = queue->free;
  
  return queue;
}

void
lac_queue_free (LacQueue *queue)
{
  g_return_if_fail (queue != NULL);
  
  lac_queue_node_free_circle (queue->free);
  g_free (queue);
}

gboolean
lac_queue_empty (LacQueue *queue)
{
  g_return_val_if_fail (queue != NULL, FALSE);

  return (lac_queue_node_get_state (queue->data) != LAC_QUEUE_NODE_DATA);
}

gpointer
lac_queue_read (LacQueue *queue, gboolean may_block)
{
  gpointer result;

  g_return_val_if_fail (queue != NULL, NULL);

  if (may_block)
    lac_queue_node_wait (queue->data, LAC_QUEUE_NODE_DATA);
  else if (lac_queue_node_get_state (queue->data) != LAC_QUEUE_NODE_DATA)
    return NULL;

  result = queue->data->data;
  lac_queue_node_set_state (queue->data, LAC_QUEUE_NODE_FREE);
  queue->data = queue->data->next;
  
  return result;
}

void
lac_queue_write (LacQueue *queue, gpointer data)
{
  LacQueueNode *red_node;
  LacQueueNode *new;

  g_return_if_fail (queue != NULL);

  red_node = queue->free;
  if (lac_queue_node_get_state (queue->free->next) != LAC_QUEUE_NODE_FREE)
    {
      new = lac_queue_node_new ();
      new->next = red_node->next;
      red_node->next = new;
      queue->free = new;
    }
  else
    queue->free = queue->free->next;

  red_node->data = data;

  lac_queue_node_set_state (red_node, LAC_QUEUE_NODE_DATA);
}


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]