import gnome
import gobject
import re
import sys
import cgi
import os.path
import deskbar
import deskbar.core.Utils
import deskbar.interfaces.Module
import deskbar.interfaces.Match
import deskbar.interfaces.Action
from deskbar.core.Utils import is_program_in_path, spawn_async
from deskbar.handlers.actions.OpenWithApplicationAction import \
OpenWithApplicationAction
from deskbar.handlers.actions.OpenDesktopFileAction import \
OpenDesktopFileAction
from deskbar.handlers.actions.ShowUrlAction import \
ShowUrlAction
from gettext import gettext as _
# Upper bound on the number of hits of each result type that
# TrackerLiveSearchHandler.recieve_hits() turns into matches.
MAX_RESULTS = 20
# Names of the two Module subclasses defined in this file.
# NOTE(review): presumably read by deskbar's module loader - confirm.
HANDLERS = ['TrackerStaticSearchHandler', 'TrackerLiveSearchHandler']
class TrackerStaticSearchMatch (deskbar.interfaces.Match):
    """Match whose single action opens Tracker Search Tool for the query.

    All keyword arguments are forwarded to the base ``Match``; ``name`` is
    required because it doubles as the search string of the attached action.
    """

    def __init__(self, **kwargs):
        deskbar.interfaces.Match.__init__(self, **kwargs)
        search_action = TrackerStaticSearchAction(kwargs['name'])
        self.add_action(search_action)
class TrackerStaticSearchAction (deskbar.interfaces.Action):
    """Action that launches ``tracker-search-tool`` with a stored query."""

    def __init__(self, name):
        """Remember *name*, the text that will be searched for."""
        deskbar.interfaces.Action.__init__(self, name)
        self.name = name

    def activate(self, text=None):
        """Start tracker-search-tool (resolved via PATH) on the stored query."""
        argv = ['tracker-search-tool', self.name]
        gobject.spawn_async(argv, flags=gobject.SPAWN_SEARCH_PATH)

    def get_verb(self):
        """Return the translated label with a '%(name)s' placeholder left in."""
        template = _('Search for %s with Tracker Search Tool')
        return template % '%(name)s'

    def get_hash(self):
        """Return an identifier for this action derived from the query."""
        prefix = 't-s-t:'
        return prefix + self.name
class TrackerStaticSearchHandler(deskbar.interfaces.Module):
    """Module that answers every query with one "search in Tracker" match."""

    INFOS = {
        'icon': deskbar.core.Utils.load_icon('tracker'),
        'name': _('Tracker Search'),
        'description': _('Search with Tracker Search Tool'),
    }

    def __init__(self):
        deskbar.interfaces.Module.__init__(self)

    def query(self, query):
        """Emit a single match that hands *query* to Tracker Search Tool."""
        match = TrackerStaticSearchMatch(
            name=query,
            category='actions',
            priority=self.get_priority())
        self._emit_query_ready(query, [match])

    @staticmethod
    def has_requirements():
        """Report whether the ``tracker-search-tool`` program is on PATH."""
        tool = 'tracker-search-tool'
        return is_program_in_path(tool)
