diff options
| -rw-r--r-- | sources/thibaut/client.ml | 53 | ||||
| -rw-r--r-- | sources/thibaut/clientGlobals.ml | 91 | ||||
| -rw-r--r-- | sources/thibaut/clientMessages.ml | 120 | ||||
| -rw-r--r-- | sources/thibaut/clientSlots.mli | 2 |
4 files changed, 243 insertions, 23 deletions
diff --git a/sources/thibaut/client.ml b/sources/thibaut/client.ml index a034b05..ab27f40 100644 --- a/sources/thibaut/client.ml +++ b/sources/thibaut/client.ml @@ -4,14 +4,48 @@ open ClientSlots open ClientMessages open Unix -let init_round time = () -let init_pulse time = () -let seed_reply time = () +let round = ref 0 +let reply_tick = 30. +let mesh_tick = 30. +let seed_tick = !accuracy/.2. + +let seed_reply time = + try + let current_round = RoundMap.last_round !rounds_data in + let data = RoundMap.find current_round !rounds_data in + if data.phase = SEEDING then begin + let hash = Hashtbl.hash data.hmap in + Queue.push (hash, data.hmap) data.replies; + let message = SeedReply(current_round,hash) in + send_to_all message; + rounds_data := RoundMap.truncate (current_round - 2) !rounds_data + end + with + Failure _ -> () + +let init_round time = + incr round; + let data = { + phase = SEEDING; + duration = seed_tick; + seed = Random.int (1 lsl 29); + hmap = UserMap.empty; + replies = Queue.create () + } in + rounds_data := RoundMap.add !round data !rounds_data; + send_to_all (Seed(!round, data.seed, data.duration)) + +let init_pulse time = + let data = RoundMap.find !round !rounds_data in + data.phase <- IDLE; + send_to_all (Pulse(!round, data.seed, [data.hmap])) let check_present = 60. let keep_alive = 30. let do_mesh time = + + (* ping peers that haven't been heard of for a long time *) let aux neighbour = if neighbour.last_seen <= (time -. check_present) then begin send_message neighbour.addr Ping; @@ -26,6 +60,9 @@ let do_mesh time = in ClientSlots.iter_remove aux slots; + (* if not full (< 4/5 slots) ask some new peers either from + * the ask queue or the server + *) if not !server then begin let free_slots = !degree - (ClientSlots.length slots) in if free_slots > 2 then begin @@ -46,9 +83,6 @@ let do_mesh time = end end -let reply_tick = 30. -let mesh_tick = 30. -let seed_tick = !accuracy/.2. let time = Unix.gettimeofday() let next_round = ref (time +. !accuracy) let next_pulse = ref (!next_round +. seed_tick) @@ -87,13 +121,6 @@ let _ = | t::q -> let buff = String.create max_length in let len, addr = recvfrom t buff 0 max_length [] in - let addr_string = match addr with - | ADDR_INET(iaddr,port) -> Printf.sprintf "%s:%d" - (string_of_inet_addr iaddr) port - | _ -> "" - in let s = String.sub buff 0 len in process_message time s addr; - Printf.printf "Received from %s: %s\n%!" addr_string - s done diff --git a/sources/thibaut/clientGlobals.ml b/sources/thibaut/clientGlobals.ml index 079ef78..23cca69 100644 --- a/sources/thibaut/clientGlobals.ml +++ b/sources/thibaut/clientGlobals.ml @@ -3,6 +3,16 @@ open Unix open ClientSlots type user_id = int +type hash = int +type seed_id = int + +module UserId = struct + type t = user_id + let compare = Pervasives.compare +end + +module UserMap = Map.Make(UserId) +type user_hash_map = hash UserMap.t type message_content = | Ping @@ -11,6 +21,9 @@ type message_content = | Close | AskPeers of sockaddr list | Accepted of sockaddr list + | Seed of int * seed_id * float (* round, seed, duration *) + | SeedReply of int * hash (* round, hash *) + | Pulse of int * seed_id * user_hash_map list (* round, seed, branch *) type message = { id : user_id; @@ -79,3 +92,81 @@ let send_message addr (content:message_content) = let s = Marshal.to_string message [] in let n = String.length s in ignore(Unix.sendto socketfd s 0 n [] addr) + +let send_to_all content = + let f slot = match slot with + | Peer p -> send_message p.addr content + | _ -> raise SlotError + in + ClientSlots.iter f slots + +type phase = SEEDING | IDLE | PULSE + +module RoundMap : sig + type 'a t + + val empty : 'a t + val add : int -> 'a -> 'a t -> 'a t + val mem : int -> 'a t -> bool + val find : int -> 'a t -> 'a + val iter : (int -> 'a -> unit) -> 'a t -> unit + val iter_limit : (int -> 'a -> unit) -> int -> 'a t -> unit + val last_round : 'a t -> int + val truncate : int -> 'a t -> 'a t + +end = struct + + type 'a t = (int* 'a) list + + let add round data map = (round,data)::map + + let last_round map = match map with + | [] -> 0 + | (a,b)::c -> a + + let empty = [] + + let rec mem round map = match map with + | [] -> false + | (a,b)::c -> + if a = round then true + else if a < round then false + else mem round c + + let rec find round map = match map with + | [] -> raise Not_found + | (a,b)::c -> + if a = round then b + else if a < round then raise Not_found + else find round c + + let rec iter f map = match map with + | [] -> () + | (a,b)::c -> f a b; iter f c + + let rec iter_limit f limit map = match map with + | [] -> () + | (a,b)::c -> + if a >= limit then begin + f a b; + iter_limit f limit c + end + + let rec truncate limit map = match map with + | [] -> [] + | (a,b)::c -> + if a < limit then + [] + else + (a,b)::(truncate limit c) +end + +type round_data = { + mutable phase : phase; + duration : float; + seed : seed_id; + mutable hmap : user_hash_map; + replies : (int * user_hash_map) Queue.t +} + +let rounds_data:(round_data RoundMap.t ref) = ref RoundMap.empty diff --git a/sources/thibaut/clientMessages.ml b/sources/thibaut/clientMessages.ml index dbec955..8d23fb6 100644 --- a/sources/thibaut/clientMessages.ml +++ b/sources/thibaut/clientMessages.ml @@ -3,15 +3,68 @@ open ClientArg open ClientSlots open ClientGlobals +module Neighbour = struct + type t = sockaddr + let compare a1 a2 = match a1, a2 with + | ADDR_INET(s1, p1),ADDR_INET(s2,p2) -> + let s1, s2 = (string_of_inet_addr s1,string_of_inet_addr s2) in + Pervasives.compare (s1,p1) (s2,p2) + | _ -> failwith "Big problem" +end + +module NSet = Set.Make(Neighbour) +let ask_set = ref NSet.empty let ask_queue = Queue.create () -let rec add_to_asks l = match l with - | [] -> () - | a::b -> let ADDR_INET(s,p) = a in - Printf.printf "%s:%d\n" (string_of_inet_addr s) p; - Queue.push a ask_queue; add_to_asks b +exception Found of user_hash_map + +let find_reply hash replies = + try + while not (Queue.is_empty replies) do + let a,b = Queue.pop replies in + if a = hash then + raise (Found b) + done; + raise Not_found + with + Found b -> b + +let peer_init_round round seed duration = + let data = { + phase = SEEDING; + duration = duration; + seed = seed; + hmap = UserMap.add my_id (Hashtbl.hash seed) UserMap.empty; + replies = Queue.create() + } in + rounds_data := RoundMap.add round data !rounds_data + +exception Found + +let exists map elem = + let aux _ b = if b = elem then raise Found in + try + let _ = UserMap.iter aux map in false + with + | Found -> true + +let rec verify_branch branch = match branch with + | [] -> true + | [h] -> true + | h1::h2::t -> + let hash = Hashtbl.hash h1 in + (exists h2 hash) && (verify_branch (h2::t)) +let rec add_to_asks l = match l with + | [] -> () + | a::b -> + if not (NSet.mem a !ask_set) then begin + Queue.push a ask_queue; + ask_set := (NSet.add a !ask_set); + add_to_asks b + end + let update_time time id = let neighbour = ClientSlots.find id slots in neighbour.last_seen <- time @@ -35,13 +88,12 @@ let process_mesh_server time id addr = send_message addr (AskPeers l) let process_message time s addr = - let send_message = send_message addr in let message = ((Marshal.from_string s 0):message) in if message.version = version then begin match message.content with | Ping -> update_time time message.id; - send_message Pong + send_message addr Pong | Pong -> update_time time message.id; | Mesh -> if !server then @@ -49,7 +101,7 @@ let process_message time s addr = else if (ClientSlots.mem message.id slots) then begin if (ClientSlots.length slots > 1) then let neighbour = ClientSlots.random_peer_avoid message.id slots in - send_message (AskPeers [neighbour.addr]) + send_message addr (AskPeers [neighbour.addr]) end else if not (ClientSlots.full slots) then begin ClientSlots.add { @@ -57,11 +109,11 @@ let process_message time s addr = last_seen = time; addr = addr } slots; - send_message (Accepted []) + send_message addr (Accepted []) end else begin let neighbour = ClientSlots.random_peer slots in - send_message (AskPeers [neighbour.addr]) + send_message addr (AskPeers [neighbour.addr]) end | Accepted l -> ClientSlots.add { @@ -70,7 +122,55 @@ let process_message time s addr = addr = addr } slots; add_to_asks l + | AskPeers l -> add_to_asks l + | Close -> ClientSlots.remove_id message.id slots + + | Seed(round,seed,duration) -> + update_time time message.id; + if not !server then begin + if not (RoundMap.mem round !rounds_data) then begin + peer_init_round round seed duration; + send_to_all message.content + end + end + + | SeedReply(round,hash) -> + update_time time message.id; + begin + try + let data = RoundMap.find round !rounds_data in + if data.phase = SEEDING then + data.hmap <- UserMap.add message.id hash data.hmap; + with + | Not_found -> () + end + + | Pulse(round,seed, branch) -> + update_time time message.id; + if not !server then begin + try + let data = RoundMap.find round !rounds_data in + if data.phase = SEEDING || data.phase = PULSE then + match branch with + | [] -> () + | h::t -> + try + let hash = UserMap.find message.id h in + let hmap = find_reply hash data.replies in + if verify_branch branch then + let branch2 = hmap::branch in + let message = Pulse(round,seed,branch2) in + if data.phase = SEEDING then begin + data.phase <- PULSE; + end; + send_to_all message; + with + | Not_found -> () + with + | Not_found -> () + end end + diff --git a/sources/thibaut/clientSlots.mli b/sources/thibaut/clientSlots.mli index 56e4ca3..32e1909 100644 --- a/sources/thibaut/clientSlots.mli +++ b/sources/thibaut/clientSlots.mli @@ -1,5 +1,6 @@ open Unix +exception SlotError type t type user_id = int type neighbour = { @@ -19,3 +20,4 @@ val add : neighbour -> t -> unit val iter_remove : (neighbour -> bool) -> t -> unit val remove_id : user_id -> t -> unit val to_list_avoid : user_id -> t -> sockaddr list +val iter : (slot -> unit) -> t -> unit |
