[gvfs] gvfs-test: Support running Sftp without gvfs-testbed



commit d534958ddce8c33189c42f697ec0483b5c6d9a93
Author: Martin Pitt <martinpitt gnome org>
Date:   Mon Oct 15 12:02:50 2012 +0200

    gvfs-test: Support running Sftp without gvfs-testbed
    
    We absolutely need to avoid touching ~/.ssh if we don't run under gvfs-testbed:
     - Require that an SSH key already exists. Current gvfs-testbed creates it now.
     - Tell our local sshd to look for authorized_keys in $XDG_RUNTIME_DIR instead
       of ~/.ssh/authorized_keys, as we need to modify it for the tests.
    
    Make two adjustments to the test which previously assumed English strings (in
    gvfs-testbed gvfs runs under no locale).
    
    Stop depending on $SSHD being set by gvfs-testbed and instead determine the
    sshd path with "which", requiring that it is in $PATH.

 test/gvfs-test |   54 +++++++++++++++++++++++++++++++-----------------------
 1 files changed, 31 insertions(+), 23 deletions(-)
---
diff --git a/test/gvfs-test b/test/gvfs-test
index 991d1ab..9a66700 100755
--- a/test/gvfs-test
+++ b/test/gvfs-test
@@ -31,11 +31,13 @@ import time
 import shutil
 import fcntl
 import re
+import locale
 from glob import glob
 
 in_testbed = os.path.exists('/home/gvfs_sandbox_marker')
 samba_running = subprocess.call(['pidof', 'smbd'], stdout=subprocess.PIPE) == 0
 have_httpd = subprocess.call(['which', 'apachectl'], stdout=subprocess.PIPE) == 0
