import time from collections import namedtuple from datetime import timedelta import numpy as np import scipy from networkx import DiGraph, dfs_successors, balanced_tree, path_graph from sklearn.metrics import pairwise_kernels ##################################################### # Basic data structures # ##################################################### class SampleDict(object): """A collection of samples, together with their reweighting constants and other information necessary to build a nystrom reconstruction Attributes ---------- X : array, shape (q, n_features) Samples included in the dictionary. Also known as inducing points, support vectors or anchor points. probs : array, shape (q,) The inclusion probability of each sample, used for coin flipping and reweighting. q_i : array, shape (q,) The number of copies of each sample, used for coin flipping and reweighting. When q_i[i] reaches 0, the sample is removed from the dictionary. qbar : float The number of copies used to initialize the sequential Binomial sampling process and for reweighting. gamma : float The regularization parameter used to compute the Ridge Leverage Scores. vareps : float The accuracy parameter used to compute the Ridge Leverage Scores. """ def __init__(self, X, probs, q_i, qbar, gamma, vareps): self.X = X self.probs = probs self.q_i = q_i self.gamma = gamma self.vareps = vareps self.qbar = qbar class MergePromise(namedtuple('MergePromise', ('is_finished', 'result'))): """A simple wrapper for an arbitrary promise (sometimes called a "future" or "async call") implementation. Attributes ---------- is_finished : bool whether the promise has finished running or not. result : any The promised result of the computation, containing the result of a dictionary merge. """ # A raw namedtuple is very memory efficient as it packs the attributes # in a struct to get rid of the __dict__ of attributes in particular it # does not copy the string for the keys on each instance. # By deriving a namedtuple class just to introduce the __init__ method we # would also reintroduce the __dict__ on the instance. By telling the # Python interpreter that this subclass uses static __slots__ instead of # dynamic attributes. Furthermore we don't need any additional slot in the # subclass so we set __slots__ to the empty tuple. __slots__ = () def get_rpc_invoker(backend='redis', url='localhost', port=6379, async_eval=True): """Returns an invoker that satisfies the signature rpc_invoker(merge_function, **merge_args) -> MergePromise. The actual implementation depends on the backend and can be replaced if necessary. The currently implemented backends are based on dask.distributed or redis and redis-queue. If the url is 'localhost' and async_eval is False, it will return a local executor without the need of installing any backend. Parameters ---------- backend : string (optional, default='redis') Backend to be used for computation. url : string (optional, default='localhost') Url of the redis server. port : int (optional, default=6379) TCP port of the redis server. async_eval : bool (optional, default=True) Whether the execution should be delayed or not. Set to False to force sequential execution or for debugging purposes. Returns ------- call_rpc_function : callable A callable object that will execute the merge using an arbitrary backend. """ # run local experiment if url == 'localhost' and not async_eval: def call_rpc_function(func, *args): return MergePromise(is_finished=True, result=func(*args)) return call_rpc_function else: if backend == 'redis': from redis import Redis from rq import Queue task_queue = Queue( connection=Redis(url, port, password="yourpasswordhere"), async=async_eval) def call_rpc_function(func, *args): return task_queue.enqueue(func, *args, timeout=86400) return call_rpc_function elif backend == 'dask': from dask.distributed import Client # TODO: for now all workers and the scheduler do not use autentication, this means anyone can submit # almost arbitrary code to run. Add TLS (but user needs to know how to use it) client = Client("{}:{}".format(url, port)) def call_rpc_function(func, *args): class DaskPromise(object): def __init__(self, dask_promise): self.dask_promise = dask_promise @property def is_finished(self): return self.dask_promise.done() @property def result(self): try: return self.dask_promise.result(timeout=10) except TimeoutError: return None return DaskPromise(client.submit(func, *args)) return call_rpc_function else: raise NotImplementedError def dict_merge(d_left, d_right, kernel_options, random_state): """Merges two dictionaries, and performs a rejection sampling step according to the new RLS to discard redundant samples. Parameters ---------- d_left : SampleDict The first dictionary to merge (from left descendent). d_right : SampleDict The second dictionary to merge (from right descendent). kernel_options : mapping of string to any Dictionary containing the kernel function used as a similarity, with all associated parameters (keyword arguments). random_state : RandomState !!MODIFIED!! RandomState instance used as the random number generator Returns ------- d_top : SampleDict The result of the merge. """ # create a temporary dictionary as the concatenation of the two inputs d_top = SampleDict(X=np.concatenate((d_left.X, d_right.X)), probs=np.concatenate((d_left.probs, d_right.probs)), q_i=np.concatenate((d_left.q_i, d_right.q_i)), qbar=d_left.qbar, gamma=d_left.gamma, vareps=d_left.vareps) # estimate RLS tau = estimate_tau(d_top, kernel_options) # check for numerical problems assert np.all(tau > np.finfo(float).eps) # reject sample for s in range(len(d_top.q_i)): # remember that to make the sampling well-defined, we need to take this minimum # it is always a legitimate operation, since increase in RLS can only be due to approximation error new_prob = np.minimum(tau[s], d_top.probs[s]) d_top.q_i[s] = random_state.binomial(d_top.q_i[s], new_prob / d_top.probs[s]) d_top.probs[s] = new_prob # non-zero returns a 1-tuple, extract the only elements survived_samples = np.nonzero(d_top.q_i)[0] d_top.X = d_top.X[survived_samples] d_top.probs = d_top.probs[survived_samples] d_top.q_i = d_top.q_i[survived_samples] q = d_top.q_i.shape[0] assert d_top.X.shape == (q, d_left.X.shape[1]) assert d_top.q_i.shape == (q,) # well, duh assert d_top.probs.shape == (q,) assert (d_top.q_i != 0).all() return d_top def estimate_tau(d_top, kernel_options): """Given a sample dictionary, estimates the gamma-Ridge Leverage Scores (RLS) tau of all samples in the dictionary to vareps precision. Parameters ---------- d_top : SampleDict The dictionary whose taus need to be estimated. kernel_options : mapping of string to any Dictionary containing the kernel function used as a similarity, with all associated parameters (keyword arguments). Returns ------- tau : array shape (q,) The estimated RLS. """ # just aliases for conciseness gamma = d_top.gamma vareps = d_top.vareps qbar = d_top.qbar q = len(d_top.q_i) # double check we did not end up dropping all samples because of a too large gamma assert q > 0 # construct kernel matrix between samples in the dictionary K = pairwise_kernels(d_top.X, **kernel_options, filter_params=True) assert K.shape == (q, q) # compute the dictionary weights s = (np.sqrt(d_top.q_i) / np.sqrt(d_top.probs)) / np.sqrt(float(qbar)) # test for numerical errors assert (s > np.finfo(float).eps).all() # the multiplication automatically creates a copy # TODO: could delete K here to free up some memory SKS = s[:, np.newaxis] * K * s # this avoids creating an additional copy np.fill_diagonal(SKS, SKS.diagonal() + gamma) # this is a different reformulation of the estimator reported in the paper # it is strictly equivalent to the one in the paper ONLY for samples in the dictionary, but we only estimate # RLS for samples in the dictionary # on the plus side it is more efficient and simpler # TODO: check if the inv is the most efficient and numerically stable choice, replace with e.g. pinv(), svd() or solve() tau = (1. - 2 * vareps) * np.power( np.sqrt(np.ones(q) - gamma * np.diag(scipy.linalg.inv(SKS, overwrite_a=True))) / s, 2) return tau def visit_merge_tree(X, merge_tree, root, rpc_invoker, exp_options, random_state): """Visits the merge tree, computing the necessary merges. All squeak variants simply feed a different tree to this routine. Leaves in the input tree must possess a 'leaf_assigned_sample' field containing an np.array of indices, indicating which samples in the dataset are assigned to that leaf. For all interior nodes, visit_merge_tree will: (1) collect the dictionaries contained in the two descendant (2) invoke the dict_merge function on the two dictionaries ,using the rpc_invoker to execute it asynchronously (3) generate a new dictionary (wrapped in a MergePromise) and assign it to the ancestor node in a 'merge_promise' field The computation proceeds from the leaves to the root. Once it completes, the 'merge_promise' field in the root contains a dictionary (again wrapped in a Mergepromise) that well approximates the whole dataset. Required fields in exp_options to run the algorithm are: 'qbar': The number of copies used to initialize the sequential Binomial sampling process and for reweighting 'gamma': The regularization parameter used to compute the Ridge Leverage Scores. 'vareps': The accuracy parameter used to compute the Ridge Leverage Scores. 'max_dict_size': The maximum number of samples to store in the dictionary, if a dictionary exceeds this threshold the algorithm aborts 'kernel_options': Dictionary containing the kernel function used as a similarity, with all associated parameters (keyword arguments) Parameters ---------- X : array, shape (q, n_features) Input samples. merge_tree : networkx.DiGraph !!MODIFIED!! The binary tree guiding the dictionary merges. root : int Index of the root node. rpc_invoker : callable An invoker that satisfies the signature rpc_invoker(dict_merge , [merge args]) -> MergePromise. exp_options : mapping of string to any Dictionary containing the experiment options. random_state : RandomState !!MODIFIED!! RandomState instance used as the random number generator Returns ------- MergePromise containing a dictionary that well approximates the whole dataset. """ leaves = [leaf for leaf in merge_tree.nodes_iter() if merge_tree.out_degree(leaf) == 0] # for each leaf, wrap the assigned samples into a completed MergePromise, with a dictionary containing all samples # with multiplicity qbar and probability 1. These initialization dictionaries are guaranteed to be accurate # since they simply store all samples. for leaf in leaves: n_i = merge_tree.node[leaf]['leaf_assigned_samples'].shape[0] merge_tree.node[leaf]['merge_promise'] = MergePromise(is_finished=True, result=SampleDict( X=X[merge_tree.node[leaf]['leaf_assigned_samples'], :], probs=np.ones(n_i), q_i=np.ones(n_i, dtype=int) * ( int(np.round(exp_options['qbar']))), qbar=int(np.round(exp_options['qbar'])), gamma=float(exp_options['gamma']), vareps=float(exp_options['vareps']))) # total number of merges we need to do is the number of interior nodes, which is number of leaves - 1 since this # is a binary tree merge_total = len(leaves) - 1 # we track the runtime of the algorithm start_merging_time = time.time() # infinite loop, we will break out of it once the root's promise is completed while True: # declare the flag did_something_flag = False # TODO: trasversing the merge_tree is cheap compared to running the algorithm, so we do not really optimize for now for node, successors in dfs_successors(merge_tree, root).items(): # we set the flag before all the checks did_something_flag = False # double check that the tree is not malformed assert len(successors) == 2 or len(successors) == 0 # we do not process leaves if len(successors) == 0: continue # we already scheduled this merge, nothing to do on this node if 'merge_promise' in merge_tree.node[node]: continue # extract the descendent nodes l_node = merge_tree.node[successors[0]] r_node = merge_tree.node[successors[1]] # if one of the descendent does not contain a promise, it is not ready and nothing to do on this node if 'merge_promise' not in l_node or 'merge_promise' not in r_node: continue # if one of the descendent's promise is not done, it is not ready and nothing to do on this node if not l_node['merge_promise'].is_finished or not r_node['merge_promise'].is_finished: continue # we cannot pass directly random_state to the rpc_invoker call, since the rpc could be executed on a remote # machine where modification to random_state do not propagate and compromise reproducibility # instead, we send over a reproducible random seed and expect the remote system to use it to initialize # a local reproducible rng merge_random_state = np.random.RandomState( random_state.randint(np.iinfo(np.uint32).min + 10, high=np.iinfo(np.uint32).max - 10)) # unwrap the descendent's results (leaf nodes were wrapped too) l_dict = l_node['merge_promise'].result r_dict = r_node['merge_promise'].result # if the combined budget size exceed max_dict_size, terminate since we do not want to exceed the machine # memory assert len(l_dict.q_i) + len(r_dict.q_i) <= 2 * exp_options['max_dict_size'] # schedule the merge, and assign it to the ancestor merge_tree.node[node]['merge_promise'] = rpc_invoker(dict_merge, l_dict, r_dict, exp_options['kernel_options'], merge_random_state) # we did something, let the printing function know did_something_flag = True # we did something, time to log some statistics if did_something_flag: # how many merges are currently going on merge_running = 0 for node_count_merges in merge_tree.nodes_iter(): if ('merge_promise' in merge_tree.node[node_count_merges] and not merge_tree.node[node_count_merges]['merge_promise'].is_finished): merge_running = merge_running + 1 # how many are left before the end merge_remaining = 0 for node_count_merges in merge_tree.nodes_iter(): if 'merge_promise' not in merge_tree.node[node_count_merges]: merge_remaining = merge_remaining + 1 print( "merge remaining {}/{},".format(merge_remaining, merge_total) + "merge currently running {},".format(merge_running) + "time elapsed {},".format(str(timedelta(seconds=time.time() - start_merging_time))) + "last merge {: 5d} + {: 5d} -> {: 5d}".format(successors[0], successors[1], node) ) else: # if we did nothing, sleep a bit to avoid wasting CPU cycles. the user can also enjoy half second of rest time.sleep(0.5) # we are done, bail out if 'merge_promise' in merge_tree.node[root] and merge_tree.node[root]['merge_promise'].is_finished: break return merge_tree.node[root]['merge_promise'] def squeak(X, exp_options, random_state=None): """Invokes visit_merge_tree with a completely unbalanced (sequential) tree. See visit_merge_tree for more details Parameters ---------- X : array, shape (q, n_features) Input samples. exp_options : mapping of string to any Dictionary containing the experiment options, see visit_merge_tree and get_rpc_invoker. random_state : RandomState (optional, default=None) RandomState instance used as the random number generator. If None, gets automatically initialized to a fixed number. Returns ------- MergePromise containing a dictionary that well approximates the whole dataset. """ if not random_state: random_state = np.random.RandomState(42) n = float(X.shape[0]) # number of samples m = exp_options['max_dict_size'] k = int(np.ceil(n / m)) # number of initial chunks # randomly assign samples to chunks perm_idx = np.array_split(random_state.permutation(int(n)), k) # to construct a fully unbalanced tree with k leaves, first we create a line graph with k nodes merge_tree = path_graph(k, create_using=DiGraph()) root = min(merge_tree.nodes_iter()) # the end of the chain is a leaf, so we assign it the last chunk end_of_chain = max(merge_tree.nodes_iter()) merge_tree.node[end_of_chain]['leaf_assigned_samples'] = perm_idx[k - 1] # for all the other chunks we look for a (non-leaf) node that does not have samples assigned # and also does not have a leaf attached, and attach a leaf with the associated chunk for i in range(k - 1): for node, successors in dfs_successors(merge_tree, root).items(): if 'leaf_assigned_samples' not in merge_tree.node[node] and len(successors) < 2: merge_tree.add_node(end_of_chain + i + 1, leaf_assigned_samples=perm_idx[i]) merge_tree.add_edge(node, end_of_chain + i + 1) # after we add the node, we have to break, we do not want to add the i-th chunk too many times break rpc_invoker = get_rpc_invoker(**exp_options['rpc_invoker_options']) root_dict_promise = visit_merge_tree(X, merge_tree, root, rpc_invoker, exp_options, random_state) return root_dict_promise def disqueak(X, exp_options, random_state=None): """Invokes visit_merge_tree with a completely balanced (fully parallel) tree. See visit_merge_tree for more details Parameters ---------- X : array, shape (q, n_features) Input samples. exp_options : mapping of string to any Dictionary containing the experiment options, see visit_merge_tree and get_rpc_invoker. random_state : RandomState (optional, default=None) RandomState instance used as the random number generator. If None, gets automatically initialized to a fixed number. Returns ------- MergePromise containing a dictionary that well approximates the whole dataset. """ if not random_state: random_state = np.random.RandomState(42) n = float(X.shape[0]) # number of samples m = exp_options['max_dict_size'] k = int(np.ceil(n / m)) # number of initial chunks if k % 2: k += 1 # make it even # randomly assign samples to chunks perm_idx = np.array_split(random_state.permutation(int(n)), k) # we need to fit k chunks in a tree that: # (1) is balanced, each node has exactly two descendant or zero (leaf) # (2) holds data only in leaves # (3) has a difference between the minimum and maximum path in the tree smaller or equal than 1 (in worst case, only # one round difference between best and worst path) # To obtain this we compute h, the closest power of two smaller than the number of leaves h = int(np.floor(np.log2(k))) # create a complete binary tree of depth h merge_tree = balanced_tree(2, h, create_using=DiGraph()) # compute diff, the difference between the number of leaves we want to insert k and the available leaves 2**h diff = k - 2 ** h # find all leaves leaves = [leaf for leaf in merge_tree.nodes_iter() if merge_tree.out_degree(leaf) == 0] # then we assign all chunks to the leaves. since we reserved 2**h leaves instead of k, for the first diff # rounds we simply add two descendent. every time we do this, we reduce diff by one (since we processed two chunks # instead of one) until we have no more difference and we can directly assign to the leaves k_i = 0 node_i = max(merge_tree.nodes_iter()) for leaf in leaves: if diff == 0: # we are done with extra chunks, simply assign merge_tree.node[leaf]['leaf_assigned_samples'] = perm_idx[k_i] k_i = k_i + 1 else: # we do not have enough space to directly assign, add two descendants to the current node and assign # to those instead merge_tree.add_node(node_i + 1, leaf_assigned_samples=perm_idx[k_i]) merge_tree.add_node(node_i + 2, leaf_assigned_samples=perm_idx[k_i + 1]) merge_tree.add_edge(leaf, node_i + 1) merge_tree.add_edge(leaf, node_i + 2) node_i = node_i + 2 k_i = k_i + 2 # the number of leftover leaves get closer to the right one diff = diff - 1 rpc_invoker = get_rpc_invoker(**exp_options['rpc_invoker_options']) root_dict_promise = visit_merge_tree(X, merge_tree, 0, rpc_invoker, exp_options, random_state) return root_dict_promise