Source code for metintos.flow_tuning

# -*- coding: utf-8 -*-

import time
from collections import namedtuple
from typing import NamedTuple, List
from itertools import product
from metintos.optiflow import *
from metintos.constants import hour
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials

#: :obj:`tuple` : named ``(time, step)`` pair. The functions below turn its
#: fields (via ``_asdict()``) into label selectors for the dataset dimensions
#: they match, in order to pick out one source or target frame.
TimeSelector = namedtuple("TimeSelector", ("time", "step"))


def add_times_to_steps_sources_targets(steps_sources_targets, dataset):
    """Expand step-only source/target pairs into TimeSelector pairs for every time.

    Given a dict mapping ``(source step, source step) -> target step``, return
    a dict whose keys are ``(TimeSelector, TimeSelector)`` tuples and whose
    values are ``TimeSelector`` instances. Each entry of the input is repeated
    once for every value of ``dataset.coords['time']``, with that time filled
    in and the steps copied from the input mapping.

    :param steps_sources_targets: mapping ``(start_step, end_step) -> target_step``.
    :type steps_sources_targets: dict
    :param dataset: dataset providing a ``time`` coordinate.
    :type dataset: xarray.Dataset
    :returns: mapping ``(TimeSelector, TimeSelector) -> TimeSelector``, ordered
        by time first and then by the order of ``steps_sources_targets``.
    :rtype: dict
    """
    new_dict = {}
    for t in dataset.coords['time'].values:
        # Use a distinct name for the target step instead of rebinding the
        # loop variable to a TimeSelector.
        for sources, target_step in steps_sources_targets.items():
            source_0 = TimeSelector(time=t, step=sources[0])
            source_1 = TimeSelector(time=t, step=sources[1])
            new_dict[(source_0, source_1)] = TimeSelector(time=t, step=target_step)
    return new_dict
def add_steps_to_times_sources_targets(times_sources_targets, steps_list=(0*hour,)):
    """Expand time-only source/target pairs into TimeSelector pairs for every step.

    Given a dict mapping ``(source time, source time) -> target time``, return
    a dict whose keys are ``(TimeSelector, TimeSelector)`` tuples and whose
    values are ``TimeSelector`` instances. Each entry of the input is repeated
    once for every step in ``steps_list``, with that step filled in and the
    times copied from the input mapping.

    :param times_sources_targets: mapping ``(start_time, end_time) -> target_time``.
    :type times_sources_targets: dict
    :param steps_list: steps to pair with every entry. The default is
        ``(0*hour,)``, i.e. only the single step ``0*hour``.
    :type steps_list: iterable
    :returns: mapping ``(TimeSelector, TimeSelector) -> TimeSelector``, ordered
        by step first and then by the order of ``times_sources_targets``.
    :rtype: dict
    """
    new_dict = {}
    for s in steps_list:
        # Use a distinct name for the target time instead of rebinding the
        # loop variable to a TimeSelector.
        for sources, target_time in times_sources_targets.items():
            source_0 = TimeSelector(time=sources[0], step=s)
            source_1 = TimeSelector(time=sources[1], step=s)
            new_dict[(source_0, source_1)] = TimeSelector(time=target_time, step=s)
    return new_dict
def filter_dims(d, dims):
    """Return a new dict with only those entries of ``d`` whose key is in ``dims``.

    The insertion order of ``d`` is preserved; ``d`` itself is not modified.

    :param d: mapping to filter.
    :type d: dict
    :param dims: container of allowed keys; anything supporting ``in``, e.g.
        a list, set or ``dataset.dims``.
    :returns: the filtered mapping.
    :rtype: dict
    """
    keep = lambda item: item[0] in dims
    return dict(filter(keep, d.items()))
