From c39e365d71fc07f3ae5b198252b4a7247efb9bc5 Mon Sep 17 00:00:00 2001
From: Thibaut Horel
Date: Tue, 1 Dec 2015 14:50:54 -0500
Subject: Adapt vi_blocks to the new code structure

---
 simulation/vi_blocks.py | 43 ++++++++++++++++++++-----------------------
 1 file changed, 20 insertions(+), 23 deletions(-)

(limited to 'simulation/vi_blocks.py')

diff --git a/simulation/vi_blocks.py b/simulation/vi_blocks.py
index 2b03198..11dc4de 100644
--- a/simulation/vi_blocks.py
+++ b/simulation/vi_blocks.py
@@ -1,17 +1,15 @@
-import main as mn
+import utils
 import theano
 from theano import tensor as tsr
-import blocks
-import blocks.algorithms, blocks.main_loop, blocks.extensions.monitoring
+from blocks import algorithms, main_loop
+import blocks.extensions as be
+import blocks.extensions.monitoring as bm
 import theano.tensor.shared_randomstreams
 import numpy as np
-from six.moves import range
-import fuel
-import fuel.datasets
 import active_blocks as ab
 
 
-class ClippedParams(blocks.algorithms.StepRule):
+class ClippedParams(algorithms.StepRule):
     """A rule to maintain parameters within a specified range"""
     def __init__(self, min_value, max_value):
         self.min_value = min_value
@@ -38,8 +36,8 @@ def create_vi_model(n_nodes, n_samp=100):
     sig0 = theano.shared(value=aux(.5, .1), name='sig0')
 
     srng = tsr.shared_randomstreams.RandomStreams(seed=123)
-    theta = srng.normal((n_samp, n_nodes, n_nodes)) * sig[None, :, :] + mu[None,
-                                                                           :, :]
+    theta = (srng.normal((n_samp, n_nodes, n_nodes)) * sig[None, :, :] +
+             mu[None, :, :])
     y = tsr.maximum(tsr.dot(x, theta), 1e-3)
     infect = tsr.log(1. - tsr.exp(-y[0:-1])).dimshuffle(1, 0, 2)
     lkl_pos = tsr.sum(infect * (x[1:] & s[1:])) / n_samp
@@ -56,28 +54,27 @@ if __name__ == "__main__":
     n_cascades = 10000
     batch_size = 1000
     n_samples = 50
-    graph = mn.create_random_graph(n_nodes=4)
+    graph = utils.create_random_graph(n_nodes=4)
     print('GRAPH:\n', graph, '\n-------------\n')
 
     x, s, mu, sig, cost = create_vi_model(len(graph), n_samples)
-    rmse, g_shared = ab.rmse_error(graph, mu)
+    rmse = ab.rmse_error(graph, mu)
 
-    step_rules= blocks.algorithms.CompositeRule([blocks.algorithms.AdaDelta(),
-                                                 ClippedParams(1e-3, 1 - 1e-3)])
+    step_rules = algorithms.CompositeRule([algorithms.AdaDelta(),
+                                           ClippedParams(1e-3, 1 - 1e-3)])
 
-    alg = blocks.algorithms.GradientDescent(cost=cost, parameters=[mu, sig],
-                                            step_rule=step_rules)
+    alg = algorithms.GradientDescent(cost=cost, parameters=[mu, sig],
+                                     step_rule=step_rules)
     data_stream = ab.create_fixed_data_stream(n_cascades, graph, batch_size,
-                                            shuffle=False)
-    #data_stream = ab.create_learned_data_stream(graph, batch_size)
-    loop = blocks.main_loop.MainLoop(
+                                              shuffle=False)
+    # data_stream = ab.create_learned_data_stream(graph, batch_size)
+    loop = main_loop.MainLoop(
         alg, data_stream,
         extensions=[
-            blocks.extensions.FinishAfter(after_n_batches = 10**4),
-            blocks.extensions.monitoring.TrainingDataMonitoring([cost, mu, sig,
-                rmse, g_shared], after_batch=True),
-            blocks.extensions.Printing(every_n_batches = 100,
-                                       after_epoch=False),
+            be.FinishAfter(after_n_batches=10**4),
+            bm.TrainingDataMonitoring([cost, mu, sig, rmse],
+                                      every_n_batches=10),
+            be.Printing(every_n_batches=100, after_epoch=False),
         ]
     )
     loop.run()
--
cgit v1.2.3-70-g09d2
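Note: the hunk above shows only the constructor of ClippedParams; the compute_step method that actually enforces the range falls outside the diff context. A minimal sketch of a plausible completion, assuming Blocks' StepRule protocol in which GradientDescent applies `parameter - step`:

    import theano.tensor as tsr
    from blocks.algorithms import StepRule


    class ClippedParams(StepRule):
        """A rule to maintain parameters within a specified range."""
        def __init__(self, min_value, max_value):
            self.min_value = min_value
            self.max_value = max_value

        def compute_step(self, parameter, previous_step):
            # GradientDescent updates with `parameter - step`, so rewrite the
            # step such that the new value is clipped into the allowed range.
            clipped = tsr.clip(parameter - previous_step,
                               self.min_value, self.max_value)
            return parameter - clipped, []

Chained after AdaDelta in the CompositeRule used above, such a rule keeps the edge-probability parameters mu and sig inside (0, 1), matching the 1e-3 and 1 - 1e-3 bounds in the script.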
From 3b5321add6cd71c6e23ff65e75faaa48e6829634 Mon Sep 17 00:00:00 2001
From: Thibaut Horel
Date: Tue, 1 Dec 2015 15:00:51 -0500
Subject: Extract blocks utils and reorganize code

---
 simulation/active_blocks.py | 176 --------------------------------------------
 simulation/mle_blocks.py    |  59 +++++++++++++++
 simulation/utils_blocks.py  | 121 ++++++++++++++++++++++++++++++
 simulation/vi_blocks.py     |  10 +--
 4 files changed, 185 insertions(+), 181 deletions(-)
 delete mode 100644 simulation/active_blocks.py
 create mode 100644 simulation/mle_blocks.py
 create mode 100644 simulation/utils_blocks.py

(limited to 'simulation/vi_blocks.py')

diff --git a/simulation/active_blocks.py b/simulation/active_blocks.py
deleted file mode 100644
index 1495eb8..0000000
--- a/simulation/active_blocks.py
+++ /dev/null
@@ -1,176 +0,0 @@
-import utils
-import theano
-from theano import tensor as tsr
-import blocks
-from blocks import algorithms, main_loop
-import blocks.extensions as be
-import blocks.extensions.monitoring as bm
-import picklable_itertools
-import numpy as np
-import fuel
-import fuel.datasets
-from json import dumps
-import collections
-
-
-class LearnedDataset(fuel.datasets.Dataset):
-    """
-    Dynamically-created dataset (for active learning)
-    -compatible with ConstantScheme with request corresponding to a
-    batch_size
-    """
-    provides_sources = ('x', 's')
-
-    def __init__(self, node_p, graph, **kwargs):
-        super(LearnedDataset, self).__init__(**kwargs)
-        self.node_p = node_p
-        self.graph = graph
-        self.source = lambda graph: utils.random_source(graph, self.node_p)
-
-    def get_data(self, state=None, request=None):
-        return utils.simulate_cascades(request, self.graph, self.source)
-
-
-class ActiveLearning(blocks.extensions.SimpleExtension):
-    """
-    Extension which updates the node_p array passed to the get_data method of
-    LearnedDataset
-    """
-    def __init__(self, dataset, **kwargs):
-        super(ActiveLearning, self).__init__(**kwargs)
-        self.dataset = dataset
-
-    def do(self, which_callback, *args):
-        out_degree = np.sum(self.dataset.graph, axis=1)
-        self.dataset.node_p = out_degree / np.sum(out_degree)
-        print(self.dataset.node_p)
-
-
-class JSONDump(blocks.extensions.SimpleExtension):
-    """Dump a JSON-serialized version of the log to a file."""
-
-    def __init__(self, filename, **kwargs):
-        super(JSONDump, self).__init__(**kwargs)
-        self.fh = open(filename, "w")
-
-    def do(self, which_callback, *args):
-        log = self.main_loop.log
-        d = {k: v for (k, v) in log.current_row.items()
-             if not k.startswith("_")}
-        d["time"] = log.status["iterations_done"]
-        self.fh.write(dumps(d, default=lambda o: str(o)) + "\n")
-
-    def __del__(self):
-        self.fh.close()
-
-
-class ShuffledBatchesScheme(fuel.schemes.ShuffledScheme):
-    """Iteration scheme over finite dataset:
-    -shuffles batches but not within batch
-    -arguments: dataset_size (int) ; batch_size (int)"""
-    def get_request_iterator(self):
-        indices = list(self.indices)  # self.indices = xrange(dataset_size)
-        start = np.random.randint(self.batch_size)
-        batches = list(map(
-            list,
-            picklable_itertools.extras.partition_all(self.batch_size,
-                                                     indices[start:])
-        ))
-        if indices[:start]:
-            batches.append(indices[:start])
-        batches = np.asarray(batches)
-        return iter(batches[np.random.permutation(len(batches))])
-
-
-def create_mle_model(graph):
-    """return cascade likelihood theano computation graph"""
-    n_nodes = len(graph)
-    x = tsr.matrix(name='x', dtype='int8')
-    s = tsr.matrix(name='s', dtype='int8')
-    params = theano.shared(
-        .5 + .01 *
-        np.random.normal(size=(n_nodes, n_nodes)).astype(theano.config.floatX),
-        name='params'
-    )
-    y = tsr.maximum(tsr.dot(x, params), 1e-5)
-    infect = tsr.log(1. - tsr.exp(-y[0:-1]))
-    lkl_pos = tsr.sum(infect * (x[1:] & s[1:]))
-    lkl_neg = tsr.sum(-y[0:-1] * (~x[1:] & s[1:]))
-    lkl_mle = lkl_pos + lkl_neg
-    lkl_mle.name = 'cost'
-
-    return x, s, params, lkl_mle
-
-
-def rmse_error(graph, params):
-    n_nodes = graph.shape[0]
-    diff = (graph - params) ** 2
-    subarray = tsr.arange(n_nodes)
-    tsr.set_subtensor(diff[subarray, subarray], 0)
-    rmse = tsr.sum(diff) / (n_nodes ** 2)
-    rmse.name = 'rmse'
-    return rmse
-
-
-def relative_error(graph, params):
-    n_nodes = graph.shape[0]
-    diff = abs(g_shared - params)
-    subarray = tsr.arange(n_nodes)
-    tsr.set_subtensor(diff[subarray, subarray], 0)
-    error = tsr.sum(tsr.switch(tsr.eq(graph, 0.), 0., diff / graph)) / n_nodes
-    error.name = 'rel_error'
-    return error
-
-
-def create_fixed_data_stream(n_obs, graph, batch_size, shuffle=True):
-    """
-    creates a datastream for a fixed (not learned) dataset:
-    -shuffle (bool): shuffle minibatches but not within minibatch, else
-    sequential (non-shuffled) batches are used
-    """
-    x_obs, s_obs = utils.simulate_cascades(n_obs, graph)
-    data_set = fuel.datasets.base.IndexableDataset(collections.OrderedDict(
-        [('x', x_obs), ('s', s_obs)]
-    ))
-    if shuffle:
-        scheme = ShuffledBatchesScheme(n_obs, batch_size=batch_size)
-    else:
-        scheme = fuel.schemes.SequentialScheme(n_obs, batch_size=batch_size)
-    return fuel.streams.DataStream(dataset=data_set, iteration_scheme=scheme)
-
-
-def create_learned_data_stream(graph, batch_size):
-    node_p = np.ones(len(graph)) / len(graph)
-    data_set = LearnedDataset(node_p, graph)
-    scheme = fuel.schemes.ConstantScheme(batch_size)
-    return fuel.streams.DataStream(dataset=data_set, iteration_scheme=scheme)
-
-
-if __name__ == "__main__":
-    batch_size = 100
-    n_obs = 1000
-    graph = utils.create_wheel(10)
-    print('GRAPH:\n', graph, '\n-------------\n')
-
-    g_shared = theano.shared(value=graph, name='graph')
-    x, s, params, cost = create_mle_model(graph)
-    rmse = rmse_error(g_shared, params)
-    error = relative_error(g_shared, params)
-
-    alg = algorithms.GradientDescent(
-        cost=-cost, parameters=[params], step_rule=blocks.algorithms.AdaDelta()
-    )
-    # data_stream = create_learned_data_stream(graph, batch_size)
-    data_stream = create_fixed_data_stream(n_obs, graph, batch_size)
-    loop = main_loop.MainLoop(
-        alg, data_stream,
-        extensions=[
-            be.FinishAfter(after_n_batches=10**3),
-            bm.TrainingDataMonitoring([cost, params,
-                                       rmse, error], every_n_batches=10),
-            be.Printing(every_n_batches=10),
-            JSONDump("log.json", every_n_batches=10)
-            # ActiveLearning(data_stream.dataset),
-        ],
-    )
-    loop.run()
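Two defects in the deleted file are worth flagging before its pieces reappear below: relative_error reads the global g_shared instead of its graph argument (corrected in the new utils_blocks.py), and both error metrics discard the result of tsr.set_subtensor, which is a pure function, so the diagonal is never actually zeroed in either the old or the new version. A corrected sketch of rmse_error with otherwise unchanged semantics (note the original divides a sum of squares without taking a root, so it is strictly a mean squared error):

    import theano.tensor as tsr


    def rmse_error(graph, params):
        """Mean squared off-diagonal error between true and learned weights."""
        n_nodes = graph.shape[0]
        diff = (graph - params) ** 2
        subarray = tsr.arange(n_nodes)
        # set_subtensor returns a new variable; keep the result, otherwise
        # the diagonal entries still contribute to the sum.
        diff = tsr.set_subtensor(diff[subarray, subarray], 0)
        rmse = tsr.sum(diff) / (n_nodes ** 2)  # wrap in tsr.sqrt() for a true RMSE
        rmse.name = 'rmse'
        return rmse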
diff --git a/simulation/mle_blocks.py b/simulation/mle_blocks.py
new file mode 100644
index 0000000..89aaf2e
--- /dev/null
+++ b/simulation/mle_blocks.py
@@ -0,0 +1,59 @@
+import utils
+import utils_blocks as ub
+import theano
+from theano import tensor as tsr
+from blocks import algorithms, main_loop
+import blocks.extensions as be
+import blocks.extensions.monitoring as bm
+import numpy as np
+
+
+def create_mle_model(graph):
+    """return cascade likelihood theano computation graph"""
+    n_nodes = len(graph)
+    x = tsr.matrix(name='x', dtype='int8')
+    s = tsr.matrix(name='s', dtype='int8')
+    params = theano.shared(
+        .5 + .01 *
+        np.random.normal(size=(n_nodes, n_nodes)).astype(theano.config.floatX),
+        name='params'
+    )
+    y = tsr.maximum(tsr.dot(x, params), 1e-5)
+    infect = tsr.log(1. - tsr.exp(-y[0:-1]))
+    lkl_pos = tsr.sum(infect * (x[1:] & s[1:]))
+    lkl_neg = tsr.sum(-y[0:-1] * (~x[1:] & s[1:]))
+    lkl_mle = lkl_pos + lkl_neg
+    lkl_mle.name = 'cost'
+
+    return x, s, params, lkl_mle
+
+
+if __name__ == "__main__":
+    batch_size = 100
+    n_obs = 100000
+    graph = utils.create_wheel(100)
+
+    print('GRAPH:\n', graph, '\n-------------\n')
+
+    g_shared = theano.shared(value=graph, name='graph')
+    x, s, params, cost = create_mle_model(graph)
+    rmse = ub.rmse_error(g_shared, params)
+    error = ub.relative_error(g_shared, params)
+
+    alg = algorithms.GradientDescent(
+        cost=-cost, parameters=[params], step_rule=algorithms.AdaDelta()
+    )
+    data_stream = ub.dynamic_data_stream(graph, batch_size)
+    # data_stream = ub.fixed_data_stream(n_obs, graph, batch_size)
+    loop = main_loop.MainLoop(
+        alg, data_stream,
+        extensions=[
+            be.FinishAfter(after_n_batches=10**3),
+            bm.TrainingDataMonitoring([cost, params,
+                                       rmse, error], every_n_batches=10),
+            be.Printing(every_n_batches=10),
+            ub.JSONDump("log.json", every_n_batches=10),
+            ub.ActiveLearning(data_stream.dataset),
+        ],
+    )
+    loop.run()
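The likelihood assembled by create_mle_model treats y = x.params as the infection hazard of each node at the next time step, scoring log(1 - exp(-y)) for nodes that become infected and -y for susceptible nodes that do not. A hypothetical smoke test of the new module (the toy graph and cascade are invented; create_mle_model only uses len(graph), and the training script is behind the __main__ guard, so importing is safe):

    import numpy as np
    import theano
    from mle_blocks import create_mle_model

    graph = np.eye(3, dtype=theano.config.floatX)  # stand-in 3-node graph
    x_sym, s_sym, params, lkl = create_mle_model(graph)
    f = theano.function([x_sym, s_sym], lkl)       # params is shared, not an input

    x = np.array([[1, 0, 0],                       # node 0 infected at step 0
                  [0, 1, 0]], dtype='int8')        # node 1 infected at step 1
    s = np.ones_like(x)                            # all nodes marked susceptible
    print(f(x, s))                                 # log-likelihood under random params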
diff --git a/simulation/utils_blocks.py b/simulation/utils_blocks.py
new file mode 100644
index 0000000..5e91658
--- /dev/null
+++ b/simulation/utils_blocks.py
@@ -0,0 +1,121 @@
+from theano import tensor as tsr
+import fuel.datasets
+import blocks.extensions as be
+import picklable_itertools
+import numpy as np
+from json import dumps
+import collections
+import utils
+
+
+class LearnedDataset(fuel.datasets.Dataset):
+    """
+    Dynamically-created dataset (for active learning)
+    -compatible with ConstantScheme with request corresponding to a
+    batch_size
+    """
+    provides_sources = ('x', 's')
+
+    def __init__(self, node_p, graph, **kwargs):
+        super(LearnedDataset, self).__init__(**kwargs)
+        self.node_p = node_p
+        self.graph = graph
+        self.source = lambda graph: utils.random_source(graph, self.node_p)
+
+    def get_data(self, state=None, request=None):
+        return utils.simulate_cascades(request, self.graph, self.source)
+
+
+class ActiveLearning(be.SimpleExtension):
+    """
+    Extension which updates the node_p array passed to the get_data method of
+    LearnedDataset
+    """
+    def __init__(self, dataset, **kwargs):
+        super(ActiveLearning, self).__init__(**kwargs)
+        self.dataset = dataset
+
+    def do(self, which_callback, *args):
+        out_degree = np.sum(self.dataset.graph, axis=1)
+        self.dataset.node_p = out_degree / np.sum(out_degree)
+        print(self.dataset.node_p)
+
+
+class JSONDump(be.SimpleExtension):
+    """Dump a JSON-serialized version of the log to a file."""
+
+    def __init__(self, filename, **kwargs):
+        super(JSONDump, self).__init__(**kwargs)
+        self.fh = open(filename, "w")
+
+    def do(self, which_callback, *args):
+        log = self.main_loop.log
+        d = {k: v for (k, v) in log.current_row.items()
+             if not k.startswith("_")}
+        d["time"] = log.status["iterations_done"]
+        self.fh.write(dumps(d, default=lambda o: str(o)) + "\n")
+
+    def __del__(self):
+        self.fh.close()
+
+
+class ShuffledBatchesScheme(fuel.schemes.ShuffledScheme):
+    """Iteration scheme over finite dataset:
+    -shuffles batches but not within batch
+    -arguments: dataset_size (int) ; batch_size (int)"""
+    def get_request_iterator(self):
+        indices = list(self.indices)  # self.indices = xrange(dataset_size)
+        start = np.random.randint(self.batch_size)
+        batches = list(map(
+            list,
+            picklable_itertools.extras.partition_all(self.batch_size,
+                                                     indices[start:])
+        ))
+        if indices[:start]:
+            batches.append(indices[:start])
+        batches = np.asarray(batches)
+        return iter(batches[np.random.permutation(len(batches))])
+
+
+def rmse_error(graph, params):
+    n_nodes = graph.shape[0]
+    diff = (graph - params) ** 2
+    subarray = tsr.arange(n_nodes)
+    tsr.set_subtensor(diff[subarray, subarray], 0)
+    rmse = tsr.sum(diff) / (n_nodes ** 2)
+    rmse.name = 'rmse'
+    return rmse
+
+
+def relative_error(graph, params):
+    n_nodes = graph.shape[0]
+    diff = abs(graph - params)
+    subarray = tsr.arange(n_nodes)
+    tsr.set_subtensor(diff[subarray, subarray], 0)
+    error = tsr.sum(tsr.switch(tsr.eq(graph, 0.), 0., diff / graph)) / n_nodes
+    error.name = 'rel_error'
+    return error
+
+
+def fixed_data_stream(n_obs, graph, batch_size, shuffle=True):
+    """
+    creates a datastream for a fixed (not learned) dataset:
+    -shuffle (bool): shuffle minibatches but not within minibatch, else
+    sequential (non-shuffled) batches are used
+    """
+    x_obs, s_obs = utils.simulate_cascades(n_obs, graph)
+    data_set = fuel.datasets.base.IndexableDataset(collections.OrderedDict(
+        [('x', x_obs), ('s', s_obs)]
+    ))
+    if shuffle:
+        scheme = ShuffledBatchesScheme(n_obs, batch_size=batch_size)
+    else:
+        scheme = fuel.schemes.SequentialScheme(n_obs, batch_size=batch_size)
+    return fuel.streams.DataStream(dataset=data_set, iteration_scheme=scheme)
+
+
+def dynamic_data_stream(graph, batch_size):
+    node_p = np.ones(len(graph)) / len(graph)
+    data_set = LearnedDataset(node_p, graph)
+    scheme = fuel.schemes.ConstantScheme(batch_size)
+    return fuel.streams.DataStream(dataset=data_set, iteration_scheme=scheme)
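ShuffledBatchesScheme above draws a random offset, cuts the index range into contiguous batches starting there, and shuffles the order of the batches but never the indices within one. A hypothetical session (the exact output depends on the seed; note also that np.asarray over a ragged list of batches builds an object array, which NumPy 1.24+ accepts only with an explicit dtype=object):

    import numpy as np
    from utils_blocks import ShuffledBatchesScheme

    np.random.seed(0)
    scheme = ShuffledBatchesScheme(10, batch_size=4)
    for batch in scheme.get_request_iterator():
        print(list(batch))
    # Possible output: contiguous runs, shuffled as whole blocks, e.g.
    # [6, 7, 8, 9]
    # [2, 3, 4, 5]
    # [0, 1]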
diff --git a/simulation/vi_blocks.py b/simulation/vi_blocks.py
index 11dc4de..94038a5 100644
--- a/simulation/vi_blocks.py
+++ b/simulation/vi_blocks.py
@@ -1,4 +1,5 @@
 import utils
+import utils_blocks as ub
 import theano
 from theano import tensor as tsr
 from blocks import algorithms, main_loop
@@ -6,7 +7,6 @@ import blocks.extensions as be
 import blocks.extensions.monitoring as bm
 import theano.tensor.shared_randomstreams
 import numpy as np
-import active_blocks as ab
 
 
 class ClippedParams(algorithms.StepRule):
@@ -58,16 +58,16 @@ if __name__ == "__main__":
     print('GRAPH:\n', graph, '\n-------------\n')
 
     x, s, mu, sig, cost = create_vi_model(len(graph), n_samples)
-    rmse = ab.rmse_error(graph, mu)
+    rmse = ub.rmse_error(graph, mu)
 
     step_rules = algorithms.CompositeRule([algorithms.AdaDelta(),
                                            ClippedParams(1e-3, 1 - 1e-3)])
 
     alg = algorithms.GradientDescent(cost=cost, parameters=[mu, sig],
                                      step_rule=step_rules)
-    data_stream = ab.create_fixed_data_stream(n_cascades, graph, batch_size,
-                                              shuffle=False)
-    # data_stream = ab.create_learned_data_stream(graph, batch_size)
+    data_stream = ub.fixed_data_stream(n_cascades, graph, batch_size,
+                                       shuffle=False)
+    # data_stream = ub.dynamic_data_stream(graph, batch_size)
     loop = main_loop.MainLoop(
         alg, data_stream,
         extensions=[
--
cgit v1.2.3-70-g09d2
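For reference, the Monte Carlo core that both commits leave untouched in create_vi_model is the standard reparameterisation trick: theta = mu + sig * eps with eps ~ N(0, 1), which keeps the sampled cost differentiable in mu and sig. A NumPy rendering of that single Theano line, with illustrative initial values:

    import numpy as np

    rng = np.random.RandomState(123)                 # same seed as the script
    n_samp, n_nodes = 50, 4                          # values from vi_blocks' __main__
    mu = np.full((n_nodes, n_nodes), .5)             # illustrative initial mean
    sig = np.full((n_nodes, n_nodes), .1)            # illustrative initial std
    eps = rng.normal(size=(n_samp, n_nodes, n_nodes))
    theta = eps * sig[None, :, :] + mu[None, :, :]   # same broadcast as the diff
    print(theta.mean(), theta.std())                 # close to 0.5 and 0.1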