Source code for WORC.detectors.detectors

#!/usr/bin/env python

# Copyright 2016-2020 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import csv
import string
from abc import ABC, abstractmethod
from pathlib import Path
from os import environ
import platform
import os
import pkg_resources
import site
import sys


[docs]class AbstractDetector(ABC): # noinspection PyBroadException
[docs] def do_detection(self, *args, **kwargs): try: result = self._is_detected(*args, **kwargs) except: result = False print(self._generate_detector_message(result)) return result
def _generate_detector_message(self, detected_Value): return f"{self.__class__.__name__[0:-8]} detected: {detected_Value}." @abstractmethod def _is_detected(self, *args, **kwargs): pass
[docs]class CsvDetector(AbstractDetector):
[docs] def __init__(self, csv_file_path): self._csv_file_path = csv_file_path
def _is_detected(self, *args, **kwargs): try: with open(self._csv_file_path, newline='') as csvfile: start = csvfile.read(4096) # isprintable does not allow newlines, printable does not allow umlauts... if not all([c in string.printable or c.isprintable() for c in start]): return False dialect = csv.Sniffer().sniff(start) # this triggers csv.Error if it can't sniff the csv dialect return True except csv.Error: # Could not get a csv dialect -> probably not a csv. return False
[docs]class SnelliusClusterDetector(AbstractDetector): def _is_detected(self): if LinuxDetector()._is_detected(): try: if 'localhost6.localdomain6' in Path('/etc/hosts').read_text(): return True except: return False return False
[docs]class DebugDetector(AbstractDetector): def _is_detected(self): try: if environ.get('WORCDEBUG') is not None: return True else: return False except: return False
[docs]class BigrClusterDetector(AbstractDetector): def _is_detected(self): if LinuxDetector()._is_detected(): try: if 'bigr-cluster' in Path('/etc/hosts').read_text(): return True except: return False return False
[docs]class HostnameDetector(AbstractDetector): def _is_detected(self): if platform.node() == self._expected_hostname: return True return False
[docs] def __init__(self, expected_hostname): self._expected_hostname = expected_hostname
[docs]class LinuxDetector(AbstractDetector): def _is_detected(self): if platform.system().lower().strip() == 'linux': return True return False
[docs]class WORCDirectoryDetector(AbstractDetector): def _is_detected(self): # Get directory in which WORC package is installed working_set = pkg_resources.working_set requirement_spec = pkg_resources.Requirement.parse('WORC') egg_info = working_set.find(requirement_spec) if egg_info is None: # Backwards compatibility with WORC2 try: packagedir = site.getsitepackages()[0] except AttributeError: # Inside virtualenvironment, so getsitepackages doesnt work. paths = sys.path for p in paths: if os.path.isdir(p) and os.path.basename(p) == 'site-packages': packagedir = p else: packagedir = egg_info.location packagedir = os.path.join(packagedir, 'WORC') return packagedir