r7462 - in dumbhippo/trunk: firehose/firehose/jobs super super/firehose/files/conf



Author: walters
Date: 2008-04-29 16:49:24 -0500 (Tue, 29 Apr 2008)
New Revision: 7462

Modified:
   dumbhippo/trunk/firehose/firehose/jobs/master.py
   dumbhippo/trunk/firehose/firehose/jobs/poller.py
   dumbhippo/trunk/super/base.conf
   dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg
Log:
Support for saving fetched data, so we can compute diffs over it later


Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-04-29 18:24:58 UTC (rev 7461)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-04-29 21:49:24 UTC (rev 7462)
@@ -127,17 +127,24 @@
                                                             prev_time DATETIME)''')
         cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
         
-        _logger.debug("retrieving current task set...")
-        bucket = self.__s3_conn.get_bucket(config.get('firehose.awsS3Bucket'))
-        key = bucket.get_key(config.get('firehose.awsS3Key'))
+        bname = config.get('firehose.awsS3Bucket')
+        kname = config.get('firehose.awsS3Key')
+        _logger.debug("retrieving current task set from bucket: %r  key: %r", bname, kname)
+        bucket = self.__s3_conn.get_bucket(bname)
+        
         # FIXME should stream this
-        current_task_keys = {}
-        f = StringIO.StringIO(key.get_contents_as_string())
-        for line in f:
-            current_task_keys[line.strip()] = True
-        _logger.debug("have %d tasks", len(current_task_keys))        
-        self.__ensure_tasks_persisted(current_task_keys)        
-        
+        current_task_keys = {}        
+        key = bucket.get_key(kname)
+        if key is not None:
+            contents = key.get_contents_as_string()
+            if contents is not None:
+                f = StringIO.StringIO(contents)
+                for line in f:
+                    current_task_keys[line.strip()] = True
+                _logger.debug("have %d tasks", len(current_task_keys))        
+                self.__ensure_tasks_persisted(current_task_keys)
+        else:
+            _logger.warn("no currently saved task set!")
         curtime = time.time()
         taskentries = []
         dropped_task_keys = {}

Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-04-29 18:24:58 UTC (rev 7461)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-04-29 21:49:24 UTC (rev 7462)
@@ -4,6 +4,7 @@
 import BaseHTTPServer,httplib,urlparse,urllib
 from email.Utils import formatdate,parsedate_tz,mktime_tz
 import logging
+from StringIO import StringIO
 
 import boto
 
@@ -41,7 +42,7 @@
 class FeedTaskHandler(object):
     FAMILY = 'FEED'
     
-    def run(self, id, prev_hash, prev_timestamp):
+    def run(self, id, prev_hash, prev_timestamp, outpath=None):
         targeturl = id
         parsedurl = urlparse.urlparse(targeturl)
         try:
@@ -61,12 +62,23 @@
             if response.status == 304:
                 _logger.info("Got 304 Unmodified for %r", targeturl)
                 return (prev_hash, prev_timestamp)
-            hash = sha.new()            
+            if outpath is not None:
+                outpath_tmpname = outpath+'.tmp'
+                outfile = open(outpath_tmpname, 'w')
+            else:
+                outpath_tmpname = None
+                outfile = None
+            hash = sha.new()
             buf = response.read(8192)
             while buf:
                 hash.update(buf)
+                if outfile is not None:
+                    outfile.write(buf)
                 buf = response.read(8192)
             hash_hex = hash.hexdigest()
+            if outfile is not None:
+                outfile.close()
+                os.rename(outpath_tmpname, outpath)
             timestamp_str = response.getheader('Last-Modified', None)
             if timestamp_str is not None:
                 timestamp = mktime_tz(parsedate_tz(timestamp_str))
@@ -110,6 +122,7 @@
         
     def __init__(self):
         bindport = int(config.get('firehose.localslaveport'))
+        self.__savefetches = config.get('firehose.savefetches') == "true"
         self.__server = BaseHTTPServer.HTTPServer(('', bindport), TaskRequestHandler)
         self.__active_collectors = set()
         
@@ -153,8 +166,14 @@
             _logger.exception("Failed to find family for task %r", taskid)
             return
         inst = fclass()
+        kwargs = {}
+        if self.__savefetches:
+            quotedname = urllib.quote_plus(taskid)
+            ts = int(time.time())
+            outpath = os.path.join(os.getcwd(), 'data', quotedname + '.' + unicode(ts))
+            kwargs['outpath'] = outpath       
         try:
-            (new_hash, new_timestamp) = inst.run(tid, prev_hash, prev_timestamp)            
+            (new_hash, new_timestamp) = inst.run(tid, prev_hash, prev_timestamp, **kwargs)            
         except Exception, e:
             _logger.error("Failed task id %r: %s", tid, e)
             (new_hash, new_timestamp) = (None, None)

Modified: dumbhippo/trunk/super/base.conf
===================================================================
--- dumbhippo/trunk/super/base.conf	2008-04-29 18:24:58 UTC (rev 7461)
+++ dumbhippo/trunk/super/base.conf	2008-04-29 21:49:24 UTC (rev 7462)
@@ -508,7 +508,8 @@
         <targetAttributes pattern="/run/*"   ignore="yes"/>
         <targetAttributes pattern="/data/*"  ignore="yes" preserve="yes"/>        
         
-        <parameter name="firehoseLocalSlavePort">$((baseport+81))</parameter>    
+        <parameter name="firehoseLocalSlavePort">$((baseport+81))</parameter>
+        <parameter name="firehoseSaveFetches">false</parameter>
 
         <parameter name="startCommand">$targetdir/scripts/firehose-start.sh</parameter>
         <parameter name="stopCommand">$targetdir/scripts/firehose-stop.sh</parameter>

Modified: dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg
===================================================================
--- dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg	2008-04-29 18:24:58 UTC (rev 7461)
+++ dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg	2008-04-29 21:49:24 UTC (rev 7462)
@@ -23,6 +23,8 @@
 firehose.awsSqsIncomingName="@@firehoseAwsSqsIncomingName@@"
 firehose.awsSqsOutgoingName="@@firehoseAwsSqsOutgoingName@@"
 
+firehose.savefetches="@@firehoseSaveFetches@@"
+
 # if you are using a database or table type without transactions
 # (MySQL default, for example), you should turn off transactions
 # by prepending notrans_ on the uri



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]