summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sources/thibaut/client.ml53
-rw-r--r--sources/thibaut/clientGlobals.ml91
-rw-r--r--sources/thibaut/clientMessages.ml120
-rw-r--r--sources/thibaut/clientSlots.mli2
4 files changed, 243 insertions, 23 deletions
diff --git a/sources/thibaut/client.ml b/sources/thibaut/client.ml
index a034b05..ab27f40 100644
--- a/sources/thibaut/client.ml
+++ b/sources/thibaut/client.ml
@@ -4,14 +4,48 @@ open ClientSlots
open ClientMessages
open Unix
-let init_round time = ()
-let init_pulse time = ()
-let seed_reply time = ()
+let round = ref 0
+let reply_tick = 30.
+let mesh_tick = 30.
+let seed_tick = !accuracy/.2.
+
+let seed_reply time =
+ try
+ let current_round = RoundMap.last_round !rounds_data in
+ let data = RoundMap.find current_round !rounds_data in
+ if data.phase = SEEDING then begin
+ let hash = Hashtbl.hash data.hmap in
+ Queue.push (hash, data.hmap) data.replies;
+ let message = SeedReply(current_round,hash) in
+ send_to_all message;
+ rounds_data := RoundMap.truncate (current_round - 2) !rounds_data
+ end
+ with
+ Failure _ -> ()
+
+let init_round time =
+ incr round;
+ let data = {
+ phase = SEEDING;
+ duration = seed_tick;
+ seed = Random.int (1 lsl 29);
+ hmap = UserMap.empty;
+ replies = Queue.create ()
+ } in
+ rounds_data := RoundMap.add !round data !rounds_data;
+ send_to_all (Seed(!round, data.seed, data.duration))
+
+let init_pulse time =
+ let data = RoundMap.find !round !rounds_data in
+ data.phase <- IDLE;
+ send_to_all (Pulse(!round, data.seed, [data.hmap]))
let check_present = 60.
let keep_alive = 30.
let do_mesh time =
+
+ (* ping peers that haven't been heard of for a long time *)
let aux neighbour =
if neighbour.last_seen <= (time -. check_present) then begin
send_message neighbour.addr Ping;
@@ -26,6 +60,9 @@ let do_mesh time =
in
ClientSlots.iter_remove aux slots;
+ (* if not full (< 4/5 slots) ask some new peers either from
+ * the ask queue or the server
+ *)
if not !server then begin
let free_slots = !degree - (ClientSlots.length slots) in
if free_slots > 2 then begin
@@ -46,9 +83,6 @@ let do_mesh time =
end
end
-let reply_tick = 30.
-let mesh_tick = 30.
-let seed_tick = !accuracy/.2.
let time = Unix.gettimeofday()
let next_round = ref (time +. !accuracy)
let next_pulse = ref (!next_round +. seed_tick)
@@ -87,13 +121,6 @@ let _ =
| t::q ->
let buff = String.create max_length in
let len, addr = recvfrom t buff 0 max_length [] in
- let addr_string = match addr with
- | ADDR_INET(iaddr,port) -> Printf.sprintf "%s:%d"
- (string_of_inet_addr iaddr) port
- | _ -> ""
- in
let s = String.sub buff 0 len in
process_message time s addr;
- Printf.printf "Received from %s: %s\n%!" addr_string
- s
done
diff --git a/sources/thibaut/clientGlobals.ml b/sources/thibaut/clientGlobals.ml
index 079ef78..23cca69 100644
--- a/sources/thibaut/clientGlobals.ml
+++ b/sources/thibaut/clientGlobals.ml
@@ -3,6 +3,16 @@ open Unix
open ClientSlots
type user_id = int
+type hash = int
+type seed_id = int
+
+module UserId = struct
+ type t = user_id
+ let compare = Pervasives.compare
+end
+
+module UserMap = Map.Make(UserId)
+type user_hash_map = hash UserMap.t
type message_content =
| Ping
@@ -11,6 +21,9 @@ type message_content =
| Close
| AskPeers of sockaddr list
| Accepted of sockaddr list
+ | Seed of int * seed_id * float (* round, seed, duration *)
+ | SeedReply of int * hash (* round, hash *)
+ | Pulse of int * seed_id * user_hash_map list (* round, seed, branch *)
type message = {
id : user_id;
@@ -79,3 +92,81 @@ let send_message addr (content:message_content) =
let s = Marshal.to_string message [] in
let n = String.length s in
ignore(Unix.sendto socketfd s 0 n [] addr)
+
+let send_to_all content =
+ let f slot = match slot with
+ | Peer p -> send_message p.addr content
+ | _ -> raise SlotError
+ in
+ ClientSlots.iter f slots
+
+type phase = SEEDING | IDLE | PULSE
+
+module RoundMap : sig
+ type 'a t
+
+ val empty : 'a t
+ val add : int -> 'a -> 'a t -> 'a t
+ val mem : int -> 'a t -> bool
+ val find : int -> 'a t -> 'a
+ val iter : (int -> 'a -> unit) -> 'a t -> unit
+ val iter_limit : (int -> 'a -> unit) -> int -> 'a t -> unit
+ val last_round : 'a t -> int
+ val truncate : int -> 'a t -> 'a t
+
+end = struct
+
+ type 'a t = (int* 'a) list
+
+ let add round data map = (round,data)::map
+
+ let last_round map = match map with
+ | [] -> 0
+ | (a,b)::c -> a
+
+ let empty = []
+
+ let rec mem round map = match map with
+ | [] -> false
+ | (a,b)::c ->
+ if a = round then true
+ else if a < round then false
+ else mem round c
+
+ let rec find round map = match map with
+ | [] -> raise Not_found
+ | (a,b)::c ->
+ if a = round then b
+ else if a < round then raise Not_found
+ else find round c
+
+ let rec iter f map = match map with
+ | [] -> ()
+ | (a,b)::c -> f a b; iter f c
+
+ let rec iter_limit f limit map = match map with
+ | [] -> ()
+ | (a,b)::c ->
+ if a >= limit then begin
+ f a b;
+ iter_limit f limit c
+ end
+
+ let rec truncate limit map = match map with
+ | [] -> []
+ | (a,b)::c ->
+ if a < limit then
+ []
+ else
+ (a,b)::(truncate limit c)
+end
+
+type round_data = {
+ mutable phase : phase;
+ duration : float;
+ seed : seed_id;
+ mutable hmap : user_hash_map;
+ replies : (int * user_hash_map) Queue.t
+}
+
+let rounds_data:(round_data RoundMap.t ref) = ref RoundMap.empty
diff --git a/sources/thibaut/clientMessages.ml b/sources/thibaut/clientMessages.ml
index dbec955..8d23fb6 100644
--- a/sources/thibaut/clientMessages.ml
+++ b/sources/thibaut/clientMessages.ml
@@ -3,15 +3,68 @@ open ClientArg
open ClientSlots
open ClientGlobals
+module Neighbour = struct
+ type t = sockaddr
+ let compare a1 a2 = match a1, a2 with
+ | ADDR_INET(s1, p1),ADDR_INET(s2,p2) ->
+ let s1, s2 = (string_of_inet_addr s1,string_of_inet_addr s2) in
+ Pervasives.compare (s1,p1) (s2,p2)
+ | _ -> failwith "Big problem"
+end
+
+module NSet = Set.Make(Neighbour)
+let ask_set = ref NSet.empty
let ask_queue = Queue.create ()
-let rec add_to_asks l = match l with
- | [] -> ()
- | a::b -> let ADDR_INET(s,p) = a in
- Printf.printf "%s:%d\n" (string_of_inet_addr s) p;
- Queue.push a ask_queue; add_to_asks b
+exception Found of user_hash_map
+
+let find_reply hash replies =
+ try
+ while not (Queue.is_empty replies) do
+ let a,b = Queue.pop replies in
+ if a = hash then
+ raise (Found b)
+ done;
+ raise Not_found
+ with
+ Found b -> b
+
+let peer_init_round round seed duration =
+ let data = {
+ phase = SEEDING;
+ duration = duration;
+ seed = seed;
+ hmap = UserMap.add my_id (Hashtbl.hash seed) UserMap.empty;
+ replies = Queue.create()
+ } in
+ rounds_data := RoundMap.add round data !rounds_data
+
+exception Found
+
+let exists map elem =
+ let aux _ b = if b = elem then raise Found in
+ try
+ let _ = UserMap.iter aux map in false
+ with
+ | Found -> true
+
+let rec verify_branch branch = match branch with
+ | [] -> true
+ | [h] -> true
+ | h1::h2::t ->
+ let hash = Hashtbl.hash h1 in
+ (exists h2 hash) && (verify_branch (h2::t))
+let rec add_to_asks l = match l with
+ | [] -> ()
+ | a::b ->
+ if not (NSet.mem a !ask_set) then begin
+ Queue.push a ask_queue;
+ ask_set := (NSet.add a !ask_set);
+ add_to_asks b
+ end
+
let update_time time id =
let neighbour = ClientSlots.find id slots in
neighbour.last_seen <- time
@@ -35,13 +88,12 @@ let process_mesh_server time id addr =
send_message addr (AskPeers l)
let process_message time s addr =
- let send_message = send_message addr in
let message = ((Marshal.from_string s 0):message) in
if message.version = version then begin
match message.content with
| Ping ->
update_time time message.id;
- send_message Pong
+ send_message addr Pong
| Pong -> update_time time message.id;
| Mesh ->
if !server then
@@ -49,7 +101,7 @@ let process_message time s addr =
else if (ClientSlots.mem message.id slots) then begin
if (ClientSlots.length slots > 1) then
let neighbour = ClientSlots.random_peer_avoid message.id slots in
- send_message (AskPeers [neighbour.addr])
+ send_message addr (AskPeers [neighbour.addr])
end
else if not (ClientSlots.full slots) then begin
ClientSlots.add {
@@ -57,11 +109,11 @@ let process_message time s addr =
last_seen = time;
addr = addr
} slots;
- send_message (Accepted [])
+ send_message addr (Accepted [])
end
else begin
let neighbour = ClientSlots.random_peer slots in
- send_message (AskPeers [neighbour.addr])
+ send_message addr (AskPeers [neighbour.addr])
end
| Accepted l ->
ClientSlots.add {
@@ -70,7 +122,55 @@ let process_message time s addr =
addr = addr
} slots;
add_to_asks l
+
| AskPeers l ->
add_to_asks l
+
| Close -> ClientSlots.remove_id message.id slots
+
+ | Seed(round,seed,duration) ->
+ update_time time message.id;
+ if not !server then begin
+ if not (RoundMap.mem round !rounds_data) then begin
+ peer_init_round round seed duration;
+ send_to_all message.content
+ end
+ end
+
+ | SeedReply(round,hash) ->
+ update_time time message.id;
+ begin
+ try
+ let data = RoundMap.find round !rounds_data in
+ if data.phase = SEEDING then
+ data.hmap <- UserMap.add message.id hash data.hmap;
+ with
+ | Not_found -> ()
+ end
+
+ | Pulse(round,seed, branch) ->
+ update_time time message.id;
+ if not !server then begin
+ try
+ let data = RoundMap.find round !rounds_data in
+ if data.phase = SEEDING || data.phase = PULSE then
+ match branch with
+ | [] -> ()
+ | h::t ->
+ try
+ let hash = UserMap.find message.id h in
+ let hmap = find_reply hash data.replies in
+ if verify_branch branch then
+ let branch2 = hmap::branch in
+ let message = Pulse(round,seed,branch2) in
+ if data.phase = SEEDING then begin
+ data.phase <- PULSE;
+ end;
+ send_to_all message;
+ with
+ | Not_found -> ()
+ with
+ | Not_found -> ()
+ end
end
+
diff --git a/sources/thibaut/clientSlots.mli b/sources/thibaut/clientSlots.mli
index 56e4ca3..32e1909 100644
--- a/sources/thibaut/clientSlots.mli
+++ b/sources/thibaut/clientSlots.mli
@@ -1,5 +1,6 @@
open Unix
+exception SlotError
type t
type user_id = int
type neighbour = {
@@ -19,3 +20,4 @@ val add : neighbour -> t -> unit
val iter_remove : (neighbour -> bool) -> t -> unit
val remove_id : user_id -> t -> unit
val to_list_avoid : user_id -> t -> sockaddr list
+val iter : (slot -> unit) -> t -> unit