diff options
Diffstat (limited to 'debian/htdig/htdig-3.2.0b6/db/qam.c')
-rw-r--r-- | debian/htdig/htdig-3.2.0b6/db/qam.c | 1245 |
1 files changed, 1245 insertions, 0 deletions
diff --git a/debian/htdig/htdig-3.2.0b6/db/qam.c b/debian/htdig/htdig-3.2.0b6/db/qam.c new file mode 100644 index 00000000..d7574bec --- /dev/null +++ b/debian/htdig/htdig-3.2.0b6/db/qam.c @@ -0,0 +1,1245 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 1999 + * Sleepycat Software. All rights reserved. + */ + +#include "db_config.h" + +#ifndef lint +static const char sccsid[] = "@(#)qam.c 11.23 (Sleepycat) 10/26/99"; +#endif /* not lint */ + +#ifndef NO_SYSTEM_INCLUDES +#include <sys/types.h> + +#include <errno.h> +#include <string.h> +#endif + +#include "db_int.h" +#include "db_page.h" +#include "db_shash.h" +#include "db_am.h" +#include "lock.h" +#include "qam.h" +#include "mp.h" + +static int CDB___qam_c_close __P((DBC *)); +static int CDB___qam_c_del __P((DBC *, u_int32_t)); +static int CDB___qam_c_get __P((DBC *, DBT *, DBT *, u_int32_t)); +static int CDB___qam_c_put __P((DBC *, DBT *, DBT *, u_int32_t)); +static int CDB___qam_getno __P((DB *, const DBT *, db_recno_t *)); +static int CDB___qam_i_delete __P((DBC *)); +static int CDB___qam_i_put __P((DBC *, DBT *, u_int32_t)); +static int CDB___qam_nrecs __P((DBC *, db_recno_t *, db_recno_t *)); +static int CDB___qam_position + __P((DBC *, db_recno_t *, db_lockmode_t, db_recno_t, int *)); +static int CDB___qam_c_destroy __P((DBC *)); + +/* + * CDB___qam_nrecs -- + * Return the record number for the head and tail of the queue. + */ +static int +CDB___qam_nrecs(dbc, rep, start) + DBC *dbc; + db_recno_t *rep, *start; +{ + DB *dbp; + DB_LOCK lock; + QMETA *meta; + db_pgno_t pg; + int ret; + + dbp = dbc->dbp; + + pg = ((QUEUE *)dbp->q_internal)->q_meta; + if ((ret = CDB___db_lget(dbc, 0, pg, DB_LOCK_READ, 0, &lock)) != 0) + return (ret); + if ((ret = CDB_memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) { + /* We did not fetch it, we can release the lock. */ + (void)__LPUT(dbc, lock); + return (ret); + } + + *rep = meta->cur_recno; + *start = meta->start; + + if ((ret = CDB_memp_fput(dbp->mpf, meta, 0)) != 0) + return (ret); + /* Don't hold the meta page long term. */ + if ((ret = __LPUT(dbc, lock)) != 0) + return (ret); + + return (0); +} + +/* + * CDB___qam_position -- + * Position a queued access method cursor at a record. This returns + * the page locked. *exactp will be set if the record is valid. + */ +static int +CDB___qam_position(dbc, recnop, lock_mode, start, exactp) + DBC *dbc; /* open cursor */ + db_recno_t *recnop; /* pointer to recno to find */ + db_lockmode_t lock_mode;/* locking: read or write */ + db_recno_t start; /* meta.start */ + int *exactp; /* indicate if it was found */ +{ + QUEUE_CURSOR *cp; + DB *dbp; + QAMDATA *qp; + db_pgno_t pg; + int ret; + + dbp = dbc->dbp; + cp = (QUEUE_CURSOR *) dbc->internal; + + /* Fetch the page for this recno. */ + pg = QAM_RECNO_PAGE(dbp, start, *recnop); + + if ((ret = CDB___db_lget(dbc, 0, pg, lock_mode, 0, &cp->lock)) != 0) + return (ret); + if ((ret = CDB_memp_fget(dbp->mpf, &pg, + lock_mode == DB_LOCK_WRITE ? DB_MPOOL_CREATE : 0, + &cp->page)) != 0) { + /* We did not fetch it, we can release the lock. */ + (void)__LPUT(dbc, cp->lock); + cp->lock.off = LOCK_INVALID; + return (ret); + } + cp->pgno = pg; + cp->indx = QAM_RECNO_INDEX(dbp, pg, start, *recnop); + + if (cp->page->pgno == 0) { + if (F_ISSET(dbp, DB_AM_RDONLY)) { + *exactp = 0; + return (0); + } + cp->page->pgno = pg; + cp->page->type = P_QAMDATA; + } + + qp = QAM_GET_RECORD(dbp, cp->page, cp->indx); + *exactp = F_ISSET(qp, QAM_VALID); + + return (ret); +} + +/* + * CDB___qam_pitem -- + * Put an item on a queue page. Copy the data to the page and set the + * VALID and SET bits. If logging and the record was previously set, + * log that data, otherwise just log the new data. + * + * pagep must be write locked + * + * PUBLIC: int CDB___qam_pitem + * PUBLIC: __P((DBC *, QPAGE *, u_int32_t, db_recno_t, DBT *)); + */ +int +CDB___qam_pitem(dbc, pagep, indx, recno, data) + DBC *dbc; + QPAGE *pagep; + u_int32_t indx; + db_recno_t recno; + DBT *data; +{ + DB *dbp; + DBT olddata, pdata, *datap; + QAMDATA *qp; + QUEUE *t; + u_int32_t size; + u_int8_t *dest, *p; + int alloced, ret; + + alloced = ret = 0; + + dbp = dbc->dbp; + t = (QUEUE *)dbp->q_internal; + + if (data->size > t->re_len) + return (EINVAL); + + qp = QAM_GET_RECORD(dbp, pagep, indx); + + p = qp->data; + size = data->size; + datap = data; + if (F_ISSET(data, DB_DBT_PARTIAL)) { + if (data->size != data->dlen) + return (EINVAL); + if (data->doff + data->dlen > t->re_len) + return (EINVAL); + if (data->size == t->re_len) + goto no_partial; + + /* + * If we are logging, then we have to build the record + * first, otherwise, we can simply drop the change + * directly on the page. After this clause, make + * sure that datap and p are set up correctly so that + * copying datap into p does the right thing. + * + * Note, I am changing this so that if the existing + * record is not valid, we create a complete record + * to log so that both this and the recovery code is simpler. + */ + + if (DB_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) { + datap = &pdata; + memset(datap, 0, sizeof(*datap)); + + if ((ret = + CDB___os_malloc(t->re_len, NULL, &datap->data)) != 0) + return (ret); + alloced = 1; + datap->size = t->re_len; + + /* + * Construct the record if it's valid, otherwise set it + * all to the pad character. + */ + dest = datap->data; + if (F_ISSET(qp, QAM_VALID)) + memcpy(dest, p, t->re_len); + else + memset(dest, t->re_pad, t->re_len); + + dest += data->doff; + memcpy(dest, data->data, data->size); + } else { + datap = data; + p += data->doff; + } + } + +no_partial: + if (DB_LOGGING(dbc)) { + olddata.size = 0; + if (F_ISSET(qp, QAM_SET)) { + olddata.data = qp->data; + olddata.size = t->re_len; + } + if ((ret = CDB___qam_add_log(dbp->dbenv, dbc->txn, &LSN(pagep), + 0, dbp->log_fileid, &LSN(pagep), pagep->pgno, + indx, recno, datap, qp->flags, + olddata.size == 0 ? NULL : &olddata)) != 0) + goto err; + } + + F_SET(qp, QAM_VALID | QAM_SET); + memcpy(p, datap->data, datap->size); + if (!F_ISSET(data, DB_DBT_PARTIAL)) + memset(p + datap->size, t->re_pad, t->re_len - datap->size); + +err: if (alloced) + CDB___os_free(datap->data, t->re_len); + + return (ret); +} +/* + * CDB___qam_c_put + * Cursor put for queued access method. + * BEFORE and AFTER cannot be specified. + */ +static int +CDB___qam_c_put(dbc, key, data, flags) + DBC *dbc; + DBT *key, *data; + u_int32_t flags; +{ + QUEUE_CURSOR *cp; + DB_LOCK lock; + int ret; + + /* Check for invalid flags. */ + if ((ret = CDB___db_cputchk(dbc->dbp, key, data, flags, + F_ISSET(dbc->dbp, DB_AM_RDONLY), + ((QUEUE_CURSOR *)dbc->internal)->recno != RECNO_OOB)) != 0) + return (ret); + + DEBUG_LWRITE(dbc, dbc->txn, "qam_c_put", NULL, data, flags); + + cp = (QUEUE_CURSOR *)dbc->internal; + lock = cp->lock; + + ret = CDB___qam_i_put(dbc, data, flags); + + /* Release any previous lock, if not in a transaction. */ + if (ret == 0 && lock.off != LOCK_INVALID) { + if ((ret = __TLPUT(dbc, lock)) != 0) + return (ret); + cp->lock.off = LOCK_INVALID; + } + return (0); +} + +/* + * CDB___qam_i_put + * Internal cursor put for queued access method. + */ +static int +CDB___qam_i_put(dbc, data, flags) + DBC *dbc; + DBT *data; + u_int32_t flags; +{ + QUEUE_CURSOR *cp; + DB *dbp; + DB_LOCK lock; + QMETA *meta; + db_pgno_t pg; + db_recno_t new_cur, new_first; + u_int32_t opcode; + int exact, ret, t_ret; + + dbp = dbc->dbp; + PANIC_CHECK(dbp->dbenv); + /* + * If we are running CDB, this had better be either a write + * cursor or an immediate writer. If it's a regular writer, + * that means we have an IWRITE lock and we need to upgrade + * it to a write lock. + */ + if (F_ISSET(dbp->dbenv, DB_ENV_CDB)) { + if (!F_ISSET(dbc, DBC_WRITECURSOR | DBC_WRITER)) + return (EINVAL); + + if (F_ISSET(dbc, DBC_WRITECURSOR) && + (ret = CDB_lock_get(dbp->dbenv, dbc->locker, DB_LOCK_UPGRADE, + &dbc->lock_dbt, DB_LOCK_WRITE, &dbc->mylock)) != 0) + return (ret); + } + + cp = (QUEUE_CURSOR *)dbc->internal; + + /* Write lock the record. */ + if ((ret = CDB___db_lget(dbc, + 0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) + return (ret); + + if ((ret = CDB___qam_position(dbc, + &cp->recno, DB_LOCK_WRITE, cp->start, &exact)) != 0) { + /* We could not get the page, we can release the record lock. */ + __LPUT(dbc, lock); + return (ret); + } + + if (exact && flags == DB_NOOVERWRITE) { + ret = __TLPUT(dbc, lock); + /* Doing record locking, release the page lock */ + if ((t_ret = __LPUT(dbc, cp->lock)) == 0) + cp->lock.off = LOCK_INVALID; + else + if (ret == 0) + ret = t_ret; + if ((t_ret = CDB_memp_fput(dbp->mpf, cp->page, 0)) != 0 && ret == 0) + ret = t_ret; + return (ret == 0 ? DB_KEYEXIST : ret); + } + + /* Put the item on the page. */ + ret = CDB___qam_pitem(dbc, (QPAGE *)cp->page, cp->indx, cp->recno, data); + + /* Doing record locking, release the page lock */ + if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0) + ret = t_ret; + if ((t_ret = CDB_memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY)) && ret == 0) + ret = t_ret; + cp->lock = lock; + cp->lock_mode = DB_LOCK_WRITE; + if (ret != 0) + return (ret); + + /* We may need to reset the head or tail of the queue. */ + pg = ((QUEUE *)dbp->q_internal)->q_meta; + if ((ret = CDB___db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) + return (ret); + if ((ret = CDB_memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) { + /* We did not fetch it, we can release the lock. */ + (void)__LPUT(dbc, lock); + return (ret); + } + + opcode = 0; + new_cur = new_first = 0; + + if (cp->recno > meta->cur_recno) { + new_cur = cp->recno; + opcode |= QAM_SETCUR; + } + if (cp->recno < meta->first_recno || meta->first_recno < meta->start) { + new_first = cp->recno; + opcode |= QAM_SETFIRST; + } + + if (opcode != 0 && DB_LOGGING(dbc)) { + ret = CDB___qam_mvptr_log(dbp->dbenv, dbc->txn, &meta->dbmeta.lsn, + 0, opcode, dbp->log_fileid, meta->first_recno, new_first, + meta->cur_recno, new_cur, &meta->dbmeta.lsn); + } + + if (opcode & QAM_SETCUR) + meta->cur_recno = cp->recno; + if (opcode & QAM_SETFIRST) + meta->first_recno = cp->recno; + + if ((t_ret = + CDB_memp_fput(dbp->mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 && + ret == 0) + ret = t_ret; + + /* Don't hold the meta page long term. */ + if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) + ret = t_ret; + return (ret); +} + +/* + * CDB___qam_put -- + * Add a record to the queue. + * If we are doing anything but appending, just call qam_c_put to do the + * work. Otherwise we fast path things here. + * + * PUBLIC: int CDB___qam_put __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t)); + */ +int +CDB___qam_put(dbp, txn, key, data, flags) + DB *dbp; + DB_TXN *txn; + DBT *key, *data; + u_int32_t flags; +{ + QUEUE_CURSOR *cp; + DBC *dbc; + DB_LOCK lock; + QMETA *meta; + QPAGE *page; + db_pgno_t pg; + db_recno_t recno, start, total; + int ret, t_ret; + + PANIC_CHECK(dbp->dbenv); + + /* Allocate a cursor. */ + if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0) + return (ret); + + DEBUG_LWRITE(dbc, txn, "qam_put", key, data, flags); + + cp = dbc->internal; + + /* Check for invalid flags. */ + if ((ret = CDB___db_putchk(dbp, + key, data, flags, F_ISSET(dbp, DB_AM_RDONLY), 0)) != 0) + goto done; + + /* If not appending, then just call the cursor routine */ + if (flags != DB_APPEND) { + if ((ret = CDB___qam_getno(dbp, key, &cp->recno)) != 0) + goto done; + CDB___qam_nrecs(dbc, &total, &cp->start); + + ret = CDB___qam_i_put(dbc, data, flags); + goto done; + } + + /* Write lock the meta page. */ + pg = ((QUEUE *)dbp->q_internal)->q_meta; + if ((ret = CDB___db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) + goto done; + if ((ret = CDB_memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) { + /* We did not fetch it, we can release the lock. */ + (void)__LPUT(dbc, lock); + goto done; + } + + /* Record that we are going to allocate a record. */ + if (DB_LOGGING(dbc)) { + CDB___qam_inc_log(dbp->dbenv, + txn, &meta->dbmeta.lsn, + 0, dbp->log_fileid, &meta->dbmeta.lsn); + } + + /* Get the next record number. */ + recno = ++meta->cur_recno; + start = meta->start; + + if (meta->first_recno < meta->start || meta->first_recno > recno) + meta->first_recno = recno; + + /* Release the meta page. */ + if ((ret = CDB_memp_fput(dbp->mpf, meta, DB_MPOOL_DIRTY)) != 0) + return (ret); + + /* Lock the record and release meta page lock. */ + if ((ret = CDB___db_lget(dbc, + 1, recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) + goto done; + + cp->lock = lock; + cp->lock_mode = DB_LOCK_WRITE; + + pg = QAM_RECNO_PAGE(dbp, start, recno); + + /* Fetch and write lock the data page. */ + if ((ret = CDB___db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) + goto done; + if ((ret = CDB_memp_fget(dbp->mpf, &pg, DB_MPOOL_CREATE, &page)) != 0) { + /* We did not fetch it, we can release the lock. */ + (void)__LPUT(dbc, lock); + goto done; + } + + /* See if this is a new page. */ + if (page->pgno == 0) { + page->pgno = pg; + page->type = P_QAMDATA; + } + + /* Put the item on the page and log it. */ + if ((ret = CDB___qam_pitem(dbc, page, + QAM_RECNO_INDEX(dbp, pg, start, recno), recno, data)) != 0) { + /* The put failed. We did not change the row. */ + (void)__LPUT(dbc, cp->lock); + (void)CDB_memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY); + return (ret); + } + + /* Doing record locking, release the page lock */ + if ((ret = __LPUT(dbc, lock)) != 0) + return (ret); + if ((ret = CDB_memp_fput(dbp->mpf, page, DB_MPOOL_DIRTY)) != 0) + return (ret); + + /* Return the record number to the user. */ + *(db_recno_t *)key->data = recno; + +done: + /* Discard the cursor. */ + if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0) + ret = t_ret; + + return (ret); +} + +/* + * CDB___qam_i_delete -- + * Internal version of recno delete, called by CDB___qam_delete and + * CDB___qam_c_del. + */ +static int +CDB___qam_i_delete(dbc) + DBC *dbc; +{ + QUEUE_CURSOR *cp; + DB *dbp; + DBT data; + DB_LOCK lock; + PAGE *pagep; + QAMDATA *qp; + db_recno_t start; + int exact, ret, t_ret; + + dbp = dbc->dbp; + cp = dbc->internal; + start = cp->start; + ret = 0; + + /* + * If this is CDB and this isn't a write cursor, then it's an error. + * If it is a write cursor, but we don't yet hold the write lock, then + * we need to upgrade to the write lock. + */ + if (F_ISSET(dbp->dbenv, DB_ENV_CDB)) { + /* Make sure it's a valid update cursor. */ + if (!F_ISSET(dbc, DBC_WRITECURSOR | DBC_WRITER)) + return (EINVAL); + + if (F_ISSET(dbc, DBC_WRITECURSOR) && + (ret = CDB_lock_get(dbp->dbenv, dbc->locker, + DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE, + &dbc->mylock)) != 0) + return (EAGAIN); + } + + if ((ret = CDB___db_lget(dbc, + 0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) + goto err; + + cp->lock_mode = DB_LOCK_WRITE; + /* Find the record ; delete only deletes exact matches. */ + if ((ret = CDB___qam_position(dbc, + &cp->recno, DB_LOCK_WRITE, start, &exact)) != 0) { + cp->lock = lock; + goto err; + } + if (!exact) { + ret = DB_NOTFOUND; + goto err1; + } + + pagep = cp->page; + qp = QAM_GET_RECORD(dbp, pagep, cp->indx); + + if (DB_LOGGING(dbc)) { + data.size = ((QUEUE *)dbp->q_internal)->re_len; + data.data = qp->data; + if ((ret = CDB___qam_del_log(dbp->dbenv, dbc->txn, + &LSN(pagep), 0, dbp->log_fileid, &LSN(pagep), + pagep->pgno, cp->indx, cp->recno)) != 0) + goto err1; + } + + F_CLR(qp, QAM_VALID); + +err1: + if ((t_ret = CDB_memp_fput( + dbp->mpf, cp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0) + return (ret ? ret : t_ret); + /* Doing record locking, release the page lock */ + if ((t_ret = __LPUT(dbc, cp->lock)) != 0) { + cp->lock = lock; + return (ret ? ret : t_ret); + } + cp->lock = lock; + return (ret); + +err: if (F_ISSET(dbp->dbenv, DB_ENV_CDB) && F_ISSET(dbc, DBC_WRITECURSOR)) + (void)CDB___lock_downgrade( + dbp->dbenv, &dbc->mylock, DB_LOCK_IWRITE, 0); + return (ret); +} + +/* + * CDB___qam_delete -- + * Queue db->del function. + * + * PUBLIC: int CDB___qam_delete __P((DB *, DB_TXN *, DBT *, u_int32_t)); + */ +int +CDB___qam_delete(dbp, txn, key, flags) + DB *dbp; + DB_TXN *txn; + DBT *key; + u_int32_t flags; +{ + QUEUE_CURSOR *cp; + DBC *dbc; + db_recno_t total; + int ret, t_ret; + + PANIC_CHECK(dbp->dbenv); + + /* Check for invalid flags. */ + if ((ret = + CDB___db_delchk(dbp, key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0) + return (ret); + + /* Acquire a cursor. */ + if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0) + return (ret); + + DEBUG_LWRITE(dbc, txn, "qam_delete", key, NULL, flags); + + cp = dbc->internal; + if ((ret = CDB___qam_getno(dbp, key, &cp->recno)) != 0) + goto err; + CDB___qam_nrecs(dbc, &total, &cp->start); + + /* Do the delete. */ + if (cp->recno > total) { + ret = DB_NOTFOUND; + goto err; + } + + ret = CDB___qam_i_delete(dbc); + + /* Release the cursor. */ +err: if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0) + ret = t_ret; + + return (ret); +} + +/* + * CDB___qam_c_del -- + * Qam cursor->c_del function + */ +static int +CDB___qam_c_del(dbc, flags) + DBC *dbc; + u_int32_t flags; +{ + QUEUE_CURSOR *cp; + DB *dbp; + db_recno_t total; + int ret; + + dbp = dbc->dbp; + cp = dbc->internal; + + PANIC_CHECK(dbp->dbenv); + + /* Check for invalid flags. */ + if ((ret = CDB___db_cdelchk(dbp, flags, + F_ISSET(dbp, DB_AM_RDONLY), cp->recno != RECNO_OOB)) != 0) + return (ret); + + DEBUG_LWRITE(dbc, dbc->txn, "qam_c_del", NULL, NULL, flags); + + if ((ret = CDB___qam_nrecs(dbc, &total, &cp->start)) != 0) + return (ret); + return (CDB___qam_i_delete(dbc)); +} + + +/* + * CDB___qam_c_get -- + * Queue cursor->c_get function. + */ +static int +CDB___qam_c_get(dbc_orig, key, data, flags) + DBC *dbc_orig; + DBT *key, *data; + u_int32_t flags; +{ + QUEUE_CURSOR *cp, *orig; + DB *dbp; + DB_LOCK lock, pglock, metalock, save_lock; + DBC *dbc; + PAGE *pg; + QAMDATA *qp; + QMETA *meta; + db_indx_t save_indx; + db_lockmode_t lock_mode; + db_pgno_t metapno, save_page; + db_recno_t start, first, skipped, save_recno; + int exact, is_first, locked, ret, t_ret, with_delete; + int put_mode, meta_dirty; + + orig = dbc_orig->internal; + dbp = dbc_orig->dbp; + + PANIC_CHECK(dbp->dbenv); + + with_delete = 0; + lock_mode = DB_LOCK_READ; + put_mode = 0; + t_ret = 0; + + /* Check for invalid flags. */ + if ((ret = CDB___db_cgetchk(dbp, + key, data, flags, orig->recno != RECNO_OOB)) != 0) + return (ret); + + /* Clear OR'd in additional bits so we can check for flag equality. */ + if (LF_ISSET(DB_RMW)) { + lock_mode = DB_LOCK_WRITE; + LF_CLR(DB_RMW); + } + + if (flags == DB_CONSUME) { + with_delete = 1; + flags = DB_FIRST; + lock_mode = DB_LOCK_WRITE; + } + + DEBUG_LREAD(dbc_orig, dbc_orig->txn, "qam_c_get", + flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags); + + /* Get a copy of the original cursor, including position. */ + if ((ret = dbc_orig->c_dup(dbc_orig, &dbc, DB_POSITIONI)) != 0) + return (ret); + cp = dbc->internal; + + is_first = 0; + + /* get the meta page */ + metapno = ((QUEUE *)dbp->q_internal)->q_meta; + if ((ret = CDB___db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0) + return (ret); + locked = 1; + if ((ret = CDB_memp_fget(dbp->mpf, &metapno, 0, &meta)) != 0) { + /* We did not fetch it, we can release the lock. */ + (void)__LPUT(dbc, metalock); + return (ret); + } + + skipped = 0; + + /* Make lint and friends happy. */ + first = 0; + meta_dirty = 0; + + /* Release any previous lock if not in a transaction. */ + if (cp->lock.off != LOCK_INVALID) { + (void)__TLPUT(dbc, cp->lock); + cp->lock.off = LOCK_INVALID; + } + +retry: /* Update the record number. */ + cp->start = start = meta->start; + switch (flags) { + case DB_CURRENT: + break; + case DB_NEXT: + if (cp->recno != RECNO_OOB) { + ++cp->recno; + break; + } + /* FALLTHROUGH */ + case DB_FIRST: + flags = DB_NEXT; + is_first = 1; + + /* get the first record number */ + cp->recno = first = meta->first_recno; + + /* if we will delete it, then increment */ + if (with_delete && first < meta->cur_recno) { + if (DB_LOGGING(dbc)) + CDB___qam_incfirst_log(dbp->dbenv, dbc->txn, + &LSN(meta), 0, + dbp->log_fileid, first); + meta->first_recno++; + meta_dirty = 1; + } + + break; + case DB_PREV: + if (cp->recno != RECNO_OOB) { + if (cp->recno <= meta->first_recno) { + ret = DB_NOTFOUND; + goto err; + } + --cp->recno; + break; + } + /* FALLTHROUGH */ + case DB_LAST: + cp->recno = meta->cur_recno; + if (cp->recno == 0) { + ret = DB_NOTFOUND; + goto err; + } + break; + case DB_SET: + case DB_SET_RANGE: + if ((ret = CDB___qam_getno(dbp, key, &cp->recno)) != 0) + goto err; + break; + } + + if (cp->recno > meta->cur_recno || cp->recno < start) { + ret = DB_NOTFOUND; + pg = NULL; + if (skipped) + goto undo_meta; + goto err; + } + + /* Don't hold the meta page long term. */ + if (locked) { + if ((ret = __LPUT(dbc, metalock)) != 0) + goto err; + locked = 0; + } + + /* Lock the record. */ + if ((ret = CDB___db_lget(dbc, 0, cp->recno, lock_mode, + with_delete ? DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD, + &lock)) == DB_LOCK_NOTGRANTED) { + /* + * In the DB_CONSUME case we skip the locked + * record, someone else will pick it up. + * + */ + is_first = 0; + if (skipped == 0) + skipped = cp->recno; + goto retry; + } + + if (ret != 0) + goto err; + + /* + * In the DB_FIRST or DB_LAST cases we must wait and then start over + * since the first/last may have moved while we slept. + * We release our locks and try again. + */ + if ((!with_delete && is_first) || flags == DB_LAST) { + if ((ret = + CDB___db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0) + goto err; + if (cp->recno != + (is_first ? meta->first_recno : meta->cur_recno)) { + __LPUT(dbc, lock); + if (is_first) + flags = DB_FIRST; + locked = 1; + goto retry; + } + /* Don't hold the meta page long term. */ + if ((ret = __LPUT(dbc, metalock)) != 0) + goto err; + } + + /* Position the cursor on the record. */ + if ((ret = + CDB___qam_position(dbc, &cp->recno, lock_mode, start, &exact)) != 0) { + /* We cannot get the page, release the record lock. */ + (void)__LPUT(dbc, lock); + goto err; + } + + pg = cp->page; + pglock = cp->lock; + cp->lock = lock; + cp->lock_mode = lock_mode; + + if (!exact) { + if (flags == DB_NEXT || flags == DB_PREV || flags == DB_LAST) { + /* Release locks and try again. */ + (void)CDB_memp_fput(dbp->mpf, cp->page, 0); + (void)__LPUT(dbc, pglock); + (void)__LPUT(dbc, cp->lock); + cp->lock.off = LOCK_INVALID; + if (flags == DB_LAST) + flags = DB_PREV; + if (!with_delete) + is_first = 0; + goto retry; + } + /* this is for the SET and SET_RANGE cases */ + ret = DB_KEYEMPTY; + goto err1; + } + + /* Return the key if the user didn't give us one. */ + if (flags != DB_SET && + (ret = CDB___db_retcopy(dbp, key, &cp->recno, sizeof(cp->recno), + &dbc->rkey.data, &dbc->rkey.ulen)) != 0) { + if (with_delete) + goto undo_meta; + else + goto err1; + } + + qp = QAM_GET_RECORD(dbp, pg, cp->indx); + + /* Return the data item. */ + if ((ret = CDB___db_retcopy(dbp, data, qp->data, + ((QUEUE *)dbp->q_internal)->re_len, + &dbc->rdata.data, &dbc->rdata.ulen)) != 0) { + if (with_delete) + goto undo_meta; + else + goto err1; + } + + /* Finally, if we are doing DB_CONSUME mark the record. */ + if (with_delete) { + if (DB_LOGGING(dbc)) + if ((ret = CDB___qam_del_log(dbp->dbenv, dbc->txn, + &LSN(pg), 0, dbp->log_fileid, &LSN(pg), + pg->pgno, cp->indx, cp->recno)) != 0) + goto undo_meta; + + F_CLR(qp, QAM_VALID); + put_mode = DB_MPOOL_DIRTY; + + /* + * This code is responsible for correcting metadata. + * There are 3 cases. + * 1) We moved ahead more than one record. + * 2) We did not actually delete cp->recno. + * 3) We encountered at least one locked. + * record and skipped them. + */ + if (cp->recno != first) { + if (0) { +undo_meta: is_first = 0; + } + if (locked == 0 && (t_ret = CDB___db_lget( + dbc, 0, metapno, lock_mode, 0, &metalock)) != 0) + goto err1; + + if (is_first) { + /* + * Check to see if we moved past the first record, + * if so update meta so others can start past the + * deleted records. + */ + if (meta->first_recno > first) { + meta->first_recno = cp->recno; + meta_dirty = 1; + } + } + else if (skipped == 0) { + /* + * Error case: we did not actually delete the + * record, restore meta_first so that it is at + * least at or before cp->recno + */ + if (meta->first_recno > cp->recno) { + meta->first_recno = cp->recno; + meta_dirty = 1; + } + } + else if (meta->first_recno > skipped) { + /* + * We skipped some records because they were + * locked. If the meta-data page reflects a + * starting pointer after the skipped records + * we need to move it back to the first record + * that is not deleted or is sill locked. + * Release locks as we go, we are only + * reading to optimize future fetches. + */ + first = meta->first_recno; + /* Don't hold the meta page long term. */ + __LPUT(dbc, metalock); + locked = 0; + + /* reverify the skipped record */ + save_page = cp->pgno; + save_indx = cp->indx; + save_recno = cp->recno; + save_lock = cp->lock; + do { + t_ret = CDB___db_lget(dbc, 0, skipped, + DB_LOCK_READ, + DB_LOCK_NOWAIT | DB_LOCK_RECORD, + &lock); + if (t_ret == DB_LOCK_NOTGRANTED) + break; + if (t_ret != 0) + goto err1; + if ((t_ret = + CDB___qam_position(dbc, &skipped, + DB_LOCK_READ, + start, &exact)) != 0) { + (void)__LPUT(dbc, lock); + goto err1; + } + if ((t_ret = CDB_memp_fput(dbp->mpf, + cp->page, put_mode)) != 0) + goto err1; + if ((t_ret =__LPUT(dbc, lock)) != 0) + goto err1; + if ((t_ret = + __LPUT(dbc, cp->lock)) != 0) + goto err1; + if (exact) + break; + } while (++skipped <= first); + + t_ret = 0; + if ((t_ret = CDB___db_lget( + dbc, 0, metapno, + lock_mode, 0, &metalock)) != 0) + goto err1; + + if (meta->first_recno > skipped) { + meta->first_recno = skipped; + meta_dirty = 1; + } + cp->pgno = save_page; + cp->indx = save_indx; + cp->recno = save_recno; + cp->lock = save_lock; + } + locked = 1; + } + } + + /* + * Swap the cursors so we are left with the new position inside of + * the original DBCs structure, and close the dup'd cursor once it + * references the old position. + * + * The close can fail, but we only expect DB_LOCK_DEADLOCK failures. + * This violates our "the cursor is unchanged on error" semantics, + * but since all you can do with a DB_LOCK_DEADLOCK failure is close + * the cursor, I believe that's OK. + */ + orig = dbc_orig->internal; + dbc_orig->internal = dbc->internal; + dbc->internal = orig; + + +err1: + if (pg != NULL) { + if (!ret) + ret = t_ret; + t_ret = CDB_memp_fput(dbp->mpf, pg, put_mode); + + if (!ret) + ret = t_ret; + /* Doing record locking, release the page lock */ + t_ret = __LPUT(dbc, pglock); + } + +err: + if (meta) { + if (!ret) + ret = t_ret; + + /* release the meta page */ + t_ret = CDB_memp_fput( + dbp->mpf, meta, meta_dirty ? DB_MPOOL_DIRTY : 0); + + if (!ret) + ret = t_ret; + + /* Don't hold the meta page long term. */ + if (locked) + t_ret = __LPUT(dbc, metalock); + } + + if (!ret) + ret = t_ret; + + t_ret = dbc->c_close(dbc); + + return (ret ? ret : t_ret); +} + +/* + * CDB___qam_c_close -- + * Close down the cursor from a single use. + */ +static int +CDB___qam_c_close(dbc) + DBC *dbc; +{ + QUEUE_CURSOR *cp; + + cp = dbc->internal; + + /* Discard any locks not acquired inside of a transaction. */ + if (cp->lock.off != LOCK_INVALID) { + (void)__TLPUT(dbc, cp->lock); + cp->lock.off = LOCK_INVALID; + } + + cp->page = NULL; + cp->pgno = PGNO_INVALID; + cp->indx = 0; + cp->lock.off = LOCK_INVALID; + cp->lock_mode = DB_LOCK_NG; + cp->recno = RECNO_OOB; + cp->flags = 0; + + return (0); +} + +/* + * CDB___qam_c_dup -- + * Duplicate a queue cursor, such that the new one holds appropriate + * locks for the position of the original. + * + * PUBLIC: int CDB___qam_c_dup __P((DBC *, DBC *)); + */ +int +CDB___qam_c_dup(orig_dbc, new_dbc) + DBC *orig_dbc, *new_dbc; +{ + QUEUE_CURSOR *orig, *new; + + orig = orig_dbc->internal; + new = new_dbc->internal; + + new->pgno = orig->pgno; + new->indx = orig->indx; + + new->recno = orig->recno; + new->start = orig->start; + + new->lock_mode = orig->lock_mode; + + /* reget the long term lock if we are not in a xact */ + if (orig_dbc->txn != NULL || orig->lock.off == LOCK_INVALID) + return (0); + + return (CDB___db_lget(new_dbc, + 0, new->recno, new->lock_mode, DB_LOCK_RECORD, &new->lock)); +} + +/* + * CDB___qam_c_init + * + * PUBLIC: int CDB___qam_c_init __P((DBC *)); + */ +int +CDB___qam_c_init(dbc) + DBC *dbc; +{ + QUEUE_CURSOR *cp; + DB *dbp; + int ret; + + dbp = dbc->dbp; + + /* Allocate the internal structure. */ + if ((ret = CDB___os_calloc(1, sizeof(QUEUE_CURSOR), &cp)) != 0) + return (ret); + + /* + * Logical record numbers are always the same size, and we don't want + * to have to check for space every time we return one. Allocate it + * in advance. + */ + if ((ret = + CDB___os_malloc(sizeof(db_recno_t), NULL, &dbc->rkey.data)) != 0) { + CDB___os_free(cp, sizeof(QUEUE_CURSOR)); + return (ret); + } + dbc->rkey.ulen = sizeof(db_recno_t); + + /* Initialize methods. */ + dbc->internal = cp; + dbc->c_del = CDB___qam_c_del; + dbc->c_get = CDB___qam_c_get; + dbc->c_put = CDB___qam_c_put; + dbc->c_am_close = CDB___qam_c_close; + dbc->c_am_destroy = CDB___qam_c_destroy; + + return (0); +} +/* + * CDB___qam_c_destroy -- + * Close a single cursor -- internal version. + */ +static int +CDB___qam_c_destroy(dbc) + DBC *dbc; +{ + /* Discard the structures. */ + CDB___os_free(dbc->internal, sizeof(QUEUE_CURSOR)); + + return (0); +} + +/* + * CDB___qam_getno -- + * Check the user's record number. + */ +static int +CDB___qam_getno(dbp, key, rep) + DB *dbp; + const DBT *key; + db_recno_t *rep; +{ + if ((*rep = *(db_recno_t *)key->data) == 0) { + CDB___db_err(dbp->dbenv, "illegal record number of 0"); + return (EINVAL); + } + return (0); +} |