# Presentation data for each Tracker result type, keyed by the type name that
# is stored in a hit's 'type' field:
#   'description' - verb template returned by the get_verb() methods below;
#                   the %(...)s placeholders are left in the string
#   'category'    - category id returned by the get_category() methods below
#   'icon'        - (optional) icon name TrackerLiveFileMatch falls back to
#                   when the hit itself carries no icon
#   'action'      - (optional) command template that
#                   TrackerLiveSearchAction.activate() runs instead of
#                   opening the hit's URI
# NOTE(review): the previous comment here said the description is not used
# for now, but get_verb() in this file does read it - confirm and drop this.
TYPES = {
    'Applications': {
        'description': (_('Launch %s (%s)') % ('%(name)s', '%(app_name)s') ),
        'category': 'actions',
    },
    'GaimConversations': {
        'description': (_('See %s conversation\n%s %s\nfrom %s') % ('%(proto)s', '%(channel)s', '%(conv_to)s', '%(time)s')),
        'category': 'conversations',
    },
    'Email': {
        'description': (_('Email from %s') % '%(publisher)s' ) + '\n%(title)s',
        'category': 'emails',
        # The only type with a custom command: the hit is handed to evolution.
        'action' : 'evolution %(uri)s',
    },
    'Music': {
        'description': _('Listen to music %s\nin %s') % ('%(base)s', '%(dir)s'),
        'category': 'music',
        'icon': 'audio-x-generic',
    },
    'Documents': {
        'description': _('See document %s\nin %s') % ('%(base)s', '%(dir)s'),
        'category': 'documents',
    },
    'Development Files': {
        'description': _('Open file %s\nin %s') % ('%(base)s', '%(dir)s'),
        'category': 'develop',
    },
    'Images': {
        'description': _('View image %s\nin %s') % ('%(base)s', '%(dir)s'),
        'category': 'images',
        'icon': 'image-x-generic',
    },
    'Videos': {
        'description': _('Watch video %s\nin %s') % ('%(base)s', '%(dir)s'),
        'category': 'videos',
        'icon': 'video-x-generic',
    },
    # Also used by recieve_hits() for any type that has no entry of its own.
    'Other Files': {
        'description': _('Open file %s\nin %s') % ('%(base)s', '%(dir)s'),
        'category': 'files',
    },
}
class TrackerMoreMatch (deskbar.interfaces.Match):
    """Match that opens Tracker Search Tool to show more results for a query.

    Nothing in the visible code instantiates this class.

    Parameters:
        backend  -- passed positionally to ``Match.__init__``.
                    NOTE(review): the other Match subclasses in this file pass
                    keyword arguments only - confirm the base still accepts it.
        qstring  -- search string handed to tracker-search-tool.
        category -- TYPES key used to look up the deskbar category.
        **args   -- forwarded to ``Match.__init__``.
    """

    def __init__(self, backend, qstring, category='files', **args):
        deskbar.interfaces.Match.__init__(self, backend, **args)
        self._icon = deskbar.core.Utils.load_icon('tracker')
        self.qstring = qstring
        self.category = category

    def get_verb(self):
        """Return the label shown for this match.

        TYPES has no 'Extra' entry, so the unguarded lookup used to raise
        KeyError on every call.  An 'Extra' entry is still honoured if one is
        added; otherwise an already-translated generic label is returned.
        """
        try:
            return TYPES['Extra']['description']
        except KeyError:
            return _('Search with Tracker Search Tool')

    def get_category (self):
        """Return the deskbar category id for ``self.category``.

        Falls back to 'files' (as TrackerLiveFileMatch.get_category does) when
        ``self.category`` is not a TYPES key; previously this returned None,
        which was always the case for the default ``category='files'``.
        """
        try:
            return TYPES[self.category]['category']
        except (KeyError, TypeError):
            return 'files'

    def action(self, text=None):
        """Start tracker-search-tool (resolved via PATH) on the stored query."""
        gobject.spawn_async(['tracker-search-tool', self.qstring],
                            flags=gobject.SPAWN_SEARCH_PATH)
class TrackerLiveFileMatch (deskbar.interfaces.Match):
    """Match for a single Tracker hit.

    Parameters:
        result -- dict describing the hit; it must contain 'name', 'uri' and
                  'type' and may contain 'icon' and 'desktop'.  Although it
                  defaults to None, a dict is required.  The dict is stored
                  and modified in place (the icon is stripped or removed).
        **args -- accepted but ignored.
                  NOTE(review): not forwarded to ``Match.__init__`` - confirm
                  that is intended.
    """

    def __init__(self, result=None, **args):
        deskbar.interfaces.Match.__init__ (self)

        # Set the match icon: an icon carried by the hit wins, otherwise use
        # the default icon of the hit's type when there is one.
        if 'icon' in result:
            result['icon'] = result['icon'].strip ()
            if result['icon'] == '':
                del result['icon']
            else:
                self._icon = result['icon']
        else:
            # .get() keeps an unknown type from raising here; get_verb() and
            # get_category() already tolerate unknown types.
            type_info = TYPES.get(result['type'], {})
            if 'icon' in type_info:
                self._icon = type_info['icon']

        self.result = result

        if result['type'] == 'Applications':
            # NOTE(review): an application hit without a parsed desktop file
            # ends up with no action at all - confirm that is intended.
            if 'desktop' in result:
                self.add_action (OpenDesktopFileAction (result['name'], result['desktop']))
        else:
            self.add_action (TrackerLiveSearchAction (result))

    def get_name (self, text = None):
        """Return a dict holding the hit's name under the key 'name'."""
        return { 'name': self.result ['name'] }

    def get_verb(self):
        """Return the verb template for the hit's type, or the generic one."""
        try:
            return TYPES[self.result['type']]['description']
        except KeyError:
            return _('Open file %s\nin %s') % ('%(base)s', '%(dir)s')

    def get_hash(self, text=None):
        """Return the hit's URI as its identifier."""
        return self.result['uri']

    def get_category (self):
        """Return the category id for the hit's type, defaulting to 'files'."""
        try:
            return TYPES[self.result['type']]['category']
        except KeyError:
            return 'files'
