[tracker/checkpoint: 16/16] libtracker-data: Switch to manual WAL checkpointing in a separate thread



commit 15f2cd5cb252173b3523e0c458e9e79773840894
Author: Jürg Billeter <j bitron ch>
Date:   Thu Jun 23 14:56:17 2011 +0200

    libtracker-data: Switch to manual WAL checkpointing in a separate thread

 src/libtracker-data/libtracker-data.vapi          |    7 ++
 src/libtracker-data/tracker-db-interface-sqlite.c |   17 ++++
 src/libtracker-data/tracker-db-interface-sqlite.h |    4 +
 src/libtracker-data/tracker-db-manager.c          |    5 +-
 src/tracker-store/tracker-store.vala              |   88 +++++++++++++++++---
 5 files changed, 104 insertions(+), 17 deletions(-)
---
diff --git a/src/libtracker-data/libtracker-data.vapi b/src/libtracker-data/libtracker-data.vapi
index 02e9b43..491cf9e 100644
--- a/src/libtracker-data/libtracker-data.vapi
+++ b/src/libtracker-data/libtracker-data.vapi
@@ -57,10 +57,17 @@ namespace Tracker {
 		NONE
 	}
 
+	[CCode (has_target = false, cheader_filename = "libtracker-data/tracker-db-interface-sqlite.h")]
+	public delegate void DBWalCallback (int n_pages);
+
 	[CCode (cheader_filename = "libtracker-data/tracker-db-interface.h")]
 	public interface DBInterface : GLib.Object {
 		[PrintfFormat]
 		public abstract DBStatement create_statement (DBStatementCacheType cache_type, ...) throws DBInterfaceError;
+		[PrintfFormat]
+		public void execute_query (...) throws DBInterfaceError;
+		[CCode (cheader_filename = "libtracker-data/tracker-db-interface-sqlite.h")]
+		public void sqlite_wal_hook (DBWalCallback callback);
 	}
 
 	[CCode (cheader_filename = "libtracker-data/tracker-data-update.h")]
diff --git a/src/libtracker-data/tracker-db-interface-sqlite.c b/src/libtracker-data/tracker-db-interface-sqlite.c
index 0abb164..88f21d0 100644
--- a/src/libtracker-data/tracker-db-interface-sqlite.c
+++ b/src/libtracker-data/tracker-db-interface-sqlite.c
@@ -1024,6 +1024,23 @@ tracker_db_interface_sqlite_reset_collator (TrackerDBInterface *db_interface)
 	}
 }
 
+/* SQLite WAL hook trampoline: user_data is the TrackerDBWalCallback to notify. */
+static gint
+wal_hook (gpointer user_data, sqlite3 *db, const gchar *db_name, gint n_pages)
+{
+	TrackerDBWalCallback callback = (TrackerDBWalCallback) user_data;
+
+	callback (n_pages);
+
+	return SQLITE_OK;
+}
+
+/* Install @callback as the WAL hook of @interface; a NULL @callback installs no hook at all. */
+void
+tracker_db_interface_sqlite_wal_hook (TrackerDBInterface *interface, TrackerDBWalCallback callback)
+{
+	sqlite3_wal_hook (interface->db, callback != NULL ? wal_hook : NULL, callback);
+}
 
 
 static void
diff --git a/src/libtracker-data/tracker-db-interface-sqlite.h b/src/libtracker-data/tracker-db-interface-sqlite.h
index e58ebc6..bdd1b56 100644
--- a/src/libtracker-data/tracker-db-interface-sqlite.h
+++ b/src/libtracker-data/tracker-db-interface-sqlite.h
@@ -32,6 +32,8 @@ G_BEGIN_DECLS
 
 #define TRACKER_COLLATION_NAME "TRACKER"
 
