Source code for Fitters

#<PyXMRTool: A Python Package for the analysis of X-Ray Magnetic Reflectivity data measured on heterostructures>
#    Copyright (C) <2018>  <Yannic Utz>
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Lesser General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public License
#    along with this program.  If not, see <https://www.gnu.org/licenses/>.


"""Contains different optimization algorithms designed to fit reflectivity data.
   They take advantage of parallelization to be used on multiprocessor system.
   
   The algorithms are developed by Martin Zwiebel and I just adopted them with slight changes to PyXMRTool.
   More information can be found in the PhD thesis of Martin Zwiebler.
   
   The algorithms are not well developed yet. It is better to use existing optimizers. E.g. *scipy.optimize.least_squares*.
   
   Only :func:`.Explore` and related functions are recommended to use.
   :func:`Explore` uses *scipy.optimize.least_squares* to explore the complete parameter range and :func:`.list_clusters`, :func:`plot_clusters_onepar`, :func:`plot_clusters_allpars` and :func:`plot_fixpoints_allpars` are used to visualize the result.
   
   
"""

#Python Version 3.6

__author__ = "Yannic Utz"
__copyright__ = ""
__credits__ = ["Yannic Utz and Martin Zwiebler"]
__license__ = "GNU General Public License v3.0"
__version__ = "0.9"
__maintainer__ = "Yannic Utz"
__email__ = "yannic.utz@tu-dresden.de"
__status__ = "beta"



import numbers
import joblib
import types
import copy
from matplotlib import pyplot as plt
from matplotlib import patches as mpatches
from scipy import linalg as scipy_linalg
import numpy
import os.path
import sklearn.preprocessing
import sklearn.cluster
import scipy.optimize

from PyXMRTool import Parameters

#settings############################
numerical_derivative_factor=1.0e-9                          #defines in principle the the magnitude of  "Delta x" for the aproximation of a derivative by "Delta y/Delta x"
#####################################