def evaluate_flow_on_dataset(flow_calc, dataset, variables, sources_and_targets,
                             array_norm=None, aggregation_norm=None,
                             cut_evaluation_array=None, aggregate=True):
    """Score a flow calculator by predicting a target frame from two source frames.

    For every variable and every ``(source_0, source_1) -> target`` entry of
    ``sources_and_targets``:

    1. select the two source frames and the target frame from ``dataset``;
    2. build a ``TwoFrameInterpolant`` from the source frames and ``flow_calc``;
    3. evaluate it at ``0.5`` (the half-way point);
    4. reduce the difference to the target frame to a scalar with ``array_norm``.

    When the dataset has extra coordinates with more than one value (anything
    other than latitude/longitude/step/valid_time/time), steps 1-4 are
    repeated for every combination of their values.

    :param flow_calc: flow calculator passed to ``TwoFrameInterpolant``.
    :param dataset: dataset holding the frames.
    :type dataset: xarray.Dataset
    :param variables: names of the data variables to evaluate.
    :type variables: iterable of str
    :param sources_and_targets: mapping ``(source_0, source_1) -> target``.
        Every element is a namedtuple (e.g. ``TimeSelector``); its fields that
        are dimensions of ``dataset`` are used as ``.loc`` selectors.
    :type sources_and_targets: dict
    :param array_norm: reduces one error array to a scalar. Defaults to
        ``np.linalg.norm(x, ord=2)``; for a 2-D array this is the spectral
        norm (largest singular value), not the Frobenius norm.
    :type array_norm: callable
    :param aggregation_norm: reduces the list of per-sample norms to one
        scalar when ``aggregate`` is true. Defaults to
        ``np.linalg.norm(x, ord=2)``.
    :type aggregation_norm: callable
    :param cut_evaluation_array: number of cells dropped from both ends of the
        first two axes of the error array before taking the norm. ``None`` or
        ``0`` evaluates the whole array.
    :type cut_evaluation_array: int or None
    :param aggregate: if true, return one aggregated scalar per variable;
        otherwise return, per variable, a dict ``{(sources, target): norm}``.
    :type aggregate: bool
    :returns: ``{variable: score}``, where ``score`` is either a scalar or a
        per-pair dict, depending on ``aggregate``.
    :rtype: dict
    """
    if array_norm is None:
        def array_norm(x):
            return np.linalg.norm(x, ord=2)
    if aggregation_norm is None:
        def aggregation_norm(x):
            return np.linalg.norm(x, ord=2)

    excluded = ('latitude', 'longitude', 'step', 'valid_time', 'time')
    non_ll_coords = {k: v for k, v in dataset.coords.items()
                     if k not in excluded and v.values.size > 1}
    coord_names = list(non_ll_coords)
    coord_axes = [non_ll_coords[name].values for name in coord_names]

    var_norms = {}
    for var in variables:
        norms_dict = {}
        norms = []
        for sources, target in sources_and_targets.items():
            source_selector_0 = filter_dims(sources[0]._asdict(), dataset.dims)
            source_selector_1 = filter_dims(sources[1]._asdict(), dataset.dims)
            target_selector = filter_dims(target._asdict(), dataset.dims)
            # product() of zero axes yields one empty tuple, so the body runs
            # exactly once when there are no extra coordinates.
            for cv in product(*coord_axes):
                selector = dict(zip(coord_names, cv))
                ds_subset = dataset[var].sel(**selector)
                source_start = ds_subset.loc[source_selector_0].values
                source_end = ds_subset.loc[source_selector_1].values
                target_frame = ds_subset.loc[target_selector].values
                interpolant = TwoFrameInterpolant(source_start, source_end, flow_calc)
                delta_array = interpolant(.5) - target_frame
                if cut_evaluation_array:
                    # Truthiness test on purpose: with 0, [0:-0] would be an
                    # empty slice and every norm would silently become 0.
                    # NOTE(review): assumes the first two axes are the spatial
                    # ones - TODO confirm.
                    cea = cut_evaluation_array
                    delta_array = delta_array[cea:-cea, cea:-cea]
                value = array_norm(delta_array)
                if aggregate:
                    norms.append(value)
                else:
                    # NOTE(review): with extra coordinates this key is
                    # overwritten for each combination, so only the last
                    # combination's norm is kept.
                    norms_dict[(sources, target)] = value
        var_norms[var] = aggregation_norm(norms) if aggregate else norms_dict
    return var_norms
