diff --git a/qubesagent/firewall.py b/qubesagent/firewall.py index 7e36f7f..6145804 100755 --- a/qubesagent/firewall.py +++ b/qubesagent/firewall.py @@ -62,6 +62,13 @@ class FirewallWorker(object): '''Apply rules in given source address''' raise NotImplementedError + def run_user_script(self): + '''Run user script in /rw/config''' + user_script_path = '/rw/config/qubes-firewall-user-script' + if os.path.isfile(user_script_path) and \ + os.access(user_script_path, os.X_OK): + subprocess.call([user_script_path]) + def read_rules(self, target): '''Read rules from QubesDB and return them as a list of dicts''' entries = self.qdb.multiread('/qubes-firewall/{}/'.format(target)) @@ -133,6 +140,7 @@ class FirewallWorker(object): def main(self): self.terminate_requested = False self.init() + self.run_user_script() # initial load for source_addr in self.list_targets(): self.handle_addr(source_addr) diff --git a/qubesagent/test_firewall.py b/qubesagent/test_firewall.py index f122eb6..7270afd 100644 --- a/qubesagent/test_firewall.py +++ b/qubesagent/test_firewall.py @@ -63,6 +63,7 @@ class FirewallWorker(qubesagent.firewall.FirewallWorker): self.init_called = False self.cleanup_called = False + self.user_script_called = False self.rules = {} def apply_rules(self, source_addr, rules): @@ -74,6 +75,9 @@ class FirewallWorker(qubesagent.firewall.FirewallWorker): def init(self): self.cleanup_called = True + def run_user_script(self): + self.user_script_called = True + class IptablesWorker(qubesagent.firewall.IptablesWorker): '''Override methods actually modifying system state to only log what @@ -522,10 +526,31 @@ class TestFirewallWorker(TestCase): self.obj.handle_addr('10.137.0.4') self.assertEqual(self.obj.rules['10.137.0.4'], [{'action': 'drop'}]) + @patch('os.path.isfile') + @patch('os.access') + @patch('subprocess.call') + def test_run_user_script(self, mock_subprocess, mock_os_access, + mock_os_path_isfile): + mock_os_path_isfile.return_value = False + mock_os_access.return_value = False + super(FirewallWorker, self.obj).run_user_script() + self.assertFalse(mock_subprocess.called) + + mock_os_path_isfile.return_value = True + mock_os_access.return_value = False + super(FirewallWorker, self.obj).run_user_script() + self.assertFalse(mock_subprocess.called) + + mock_os_path_isfile.return_value = True + mock_os_access.return_value = True + super(FirewallWorker, self.obj).run_user_script() + mock_subprocess.assert_called_once_with( + ['/rw/config/qubes-firewall-user-script']) def test_main(self): self.obj.main() self.assertTrue(self.obj.init_called) self.assertTrue(self.obj.cleanup_called) + self.assertTrue(self.obj.user_script_called) self.assertEqual(set(self.obj.rules.keys()), self.obj.list_targets()) # rules content were already tested