import main as mn
import theano
from theano import tensor as tsr
import blocks
import blocks.algorithms, blocks.main_loop, blocks.extensions.monitoring
import picklable_itertools
import numpy as np
from six.moves import range
import fuel
import fuel.datasets
import fuel.schemes
import fuel.streams
import collections


class LearnedDataset(fuel.datasets.Dataset):
    """
    Dynamically-created dataset (for active learning).

    Compatible with ConstantScheme, with the request corresponding to a
    batch_size.
    """
    provides_sources = ('x', 's')

    def __init__(self, node_p, graph, source=mn.var_source, **kwargs):
        super(LearnedDataset, self).__init__(**kwargs)
        self.node_p = node_p
        self.graph = graph
        self.n_cascades = 1  # nbr of cascades of total size approx = request
        # closure over self.node_p, so that updates made by the
        # ActiveLearning extension are picked up by later simulations
        self.source = lambda graph, t: source(graph, t, self.node_p)

    def get_data(self, state=None, request=None):
        dtype = 'int8'
        x_obs = np.empty((request, len(self.graph)), dtype=dtype)
        s_obs = np.empty((request, len(self.graph)), dtype=dtype)
        i = 0
        # simulate cascades until the batch of `request` observations is full,
        # truncating the last cascade if it overshoots
        while i < request:
            x_tmp, s_tmp = mn.build_cascade_list(
                mn.simulate_cascades(self.n_cascades, self.graph, self.source),
                collapse=True
            )
            x_obs[i:i + len(x_tmp)] = x_tmp[:request - i]
            s_obs[i:i + len(x_tmp)] = s_tmp[:request - i]
            i += len(x_tmp)
            self.n_cascades += 1  # learn the optimal nbr of cascades in the loop
        self.n_cascades = max(1, self.n_cascades - 2)
        return (x_obs, s_obs)


class ActiveLearning(blocks.extensions.SimpleExtension):
    """
    Extension which updates the node_p array passed to the get_data method
    of LearnedDataset.
    """
    def __init__(self, dataset, **kwargs):
        super(ActiveLearning, self).__init__(**kwargs)
        self.dataset = dataset

    def do(self, which_callback, *args):
        # sample cascade sources proportionally to node out-degree
        out_degree = np.sum(self.dataset.graph, axis=1)
        self.dataset.node_p = out_degree / np.sum(out_degree)
        print(self.dataset.node_p)


class ShuffledBatchesScheme(fuel.schemes.ShuffledScheme):
    """Iteration scheme over a finite dataset.

    Shuffles the order of batches but not the examples within a batch.

    Arguments: dataset_size (int); batch_size (int).
    """
    def get_request_iterator(self):
        indices = list(self.indices)  # self.indices = xrange(dataset_size)
        # random offset so that batch boundaries differ between epochs
        start = np.random.randint(self.batch_size)
        batches = list(map(
            list,
            picklable_itertools.extras.partition_all(self.batch_size, indices[start:])
        ))
        if indices[:start]:
            batches.append(indices[:start])
        # batches may have unequal lengths, so shuffle the list itself
        np.random.shuffle(batches)
        return iter(batches)


def create_mle_model(graph):
    """Return the cascade log-likelihood theano computation graph."""
    n_nodes = len(graph)
    x = tsr.matrix(name='x', dtype='int8')  # infection states, one row per time step
    s = tsr.matrix(name='s', dtype='int8')  # mask of observed (susceptible) nodes
    params = theano.shared(
        .5 + .01 * np.random.normal(size=(n_nodes, n_nodes)).astype(theano.config.floatX),
        name='params'
    )
    # y[t, j]: total transmission weight reaching node j at step t
    y = tsr.maximum(tsr.dot(x, params), 1e-5)
    # log-probability of a node becoming infected at the next step
    infect = tsr.log(1. - tsr.exp(-y[0:-1]))
    # observed infections contribute log p, observed non-infections log(1 - p) = -y
    lkl_pos = tsr.sum(infect * (x[1:] & s[1:]))
    lkl_neg = tsr.sum(-y[0:-1] * (~x[1:] & s[1:]))
    lkl_mle = lkl_pos + lkl_neg
    lkl_mle.name = 'cost'
    return x, s, params, lkl_mle


def rmse_error(graph, params):
    """Mean squared difference between the true graph and the learned params."""
    n_nodes = len(graph)
    g_shared = theano.shared(value=graph, name='graph')
    diff = (g_shared - params) ** 2
    # zero out the diagonal so self-edges are not counted
    subarray = tsr.arange(g_shared.shape[0])
    diff = tsr.set_subtensor(diff[subarray, subarray], 0)
    rmse = tsr.sum(diff) / (n_nodes ** 2)
    rmse.name = 'rmse'
    return rmse, g_shared


def create_fixed_data_stream(n_cascades, graph, batch_size, shuffle=True):
    """
    Create a data stream for a fixed (not learned) dataset.

    shuffle (bool): shuffle minibatches but not within a minibatch;
    otherwise sequential (non-shuffled) batches are used.
    """
    cascades = mn.build_cascade_list(mn.simulate_cascades(n_cascades, graph),
                                     collapse=True)
    x_obs, s_obs = cascades[0], cascades[1]
    data_set = fuel.datasets.base.IndexableDataset(collections.OrderedDict(
        [('x', x_obs), ('s', s_obs)]
    ))
    if shuffle:
        scheme = ShuffledBatchesScheme(len(x_obs), batch_size=batch_size)
    else:
        scheme = fuel.schemes.SequentialScheme(len(x_obs), batch_size=batch_size)
    return fuel.streams.DataStream(dataset=data_set, iteration_scheme=scheme)


def create_learned_data_stream(graph, batch_size):
    """Create a data stream whose cascades are simulated on demand."""
    node_p = np.ones(len(graph)) / len(graph)  # start with uniform source sampling
    data_set = LearnedDataset(node_p, graph)
    scheme = fuel.schemes.ConstantScheme(batch_size)
    return fuel.streams.DataStream(dataset=data_set, iteration_scheme=scheme)


if __name__ == "__main__":
    batch_size = 1000
    graph = mn.create_star(1000)
    print('GRAPH:\n', graph, '\n-------------\n')

    x, s, params, cost = create_mle_model(graph)
    rmse, g_shared = rmse_error(graph, params)

    alg = blocks.algorithms.GradientDescent(
        cost=-cost,  # maximize the log-likelihood by minimizing its negative
        parameters=[params],
        step_rule=blocks.algorithms.AdaDelta()
    )
    data_stream = create_learned_data_stream(graph, batch_size)

    loop = blocks.main_loop.MainLoop(
        alg, data_stream,
        extensions=[
            blocks.extensions.FinishAfter(after_n_batches=10**4),
            blocks.extensions.monitoring.TrainingDataMonitoring(
                [cost, params, rmse, g_shared], after_batch=True),
            blocks.extensions.Printing(every_n_batches=10),
            # SimpleExtension only fires when given an explicit trigger;
            # the refresh interval here is an arbitrary choice
            ActiveLearning(data_stream.dataset, every_n_batches=100),
        ]
    )
    loop.run()
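
# A minimal sketch of how the same model could be trained on a fixed,
# pre-simulated dataset instead of the actively learned stream above.
# The cascade budget of 100 is an arbitrary illustrative choice, and the
# ActiveLearning extension is dropped because it only applies to
# LearnedDataset:
#
#     data_stream = create_fixed_data_stream(100, graph, batch_size)
#     loop = blocks.main_loop.MainLoop(
#         alg, data_stream,
#         extensions=[
#             blocks.extensions.FinishAfter(after_n_batches=10**4),
#             blocks.extensions.monitoring.TrainingDataMonitoring(
#                 [cost, rmse], after_batch=True),
#             blocks.extensions.Printing(every_n_batches=10),
#         ]
#     )
#     loop.run()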