[tracker] Add threaded turtle pull parser



commit 3d34a1f2daf4af7d9fe8b83df3144fd5022b7618
Author: Jürg Billeter <j bitron ch>
Date:   Wed Apr 15 14:55:36 2009 +0200

    Add threaded turtle pull parser
---
 configure.ac                         |    5 +
 src/libtracker-data/Makefile.am      |    3 +
 src/libtracker-data/tracker-turtle.c |  239 +++++++++++++++++++++++++++++++++-
 src/libtracker-data/tracker-turtle.h |   35 +++---
 4 files changed, 259 insertions(+), 23 deletions(-)

diff --git a/configure.ac b/configure.ac
index 8454c26..5667d30 100644
--- a/configure.ac
+++ b/configure.ac
@@ -203,6 +203,11 @@ AC_SUBST(GCONF_LIBS)
 
 AM_CONDITIONAL(HAVE_GCONF, test "$have_gconf" = "yes")
 
+# Check for libuuid
+PKG_CHECK_MODULES(UUID, [uuid])
+AC_SUBST(UUID_CFLAGS)
+AC_SUBST(UUID_LIBS)
+
 # Check for Raptor
 PKG_CHECK_MODULES(RAPTOR, [raptor >= 1.4.17])
 AC_SUBST(RAPTOR_CFLAGS)
diff --git a/src/libtracker-data/Makefile.am b/src/libtracker-data/Makefile.am
index 63e2ba6..2e5d3e1 100644
--- a/src/libtracker-data/Makefile.am
+++ b/src/libtracker-data/Makefile.am
@@ -8,6 +8,7 @@ INCLUDES =								\
 	$(WARN_CFLAGS)							\
 	$(GLIB2_CFLAGS)							\
 	$(DBUS_CFLAGS)							\
+	$(UUID_CFLAGS)							\
 	$(RAPTOR_CFLAGS)						\
 	$(GCOV_CFLAGS)
 
@@ -42,6 +43,8 @@ libtracker_data_la_LIBADD = 						\
 	$(top_builddir)/src/libtracker-db/libtracker-db.la		\
 	$(DBUS_LIBS)							\
 	$(GLIB2_LIBS)							\
+	$(UUID_LIBS)							\
 	$(RAPTOR_LIBS)							\
 	$(GCOV_LIBS)							\
 	-lz
+
diff --git a/src/libtracker-data/tracker-turtle.c b/src/libtracker-data/tracker-turtle.c
index f67ff79..102eabb 100644
--- a/src/libtracker-data/tracker-turtle.c
+++ b/src/libtracker-data/tracker-turtle.c
@@ -29,9 +29,24 @@
 #include <gio/gio.h>
 #include <glib/gstdio.h>
 
+#include <uuid.h>
+
 #include "tracker-turtle.h"
 