class TrackerLiveSearchAction (deskbar.interfaces.Action):
    """Action that opens a Tracker hit.

    Parameters:
        result -- dict describing the hit; it must contain 'name', 'uri' and
                  'type'.  The dict is stored and gains the keys 'base' and
                  'dir' (file name and abbreviated directory of the URI).
    """

    def __init__ (self, result):
        # The base initialiser has to be called on this instance, exactly as
        # TrackerStaticSearchAction does; ``deskbar.interfaces.Action(self)``
        # merely built and discarded an unrelated Action object.
        deskbar.interfaces.Action.__init__ (self, result['name'])
        self.name = result['name']
        self.result = result
        self.fullpath = result['uri']
        self.init_names()
        self.result['base'] = self.base
        self.result['dir'] = self.dir

    def get_name(self, text=None):
        """Return the whole result dict.

        NOTE(review): presumably used to fill the %(...)s placeholders of the
        verb template - confirm against the deskbar Action interface.
        """
        return self.result

    def get_hash(self, text=None):
        """Return an identifier for the hit, or None if it cannot be built."""
        try:
            if self.result ['type'] == 'Applications':
                # return a name that matches the one returned by the Program handler of deskbar
                return 'generic_' + self.result ['app_basename']
            return self.result['uri']
        except (KeyError, TypeError):
            return None

    def get_verb(self):
        """Return the verb template for the hit's type, or the generic one."""
        try:
            return TYPES[self.result['type']]['description']
        except KeyError:
            return _('Open file %s\nin %s') % ('%(base)s', '%(dir)s')

    def activate (self, text=None):
        """Run the type's custom command if it has one, else open the URI."""
        # .get() lets a type missing from TYPES fall through to url_show.
        command = TYPES.get(self.result['type'], {}).get('action')
        if command is not None:
            # Split the template first and substitute afterwards, so that a
            # substituted value containing spaces stays a single argument.
            cmd = [arg % self.result for arg in command.split()]
            print('Opening Tracker hit with command: %s' % (cmd,))
            deskbar.core.Utils.spawn_async(cmd)
        else:
            deskbar.core.Utils.url_show ('file://'+dummy_escape(self.result['uri']))
            print('Opening Tracker hit: %s' % (self.result['uri'],))

    def init_names (self):
        """Split ``self.fullpath`` into ``self.dir`` and ``self.base``.

        A leading home directory in the directory part is abbreviated to '~'.
        """
        dirname, filename = os.path.split(self.fullpath)
        if filename == '': #We had a trailing slash
            dirname, filename = os.path.split(dirname)

        #Reverse-tilde-expansion
        home = os.path.normpath(os.path.expanduser('~'))
        # '(/|$)' ensures only a whole path component is replaced, so that
        # e.g. /home/bob does not match /home/bobby.
        regexp = re.compile(r'^%s(/|$)' % re.escape(home))
        dirname = regexp.sub(r'~\1', dirname)

        self.dir = dirname
        self.base = filename
