import requests import json import pandas as pd import re import os import logging from tqdm.auto import tqdm class Validator(): def __init__(self, fhir_base_url=None): self.fhir_base_url = fhir_base_url if not self.fhir_base_url: self.fhir_base_url = os.environ.get('FHIR_VALIDATION_DATASOURCE_BASEURL') # Keyword arguments for HTTP(s) requests (f.e. for auth) # Example parameters: # Authentication: https://requests.readthedocs.io/en/latest/user/authentication/#basic-authentication # Proxies: https://requests.readthedocs.io/en/latest/user/advanced/#proxies # SSL Certificates: https://requests.readthedocs.io/en/latest/user/advanced/#ssl-cert-verification self.requests_kwargs = {} # Init basic auth credentials from environment variables if (os.environ.get('FHIR_VALIDATION_DATASOURCE_AUTH_NAME')): self.requests_kwargs['auth'] = (os.environ.get('FHIR_VALIDATION_DATASOURCE_AUTH_NAME'), os.environ.get('FHIR_VALIDATION_DATASOURCE_AUTH_PASSWORD')) def fhir_operation_validate(self, resource_type, resource, send_pretty=False): headers = {'User-Agent': 'Bulk FHIR validator', 'Content-Type': 'application/fhir+json'} if send_pretty: data = json.dumps(resource, indent=4) else: data = json.dumps(resource) # todo: use environment variable and set it in docker-compose r = requests.post('http://fhir-validation-server:8080/fhir/' + resource_type + '/$validate', headers=headers, data=data) outcome = r.json() return outcome def validate(self, resource_type, entry): resource = entry.get('resource') fullUrl = entry.get('fullUrl') logging.debug(f"Validating {fullUrl}") outcome = self.fhir_operation_validate(resource_type, resource) df = pd.DataFrame() for issue in outcome.get('issue'): diagnostics = issue.get('diagnostics') diagnostics_aggregated = remove_value_code(diagnostics) diagnostics_aggregated = remove_array_index(diagnostics_aggregated) severity = issue.get('severity') location = issue.get('location') location = location[0] location_aggregated = remove_array_index(location) df_add = pd.DataFrame( {'severity': severity, 'location': location, 'location_aggregated': location_aggregated, 'diagnostics': diagnostics, 'diagnostics_aggregated': diagnostics_aggregated, 'fullUrl': fullUrl}, index=[0]) df = pd.concat([df, df_add], ignore_index=True) return df def search_and_validate(self, resource_type="Patient", search_parameters={}, limit=0): count = 0 page = 0 headers = {'User-Agent': 'Bulk FHIR validator', 'Content-Type': 'application/fhir+json', 'Prefer': 'handling=strict', # "Client requests that the server return an error for any unknown or unsupported parameter" instead of "ignore any unknown or unsupported parameter" (f.e. typo in search parameter) and getting all results by ignoring the filter criteria (https://www.hl7.org/fhir/R4/search.html#errors) } if '_count' not in search_parameters: search_parameters['_count'] = 200 df = pd.DataFrame() is_limit_reached = False page_url = f'{self.fhir_base_url}/{resource_type}' while page_url and not is_limit_reached: page += 1 if (page == 1): logging.info(f"FHIR Search: Requesting {page_url}") r = requests.get(page_url, params=search_parameters, headers=headers, **self.requests_kwargs ) else: logging.info(f"FHIR Search: Requesting next page {page_url}") r = requests.get(page_url, headers=headers, **self.requests_kwargs ) r.raise_for_status() bundle_dict = r.json() if (page == 1): total = bundle_dict.get('total') if total == None: total = 0 logging.info(f"Found {total} resources") if limit < total: progressbar_total = limit else: progressbar_total = total progress_bar = tqdm(total=progressbar_total, desc="Validating") count_entries = 0 entries = bundle_dict.get('entry') if entries: count_entries = len(entries) logging.info(f"Starting validation of {count_entries} entries on this page") for entry in entries: df_add = self.validate(resource_type, entry) df = pd.concat([df, df_add], ignore_index=True) count += 1 progress_bar.update(1) if (limit > 0 and count >= limit): is_limit_reached = True logging.info( f"Custom limit of {limit} resources reached, no further FHIR search paging and validation") break if ((limit == 0) or (total < limit)): logging.info(f"Validated {count} of {total} resources") else: logging.info( f"Validated {count} of {limit} resources (custom limit, found resources by FHIR search query: {total})") page_url = get_next_page_url(bundle_dict) if count > 0: logging.info(f"Search and validation done for {count} of {total} found resources") return (df) def validate_resource_and_render_validation_outcome(self, resource_url, resource_type=None): resource_url = self.fhir_base_url + '/' + resource_url # if no resource_type Parameter set, select FHIR resource type from URL find_resource_type = re.search(r".*/(.*)/.*", resource_url) resource_type = find_resource_type.groups()[0] headers = {'User-Agent': 'Bulk FHIR validator', 'Content-Type': 'application/fhir+json'} r = requests.get(resource_url, headers=headers, **self.requests_kwargs ) resource = r.json() outcome = self.fhir_operation_validate(resource_type, resource, send_pretty=True) render_validation_outcome(resource, outcome, resource_url=resource_url) def get_next_page_url(bundle_dict): links = bundle_dict.get('link') if links: for link in links: relation = link.get('relation') if relation == 'next': return link.get('url') return None def remove_value_code(diagnostics): find_value_code = re.search(r"Coding provided \(.+?\#(.+?)\) is not in the value set", diagnostics) if not find_value_code: find_value_code = re.search(r"Unknown code in fragment CodeSystem \'.+?\#(.+?)\'", diagnostics) if find_value_code: value_code = find_value_code.groups()[0] diagnostics_removed_valuecode = diagnostics.replace(value_code, "REMOVEDCODE") else: diagnostics_removed_valuecode = diagnostics return diagnostics_removed_valuecode def remove_array_index(diagnostics): diagnostics_removed_array_index = re.sub("\[[0-9]+\]", "[x]", diagnostics) return diagnostics_removed_array_index def select_location_line(issue): # Get Location line by scraping Element Location by regex # location_linecolumn = issue['location'][1] # find_line = re.search(r"Line\[([0-9]+)\]", location_linecolumn) # location_line = find_line.groups()[0] # location_line = int(location_line) # return location_line # Get location line from FHIR extension http://hl7.org/fhir/StructureDefinition/operationoutcome-issue-line extensions = issue.get('extension') if extensions: for extension in extensions: url = extension.get('url') if (url == 'http://hl7.org/fhir/StructureDefinition/operationoutcome-issue-line'): return extension.get('valueInteger') return None def render_validation_outcome(resource, outcome, resource_url=None, do_print_linenumber=True): from IPython.display import display, HTML import html resource_id = resource.get('id') resource_html = json.dumps(resource, indent=4) resource_html = html.escape(resource_html) resource_html = resource_html.replace(" ", " ").replace("\n", "
") resource_html_array = resource_html.split('
') if do_print_linenumber: resource_html_with_linenumber = [] linenumber = 0 for line in resource_html_array: linenumber += 1 line = '' + str(linenumber).zfill(3) + " " + line resource_html_with_linenumber.append(line) resource_html_array = resource_html_with_linenumber # sort the issues by linenumber so status info for "issue 1 of 5", "issue 2 of 5" etc. is in right order like lines of document # do it reverse because we add issue at begin of the line of fhir resource and multiple issues can be added to a line of fhir resource issues_sorted = sorted(outcome['issue'], key=select_location_line, reverse=True) count_issues = len(issues_sorted) issuenumber = count_issues summary_html = '' for issue in issues_sorted: location_element = issue['location'][0] location_line = select_location_line(issue) match issue['severity']: case "error": style = "color: black; background: red;" case "warning": style = "color: black; background: orange;" case _: style = "color: black; background: lightgray;" # Issue number and navigation issue_html = f'
  • ' # Link to previous issue if issuenumber > 1: issue_html += f'< Previous issue | ' issue_html += f'Issue {issuenumber} of {count_issues}' # Link to summary issue_html += f' | ˆ Back to summary' # Link to next issue if issuenumber < len(issues_sorted): issue_html += f' | Next issue >' issue_html += '
    ' issue_html += f'{issue["severity"]} for element {location_element} (beginning at line ' + str( location_line) + f'):
    {issue["diagnostics"]}' issue_html += '
  • ' summary_html = f'
  • {issue["severity"]} for element {location_element} (beginning at line ' + str( location_line) + '):
    {issue["diagnostics"]}
  • Navigate to JSON Code of the FHIR resource to location where this issue occurs' + summary_html # add issue html to fhir resource line resource_html_array[location_line] = issue_html + resource_html_array[location_line] issuenumber -= 1 resource_html = '
    '.join(resource_html_array) summary_html = f'

    Validation result for resource {resource_id}

    URL of the validated FHIR resource: {resource_url}

    Issues

    FHIR Validation returned ' + str( len(issues_sorted)) + ' issues:
      {summary_html}' resource_html = summary_html + '

    Where Issues occur in the JSON Code of the FHIR Resource

    ' + resource_html resource_html += f'

    Back to summary

    ' display(HTML(resource_html)) outcome_html = html.escape(json.dumps(outcome, indent=4)) outcome_html = outcome_html.replace(" ", " ").replace("\n", "
    ") # display(HTML(outcome_html))