+sshd_path = subprocess.check_output(['which', 'sshd'], universal_newlines=True).strip()
 
 local_ip = subprocess.check_output("ip -4 addr | sed -nr '/127\.0\.0/ n; "
                                    "/inet / {  s/^.*inet ([0-9.]+).*$/\\1/; p; q  }'",
@@ -43,6 +45,11 @@ local_ip = subprocess.check_output("ip -4 addr | sed -nr '/127\.0\.0/ n; "
 
 my_dir = os.path.dirname(os.path.abspath(__file__))
 
+# we need this flag to check if we can test error messages
+locale.setlocale(locale.LC_ALL, '')
+lc = locale.getlocale(locale.LC_MESSAGES)[0]
+english_messages = not lc or lc.startswith('en_')
+
 # http://sg.danny.cz/sg/sdebug26.html
 PTYPE_DISK = 0 
 PTYPE_CDROM = 5 
@@ -233,21 +240,16 @@ class ArchiveMounter(GvfsTestCase):
         finally:
             self.unmount(uri)
 
- unittest skipUnless(in_testbed, 'not running under gvfs-testbed')
+ unittest skipUnless(os.getenv('XDG_RUNTIME_DIR'), 'No $XDG_RUNTIME_DIR available')
 class Sftp(GvfsTestCase):
-    @classmethod
-    def setUpClass(klass):
-        # triple-check that we are in the testbed
-        assert not os.path.exists('.ssh')
-        # generate ssh key for test user
-        os.mkdir('.ssh')
-        subprocess.check_call(['ssh-keygen', '-q', '-f', '.ssh/id_rsa', '-N', ''])
-
     def setUp(self):
         '''Run ssh server'''
 
         super().setUp()
 
+        self.assertTrue(os.path.exists(os.path.expanduser('~/.ssh/id_rsa')),
+                        'This test needs an existing ~/.ssh/id_rsa')
+
         # find sftp-server
         for dir in ['/usr/local/lib/openssh',
                     '/usr/lib/openssh',
@@ -259,31 +261,36 @@ class Sftp(GvfsTestCase):
         else:
             self.fail('Cannot locate OpenSSH sftp-server program, please update tests for your distribution')
 
+        # look for authorized_keys in a temporary dir, to avoid having to mess
+        # with the actual user files when not calling this through gvfs-testbed
+        # (unfortunately ssh doesn't consider $HOME); NB we cannot use
+        # self.workdir as ssh refuses files in /tmp.
+        self.authorized_keys = os.path.join(os.environ['XDG_RUNTIME_DIR'],
+                                            'gvfs_test_authorized_keys')
+
         # generate sshd configuration; note that we must ensure that the
         # private key is not world-readable, so we need to copy it
         shutil.copy(os.path.join(my_dir, 'files', 'ssh_host_rsa_key'), self.workdir)
         os.chmod(os.path.join(self.workdir, 'ssh_host_rsa_key'), 0o600)
         shutil.copy(os.path.join(my_dir, 'files', 'ssh_host_rsa_key.pub'), self.workdir)
-
         self.sshd_config = os.path.join(self.workdir, 'sshd_config')
         with open(self.sshd_config, 'w') as f:
             f.write('''Port 2222
 HostKey %(workdir)s/ssh_host_rsa_key
 UsePrivilegeSeparation no
+AuthorizedKeysFile %(authorized_keys)s
 UsePam no
 Subsystem sftp %(sftp_server)s
 ''' % {'workdir': self.workdir,
-       'sftp_server': sftp_server
+       'sftp_server': sftp_server,
+       'authorized_keys': self.authorized_keys,
       })
 
-        self.sshd = subprocess.Popen([os.environ['SSHD'], '-Dde', '-f', self.sshd_config],
+        self.sshd = subprocess.Popen([sshd_path, '-Dde', '-f', self.sshd_config],
                                      universal_newlines=True,
                                      stderr=subprocess.PIPE)
 
     def tearDown(self):
-        if os.path.exists('.ssh/authorized_keys'):
-            os.unlink('.ssh/authorized_keys')
-
         if self.sshd.returncode is None:
             self.sshd.terminate()
             self.sshd.wait()
@@ -302,7 +309,7 @@ Subsystem sftp %(sftp_server)s
         '''sftp://localhost with RSA authentication'''
 
         # accept our key for localhost logins
-        shutil.copy('.ssh/id_rsa.pub', '.ssh/authorized_keys')
+        shutil.copy(os.path.expanduser('~/.ssh/id_rsa.pub'), self.authorized_keys)
 
         # mount it
         uri = 'sftp://localhost:2222'
@@ -314,8 +321,8 @@ Subsystem sftp %(sftp_server)s
     def test_unknown_host(self):
         '''sftp:// with RSA authentication for unknown host'''
 
-        # accept our RSA key 
-        shutil.copy('.ssh/id_rsa.pub', '.ssh/authorized_keys')
+        # accept our key for localhost logins
+        shutil.copy(os.path.expanduser('~/.ssh/id_rsa.pub'), self.authorized_keys)
 
         # try to mount it; should fail as it's an unknown host
         uri = 'sftp://%s:2222' % local_ip
@@ -324,7 +331,8 @@ Subsystem sftp %(sftp_server)s
         self.assertNotEqual(code, 0)
         # there is nothing in our testbed which would show or answer the
         # dialog
-        self.assertTrue('Login dialog cancelled' in err, err)
+        if english_messages:
+            self.assertTrue('Login dialog cancelled' in err, err)
 
     def do_mount_check(self, uri):
         with open('stuff.txt', 'w') as f:
@@ -337,17 +345,17 @@ Subsystem sftp %(sftp_server)s
 
             # check gvfs-info
             out = self.program_out_success(['gvfs-info', uri])
-            self.assertTrue('display name: / on localhost' in out, out)
+            self.assertRegex(out, 'display name: / .* localhost')
             self.assertTrue('type: directory' in out, out)
             self.assertTrue('access::can-read: TRUE' in out, out)
 
             # check gvfs-ls
             out = self.program_out_success(['gvfs-ls', uri + '/home'])
-            self.assertTrue('gvfs_sandbox_marker' in out, out)
+            self.assertTrue('%s\n' % os.environ['USER'] in out, out)
 
             # check gvfs-cat
-            out = self.program_out_success(['gvfs-cat', uri + '/home/%s/stuff.txt' % os.environ['USER']])
-            self.assertEqual(out, 'moo!')
+            out = self.program_out_success(['gvfs-cat', uri + '/etc/passwd'])
+            self.assertTrue('root:' in out, out)
         finally:
             self.unmount(uri)
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]