2024-09-08 23:35:23 +02:00
import requests
import json
import pandas as pd
import re
import os
import logging
2024-09-30 15:25:37 +02:00
from tqdm . auto import tqdm
2024-09-08 23:35:23 +02:00
class Validator():
    """Validate FHIR resources of a data source against a FHIR validation server.

    Resources are read from the FHIR server at ``fhir_base_url`` and each one
    is posted to the ``$validate`` operation of the validation server at
    ``validation_base_url``.
    """

    # Validation server used when neither the constructor argument nor the
    # environment variable FHIR_VALIDATION_SERVER_BASEURL is set.
    validation_base_url = 'http://fhir-validation-server:8080/fhir'

    def __init__(self, fhir_base_url=None, validation_base_url=None):
        """Configure the data source and the validation server.

        Args:
            fhir_base_url: Base URL of the FHIR server to read resources from.
                Falls back to the environment variable
                ``FHIR_VALIDATION_DATASOURCE_BASEURL``.
            validation_base_url: Base URL of the FHIR validation server.
                Falls back to the environment variable
                ``FHIR_VALIDATION_SERVER_BASEURL`` and then to the class
                default.
        """
        self.fhir_base_url = fhir_base_url
        if not self.fhir_base_url:
            self.fhir_base_url = os.environ.get('FHIR_VALIDATION_DATASOURCE_BASEURL')

        configured_validation_url = validation_base_url or os.environ.get('FHIR_VALIDATION_SERVER_BASEURL')
        if configured_validation_url:
            self.validation_base_url = configured_validation_url

        # Keyword arguments for HTTP(s) requests (f.e. for auth)
        # Example parameters:
        # Authentication: https://requests.readthedocs.io/en/latest/user/authentication/#basic-authentication
        # Proxies: https://requests.readthedocs.io/en/latest/user/advanced/#proxies
        # SSL Certificates: https://requests.readthedocs.io/en/latest/user/advanced/#ssl-cert-verification
        self.requests_kwargs = {}

        # Init basic auth credentials from environment variables
        auth_name = os.environ.get('FHIR_VALIDATION_DATASOURCE_AUTH_NAME')
        if auth_name:
            self.requests_kwargs['auth'] = (auth_name,
                                            os.environ.get('FHIR_VALIDATION_DATASOURCE_AUTH_PASSWORD'))

    def fhir_operation_validate(self, resource_type, resource, send_pretty=False):
        """POST ``resource`` to ``<validation server>/<resource_type>/$validate``.

        Args:
            resource_type: FHIR resource type used in the request path.
            resource: The resource as a JSON-serializable object.
            send_pretty: Serialize with ``indent=4`` so that line numbers
                reported by the server refer to the pretty-printed JSON.

        Returns:
            The decoded JSON response body (the validation outcome).
        """
        headers = {'User-Agent': 'Bulk FHIR validator',
                   'Content-Type': 'application/fhir+json'}
        data = json.dumps(resource, indent=4) if send_pretty else json.dumps(resource)

        # self.requests_kwargs is intentionally not passed here: it holds the
        # settings (f.e. credentials) of the data source, not of the validator.
        validate_url = self.validation_base_url.rstrip('/') + '/' + resource_type + '/$validate'
        r = requests.post(validate_url, headers=headers, data=data)
        return r.json()

    def validate(self, resource_type, entry):
        """Validate one bundle entry and return its issues as a DataFrame.

        Args:
            resource_type: FHIR resource type of the entry's resource.
            entry: Bundle entry dict; ``resource`` and ``fullUrl`` are read.

        Returns:
            A DataFrame with one row per issue and the columns ``severity``,
            ``location``, ``location_aggregated``, ``diagnostics``,
            ``diagnostics_aggregated`` and ``fullUrl``. It is an empty
            DataFrame when the outcome contains no issues.
        """
        resource = entry.get('resource')
        full_url = entry.get('fullUrl')
        logging.debug(f"Validating {full_url}")
        outcome = self.fhir_operation_validate(resource_type, resource)

        rows = []
        # 'issue', 'diagnostics' and 'location' may be missing in an outcome,
        # so every one of them is guarded instead of assumed.
        for issue in outcome.get('issue') or []:
            diagnostics = issue.get('diagnostics')
            diagnostics_aggregated = diagnostics
            if diagnostics is not None:
                diagnostics_aggregated = remove_array_index(remove_value_code(diagnostics))

            locations = issue.get('location')
            location = locations[0] if locations else None
            location_aggregated = remove_array_index(location) if location is not None else None

            rows.append({'severity': issue.get('severity'),
                         'location': location,
                         'location_aggregated': location_aggregated,
                         'diagnostics': diagnostics,
                         'diagnostics_aggregated': diagnostics_aggregated,
                         'fullUrl': full_url})

        # Build the frame once instead of concatenating one row at a time.
        return pd.DataFrame(rows)

    def search_and_validate(self, resource_type="Patient", search_parameters=None, limit=0):
        """Run a FHIR search, follow its paging and validate every found resource.

        Args:
            resource_type: FHIR resource type to search for.
            search_parameters: Optional dict of FHIR search parameters. It is
                copied, never modified; ``_count`` defaults to 200.
            limit: Maximum number of resources to validate; 0 means no limit.

        Returns:
            A DataFrame with the issues of all validated resources.

        Raises:
            requests.HTTPError: If a search request returns an error status.
        """
        count = 0
        page = 0
        total = 0

        headers = {'User-Agent': 'Bulk FHIR validator',
                   'Content-Type': 'application/fhir+json',
                   'Prefer': 'handling=strict',
                   # "Client requests that the server return an error for any unknown or unsupported parameter" instead of "ignore any unknown or unsupported parameter" (f.e. typo in search parameter) and getting all results by ignoring the filter criteria (https://www.hl7.org/fhir/R4/search.html#errors)
                   }

        # Work on a copy so neither the caller's dict nor a shared default is mutated.
        params = dict(search_parameters) if search_parameters else {}
        params.setdefault('_count', 200)

        frames = []
        is_limit_reached = False
        progress_bar = None
        page_url = f'{self.fhir_base_url}/{resource_type}'

        try:
            while page_url and not is_limit_reached:
                page += 1

                if page == 1:
                    logging.info(f"FHIR Search: Requesting {page_url}")
                    r = requests.get(page_url,
                                     params=params,
                                     headers=headers,
                                     **self.requests_kwargs
                                     )
                else:
                    # The "next" link already carries the search parameters.
                    logging.info(f"FHIR Search: Requesting next page {page_url}")
                    r = requests.get(page_url,
                                     headers=headers,
                                     **self.requests_kwargs
                                     )

                r.raise_for_status()
                bundle_dict = r.json()

                if page == 1:
                    total = bundle_dict.get('total')
                    if total is None:
                        total = 0
                    logging.info(f"Found {total} resources")

                    progressbar_total = total
                    if 0 < limit < total:
                        progressbar_total = limit
                    progress_bar = tqdm(total=progressbar_total, desc="Validating")

                entries = bundle_dict.get('entry')
                if entries:
                    logging.info(f"Starting validation of {len(entries)} entries on this page")

                    for entry in entries:
                        df_add = self.validate(resource_type, entry)
                        if not df_add.empty:
                            frames.append(df_add)
                        count += 1
                        progress_bar.update(1)

                        if limit > 0 and count >= limit:
                            is_limit_reached = True
                            logging.info(
                                f"Custom limit of {limit} resources reached, no further FHIR search paging and validation")
                            break

                if limit == 0 or total < limit:
                    logging.info(f"Validated {count} of {total} resources")
                else:
                    logging.info(
                        f"Validated {count} of {limit} resources (custom limit, found resources by FHIR search query: {total})")

                page_url = get_next_page_url(bundle_dict)
        finally:
            # Release the progress bar even if a request or a validation fails.
            if progress_bar is not None:
                progress_bar.close()

        if count > 0:
            logging.info(f"Search and validation done for {count} of {total} found resources")

        if frames:
            return pd.concat(frames, ignore_index=True)
        return pd.DataFrame()

    def validate_resource_and_render_validation_outcome(self, resource_url, resource_type=None):
        """Fetch a single resource, validate it and render the outcome as HTML.

        Args:
            resource_url: URL of the resource relative to ``fhir_base_url``
                (f.e. ``Patient/123``).
            resource_type: FHIR resource type. If not set, it is taken from
                the path segment before the last one of the resource URL.

        Raises:
            ValueError: If no resource type is given and none can be derived.
            requests.HTTPError: If fetching the resource returns an error status.
        """
        resource_url = self.fhir_base_url + '/' + resource_url

        # if no resource_type Parameter set, select FHIR resource type from URL
        if not resource_type:
            find_resource_type = re.search(r".*/(.*)/.*", resource_url)
            if not find_resource_type:
                raise ValueError(f"Cannot derive FHIR resource type from URL {resource_url}")
            resource_type = find_resource_type.groups()[0]

        headers = {'User-Agent': 'Bulk FHIR validator',
                   'Content-Type': 'application/fhir+json'}

        r = requests.get(resource_url,
                         headers=headers,
                         **self.requests_kwargs
                         )
        # Do not validate and render an error response as if it were the resource.
        r.raise_for_status()
        resource = r.json()

        # Pretty-printed so the reported line numbers match the rendered JSON.
        outcome = self.fhir_operation_validate(resource_type, resource, send_pretty=True)

        render_validation_outcome(resource, outcome, resource_url=resource_url)
