r7480 - dumbhippo/trunk/firehose/firehose/jobs



Author: walters
Date: 2008-05-12 17:36:45 -0500 (Mon, 12 May 2008)
New Revision: 7480

Modified:
   dumbhippo/trunk/firehose/firehose/jobs/poller.py
Log:
Discovered that S3 connections are not threadsafe

Handle exceptions from S3 in a non-fatal way
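
Since boto's S3Connection cannot safely be shared across threads, the change below
opens a fresh connection inside each store operation and logs S3 failures instead of
letting them abort the poll. A minimal sketch of that pattern (the helper name and
arguments are illustrative, not from the commit; the boto S3Connection/Key calls
match those used in the diff):

    import logging
    from boto.s3.connection import S3Connection
    from boto.s3.key import Key

    _logger = logging.getLogger('example.s3store')

    def store_to_s3(accessid, secretkey, bucket_name, key_name, path):
        # Open a per-call connection; S3Connection instances are not threadsafe,
        # so nothing here is shared between worker threads.
        try:
            conn = S3Connection(accessid, secretkey)
            bucket = conn.get_bucket(bucket_name)
            k = Key(bucket)
            k.key = key_name
            k.set_contents_from_filename(path)
        except:
            # Non-fatal: a failed cache upload should not stop feed polling.
            _logger.exception("failed to store to S3: key %s", key_name)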


Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-05-12 20:23:06 UTC (rev 7479)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-05-12 22:36:45 UTC (rev 7480)
@@ -115,6 +115,11 @@
         
     def is_identity(self):
         return self.__is_identity
+    
+    def size(self):
+        if self.is_identity():
+            return 0
+        return len(self._processors)
         
     def process(self, data):
         for processor in self._processors:
@@ -127,7 +132,7 @@
 # This maps from a regular expression matching a URL to a list of processors
 feed_transformations = [
   (r'digg.com/users/.*/history/diggs.rss', [rss_eater]),
-  (r'picasaweb.google.com.*feed.*base.*album', [rss_eater, atom_eater]),
+  (r'picasaweb.google.com/data/feed', [rss_eater, atom_eater]),
   (r'google.com/reader/public', [XmlElementEater(['/feed/updated'])]),
   (r'blogs.gnome.org', [RegexpEater(['<!--.*page served in.*seconds.*-->'])]),
   # We try to consume all HTML
@@ -145,8 +150,22 @@
 class FeedTaskHandler(object):
     FAMILY = 'FEED'
     
-    def __init__(self, feedcache_bucket=None):
-        self.__feedcache_bucket = feedcache_bucket
+    def __init__(self, aws_accessid=None, aws_secretkey=None, feedcache_bucket_name=None):
+        self.__aws_accessid = aws_accessid
+        self.__aws_secretkey = aws_secretkey
+        self.__feedcache_bucket_name = feedcache_bucket_name
+        
+    def _store_to_s3(self, targeturl, temppath):
+        try:
+            s3_conn = S3Connection(self.__aws_accessid, self.__aws_secretkey)
+            feedcache_bucket = s3_conn.get_bucket(self.__feedcache_bucket_name)                    
+            k = Key(feedcache_bucket)
+            ts = int(time.time())                    
+            k.key = targeturl + ('.%d' % (ts,))
+            _logger.debug("storing to bucket %s key %s", feedcache_bucket.name, k.key)      
+            k.set_contents_from_filename(temppath)
+        except:
+            _logger.exception("failed to store to S3: key %s", targeturl)        
 
     def run(self, id, prev_hash, prev_timestamp):
         targeturl = id
@@ -169,7 +188,7 @@
             if response.status == 304:
                 _logger.info("Got 304 Unmodified for %r", targeturl)
                 return (prev_hash, prev_timestamp)
-            if self.__feedcache_bucket is not None:
+            if self.__feedcache_bucket_name is not None:
                 (tempfd, temppath) = tempfile.mkstemp()
                 outfile = os.fdopen(tempfd, 'w')
             else:
@@ -194,20 +213,15 @@
             if outfile is not None:
                 outfile.close()
                 if prev_hash != hash_hex:
-                    k = Key(self.__feedcache_bucket)
-                    ts = int(time.time())                    
-                    k.key = targeturl + ('.%d' % (ts,))
-                    _logger.debug("storing to bucket %s key %s", self.__feedcache_bucket.name, k.key)      
-                    k.set_contents_from_filename(temppath)
-                else:
-                    os.unlink(temppath)
+                    self._store_to_s3(targeturl, temppath)    
+                os.unlink(temppath)
             timestamp_str = response.getheader('Last-Modified', None)
             if timestamp_str is not None:
                 timestamp = mktime_tz(parsedate_tz(timestamp_str))
             else:
                 _logger.debug("no last-modified for %r", targeturl)
                 timestamp = time.time()
-            filters_applied = (not processor.is_identity()) and "(filters applied)" or ""  
+            filters_applied = (not processor.is_identity()) and ("(%d filters applied)" % (processor.size())) or ""  
             if prev_hash != hash_hex:
                 _logger.info("Got new hash:%r (prev:%r) ts:%r %s for url %r", hash_hex, prev_hash, timestamp, filters_applied, targeturl)                
                 return (hash_hex, timestamp)
@@ -248,13 +262,10 @@
         self.__savefetches = config.get('firehose.savefetches') == "true"
         self.__server = BaseHTTPServer.HTTPServer(('', bindport), TaskRequestHandler)
         self.__active_collectors = set()
-        aws_accessid = config.get('firehose.awsAccessKeyId')
-        aws_secretkey = config.get('firehose.awsSecretAccessKey')       
-        self.__s3_conn = S3Connection(aws_accessid, aws_secretkey)
+        self.__aws_accessid = config.get('firehose.awsAccessKeyId')
+        self.__aws_secretkey = config.get('firehose.awsSecretAccessKey')       
+        self.__feedcache_bucket_name = 'feedcache.' + config.get('firehose.awsS3Bucket')
         
-        bname = config.get('firehose.awsS3Bucket')
-        self.__feedcache_bucket = self.__s3_conn.get_bucket('feedcache.' + bname)              
-        
     def run_async(self):
         thr = threading.Thread(target=self.run)
         thr.setDaemon(True)
@@ -295,7 +306,9 @@
             _logger.exception("Failed to find family for task %r", taskid)
             return
         if self.__savefetches:
-            inst_kwargs = {'feedcache_bucket': self.__feedcache_bucket}
+            inst_kwargs = {'aws_accessid': self.__aws_accessid,
+                           'aws_secretkey': self.__aws_secretkey,
+                           'feedcache_bucket_name': self.__feedcache_bucket_name}
         else:
             inst_kwargs = {}
         inst = fclass(**inst_kwargs)
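
After this revision, FeedTaskHandler takes the AWS credentials and bucket name rather
than a live bucket object, and the connection is opened lazily in _store_to_s3. A
hypothetical caller (credential and URL values are placeholders, not from the commit):

    from firehose.jobs.poller import FeedTaskHandler

    handler = FeedTaskHandler(aws_accessid='ACCESS_KEY_ID',
                              aws_secretkey='SECRET_KEY',
                              feedcache_bucket_name='feedcache.example-bucket')
    (new_hash, new_timestamp) = handler.run('http://example.com/feed.rss', None, None)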