def evaluate_flow_on_dataset_norms(flow_calc, dataset, variables, sources_and_targets,
                                   array_norms=None, aggregation_norm=None,
                                   cut_evaluation_array=None, aggregate=False):
    """Score a flow calculator like ``evaluate_flow_on_dataset``, with several norms.

    For every variable, every ``(source_0, source_1) -> target`` entry of
    ``sources_and_targets``, and every combination of extra coordinate values
    (coordinates other than latitude/longitude/step/valid_time/time with more
    than one value):

    1. select the frames;
    2. evaluate a ``TwoFrameInterpolant`` built from the two source frames and
       ``flow_calc`` at ``0.5``;
    3. reduce the difference to the target frame with every function in
       ``array_norms``.

    :param flow_calc: flow calculator passed to ``TwoFrameInterpolant``.
    :param dataset: dataset holding the frames.
    :type dataset: xarray.Dataset
    :param variables: names of the data variables to evaluate.
    :type variables: iterable of str
    :param sources_and_targets: mapping ``(source_0, source_1) -> target`` of
        namedtuples (e.g. ``TimeSelector``); their fields that are dimensions
        of ``dataset`` are used as ``.loc`` selectors.
    :type sources_and_targets: dict
    :param array_norms: functions each reducing one error array to a scalar.
        Defaults to a single ``np.linalg.norm(x, ord=2)``.
    :type array_norms: iterable of callables
    :param aggregation_norm: reduces the list of all samples of one norm to a
        scalar when ``aggregate`` is true. Defaults to
        ``np.linalg.norm(x, ord=2)``.
    :type aggregation_norm: callable
    :param cut_evaluation_array: number of cells dropped from both ends of the
        first two axes of the error array. ``None`` or ``0`` evaluates the
        whole array.
    :type cut_evaluation_array: int or None
    :param aggregate: if false (default), return per-pair values; if true,
        return one aggregated scalar per norm function.
    :type aggregate: bool
    :returns: ``{variable: {norm_fn: {(sources, target): value}}}`` when
        ``aggregate`` is false, or ``{variable: {norm_fn: aggregated_value}}``
        when it is true.
    :rtype: dict
    """
    if array_norms is None:
        def arr_norm(x):
            return np.linalg.norm(x, ord=2)
        array_norms = [arr_norm]
    else:
        # Materialise so a one-shot iterable is not exhausted after the first variable.
        array_norms = list(array_norms)
    if aggregation_norm is None:
        def aggregation_norm(x):
            return np.linalg.norm(x, ord=2)

    excluded = ('latitude', 'longitude', 'step', 'valid_time', 'time')
    non_ll_coords = {k: v for k, v in dataset.coords.items()
                     if k not in excluded and v.values.size > 1}
    coord_names = list(non_ll_coords)
    coord_axes = [non_ll_coords[name].values for name in coord_names]

    var_norms = {}
    for var in variables:
        per_pair = {norm: {} for norm in array_norms}
        # Keep every sample for aggregation; per_pair keys can be overwritten
        # across extra-coordinate combinations.
        samples = {norm: [] for norm in array_norms}
        for sources, target in sources_and_targets.items():
            source_selector_0 = filter_dims(sources[0]._asdict(), dataset.dims)
            source_selector_1 = filter_dims(sources[1]._asdict(), dataset.dims)
            target_selector = filter_dims(target._asdict(), dataset.dims)
            for cv in product(*coord_axes):
                selector = dict(zip(coord_names, cv))
                ds_subset = dataset[var].sel(**selector)
                source_start = ds_subset.loc[source_selector_0].values
                source_end = ds_subset.loc[source_selector_1].values
                target_frame = ds_subset.loc[target_selector].values
                interpolant = TwoFrameInterpolant(source_start, source_end, flow_calc)
                delta_array = interpolant(.5) - target_frame
                if cut_evaluation_array:
                    # Truthiness test on purpose: with 0, [0:-0] is an empty slice.
                    cea = cut_evaluation_array
                    delta_array = delta_array[cea:-cea, cea:-cea]
                for norm in array_norms:
                    value = norm(delta_array)
                    samples[norm].append(value)
                    # NOTE(review): only the last extra-coordinate combination
                    # survives under this key.
                    per_pair[norm][(sources, target)] = value
        if aggregate:
            var_norms[var] = {norm: aggregation_norm(values)
                              for norm, values in samples.items()}
        else:
            var_norms[var] = per_pair
    return var_norms
