Berkeley DB Reference Guide:
Berkeley DB Replication


Ex_repquote: a TCP/IP based communication infrastructure

All Berkeley DB replication applications must implement a communication infrastructure. The communication infrastructure consists of three parts: a way to map environment IDs to particular sites, the functions to get and receive messages, and the application architecture that supports the particular communication infrastructure used (for example, individual threads per communicating site, a shared message handler for all sites, a hybrid solution). The communication infrastructure is implemented in the file ex_repquote/ex_rq_net.c, and each part of that infrastructure is described as follows.

Ex_repquote maintains a table of environment ID to TCP/IP port mappings. This table is stored in the app_private field of the DB_ENV object so it can be accessed by any function that has the database environment handle. The table is represented by a machtab_t structure which contains a reference to a linked list of member_t's, both of which are defined in ex_repquote/ex_rq_net.c. Each member_t contains the host and port identification, the environment ID, and a file descriptor. The table is maintained by the following interfaces:

int machtab_add(machtab_t *machtab, int fd, u_int32_t hostaddr, int port, int *eidp);
int machtab_init(machtab_t **machtabp, int priority, int nsites);
int machtab_getinfo(machtab_t *machtab, int eid, u_int32_t *hostp, int *portp);
void machtab_parm(machtab_t *machtab, int *nump, int *priorityp, u_int32_t *timeoutp);
int machtab_rem(machtab_t *machtab, int eid, int lock);

These interfaces are particular to this application and communication infrastructure, but provide an indication of the sort of functionality that is needed to maintain the application-specific state for a TCP/IP-based infrastructure. The goal of the table and its interfaces is threefold: First, it must guarantee that given an environment ID, the send function can send a message to the appropriate place. Second, when given the special environment ID DB_EID_BROADCAST, the send function can send messages to all the machines in the group. Third, upon receipt of an incoming message, the receive function can correctly identify the sender and pass the appropriate environment ID to the DB_ENV->rep_process_message method.

Mapping a particular environment ID to a specific port is accomplished by looping through the linked list until the desired environment ID is found. Broadcast communication is implemented by looping through the linked list and sending to each member found. Since each port communicates with only a single other environment, receipt of a message on a particular port precisely identifies the sender.

The example provided is merely one way to satisfy these requirements, and there are alternative implementations as well. For instance, instead of associating separate socket connections with each remote environment, an application might instead label each message with a sender identifier; instead of looping through a table and sending a copy of a message to each member of the replication group, the application could send a single message using a broadcast protocol.

In ex_repquote's case, the send function (slightly simplified) is as follows:

int quote_send(dbenv, control, rec, eid, flags) DB_ENV *dbenv; const DBT *control, *rec; int eid; u_int32_t flags; { int fd, n, ret; machtab_t *machtab; member_t *m;

machtab = (machtab_t *)dbenv->app_private;

/* * If this is a broadcast, call a separate function to * iterate through the table of environment (a/k/a * machine) IDs and call quote_send_one on each. * (This function is not reproduced here, but can be * seen in ex_rq_net.c.) */ if (eid == DB_EID_BROADCAST) { n = quote_send_broadcast(machtab, rec, control, flags); if (n < 0) return (DB_REP_UNAVAIL); return (0); }

/* Find the fild descriptor, fd, associated with this EID. */ fd = 0; if ((ret = pthread_mutex_lock(&machtab->mtmutex)) != 0) return (0); for (m = LIST_FIRST(&machtab->machlist); m != NULL; m = LIST_NEXT(m, links)) { if (m->eid == eid) { fd = m->fd; break; } } if (pthread_mutex_unlock(&machtab->mtmutex) != 0) return (-1);

if (fd == 0) return (DB_REP_UNAVAIL);

/* We have a file descriptor; write the data over it. */ ret = quote_send_one(rec, control, fd, flags);

return (ret); }

int quote_send_broadcast(machtab, rec, control, flags) machtab_t *machtab; const DBT *rec, *control; u_int32_t flags; { int ret, sent; member_t *m, *next; if ((ret = pthread_mutex_lock(&machtab->mtmutex)) != 0) return (0); sent = 0; for (m = LIST_FIRST(&machtab->machlist); m != NULL; m = next) { next = LIST_NEXT(m, links); if ((ret = quote_send_one(rec, control, m->fd, flags)) != 0) { (void)machtab_rem(machtab, m->eid, 0); } else sent++; } if (pthread_mutex_unlock(&machtab->mtmutex) != 0) return (-1); return (sent); }

The quote_send_one function has been omitted as it simply writes the data requested over the file descriptor that it is passed. It contains nothing specific to Berkeley DB or this communication infrastructure. The complete code can be found in ex_repquote/ex_rq_net.c.

The quote_send function is passed as the callback to DB_ENV->set_rep_transport; Berkeley DB automatically sends messages as needed for replication. The receive function is a mirror to the quote_send_one function. It is not a callback function (the application is responsible for collecting messages and calling DB_ENV->rep_process_message on them as is convenient). In the sample application, all messages transmitted are Berkeley DB messages that get handled by DB_ENV->rep_process_message, however, this is not always going to be the case. The application may want to pass its own messages across the same channels, distinguish between its own messages and those of Berkeley DB, and then pass only the Berkeley DB ones to DB_ENV->rep_process_message.

The final component of the communication infrastructure is the process model used to communicate with all the sites in the replication group. Each site creates a thread of control that listens on its designated socket (as specified by the -m command line argument) and then creates a new channel for each site that contacts it. In addition, each site explicitly connects to the sites specified in the -o command line argument. This is a fairly standard TCP/IP process architecture and is implemented by the following functions (all in ex_repquote/ex_rq_net.c).

int get_connected_socket(machtab_t *machtab, char *progname, char *remotehost,
int port, int *is_open, int *eidp): Connect to the specified host/port, add the
site to the machtab, and return a file descriptor for communication with this

int listen_socket_init(char *progname, int port): Initialize a socket for listening on a particular part.

int listen_socket_accept(machtab_t *machtab, char *progname, int socket, int *eidp): Accept a connection on a socket and add it to the machtab. int listen_socket_connect


Copyright Sleepycat Software