[tracker/miner-twitter: 2/4] Add Twitter/Identi.ca miner
- From: Adrien Bustany <abustany src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/miner-twitter: 2/4] Add Twitter/Identi.ca miner
- Date: Tue, 9 Mar 2010 11:53:19 +0000 (UTC)
commit 1f3ecb7658a2cf3ae91356b59c6f2dd051cd830c
Author: Adrien Bustany <abustany gnome org>
Date: Tue Mar 2 09:39:27 2010 -0300
Add Twitter/Identi.ca miner
configure.ac | 33 ++
src/Makefile.am | 3 +
src/tracker-miner-twitter/Makefile.am | 64 +++
src/tracker-miner-twitter/query-queue.vala | 58 +++
.../tracker-miner-twitter.vala | 437 ++++++++++++++++++++
5 files changed, 595 insertions(+), 0 deletions(-)
---
diff --git a/configure.ac b/configure.ac
index a46696d..f674bdc 100644
--- a/configure.ac
+++ b/configure.ac
@@ -158,6 +158,8 @@ LIBSTREAMANALYZER_REQUIRED=0.7.0
GEE_REQUIRED=0.3
ID3LIB_REQUIRED=3.8.3
GNOME_KEYRING_REQUIRED=2.26
+REST_REQUIRED=0.6
+TWITTER_GLIB_REQUIRED=0.9
# Library Checks
PKG_CHECK_MODULES(GLIB2, [glib-2.0 >= $GLIB_REQUIRED])
@@ -792,6 +794,32 @@ fi
AM_CONDITIONAL(HAVE_GNOME_KEYRING, test "x$have_gnome_keyring" = "xyes")
+##################################################################
+# Twitter miner
+#
+# --enable-miner-twitter accepts yes, no or auto (the default).
+#   auto: build the miner only if both pkg-config modules are found.
+#   yes:  abort configure when a module is missing.
+#   no:   skip the check; the miner is reported as "no" in the summary.
+##################################################################
+
+AC_ARG_ENABLE(miner_twitter,
+              AS_HELP_STRING([--enable-miner-twitter],
+                             [enable Twitter miner [[default=auto]]]),,
+              [enable_miner_twitter=auto])
+
+if test "x$enable_miner_twitter" != "xno"; then
+   PKG_CHECK_MODULES(MINER_TWITTER,
+                     [ rest-0.6 >= $REST_REQUIRED twitter-glib-1.0 >= $TWITTER_GLIB_REQUIRED ],
+                     [have_miner_twitter_deps=yes],
+                     [have_miner_twitter_deps=no])
+   AC_SUBST(MINER_TWITTER_LIBS)
+   AC_SUBST(MINER_TWITTER_CFLAGS)
+else
+   # Keep the variable defined so the configure summary prints "no"
+   # instead of an empty value.
+   have_miner_twitter_deps=no
+fi
+
+if test "x$enable_miner_twitter" = "xyes"; then
+   if test "x$have_miner_twitter_deps" != "xyes"; then
+      AC_MSG_ERROR([Couldn't find the required dependencies for the Twitter miner: rest-0.6 >= $REST_REQUIRED, twitter-glib-1.0 >= $TWITTER_GLIB_REQUIRED.])
+   fi
+fi
+
+AM_CONDITIONAL(HAVE_MINER_TWITTER, test "x$have_miner_twitter_deps" = "xyes")
+
####################################################################
# Mail miners
####################################################################
@@ -1678,6 +1706,7 @@ AC_CONFIG_FILES([
src/tracker-control/Makefile
src/tracker-extract/Makefile
src/tracker-miner-fs/Makefile
+ src/tracker-miner-twitter/Makefile
src/tracker-preferences/Makefile
src/tracker-preferences/tracker-preferences.desktop.in
src/tracker-search-bar/Makefile
@@ -1793,6 +1822,10 @@ Plugins/Extensions:
Evolution plugin (data-push): $enable_evolution_miner
KMail plugin (data-push): $enable_kmail_miner
+Extra miners:
+
+ Twitter miner: $have_miner_twitter_deps
+
Writeback:
MP3 writeback: $have_id3lib
diff --git a/src/Makefile.am b/src/Makefile.am
index 2f5ff33..f4ee63d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -39,3 +39,6 @@ if HAVE_TRACKER_EXPLORER
SUBDIRS += tracker-explorer
endif
+# Descend into the Twitter/Identi.ca miner only when configure set
+# HAVE_MINER_TWITTER.
+if HAVE_MINER_TWITTER
+SUBDIRS += tracker-miner-twitter
+endif
diff --git a/src/tracker-miner-twitter/Makefile.am b/src/tracker-miner-twitter/Makefile.am
new file mode 100644
index 0000000..9e26a00
--- /dev/null
+++ b/src/tracker-miner-twitter/Makefile.am
@@ -0,0 +1,64 @@
+include $(top_srcdir)/Makefile.decl
+
+# Preprocessor defines, include paths and warning flags for the C sources
+# that valac generates from the .vala files below.
+INCLUDES = \
+	-Wall \
+	-DSHAREDIR=\""$(datadir)"\" \
+	-DPKGLIBDIR=\""$(libdir)/tracker"\" \
+	-DLOCALEDIR=\""$(localedir)"\" \
+	-DLIBEXEC_PATH=\""$(libexecdir)"\" \
+	-DG_LOG_DOMAIN=\"Tracker\" \
+	-DTRACKER_COMPILATION \
+	-I$(top_srcdir)/src \
+	$(WARN_CFLAGS) \
+	$(GMODULE_CFLAGS) \
+	$(PANGO_CFLAGS) \
+	$(DBUS_CFLAGS) \
+	$(MINER_TWITTER_CFLAGS) \
+	$(GCOV_CFLAGS)
+
+# Packages handed to valac through --pkg.
+VALAFLAGS = \
+	--pkg gio-2.0 \
+	--pkg twitter-glib-1.0 \
+	--pkg posix \
+	--thread
+
+libexec_PROGRAMS = tracker-miner-twitter
+
+# Inputs passed to valac by the stamp rule below.
+tracker_miner_twitter_VALASOURCES = \
+	tracker-miner-twitter.vala \
+	query-queue.vala \
+	$(top_srcdir)/src/vapi/tracker-client-0.7.vapi
+
+# The program is compiled from the generated .c files (.vala -> .c).
+tracker_miner_twitter_SOURCES = \
+	$(tracker_miner_twitter_VALASOURCES:.vala=.c)
+
+# The library names carry the @TRACKER_API_VERSION@ configure substitution.
+tracker_miner_twitter_LDADD = \
+	$(top_builddir)/src/libtracker-miner/libtracker-miner-@TRACKER_API_VERSION@.la \
+	$(top_builddir)/src/libtracker-client/libtracker-client-@TRACKER_API_VERSION@.la \
+	$(DBUS_LIBS) \
+	$(GMODULE_LIBS) \
+	$(GTHREAD_LIBS) \
+	$(GIO_LIBS) \
+	$(GCOV_LIBS) \
+	$(GLIB2_LIBS) \
+	$(MINER_TWITTER_LIBS) \
+	-lz \
+	-lm
+
+# Extra binding file given to valac in addition to the Vala sources.
+vapi_sources = \
+	$(top_srcdir)/src/libtracker-miner/tracker-miner-@TRACKER_API_VERSION@.vapi
+
+# Run valac once over all inputs (-C: emit C only) and record it with a stamp.
+tracker-miner-twitter.vala.stamp: $(tracker_miner_twitter_VALASOURCES) $(vapi_sources)
+	$(AM_V_GEN)$(VALAC) $(GCOV_VALAFLAGS) -C $(VALAFLAGS) $^
+	touch $@
+
+
+BUILT_SOURCES = tracker-miner-twitter.vala.stamp
+
+MAINTAINERCLEANFILES = \
+	tracker-miner-twitter.vala.stamp \
+	$(tracker_miner_twitter_VALASOURCES:.vala=.c)
+
+EXTRA_DIST = \
+	$(tracker_miner_twitter_VALASOURCES) \
+	tracker-miner-twitter.vala.stamp
diff --git a/src/tracker-miner-twitter/query-queue.vala b/src/tracker-miner-twitter/query-queue.vala
new file mode 100644
index 0000000..ea3ac73
--- /dev/null
+++ b/src/tracker-miner-twitter/query-queue.vala
@@ -0,0 +1,58 @@
+/**
+ * Sends SPARQL batch updates through a Tracker.Miner, remembers which of
+ * them have not returned yet, and offers a blocking flush that commits them.
+ */
+public class QueryQueue : GLib.Object {
+	/* Updates that were submitted and have not returned yet, keyed by cookie */
+	private HashTable<uint, string> queue;
+	/* Key that the next call to append () will use */
+	private uint cookie;
+
+	/* Held while flush () runs so that a second flush backs off */
+	private Mutex flush_mutex;
+
+	/* Miner used to execute and commit the updates */
+	private Tracker.Miner miner;
+
+	public QueryQueue (Tracker.Miner parent) {
+		queue = new HashTable<uint, string> (direct_hash, direct_equal);
+		cookie = 0;
+		flush_mutex = new Mutex ();
+		miner = parent;
+	}
+
+	/*
+	 * Logs the query and runs it as a batch update. The query stays in the
+	 * pending table until the update returns; a failure is only logged.
+	 */
+	public async void append (string query) {
+		uint pending_key = cookie;
+		cookie++;
+
+		queue.insert (pending_key, query);
+		message ("SPARQL query: %s", query);
+
+		try {
+			yield miner.execute_batch_update (query);
+		} catch (Error tracker_error) {
+			warning ("BatchUpdate query failed: %s", tracker_error.message);
+		}
+
+		queue.remove (pending_key);
+	}
+
+	/*
+	 * BLOCKING flush: when updates are pending, asks the miner to commit and
+	 * spins a private main loop until the commit callback fires. Returns
+	 * immediately if another flush already holds the mutex.
+	 */
+	public void flush () {
+		if (!flush_mutex.trylock ()) {
+			message ("There's already a flush taking place");
+			return;
+		}
+
+		if (queue.size () == 0) {
+			flush_mutex.unlock ();
+			return;
+		}
+
+		try {
+			var commit_loop = new MainLoop (null, false);
+			miner.commit (null, () => { commit_loop.quit (); });
+			commit_loop.run ();
+		} catch (Error tracker_error) {
+			warning ("Commit query failed: %s", tracker_error.message);
+		}
+
+		flush_mutex.unlock ();
+	}
+
+	/* Number of updates that have been submitted but have not returned yet */
+	public uint size () {
+		return queue.size ();
+	}
+}
diff --git a/src/tracker-miner-twitter/tracker-miner-twitter.vala b/src/tracker-miner-twitter/tracker-miner-twitter.vala
new file mode 100644
index 0000000..0abf85b
--- /dev/null
+++ b/src/tracker-miner-twitter/tracker-miner-twitter.vala
@@ -0,0 +1,437 @@
+/*
+ * Copyright (C) 2010, Adrien Bustany <abustany gnome org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+namespace Tracker {
+
+public class MinerTwitter : Tracker.MinerWeb {
+ /* Service this instance talks to; construct-only, checked in the construct block */
+ public Twitter.Provider provider { get; construct; }
+ private string friendly_name; /* Human readable name of the provider */
+ /* URN of the feed channel; stays null until init_feed_channel () has stored it */
+ private string channel_urn = null;
+
+ /* twitter-glib client created in the construct block */
+ private Twitter.Client service;
+
+ /* Passed to SparqlBuilder.insert_open () for every status that is stored */
+ private static const string DATASOURCE_URN = "urn:103e7d6e-2334-4cd2-b0a5-f1b0c8bb10ef";
+ /* Format of the description handed to the password provider in associate () */
+ private static const string SERVICE_DESCRIPTION = "Tracker miner for %s";
+
+ private static const uint PULL_INTERVAL = 60; /* seconds */
+ /* Source id handed to Source.remove () on dissociation; 0 is treated as "no pull scheduled" */
+ private uint pull_timeout_handle;
+
+ /* Receives the SPARQL inserts built in status_received_cb () */
+ private QueryQueue query_queue;
+
+ /* used to store state information like last pulled tweet*/
+ private static const string STATE_FILE_NAME = "tracker-miner-%s.state";
+ private static const string STATE_FILE_GROUP = "General";
+ private KeyFile state_file;
+
+ /* Seconds value of the creation date of the last status that was processed */
+ private uint last_status_timestamp;
+
+ /* Loop run by main () and quit from signal_handler () */
+ private static MainLoop main_loop;
+
+
+ /*
+  * Instance set-up: names the miner after its provider, creates the
+  * Twitter client and hooks its signals, loads the state file, starts
+  * watching "association-status" and kicks off the feed channel lookup.
+  */
+ construct {
+ /* The "name" property is read back later for state file and keyring lookups */
+ if (provider == Twitter.Provider.DEFAULT_PROVIDER) {
+ set ("name", "Twitter");
+ friendly_name = "Twitter";
+ } else if (provider == Twitter.Provider.IDENTI_CA) {
+ set ("name", "Identica");
+ friendly_name = "Identi.ca";
+ } else {
+ assert_not_reached ();
+ }
+
+ /* Set before the notify handler below is connected */
+ set ("association-status", MinerWebAssociationStatus.UNASSOCIATED);
+
+ service = new Twitter.Client.full (provider, null, null, null);
+ service.status_received.connect (status_received_cb);
+ service.timeline_complete.connect (timeline_complete_cb);
+
+ /* load_state_file () reads "name", so it has to run after the set () above */
+ state_file = new KeyFile ();
+ load_state_file ();
+
+ pull_timeout_handle = 0;
+ this.notify["association-status"].connect (association_status_changed);
+
+ query_queue = new QueryQueue (this);
+
+ /* Async; channel_urn remains null until it completes */
+ init_feed_channel ();
+ }
+
+ /*
+  * Marks the miner as unassociated (which notifies
+  * association_status_changed) and then writes the state file.
+  */
+ public void shutdown () {
+ set ("association-status", MinerWebAssociationStatus.UNASSOCIATED);
+ save_state_file ();
+ }
+
+	/*
+	 * Finds the mfo:FeedChannel whose feed type is named after this provider
+	 * and stores its URN in channel_urn. If the store has no such channel it
+	 * is inserted first and the lookup is repeated. Any Tracker error is
+	 * reported with critical () and leaves channel_urn untouched.
+	 */
+	private async void init_feed_channel () {
+		unowned PtrArray results;
+		string lookup_query;
+
+		lookup_query = "select ?c where { ?c a mfo:FeedChannel ; mfo:type ?type ."
+		               + " ?type mfo:name ?typeName . "
+		               + " FILTER (?typeName = \"%s\") }".printf (friendly_name);
+
+		try {
+			results = yield execute_sparql (lookup_query);
+
+			if (results.len == 0) {
+				/* Not optimal: insert, then query again, until TrackerMiner
+				 * supports returning blank nodes */
+				string create_query;
+
+				create_query = "insert { _:channel a mfo:FeedChannel ; rdfs:label \"%s home timeline\";".printf (friendly_name)
+				               + " mfo:type [ a mfo:FeedType ; mfo:name \"%s\" ] }".printf (friendly_name);
+				yield execute_update (create_query);
+
+				results = yield execute_sparql (lookup_query);
+			}
+
+			assert (results.len == 1);
+			/* First column of the first result row */
+			channel_urn = ((string[][])results.pdata)[0][0];
+		} catch (Error tracker_error) {
+			critical ("Couldn't initialize feed channel: %s", tracker_error.message);
+		}
+	}
+
+	/*
+	 * Handler for Twitter.Client::status-received. Turns one status into a
+	 * SPARQL insert (author contact, feed message, optional reply link and
+	 * dates) and queues it on query_queue.
+	 *
+	 * handle: request handle supplied by the signal (unused here)
+	 * status: the received status; may be null
+	 * error:  set when the pull failed; NOT_MODIFIED is ignored silently,
+	 *         every other error is logged
+	 */
+	private void status_received_cb (ulong handle, Twitter.Status? status, Error error) {
+		SparqlBuilder builder;
+		TimeVal tv = TimeVal ();
+
+		if (error != null) {
+			if (!(error is Twitter.Error.NOT_MODIFIED)) {
+				warning ("An error occurred while pulling statuses: %s", error.message);
+			}
+			return;
+		}
+
+		/* The parameter is nullable: do not dereference a missing status */
+		if (status == null) {
+			warning ("Received an empty status without an error, ignoring it");
+			return;
+		}
+
+		builder = new SparqlBuilder.update ();
+
+		builder.insert_open (DATASOURCE_URN);
+
+		/* Author of the status, with a remote photo resource */
+		builder.subject ("_:author");
+		builder.predicate ("a");
+		builder.object ("nco:Contact");
+		builder.predicate ("nco:fullname");
+		builder.object_string (status.user.name);
+		builder.predicate ("nco:photo");
+		builder.object_blank_open ();
+		builder.predicate ("a");
+		builder.object ("nfo:RemoteDataObject");
+		builder.predicate ("nie:url");
+		builder.object_string (status.user.profile_image_url);
+		builder.object_blank_close (); /* nco:photo */
+
+		/* The status itself */
+		builder.subject ("_:message");
+		builder.predicate ("a");
+		builder.object ("mfo:FeedMessage");
+
+		builder.predicate ("a");
+		builder.object ("nfo:RemoteDataObject");
+		builder.predicate ("nie:url");
+		builder.object_string (status.url);
+
+		builder.predicate ("nmo:communicationChannel");
+		builder.object_iri (channel_urn);
+
+		builder.predicate ("nmo:messageId");
+		builder.object_string (rdf_message_id (status.id));
+
+		builder.predicate ("nmo:from");
+		builder.object ("_:author");
+
+		builder.predicate ("nie:plainTextContent");
+		builder.object_string (status.text);
+
+		/* Link to the status being replied to, identified by its message id */
+		if (status.reply_to_status != 0) {
+			builder.predicate ("nmo:inReplyTo");
+			builder.object_blank_open ();
+			builder.predicate ("a");
+			builder.object ("mfo:FeedMessage");
+			builder.predicate ("nmo:communicationChannel");
+			builder.object_iri (channel_urn);
+			builder.predicate ("nmo:messageId");
+			builder.object_string (rdf_message_id (status.reply_to_status));
+			builder.object_blank_close ();
+		}
+
+		if (Twitter.date_to_time_val (status.created_at, out tv)) {
+			builder.predicate ("nmo:receivedDate");
+			builder.object_string (tv.to_iso8601 ());
+
+			/* We receive the status in chronological order */
+			last_status_timestamp = (uint)tv.tv_sec;
+		}
+
+		/* tv is reused to record when the status was downloaded */
+		tv.get_current_time ();
+		builder.predicate ("mfo:downloadedTime");
+		builder.object_string (tv.to_iso8601 ());
+
+		builder.insert_close ();
+
+		query_queue.append (builder.get_result ());
+	}
+
+	/*
+	 * Handler for Twitter.Client::timeline-complete. Flushes the queued
+	 * inserts and records the timestamp of the newest status as the "since"
+	 * value for the next pull.
+	 */
+	private void timeline_complete_cb () {
+		message ("Timeline downloaded");
+
+		query_queue.flush ();
+
+		/* last_status_timestamp is 0 until a status has been received in this
+		 * session; writing it then would reset "since" to 0 and discard the
+		 * value loaded from the state file. */
+		if (last_status_timestamp != 0) {
+			state_file.set_integer (STATE_FILE_GROUP, "since", (int)last_status_timestamp);
+		}
+	}
+
+	/*
+	 * Builds the string stored as nmo:messageId for a status:
+	 * "feed:<value of the name property>:<status id>".
+	 */
+	private string rdf_message_id (uint status_id) {
+		string miner_name;
+		get ("name", out miner_name);
+
+		return "feed:%s:%u".printf (miner_name, status_id);
+	}
+
+	/*
+	 * notify::association-status handler. Starts the periodic pull (plus one
+	 * immediate pull from an idle callback) when the miner becomes
+	 * associated, and stops it when the miner becomes unassociated.
+	 *
+	 * source, pspec: supplied by the notify signal; the current status is
+	 *                read back from the property instead.
+	 */
+	private void association_status_changed (Object source, ParamSpec pspec) {
+		MinerWebAssociationStatus status;
+
+		get ("association-status", out status);
+
+		switch (status) {
+		case MinerWebAssociationStatus.ASSOCIATED:
+			/* A pull is already scheduled */
+			if (pull_timeout_handle != 0)
+				return;
+
+			message ("Miner is now associated. Initiating periodic pull.");
+			/* Keep the source id: it is both the "already scheduled" marker
+			 * above and what Source.remove () needs below. */
+			pull_timeout_handle = Timeout.add_seconds (PULL_INTERVAL, pull_timeout_cb);
+			Idle.add ( () => { pull_timeout_cb (); return false; });
+			break;
+		case MinerWebAssociationStatus.UNASSOCIATED:
+			if (pull_timeout_handle == 0)
+				return;
+
+			Source.remove (pull_timeout_handle);
+			/* Forget the removed source so a later association can reschedule */
+			pull_timeout_handle = 0;
+			break;
+		}
+	}
+
+	/*
+	 * Requests the friends timeline for statuses newer than the "since"
+	 * value kept in the state file. Always returns true, including when the
+	 * cycle is skipped because the feed channel or the state value is not
+	 * available.
+	 */
+	private bool pull_timeout_cb () {
+		if (channel_urn == null) {
+			message ("Feed channel not initialized yet, skipping this cycle");
+			return true;
+		}
+
+		message ("Pulling new data");
+
+		int newer_than;
+		try {
+			newer_than = state_file.get_integer (STATE_FILE_GROUP, "since");
+		} catch (Error state_error) {
+			critical ("Cannot load config variable: %s", state_error.message);
+			return true;
+		}
+
+		service.get_friends_timeline ("", newer_than);
+		return true;
+	}
+
+	/*
+	 * Loads <user cache dir>/tracker/tracker-miner-<name>.state into
+	 * state_file. A load failure is only logged. Afterwards the "since" key
+	 * is guaranteed to exist: it is set to 0 when it cannot be read.
+	 */
+	private void load_state_file () {
+		string miner_name;
+		get ("name", out miner_name);
+
+		string state_path = Path.build_filename (Environment.get_user_cache_dir (),
+		                                         "tracker",
+		                                         STATE_FILE_NAME.printf (miner_name));
+
+		try {
+			state_file.load_from_file (state_path, KeyFileFlags.NONE);
+		} catch (Error load_error) {
+			message ("Couldn't load the state file");
+		}
+
+		/* Probe the key; the value itself is not needed here */
+		try {
+			state_file.get_integer (STATE_FILE_GROUP, "since");
+		} catch (Error lookup_error) {
+			state_file.set_integer (STATE_FILE_GROUP, "since", 0);
+		}
+	}
+
+	/*
+	 * Serializes state_file to
+	 * <user cache dir>/tracker/tracker-miner-<name>.state. A write failure
+	 * is logged as a warning.
+	 */
+	private void save_state_file () {
+		string miner_name;
+		get ("name", out miner_name);
+
+		string state_basename = STATE_FILE_NAME.printf (miner_name);
+		string state_path = Path.build_filename (Environment.get_user_cache_dir (),
+		                                         "tracker",
+		                                         state_basename);
+
+		try {
+			FileUtils.set_contents (state_path, state_file.to_data ());
+		} catch (Error error) {
+			warning ("Couldn't save state file: %s", error.message);
+		}
+	}
+
+	// authenticate () runs a nested main loop while it waits for the service.
+	// Without this mutex a call dispatched from that inner loop would start
+	// an inner-inner loop, and so on...
+	private Mutex authenticate_mutex = new Mutex ();
+
+	/*
+	 * Fetches the stored credentials from the password provider, asks the
+	 * service to verify them and blocks (in a nested main loop) until the
+	 * answer arrives. On success "association-status" becomes ASSOCIATED.
+	 *
+	 * Throws MinerWebError.KEYRING when the password provider fails,
+	 * NO_CREDENTIALS when nothing is stored, SERVICE when the verification
+	 * request reports an error and WRONG_CREDENTIALS when the service
+	 * rejects the credentials. Returns without doing anything if another
+	 * authenticate () call is still running.
+	 */
+	public override void authenticate () throws MinerWebError {
+		PasswordProvider password_provider;
+		string name;
+		string username;
+		string password;
+		bool verified = false;
+		Error twitter_error = null;
+		ulong verified_handler;
+
+		if (!authenticate_mutex.trylock ()) {
+			warning ("authenticate called while it was still running");
+			return;
+		}
+
+		password_provider = PasswordProvider.get ();
+		get ("name", out name);
+
+		set ("association-status", MinerWebAssociationStatus.UNASSOCIATED);
+
+		try {
+			password = password_provider.get_password (name, out username);
+		} catch (Error e) {
+			authenticate_mutex.unlock ();
+			if (e is PasswordProviderError.SERVICE) {
+				throw new MinerWebError.KEYRING (e.message);
+			}
+			if (e is PasswordProviderError.NOTFOUND) {
+				throw new MinerWebError.NO_CREDENTIALS ("Miner is not associated");
+			}
+
+			critical ("Internal error: %s", e.message);
+			return;
+		}
+
+		message ("Verifying username and password");
+		service.set_user (username, password);
+
+		/* Connect before issuing the request so the answer cannot be missed,
+		 * and remember the handler id: the closure must be disconnected
+		 * afterwards, otherwise one stale handler accumulates per call. */
+		var wait_loop = new MainLoop (null, false);
+		verified_handler = service.user_verified.connect ( (h, v, e) => {
+			verified = v;
+			twitter_error = e;
+			wait_loop.quit (); });
+		service.verify_user ();
+		wait_loop.run ();
+		service.disconnect (verified_handler);
+		authenticate_mutex.unlock ();
+
+		if (twitter_error != null) {
+			throw new MinerWebError.SERVICE (twitter_error.message);
+		}
+
+		if (!verified) {
+			throw new MinerWebError.WRONG_CREDENTIALS ("Wrong username and/or password");
+		} else {
+			message ("Authentication sucessful");
+			set ("association-status", MinerWebAssociationStatus.ASSOCIATED);
+		}
+
+		return;
+	}
+
+	/*
+	 * Removes the stored credentials for this miner and marks it as
+	 * unassociated. Throws MinerWebError.KEYRING when the password provider
+	 * reports a service error; any other failure is logged and leaves the
+	 * association status unchanged.
+	 */
+	public override void dissociate () throws MinerWebError {
+		string miner_name;
+		var keyring = PasswordProvider.get ();
+
+		get ("name", out miner_name);
+
+		try {
+			keyring.forget_password (miner_name);
+		} catch (Error forget_error) {
+			if (forget_error is PasswordProviderError.SERVICE) {
+				throw new MinerWebError.KEYRING (forget_error.message);
+			}
+
+			critical ("Internal error: %s", forget_error.message);
+			return;
+		}
+
+		set ("association-status", MinerWebAssociationStatus.UNASSOCIATED);
+	}
+
+	/*
+	 * Stores the "username" and "password" entries of association_data in
+	 * the password provider under this miner's name. Both entries must be
+	 * present (asserted). Throws MinerWebError.KEYRING when the password
+	 * provider reports a service error; other failures are only logged.
+	 */
+	public override void associate (HashTable<string, string> association_data) throws Tracker.MinerWebError {
+		string? username = association_data.lookup ("username");
+		string? password = association_data.lookup ("password");
+		assert (username != null && password != null);
+
+		string miner_name;
+		get ("name", out miner_name);
+
+		var keyring = PasswordProvider.get ();
+
+		try {
+			keyring.store_password (miner_name,
+			                        SERVICE_DESCRIPTION.printf (miner_name),
+			                        username,
+			                        password);
+		} catch (Error store_error) {
+			if (store_error is PasswordProviderError.SERVICE) {
+				throw new MinerWebError.KEYRING (store_error.message);
+			}
+
+			critical ("Internal error: %s", store_error.message);
+			return;
+		}
+	}
+
+	/* Set once a SIGINT/SIGTERM has asked the main loop to quit */
+	private static bool in_loop = false;
+
+	/*
+	 * Handler installed by init_signals (). The first SIGINT or SIGTERM
+	 * quits the main loop; any signal delivered after that exits the
+	 * process immediately with EXIT_FAILURE.
+	 */
+	private static void signal_handler (int signo) {
+		if (in_loop) {
+			Posix.exit (Posix.EXIT_FAILURE);
+		}
+
+		if (signo == Posix.SIGINT || signo == Posix.SIGTERM) {
+			in_loop = true;
+			main_loop.quit ();
+		}
+	}
+
+ /*
+  * Installs signal_handler () for SIGTERM and SIGINT through sigaction.
+  * The body is compiled out when G_OS_WIN32 is defined.
+  */
+ private static void init_signals () {
+#if G_OS_WIN32
+#else
+ Posix.sigaction_t act = Posix.sigaction_t ();
+ Posix.sigset_t empty_mask = Posix.sigset_t ();
+ /* No additional signals are blocked while the handler runs */
+ Posix.sigemptyset (empty_mask);
+ act.sa_handler = signal_handler;
+ act.sa_mask = empty_mask;
+ act.sa_flags = 0;
+
+ Posix.sigaction (Posix.SIGTERM, act, null);
+ Posix.sigaction (Posix.SIGINT, act, null);
+#endif
+ }
+
+	/*
+	 * Entry point: creates one miner per provider (Twitter and Identi.ca),
+	 * installs the signal handlers, runs the main loop until it is quit and
+	 * then shuts both miners down.
+	 */
+	public static void main (string[] args) {
+		Environment.set_application_name ("Twitter/Identi.ca tracker miner");
+
+		MinerTwitter twitter_miner = Object.new (typeof (MinerTwitter),
+		                                         "provider", Twitter.Provider.DEFAULT_PROVIDER) as MinerTwitter;
+		MinerTwitter identica_miner = Object.new (typeof (MinerTwitter),
+		                                          "provider", Twitter.Provider.IDENTI_CA) as MinerTwitter;
+
+		init_signals ();
+
+		main_loop = new MainLoop (null, false);
+		main_loop.run ();
+
+		twitter_miner.shutdown ();
+		identica_miner.shutdown ();
+	}
+}
+
+} // End namespace Tracker
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]