summaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
Diffstat (limited to 'sources')
-rw-r--r--sources/thibaut/Makefile6
-rw-r--r--sources/thibaut/client.ml218
-rw-r--r--sources/thibaut/clientArg.ml24
-rw-r--r--sources/thibaut/clientGlobals.ml76
-rw-r--r--sources/thibaut/clientMesh.ml30
-rw-r--r--sources/thibaut/clientMessages.ml76
-rw-r--r--sources/thibaut/clientSlots.ml123
-rw-r--r--sources/thibaut/clientSlots.mli21
8 files changed, 400 insertions, 174 deletions
diff --git a/sources/thibaut/Makefile b/sources/thibaut/Makefile
index 5652865..4c65bc3 100644
--- a/sources/thibaut/Makefile
+++ b/sources/thibaut/Makefile
@@ -3,7 +3,7 @@ OCAMLOPT=ocamlopt
OCAMLDEP=ocamldep
INCLUDES=
OCAMLFLAGS=$(INCLUDES) -annot
-SRCS=simulator.ml trace.ml gentrace.ml pacemaker.ml globals.ml compattrace.ml tracestats.ml mesh.ml graph.ml client.ml clientMesh.ml clientGlobals.ml
+SRCS=simulator.ml trace.ml gentrace.ml pacemaker.ml globals.ml compattrace.ml tracestats.ml mesh.ml graph.ml client.ml clientGlobals.ml clientMessages.ml clientArg.ml clientSlots.ml clientSlots.mli
BUILDDIR=build
DEPEND=.depend
LIBS=str unix
@@ -18,7 +18,7 @@ $(BUILDDIR):
client: $(BUILDDIR) $(BUILDDIR)/client
$(BUILDDIR)/client: client.cmx
- $(OCAMLOPT) -o $@ $(OCAMLFLAGS) $(LIBSOPT) clientGlobals.cmx clientMesh.cmx client.cmx
+ $(OCAMLOPT) -o $@ $(OCAMLFLAGS) $(LIBSOPT) clientArg.cmx clientSlots.cmx clientGlobals.cmx clientMessages.cmx client.cmx
tracestats.opt: $(BUILDDIR) $(BUILDDIR)/tracestats
@@ -36,7 +36,7 @@ $(BUILDDIR)/simulator: simulator.cmx
$(OCAMLOPT) -o $@ $(OCAMLFLAGS) $(LIBSOPT) globals.cmx graph.cmx pacemaker.cmx trace.cmx mesh.cmx simulator.cmx
clean:
- rm -f *.cm? *.cmx? *.o *~ *.annot
+ rm -f *.cm? *.cmx? *.o *~ *.annot \#*\#
rm -f $(DEPEND)
depend: $(SRCS)
diff --git a/sources/thibaut/client.ml b/sources/thibaut/client.ml
index 10450f4..a034b05 100644
--- a/sources/thibaut/client.ml
+++ b/sources/thibaut/client.ml
@@ -1,145 +1,99 @@
-open Unix
+open ClientArg
open ClientGlobals
-open ClientMesh
-
-let port = ref 54321
-let max_length = 10000
-let config_file = ref "client.conf"
-let version = "0.1"
-let server = ref false
-let id_file = ref "id.conf"
-let generate = ref false
-let degree = ref 10
-let accuracy = ref 10.
-
-let arg_list = Arg.align [
- "--port", Arg.Set_int port, " <n> listening port";
- "--conf", Arg.Set_string config_file, " <filename> configuration file";
- "--server", Arg.Set server, " launch the client as a server";
- "--id", Arg.Set_string id_file, " <filename> id file";
- "--genid", Arg.Set generate, " generate the id file";
- "--degree", Arg.Set_int degree, " <n> number of neighbours";
- "--accuracy", Arg.Set_float accuracy, " <n> number of minutes between rounds";
-]
-
-let usage = Printf.sprintf "usage: %s [OPTIONS]" Sys.argv.(0)
-
-let process_message s =
- let message = Marshal.from_string s 0 in
- ()
-
-let safe_open f filename =
- try
- f filename
- with
- | Sys_error s -> Printf.eprintf "Could not open file: %s.\n" s; exit 1
-
-let send_message socket message addr =
- let raw = Marshal.to_string message [] in
- ignore( sendto
- socket (Marshal.to_string message [])
- 0 (String.length raw) [] addr
- )
+open ClientSlots
+open ClientMessages
+open Unix
let init_round time = ()
let init_pulse time = ()
let seed_reply time = ()
-let filter_peers () = ()
-let _ =
- Printf.printf "Pacemaker client v%s\n%!" version;
- Arg.parse arg_list (fun _ -> ()) usage;
+let check_present = 60.
+let keep_alive = 30.
- let my_addr = (gethostbyname(gethostname())).h_addr_list.(0) in
-
- let _ =
- if !generate then begin
- let id_oc = safe_open open_out !id_file in
- Random.self_init ();
- Printf.fprintf id_oc "%d" (Random.int 500000000);
- Printf.printf "Id generated in file %s\n%!" !id_file;
- close_out id_oc;
- exit 0
+let do_mesh time =
+ let aux neighbour =
+ if neighbour.last_seen <= (time -. check_present) then begin
+ send_message neighbour.addr Ping;
+ true
+ end
+ else if neighbour.last_seen <= (time-.check_present-.keep_alive) then begin
+ send_message neighbour.addr Close;
+ false
end
+ else
+ true
in
-
- let id_ic = safe_open open_in !id_file in
- let my_id = Scanf.fscanf id_ic "%d" (fun x -> x) in
- let conf_ic = safe_open open_in !config_file in
- let root_addr, root_port = Scanf.fscanf conf_ic "%s\n%d" (fun a b -> a,b) in
- let _ = close_in conf_ic; close_in id_ic in
-
- let socket = socket PF_INET SOCK_DGRAM 0 in
-
- let addr = ADDR_INET(
- my_addr,
- !port
- ) in
-
- let root_addr = ADDR_INET(
- inet_addr_of_string root_addr,
- root_port
- ) in
+ ClientSlots.iter_remove aux slots;
+
+ if not !server then begin
+ let free_slots = !degree - (ClientSlots.length slots) in
+ if free_slots > 2 then begin
+ let to_ask = free_slots - 2 in
+ if Queue.length ask_queue < to_ask then begin
+ while not (Queue.is_empty ask_queue) do
+ let addr = Queue.pop ask_queue in
+ send_message addr Mesh
+ done;
+ send_message root_addr Mesh
+ end
+ else begin
+ for i = 0 to to_ask-1 do
+ let addr = Queue.pop ask_queue in
+ send_message addr Mesh
+ done;
+ end
+ end
+ end
+
+let reply_tick = 30.
+let mesh_tick = 30.
+let seed_tick = !accuracy/.2.
+let time = Unix.gettimeofday()
+let next_round = ref (time +. !accuracy)
+let next_pulse = ref (!next_round +. seed_tick)
+let next_seed = ref (time +. reply_tick)
+let next_mesh = ref (time +. mesh_tick)
- let _ = try
- bind socket addr
- with
- | Unix_error(e,s1,s2) ->
- Printf.eprintf "Could not bind socket on port %d: %s in %s %s\n%!"
- !port (error_message e) s1 s2;
- exit 1
- in
+let _ =
+ while true do
+
+ let time = Unix.gettimeofday() in
- Printf.printf "Now listening on UDP port %d\n%!" !port;
- let slot_array = SlotArray.make !degree in
+ if !server && (time > !next_round) then begin
+ init_round time;
+ next_round := time +. !accuracy;
+ next_pulse := !next_round +. seed_tick;
+ end;
+
+ if !server && (time > !next_pulse) then begin
+ init_pulse time;
+ next_pulse := !next_pulse +. !accuracy;
+ end;
- let reply_tick = 30. in
- let mesh_tick = 30. in
- let seed_tick = !accuracy/.2. in
- let time = Unix.gettimeofday() in
- let next_round = ref (time +. !accuracy) in
- let next_pulse = ref (!next_round +. seed_tick) in
- let next_seed = ref (time +. reply_tick) in
- let next_mesh = ref (time +. mesh_tick) in
-
- while true do
-
- let time = Unix.gettimeofday() in
-
- if !server && (time > !next_round) then begin
- init_round time;
- next_round := time +. !accuracy;
- next_pulse := !next_round +. seed_tick;
- end;
-
- if !server && (time > !next_pulse) then begin
- init_pulse time;
- next_pulse := !next_pulse +. !accuracy;
- end;
-
- if (not !server) && (time > !next_seed) then begin
- seed_reply time;
- next_seed := !next_seed +. reply_tick
- end;
-
- if time > !next_mesh then begin
- next_mesh := !next_mesh +. mesh_tick;
- filter_peers ();
- end;
-
- let a,b,c = select [socket] [] [] 1. in
- match a with
- | [] -> ()
- | t::q ->
- let buff = String.create max_length in
- let len, addr = recvfrom t buff 0 max_length [] in
- let addr_string = match addr with
- | ADDR_INET(iaddr,port) -> Printf.sprintf "%s:%d"
- (string_of_inet_addr iaddr) port
- | _ -> ""
- in
- process_message (String.sub buff 0 len);
- Printf.printf "Received from %s: %s\n%!" addr_string
- (String.sub buff 0 len)
-
+ if (not !server) && (time > !next_seed) then begin
+ seed_reply time;
+ next_seed := !next_seed +. reply_tick
+ end;
+
+ if time > !next_mesh then begin
+ next_mesh := !next_mesh +. mesh_tick;
+ do_mesh time
+ end;
+
+ let a,b,c = Unix.select [socketfd] [] [] 1. in
+ match a with
+ | [] -> ()
+ | t::q ->
+ let buff = String.create max_length in
+ let len, addr = recvfrom t buff 0 max_length [] in
+ let addr_string = match addr with
+ | ADDR_INET(iaddr,port) -> Printf.sprintf "%s:%d"
+ (string_of_inet_addr iaddr) port
+ | _ -> ""
+ in
+ let s = String.sub buff 0 len in
+ process_message time s addr;
+ Printf.printf "Received from %s: %s\n%!" addr_string
+ s
done
diff --git a/sources/thibaut/clientArg.ml b/sources/thibaut/clientArg.ml
new file mode 100644
index 0000000..229b489
--- /dev/null
+++ b/sources/thibaut/clientArg.ml
@@ -0,0 +1,24 @@
+let port = ref 54321
+let max_length = 10000
+let config_file = ref "client.conf"
+let version = "0.1"
+let server = ref false
+let id_file = ref "id.conf"
+let generate = ref false
+let degree = ref 10
+let accuracy = ref 10.
+
+let arg_list = Arg.align [
+ "--port", Arg.Set_int port, " <n> listening port";
+ "--conf", Arg.Set_string config_file, " <filename> configuration file";
+ "--server", Arg.Set server, " launch the client as a server";
+ "--id", Arg.Set_string id_file, " <filename> id file";
+ "--genid", Arg.Set generate, " generate the id file";
+ "--degree", Arg.Set_int degree, " <n> number of neighbours";
+ "--accuracy", Arg.Set_float accuracy, " <n> number of minutes between rounds";
+]
+
+let usage = Printf.sprintf "usage: %s [OPTIONS]" Sys.argv.(0)
+
+let _ =
+ Arg.parse arg_list (fun _ -> ()) usage;
diff --git a/sources/thibaut/clientGlobals.ml b/sources/thibaut/clientGlobals.ml
index a07715d..079ef78 100644
--- a/sources/thibaut/clientGlobals.ml
+++ b/sources/thibaut/clientGlobals.ml
@@ -1,13 +1,16 @@
+open ClientArg
open Unix
+open ClientSlots
type user_id = int
type message_content =
- | Ping
+ | Ping
+ | Pong
| Mesh
- | AskRoot
- | AskPeers of sockaddr
- | Accepted of user_id
+ | Close
+ | AskPeers of sockaddr list
+ | Accepted of sockaddr list
type message = {
id : user_id;
@@ -15,9 +18,64 @@ type message = {
content : message_content;
}
-type neighbour = {
- id : user_id;
- last_seen : float;
- addr : sockaddr;
-}
+let version = "0.1"
+
+let safe_open f filename =
+ try
+ f filename
+ with
+ | Sys_error s -> Printf.eprintf "Could not open file: %s.\n" s; exit 1
+
+let _ =
+ Printf.printf "Pacemaker client v%s\n%!" version;
+ if !generate then begin
+ let id_oc = safe_open open_out !id_file in
+ Random.self_init ();
+ Printf.fprintf id_oc "%d" (Random.int 500000000);
+ Printf.printf "Id generated in file %s\n%!" !id_file;
+ close_out id_oc;
+ exit 0
+ end
+
+let id_ic = safe_open open_in !id_file
+let my_id = Scanf.fscanf id_ic "%d" (fun x -> x)
+let conf_ic = safe_open open_in !config_file
+let root_addr, root_port = Scanf.fscanf conf_ic "%s\n%d" (fun a b -> a,b)
+let _ = close_in conf_ic; close_in id_ic
+
+let my_addr = (gethostbyname(gethostname())).h_addr_list.(0)
+let socketfd = socket PF_INET SOCK_DGRAM 0
+
+let addr = ADDR_INET(
+ my_addr,
+ !port
+)
+
+let root_addr = ADDR_INET(
+ inet_addr_of_string root_addr,
+ root_port
+)
+
+let _ =
+ try
+ bind socketfd addr
+ with
+ | Unix_error(e,s1,s2) ->
+ Printf.eprintf "Could not bind socket on port %d: %s in %s %s\n%!"
+ !port (error_message e) s1 s2;
+ exit 1
+
+let _ =
+ Printf.printf "Now listening on UDP port %d\n%!" !port
+
+let slots = ClientSlots.make !degree
+let send_message addr (content:message_content) =
+ let message = {
+ id = my_id;
+ version = version;
+ content = content }
+ in
+ let s = Marshal.to_string message [] in
+ let n = String.length s in
+ ignore(Unix.sendto socketfd s 0 n [] addr)
diff --git a/sources/thibaut/clientMesh.ml b/sources/thibaut/clientMesh.ml
deleted file mode 100644
index 6a63017..0000000
--- a/sources/thibaut/clientMesh.ml
+++ /dev/null
@@ -1,30 +0,0 @@
-open ClientGlobals
-
-module SlotArray : sig
- type t
- type slot = Peer of neighbour | Empty
- val make : int -> t
- val full : t -> bool
-end = struct
-
- type slot = Peer of neighbour | Empty
-
- type t = {
- capacity : int;
- mutable length : int;
- array : slot array
- }
-
- let make degree = {
- capacity = degree;
- length = 0;
- array = Array.make degree Empty
- }
-
- let full sa = sa.length = sa.capacity
-
- let add sa neighbour =
- let length = sa.length in
- if not (full sa) then
- sa.array.(length) <- neighbour
-end
diff --git a/sources/thibaut/clientMessages.ml b/sources/thibaut/clientMessages.ml
index e69de29..dbec955 100644
--- a/sources/thibaut/clientMessages.ml
+++ b/sources/thibaut/clientMessages.ml
@@ -0,0 +1,76 @@
+open Unix
+open ClientArg
+open ClientSlots
+open ClientGlobals
+
+
+let ask_queue = Queue.create ()
+
+let rec add_to_asks l = match l with
+ | [] -> ()
+ | a::b -> let ADDR_INET(s,p) = a in
+ Printf.printf "%s:%d\n" (string_of_inet_addr s) p;
+ Queue.push a ask_queue; add_to_asks b
+
+let update_time time id =
+ let neighbour = ClientSlots.find id slots in
+ neighbour.last_seen <- time
+
+let process_mesh_server time id addr =
+ if (ClientSlots.mem id slots) then begin
+ let l = ClientSlots.to_list_avoid id slots in
+ send_message addr (AskPeers l)
+ end
+ else if not (ClientSlots.full slots) then begin
+ ClientSlots.add {
+ last_seen = time;
+ addr = addr;
+ ClientSlots.id = id
+ } slots;
+ let l = ClientSlots.to_list_avoid id slots in
+ send_message addr (Accepted l)
+ end
+ else
+ let l = ClientSlots.to_list_avoid id slots in
+ send_message addr (AskPeers l)
+
+let process_message time s addr =
+ let send_message = send_message addr in
+ let message = ((Marshal.from_string s 0):message) in
+ if message.version = version then begin
+ match message.content with
+ | Ping ->
+ update_time time message.id;
+ send_message Pong
+ | Pong -> update_time time message.id;
+ | Mesh ->
+ if !server then
+ process_mesh_server time message.id addr
+ else if (ClientSlots.mem message.id slots) then begin
+ if (ClientSlots.length slots > 1) then
+ let neighbour = ClientSlots.random_peer_avoid message.id slots in
+ send_message (AskPeers [neighbour.addr])
+ end
+ else if not (ClientSlots.full slots) then begin
+ ClientSlots.add {
+ ClientSlots.id = message.id;
+ last_seen = time;
+ addr = addr
+ } slots;
+ send_message (Accepted [])
+ end
+ else begin
+ let neighbour = ClientSlots.random_peer slots in
+ send_message (AskPeers [neighbour.addr])
+ end
+ | Accepted l ->
+ ClientSlots.add {
+ ClientSlots.id = message.id;
+ last_seen = time;
+ addr = addr
+ } slots;
+ add_to_asks l
+ | AskPeers l ->
+ add_to_asks l
+ | Close -> ClientSlots.remove_id message.id slots
+ end
diff --git a/sources/thibaut/clientSlots.ml b/sources/thibaut/clientSlots.ml
new file mode 100644
index 0000000..3619f65
--- /dev/null
+++ b/sources/thibaut/clientSlots.ml
@@ -0,0 +1,123 @@
+open Unix
+
+type user_id = int
+
+type neighbour = {
+ id : user_id;
+ mutable last_seen : float;
+ addr : sockaddr;
+}
+
+exception Found of neighbour
+exception SlotError
+type slot = Peer of neighbour | Empty
+
+type t = {
+ capacity : int;
+ mutable length : int;
+ array : slot array
+}
+
+let length sa = sa.length
+
+let make degree = {
+ capacity = degree;
+ length = 0;
+ array = Array.make degree Empty
+ }
+
+let iter f sa =
+ for i = 0 to sa.length - 1 do
+ f sa.array.(i)
+ done
+
+let remove pos sa =
+ let length = sa.length - 1 in
+ sa.array.(pos) <- sa.array.(length);
+ sa.array.(length) <- Empty;
+ sa.length <- length
+
+let iter_remove f sa =
+ let rec aux n =
+ if n < sa.length then begin
+ match sa.array.(n) with
+ | Peer p ->
+ if f p then
+ aux (n+1)
+ else begin
+ remove n sa;
+ aux n
+ end
+ | Empty -> raise SlotError
+ end
+ in
+ aux 0
+
+let full sa = (sa.length = sa.capacity)
+
+let find id sa =
+ let aux slot = match slot with
+ | Peer n -> if n.id = id then raise (Found n)
+ | _ -> failwith "Big problem"
+ in
+ try
+ iter aux sa;
+ raise Not_found
+ with
+ Found n -> n
+
+let mem id sa =
+ let aux slot = match slot with
+ | Peer n -> if n.id = id then raise (Found n)
+ | _ -> raise SlotError
+ in
+ try
+ iter aux sa;
+ false
+ with
+ Found n -> true
+
+let remove_id id sa =
+ iter_remove (fun n -> n.id = id) sa
+
+let to_list_avoid id sa =
+ let l = ref [] in
+ for i = 0 to sa.length -1 do
+ match sa.array.(i) with
+ | Peer p -> if p.id <> id then l:= (p.addr::(!l))
+ | Empty -> raise SlotError
+ done;
+ !l
+
+let random_peer_avoid id sa =
+ let n = Random.int sa.length in
+ let slot = sa.array.(n) in
+ match slot with
+ | Peer p ->
+ let slot =
+ if p.id = id then
+ if n = sa.length - 1 then sa.array.(n-1)
+ else sa.array.(n+1)
+ else
+ sa.array.(n)
+ in begin
+ match slot with
+ | Peer n -> n
+ | Empty -> raise SlotError
+ end
+ | Empty -> raise SlotError
+
+let random_peer sa =
+ let n = Random.int sa.length in
+ match sa.array.(n) with
+ | Peer n -> n
+ | Empty -> raise SlotError
+
+let add neighbour sa =
+ let length = sa.length in
+ if not (full sa) then begin
+ sa.array.(length) <- (Peer neighbour);
+ sa.length <- length + 1
+ end
+
+
diff --git a/sources/thibaut/clientSlots.mli b/sources/thibaut/clientSlots.mli
new file mode 100644
index 0000000..56e4ca3
--- /dev/null
+++ b/sources/thibaut/clientSlots.mli
@@ -0,0 +1,21 @@
+open Unix
+
+type t
+type user_id = int
+type neighbour = {
+ id : user_id;
+ mutable last_seen : float;
+ addr : sockaddr;
+}
+type slot = Peer of neighbour | Empty
+val make : int -> t
+val full : t -> bool
+val find : user_id -> t -> neighbour
+val mem : user_id -> t -> bool
+val length : t -> int
+val random_peer : t -> neighbour
+val random_peer_avoid : user_id -> t -> neighbour
+val add : neighbour -> t -> unit
+val iter_remove : (neighbour -> bool) -> t -> unit
+val remove_id : user_id -> t -> unit
+val to_list_avoid : user_id -> t -> sockaddr list