summaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
authorthibauth <thibauth@30fcff6e-8de6-41c7-acce-77ff6d1dd07b>2011-07-24 16:28:22 +0000
committerthibauth <thibauth@30fcff6e-8de6-41c7-acce-77ff6d1dd07b>2011-07-24 16:28:22 +0000
commitc00eb0c94b16883eee98ce675f7e90cf2193e0c9 (patch)
tree674c098165ce7d8b396f12e447e6c61fd904d452 /sources
parentf9e8632ac6aef39728a713ce10f42c5bc88dbdc0 (diff)
downloadpacemaker-c00eb0c94b16883eee98ce675f7e90cf2193e0c9.tar.gz
Small fixes in the client.
* keep the round starting time in the round data * print debugging info * don't ping the root git-svn-id: https://scm.gforge.inria.fr/svn/pacemaker@46 30fcff6e-8de6-41c7-acce-77ff6d1dd07b
Diffstat (limited to 'sources')
-rw-r--r--sources/thibaut/client.ml32
-rw-r--r--sources/thibaut/clientArg.ml2
-rw-r--r--sources/thibaut/clientGlobals.ml50
-rw-r--r--sources/thibaut/clientMessages.ml37
-rw-r--r--sources/thibaut/clientSlots.ml2
5 files changed, 95 insertions, 28 deletions
diff --git a/sources/thibaut/client.ml b/sources/thibaut/client.ml
index ab27f40..e4fae78 100644
--- a/sources/thibaut/client.ml
+++ b/sources/thibaut/client.ml
@@ -9,24 +9,28 @@ let reply_tick = 30.
let mesh_tick = 30.
let seed_tick = !accuracy/.2.
+let disconnect () = send_to_all Close
+
+let _ = at_exit disconnect
+
let seed_reply time =
try
- let current_round = RoundMap.last_round !rounds_data in
- let data = RoundMap.find current_round !rounds_data in
- if data.phase = SEEDING then begin
+ let round, data = RoundMap.last_round !rounds_data in
+ if data.phase = SEEDING && time <= (data.start +. data.duration) then begin
let hash = Hashtbl.hash data.hmap in
Queue.push (hash, data.hmap) data.replies;
- let message = SeedReply(current_round,hash) in
+ let message = SeedReply(round,hash) in
send_to_all message;
- rounds_data := RoundMap.truncate (current_round - 2) !rounds_data
+ rounds_data := RoundMap.truncate (round - 2) !rounds_data
end
with
- Failure _ -> ()
+ | Not_found -> ()
let init_round time =
incr round;
let data = {
phase = SEEDING;
+ start = time;
duration = seed_tick;
seed = Random.int (1 lsl 29);
hmap = UserMap.empty;
@@ -41,20 +45,22 @@ let init_pulse time =
send_to_all (Pulse(!round, data.seed, [data.hmap]))
let check_present = 60.
-let keep_alive = 30.
+let keep_alive = 32.
let do_mesh time =
(* ping peers that haven't been heard of for a long time *)
let aux neighbour =
- if neighbour.last_seen <= (time -. check_present) then begin
- send_message neighbour.addr Ping;
+ if neighbour.addr = root_addr then
true
- end
else if neighbour.last_seen <= (time-.check_present-.keep_alive) then begin
send_message neighbour.addr Close;
false
end
+ else if neighbour.last_seen <= (time -. check_present) then begin
+ send_message neighbour.addr Ping;
+ true
+ end
else
true
in
@@ -70,6 +76,7 @@ let do_mesh time =
if Queue.length ask_queue < to_ask then begin
while not (Queue.is_empty ask_queue) do
let addr = Queue.pop ask_queue in
+ ask_set := NSet.remove addr !ask_set;
send_message addr Mesh
done;
send_message root_addr Mesh
@@ -77,6 +84,7 @@ let do_mesh time =
else begin
for i = 0 to to_ask-1 do
let addr = Queue.pop ask_queue in
+ ask_set := NSet.remove addr !ask_set;
send_message addr Mesh
done;
end
@@ -97,12 +105,12 @@ let _ =
if !server && (time > !next_round) then begin
init_round time;
next_round := time +. !accuracy;
- next_pulse := !next_round +. seed_tick;
+ next_pulse := time +. seed_tick;
end;
if !server && (time > !next_pulse) then begin
init_pulse time;
- next_pulse := !next_pulse +. !accuracy;
+ next_pulse := !next_round +. !accuracy
end;
if (not !server) && (time > !next_seed) then begin
diff --git a/sources/thibaut/clientArg.ml b/sources/thibaut/clientArg.ml
index 229b489..1d092c1 100644
--- a/sources/thibaut/clientArg.ml
+++ b/sources/thibaut/clientArg.ml
@@ -6,7 +6,7 @@ let server = ref false
let id_file = ref "id.conf"
let generate = ref false
let degree = ref 10
-let accuracy = ref 10.
+let accuracy = ref 125.
let arg_list = Arg.align [
"--port", Arg.Set_int port, " <n> listening port";
diff --git a/sources/thibaut/clientGlobals.ml b/sources/thibaut/clientGlobals.ml
index 23cca69..9440a1a 100644
--- a/sources/thibaut/clientGlobals.ml
+++ b/sources/thibaut/clientGlobals.ml
@@ -25,6 +25,37 @@ type message_content =
| SeedReply of int * hash (* round, hash *)
| Pulse of int * seed_id * user_hash_map list (* round, seed, branch *)
+let string_of_sockaddr a = match a with
+ | ADDR_INET(a,p) -> Printf.sprintf "%s:%d" (string_of_inet_addr a) p
+ | _ -> ""
+
+let print_sockaddr_list l =
+ let result = Buffer.create 100 in
+ Printf.bprintf result "[";
+ let rec aux l = match l with
+ | [] -> ()
+ | [t] -> Printf.bprintf result "%s" (string_of_sockaddr t)
+ | t::q -> Printf.bprintf result "%s," (string_of_sockaddr t); aux q
+ in
+ aux l;
+ Printf.bprintf result "]";
+ Buffer.contents result
+
+let string_of_message m = match m with
+ | Ping -> "Ping"
+ | Pong -> "Pong"
+ | Mesh -> "Mesh"
+ | Close -> "Close"
+ | AskPeers l -> Printf.sprintf "AskPeers %s" (print_sockaddr_list l)
+ | Accepted l -> Printf.sprintf "Accepted %s" (print_sockaddr_list l)
+ | Seed (a,_,c) -> Printf.sprintf "Seed %d,%f" a c
+ | SeedReply (n,_) -> Printf.sprintf "SeedReply %d" n
+ | Pulse (a,_,l) -> Printf.sprintf "Pulse %d, %d" a (List.length l)
+
+let print_time time =
+ let time = Unix.gmtime time in
+ Printf.sprintf "%02d:%02d:%02d" time.tm_hour time.tm_min time.tm_sec
+
type message = {
id : user_id;
version : string;
@@ -89,6 +120,10 @@ let send_message addr (content:message_content) =
version = version;
content = content }
in
+ let time = Unix.gettimeofday () in
+ Printf.printf "%s - SENT - %s - %s\n%!" (print_time time)
+ (string_of_sockaddr addr)
+ (string_of_message content);
let s = Marshal.to_string message [] in
let n = String.length s in
ignore(Unix.sendto socketfd s 0 n [] addr)
@@ -100,6 +135,13 @@ let send_to_all content =
in
ClientSlots.iter f slots
+let send_to_all_but content addr =
+ let f slot = match slot with
+ | Peer p -> if p.addr <> addr then send_message p.addr content
+ | _ -> raise SlotError
+ in
+ ClientSlots.iter f slots
+
type phase = SEEDING | IDLE | PULSE
module RoundMap : sig
@@ -111,7 +153,7 @@ module RoundMap : sig
val find : int -> 'a t -> 'a
val iter : (int -> 'a -> unit) -> 'a t -> unit
val iter_limit : (int -> 'a -> unit) -> int -> 'a t -> unit
- val last_round : 'a t -> int
+ val last_round : 'a t -> int * 'a
val truncate : int -> 'a t -> 'a t
end = struct
@@ -121,8 +163,8 @@ end = struct
let add round data map = (round,data)::map
let last_round map = match map with
- | [] -> 0
- | (a,b)::c -> a
+ | [] -> raise Not_found
+ | (a,b)::c -> a,b
let empty = []
@@ -164,9 +206,11 @@ end
type round_data = {
mutable phase : phase;
duration : float;
+ start : float;
seed : seed_id;
mutable hmap : user_hash_map;
replies : (int * user_hash_map) Queue.t
}
let rounds_data:(round_data RoundMap.t ref) = ref RoundMap.empty
+
diff --git a/sources/thibaut/clientMessages.ml b/sources/thibaut/clientMessages.ml
index 8d23fb6..7d379ab 100644
--- a/sources/thibaut/clientMessages.ml
+++ b/sources/thibaut/clientMessages.ml
@@ -30,9 +30,10 @@ let find_reply hash replies =
with
Found b -> b
-let peer_init_round round seed duration =
+let peer_init_round time round seed duration =
let data = {
phase = SEEDING;
+ start = time;
duration = duration;
seed = seed;
hmap = UserMap.add my_id (Hashtbl.hash seed) UserMap.empty;
@@ -59,15 +60,19 @@ let rec verify_branch branch = match branch with
let rec add_to_asks l = match l with
| [] -> ()
| a::b ->
- if not (NSet.mem a !ask_set) then begin
+ if not (NSet.mem a !ask_set) && a <> root_addr
+ then begin
Queue.push a ask_queue;
ask_set := (NSet.add a !ask_set);
add_to_asks b
end
let update_time time id =
- let neighbour = ClientSlots.find id slots in
- neighbour.last_seen <- time
+ try
+ let neighbour = ClientSlots.find id slots in
+ neighbour.last_seen <- time
+ with
+ | Not_found -> ()
let process_mesh_server time id addr =
if (ClientSlots.mem id slots) then begin
@@ -90,12 +95,20 @@ let process_mesh_server time id addr =
let process_message time s addr =
let message = ((Marshal.from_string s 0):message) in
if message.version = version then begin
+ let s = print_time time in
+ Printf.printf "%s - RECV - %s - %s\n%!" s (string_of_sockaddr addr)
+ (string_of_message message.content);
match message.content with
| Ping ->
- update_time time message.id;
- send_message addr Pong
+ if (ClientSlots.mem message.id slots) then begin
+ update_time time message.id;
+ send_message addr Pong
+ end
+
| Pong -> update_time time message.id;
+
| Mesh ->
+ update_time time message.id;
if !server then
process_mesh_server time message.id addr
else if (ClientSlots.mem message.id slots) then begin
@@ -115,6 +128,7 @@ let process_message time s addr =
let neighbour = ClientSlots.random_peer slots in
send_message addr (AskPeers [neighbour.addr])
end
+
| Accepted l ->
ClientSlots.add {
ClientSlots.id = message.id;
@@ -124,6 +138,7 @@ let process_message time s addr =
add_to_asks l
| AskPeers l ->
+ update_time time message.id;
add_to_asks l
| Close -> ClientSlots.remove_id message.id slots
@@ -132,8 +147,8 @@ let process_message time s addr =
update_time time message.id;
if not !server then begin
if not (RoundMap.mem round !rounds_data) then begin
- peer_init_round round seed duration;
- send_to_all message.content
+ peer_init_round time round seed duration;
+ send_to_all_but message.content addr
end
end
@@ -163,9 +178,9 @@ let process_message time s addr =
if verify_branch branch then
let branch2 = hmap::branch in
let message = Pulse(round,seed,branch2) in
- if data.phase = SEEDING then begin
- data.phase <- PULSE;
- end;
+ if data.phase = SEEDING then begin
+ data.phase <- PULSE;
+ end;
send_to_all message;
with
| Not_found -> ()
diff --git a/sources/thibaut/clientSlots.ml b/sources/thibaut/clientSlots.ml
index 3619f65..58374d4 100644
--- a/sources/thibaut/clientSlots.ml
+++ b/sources/thibaut/clientSlots.ml
@@ -115,7 +115,7 @@ let random_peer sa =
let add neighbour sa =
let length = sa.length in
- if not (full sa) then begin
+ if not (full sa) && not (mem neighbour.id sa) then begin
sa.array.(length) <- (Peer neighbour);
sa.length <- length + 1
end