diff options
| author | thibauth <thibauth@30fcff6e-8de6-41c7-acce-77ff6d1dd07b> | 2011-07-24 16:28:22 +0000 |
|---|---|---|
| committer | thibauth <thibauth@30fcff6e-8de6-41c7-acce-77ff6d1dd07b> | 2011-07-24 16:28:22 +0000 |
| commit | c00eb0c94b16883eee98ce675f7e90cf2193e0c9 (patch) | |
| tree | 674c098165ce7d8b396f12e447e6c61fd904d452 /sources | |
| parent | f9e8632ac6aef39728a713ce10f42c5bc88dbdc0 (diff) | |
| download | pacemaker-c00eb0c94b16883eee98ce675f7e90cf2193e0c9.tar.gz | |
Small fixes in the client.
* keep the round starting time in the round data
* print debugging info
* don't ping the root
git-svn-id: https://scm.gforge.inria.fr/svn/pacemaker@46 30fcff6e-8de6-41c7-acce-77ff6d1dd07b
Diffstat (limited to 'sources')
| -rw-r--r-- | sources/thibaut/client.ml | 32 | ||||
| -rw-r--r-- | sources/thibaut/clientArg.ml | 2 | ||||
| -rw-r--r-- | sources/thibaut/clientGlobals.ml | 50 | ||||
| -rw-r--r-- | sources/thibaut/clientMessages.ml | 37 | ||||
| -rw-r--r-- | sources/thibaut/clientSlots.ml | 2 |
5 files changed, 95 insertions, 28 deletions
diff --git a/sources/thibaut/client.ml b/sources/thibaut/client.ml index ab27f40..e4fae78 100644 --- a/sources/thibaut/client.ml +++ b/sources/thibaut/client.ml @@ -9,24 +9,28 @@ let reply_tick = 30. let mesh_tick = 30. let seed_tick = !accuracy/.2. +let disconnect () = send_to_all Close + +let _ = at_exit disconnect + let seed_reply time = try - let current_round = RoundMap.last_round !rounds_data in - let data = RoundMap.find current_round !rounds_data in - if data.phase = SEEDING then begin + let round, data = RoundMap.last_round !rounds_data in + if data.phase = SEEDING && time <= (data.start +. data.duration) then begin let hash = Hashtbl.hash data.hmap in Queue.push (hash, data.hmap) data.replies; - let message = SeedReply(current_round,hash) in + let message = SeedReply(round,hash) in send_to_all message; - rounds_data := RoundMap.truncate (current_round - 2) !rounds_data + rounds_data := RoundMap.truncate (round - 2) !rounds_data end with - Failure _ -> () + | Not_found -> () let init_round time = incr round; let data = { phase = SEEDING; + start = time; duration = seed_tick; seed = Random.int (1 lsl 29); hmap = UserMap.empty; @@ -41,20 +45,22 @@ let init_pulse time = send_to_all (Pulse(!round, data.seed, [data.hmap])) let check_present = 60. -let keep_alive = 30. +let keep_alive = 32. let do_mesh time = (* ping peers that haven't been heard of for a long time *) let aux neighbour = - if neighbour.last_seen <= (time -. check_present) then begin - send_message neighbour.addr Ping; + if neighbour.addr = root_addr then true - end else if neighbour.last_seen <= (time-.check_present-.keep_alive) then begin send_message neighbour.addr Close; false end + else if neighbour.last_seen <= (time -. check_present) then begin + send_message neighbour.addr Ping; + true + end else true in @@ -70,6 +76,7 @@ let do_mesh time = if Queue.length ask_queue < to_ask then begin while not (Queue.is_empty ask_queue) do let addr = Queue.pop ask_queue in + ask_set := NSet.remove addr !ask_set; send_message addr Mesh done; send_message root_addr Mesh @@ -77,6 +84,7 @@ let do_mesh time = else begin for i = 0 to to_ask-1 do let addr = Queue.pop ask_queue in + ask_set := NSet.remove addr !ask_set; send_message addr Mesh done; end @@ -97,12 +105,12 @@ let _ = if !server && (time > !next_round) then begin init_round time; next_round := time +. !accuracy; - next_pulse := !next_round +. seed_tick; + next_pulse := time +. seed_tick; end; if !server && (time > !next_pulse) then begin init_pulse time; - next_pulse := !next_pulse +. !accuracy; + next_pulse := !next_round +. !accuracy end; if (not !server) && (time > !next_seed) then begin diff --git a/sources/thibaut/clientArg.ml b/sources/thibaut/clientArg.ml index 229b489..1d092c1 100644 --- a/sources/thibaut/clientArg.ml +++ b/sources/thibaut/clientArg.ml @@ -6,7 +6,7 @@ let server = ref false let id_file = ref "id.conf" let generate = ref false let degree = ref 10 -let accuracy = ref 10. +let accuracy = ref 125. let arg_list = Arg.align [ "--port", Arg.Set_int port, " <n> listening port"; diff --git a/sources/thibaut/clientGlobals.ml b/sources/thibaut/clientGlobals.ml index 23cca69..9440a1a 100644 --- a/sources/thibaut/clientGlobals.ml +++ b/sources/thibaut/clientGlobals.ml @@ -25,6 +25,37 @@ type message_content = | SeedReply of int * hash (* round, hash *) | Pulse of int * seed_id * user_hash_map list (* round, seed, branch *) +let string_of_sockaddr a = match a with + | ADDR_INET(a,p) -> Printf.sprintf "%s:%d" (string_of_inet_addr a) p + | _ -> "" + +let print_sockaddr_list l = + let result = Buffer.create 100 in + Printf.bprintf result "["; + let rec aux l = match l with + | [] -> () + | [t] -> Printf.bprintf result "%s" (string_of_sockaddr t) + | t::q -> Printf.bprintf result "%s," (string_of_sockaddr t); aux q + in + aux l; + Printf.bprintf result "]"; + Buffer.contents result + +let string_of_message m = match m with + | Ping -> "Ping" + | Pong -> "Pong" + | Mesh -> "Mesh" + | Close -> "Close" + | AskPeers l -> Printf.sprintf "AskPeers %s" (print_sockaddr_list l) + | Accepted l -> Printf.sprintf "Accepted %s" (print_sockaddr_list l) + | Seed (a,_,c) -> Printf.sprintf "Seed %d,%f" a c + | SeedReply (n,_) -> Printf.sprintf "SeedReply %d" n + | Pulse (a,_,l) -> Printf.sprintf "Pulse %d, %d" a (List.length l) + +let print_time time = + let time = Unix.gmtime time in + Printf.sprintf "%02d:%02d:%02d" time.tm_hour time.tm_min time.tm_sec + type message = { id : user_id; version : string; @@ -89,6 +120,10 @@ let send_message addr (content:message_content) = version = version; content = content } in + let time = Unix.gettimeofday () in + Printf.printf "%s - SENT - %s - %s\n%!" (print_time time) + (string_of_sockaddr addr) + (string_of_message content); let s = Marshal.to_string message [] in let n = String.length s in ignore(Unix.sendto socketfd s 0 n [] addr) @@ -100,6 +135,13 @@ let send_to_all content = in ClientSlots.iter f slots +let send_to_all_but content addr = + let f slot = match slot with + | Peer p -> if p.addr <> addr then send_message p.addr content + | _ -> raise SlotError + in + ClientSlots.iter f slots + type phase = SEEDING | IDLE | PULSE module RoundMap : sig @@ -111,7 +153,7 @@ module RoundMap : sig val find : int -> 'a t -> 'a val iter : (int -> 'a -> unit) -> 'a t -> unit val iter_limit : (int -> 'a -> unit) -> int -> 'a t -> unit - val last_round : 'a t -> int + val last_round : 'a t -> int * 'a val truncate : int -> 'a t -> 'a t end = struct @@ -121,8 +163,8 @@ end = struct let add round data map = (round,data)::map let last_round map = match map with - | [] -> 0 - | (a,b)::c -> a + | [] -> raise Not_found + | (a,b)::c -> a,b let empty = [] @@ -164,9 +206,11 @@ end type round_data = { mutable phase : phase; duration : float; + start : float; seed : seed_id; mutable hmap : user_hash_map; replies : (int * user_hash_map) Queue.t } let rounds_data:(round_data RoundMap.t ref) = ref RoundMap.empty + diff --git a/sources/thibaut/clientMessages.ml b/sources/thibaut/clientMessages.ml index 8d23fb6..7d379ab 100644 --- a/sources/thibaut/clientMessages.ml +++ b/sources/thibaut/clientMessages.ml @@ -30,9 +30,10 @@ let find_reply hash replies = with Found b -> b -let peer_init_round round seed duration = +let peer_init_round time round seed duration = let data = { phase = SEEDING; + start = time; duration = duration; seed = seed; hmap = UserMap.add my_id (Hashtbl.hash seed) UserMap.empty; @@ -59,15 +60,19 @@ let rec verify_branch branch = match branch with let rec add_to_asks l = match l with | [] -> () | a::b -> - if not (NSet.mem a !ask_set) then begin + if not (NSet.mem a !ask_set) && a <> root_addr + then begin Queue.push a ask_queue; ask_set := (NSet.add a !ask_set); add_to_asks b end let update_time time id = - let neighbour = ClientSlots.find id slots in - neighbour.last_seen <- time + try + let neighbour = ClientSlots.find id slots in + neighbour.last_seen <- time + with + | Not_found -> () let process_mesh_server time id addr = if (ClientSlots.mem id slots) then begin @@ -90,12 +95,20 @@ let process_mesh_server time id addr = let process_message time s addr = let message = ((Marshal.from_string s 0):message) in if message.version = version then begin + let s = print_time time in + Printf.printf "%s - RECV - %s - %s\n%!" s (string_of_sockaddr addr) + (string_of_message message.content); match message.content with | Ping -> - update_time time message.id; - send_message addr Pong + if (ClientSlots.mem message.id slots) then begin + update_time time message.id; + send_message addr Pong + end + | Pong -> update_time time message.id; + | Mesh -> + update_time time message.id; if !server then process_mesh_server time message.id addr else if (ClientSlots.mem message.id slots) then begin @@ -115,6 +128,7 @@ let process_message time s addr = let neighbour = ClientSlots.random_peer slots in send_message addr (AskPeers [neighbour.addr]) end + | Accepted l -> ClientSlots.add { ClientSlots.id = message.id; @@ -124,6 +138,7 @@ let process_message time s addr = add_to_asks l | AskPeers l -> + update_time time message.id; add_to_asks l | Close -> ClientSlots.remove_id message.id slots @@ -132,8 +147,8 @@ let process_message time s addr = update_time time message.id; if not !server then begin if not (RoundMap.mem round !rounds_data) then begin - peer_init_round round seed duration; - send_to_all message.content + peer_init_round time round seed duration; + send_to_all_but message.content addr end end @@ -163,9 +178,9 @@ let process_message time s addr = if verify_branch branch then let branch2 = hmap::branch in let message = Pulse(round,seed,branch2) in - if data.phase = SEEDING then begin - data.phase <- PULSE; - end; + if data.phase = SEEDING then begin + data.phase <- PULSE; + end; send_to_all message; with | Not_found -> () diff --git a/sources/thibaut/clientSlots.ml b/sources/thibaut/clientSlots.ml index 3619f65..58374d4 100644 --- a/sources/thibaut/clientSlots.ml +++ b/sources/thibaut/clientSlots.ml @@ -115,7 +115,7 @@ let random_peer sa = let add neighbour sa = let length = sa.length in - if not (full sa) then begin + if not (full sa) && not (mem neighbour.id sa) then begin sa.array.(length) <- (Peer neighbour); sa.length <- length + 1 end |
