diff options
| author | thibauth <thibauth@30fcff6e-8de6-41c7-acce-77ff6d1dd07b> | 2011-07-18 16:38:56 +0000 |
|---|---|---|
| committer | thibauth <thibauth@30fcff6e-8de6-41c7-acce-77ff6d1dd07b> | 2011-07-18 16:38:56 +0000 |
| commit | 339204f59fc33e5c7f7f14b5a80ec95ff73017f6 (patch) | |
| tree | b2bfe99b475cb009e98c6437bde08c24b9e4ab39 /sources | |
| parent | f6374eb094b73d584c22af8625d87a7c3363b136 (diff) | |
| download | pacemaker-339204f59fc33e5c7f7f14b5a80ec95ff73017f6.tar.gz | |
Mesh construction in the client.
Left to do: plug the pacemaker code in
git-svn-id: https://scm.gforge.inria.fr/svn/pacemaker@44 30fcff6e-8de6-41c7-acce-77ff6d1dd07b
Diffstat (limited to 'sources')
| -rw-r--r-- | sources/thibaut/Makefile | 6 | ||||
| -rw-r--r-- | sources/thibaut/client.ml | 218 | ||||
| -rw-r--r-- | sources/thibaut/clientArg.ml | 24 | ||||
| -rw-r--r-- | sources/thibaut/clientGlobals.ml | 76 | ||||
| -rw-r--r-- | sources/thibaut/clientMesh.ml | 30 | ||||
| -rw-r--r-- | sources/thibaut/clientMessages.ml | 76 | ||||
| -rw-r--r-- | sources/thibaut/clientSlots.ml | 123 | ||||
| -rw-r--r-- | sources/thibaut/clientSlots.mli | 21 |
8 files changed, 400 insertions, 174 deletions
diff --git a/sources/thibaut/Makefile b/sources/thibaut/Makefile index 5652865..4c65bc3 100644 --- a/sources/thibaut/Makefile +++ b/sources/thibaut/Makefile @@ -3,7 +3,7 @@ OCAMLOPT=ocamlopt OCAMLDEP=ocamldep INCLUDES= OCAMLFLAGS=$(INCLUDES) -annot -SRCS=simulator.ml trace.ml gentrace.ml pacemaker.ml globals.ml compattrace.ml tracestats.ml mesh.ml graph.ml client.ml clientMesh.ml clientGlobals.ml +SRCS=simulator.ml trace.ml gentrace.ml pacemaker.ml globals.ml compattrace.ml tracestats.ml mesh.ml graph.ml client.ml clientGlobals.ml clientMessages.ml clientArg.ml clientSlots.ml clientSlots.mli BUILDDIR=build DEPEND=.depend LIBS=str unix @@ -18,7 +18,7 @@ $(BUILDDIR): client: $(BUILDDIR) $(BUILDDIR)/client $(BUILDDIR)/client: client.cmx - $(OCAMLOPT) -o $@ $(OCAMLFLAGS) $(LIBSOPT) clientGlobals.cmx clientMesh.cmx client.cmx + $(OCAMLOPT) -o $@ $(OCAMLFLAGS) $(LIBSOPT) clientArg.cmx clientSlots.cmx clientGlobals.cmx clientMessages.cmx client.cmx tracestats.opt: $(BUILDDIR) $(BUILDDIR)/tracestats @@ -36,7 +36,7 @@ $(BUILDDIR)/simulator: simulator.cmx $(OCAMLOPT) -o $@ $(OCAMLFLAGS) $(LIBSOPT) globals.cmx graph.cmx pacemaker.cmx trace.cmx mesh.cmx simulator.cmx clean: - rm -f *.cm? *.cmx? *.o *~ *.annot + rm -f *.cm? *.cmx? *.o *~ *.annot \#*\# rm -f $(DEPEND) depend: $(SRCS) diff --git a/sources/thibaut/client.ml b/sources/thibaut/client.ml index 10450f4..a034b05 100644 --- a/sources/thibaut/client.ml +++ b/sources/thibaut/client.ml @@ -1,145 +1,99 @@ -open Unix +open ClientArg open ClientGlobals -open ClientMesh - -let port = ref 54321 -let max_length = 10000 -let config_file = ref "client.conf" -let version = "0.1" -let server = ref false -let id_file = ref "id.conf" -let generate = ref false -let degree = ref 10 -let accuracy = ref 10. - -let arg_list = Arg.align [ - "--port", Arg.Set_int port, " <n> listening port"; - "--conf", Arg.Set_string config_file, " <filename> configuration file"; - "--server", Arg.Set server, " launch the client as a server"; - "--id", Arg.Set_string id_file, " <filename> id file"; - "--genid", Arg.Set generate, " generate the id file"; - "--degree", Arg.Set_int degree, " <n> number of neighbours"; - "--accuracy", Arg.Set_float accuracy, " <n> number of minutes between rounds"; -] - -let usage = Printf.sprintf "usage: %s [OPTIONS]" Sys.argv.(0) - -let process_message s = - let message = Marshal.from_string s 0 in - () - -let safe_open f filename = - try - f filename - with - | Sys_error s -> Printf.eprintf "Could not open file: %s.\n" s; exit 1 - -let send_message socket message addr = - let raw = Marshal.to_string message [] in - ignore( sendto - socket (Marshal.to_string message []) - 0 (String.length raw) [] addr - ) +open ClientSlots +open ClientMessages +open Unix let init_round time = () let init_pulse time = () let seed_reply time = () -let filter_peers () = () -let _ = - Printf.printf "Pacemaker client v%s\n%!" version; - Arg.parse arg_list (fun _ -> ()) usage; +let check_present = 60. +let keep_alive = 30. - let my_addr = (gethostbyname(gethostname())).h_addr_list.(0) in - - let _ = - if !generate then begin - let id_oc = safe_open open_out !id_file in - Random.self_init (); - Printf.fprintf id_oc "%d" (Random.int 500000000); - Printf.printf "Id generated in file %s\n%!" !id_file; - close_out id_oc; - exit 0 +let do_mesh time = + let aux neighbour = + if neighbour.last_seen <= (time -. check_present) then begin + send_message neighbour.addr Ping; + true + end + else if neighbour.last_seen <= (time-.check_present-.keep_alive) then begin + send_message neighbour.addr Close; + false end + else + true in - - let id_ic = safe_open open_in !id_file in - let my_id = Scanf.fscanf id_ic "%d" (fun x -> x) in - let conf_ic = safe_open open_in !config_file in - let root_addr, root_port = Scanf.fscanf conf_ic "%s\n%d" (fun a b -> a,b) in - let _ = close_in conf_ic; close_in id_ic in - - let socket = socket PF_INET SOCK_DGRAM 0 in - - let addr = ADDR_INET( - my_addr, - !port - ) in - - let root_addr = ADDR_INET( - inet_addr_of_string root_addr, - root_port - ) in + ClientSlots.iter_remove aux slots; + + if not !server then begin + let free_slots = !degree - (ClientSlots.length slots) in + if free_slots > 2 then begin + let to_ask = free_slots - 2 in + if Queue.length ask_queue < to_ask then begin + while not (Queue.is_empty ask_queue) do + let addr = Queue.pop ask_queue in + send_message addr Mesh + done; + send_message root_addr Mesh + end + else begin + for i = 0 to to_ask-1 do + let addr = Queue.pop ask_queue in + send_message addr Mesh + done; + end + end + end + +let reply_tick = 30. +let mesh_tick = 30. +let seed_tick = !accuracy/.2. +let time = Unix.gettimeofday() +let next_round = ref (time +. !accuracy) +let next_pulse = ref (!next_round +. seed_tick) +let next_seed = ref (time +. reply_tick) +let next_mesh = ref (time +. mesh_tick) - let _ = try - bind socket addr - with - | Unix_error(e,s1,s2) -> - Printf.eprintf "Could not bind socket on port %d: %s in %s %s\n%!" - !port (error_message e) s1 s2; - exit 1 - in +let _ = + while true do + + let time = Unix.gettimeofday() in - Printf.printf "Now listening on UDP port %d\n%!" !port; - let slot_array = SlotArray.make !degree in + if !server && (time > !next_round) then begin + init_round time; + next_round := time +. !accuracy; + next_pulse := !next_round +. seed_tick; + end; + + if !server && (time > !next_pulse) then begin + init_pulse time; + next_pulse := !next_pulse +. !accuracy; + end; - let reply_tick = 30. in - let mesh_tick = 30. in - let seed_tick = !accuracy/.2. in - let time = Unix.gettimeofday() in - let next_round = ref (time +. !accuracy) in - let next_pulse = ref (!next_round +. seed_tick) in - let next_seed = ref (time +. reply_tick) in - let next_mesh = ref (time +. mesh_tick) in - - while true do - - let time = Unix.gettimeofday() in - - if !server && (time > !next_round) then begin - init_round time; - next_round := time +. !accuracy; - next_pulse := !next_round +. seed_tick; - end; - - if !server && (time > !next_pulse) then begin - init_pulse time; - next_pulse := !next_pulse +. !accuracy; - end; - - if (not !server) && (time > !next_seed) then begin - seed_reply time; - next_seed := !next_seed +. reply_tick - end; - - if time > !next_mesh then begin - next_mesh := !next_mesh +. mesh_tick; - filter_peers (); - end; - - let a,b,c = select [socket] [] [] 1. in - match a with - | [] -> () - | t::q -> - let buff = String.create max_length in - let len, addr = recvfrom t buff 0 max_length [] in - let addr_string = match addr with - | ADDR_INET(iaddr,port) -> Printf.sprintf "%s:%d" - (string_of_inet_addr iaddr) port - | _ -> "" - in - process_message (String.sub buff 0 len); - Printf.printf "Received from %s: %s\n%!" addr_string - (String.sub buff 0 len) - + if (not !server) && (time > !next_seed) then begin + seed_reply time; + next_seed := !next_seed +. reply_tick + end; + + if time > !next_mesh then begin + next_mesh := !next_mesh +. mesh_tick; + do_mesh time + end; + + let a,b,c = Unix.select [socketfd] [] [] 1. in + match a with + | [] -> () + | t::q -> + let buff = String.create max_length in + let len, addr = recvfrom t buff 0 max_length [] in + let addr_string = match addr with + | ADDR_INET(iaddr,port) -> Printf.sprintf "%s:%d" + (string_of_inet_addr iaddr) port + | _ -> "" + in + let s = String.sub buff 0 len in + process_message time s addr; + Printf.printf "Received from %s: %s\n%!" addr_string + s done diff --git a/sources/thibaut/clientArg.ml b/sources/thibaut/clientArg.ml new file mode 100644 index 0000000..229b489 --- /dev/null +++ b/sources/thibaut/clientArg.ml @@ -0,0 +1,24 @@ +let port = ref 54321 +let max_length = 10000 +let config_file = ref "client.conf" +let version = "0.1" +let server = ref false +let id_file = ref "id.conf" +let generate = ref false +let degree = ref 10 +let accuracy = ref 10. + +let arg_list = Arg.align [ + "--port", Arg.Set_int port, " <n> listening port"; + "--conf", Arg.Set_string config_file, " <filename> configuration file"; + "--server", Arg.Set server, " launch the client as a server"; + "--id", Arg.Set_string id_file, " <filename> id file"; + "--genid", Arg.Set generate, " generate the id file"; + "--degree", Arg.Set_int degree, " <n> number of neighbours"; + "--accuracy", Arg.Set_float accuracy, " <n> number of minutes between rounds"; +] + +let usage = Printf.sprintf "usage: %s [OPTIONS]" Sys.argv.(0) + +let _ = + Arg.parse arg_list (fun _ -> ()) usage; diff --git a/sources/thibaut/clientGlobals.ml b/sources/thibaut/clientGlobals.ml index a07715d..079ef78 100644 --- a/sources/thibaut/clientGlobals.ml +++ b/sources/thibaut/clientGlobals.ml @@ -1,13 +1,16 @@ +open ClientArg open Unix +open ClientSlots type user_id = int type message_content = - | Ping + | Ping + | Pong | Mesh - | AskRoot - | AskPeers of sockaddr - | Accepted of user_id + | Close + | AskPeers of sockaddr list + | Accepted of sockaddr list type message = { id : user_id; @@ -15,9 +18,64 @@ type message = { content : message_content; } -type neighbour = { - id : user_id; - last_seen : float; - addr : sockaddr; -} +let version = "0.1" + +let safe_open f filename = + try + f filename + with + | Sys_error s -> Printf.eprintf "Could not open file: %s.\n" s; exit 1 + +let _ = + Printf.printf "Pacemaker client v%s\n%!" version; + if !generate then begin + let id_oc = safe_open open_out !id_file in + Random.self_init (); + Printf.fprintf id_oc "%d" (Random.int 500000000); + Printf.printf "Id generated in file %s\n%!" !id_file; + close_out id_oc; + exit 0 + end + +let id_ic = safe_open open_in !id_file +let my_id = Scanf.fscanf id_ic "%d" (fun x -> x) +let conf_ic = safe_open open_in !config_file +let root_addr, root_port = Scanf.fscanf conf_ic "%s\n%d" (fun a b -> a,b) +let _ = close_in conf_ic; close_in id_ic + +let my_addr = (gethostbyname(gethostname())).h_addr_list.(0) +let socketfd = socket PF_INET SOCK_DGRAM 0 + +let addr = ADDR_INET( + my_addr, + !port +) + +let root_addr = ADDR_INET( + inet_addr_of_string root_addr, + root_port +) + +let _ = + try + bind socketfd addr + with + | Unix_error(e,s1,s2) -> + Printf.eprintf "Could not bind socket on port %d: %s in %s %s\n%!" + !port (error_message e) s1 s2; + exit 1 + +let _ = + Printf.printf "Now listening on UDP port %d\n%!" !port + +let slots = ClientSlots.make !degree +let send_message addr (content:message_content) = + let message = { + id = my_id; + version = version; + content = content } + in + let s = Marshal.to_string message [] in + let n = String.length s in + ignore(Unix.sendto socketfd s 0 n [] addr) diff --git a/sources/thibaut/clientMesh.ml b/sources/thibaut/clientMesh.ml deleted file mode 100644 index 6a63017..0000000 --- a/sources/thibaut/clientMesh.ml +++ /dev/null @@ -1,30 +0,0 @@ -open ClientGlobals - -module SlotArray : sig - type t - type slot = Peer of neighbour | Empty - val make : int -> t - val full : t -> bool -end = struct - - type slot = Peer of neighbour | Empty - - type t = { - capacity : int; - mutable length : int; - array : slot array - } - - let make degree = { - capacity = degree; - length = 0; - array = Array.make degree Empty - } - - let full sa = sa.length = sa.capacity - - let add sa neighbour = - let length = sa.length in - if not (full sa) then - sa.array.(length) <- neighbour -end diff --git a/sources/thibaut/clientMessages.ml b/sources/thibaut/clientMessages.ml index e69de29..dbec955 100644 --- a/sources/thibaut/clientMessages.ml +++ b/sources/thibaut/clientMessages.ml @@ -0,0 +1,76 @@ +open Unix +open ClientArg +open ClientSlots +open ClientGlobals + + +let ask_queue = Queue.create () + +let rec add_to_asks l = match l with + | [] -> () + | a::b -> let ADDR_INET(s,p) = a in + Printf.printf "%s:%d\n" (string_of_inet_addr s) p; + Queue.push a ask_queue; add_to_asks b + +let update_time time id = + let neighbour = ClientSlots.find id slots in + neighbour.last_seen <- time + +let process_mesh_server time id addr = + if (ClientSlots.mem id slots) then begin + let l = ClientSlots.to_list_avoid id slots in + send_message addr (AskPeers l) + end + else if not (ClientSlots.full slots) then begin + ClientSlots.add { + last_seen = time; + addr = addr; + ClientSlots.id = id + } slots; + let l = ClientSlots.to_list_avoid id slots in + send_message addr (Accepted l) + end + else + let l = ClientSlots.to_list_avoid id slots in + send_message addr (AskPeers l) + +let process_message time s addr = + let send_message = send_message addr in + let message = ((Marshal.from_string s 0):message) in + if message.version = version then begin + match message.content with + | Ping -> + update_time time message.id; + send_message Pong + | Pong -> update_time time message.id; + | Mesh -> + if !server then + process_mesh_server time message.id addr + else if (ClientSlots.mem message.id slots) then begin + if (ClientSlots.length slots > 1) then + let neighbour = ClientSlots.random_peer_avoid message.id slots in + send_message (AskPeers [neighbour.addr]) + end + else if not (ClientSlots.full slots) then begin + ClientSlots.add { + ClientSlots.id = message.id; + last_seen = time; + addr = addr + } slots; + send_message (Accepted []) + end + else begin + let neighbour = ClientSlots.random_peer slots in + send_message (AskPeers [neighbour.addr]) + end + | Accepted l -> + ClientSlots.add { + ClientSlots.id = message.id; + last_seen = time; + addr = addr + } slots; + add_to_asks l + | AskPeers l -> + add_to_asks l + | Close -> ClientSlots.remove_id message.id slots + end diff --git a/sources/thibaut/clientSlots.ml b/sources/thibaut/clientSlots.ml new file mode 100644 index 0000000..3619f65 --- /dev/null +++ b/sources/thibaut/clientSlots.ml @@ -0,0 +1,123 @@ +open Unix + +type user_id = int + +type neighbour = { + id : user_id; + mutable last_seen : float; + addr : sockaddr; +} + +exception Found of neighbour +exception SlotError +type slot = Peer of neighbour | Empty + +type t = { + capacity : int; + mutable length : int; + array : slot array +} + +let length sa = sa.length + +let make degree = { + capacity = degree; + length = 0; + array = Array.make degree Empty + } + +let iter f sa = + for i = 0 to sa.length - 1 do + f sa.array.(i) + done + +let remove pos sa = + let length = sa.length - 1 in + sa.array.(pos) <- sa.array.(length); + sa.array.(length) <- Empty; + sa.length <- length + +let iter_remove f sa = + let rec aux n = + if n < sa.length then begin + match sa.array.(n) with + | Peer p -> + if f p then + aux (n+1) + else begin + remove n sa; + aux n + end + | Empty -> raise SlotError + end + in + aux 0 + +let full sa = (sa.length = sa.capacity) + +let find id sa = + let aux slot = match slot with + | Peer n -> if n.id = id then raise (Found n) + | _ -> failwith "Big problem" + in + try + iter aux sa; + raise Not_found + with + Found n -> n + +let mem id sa = + let aux slot = match slot with + | Peer n -> if n.id = id then raise (Found n) + | _ -> raise SlotError + in + try + iter aux sa; + false + with + Found n -> true + +let remove_id id sa = + iter_remove (fun n -> n.id = id) sa + +let to_list_avoid id sa = + let l = ref [] in + for i = 0 to sa.length -1 do + match sa.array.(i) with + | Peer p -> if p.id <> id then l:= (p.addr::(!l)) + | Empty -> raise SlotError + done; + !l + +let random_peer_avoid id sa = + let n = Random.int sa.length in + let slot = sa.array.(n) in + match slot with + | Peer p -> + let slot = + if p.id = id then + if n = sa.length - 1 then sa.array.(n-1) + else sa.array.(n+1) + else + sa.array.(n) + in begin + match slot with + | Peer n -> n + | Empty -> raise SlotError + end + | Empty -> raise SlotError + +let random_peer sa = + let n = Random.int sa.length in + match sa.array.(n) with + | Peer n -> n + | Empty -> raise SlotError + +let add neighbour sa = + let length = sa.length in + if not (full sa) then begin + sa.array.(length) <- (Peer neighbour); + sa.length <- length + 1 + end + + diff --git a/sources/thibaut/clientSlots.mli b/sources/thibaut/clientSlots.mli new file mode 100644 index 0000000..56e4ca3 --- /dev/null +++ b/sources/thibaut/clientSlots.mli @@ -0,0 +1,21 @@ +open Unix + +type t +type user_id = int +type neighbour = { + id : user_id; + mutable last_seen : float; + addr : sockaddr; +} +type slot = Peer of neighbour | Empty +val make : int -> t +val full : t -> bool +val find : user_id -> t -> neighbour +val mem : user_id -> t -> bool +val length : t -> int +val random_peer : t -> neighbour +val random_peer_avoid : user_id -> t -> neighbour +val add : neighbour -> t -> unit +val iter_remove : (neighbour -> bool) -> t -> unit +val remove_id : user_id -> t -> unit +val to_list_avoid : user_id -> t -> sockaddr list |