+typedef void (*TrackerDBWalCallback) (gint n_pages);
+
 TrackerDBInterface *tracker_db_interface_sqlite_new                    (const gchar              *filename,
                                                                         GError                  **error);
 TrackerDBInterface *tracker_db_interface_sqlite_new_ro                 (const gchar              *filename,
@@ -41,6 +43,8 @@ void                tracker_db_interface_sqlite_enable_shared_cache    (void);
 void                tracker_db_interface_sqlite_fts_init               (TrackerDBInterface       *interface,
                                                                         gboolean                  create);
 void                tracker_db_interface_sqlite_reset_collator         (TrackerDBInterface       *interface);
+void                tracker_db_interface_sqlite_wal_hook               (TrackerDBInterface       *interface,
+                                                                        TrackerDBWalCallback      callback);
 
 #if HAVE_TRACKER_FTS
 int                 tracker_db_interface_sqlite_fts_update_init        (TrackerDBInterface       *interface,
diff --git a/src/libtracker-data/tracker-db-manager.c b/src/libtracker-data/tracker-db-manager.c
index 3734e50..22964ba 100644
--- a/src/libtracker-data/tracker-db-manager.c
+++ b/src/libtracker-data/tracker-db-manager.c
@@ -267,9 +267,8 @@ db_set_params (TrackerDBInterface   *iface,
 			g_object_unref (stmt);
 		}
 
-		/* increase WAL autocheckpoint threshold from the default of 1000 pages
-		   to 10000 pages to improve update performance */
-		tracker_db_interface_execute_query (iface, NULL, "PRAGMA wal_autocheckpoint = 10000");
+		/* disable autocheckpoint */
+		tracker_db_interface_execute_query (iface, NULL, "PRAGMA wal_autocheckpoint = 0");
 
 		if (page_size != TRACKER_DB_PAGE_SIZE_DONT_SET) {
 			g_message ("  Setting page size to %d", page_size);
diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala
index 120dbf4..ec4b0e9 100644
--- a/src/tracker-store/tracker-store.vala
+++ b/src/tracker-store/tracker-store.vala
@@ -30,6 +30,7 @@ public class Tracker.Store {
 	static bool update_running;
 	static ThreadPool<Task> update_pool;
 	static ThreadPool<Task> query_pool;
+	static ThreadPool<bool> checkpoint_pool;
 	static GenericArray<Task> running_tasks;
 	static int max_task_time;
 	static bool active;
@@ -212,24 +213,29 @@ public class Tracker.Store {
 				var cursor = Tracker.Data.query_sparql_cursor (query_task.query);
 
 				query_task.in_thread (cursor);
-			} else if (task.type == TaskType.UPDATE) {
-				var update_task = (UpdateTask) task;
+			} else {
+				var iface = DBManager.get_db_interface ();
+				iface.sqlite_wal_hook (wal_hook);
 
-				Tracker.Data.update_sparql (update_task.query);
-			} else if (task.type == TaskType.UPDATE_BLANK) {
-				var update_task = (UpdateTask) task;
+				if (task.type == TaskType.UPDATE) {
+					var update_task = (UpdateTask) task;
 
-				update_task.blank_nodes = Tracker.Data.update_sparql_blank (update_task.query);
-			} else if (task.type == TaskType.TURTLE) {
-				var turtle_task = (TurtleTask) task;
+					Tracker.Data.update_sparql (update_task.query);
+				} else if (task.type == TaskType.UPDATE_BLANK) {
+					var update_task = (UpdateTask) task;
 
-				var file = File.new_for_path (turtle_task.path);
+					update_task.blank_nodes = Tracker.Data.update_sparql_blank (update_task.query);
+				} else if (task.type == TaskType.TURTLE) {
+					var turtle_task = (TurtleTask) task;
 
-				Tracker.Events.freeze ();
-				try {
-					Tracker.Data.load_turtle_file (file);
-				} finally {
-					Tracker.Events.reset_pending ();
+					var file = File.new_for_path (turtle_task.path);
+
+					Tracker.Events.freeze ();
+					try {
+						Tracker.Data.load_turtle_file (file);
+					} finally {
+						Tracker.Events.reset_pending ();
+					}
 				}
 			}
 		} catch (Error e) {
@@ -242,6 +248,48 @@ public class Tracker.Store {
 		});
 	}
 
+	// Checkpoints the WAL via the interface from DBManager.get_db_interface (); errors are only logged.
+	static void wal_checkpoint () {
+		try {
+			debug ("Checkpointing database...");
+			DBManager.get_db_interface ().execute_query ("PRAGMA wal_checkpoint");
+			debug ("Checkpointing complete...");
+		} catch (Error e) {
+			warning ("%s", e.message); // message is data, never a printf format
+		}
+	}
+
+	static int checkpointing; // 1 while an asynchronous checkpoint is queued or running, 0 otherwise; accessed via AtomicInt
+
+	// WAL hook, run in update thread: checkpoint according to the WAL size in pages
+	static void wal_hook (int n_pages) {
+		debug ("WAL: %d pages", n_pages);
+
+		if (n_pages >= 10000) {
+			// do immediate checkpointing (blocking updates)
+			// to prevent excessive wal file growth
+			wal_checkpoint ();
+		} else if (n_pages >= 1000) {
+			// the flag keeps at most one asynchronous checkpoint pending
+			if (AtomicInt.compare_and_exchange (ref checkpointing, 0, 1)) {
+				// initiate asynchronous checkpointing (not blocking updates)
+				try {
+					checkpoint_pool.push (true);
+				} catch (Error e) {
+					warning ("%s", e.message); // message is data, never a printf format
+					AtomicInt.set (ref checkpointing, 0);
+				}
+			}
+		}
+	}
+
+	static void checkpoint_dispatch_cb (bool task) {
+		// run in checkpoint thread; the pushed bool value itself is not used
+
+		wal_checkpoint ();
+		AtomicInt.set (ref checkpointing, 0); // lets wal_hook queue the next asynchronous checkpoint
+	}
+
 	public static void init () {
 		string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME");
 		if (max_task_time_env != null) {
@@ -260,6 +308,7 @@ public class Tracker.Store {
 		try {
 			update_pool = new ThreadPool<Task> (pool_dispatch_cb, 1, true);
 			query_pool = new ThreadPool<Task> (pool_dispatch_cb, MAX_CONCURRENT_QUERIES, true);
+			checkpoint_pool = new ThreadPool<bool> (checkpoint_dispatch_cb, 1, true);
 		} catch (Error e) {
 			warning (e.message);
 		}
@@ -274,6 +323,7 @@ public class Tracker.Store {
 	public static void shutdown () {
 		query_pool = null;
 		update_pool = null;
+		checkpoint_pool = null;
 
 		for (int i = 0; i < Priority.N_PRIORITIES; i++) {
 			query_queues[i] = null;
@@ -424,6 +474,16 @@ public class Tracker.Store {
 			active_callback = null;
 		}
 
+		if (AtomicInt.get (ref checkpointing) != 0) {
+			// this will wait for checkpointing to finish
+			checkpoint_pool = null;
+			try {
+				checkpoint_pool = new ThreadPool<bool> (checkpoint_dispatch_cb, 1, true);
+			} catch (Error e) {
+				warning (e.message);
+			}
+		}
+
 		if (active) {
 			sched ();
 		}



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]