more debug output
[linux-2.4.git] / fs / intermezzo / replicator.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001 Cluster File Systems, Inc. <braam@clusterfs.com>
5  * Copyright (C) 2001 Tacit Networks, Inc. <phil@off.net>
6  *
7  *   This file is part of InterMezzo, http://www.inter-mezzo.org.
8  *
9  *   InterMezzo is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   InterMezzo is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with InterMezzo; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * Manage RCVD records for clients in the kernel
23  *
24  */
25
26 #define __NO_VERSION__
27 #include <linux/module.h>
28 #include <stdarg.h>
29 #include <asm/uaccess.h>
30
31 #include <linux/errno.h>
32
33 #include <linux/intermezzo_fs.h>
34
35 /*
36  * this file contains a hash table of replicators/clients for a
37  * fileset. It allows fast lookup and update of reintegration status
38  */
39
40 struct izo_offset_rec {
41         struct list_head or_list;
42         char             or_uuid[16];
43         loff_t           or_offset;
44 };
45
46 #define RCACHE_BITS 8
47 #define RCACHE_SIZE (1 << RCACHE_BITS)
48 #define RCACHE_MASK (RCACHE_SIZE - 1)
49
50 static struct list_head *
51 izo_rep_cache(void)
52 {
53         int i;
54         struct list_head *cache;
55         PRESTO_ALLOC(cache, sizeof(struct list_head) * RCACHE_SIZE);
56         if (cache == NULL) {
57                 CERROR("intermezzo-fatal: no memory for replicator cache\n");
58                 return NULL;
59         }
60         memset(cache, 0, sizeof(struct list_head) * RCACHE_SIZE);
61         for (i = 0; i < RCACHE_SIZE; i++)
62                 INIT_LIST_HEAD(&cache[i]);
63
64         return cache;
65 }
66
67 static struct list_head *
68 izo_rep_hash(struct list_head *cache, char *uuid)
69 {
70         return &cache[(RCACHE_MASK & uuid[1])];
71 }
72
73 void
74 izo_rep_cache_clean(struct presto_file_set *fset)
75 {
76         int i;
77         struct list_head *bucket;
78         struct list_head *tmp;
79
80         if (fset->fset_clients == NULL)
81                 return;
82         for (i = 0; i < RCACHE_SIZE; i++) {
83
84                                 list_for_each_safe(tmp,bucket,&fset->fset_clients[i])
85                                 {
86                                                 struct izo_offset_rec *offrec;
87                                                 list_del(tmp);
88                                                 offrec = list_entry(tmp, struct izo_offset_rec,or_list);
89                                                 PRESTO_FREE(offrec, sizeof(struct izo_offset_rec)); 
90                                 }
91                 }
92                 PRESTO_FREE(fset->fset_clients,sizeof(struct list_head) * RCACHE_SIZE);
93 }
94
95 struct izo_offset_rec *
96 izo_rep_cache_find(struct presto_file_set *fset, char *uuid)
97 {
98         struct list_head *tmp, *buck = izo_rep_hash(fset->fset_clients, uuid);
99         struct izo_offset_rec *rec = NULL;
100
101         list_for_each(tmp, buck) {
102                 rec = list_entry(tmp, struct izo_offset_rec, or_list);
103                 if ( memcmp(rec->or_uuid, uuid, sizeof(rec->or_uuid)) == 0 )
104                         return rec;
105         }
106
107         return NULL;
108 }
109
110 static int
111 izo_rep_cache_add(struct presto_file_set *fset, struct izo_rcvd_rec *rec,
112                   loff_t offset)
113 {
114         struct izo_offset_rec *offrec;
115
116         if (izo_rep_cache_find(fset, rec->lr_uuid)) {
117                 CERROR("izo: duplicate client entry %s off %Ld\n",
118                        fset->fset_name, offset);
119                 return -EINVAL;
120         }
121
122         PRESTO_ALLOC(offrec, sizeof(*offrec));
123         if (offrec == NULL) {
124                 CERROR("izo: cannot allocate offrec\n");
125                 return -ENOMEM;
126         }
127
128         memcpy(offrec->or_uuid, rec->lr_uuid, sizeof(rec->lr_uuid));
129         offrec->or_offset = offset;
130
131         list_add(&offrec->or_list,
132                  izo_rep_hash(fset->fset_clients, rec->lr_uuid));
133         return 0;
134 }
135
136 int
137 izo_rep_cache_init(struct presto_file_set *fset)
138 {
139         struct izo_rcvd_rec rec;
140         loff_t offset = 0, last_offset = 0;
141
142         fset->fset_clients = izo_rep_cache();
143         if (fset->fset_clients == NULL) {
144                 CERROR("Error initializing client cache\n");
145                 return -ENOMEM;
146         }
147
148         while ( presto_fread(fset->fset_rcvd.fd_file, (char *)&rec,
149                              sizeof(rec), &offset) == sizeof(rec) ) {
150                 int rc;
151
152                 if ((rc = izo_rep_cache_add(fset, &rec, last_offset)) < 0) {
153                         izo_rep_cache_clean(fset);
154                         return rc;
155                 }
156
157                 last_offset = offset;
158         }
159
160         return 0;
161 }
162
163 /*
164  * Return local last_rcvd record for the client. Update or create 
165  * if necessary.
166  *
167  * XXX: After this call, any -EINVAL from izo_rcvd_get is a real error.
168  */
169 int
170 izo_repstatus(struct presto_file_set *fset,  __u64 client_kmlsize, 
171               struct izo_rcvd_rec *lr_client, struct izo_rcvd_rec *lr_server)
172 {
173         int rc;
174         rc = izo_rcvd_get(lr_server, fset, lr_client->lr_uuid);
175         if (rc < 0 && rc != -EINVAL) {
176                 return rc;
177         }
178
179         /* client is new or has been reset. */
180         if (rc < 0 || (client_kmlsize == 0 && lr_client->lr_remote_offset == 0)) {
181                 memset(lr_server, 0, sizeof(*lr_server));
182                 memcpy(lr_server->lr_uuid, lr_client->lr_uuid, sizeof(lr_server->lr_uuid));
183                 rc = izo_rcvd_write(fset, lr_server);
184                 if (rc < 0)
185                         return rc;
186         }
187
188         /* update intersync */
189         rc = izo_upc_repstatus(presto_f2m(fset), fset->fset_name, lr_server);
190         return rc;
191 }
192
193 loff_t
194 izo_rcvd_get(struct izo_rcvd_rec *rec, struct presto_file_set *fset, char *uuid)
195 {
196         struct izo_offset_rec *offrec;
197         struct izo_rcvd_rec tmprec;
198         loff_t offset;
199
200         offrec = izo_rep_cache_find(fset, uuid);
201         if (offrec == NULL) {
202                 CDEBUG(D_SPECIAL, "izo_get_rcvd: uuid not in hash.\n");
203                 return -EINVAL;
204         }
205         offset = offrec->or_offset;
206
207         if (rec == NULL)
208                 return offset;
209
210         if (presto_fread(fset->fset_rcvd.fd_file, (char *)&tmprec,
211                          sizeof(tmprec), &offset) != sizeof(tmprec)) {
212                 CERROR("izo_get_rcvd: Unable to read from last_rcvd file offset "
213                        "%Lu\n", offset);
214                 return -EIO;
215         }
216
217         memcpy(rec->lr_uuid, tmprec.lr_uuid, sizeof(tmprec.lr_uuid));
218         rec->lr_remote_recno = le64_to_cpu(tmprec.lr_remote_recno);
219         rec->lr_remote_offset = le64_to_cpu(tmprec.lr_remote_offset);
220         rec->lr_local_recno = le64_to_cpu(tmprec.lr_local_recno);
221         rec->lr_local_offset = le64_to_cpu(tmprec.lr_local_offset);
222         rec->lr_last_ctime = le64_to_cpu(tmprec.lr_last_ctime);
223
224         return offrec->or_offset;
225 }
226
227 /* Try to lookup the UUID in the hash.  Insert it if it isn't found.  Write the
228  * data to the file.
229  *
230  * Returns the offset of the beginning of the record in the last_rcvd file. */
231 loff_t
232 izo_rcvd_write(struct presto_file_set *fset, struct izo_rcvd_rec *rec)
233 {
234         struct izo_offset_rec *offrec;
235         loff_t offset, rc;
236
237         ENTRY;
238
239         offrec = izo_rep_cache_find(fset, rec->lr_uuid);
240         if (offrec == NULL) {
241                 /* I don't think it should be possible for an entry to be not in
242                  * the hash table without also having an invalid offset, but we
243                  * handle it gracefully regardless. */
244                 write_lock(&fset->fset_rcvd.fd_lock);
245                 offset = fset->fset_rcvd.fd_offset;
246                 fset->fset_rcvd.fd_offset += sizeof(*rec);
247                 write_unlock(&fset->fset_rcvd.fd_lock);
248
249                 rc = izo_rep_cache_add(fset, rec, offset);
250                 if (rc < 0) {
251                         EXIT;
252                         return rc;
253                 }
254         } else
255                 offset = offrec->or_offset;
256         
257
258         rc = presto_fwrite(fset->fset_rcvd.fd_file, (char *)rec, sizeof(*rec),
259                            &offset);
260         if (rc == sizeof(*rec))
261                 /* presto_fwrite() advances 'offset' */
262                 rc = offset - sizeof(*rec);
263
264         EXIT;
265         return rc;
266 }
267
268 loff_t
269 izo_rcvd_upd_remote(struct presto_file_set *fset, char * uuid,  __u64 remote_recno, 
270                     __u64 remote_offset)
271 {
272         struct izo_rcvd_rec rec;
273         
274         loff_t rc;
275
276         ENTRY;
277         rc = izo_rcvd_get(&rec, fset, uuid);
278         if (rc < 0)
279                 return rc;
280         rec.lr_remote_recno = remote_recno;
281         rec.lr_remote_offset = remote_offset;
282
283         rc = izo_rcvd_write(fset, &rec);
284         EXIT;
285         if (rc < 0)
286                 return rc;
287         return 0;
288 }