def get_next_page_url(bundle_dict):
    """Return the URL of the bundle's link with relation ``next``.

    Returns ``None`` when the bundle has no links or none of them is a
    ``next`` link, which means there is no further result page.
    """
    page_links = bundle_dict.get('link') or ()
    return next(
        (page_link.get('url') for page_link in page_links if page_link.get('relation') == 'next'),
        None,
    )
# Messages that embed the concrete code value; group 1 captures that value.
# The patterns are tried in this order and the first match wins.
_VALUE_CODE_PATTERNS = (
    re.compile(r"Coding provided \(.+?\#(.+?)\) is not in the value set"),
    re.compile(r"Unknown code in fragment CodeSystem \'.+?\#(.+?)\'"),
)


def remove_value_code(diagnostics):
    """Replace the concrete code value in a diagnostics message by ``REMOVEDCODE``.

    This lets messages that differ only in the offending code be aggregated.
    Every occurrence of the extracted code within the message is replaced.

    Args:
        diagnostics: The diagnostics text of an issue, or ``None``.

    Returns:
        The message with the code replaced. The input is returned unchanged
        when it is empty/``None`` or matches none of the known messages.
    """
    # Not every issue carries a diagnostics text.
    if not diagnostics:
        return diagnostics

    for pattern in _VALUE_CODE_PATTERNS:
        found = pattern.search(diagnostics)
        if found:
            return diagnostics.replace(found.group(1), "REMOVEDCODE")

    return diagnostics
def remove_array_index(diagnostics):
    """Replace every numeric array index such as ``[12]`` by ``[x]``.

    This lets locations and messages that differ only in the element
    position be aggregated.

    Args:
        diagnostics: A diagnostics or location text, or ``None``.

    Returns:
        The text with all numeric indexes replaced; ``None`` for ``None``.
    """
    # Not every issue carries a diagnostics text or a location.
    if diagnostics is None:
        return None
    # Raw string: "\[" in a normal string literal is an invalid escape sequence.
    return re.sub(r"\[[0-9]+\]", "[x]", diagnostics)
def select_location_line(issue):
    """Return the line number attached to an issue, or ``None``.

    The line is read from the ``valueInteger`` of the first extension with
    the URL http://hl7.org/fhir/StructureDefinition/operationoutcome-issue-line.
    ``None`` is returned when the issue has no such extension.
    """
    issue_line_url = 'http://hl7.org/fhir/StructureDefinition/operationoutcome-issue-line'
    candidates = issue.get('extension') or ()
    return next(
        (candidate.get('valueInteger') for candidate in candidates if candidate.get('url') == issue_line_url),
        None,
    )
def render_validation_outcome(resource, outcome, resource_url=None, do_print_linenumber=True):
    """Display a FHIR resource as HTML with its validation issues placed inline.

    The output starts with a summary list of all issues, followed by the
    pretty-printed JSON of the resource in which every issue is inserted in
    front of the line it refers to. Summary and inline issues link to each
    other by anchors.

    Args:
        resource: The validated FHIR resource (dict).
        outcome: The validation outcome (dict) whose ``issue`` list is rendered.
        resource_url: URL of the resource, shown as a link in the summary.
        do_print_linenumber: Prefix every JSON line with its line number.
    """
    from IPython.display import display, HTML
    import html

    resource_id = resource.get('id')

    # Same serialization (indent=4) as sent for validation with send_pretty,
    # so reported line numbers refer to these lines.
    resource_html_array = [
        html.escape(json_line).replace(" ", "&nbsp;")
        for json_line in json.dumps(resource, indent=4).split("\n")
    ]

    if do_print_linenumber:
        resource_html_array = [
            '<span style="background: lightgray;">' + str(linenumber).zfill(3) + "</span>" + line
            for linenumber, line in enumerate(resource_html_array, start=1)
        ]

    def sortable_line(issue):
        # Issues without the line extension sort as line 0 instead of
        # breaking the comparison with None.
        line = select_location_line(issue)
        return line if line is not None else 0

    # sort the issues by linenumber so status info for "issue 1 of 5", "issue 2 of 5" etc. is in right order like lines of document
    # do it reverse because we add issue at begin of the line of fhir resource and multiple issues can be added to a line of fhir resource
    issues_sorted = sorted(outcome.get('issue') or [], key=sortable_line, reverse=True)
    count_issues = len(issues_sorted)

    severity_styles = {
        "error": "color: black; background: red;",
        "warning": "color: black; background: orange;",
    }

    summary_items = []
    issuenumber = count_issues

    for issue in issues_sorted:
        locations = issue.get('location')
        # The texts come from the validation server, so escape them before
        # embedding them into HTML.
        location_element = html.escape(str(locations[0])) if locations else ''
        severity = html.escape(str(issue.get('severity') or ''))
        diagnostics = html.escape(str(issue.get('diagnostics') or ''))
        location_line = select_location_line(issue)
        style = severity_styles.get(issue.get('severity'), "color: black; background: lightgray;")

        description = (f'{severity} for element <i><b>{location_element}</b></i> '
                       f'(beginning at line {location_line}):<br><b>{diagnostics}</b>')

        # Issue number and navigation
        issue_html = f'<span id="{resource_id}-issue{issuenumber}"><li style="{style}"><small>'
        # Link to previous issue
        if issuenumber > 1:
            issue_html += f'<a href="#{resource_id}-issue{issuenumber - 1}">&lt; Previous issue</a> | '
        issue_html += f'Issue {issuenumber} of {count_issues}'
        # Link to summary
        issue_html += f' | <a href="#{resource_id}">&circ; Back to summary</a>'
        # Link to next issue
        if issuenumber < count_issues:
            issue_html += f' | <a href="#{resource_id}-issue{issuenumber + 1}">Next issue &gt;</a>'
        issue_html += '</small><br>'
        issue_html += description
        issue_html += '</li></span>'

        summary_items.append(
            f'<li style="{style}">{description}</li>'
            f'<p><a href="#{resource_id}-issue{issuenumber}">Navigate to JSON Code of the FHIR resource '
            f'to location where this issue occurs</a></p>')

        # add issue html to fhir resource line; a missing or out-of-range line
        # is clamped to an existing line so the issue is still shown
        line_index = min(max(location_line or 0, 0), len(resource_html_array) - 1)
        resource_html_array[line_index] = issue_html + resource_html_array[line_index]

        issuenumber -= 1

    # Issues were visited from last to first line; the summary lists them in document order.
    summary_items.reverse()

    shown_url = html.escape(str(resource_url))
    summary_html = (f'<h3 id="{resource_id}">Validation result for resource {resource_id}</h3>'
                    f'<p>URL of the validated FHIR resource: <a target="_blank" href="{shown_url}">{shown_url}</a></p>'
                    f'<h3>Issues</h3>FHIR Validation returned {count_issues} issues:<ol>'
                    + ''.join(summary_items))

    resource_html = '<br style="font-family: monospace;">'.join(resource_html_array)
    resource_html = summary_html + '</ol><h4>Where Issues occur in the JSON Code of the FHIR Resource</h4>' + resource_html
    resource_html += f'<p><a href="#{resource_id}">Back to summary</a></p>'

    display(HTML(resource_html))