from theano import tensor as tsr
import fuel.datasets
import fuel.schemes
import fuel.streams
import blocks.extensions as be
import picklable_itertools
import numpy as np
from json import dumps
import collections
import utils


class LearnedDataset(fuel.datasets.Dataset):
    """Dynamically generated dataset used for active learning.

    Compatible with ConstantScheme, where each request corresponds to a
    batch size: every call to get_data simulates a fresh batch of cascades
    whose sources are drawn according to node_p.
    """
    provides_sources = ('x', 's')

    def __init__(self, node_p, graph, **kwargs):
        super(LearnedDataset, self).__init__(**kwargs)
        self.node_p = node_p
        self.graph = graph
        # node_p is read inside the closure at call time, so updates made by
        # the ActiveLearning extension take effect on the next batch.
        self.source = lambda graph: utils.random_source(graph, self.node_p)

    def get_data(self, state=None, request=None):
        return utils.simulate_cascades(request, self.graph, self.source)


class ActiveLearning(be.SimpleExtension):
    """Update the node_p array used by LearnedDataset.get_data.

    node_p is set to a softmax over the row sums of the current parameter
    matrix, so the source-sampling distribution tracks the learned rates.
    """
    def __init__(self, dataset, params, **kwargs):
        super(ActiveLearning, self).__init__(**kwargs)
        self.dataset = dataset
        self.params = params

    def do(self, which_callback, *args):
        # params may be a Theano shared variable or a plain numpy array.
        try:
            exp_out_par = np.exp(np.sum(self.params.get_value(), axis=1))
        except AttributeError:
            exp_out_par = np.exp(np.sum(self.params, axis=1))
        self.dataset.node_p = exp_out_par / np.sum(exp_out_par)


class JSONDump(be.SimpleExtension):
"""Dump a JSON-serialized version of the log to a file."""
def __init__(self, filename, **kwargs):
super(JSONDump, self).__init__(**kwargs)
self.fh = open(filename, "w")
def do(self, which_callback, *args):
log = self.main_loop.log
d = {k: v for (k, v) in log.current_row.items()
if not k.startswith("_")}
d["time"] = log.status["iterations_done"]
self.fh.write(dumps(d, default=lambda o: str(o)) + "\n")
def __del__(self):
self.fh.close()


class ShuffledBatchesScheme(fuel.schemes.ShuffledScheme):
    """Iteration scheme over a finite dataset that shuffles whole batches
    while preserving the order of examples within each batch.

    Arguments: dataset_size (int), batch_size (int).
    """
    def get_request_iterator(self):
        indices = list(self.indices)  # self.indices = xrange(dataset_size)
        # Random offset so that batch boundaries differ from epoch to epoch.
        start = np.random.randint(self.batch_size)
        batches = list(map(
            list,
            picklable_itertools.extras.partition_all(self.batch_size,
                                                     indices[start:])
        ))
        if indices[:start]:
            batches.append(indices[:start])
        # Shuffle the batch order without converting to an array, which can
        # fail when the leftover batch has a different length.
        order = np.random.permutation(len(batches))
        return iter([batches[i] for i in order])


def rmse_error(graph, params):
    n_nodes = graph.shape[0]
    diff = (graph - params) ** 2
    # Zero out the diagonal so self-edges do not contribute; set_subtensor
    # returns a new variable, so its result must be assigned back.
    subarray = tsr.arange(n_nodes)
    diff = tsr.set_subtensor(diff[subarray, subarray], 0)
    rmse = tsr.sum(diff) / (n_nodes ** 2)  # mean squared difference
    rmse.name = 'rmse'
    return rmse


def relative_error(graph, params):
    n_nodes = graph.shape[0]
    diff = abs(graph - params)
    # Zero out the diagonal, keeping the result of set_subtensor.
    subarray = tsr.arange(n_nodes)
    diff = tsr.set_subtensor(diff[subarray, subarray], 0)
    # Zero-weight edges are skipped to avoid dividing by zero.
    error = tsr.sum(tsr.switch(tsr.eq(graph, 0.), 0., diff / graph)) / n_nodes
    error.name = 'rel_error'
    return error
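

# The two error measures above are symbolic Theano expressions. One way to
# track them during training is to attach them to a Blocks monitoring
# extension; a minimal sketch, assuming `graph` is the true rate matrix as a
# numpy array and `params` the shared parameter matrix being learned:
#
#     from blocks.extensions.monitoring import TrainingDataMonitoring
#     monitor = TrainingDataMonitoring(
#         [rmse_error(graph, params), relative_error(graph, params)],
#         after_batch=True)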


def fixed_data_stream(n_obs, graph, batch_size, shuffle=True):
    """Create a data stream over a fixed (pre-simulated) dataset.

    If shuffle is True, minibatches are shuffled but the order of examples
    within each minibatch is preserved; otherwise sequential batches are
    used.
    """
    x_obs, s_obs = utils.simulate_cascades(n_obs, graph)
    data_set = fuel.datasets.base.IndexableDataset(collections.OrderedDict(
        [('x', x_obs), ('s', s_obs)]
    ))
    if shuffle:
        scheme = ShuffledBatchesScheme(n_obs, batch_size=batch_size)
    else:
        scheme = fuel.schemes.SequentialScheme(n_obs, batch_size=batch_size)
    return fuel.streams.DataStream(dataset=data_set, iteration_scheme=scheme)


def dynamic_data_stream(graph, batch_size):
    """Create a data stream whose cascades are simulated on the fly,
    starting from a uniform source distribution over the nodes."""
    node_p = np.ones(len(graph)) / len(graph)
    data_set = LearnedDataset(node_p, graph)
    scheme = fuel.schemes.ConstantScheme(batch_size)
    return fuel.streams.DataStream(dataset=data_set, iteration_scheme=scheme)
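

if __name__ == '__main__':
    # Illustrative usage sketch only, built on a random placeholder graph;
    # it assumes utils.simulate_cascades accepts an n_nodes x n_nodes numpy
    # array, as the helpers above do. A real experiment would also build a
    # cost and pass `extensions` to blocks.main_loop.MainLoop.
    import theano
    n_nodes = 10
    graph = np.random.rand(n_nodes, n_nodes).astype(theano.config.floatX)
    params = theano.shared(
        np.zeros((n_nodes, n_nodes), dtype=theano.config.floatX),
        name='params')
    # Stream over a fixed set of pre-simulated cascades ...
    fixed_stream = fixed_data_stream(100, graph, batch_size=20)
    # ... or over cascades simulated on the fly, whose source distribution
    # is refreshed by the ActiveLearning extension every few batches.
    dynamic_stream = dynamic_data_stream(graph, batch_size=20)
    extensions = [
        ActiveLearning(dynamic_stream.dataset, params, every_n_batches=10),
        JSONDump('training_log.jsonl', after_batch=True),
    ]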