1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
|
/*
Copyright (C) 2008 Renaissance Technologies Corp.
main developer: HP Wei <hp@rentec.com>
Copyright (C) 2006 Renaissance Technologies Corp.
main developer: HP Wei <hp@rentec.com>
Copyright (C) 2005 Renaissance Technologies Corp.
Copyright (C) 2001 Renaissance Technologies Corp.
main developer: HP Wei <hp@rentec.com>
This file was modified in 2001 and later from files in the program
multicaster copyrighted by Aaron Hillegass as found at
<http://sourceforge.net/projects/multicaster/>
Copyright (C) 2000 Aaron Hillegass <aaron@classmax.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; see the file COPYING.
If not, write to the Free Software Foundation,
59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include "main.h"
/* the following is needed on Sun but not on linux */
#ifdef _SUN
#include <sys/filio.h>
#endif
extern int machineID;
extern int verbose;
extern char * my_MCAST_ADDR; /* defined in multicatcher.c */
extern char * my_IFname;
extern int my_PORT;
extern unsigned int current_file_id;
int isMonitor; /* flag = if this target machine is a designated monitor */
int nPage_recv; /* counter for the number of pages received for a file */
int machineState; /* there are four states during one file transmission */
int isFirstPage=TRUE; /* flag */
/* The followings are used to determine sick condition */
int current_missing_pages;
int last_missing_pages;
int sick_count;
/* receive socket */
int recfd;
#ifndef IPV6
struct sockaddr_in rec_addr;
#else
struct sockaddr_in6 rec_addr;
#endif
/*
Receive buffer for storing the data obtained from UDP
The format:
(5*sizeof(int) bytes header) + (PAGE_SIZE data area)
The header has five int_type (4 bytes) int's.
(1) mode -- for master to give instructions to the target machines.
(2) current file index (starting with 1)
(3) current page index (starting with 1)
(4) bytes that has been sent in this UDP page
(5) total number of pages.
data_ptr points to the data area.
*/
int *mode_ptr; /* hp: change from char to int */
int *total_pages_ptr;
int *current_page_ptr;
int *bytes_sent_ptr, *current_file_ptr;
char *data_ptr;
char rec_buf[PAGE_BUFFSIZE];
void init_page_reader()
{
/*struct ip_mreq mreq;*/
int rcv_size;
machineState = IDLE_STATE;
isMonitor = FALSE;
/* Prepare buffer pointers */
mode_ptr = (int*)rec_buf;
current_file_ptr = (int *)(mode_ptr + 1);
current_page_ptr = (int *)(current_file_ptr + 1);
bytes_sent_ptr = (int *)(current_page_ptr + 1);
total_pages_ptr = (int *)(bytes_sent_ptr + 1);
data_ptr = (char *)(total_pages_ptr + 1);
/* Set up receive socket */
if (verbose>=2) fprintf(stderr, "setting up receive socket\n");
recfd = rec_socket(&rec_addr, my_PORT);
/* Join the multicast group */
/*inet_pton(AF_INET, MCAST_ADDR, &(mreq.imr_multiaddr.s_addr));
mreq.imr_multiaddr.s_addr = inet_addr(my_MCAST_ADDR);
mreq.imr_interface.s_addr = htonl(INADDR_ANY);
if (setsockopt(recfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void*)&mreq, sizeof(mreq)) < 0){
perror("Joining Multicast Group");
}
*/
if (Mcast_join(recfd, my_MCAST_ADDR, my_IFname, 0)<0) {
perror("Joining Multicast Group");
}
/* Increase socket receive buffer */
rcv_size = TOTAL_REC_PAGE * PAGE_BUFFSIZE;
if (setsockopt(recfd, SOL_SOCKET, SO_RCVBUF, &rcv_size, sizeof(rcv_size)) < 0){
perror("Expanding receive buffer for page_reader");
}
}
/*
This is the heart of multicatcher.
It parses the incoming pages and do proper reactions according
to the mode (command code) encoded in the first 4 bytes in an UDP page.
It returns the mode.
*/
int read_handle_page()
{
#ifndef IPV6
struct sockaddr_in return_addr;
#else
struct sockaddr_in6 return_addr;
#endif
int bytes_read;
socklen_t return_len = (socklen_t)sizeof(return_addr);
int mode_v, bytes_sent_v, total_pages_v, current_file_v, current_page_v;
/* -----------receiving data -----------------*/
if (readable(recfd) == 1) { /* there is data coming in */
/* check_queue(); This might be useful. But need more study. */
/* get data */
bytes_read = recvfrom(recfd, rec_buf, PAGE_BUFFSIZE, 0,
(struct sockaddr *)&return_addr,
(socklen_t*) &return_len);
bytes_sent_v = ntohl(*bytes_sent_ptr);
if (bytes_read != (bytes_sent_v + HEAD_SIZE))
return NULL_CMD;
/* convert from network byte order to host byte order */
mode_v = ntohl(*mode_ptr);
total_pages_v = ntohl(*total_pages_ptr);
current_file_v = ntohl(*current_file_ptr);
current_page_v = ntohl(*current_page_ptr);
if (isFirstPage) {
update_complaint_address(&return_addr);
isFirstPage = FALSE;
}
/* --- process various commands (modes) */
switch (mode_v) {
case TEST:
/* It is just a test packet? */
fprintf(stderr, "********** Received test packet **********\n");
return mode_v;
case SELECT_MONITOR_CMD:
if (current_page_v == machineID) {
isMonitor = TRUE;
send_complaint(MONITOR_OK, machineID, 0, 0);
} else {
isMonitor = FALSE;
}
return mode_v;
case OPEN_FILE_CMD:
if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID)
&& machineState == IDLE_STATE) {
/* get info about this file */
if (verbose>=1)
fprintf(stderr, "open file id= %d\n", current_file_v);
if (!extract_file_info(data_ptr, current_file_v, total_pages_v))
return mode_v;
/* different tasks here */
/* open file, rmdir, unlink */
if (total_pages_v < 0) { /* delete a file or a directory */
if (!delete_file(TRUE)) return mode_v; /* this fx can be re-entered many times */
/* machineState remains to be IDLE_STATE */
} else if (total_pages_v == 0) {
/* handle an empty file, or (soft)link or directory */
if (!check_zero_page_entry()) return mode_v; /* can re-enter many times */
/* machineState remains to be IDLE_STATE */
} else { /* a regular file */
if (!open_file()) return mode_v;
/* the file has been opened. */
sick_count = 0;
current_missing_pages =0;
last_missing_pages = nPages_for_file(current_file_v);
machineState = GET_DATA_STATE;
}
send_complaint(OPEN_OK, machineID, 0, current_file_id); /* ack */
nPage_recv = 0;
return mode_v;
}
/*
We must be in GET_DATA_STATE.
OPEN_OK ack has been sent back in the previous block.
However,
the master may not have received the ack.
In that case the master will send back the open_file_cmd again.
*/
if ((current_page_v == (int) ALL_MACHINES ||current_page_v == (int) machineID)
&& (machineState == GET_DATA_STATE)) { /****/
send_complaint(OPEN_OK, machineID, 0, current_file_id);
}
return mode_v;
case EOF_CMD:
/**********/
if (verbose>=1)
fprintf(stderr, "***** EOF received for id=%d state=%d id=%d, file=%d\n",
current_page_v, machineState, machineID, current_file_v);
/* the following happnens when this machine was previously out-of-pace
and was labeled as 'BAD MACHINE' by the master.
The master has proceeded with the syncing process without
waiting for this machine to finish the process in one of the previous files.
Since under normal condition, this machine should not expect to see
current_file changes except when OPEN_FILE_CMD is received.
*/
if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */
/* normal condition */
if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID)
&& machineState == GET_DATA_STATE) { /* GET_DATA_STATE */
/* check missing pages and send back missing-page-request */
current_missing_pages = ask_for_missing_page(); /* = total # of missing_pages */
if (current_page_v == (int) machineID) {
/* master is asking for my EOF_ack */
if (current_missing_pages == 0) {
/* w/o assuming how we get to this point ... */
machineState = DATA_READY_STATE;
send_complaint(EOF_OK, machineID, 0, current_file_id);
} else {
send_complaint(MISSING_TOTAL, machineID,
current_missing_pages , current_file_id);
missing_page_stat(); /* this has to be done before close_file() ******************/
}
return mode_v;
}
/***
master is asking everyone, (after master sent or re-sent pages)
so, we do some book-keeping procedures (incl state change)
***/
if (verbose >=1)
fprintf(stderr, "missing_pages = %d, nPages_received = %d file = %d\n",
current_missing_pages, nPage_recv, current_file_v); /************/
nPage_recv = 0;
if (current_missing_pages == 0) {
/*
There is no missing page.
Change the state.
*/
machineState = DATA_READY_STATE;
send_complaint(EOF_OK, machineID, 0, current_file_id);
return mode_v;
} else {
/*
There are missing pages.
If we still miss many pages for SICK_THRESHOLD consecutive times,
then we are sick. e.g. machine CPU does not give multicatcher
enough time to process incoming UDP's OR the disk I/O is too slow.
*/
if ((SICK_RATIO)*(double)last_missing_pages < (double)current_missing_pages) {
++sick_count;
if (sick_count > SICK_THRESHOLD) {
machineState = SICK_STATE;
send_complaint(SIT_OUT, machineID,
0, current_file_id); /* no more attempt to receive */
/* master may send more pages from requests from other machines
but this machine will mark this file as 'sits out receiving' */
} else {
/* not sick enough yet */
send_complaint(MISSING_TOTAL, machineID,
current_missing_pages, current_file_id);
missing_page_stat(); /* this has to be done before close_file() ******************/
/* master will send more pages */
}
} else { /* we are getting enough missing pages this time to keep up */
sick_count = 0; /* break the consecutiveness */
send_complaint(MISSING_TOTAL, machineID,
current_missing_pages, current_file_id);
missing_page_stat(); /* this has to be done before close_file() ******************/
/* master will send more pages */
}
last_missing_pages = current_missing_pages;
return mode_v;
}
} /* end GET_DATA_STATE */
/* After state change, we still get request for ack.
send back ack again */
if ((current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID)) {
switch (machineState) {
case DATA_READY_STATE:
send_complaint(EOF_OK, machineID,
0, current_file_id);
return mode_v;
case SICK_STATE:
send_complaint(SIT_OUT, machineID,
0, current_file_id); /* just an ack, even for sick state*/
return mode_v;
}
}
return mode_v;
case CLOSE_FILE_CMD:
if (verbose>=1)
fprintf(stderr, "***** CLOSE received for id=%d state=%d id=%d, file=%d\n",
current_page_v, machineState, machineID, current_file_v);
if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */
if (current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) {
if (machineState == DATA_READY_STATE) {
if (!close_file()) { return mode_v; };
set_owner_perm_times();
machineState = IDLE_STATE;
send_complaint(CLOSE_OK, machineID, 0, current_file_id);
return mode_v;
} else if (machineState == IDLE_STATE) {
/* send ack back again because we are asked */
send_complaint(CLOSE_OK, machineID, 0, current_file_id);
return mode_v;
} else { /* other states -- we should not be here*/
/* if (machineState == SICK_STATE || machineState == GET_DATA_STATE) */
/* SICK_STATE --> we are too slow in getting missing pages
if one of the machines is sick, master will send out CLOSE_ABORT
GET_DATA_STATE -->
We are not supposed to be in GET_DATA_STATE,
so consider it a sick_state */
fprintf(stderr, "*** should not be here -- state=%d\n", machineState);
if (!rm_tmp_file()) { return mode_v; };
machineState = IDLE_STATE;
send_complaint(SIT_OUT, machineID, 0, current_file_id);
/* make sick_count larger than threshold for GET_DATA_STATE */
sick_count = SICK_THRESHOLD + 10000;
return mode_v;
}
}
return mode_v;
case CLOSE_ABORT_CMD:
if (verbose>=1)
fprintf(stderr, "***** CLOSE_ABORT received for id=%d state=%d id=%d, file=%d\n",
current_page_v, machineState, machineID, current_file_v);
if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */
if (current_page_v == (int) ALL_MACHINES || current_page_v == (int) machineID) {
if (!rm_tmp_file()) { return mode_v; };
machineState = IDLE_STATE;
send_complaint(CLOSE_OK, machineID, 0, current_file_id);
}
return mode_v;
case SENDING_DATA:
case RESENDING_DATA:
if ((current_file_v) != current_file_id) return mode_v; /* ignore the cmd */
/*
otherwise, go ahead...
*/
if (verbose>=2) {
fprintf(stderr, "Got %d bytes from page %d of %d for file %d mode=%d\n",
bytes_read - HEAD_SIZE,
current_page_v, total_pages_v,
current_file_v + 1, mode_v);
}
/* timing the disk IO */
/* start_timer(); */
write_page(current_page_v, data_ptr, bytes_read - HEAD_SIZE);
if (isMonitor) send_complaint(PAGE_RECV, machineID, 0, current_file_id);
/*
end_timer();
update_time_accumulator();
*/
/* Yes, we have just read a page */
++nPage_recv;
return mode_v;
case ALL_DONE_CMD:
/*
clear up the files.
*/
/*** since we do not know if there are other machines
that are NOT in data_ready_state,
to maintain equality, it is best to just
remove tmp_file without close_file() ***/
rm_tmp_file();
return mode_v;
default:
return mode_v;
} /* end of switch */
} else {
/* No, the read is timed out */
return TIMED_OUT;
}
}
|