# Copyright (c) Fraunhofer MEVIS, Germany. All rights reserved.
# **InsertLicense** code
# -------------------------------------------------------------------------
import logging
import os
import re
import timeit
import warnings
from contextlib import contextmanager
from http import HTTPStatus
from urllib.parse import urlparse
from typing import List, Optional, Iterator, Tuple
import requests
from CertificateUtils import SystemCertsHTTPSAdapter
from dicomweb_client.api import DICOMwebClient
from dicomweb_client import session_utils
from dicomweb_client.web import _Transaction # see usage below on why we need this
[docs]class DicomWebAdapter( object ):
"""
This adapter class is supposed to provide a simple API for dealing with a DICOMWeb http(s) server, implemented
by wrapping the ``dicomweb_client`` library.
It is currently not supposed to fully abstract from ``DICOMwebClient`` and make the implementation interchangeable, but
if you only use public functionality, you won't have to change any code if at some point we decide that it is.
:param dicomWebUrl: URL for the dicom web API, e.g. https://pacs.example.com/dicom-web
:param session: Optional ``requests`` session to reuse. If omitted, a new one is created using the username and \
password parameters
:param username: Username to use with the DicomWeb server to create a new session
:param password: Password to use with the DicomWeb server to create a new session
:param additionalClientParameters: Additional parameters for the ``DICOMwebClient`` construction. See \
https://dicomweb-client.readthedocs.io/en/latest/usage.html#application-programming-interface-api for further info
"""
DICOM_TAG_StudyInstanceUID = "0020000D"
DICOM_TAG_SeriesInstanceUID = "0020000E"
def __init__( self, dicomWebUrl:str, session:Optional[requests.Session]=None,
username:Optional[str]=None, password:str="",
**additionalClientParameters ) -> None:
"""
Constructor method. Parameter documentation see class docs.
"""
if session is None:
if username is not None:
session = session_utils.create_session_from_user_pass( username=username, password=password )
else:
session = session_utils.create_session()
self.duration_s = {}
self.__mountSystemCertsHTTPSAdapter( session, dicomWebUrl )
self._client = DICOMwebClient( dicomWebUrl, session, **additionalClientParameters )
def __mountSystemCertsHTTPSAdapter( self, session, dicomWebUrl ):
dicomWebUrlParsed = urlparse( dicomWebUrl )
baseUrl = f"{dicomWebUrlParsed.scheme}://{dicomWebUrlParsed.netloc}"
session.mount( baseUrl, SystemCertsHTTPSAdapter() ) # enable use of system certificates
[docs] def setHttpRetryParameters( self, retry: bool=True, max_attempts: int=5, wait_exponential_multiplier: int=1000,
retriable_error_codes: Tuple[HTTPStatus, ...] = (
HTTPStatus.TOO_MANY_REQUESTS,
HTTPStatus.REQUEST_TIMEOUT,
HTTPStatus.SERVICE_UNAVAILABLE,
HTTPStatus.GATEWAY_TIMEOUT,
) ):
"""
Sets parameters for HTTP retrying logic. These parameters are passed
to ``retrying.retry`` which wraps the HTTP requests and retries all
responses that return an error code defined in ``retriable_error_codes``.
The retrying method uses exponential back off using the multiplier
``wait_exponential_multiplier`` for a max attempts defined by
``max_attempts``.
:param retry: bool, optional: \
whether HTTP retrying should be performed, if it is set to \
``False``, the rest of the parameters are ignored.
:param max_attempts: int, optional: \
the maximum number of request attempts.
:param wait_exponential_multiplier: float, optional: \
exponential multiplier applied to delay between attempts in ms.
:param retriable_error_codes: tuple, optional: \
tuple of HTTP error codes to retry if raised.
"""
self._client.set_http_retry_params( retry=retry, max_attempts=max_attempts,
wait_exponential_multiplier=wait_exponential_multiplier,
retriable_error_codes=retriable_error_codes)
[docs] def getStudyInstanceUIDForSeries( self, seriesInstanceUID:str ) -> [ None, str ]:
"""
Returns the study instance UID for the given series.
:param seriesInstanceUID: Instance UID of the series
:returns: The study's instance UID if the series could be found, otherwise None
:raises: ``ConnectionError`` if there is a problem accessing the server
"""
studyInstanceUID = None
with self.__wrapClientCallForConnectionExceptionHandling( "Error getting study UID for series: " ):
series = self._client.search_for_series( search_filters={ 'SeriesInstanceUID': seriesInstanceUID } )
if series:
studyInstanceUID = series[ 0 ][ self.DICOM_TAG_StudyInstanceUID ][ "Value" ][ 0 ]
return studyInstanceUID
[docs] def getSeriesInstanceUIDsForStudy( self, studyInstanceUID:str ) -> List[ str ]:
"""
Returns all series instance UIDs for the given study.
:param studyInstanceUID: Instance UID of the study
:returns: A list of the series instance UIDs if the study could be found, otherwise an empty list.
:raises: ``ConnectionError`` if there is a problem accessing the server
"""
seriesInstanceUIDs = []
with self.__wrapClientCallForConnectionExceptionHandling( "Error getting series UID for study: " ):
series = self._client.search_for_series( search_filters={ 'StudyInstanceUID': studyInstanceUID } )
if series is not None:
for s in series:
seriesInstanceUID = s[ self.DICOM_TAG_SeriesInstanceUID ][ "Value" ][ 0 ]
seriesInstanceUIDs.append( seriesInstanceUID )
return seriesInstanceUIDs
[docs] def retrieveSeriesIntoFolder( self,
studyInstanceUID:str,
seriesInstanceUID:str,
targetFolderRoot:str,
useStudySubFolder:bool=True ) -> None:
"""
Fetches the specified series and stores it as files in a newly created ``<seriesInstanceUID>`` subdirectory in the
``targetFolder`` (which must not previously exist). If ``useStudySubFolder`` is True, an intermediate study directory
will be created
Files are numbered with filenames starting with ``file000000.dcm``.
:raises: Different ``OSError`` versions if the ``targetFolderRoot`` was not found, is not writable, or if the \
series subfolder already exists.
:raises: ``ConnectionError`` if there is a problem accessing the server
"""
if useStudySubFolder:
targetFolderRoot = self._setUpTargetSubFolder( targetFolderRoot, studyInstanceUID, "study", mayExist=True )
targetFolderForSeries = self._setUpTargetSubFolder( targetFolderRoot, seriesInstanceUID, "series", mayExist=False )
with TimedAction( f"Retrieving series {seriesInstanceUID} into {targetFolderForSeries}",
shortDescription=f"Retrieving series {seriesInstanceUID}", logLevel=logging.INFO ):
with self.__wrapClientCallForConnectionExceptionHandling( "Error retrieving series: " ):
self._downloadSeries( studyInstanceUID, seriesInstanceUID, targetFolderForSeries )
[docs] def retrieveStudyIntoFolder( self, studyInstanceUID: str, targetFolderRoot: str ) -> None:
"""
Fetches given study into ``targetFolder`` by calling ``retrieveSeriesIntoFolder()`` for each
contained series.
:raises: Different ``OSError`` versions if the ``targetFolderRoot`` was not found, is not writable, or if the \
study subfolder already exists.
:raises: ``ConnectionError`` if there is a problem accessing the server
"""
targetFolderForStudy = self._setUpTargetSubFolder( targetFolderRoot, studyInstanceUID, "study", mayExist=False )
seriesInstanceUIDs = self.getSeriesInstanceUIDsForStudy( studyInstanceUID )
with TimedAction( f"Retrieving study {studyInstanceUID} with {len(seriesInstanceUIDs)} series into {targetFolderForStudy}",
shortDescription=f"Retrieving study {studyInstanceUID}", logLevel=logging.INFO ):
for seriesInstanceUID in seriesInstanceUIDs:
try:
self.retrieveSeriesIntoFolder( studyInstanceUID, seriesInstanceUID, targetFolderRoot )
except Exception as e:
logging.getLogger( self.__class__.__name__ ).log( logging.ERROR, "Error retrieving series %s: %s. Continuing with next series.", seriesInstanceUID, e )
[docs] def deleteStudy( self, studyInstanceUID:str ) -> None:
"""
Deletes the study with the given UID from the PACS.
**WARNING:** Apparently non-standard, may not be supported on your PACS-server. See tests for
a REST-based workaround for ORTHANC servers (that allow deletion).
:param studyInstanceUID: Instance UID of the study to delete
"""
with self.__wrapClientCallForConnectionExceptionHandling( "Error deleting study with UID: " ):
self._client.delete_study( studyInstanceUID )
[docs] def deleteSeries( self, studyInstanceUID:str, seriesInstanceUID:str ) -> None:
"""
Deletes the series with the given UID from the PACS.
**WARNING:** Apparently non-standard, may not be supported on your PACS-server. See tests for
a REST-based workaround for ORTHANC servers (that allow deletion).
:param studyInstanceUID: Instance UID of the study the series belongs to
:param seriesInstanceUID: Instance UID of the series to delete
"""
with self.__wrapClientCallForConnectionExceptionHandling( "Error deleting series with UID: " ):
self._client.delete_series( studyInstanceUID, seriesInstanceUID )
[docs] def storeDirectory( self, sourceDirectory:str, include_regex:str=None ) -> None:
"""
Deletes the study with the given UID from the PACS.
WARNING: Has not been profiled, so the rerouting through pydicom may be similarly slow as ``DicomWebAdapter_Fallback._downloadSeries()``.
:param sourceDirectory: Directory to recursively traverse and look for files in
:param include_regex: Regular expression to restrict the files to be sent
"""
if not os.path.isdir( sourceDirectory ):
raise FileNotFoundError( f"Cannot find/access source data directory at '{sourceDirectory}'." )
import pydicom
from pydicom.errors import InvalidDicomError
for directory, _, filenames in os.walk( sourceDirectory ):
data = [ ]
for f in filenames:
path = os.path.join( directory, f )
if not include_regex or re.match( include_regex, path ):
try:
data.append( pydicom.dcmread( path ) )
except InvalidDicomError:
warnings.warn( f"Warning: tried to send file {path} to PACS which is apparently not a DICOM file" )
with self.__wrapClientCallForConnectionExceptionHandling( f"Error storing instances from directory {directory}:" ):
self._client.store_instances( data )
# --------------------------------------------------------------------------------------------
@staticmethod
def _setUpTargetSubFolder( targetFolderRoot, uidForSubFolder, description, mayExist=False ):
if not os.path.isdir( targetFolderRoot ):
raise FileNotFoundError( f"Target folder not found: {targetFolderRoot}" )
if not os.access( targetFolderRoot, os.W_OK ):
raise FileNotFoundError( f"Target folder not writable: {targetFolderRoot}" )
targetSubFolder = os.path.join( targetFolderRoot, uidForSubFolder )
if not mayExist and os.path.isdir( targetSubFolder ):
raise FileExistsError( f"Target {description} folder already exists: {targetSubFolder}" )
#
os.makedirs( targetSubFolder, exist_ok=mayExist )
return targetSubFolder
@staticmethod
def _composeFilename( folder, counter:int ) -> str:
return f"{folder}/file{counter:06}.dcm"
def _downloadSeries( self, studyInstanceUID:str, seriesInstanceUID:str, targetFolderForSeries:str ) -> None:
for count, blob in enumerate( self.__iter_retrieve_series_blobs( studyInstanceUID, seriesInstanceUID ) ):
with open( self._composeFilename( targetFolderForSeries, count ), "wb" ) as fp:
fp.write( blob )
def __iter_retrieve_series_blobs( self, studyInstanceUID:str, seriesInstanceUID:str ) -> Iterator[bytes]:
"""
NOTE: This uses protected methods of the ``DICOMwebClient``, because just downloading binary files
is not directly supported by ``DICOMwebClient``, ``DICOMwebClient.iter_series`` always loads the
result files with pydicom, which is overkill when one only wants to save them to disk.
"""
# TODO: The validity of the comment above needs to be rechecked, maybe newer versions of ``DICOMwebClient`` don't do that anymore.
url = self._client._get_series_url( _Transaction.RETRIEVE, studyInstanceUID, seriesInstanceUID )
default_media_type = 'application/dicom'
media_types = (default_media_type,)
headers = {
'Accept': self._client._build_multipart_accept_header_field_value(
media_types, set(media_types)
),
}
response = self._client._http_get(
url,
headers=headers,
stream=True
)
return self._client._decode_multipart_message( response, stream=True )
@contextmanager
def __wrapClientCallForConnectionExceptionHandling( self, errorMessagePrefix:str ) -> None:
try:
yield
except Exception as e:
raise ConnectionError( f"{errorMessagePrefix}{e}" ) from None
#--------------------------------------------------------------------------------------------------
[docs]class DicomWebAdapter_Fallback( DicomWebAdapter ):
"""
This class uses only public functionality from ``DICOMwebClient`` and may thus be less fragile.
However, last time we checked (``SAT-1082``), the download implementation in the base class was much faster.
"""
[docs] def _downloadSeries( self, studyInstanceUID:str, seriesInstanceUID:str, targetFolderForSeries:str ) -> None:
"""
This method is how we would download a series with only the public interface of ``DICOMwebClient``. Turns out it is
substantially slower than the version in ``DicomWebAdapter``, cf. ``SAT-1082``.
"""
for count, instance in enumerate( self._client.iter_series( studyInstanceUID, seriesInstanceUID ) ):
# Would also allow to use custom file names, e.g. the SOPInstanceUID via instance.SOIInstanceUID
# However, that would not be compatible with the 'fast' mode.
instance.save_as( self._composeFilename( targetFolderForSeries, count ) )
#--------------------------------------------------------------------------------------------------
class TimedAction(object):
def __init__(self, description, shortDescription=None, logLevel=logging.INFO ):
self.__description = description
self.__shortDescription = shortDescription or description
self.__logLevel = logLevel
self.__last_start_time_s = None
def __enter__(self):
self.__last_start_time_s = timeit.default_timer()
logging.getLogger( DicomWebAdapter.__name__ ).log( self.__logLevel, " %s ...", self.__description )
def __exit__(self, exc_type, exc_value, traceback):
logging.getLogger( DicomWebAdapter.__name__ ).log( self.__logLevel, " %s done (took %.2f sec.)",
self.__shortDescription, timeit.default_timer() - self.__last_start_time_s )