-static gboolean initialized = FALSE;
+/* Checked by tracker_turtle_reader_init () to detect use of the module
+ * before initialization (presumably set by tracker_turtle_init (); that
+ * part of the file is not visible in this hunk). */
+static gboolean  initialized = FALSE;
+/* Mutex/condition pair guarding the statement slot and the EOF flag below.
+ * Created by tracker_turtle_reader_init () and freed by
+ * tracker_turtle_reader_next () once EOF has been reported. */
+static GMutex   *turtle_mutex;
+static GCond    *turtle_cond;
+
+/* TRUE until the first turtle_next () call, which must not release a
+ * statement the consumer has not looked at yet. */
+static gboolean turtle_first;
+/* Statement slot shared between the parser thread and the consumer.
+ * turtle_subject == NULL means the slot is empty. The strings are
+ * allocated by turtle_statement_handler () and freed by turtle_next ().
+ * These are file-scope variables, so concurrent readers would share (and
+ * clobber) this state. */
+static gchar * volatile turtle_subject;
+static gchar * volatile turtle_predicate;
+static char * volatile turtle_object;
+/* Set by the parser thread when parsing has finished; reset by
+ * tracker_turtle_reader_next () after EOF has been reported. */
+static volatile gboolean     turtle_eof;
+
+/* Context received as user_data by foreach_in_metadata (). */
+typedef struct {
+	gchar                *about_uri;
+	TurtleFile           *turtle; /* For internal use only */
+} TrackerTurtleMetadataItem;
 
 struct TurtleFile {
 	FILE              *file;
@@ -45,6 +60,11 @@ typedef struct {
 	GHashTable        *hash;
 } TurtleOptimizerInfo;
 
+/*
+ * Arguments for turtle_thread_func (). Both strings are private copies made
+ * by tracker_turtle_reader_init (); the parser thread frees them together
+ * with the struct itself when it finishes.
+ */
+typedef struct {
+	gchar *file;     /* file name of the turtle file to parse */
+	gchar *base_uri; /* base URI handed to the parser, may be NULL */
+} TurtleThreadData;
+
 void
 tracker_turtle_init (void)
 {
@@ -153,8 +173,8 @@ consume_triple_optimizer (void                   *user_data,
 
 static void
 foreach_in_metadata (TrackerProperty *field, 
-		     gpointer      value, 
-		     gpointer      user_data)
+		     gpointer         value, 
+		     gpointer         user_data)
 {
 	raptor_statement          *statement;
 	TrackerTurtleMetadataItem *item = user_data;
@@ -192,6 +212,8 @@ foreach_in_metadata (TrackerProperty *field,
 	g_free (statement);
 }
 
+
+
 TurtleFile *
 tracker_turtle_open (const gchar *turtle_file)
 {
@@ -248,9 +270,155 @@ raptor_error (void           *user_data,
 	      raptor_locator *locator, 
 	      const gchar    *message)
 {
-	g_message ("RAPTOR parse error: %s for %s\n", 
-		   message, 
-		   (gchar *) user_data);
+	g_message ("RAPTOR parse error: %s:%d:%d: %s\n", 
+		   (gchar *) user_data,
+		   locator->line,
+		   locator->column,
+		   message);
+}
+
+/*
+ * turtle_generate_id:
+ * @user_data: the 16 byte UUID generated for the file being parsed
+ * @type: kind of ID requested by Raptor (not used)
+ * @user_id: blank node label found in the document, or NULL for
+ *           anonymous nodes; ownership is passed to this function
+ *
+ * Raptor callback producing identifiers for blank nodes.
+ *
+ * Anonymous nodes get a ":<n>" label taken from a counter that lives for
+ * the whole process. Labelled nodes get a "urn:uuid:" URI formatted from
+ * the SHA-1 of the per-file UUID followed by the label, so a label always
+ * maps to the same URI for a given base UUID.
+ *
+ * NOTE(review): the result is allocated with GLib while Raptor releases it
+ * with its own allocator; this assumes both end up in free () - confirm.
+ *
+ * Returns: a newly allocated identifier string.
+ */
+static unsigned char*
+turtle_generate_id (void              *user_data,
+                    raptor_genid_type  type,
+                    unsigned char     *user_id)
+{
+	static gint  id = 0;
+	GChecksum   *checksum;
+	const gchar *sha1;
+	guchar      *result;
+
+	/* user_id is NULL for anonymous nodes */
+	if (user_id == NULL) {
+		return (guchar *) g_strdup_printf (":%d", ++id);
+	}
+
+	checksum = g_checksum_new (G_CHECKSUM_SHA1);
+	/* base UUID, unique per file */
+	g_checksum_update (checksum, user_data, 16);
+	/* node ID */
+	g_checksum_update (checksum, user_id, -1);
+
+	/* the hex digest is owned by the checksum: format the result
+	 * first, release the checksum afterwards */
+	sha1 = g_checksum_get_string (checksum);
+
+	/* generate name based uuid */
+	result = (guchar *) g_strdup_printf (
+		"urn:uuid:%.8s-%.4s-%.4s-%.4s-%.12s",
+		sha1, sha1 + 8, sha1 + 12, sha1 + 16, sha1 + 20);
+
+	g_checksum_free (checksum);
+
+	/* Raptor requires the handler to free the passed in ID when it is
+	 * not returned as the generated value */
+	raptor_free_memory (user_id);
+
+	return result;
+}
+
+/*
+ * Raptor statement callback. It is registered by turtle_thread_func () and
+ * therefore runs in the parser thread, the producer side of the pull
+ * reader.
+ *
+ * Blocks until the consumer has released the previous statement, then
+ * stores copies of subject, predicate and object in the shared slot and
+ * wakes the consumer. @user_data is not used.
+ */
+static void
+turtle_statement_handler (void                   *user_data,
+                          const raptor_statement *triple)
+{
+	const gchar *subject;
+	const gchar *predicate;
+	const gchar *object;
+
+	/* NOTE(review): subject and object are converted without looking at
+	 * their type fields - confirm this is valid for blank nodes and for
+	 * resource objects. */
+	subject = (const gchar *) raptor_uri_as_string ((raptor_uri *) triple->subject);
+	predicate = (const gchar *) raptor_uri_as_string ((raptor_uri *) triple->predicate);
+	object = (const gchar *) triple->object;
+
+	g_mutex_lock (turtle_mutex);
+
+	/* the slot is free again once the consumer has reset the subject */
+	while (turtle_subject != NULL) {
+		g_cond_wait (turtle_cond, turtle_mutex);
+	}
+
+	/* publish private copies, turtle_next () frees them later */
+	turtle_subject = g_strdup (subject);
+	turtle_predicate = g_strdup (predicate);
+	turtle_object = g_strdup (object);
+
+	/* wake up the consumer waiting in turtle_next () */
+	g_cond_signal (turtle_cond);
+
+	g_mutex_unlock (turtle_mutex);
+}
+
+/*
+ * Consumer side of the pull reader: advances to the next statement.
+ *
+ * Frees the statement handed out by the previous call, which allows the
+ * parser thread to publish the following one, then blocks until either a
+ * new statement or EOF is available.
+ *
+ * Returns: FALSE once the parser thread has signalled EOF, TRUE otherwise.
+ */
+static gboolean
+turtle_next (void)
+{
+	g_mutex_lock (turtle_mutex);
+
+	/* On the very first call the slot may already hold a statement the
+	 * caller has not seen yet, so that one must be kept. */
+	if (!turtle_first && turtle_subject != NULL) {
+		g_free (turtle_subject);
+		g_free (turtle_predicate);
+		g_free (turtle_object);
+
+		turtle_subject = NULL;
+		turtle_predicate = NULL;
+		turtle_object = NULL;
+
+		/* tell the parser thread that the slot is empty again */
+		g_cond_signal (turtle_cond);
+	}
+	turtle_first = FALSE;
+
+	/* sleep until the producer delivers a statement or reports EOF */
+	while (!(turtle_subject != NULL || turtle_eof)) {
+		g_cond_wait (turtle_cond, turtle_mutex);
+	}
+
+	g_mutex_unlock (turtle_mutex);
+
+	return !turtle_eof;
+}
+
+/*
+ * turtle_thread_func:
+ * @data: a TurtleThreadData; the thread takes ownership and frees it
+ *
+ * Body of the parser thread started by tracker_turtle_reader_init ().
+ *
+ * Parses the file with Raptor's turtle parser; every statement is handed
+ * to the consumer through turtle_statement_handler (). When parsing has
+ * finished, or could not be started at all, the thread waits for the last
+ * statement to be released and then raises the EOF flag, so the consumer
+ * is always woken up eventually.
+ *
+ * Returns: always NULL.
+ */
+static gpointer
+turtle_thread_func (gpointer data)
+{
+	TurtleThreadData *thread_data;
+	unsigned char    *uri_string = NULL;
+	raptor_uri       *uri = NULL;
+	raptor_uri       *buri = NULL;
+	raptor_parser    *parser;
+	uuid_t            base_uuid;
+
+	thread_data = (TurtleThreadData *) data;
+
+	parser = raptor_new_parser ("turtle");
+
+	if (parser != NULL) {
+		/* generate UUID as base for blank nodes */
+		uuid_generate (base_uuid);
+
+		raptor_set_statement_handler (parser, NULL, (raptor_statement_handler) turtle_statement_handler);
+		raptor_set_generate_id_handler (parser, base_uuid, turtle_generate_id);
+		raptor_set_fatal_error_handler (parser, (void *)thread_data->file, raptor_error);
+		raptor_set_error_handler (parser, (void *)thread_data->file, raptor_error);
+		raptor_set_warning_handler (parser, (void *)thread_data->file, raptor_error);
+
+		uri_string = raptor_uri_filename_to_uri_string (thread_data->file);
+		uri = raptor_new_uri (uri_string);
+		if (thread_data->base_uri != NULL) {
+			buri = raptor_new_uri ((unsigned char *) thread_data->base_uri);
+		}
+
+		if (uri != NULL) {
+			raptor_parse_file (parser, uri, buri);
+		} else {
+			/* without a file URI there is nothing to parse */
+			g_warning ("Could not create URI for turtle file %s",
+			           thread_data->file);
+		}
+	} else {
+		/* Do not touch the parser, but still fall through to the
+		 * EOF handshake below so the consumer does not wait forever. */
+		g_warning ("Could not create turtle parser for %s",
+		           thread_data->file);
+	}
+
+	g_mutex_lock (turtle_mutex);
+
+	/* wait until last statement has been released */
+	while (turtle_subject != NULL) {
+		g_cond_wait (turtle_cond, turtle_mutex);
+	}
+
+	turtle_eof = TRUE;
+
+	/* signal main thread to pull eof */
+	g_cond_signal (turtle_cond);
+
+	g_mutex_unlock (turtle_mutex);
+
+	/* From here on only thread-private data is touched: the consumer is
+	 * free to destroy the mutex and the condition after seeing EOF. */
+	if (uri != NULL) {
+		raptor_free_uri (uri);
+	}
+	if (uri_string != NULL) {
+		raptor_free_memory (uri_string);
+	}
+	if (buri != NULL) {
+		raptor_free_uri (buri);
+	}
+	if (parser != NULL) {
+		raptor_free_parser (parser);
+	}
+
+	g_free (thread_data->file);
+	g_free (thread_data->base_uri);
+	g_free (thread_data);
+
+	return NULL;
 }
 
 void
@@ -343,3 +511,62 @@ tracker_turtle_optimize (const gchar *turtle_file)
 
 	g_free (tmp_file);
 }
+
+/*
+ * tracker_turtle_reader_init:
+ * @turtle_file: file name of the turtle file to read
+ * @base_uri: base URI handed to the parser, or NULL
+ *
+ * Starts a pull reader for @turtle_file: creates the mutex and condition
+ * used for the handshake and spawns a detached thread that parses the
+ * file. Statements are then fetched with tracker_turtle_reader_next ().
+ *
+ * Both arguments are copied, the caller keeps ownership of its strings.
+ *
+ * If the thread cannot be created the reader is put into the EOF state,
+ * so the first tracker_turtle_reader_next () call returns FALSE and
+ * releases the mutex and condition instead of blocking forever.
+ */
+void
+tracker_turtle_reader_init (const gchar *turtle_file,
+                            const gchar *base_uri)
+{
+	TurtleThreadData *thread_data;
+	GError           *error = NULL;
+
+	if (!initialized) {
+		g_critical ("Using tracker_turtle module without initialization");
+	}
+
+	turtle_mutex = g_mutex_new ();
+	turtle_cond = g_cond_new ();
+
+	thread_data = g_new0 (TurtleThreadData, 1);
+	thread_data->file = g_strdup (turtle_file);
+	thread_data->base_uri = g_strdup (base_uri);
+
+	turtle_first = TRUE;
+
+	/* on success the detached thread owns thread_data and frees it */
+	if (g_thread_create (turtle_thread_func, thread_data, FALSE, &error) == NULL) {
+		g_critical ("Could not create turtle parser thread: %s",
+		            error != NULL ? error->message : "unknown error");
+		g_clear_error (&error);
+
+		g_free (thread_data->file);
+		g_free (thread_data->base_uri);
+		g_free (thread_data);
+
+		/* there is no producer, report EOF right away */
+		turtle_eof = TRUE;
+	}
+}
+
+/*
+ * tracker_turtle_reader_next:
+ *
+ * Advances the pull reader to the next statement. While this returns TRUE
+ * the current statement can be read with the
+ * tracker_turtle_reader_get_* () functions.
+ *
+ * On EOF the reader is torn down: the EOF flag is reset and the mutex and
+ * condition created by tracker_turtle_reader_init () are freed. The
+ * function must not be called again before a new
+ * tracker_turtle_reader_init ().
+ *
+ * Returns: TRUE if a statement is available, FALSE on EOF.
+ */
+gboolean
+tracker_turtle_reader_next (void)
+{
+	gboolean have_statement;
+
+	have_statement = turtle_next ();
+	if (have_statement) {
+		return TRUE;
+	}
+
+	/* EOF, cleanup */
+	turtle_eof = FALSE;
+
+	g_mutex_free (turtle_mutex);
+	g_cond_free (turtle_cond);
+
+	return FALSE;
+}
+
+/*
+ * Returns the subject of the current statement, i.e. the one made
+ * available by the last tracker_turtle_reader_next () call. The string
+ * belongs to the reader and is freed by the following
+ * tracker_turtle_reader_next () call; copy it if it has to live longer.
+ * NULL when there is no current statement.
+ */
+const gchar *
+tracker_turtle_reader_get_subject (void)
+{
+	return turtle_subject;
+}
+
+/*
+ * Returns the predicate of the current statement, i.e. the one made
+ * available by the last tracker_turtle_reader_next () call. The string
+ * belongs to the reader and is freed by the following
+ * tracker_turtle_reader_next () call; copy it if it has to live longer.
+ * NULL when there is no current statement.
+ */
+const gchar *
+tracker_turtle_reader_get_predicate (void)
+{
+	return turtle_predicate;
+}
+
+/*
+ * Returns the object of the current statement as the string stored by
+ * turtle_statement_handler (). The string belongs to the reader and is
+ * freed by the following tracker_turtle_reader_next () call; copy it if
+ * it has to live longer. NULL when there is no current statement.
+ */
+const gchar *
+tracker_turtle_reader_get_object (void)
+{
+	return turtle_object;
+}
+
diff --git a/src/libtracker-data/tracker-turtle.h b/src/libtracker-data/tracker-turtle.h
index 8ea31c1..28d9558 100644
--- a/src/libtracker-data/tracker-turtle.h
+++ b/src/libtracker-data/tracker-turtle.h
@@ -25,7 +25,7 @@
 
 #include <stdio.h>
 
-#include <libtracker-data/tracker-data-metadata.h>
+#include <libtracker-common/tracker-common.h>
 
 #include <raptor.h>
 
@@ -42,29 +42,30 @@ typedef void (* TurtleTripleCallback) (void                         *user_data,
 
 typedef struct TurtleFile TurtleFile;
 
-typedef struct {
-	gchar                *about_uri;
-	TrackerDataMetadata  *metadata;
-	TurtleFile           *turtle; /* For internal use only */
-} TrackerTurtleMetadataItem;
-
 /* Initialization (use in both cases) */
 void        tracker_turtle_init          (void);
 void        tracker_turtle_shutdown      (void);
 
 /* Transactions style */
-TurtleFile *tracker_turtle_open          (const gchar          *turtle_file);
-void        tracker_turtle_add_triple    (TurtleFile           *turtle,
-					  const gchar          *uri,
-					  TrackerProperty         *property,
-					  const gchar          *value);
-void        tracker_turtle_close         (TurtleFile           *turtle);
+TurtleFile *tracker_turtle_open            (const gchar         *turtle_file);
+void        tracker_turtle_add_triple      (TurtleFile          *turtle,
+					    const gchar         *uri,
+					    TrackerProperty        *property,
+					    const gchar         *value);
+void        tracker_turtle_close           (TurtleFile          *turtle);
 
 /* Reading functions */
-void        tracker_turtle_process       (const gchar          *turtle_file,
-					  const gchar          *base_uri,
-					  TurtleTripleCallback  callback,
-					  void                 *user_data);
+void        tracker_turtle_process         (const gchar          *turtle_file,
+					    const gchar          *base_uri,
+					    TurtleTripleCallback  callback,
+					    void                 *user_data);
+
+/*
+ * Pull reader: call tracker_turtle_reader_init () once, then loop on
+ * tracker_turtle_reader_next () and read the current statement with the
+ * get functions while it returns TRUE. The returned strings are only valid
+ * until the following tracker_turtle_reader_next () call.
+ *
+ * Keep calling tracker_turtle_reader_next () until it returns FALSE: the
+ * reader's resources are only released at that point. The reader state is
+ * kept in file-scope variables of tracker-turtle.c, so use one reader at a
+ * time.
+ */
+void         tracker_turtle_reader_init          (const gchar    *turtle_file,
+					          const gchar    *base_uri);
+gboolean     tracker_turtle_reader_next          (void);
+const gchar *tracker_turtle_reader_get_subject   (void);
+const gchar *tracker_turtle_reader_get_predicate (void);
+const gchar *tracker_turtle_reader_get_object    (void);
 
 /* Optimizer, reparser */
 void        tracker_turtle_optimize      (const gchar          *turtle_file);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]