class TrackerLiveSearchHandler(deskbar.interfaces.Module):
    """Module that searches Tracker over D-Bus while the user types.

    Plain queries go to the ``TextDetailed`` method of Tracker's Search
    interface, once per service; queries containing ``tag:`` go to its
    ``Query`` method.  Every call is given a reply handler
    (:meth:`recieve_hits`) and an error handler (:meth:`recieve_error`).
    """

    INFOS = {
        'icon': deskbar.core.Utils.load_icon ('tracker'),
        'name': _('Tracker Live Search'),
        'description': _('Search with Tracker, as you type'),
        'categories' : {
            'develop' : {
                'name': _('Development Files'),
            },
            'music' : {
                'name': _('Music'),
            },
            'images' : {
                'name': _('Images'),
            },
            'videos' : {
                'name': _('Videos'),
            },
            'applications' : {
                'name': _('Applications'),
            },
        },
    }

    @staticmethod
    def has_prerequisites ():
        """Check that the dbus bindings exist and Tracker is D-Bus activatable.

        Returns a ``(status, message, None)`` tuple whose status is one of the
        ``deskbar.Handler.HANDLER_IS_*`` constants.

        NOTE(review): TrackerStaticSearchHandler exposes ``has_requirements``
        returning a boolean, while this method has a different name and
        return shape - confirm which one the running deskbar calls.
        """
        try:
            import dbus
            try :
                if getattr(dbus, 'version', (0,0,0)) >= (0,41,0):
                    import dbus.glib

                # Check that Tracker can be started via dbus activation, we will have trouble if it's not
                bus = dbus.SessionBus()
                proxy_obj = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
                dbus_iface = dbus.Interface(proxy_obj, 'org.freedesktop.DBus')
                activatables = dbus_iface.ListActivatableNames()
                if 'org.freedesktop.Tracker' not in activatables:
                    return (deskbar.Handler.HANDLER_IS_NOT_APPLICABLE, 'Tracker is not activatable via dbus', None)
            except Exception:
                # NOTE(review): any failure in the block above, not only a
                # missing dbus.glib, is reported with this message.
                return (deskbar.Handler.HANDLER_IS_NOT_APPLICABLE, 'Python dbus.glib bindings not found.', None)
            return (deskbar.Handler.HANDLER_IS_HAPPY, None, None)
        except Exception:
            return (deskbar.Handler.HANDLER_IS_NOT_APPLICABLE, 'Python dbus bindings not found.', None)

    def __init__(self):
        deskbar.interfaces.Module.__init__(self)
        # initing on search request, see self.query
        self.tracker = self.search_iface = self.keywords_iface = self.files_iface = None
        # Groups: proto, account, to-whom, time.  The extension group is
        # non-capturing; it used to be written '(:?txt|html)', which instead
        # matched an optional literal colon.
        self.conv_re = re.compile (r'^.*?/logs/([^/]+)/([^/]+)/([^/]+)/(.+?)\.(?:txt|html)$')

    def handle_email_hits (self, info, output):
        """Copy the e-mail title (info[3]) and sender (info[4]) into *output*."""
        output['title'] = dummy_escape(info[3])
        output['publisher'] = dummy_escape(info[4])

    def handle_conversation_hits (self, info, output):
        """Fill *output* with the conversation fields parsed from the log path.

        Sets 'proto', 'conv_from', 'conv_to', 'time' and 'channel', and
        replaces 'uri' with the raw path in which '%' and '#' are escaped.
        """
        output ['uri'] = info [0]
        m = self.conv_re.match (output['uri'])
        output['channel']=_('with')
        # Defaults for a path that does not look like a log file.
        output['proto']=output['conv_from']=output['conv_to']=output['time']='' # XXX, never happened during tests
        if m:
            output['proto'] = m.group (1)
            output['conv_from'] = m.group (2)
            output['conv_to'] = m.group (3)
            output['time'] = m.group (4)
        if output['conv_to'].endswith ('.chat'):
            output['channel'] = _('in channel')
            # Drop the suffix only; str.replace would also remove '.chat'
            # from the middle of the name.
            output['conv_to'] = output['conv_to'][:-len('.chat')]
        if output['proto'] == 'irc':
            nick_server = output['conv_from'].split ('@')
            if len (nick_server) > 1:
                output['conv_to'] = '%s on %s' % (output['conv_to'], nick_server[1])
        # escape those entities, purple uses this to escape / on jabber channel/user conversations
        output['uri'] = output['uri'].replace ('%', '%25')
        # escape irc channel prefixes, else the path name parsing of stops at '#' (this holds also for the icon search)
        output['uri'] = output['uri'].replace ('#', '%23')

    def handle_application_hits (self, info, output):
        """Fill *output* with the launcher fields of an application hit.

        Layout of *info*, taken from a sample reply:
            0 TrackerUri   u'/usr/share/applications/gksu.desktop'
            1 TrackerType  u'Applications'
            2 DesktopType  u'Application'
            3 DesktopName  u'Root Terminal'
            4 DesktopExec  u'gksu /usr/bin/x-terminal-emulator'
            5 DesktopIcon  u'gksu-root-terminal'
        """
        # Strip %U or whatever arguments in Exec field
        output['app_name'] = re.sub(r'%\w+', '', info [4]).strip ()
        output['app_basename'] = dummy_escape (os.path.basename (output['app_name']))
        output['app_name'] = dummy_escape (output['app_name'])
        if output['app_basename'] == '': # strange // in app_name, e.g. nautilus burn:///
            output['app_basename'] = output['app_name']
        output['name'] = dummy_escape (info [3])
        output['icon'] = dummy_escape (info [5])
        desktop = parse_desktop_file (output['uri'])
        if desktop:
            output['desktop'] = desktop

    def recieve_hits (self, qstring, hits, max):
        """Reply handler: turn Tracker *hits* into matches and emit them.

        Parameters:
            qstring -- the query the hits belong to.
            hits    -- sequence of hit records; item 0 is the path and item 1
                       the Tracker type.
            max     -- unused; at most MAX_RESULTS hits per type are shown.
        """
        matches = []
        results = {}
        for info in hits:
            output = {}
            output['name'] = os.path.basename(info[0])
            # NOTE(review): under Python 2, str() raises for a unicode path
            # with non-ASCII characters - confirm what Tracker returns.
            output['uri'] = str(dummy_escape(info[0]))
            output['type'] = info[1]
            # TYPES only knows 'GaimConversations'.  Map 'Conversations' onto
            # it first, otherwise such hits become 'Other Files' below and
            # the conversation branch can never run for them.
            if output['type'] == 'Conversations':
                output['type'] = 'GaimConversations'
            if output['type'] not in TYPES:
                output['type'] = 'Other Files'

            if output['type'] == 'Email':
                self.handle_email_hits (info, output)
            elif output['type'] == 'GaimConversations':
                self.handle_conversation_hits (info, output)
            elif output['type'] == 'Applications':
                self.handle_application_hits (info, output)

            results.setdefault(output['type'], []).append(output)

        for key in results:
            for res in results[key][:MAX_RESULTS]:
                matches.append(TrackerLiveFileMatch (res))

        self._emit_query_ready (qstring, matches)
        print('Tracker response for %s; %d hits returned, %d shown' % \
            (qstring, len(hits), len(matches)))

    def recieve_error (self, error):
        """Error handler: report a failed Tracker call on stderr."""
        sys.stderr.write('*** Tracker dbus error: %s\n' % (error,))

    def query (self, qstring):
        """Send *qstring* to Tracker; results arrive via the reply handler."""
        limit = MAX_RESULTS

        if not self.tracker:
            try:
                import dbus
                bus = dbus.SessionBus()
                tracker = bus.get_object('org.freedesktop.Tracker', \
                    '/org/freedesktop/tracker')
                search_iface = dbus.Interface(tracker, \
                    'org.freedesktop.Tracker.Search')
                keywords_iface = dbus.Interface(tracker, \
                    'org.freedesktop.Tracker.Keywords')
                files_iface = dbus.Interface(tracker, \
                    'org.freedesktop.Tracker.Files')
            except Exception:
                sys.stderr.write('DBus connection to tracker failed, check your settings.\n')
                return
            # Publish only after everything succeeded, so a half-finished
            # setup cannot leave self.tracker set with search_iface still None.
            self.tracker = tracker
            self.search_iface = search_iface
            self.keywords_iface = keywords_iface
            self.files_iface = files_iface

        def on_hits (hits):
            self.recieve_hits(qstring, hits, limit)

        if qstring.count('tag:') == 0:
            for service in ('Files', 'Emails', 'Conversations', 'Applications'):
                print('Searching %s' % service)
                self.search_iface.TextDetailed (-1, service, qstring, 0,10, \
                    reply_handler=on_hits, error_handler=self.recieve_error)
            print('Tracker query: %s' % (qstring,))
        else:
            tag = qstring.replace('tag:','')
            # The field to request is chosen per Tracker version; for any
            # other version no query is sent.
            version = self.tracker.GetVersion()
            if version == 502:
                fields = ['File.Format']
            elif version == 503:
                fields = ['File:Mime']
            else:
                fields = None
            if fields is not None:
                self.search_iface.Query(-1,'Files',fields,'', \
                    tag,'',False,0,100, \
                    reply_handler=on_hits, \
                    error_handler=self.recieve_error)
            print('Tracker tag query: %s' % (tag,))
def dummy_escape (s):
    """Return *s* unchanged.

    Stand-in for an escaping step that is currently switched off.
    """
    # Disabled implementation, kept for reference:
    #return cgi.escape (s)
    unescaped = s
    return unescaped
def parse_desktop_file(desktop, only_if_visible=False):
    """Load the .desktop file at path *desktop* and return its item.

    Parameters:
        desktop         -- path of the desktop file.
        only_if_visible -- accepted but ignored by this function.

    Returns the loaded item, or None when the file cannot be read (the error
    is printed) or when the entry is not of type application.
    """
    try:
        item = deskbar.core.gnomedesktop.item_new_from_file(desktop, deskbar.core.gnomedesktop.LOAD_ONLY_IF_EXISTS)
    except Exception:
        # sys.exc_info() instead of a bound name: this form parses on every
        # Python version, unlike "except Exception, e".
        e = sys.exc_info()[1]
        print('Couldn\'t read desktop file:%s:%s' % (desktop, e))
        return None
    # Identity test: "== None" would go through the item's own comparison.
    if item is None or item.get_entry_type() != deskbar.core.gnomedesktop.TYPE_APPLICATION:
        return None
    return item