make oldconfig will rebuild these...
[linux-2.4.21-pre4.git] / fs / intermezzo / replicator.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001 Cluster File Systems, Inc. <braam@clusterfs.com>
5  * Copyright (C) 2001 Tacit Networks, Inc. <phil@off.net>
6  *
7  *   This file is part of InterMezzo, http://www.inter-mezzo.org.
8  *
9  *   InterMezzo is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   InterMezzo is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with InterMezzo; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * Manage RCVD records for clients in the kernel
23  *
24  */
25
26 #define __NO_VERSION__
27 #include <linux/module.h>
28 #include <stdarg.h>
29 #include <asm/uaccess.h>
30
31 #include <linux/errno.h>
32
33 #include <linux/intermezzo_fs.h>
34
35 /*
36  * this file contains a hash table of replicators/clients for a
37  * fileset. It allows fast lookup and update of reintegration status
38  */
39
40 struct izo_offset_rec {
41         struct list_head or_list;
42         char             or_uuid[16];
43         loff_t           or_offset;
44 };
45
46 #define RCACHE_BITS 8
47 #define RCACHE_SIZE (1 << RCACHE_BITS)
48 #define RCACHE_MASK (RCACHE_SIZE - 1)
49
50 static struct list_head *
51 izo_rep_cache(void)
52 {
53         int i;
54         struct list_head *cache;
55         PRESTO_ALLOC(cache, sizeof(struct list_head) * RCACHE_SIZE);
56         if (cache == NULL) {
57                 CERROR("intermezzo-fatal: no memory for replicator cache\n");
58                 return NULL;
59         }
60         memset(cache, 0, sizeof(struct list_head) * RCACHE_SIZE);
61         for (i = 0; i < RCACHE_SIZE; i++)
62                 INIT_LIST_HEAD(&cache[i]);
63
64         return cache;
65 }
66
67 static struct list_head *
68 izo_rep_hash(struct list_head *cache, char *uuid)
69 {
70         return &cache[(RCACHE_MASK & uuid[1])];
71 }
72
73 static void
74 izo_rep_cache_clean(struct presto_file_set *fset)
75 {
76         int i;
77         struct list_head *bucket;
78         struct list_head *tmp;
79
80         if (fset->fset_clients == NULL)
81                 return;
82         for (i = 0; i < RCACHE_SIZE; i++) {
83                 tmp = bucket = &fset->fset_clients[i];
84
85                 tmp = tmp->next;
86                 while (tmp != bucket) {
87                         struct izo_offset_rec *offrec;
88                         tmp = tmp->next;
89                         list_del(tmp);
90                         offrec = list_entry(tmp, struct izo_offset_rec,
91                                             or_list);
92                         PRESTO_FREE(offrec, sizeof(struct izo_offset_rec));
93                 }
94         }
95 }
96
97 struct izo_offset_rec *
98 izo_rep_cache_find(struct presto_file_set *fset, char *uuid)
99 {
100         struct list_head *buck = izo_rep_hash(fset->fset_clients, uuid);
101         struct list_head *tmp = buck;
102         struct izo_offset_rec *rec = NULL;
103
104         while ( (tmp = tmp->next) != buck ) {
105                 rec = list_entry(tmp, struct izo_offset_rec, or_list);
106                 if ( memcmp(rec->or_uuid, uuid, sizeof(rec->or_uuid)) == 0 )
107                         return rec;
108         }
109
110         return NULL;
111 }
112
113 static int
114 izo_rep_cache_add(struct presto_file_set *fset, struct izo_rcvd_rec *rec,
115                   loff_t offset)
116 {
117         struct izo_offset_rec *offrec;
118
119         if (izo_rep_cache_find(fset, rec->lr_uuid)) {
120                 CERROR("izo: duplicate client entry %s off %Ld\n",
121                        fset->fset_name, offset);
122                 return -EINVAL;
123         }
124
125         PRESTO_ALLOC(offrec, sizeof(*offrec));
126         if (offrec == NULL) {
127                 CERROR("izo: cannot allocate offrec\n");
128                 return -ENOMEM;
129         }
130
131         memcpy(offrec->or_uuid, rec->lr_uuid, sizeof(rec->lr_uuid));
132         offrec->or_offset = offset;
133
134         list_add(&offrec->or_list,
135                  izo_rep_hash(fset->fset_clients, rec->lr_uuid));
136         return 0;
137 }
138
139 int
140 izo_rep_cache_init(struct presto_file_set *fset)
141 {
142         struct izo_rcvd_rec rec;
143         loff_t offset = 0, last_offset = 0;
144
145         fset->fset_clients = izo_rep_cache();
146         if (fset->fset_clients == NULL) {
147                 CERROR("Error initializing client cache\n");
148                 return -ENOMEM;
149         }
150
151         while ( presto_fread(fset->fset_rcvd.fd_file, (char *)&rec,
152                              sizeof(rec), &offset) == sizeof(rec) ) {
153                 int rc;
154
155                 if ((rc = izo_rep_cache_add(fset, &rec, last_offset)) < 0) {
156                         izo_rep_cache_clean(fset);
157                         return rc;
158                 }
159
160                 last_offset = offset;
161         }
162
163         return 0;
164 }
165
166 /*
167  * Return local last_rcvd record for the client. Update or create 
168  * if necessary.
169  *
170  * XXX: After this call, any -EINVAL from izo_rcvd_get is a real error.
171  */
172 int
173 izo_repstatus(struct presto_file_set *fset,  __u64 client_kmlsize, 
174               struct izo_rcvd_rec *lr_client, struct izo_rcvd_rec *lr_server)
175 {
176         int rc;
177         rc = izo_rcvd_get(lr_server, fset, lr_client->lr_uuid);
178         if (rc < 0 && rc != -EINVAL) {
179                 return rc;
180         }
181
182         /* client is new or has been reset. */
183         if (rc < 0 || (client_kmlsize == 0 && lr_client->lr_remote_offset == 0)) {
184                 memset(lr_server, 0, sizeof(*lr_server));
185                 memcpy(lr_server->lr_uuid, lr_client->lr_uuid, sizeof(lr_server->lr_uuid));
186                 rc = izo_rcvd_write(fset, lr_server);
187                 if (rc < 0)
188                         return rc;
189         }
190
191         /* update intersync */
192         rc = izo_upc_repstatus(presto_f2m(fset), fset->fset_name, lr_server);
193         return rc;
194 }
195
196 loff_t
197 izo_rcvd_get(struct izo_rcvd_rec *rec, struct presto_file_set *fset, char *uuid)
198 {
199         struct izo_offset_rec *offrec;
200         struct izo_rcvd_rec tmprec;
201         loff_t offset;
202
203         offrec = izo_rep_cache_find(fset, uuid);
204         if (offrec == NULL) {
205                 CDEBUG(D_SPECIAL, "izo_get_rcvd: uuid not in hash.\n");
206                 return -EINVAL;
207         }
208         offset = offrec->or_offset;
209
210         if (rec == NULL)
211                 return offset;
212
213         if (presto_fread(fset->fset_rcvd.fd_file, (char *)&tmprec,
214                          sizeof(tmprec), &offset) != sizeof(tmprec)) {
215                 CERROR("izo_get_rcvd: Unable to read from last_rcvd file offset "
216                        "%Lu\n", offset);
217                 return -EIO;
218         }
219
220         memcpy(rec->lr_uuid, tmprec.lr_uuid, sizeof(tmprec.lr_uuid));
221         rec->lr_remote_recno = le64_to_cpu(tmprec.lr_remote_recno);
222         rec->lr_remote_offset = le64_to_cpu(tmprec.lr_remote_offset);
223         rec->lr_local_recno = le64_to_cpu(tmprec.lr_local_recno);
224         rec->lr_local_offset = le64_to_cpu(tmprec.lr_local_offset);
225         rec->lr_last_ctime = le64_to_cpu(tmprec.lr_last_ctime);
226
227         return offrec->or_offset;
228 }
229
230 /* Try to lookup the UUID in the hash.  Insert it if it isn't found.  Write the
231  * data to the file.
232  *
233  * Returns the offset of the beginning of the record in the last_rcvd file. */
234 loff_t
235 izo_rcvd_write(struct presto_file_set *fset, struct izo_rcvd_rec *rec)
236 {
237         struct izo_offset_rec *offrec;
238         loff_t offset, rc;
239
240         ENTRY;
241
242         offrec = izo_rep_cache_find(fset, rec->lr_uuid);
243         if (offrec == NULL) {
244                 /* I don't think it should be possible for an entry to be not in
245                  * the hash table without also having an invalid offset, but we
246                  * handle it gracefully regardless. */
247                 write_lock(&fset->fset_rcvd.fd_lock);
248                 offset = fset->fset_rcvd.fd_offset;
249                 fset->fset_rcvd.fd_offset += sizeof(*rec);
250                 write_unlock(&fset->fset_rcvd.fd_lock);
251
252                 rc = izo_rep_cache_add(fset, rec, offset);
253                 if (rc < 0) {
254                         EXIT;
255                         return rc;
256                 }
257         } else
258                 offset = offrec->or_offset;
259         
260
261         rc = presto_fwrite(fset->fset_rcvd.fd_file, (char *)rec, sizeof(*rec),
262                            &offset);
263         if (rc == sizeof(*rec))
264                 /* presto_fwrite() advances 'offset' */
265                 rc = offset - sizeof(*rec);
266
267         EXIT;
268         return rc;
269 }
270
271 loff_t
272 izo_rcvd_upd_remote(struct presto_file_set *fset, char * uuid,  __u64 remote_recno, 
273                     __u64 remote_offset)
274 {
275         struct izo_rcvd_rec rec;
276         
277         loff_t rc;
278
279         ENTRY;
280         rc = izo_rcvd_get(&rec, fset, uuid);
281         if (rc < 0)
282                 return rc;
283         rec.lr_remote_recno = remote_recno;
284         rec.lr_remote_offset = remote_offset;
285
286         rc = izo_rcvd_write(fset, &rec);
287         EXIT;
288         if (rc < 0)
289                 return rc;
290         return 0;
291 }