def optimize_flow(parameter_space, flow_generator, dataset, sources_and_targets,
                  var_weights=None, max_evals=100, cut_evaluation_array=None):
    """Optimize the flow algorithm's parameters for performance on the dataset.

    Runs a hyperopt ``fmin`` search with the TPE algorithm. Each trial does
    the following:

    1. builds a flow calculator with ``flow_generator(args)``;
    2. scores it with ``evaluate_flow_on_dataset`` (aggregated per variable);
    3. minimises the weighted sum of the per-variable scores.

    Every trial prints the loss, the evaluation time and the sampled
    parameters.

    :param parameter_space: hyperopt search space passed to ``fmin(space=...)``.
    :param flow_generator: callable mapping one sampled parameter set to a
        flow calculator.
    :type flow_generator: callable
    :param dataset: dataset passed to ``evaluate_flow_on_dataset``.
    :type dataset: xarray.Dataset
    :param sources_and_targets: mapping passed to ``evaluate_flow_on_dataset``.
    :type sources_and_targets: dict
    :param var_weights: ``{variable: weight}``; its keys are the variables to
        evaluate. Defaults to ``{'u': 1, 'v': 1, 't': 1}``.
    :type var_weights: dict or None
    :param max_evals: maximum number of hyperopt evaluations.
    :type max_evals: int
    :param cut_evaluation_array: border width passed to
        ``evaluate_flow_on_dataset``.
    :type cut_evaluation_array: int or None
    :returns: ``(best, trials)``. ``best`` is the value returned by ``fmin``.
        ``trials`` is the ``Trials`` object; each of its results holds
        ``'loss'``, ``'status'``, ``'var_losses'`` and ``'time'``.
    :rtype: tuple
    """
    if var_weights is None:
        var_weights = {'u': 1, 'v': 1, 't': 1}
    variables = var_weights.keys()

    def objective(args):
        flow = flow_generator(args)
        # Only the evaluation is timed, not the flow construction. A monotonic
        # clock keeps the duration immune to wall-clock adjustments.
        t0 = time.perf_counter()
        losses = evaluate_flow_on_dataset(flow, dataset, variables, sources_and_targets,
                                          cut_evaluation_array=cut_evaluation_array)
        delta_t = time.perf_counter() - t0
        loss = sum(w * losses[var] for var, w in var_weights.items())
        print(loss, delta_t)
        print(args)
        print("===")
        return {'loss': loss, 'status': STATUS_OK, 'var_losses': losses, 'time': delta_t}

    trials = Trials()
    best = fmin(objective, space=parameter_space, algo=tpe.suggest,
                max_evals=max_evals, trials=trials)
    return best, trials
if __name__ == "__main__": farneback_space = { 'pyr_scale': hp.uniform('pyr_scale', 0.1, 0.9), 'levels': hp.qloguniform('levels', 0, np.log(11), q=1), 'winsize': hp.qlognormal('winsize', np.log(100), 1/6*np.log(100), 1), 'iterations': hp.quniform('iterations', 2, 6, 1), 'poly_n': hp.quniform('poly_n', 2, 14, 1), 'poly_sigma': hp.uniform('poly_sigma', 0.1, 2.0), 'scale': hp.qloguniform('scale', np.log(0.1), np.log(1e7), 1), } def flow_calculator(args): return FarnebackFlow(**args) snt_steps = {(0*hour, 2*hour): hour, (0*hour, 4*hour): 2*hour, (0*hour, 6*hour): 3*hour}