[docs]def Explore(residualsfunction, parameter_settings, number_of_seeds, verbose=2,number_of_clusters=None): """ A scanning function which should be usefull to explore the parameter space. It chooses **number_of_seeds** different random start parameter vectors (seeds) within the given paramteter range. Each seed is used as start parameter set for a least_square fitter to find the minimum of the sum of squared residuals (*ssr*) (using :func:`scipy.optimize.least_squares` with the *trust region reflective algorithm). This will lead to **number_of_seeds** fixpoints. They will then be analysed with a (k-means) clustering algorithm to group these fixpoint in **number_of_clusters** different clusters. If **number_of_clusters** = *None* (default), the clustering with the best sillouette coefficient will be used. These clusters will then be analysed: What is the SSR corresponding to the cluster centers? How many seeds lead to the corresponding clusters? What are the means and spreads of parameter values within each cluster? The return value will be a structure containing the results. Parameters ---------- residualsfunction : callable A function which returns the differences between simulated and measured data points (residuals) as list/array. It should usally be the method :meth:`SampleRepresentation.ReflDataSimulator.getResiduals' of an instance of :class:`SampleRepresentation.ReflDataSimulator`. parameter_settings : tuple of lists/arrays of floats Sets start values, lower and upper limit of the parameters as *(startfitparameters, lower_limits, upper_limits )*, where each of the entries is an list/array of values of same length. The *startfitparameters* are not used (can be *None*) and just necessaray for compatibility. number_of_seeds : int number of random seeds which should be generated verbose : {0, 1, 2} determines the level of the optimizer's algorithm's verbosity: 0 : work silently. 1 : display a termination report for each seed. 2 (default) : display progress during iterations. number_of_clusters : int number of clusters in which the resulting fixpoints shall be grouped """ #unpack parameter settings (startfitparameters, lower_limits, upper_limits )=parameter_settings #check parameters if not callable(residualsfunction): raise TypeError("\'residualsfunction\' has to be callable.") if not len(lower_limits)==len(upper_limits): raise ValueError("Constraints does not match in length.") if not isinstance(number_of_seeds, numbers.Integral): raise TypeError("Invalid parameter. \'number_of_seeds\' has to be integer.") if number_of_seeds<3: raise ValueError("\'number_of_seeds\' has to be 3 at least.") if number_of_clusters is not None: if not isinstance(number_of_clusters, numbers.Integral): raise TypeError("Invalid parameter. \'number_of_clusters\' has to be integer.") if number_of_clusters<0: raise ValueError("\'number_of_clusters\' has to be positive.") #performing the least squares optimizations print(("... performing least squares optimization for "+str(number_of_seeds) +" seeds")) fixpoints=[] ssrs=[] upper_limits=numpy.array(upper_limits) lower_limits=numpy.array(lower_limits) for i in range(number_of_seeds): res = scipy.optimize.least_squares(residualsfunction, numpy.random.rand(len(upper_limits))*(upper_limits-lower_limits)+lower_limits, bounds=(lower_limits,upper_limits), method='trf', x_scale=upper_limits-lower_limits, jac='3-point',verbose=verbose) fixpoints.append(res.x) ssrs.append(2*res.cost) fixpoints=numpy.array(fixpoints) ssrs=numpy.array(ssrs) ssrfunction=lambda fitpararray : numpy.sum ( numpy.square ( residualsfunction(fitpararray) ) ) scan_output=Cluster({'fixpoints': fixpoints, 'ssrs_of_fixpoints' : ssrs}, ssrfunction ,number_of_clusters) return scan_output
[docs]def Cluster(scan_output,ssrfunction, number_of_clusters=None): """ Function to deal with output of :func:`Explore`. Clusters the found fixpoints and returns the result as structure in the same format as :func:`Explore`. Actually, it is used by :func:`Explore` internally. Parameters ---------- scan_output : struct return value of :func:`Explore` ssrfunction : callable A function which returns the sum of squared residuals between simulated and measured data points (ssr). It should usally be the method :meth:`SampleRepresentation.ReflDataSimulator.getSSR` of an instance of :class:`SampleRepresentation.ReflDataSimulator`. number_of_clusters : int number of clusters in which the resulting fixpoints shall be grouped """ #parameter checking if not callable(ssrfunction): raise TypeError("\'ssrsfunction\' has to be callable.") scan_output=scan_output.copy() fixpoints=scan_output['fixpoints'] ssrs=scan_output['ssrs_of_fixpoints'] #performing cluster analysis scaler=sklearn.preprocessing.StandardScaler() #parameters have to be scaled to allow for a reasonable clustering fixpoints_scaled=scaler.fit_transform(fixpoints) if number_of_clusters is None: print(("... clustering "+str(len(fixpoints)) +" fixpoints in an optimal number of clusters")) scores=[] for i in numpy.arange(2,min([20,len(fixpoints)])): km=sklearn.cluster.KMeans(n_clusters=i,init='random',n_init=100).fit(fixpoints_scaled) scores.append(sklearn.metrics.silhouette_score(fixpoints_scaled, km.labels_)) number_of_clusters=2+scores.index(max(scores)) else: print(("... clustering "+str(number_of_seeds) +" fixpoints in "+str(number_of_clusters)+" clusters")) km=sklearn.cluster.KMeans(n_clusters=number_of_clusters, init='random',n_init=100).fit(fixpoints_scaled) clusters=[] clusters_members=[] clusters_members_ssrs=[] for i,v in enumerate(km.cluster_centers_): number_fp=list(km.labels_).count(i) cluster_center=scaler.inverse_transform(v) members_indices=numpy.where(km.labels_==i) members=fixpoints[members_indices] members_ssrs=ssrs[members_indices] std_dev=numpy.std(members,axis=0) max_spread=numpy.max(members,axis=0)-numpy.min(members,axis=0) clusters.append({'center': cluster_center, 'std_dev': std_dev, 'rel_std_dev': std_dev/cluster_center, 'max_spread': max_spread, 'rel_max_spread': max_spread/cluster_center , 'number_of_members': number_fp, 'ratio_of_seeds': float(number_fp)/len(fixpoints), 'ssr_of_center': ssrfunction(cluster_center)}) clusters_members.append(members) clusters_members_ssrs.append(members_ssrs) scan_output['clusters'] = clusters scan_output['clusters_members'] = clusters_members scan_output['clusters_members_ssrs'] = clusters_members_ssrs print("... printing overview") list_clusters(scan_output) return scan_output
[docs]def list_clusters(scan_output): """ Function to deal with output of :func:`Explore`. Lists all found clusters (which should correspond to fixpoints) and their properties. """ clusters=scan_output['clusters'] for i,v in enumerate(clusters): print("cluster "+str(i)+": catches "+str(v['ratio_of_seeds']*100)+"% of seeds, ssr_of_center="+str(v['ssr_of_center']))
[docs]def plot_clusters_onepar(scan_output, p, parameter_pool=None, ssr_lim=None): """ Function to deal with output of :func:`Explore` (**scan_output**). Shows the parameter values of the centers of the found clusters of one parameter. On the y axis the corresponding sum of squared residuals of the fit are shown. The size (area) of the bubles corresponds to the ratio of seeds which converged to this fixpoint. The bars show the range of parameter values which were assigned to this cluster/fixpoint. Parameters ---------- scan_output : struct Structure as returned by :func:`Explore`. p : int or str Selects the parameter. Either with its index or its name. In the second case **parameter_pool** has to be given. parameter_pool : :class:`Parameters.ParameterPool` The paramter pool containing the parameters, which are under consideration. If given, parameter names are plotted and the x axis is adjusted to lower and upper limits stored in **parameter_pool**. ssr_lim : list/tuple lower and upper limit of y-axis (ssr) """ #parameter checking if parameter_pool is not None and not isinstance(parameter_pool,Parameters.ParameterPool): raise TypeError('\'parameter_pool\' has to be an instance of \'Parameters.ParameterPool\'.') if not isinstance(p, numbers.Integral): if isinstance(p, str): if isinstance(parameter_pool,Parameters.ParameterPool): pname=p pnumber=parameter_pool.getNames().index(pname) ll=parameter_pool.getParameter(pname).lower_lim ul=parameter_pool.getParameter(pname).upper_lim else: TypeError('\'p\' has to be either integer (index of parameter) or a string (name of parameter).') else: if isinstance(parameter_pool,Parameters.ParameterPool): pnumber=p pname=parameter_pool.getNames()[pnumber] ll=parameter_pool.getParameter(pname).lower_lim ul=parameter_pool.getParameter(pname).upper_lim else: pnumber=p pname='Parameter Index '+str(p) ll=None ul=None #create a color for each cluster cmap=plt.get_cmap('gist_rainbow') n_clusters=len(scan_output['clusters']) colors=[cmap(i/float(n_clusters-1),0.5) for i in range(n_clusters)] #get center values centers=[] for cl in scan_output['clusters']: centers.append(cl['center'][pnumber]) #get sums of squared residuals ssrs=[] for cl in scan_output['clusters']: ssrs.append(cl['ssr_of_center']) #get ratios of seeds (meaning: how many of the seeds convered to a certain cluster) rofs=[] for cl in scan_output['clusters']: rofs.append(cl['ratio_of_seeds']) rofs=numpy.array(rofs) #get spreads of the parameter values (min and max) spreads_lower=[] spreads_upper=[] for i,cl in enumerate(scan_output['clusters_members']): spreads_lower.append(centers[i]-min(cl[:,pnumber])) spreads_upper.append(max(cl[:,pnumber])-centers[i]) spreads=[spreads_lower,spreads_upper] spreads=numpy.array(spreads) #plotting fig=plt.figure() ax=plt.subplot(111) #ploting circles with size related to the ratio of seeds, position=(center,ssr) ax.scatter(centers,ssrs,s=rofs*5000,c=colors,edgecolors='face') #plotting errorbars related to the spread ax.errorbar(centers,ssrs, xerr=spreads,fmt='.',color='black') #legend patches=[] for i,c in enumerate(colors): patches.append(mpatches.Patch(color=c, label=str(i))) ax.legend(handles=patches,loc='upper left',bbox_to_anchor=(1,1)) box=ax.get_position() ax.set_position([box.x0,box.y0,box.width*0.9,box.height]) #label xaxis ax.set_xlabel(pname) #set limits of xaxis ax.set_xlim([ll,ul]) if ssr_lim is not None: ax.set_ylim(ssr_lim) plt.show()
[docs]def plot_clusters_allpars(scan_output, parameter_pool=None, ssr_lim=None): """ Function to deal with output of :func:`Explore` (**scan_output**). Shows the parameter values of the centers of the found clusters of all parameter in a multiplot. On the y axis the corresponding sum of squared residuals of the fit are shown. The size (area) of the bubles corresponds to the ratio of seeds which converged to this fixpoint. The bars show the range of parameter values which were assigned to this cluster/fixpoint. Parameters ---------- scan_output : struct Structure as returned by :func:`Explore`. parameter_pool : :class:`Parameters.ParameterPool` The paramter pool containing the parameters, which are under consideration. If given, parameter names are plotted and the x axes are adjusted to lower and upper limits stored in **parameter_pool**. ssr_lim : list/tuple lower and upper limit of y-axis (ssr) """ #parameter checking if parameter_pool is not None and not isinstance(parameter_pool,Parameters.ParameterPool): raise TypeError('\'parameter_pool\' has to be an instance of \'Parameters.ParameterPool\'.') #create a color for each cluster cmap=plt.get_cmap('gist_rainbow') n_clusters=len(scan_output['clusters']) colors=[cmap(i/float(n_clusters-1),0.5) for i in range(n_clusters)] #create figure fig=plt.figure(figsize= tuple(numpy.array(plt.rcParams["figure.figsize"])*2)) #get number of parameters number_of_parameters=len(scan_output['clusters'][0]['center']) grid=numpy.ceil(numpy.sqrt(number_of_parameters)) #looping trough parameters for pnumber in range(number_of_parameters): if isinstance(parameter_pool,Parameters.ParameterPool): pname=parameter_pool.getNames()[pnumber] ll=parameter_pool.getParameter(pname).lower_lim ul=parameter_pool.getParameter(pname).upper_lim else: pname='Parameter Index '+str(pnumber) ll=None ul=None #get center values centers=[] for cl in scan_output['clusters']: centers.append(cl['center'][pnumber]) #get sums of squared residuals ssrs=[] for cl in scan_output['clusters']: ssrs.append(cl['ssr_of_center']) #get ratios of seeds (meaning: how many of the seeds convered to a certain cluster) rofs=[] for cl in scan_output['clusters']: rofs.append(cl['ratio_of_seeds']) rofs=numpy.array(rofs) #get spreads of the parameter values (min and max) spreads_lower=[] spreads_upper=[] for i,cl in enumerate(scan_output['clusters_members']): spreads_lower.append(centers[i]-min(cl[:,pnumber])) spreads_upper.append(max(cl[:,pnumber])-centers[i]) spreads=[spreads_lower,spreads_upper] spreads=numpy.array(spreads) #plotting ax=plt.subplot(grid,grid,pnumber+1) #ploting circles with size related to the ratio of seeds, position=(center,ssr) ax.scatter(centers,ssrs,s=rofs*5000,c=colors,edgecolors='face') #plotting errorbars related to the spread ax.errorbar(centers,ssrs, xerr=spreads,fmt='.',color='black') #label xaxis ax.set_xlabel(pname) #set limits of xaxis ax.set_xlim([ll,ul]) if ssr_lim is not None: ax.set_ylim(ssr_lim) #legend patches=[] lbls=[] for i,c in enumerate(colors): patches.append(mpatches.Patch(color=c, label=str(i))) lbls.append(str(i)) fig.legend(handles=patches,labels=lbls,loc=7) plt.show()
[docs]def plot_fixpoints_allpars(scan_output, parameter_pool=None, ssr_lim=None): """ Function to deal with output of :func:`Explore` (**scan_output**). Shows the parameter values of the centers of the found clusters of all parameter in a multiplot. On the y axis the corresponding sum of squared residuals of the fit are shown. The size (area) of the bubles corresponds to the ratio of seeds which converged to this fixpoint. The bars show the range of parameter values which were assigned to this cluster/fixpoint. Additionally, all fixpoints are also shown, colored according to their cluster asscociated. Parameters ---------- scan_output : struct Structure as returned by :func:`Explore`. ssrfunction : callable A function which returns the sum of squared residuals between simulated and measured data points (ssr). It should usally be the method :meth:`SampleRepresentation.ReflDataSimulator.getSSR` of an instance of :class:`SampleRepresentation.ReflDataSimulator`. parameter_pool : :class:`Parameters.ParameterPool` The paramter pool containing the parameters, which are under consideration. If given, parameter names are plotted and the x axes are adjusted to lower and upper limits stored in **parameter_pool**. ssr_lim : list/tuple lower and upper limit of y-axis (ssr) """ #parameter checking if parameter_pool is not None and not isinstance(parameter_pool,Parameters.ParameterPool): raise TypeError('\'parameter_pool\' has to be an instance of \'Parameters.ParameterPool\'.') #create a color for each cluster cmap=plt.get_cmap('gist_rainbow') n_clusters=len(scan_output['clusters']) colors=[cmap(i/float(n_clusters-1),0.5) for i in range(n_clusters)] #create figure fig=plt.figure(figsize= tuple(numpy.array(plt.rcParams["figure.figsize"])*2)) #get number of parameters number_of_parameters=len(scan_output['clusters'][0]['center']) grid=numpy.ceil(numpy.sqrt(number_of_parameters)) #get members and their ssrs members_per_par=[] for pnumber in range(number_of_parameters): members=[] for cl in scan_output['clusters_members']: m=[] for point in cl: m.append(point[pnumber]) members.append(m) members_per_par.append(members) members_ssrs=scan_output['clusters_members_ssrs'] #looping trough parameters for pnumber in range(number_of_parameters): if isinstance(parameter_pool,Parameters.ParameterPool): pname=parameter_pool.getNames()[pnumber] ll=parameter_pool.getParameter(pname).lower_lim ul=parameter_pool.getParameter(pname).upper_lim else: pname='Parameter Index '+str(pnumber) ll=None ul=None #get center values centers=[] for cl in scan_output['clusters']: centers.append(cl['center'][pnumber]) #get sums of squared residuals ssrs=[] for cl in scan_output['clusters']: ssrs.append(cl['ssr_of_center']) #get ratios of seeds (meaning: how many of the seeds convered to a certain cluster) rofs=[] for cl in scan_output['clusters']: rofs.append(cl['ratio_of_seeds']) rofs=numpy.array(rofs) #get spreads of the parameter values (min and max) spreads_lower=[] spreads_upper=[] for i,cl in enumerate(scan_output['clusters_members']): spreads_lower.append(centers[i]-min(cl[:,pnumber])) spreads_upper.append(max(cl[:,pnumber])-centers[i]) spreads=[spreads_lower,spreads_upper] spreads=numpy.array(spreads) #plotting ax=plt.subplot(grid,grid,pnumber+1) #ploting circles with size related to the ratio of seeds, position=(center,ssr) ax.scatter(centers,ssrs,s=rofs*5000,c=colors,edgecolors='face') #plotting errorbars related to the spread ax.errorbar(centers,ssrs, xerr=spreads,fmt='.',color='black') #plotting each fixpoint in the color of its associated cluster for i,members in enumerate(members_per_par[pnumber]): ax.scatter(members,members_ssrs[i],c=colors[i],marker='x') #label xaxis ax.set_xlabel(pname) #set limits of xaxis ax.set_xlim([ll,ul]) if ssr_lim is not None: ax.set_ylim(ssr_lim) #legend patches=[] lbls=[] for i,c in enumerate(colors): patches.append(mpatches.Patch(color=c, label=str(i))) lbls.append(str(i)) fig.legend(handles=patches,labels=lbls,loc=7) plt.show()
[docs]def Evolution(costfunction, parameter_settings , iterations, number_of_cores=1, generation_size=300, mutation_strength=0.01, elite=2, parent_percentage=0.25, control_file=None, plotfunction=None): """ Evolutionary fit algorithm. Slow but good in finding the global minimum. Return the optimized parameter set and the coresponding value of the costfunction. Parameters ---------- costfunction : callable A function which returns a measure (cost) for the difference between measurement and simulated data according to the paramter set given as list of values. Usually the sum of squared residuals (SSR) is used as cost. It should usally be the method :meth:`SampleRepresentation.ReflDataSimulator.getSSR` of an instance of :class:`SampleRepresentation.ReflDataSimulator` wrapped in a function. The wrapping is necessaray due to some implemetation issues connected to the parallelization. Example for the wrapping:: simu = SampleRepresentation.ReflDataSimulator("l") ... def cost(fitpararray): return simu.getSSR(fitpararray) Pass then the function *cost* as **costfunction**. It can also be any other function which takes the array of fit parameters and returns one real value which should be minimized by :meth:`.Evolution`. parameter_settings: tuple of lists of floats Sets start values, lower and upper limit of the parameters as *(startfitparameters, lower_limits, upper_limits )*, where each of the entries is an list/array of values of same length. iterations : int number of iterations/generations number_of_cores : int Number of jobs used in parallel. Best performance when set to the number of available cores on your computer. generation_size : int Generate this many individual fit parameter sets in each generation. mutation_strength : float Mutates children by adding this factor times (upper_limit - lower_limit) --> use rather small values elite : int Remember the best individuals for the next generation. parent_percentage : flota Use this fraction of a gereneration (the best) for reproduction. control_file : str Filename of a control file. If it is given, you can abort the optimization routine by writing "terminate 1" to the beginning of its first line. plotfunction : callable Function which is used to plot the current state of fitting (simulated data with currently best parameter set) after every iteration if given. It should take only one parameter: the array of fitparameters. This Evolutionary algorithm is mainly the same as Martins. Only the rule for mutation has changed: | Martin: ``children[i]=children[i] * (1 + s * random float(-1,1))`` | I: ``children[i]=children[i] + s * random float(-1,1)*(upper_limits-lower_limits)`` """ #unpack parameter settings (startfitparameters, lower_limits, upper_limits )=parameter_settings #check parameters #if not callable(costfunction): # raise TypeError("\'costfunction\' has to be callable.") if not (len(startfitparameters)==len(lower_limits) and len(lower_limits)==len(upper_limits)): raise ValueError("\'startfitparameters\' and constraints don't match in length.") pos_integer_pars=[number_of_cores,iterations,generation_size] for p in pos_integer_pars: if not isinstance(p, numbers.Integral): raise TypeError("Invalid parameter.") if p<0: raise ValueError("Parameter has to be positive.") pos_real_pars=[mutation_strength, elite, parent_percentage] for p in pos_real_pars: if not isinstance(p, numbers.Real): raise TypeError("Invalid parameter.") if p<0: raise ValueError("Parameter has to be positive.") if control_file is not None and not os.path.exists(control_file): raise Exception(str(control_file)+" is not an existing path.") if plotfunction is not None and not callable(plotfunction): raise TypeError("\'plotfunction\' has to be callable.") #use numpy arrays startfitparameters=numpy.array(startfitparameters) lower_limits=numpy.array(lower_limits) upper_limits=numpy.array(upper_limits) number_of_parents=int(generation_size*parent_percentage) #Initial state: number_of_fitparameters=len(startfitparameters) randomnumbers=numpy.random.rand(generation_size, number_of_fitparameters) all_fitpararrays=lower_limits+randomnumbers*(upper_limits-lower_limits) all_fitpararrays[0]=startfitparameters #use the given start values just as one of many guesses (replace one random guess) children=numpy.zeros((generation_size, number_of_fitparameters)) print("Start Evolution of "+str(iterations)+" Generations with "+str(len(startfitparameters))+" Parameters.") ite=0 while True: ite+=1 #Calculate the costfunction (usually chisquare) for each individuum out=joblib.Parallel(n_jobs=number_of_cores)(joblib.delayed(costfunction)(all_fitpararrays[i]) for i in range(generation_size) ) ranking_list=numpy.argsort(out) #stores the best results as indices of the elements of out #Write the current state print(" Generation " + str(ite) + ": Cost=" + str(out[ranking_list[0]])) #plot the current state (best guess) if plotfunction is not None: plotfunction(all_fitpararrays[ranking_list[0]]) #check for termination if control_file is not None: with open(control_file) as f: for line in f: line=line.split(" ") if(line[0]=="terminate" and int(line[1])==1): print("!! Iteration terminated, returning current status") return all_fitpararrays[ranking_list[0]], out[ranking_list[0]] #return if number of iterations is reached if(ite==iterations): return all_fitpararrays[ranking_list[0]], out[ranking_list[0]] #return best fit parameters and corresponding value of the costfunction #determin next generation for i in range(generation_size): #Copy the elite if(i<elite): children[i]=all_fitpararrays[ranking_list[i]] else: #Reproduce children from the mother and father randomly Mother=i%number_of_parents; Father=numpy.random.randint(0,number_of_parents) #No Cloning while(Father==Mother): Father=numpy.random.randint(0,number_of_parents) children[i]=0.5*(all_fitpararrays[ranking_list[Mother]]+all_fitpararrays[ranking_list[Father]]) #Mutate the children r=2*numpy.random.rand(number_of_fitparameters)-1 children[i]=children[i] + r*mutation_strength #changed mutation rule by Yannic (mutation not depending on parameter value) for j in range(number_of_fitparameters): #periodic boundaries (changed by Yannic; Martin sets to the limits if theey are exceeded) if(children[i][j]<lower_limits[j]): children[i][j]=upper_limits[j]-(lower_limits[j]-children[i][j])%(upper_limits[j]-lower_limits[j]) elif(children[i][j]>upper_limits[j]): children[i][j]=lower_limits[j]+(children[i][j]-upper_limits[j])%(upper_limits[j]-lower_limits[j]) #The children are now the new generation all_fitpararrays=children
[docs]def Levenberg_Marquardt_Fitter(residualandcostfunction, parameter_settings , parallel_points ,number_of_cores=1, strict=True, convergence_criterium=1e-7, control_file=None, plotfunction=None): """ Modified Levenberg-Marquard algorithm (see PhD thesis of Martin Zwiebler). Good convergence, but might end up in a local mininum. Return the optimized parameter set and the coresponding value of the costfunction. Parameters ---------- residualandcostfunction : callable A function which returns the differences between simulated and measured data points (residuals) as list and a scalar measure (cost) for these differences in total according to the paramter set given as list of values. Usually the sum of squared residuals (SSR) is used as cost. It should usally be the method :meth:`SampleRepresentation.ReflDataSimulator.getResidualsSSR` of an instance of :class:`SampleRepresentation.ReflDataSimulator` wrapped in a function. The wrapping is necessaray due to some implemetation issues connected to the parallelization. Example for the wrapping:: simu = SampleRepresentation.ReflDataSimulator("l") ... def rescost(fitpararray): return simu.getResidualsSSR(fitpararray) Pass then the function *rescost* as **costfunction**. It can also be any other function which takes the array of fit parameters and returns a tuple of 1.) a list of residuals (will be used to determine derivatives) 2.) a value of the costfunction which should be minimized by :meth:`.Levenberg_Marquardt_Fitter`. parameter_settings: tuple of lists of floats Sets start values, lower and upper limit of the parameters as *(startfitparameters, lower_limits, upper_limits )*, where each of the entries is an list/array of values of same length. parallel_points : int This should be something like the number of threads that can run in parallel/number of cores. The algorithm will first find a direction for a good descent and then check this number of points on the line. The best one will yield the new fit parameter set. number_of_cores : int Number of jobs used in parallel. Best performance when set to the number of available cores on your computer. strict : bool Usually this algorithm fails if the residuals are locally independent of one of the parameters. If you set **stict** = *False* this parameter will be neglected locally. convergence_criterium : float If the relative difference between the costs in two succeeding iterations is smaller than **convergence_criterium**, the fitting is defined as \`converged\`. control_file : str Filename of a control file. If it is given, you can abort the optimization routine by writing "terminate 1" to the beginning of its first line. plotfunction : callable Function which is used to plot the current state of fitting (simulated data with currently best parameter set) after every iteration if given. It should take only one parameter: the array of fitparameters. """ #unpack parameter settings (startfitparameters, lower_limits, upper_limits )=parameter_settings #check parameters if not callable(residualandcostfunction): raise TypeError("\'residualandcostfunction\' has to be callable.") if not (len(startfitparameters)==len(lower_limits) and len(lower_limits)==len(upper_limits)): raise ValueError("\'startfitparameters\' and constraints don't match in length.") pos_integer_pars=[number_of_cores,parallel_points] for p in pos_integer_pars: if not isinstance(p, numbers.Integral): raise TypeError("Invalid parameter.") if p<0: raise ValueError("Parameter has to be positive.") if not isinstance(strict,bool): raise TypeError("\'strict\' has to be of type bool.") if control_file is not None and not os.path.exists(control_file): raise Exception(str(control_file)+" is not an existing path.") if plotfunction is not None and not callable(plotfunction): raise TypeError("\'plotfunction\' has to be callable.") #use numpy arrays startfitparameters=numpy.array(startfitparameters) lower_limits=numpy.array(lower_limits) upper_limits=numpy.array(upper_limits) number_of_fitparameters=len(startfitparameters) aite=startfitparameters "Start Levenberg-Marquardt algorithm with "+str(len(startfitparameters))+" Parameters." ite=0 while(True): if(ite>0): #Go here once you calculated the first step #Calculate the fit parameters on the line of descent all_fitpararrays=[copy.copy(aite) for i in range(parallel_points)] for i in range(parallel_points): scale=0.5**i all_fitpararrays[i]=all_fitpararrays[i]-scale*DTr - scale*(1-scale)*DTr2 for j in range(parallel_points): for i in range(number_of_fitparameters): if(all_fitpararrays[j][i]<lower_limits[i]): all_fitpararrays[j][i]=lower_limits[i] elif(all_fitpararrays[j][i]>upper_limits[i]): all_fitpararrays[j][i]=upper_limits[i] ##Calculate the residuals and cost (e.g. difference between simulated and measured reflectivities) on the line of descent, in parallel out=numpy.array(joblib.Parallel(n_jobs=number_of_cores)(joblib.delayed(residualandcostfunction)( all_fitpararrays[i] ) for i in range(parallel_points) )) #Find the lowest chisquare min_i=numpy.argmin(out[:,1]) aite=copy.copy(all_fitpararrays[min_i]) fiterror2=out[min_i][1] if( abs( (fiterror1-fiterror2)/(fiterror1+fiterror2) ) < convergence_criterium ): print(( " --> Converged at cost=" + str(fiterror2) )) return aite, out[min_i][1] #return best fit parameters and corresponding value of the costfunction elif control_file is not None: with open(control_file) as f: for line in f: line=line.split(" ") if(line[0]=="terminate" and int(line[1])==1): print("!! Iteration terminated, current status") return aite, out[min_i][1] print(" Iteration "+ str(ite)+": old cost = "+str(fiterror1)+", new cost = "+str(fiterror2)) #plot the current state of fitting if plotfunction is not None: plotfunction(aite) #Make Fit parameters for the calculation of the derivative all_fitpararrays=[copy.copy(aite) for i in range(number_of_fitparameters+1)] ##this Matrix stores fitparameters for each point that is calculated in parallel delta=numpy.zeros(number_of_fitparameters) for i in range(number_of_fitparameters): #YANNIC: geaendert, weil es mir so sinnvoller erschien #if(all_fitpararrays[i][i]==0 or (lower_limits[i]<0<upper_limits[i]) ): #delta[i]=numerical_derivative_factor*max( abs(upper_limits[i]), abs(lower_limits[i] ) ) #else: #delta[i]=numerical_derivative_factor*all_fitpararrays[number_of_fitparameters][i] delta[i]=numerical_derivative_factor*abs(upper_limits[i]-lower_limits[i] ) all_fitpararrays[i][i]+=delta[i] ##Calculate the residuals and cost (e.g. difference between simulated and measured reflectivities) at each delta-step, in parallel out=joblib.Parallel(n_jobs=number_of_cores)(joblib.delayed(residualandcostfunction)( all_fitpararrays[i] ) for i in range(number_of_fitparameters+1) ) if(ite==0): #Calculate the first fit error fiterror1= out[ number_of_fitparameters ][1] #in first iteration: create the matrix DT to stores all the residuals derivatives number_of_datapoints=len(out[0][0]) DT=numpy.zeros((number_of_fitparameters,number_of_datapoints)) else: fiterror1=fiterror2 #Calculate the derivative of the reflectivity for i in range(number_of_fitparameters): DT[i] = ( out[i][0]-out[number_of_fitparameters][0] )/delta[i] ##Calculate the gradient A=numpy.dot(DT,DT.T) b=numpy.dot(DT,(out[number_of_fitparameters][0]).T ) #If one of the derivatives is entirely zero, the fit parameter is essentially meaningless. That may happen for a number of reasons. However, Gauss-Newton fails for this case. #If strict=False make it work nevertheless irrelevantparameterlist=[] for i in range(number_of_fitparameters): if(b[i]==0): if strict: print("WARNING! No gradient component" + str(i) + "! Singular matrix!\n Try \'strict=False\' for less rigorous treatment.") print(b) raise Exception else: irrelevantparameterlist.append(i) if not strict and not irrelevantparameterlist==[]: #remove the elements corresponding to the irrelevant parameters print("WARNING! Parameters " + str(irrelevantparameterlist) + " are locally irrelevant (no gradient component) and will be ignored for this iteration.") DT_reduced=numpy.delete(DT,irrelevantparameterlist,0) A=numpy.dot(DT_reduced,DT_reduced.T) b=numpy.delete(b,irrelevantparameterlist,0) #Solve this system of equations to calculate the descent vector that is used for large steps DTr=scipy_linalg.solve(A,b,sym_pos=True) #This is another good descent vector that is used for small steps DTr2=numpy.zeros(number_of_fitparameters-len(irrelevantparameterlist)) for i in range(number_of_fitparameters-len(irrelevantparameterlist)): DTr2[i]=b[i]/A[i][i] if not strict: #fill components of the descent vector which correspond to irrelevant parameters, with zero for parind in irrelevantparameterlist: DTr=numpy.insert(DTr,parind,0,0) DTr2=numpy.insert(DTr2,parind,0,0) ite+=1