Skip to content
Snippets Groups Projects
Commit ee2acd5c authored by Tu, Ethan's avatar Tu, Ethan
Browse files

Replace pk2Comp.py

parent b3396da9
No related branches found
No related tags found
No related merge requests found
"""The pk2Comp object is a two compartment PK model
that outputs graphs of concentration of tracer over time.
"""
#!/usr/bin/env python
# coding: utf-8
# In[51]:
# Import commands
from scipy.stats import gamma
# In[1]:
import pathlib
import os
import csv
import re
import math
import numpy as np
import matplotlib.pyplot as plt
#%matplotlib inline
from scipy.integrate import odeint
import math as math
import os
import pandas as pd
from scipy.integrate import solve_ivp
from scipy.optimize import fmin
# %matplotlib inline
# np.set_printoptions(threshold=sys.maxsize)
class pk2Comp:
"""The pk2Comp object is a two compartment PK model that outputs graphs of concentration of tracer over time."""
"""The pk2Comp object is a two compartment PK model
that outputs graphs of concentration of tracer over time.
"""
def __init__ (self, numParam = 4, Flow = 1/60, Vp = 0.05, Visf = 0.15, PS = 1/60):
def __init__(self, wd=pathlib.Path('Data').absolute(), filename='CTPERF005_stress.csv'):
"""Initializes the model with default parameter values for flow, Vp, Visf, and PS.
Parameters
----------
numParam: int
numParam is the number of parameters you want to optimize for the model. Defaults to 4.
----------
time : double[]
list of all timepoints
aorta : double[]
concentration of tracer in aorta (input function)
myo : double[]
concentration of tracer in myocardial tissue (conc_isf)
Flow : double
Flow is the flow of plasma through the blood vessel in mL/(mL*min). Defaults to 1/60.
Vp : double
Vp is the volume of plasma in mL. Defaults to 0.05.
Visf : double
Visf is the volume of interstitial fluid in mL. Defaults to 0.15.
PS : double
PS is the permeability-surface area constant in mL/(g*min). Defaults to 1/60.
PS is the permeability-surface area constant in mL/(g*min). Defaults to 1/60.
"""
if numParam <= 0 or Flow <= 0 or Vp < 0 or Visf < 0 or PS<0 or PS>10 or Vp>10 or Visf>10:
raise ValueError("Input values are incorrect.")
# Subject Data
self.wd = wd
self.filename = filename
self.time = []
self.aorta = []
self.myo = []
# Declare Variables for initial conditions
self.numParam = numParam
self.flow = Flow
self.Vp = Vp
self.Visf = Visf
self.PS = PS
self.sol = []
self.Cp0 = 0 # Initial concentration of tracer in plasma
self.Cisf0 = 0 # Initial concentration of tracer in interstitial space
self.tmax = 10 #Time in seconds
self.dt = 1 #Time step
self.a = 2. # Alpha for gamma distribution
self.rv = gamma(self.a, loc = 2, scale = 0.55) #input function
# Compartment variables to be fitted
self.flow = 1/60
self.visf = 0.15
self.baseline = 60
# Other Compartmental Modelvariables
self.perm_surf = 0.35
self.vol_plasma = 0.10
# Solved ode
self.sol = []
self.Mass_plasma = [] #mass of tracer in plasma
self.Mass_isf = []
self.Q = []
# Define the time array
self.time = np.arange(0, self.tmax + self.dt, self.dt)
# Gamma variables
self.ymax = 250
self.tmax = 6.5
self.alpha = 2.5
self.delay = 0
self.f2 = np.array([])
self.f3 = np.array([])
def Get_data(self, filename):
"""Imports data from all .csv files in directory.
Parameters
----------
wd : str
wd is the working directory path
Attributes
----------
time : double[]
list of all timepoints
aorta : double[]
concentration of tracer in aorta (input function)
myo : double[]
concentration of tracer in myocardial tissue (conc_isf)
Returns
-------
time : double[]
list of all timepoints
aorta : double[]
concentration of tracer in aorta (input function)
myo : double[]
concentration of tracer in myocardial tissue (conc_isf)
"""
os.chdir(self.wd)
# File not found error
if not os.path.isfile(filename):
raise ValueError(
"Input file does not exist: {0}. I'll quit now.".format(filename))
data = list(csv.reader(open(filename), delimiter='\t'))
for i in range(12):
self.time.append(
float(re.compile('\d+[.]+\d+|\d+').findall(data[i+1][0])[0]))
self.aorta.append(
float(re.compile('\d+[.]+\d+|\d+').findall(data[i+1][1])[0]))
self.myo.append(
float(re.compile('\d+[.]+\d+|\d+').findall(data[i+1][2])[0]))
return self.time, self.aorta, self.myo
# gamma_var distribution curve
def gamma_var(self, time=np.arange(0, 25), ymax=10, tmax=10, alpha=2, delay=0):
"""Creates a gamma variate probability density function with given alpha,
location, and scale values.
Parameters
----------
t : double[]
array of timepoints
ymax : double
peak y value of gamma distribution
tmax : double
location of 50th percentile of function
alpha : double
scale parameter
delay : double
time delay of which to start gamma distribution
Returns
-------
y : double[]
probability density function of your gamma variate.
"""
# Following Madsen 1992 simplified parameterization for gamma variate
t = time
self.ymax = ymax
self.tmax = tmax
self.alpha = alpha
self.delay = delay
y = np.zeros(np.size(t)) # preallocate output
# For odeint, checks if t input is array or float
if isinstance(t, (list, np.ndarray)):
for i in range(np.size(y)):
if t[i] < delay:
y[i] = 0
else:
y[i] = round((ymax*tmax**(-alpha)*math.exp(alpha))*(t[i]-delay)
** alpha*math.exp(-alpha*(t[i]-delay)/tmax), 3)
return y
else:
y = round((ymax*tmax**(-alpha)*math.exp(alpha))*(t-delay)
** alpha*math.exp(-alpha*(t-delay)/tmax), 3)
return y
# gamma_var_error
def inputMSE(self, guess=[10, 10, 2, 5]):
"""Calculates Mean squared error (MSE) between data and
gamma variate with given parameters values.
Parameters
----------
param : ndarray[]
time : double[]
array of timepoints
ymax : double
peak y value of gamma distribution
tmax : double
location of 50th percentile of function
alpha : double
scale parameter
delay : double
time delay of which to start gamma distribution
Returns
-------
MSE : double
Mean squared error
"""
if len(guess) < 1:
self.ymax = 10
self.tmax = 10
self.alpha = 2
self.delay = 5
elif len(guess) < 2:
self.ymax = guess[0]
self.tmax = 10
self.alpha = 2
self.delay = 5
elif len(guess) < 3:
self.ymax = guess[0]
self.tmax = guess[1]
self.alpha = 2
self.delay = 5
elif len(guess) < 4:
self.ymax = guess[0]
self.tmax = guess[1]
self.alpha = guess[2]
self.delay = 5
else:
# Mean squared error (MSE) between data and gamma variate with given parameters
self.ymax = guess[0]
self.tmax = guess[1]
self.alpha = guess[2]
self.delay = guess[3]
mse = 0
if self.tmax <= 0 or self.ymax <= 10 or self.delay < 0 or self.alpha < 0 \
or self.alpha > 1000 or self.tmax > 1000:
mse = 1000000 # just return a big number
else:
model_vals = self.gamma_var(
self.time, self.ymax, self.tmax, self.alpha, self.delay)
for i in range(len(self.aorta)):
mse = (self.aorta[i] - model_vals[i])**2 + mse
mse = mse / len(self.aorta)
return round(mse, 3)
def inputFuncFit(self, initGuesses):
"""Uses fmin algorithm (Nelder-Mead simplex algorithm) to
minimize loss function (MSE) of input function.
Parameters
----------
initGuesses : ndarray[]
Array of initial guesses containing:
time : double[]
array of timepoints
ymax : double
peak y value of gamma distribution
tmax : double
location of 50th percentile of function
alpha : double
scale parameter
delay : double
time delay of which to start gamma distribution
Returns
-------
opt : double[]
optimized parameters
"""
# Mean squared error (MSE) between data and gamma variate with given parameters
opt = fmin(self.inputMSE, initGuesses, maxiter=1000)
self.ymax = opt[0]
self.tmax = opt[1]
self.alpha = opt[2]
self.delay = opt[3]
return opt.round(2)
# Derivative function
def derivs(self, curr_vals, time):
def derivs(self, time, curr_vals):
"""Finds derivatives of ODEs.
Parameters
----------
----------
curr_vals : double[]
curr_vals it he current values of the variables we wish to "update" from the curr_vals list.
curr_vals it he current values of the variables we wish to
"update" from the curr_vals list.
time : double[]
time is our time array from 0 to tmax with timestep dt.
Returns
-------
dCp_dt : double[]
dconc_plasma_dt : double[]
contains the derivative of concentration in plasma with respect to time.
dCisf_dt : double[]
dconc_isf_dt : double[]
contains the derivative of concentration in interstitial fluid with respect to time.
"""
# Unpack the current values of the variables we wish to "update" from the curr_vals list
Cp, Cisf = curr_vals
conc_plasma, conc_isf = curr_vals
# Define value of input function Cin
Cin = self.rv.pdf(time)
# Define value of input function conc_in
conc_in = self.gamma_var(time, self.ymax, self.tmax,
self.alpha, self.delay)
# Right-hand side of odes, which are used to computer the derivative
dCp_dt = (self.flow/self.Vp)*(Cin - Cp) + (self.PS/self.Vp)*(Cisf - Cp)
dCisf_dt = (self.PS/self.Visf)*(Cp - Cisf)
dconc_plasma_dt = (self.flow/self.vol_plasma)*(conc_in - conc_plasma) + (self.perm_surf/self.vol_plasma)*(conc_isf - conc_plasma)
dconc_isf_dt = (self.perm_surf/self.visf)*(conc_plasma - conc_isf)
return dconc_plasma_dt, dconc_isf_dt
return dCp_dt, dCisf_dt
def outputMSE(self, guess):
"""Calculates Mean squared error (MSE) between data and
gamma variate with given parameters values.
Parameters
----------
guess : ndarray[]
def main(self):
"""Main function to solve ODEs"""
# Store the initial values in a list
init = [self.Cp0, self.Cisf0]
Flow : double
Flow is the flow of plasma through the blood vessel in mL/(mL*min).
Defaults to 1/60.
# Solve the odes with odeint
self.sol = odeint(self.derivs, init, self.time)
Vp : double
Vp is the volume of plasma in mL. Defaults to 0.05.
self.Mass_plasma = self.Vp * self.sol[:,0] #mass of tracer in plasma
self.Mass_isf = self.Visf * self.sol[:,1] #mass of tracer in isf
#Tp = Vp/(flow + PS) # mean transit time
#E = 1 - np.exp(-PS/flow) #extraction fraction
self.Q = self.Mass_plasma + self.Mass_isf
Visf : double
Visf is the volume of interstitial fluid in mL. Defaults to 0.15.
#print('The mean transit time is ' + str(Tp))
#print('The extraction fraction is ' + str(E))
PS : double
PS is the permeability-surface area constant in mL/(g*min). Defaults to 1/60.
Returns
-------
MSE : double
Mean squared error
"""
self.flow = guess[0]
self.visf = guess[1]
self.baseline = guess[2]
def getPlot(self):
"""Plots the solution of the solved ODEs.
Attributes
----------
sol : double[]
contains the solutions of our ODE functions.
mse = 0
if self.flow <= 0 or self.flow >= 25 or self.visf > 100 or self.visf < 0 \
or self.baseline > 150 or self.baseline < 0:
mse = 100000 # just return a big number
else:
sol = solve_ivp(self.derivs, [0, 30], [0, 0], t_eval=self.time)
MBF = sol.y[0] + sol.y[1]
temp = np.asarray(self.myo) - self.baseline
for i in range(len(self.myo)):
mse = (temp[i] - MBF[i])**2 + mse
mse = mse / len(self.myo)
return mse
def outputFuncFit(self, initGuesses):
"""Uses fmin algorithm (Nelder-Mead simplex algorithm) to minimize
loss function (MSE) of input function.
Parameters
----------
initGuesses : ndarray[]
Array of initial guesses containing:
time : double[]
array of timepoints
ymax : double
peak y value of gamma distribution
tmax : double
location of 50th percentile of function
alpha : double
scale parameter
delay : double
time delay of which to start gamma distribution
Returns
-------
opt : double[]
optimized parameters
"""
# Mean squared error (MSE) between data and gamma variate with given parameters
opt1 = fmin(self.outputMSE, initGuesses, maxiter=10000)
self.flow = opt1[0]
self.visf = opt1[1]
self.baseline = opt1[2]
return opt1 # .round(4)
def main(self):
# Gets data from file
self.Get_data(self.filename)
# Plots original data
plt.plot(self.time, self.aorta, 'bo')
plt.plot(self.time, self.myo, 'ro')
# Fit gamma_var input function and plots it
opt = self.inputFuncFit([250, 7, 4, 0])
print(opt)
print(self.ymax, self.tmax, self.alpha, self.delay)
plt.plot(np.arange(0, 25, 0.01), self.gamma_var(np.arange(0, 25, 0.01),
opt[0], opt[1], opt[2], opt[3]), 'k-')
# Plot the results using the values stored in the solution variable, "sol"
# Plot Cp using the "0" element from the solution
plt.figure(1)
plt.plot(self.time, self.rv.pdf(self.time), color = 'blue', label = 'Input Function')
plt.plot(self.time, self.sol[:,0],color="green", label = 'Cp')
# Plot Cisf using the "1" element from the solution
plt.plot(self.time, self.sol[:,1],color="purple", label = 'Cisf')
plt.xlabel('Time [s]')
plt.ylabel('Concentration [mM]')
plt.legend(loc = 'best')
plt.grid()
# Plot mass of tracer using the "2" element from the solution
plt.figure(2)
plt.plot(self.time, self.Mass_plasma,color="red", label = 'Plasma')
# Plot mass of tracer in tissue using the "3" element from the solution
plt.plot(self.time, self.Mass_isf,color="black", label = 'Interstitial Space')
plt.plot(self.time, self.Q, color="blue", label = 'Total mass')
plt.xlabel('Time [s]')
plt.ylabel('Mass [mg]')
plt.legend(loc = 'best')
plt.grid()
print('Cp at 10 sec is ' + str(self.sol[100,0]))
print('Cisf at 10 sec is ' + str(self.sol[100,1]))
# Fit uptake function and plot it
opt2 = self.outputFuncFit([.011418, .62, self.myo[0]])
print('myo is ', self.myo[0])
print(opt2)
print(self.flow, self.visf, self.baseline)
print('time is ', self.time)
self.f2 = solve_ivp(self.derivs, [0, 30], [0, 0], t_eval=self.time)
self.f3 = self.f2.y[0] + self.f2.y[1]
plt.plot(self.time, self.f3 + self.baseline, 'm-')
# In[2]:
PK = pk2Comp()
PK.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment