! sqr_index — secondary-index query and maintenance for the sqr module.
!
! Descendant of `sqr_base`: it inherits the storage/engine core — key compare
! and extraction, the B+-tree bulk rebuild, the kc_ctx_t comparator context,
! and the file-open helpers — by host association, so it carries no `use` of
! its own beyond the IEEE infinities that bound a range scan.  This submodule
! is the index read/seek side: building an index and its in-memory geometry
! (db_create_index_*), equality lookup (db_find_by_*), ordered cursors and
! leading-column range scans (db_open_cursor, db_find_range_*, db_cursor_next),
! and the natural-key by-unique-index operations (db_get/update/delete_by_key).

submodule (sqr:sqr_base) sqr_index
    use, intrinsic :: ieee_arithmetic, only: ieee_value, &
                                             ieee_positive_inf, ieee_negative_inf
    ! ieee_is_nan is host-associated from sqr_base (used by key_has_nan).
    implicit none

contains

    ! ===== Index support =====

    module subroutine db_create_index_1(db, table_name, col_name, stat, unique)
        class(db_t),       intent(inout)         :: db
        character(len=*), intent(in)            :: table_name
        character(len=*), intent(in)            :: col_name
        integer,          intent(out), optional :: stat
        logical,          intent(in),  optional :: unique
        character(len=len(col_name)) :: one(1)   ! named 1-elt array: no constructor temp
        one(1) = col_name
        call create_index_impl(db, table_name, one, stat, unique)
    end subroutine

    module subroutine db_create_index_m(db, table_name, col_names, stat, unique)
        class(db_t),       intent(inout)         :: db
        character(len=*), intent(in)            :: table_name
        character(len=*), intent(in)            :: col_names(:)
        integer,          intent(out), optional :: stat
        logical,          intent(in),  optional :: unique
        call create_index_impl(db, table_name, col_names, stat, unique)
    end subroutine

    ! Build a (possibly composite, possibly unique) secondary index over
    ! col_names in the given order. Rejects: unknown table/column, a TEXT
    ! member, a repeated member, an index already covering exactly these
    ! columns, and — when unique — pre-existing duplicate live keys.
    subroutine create_index_impl(db, table_name, col_names, stat, unique)
        type(db_t),       intent(inout)         :: db
        character(len=*), intent(in)            :: table_name
        character(len=*), intent(in)            :: col_names(:)
        integer,          intent(out), optional :: stat
        logical,          intent(in),  optional :: unique
        integer :: ti, rs, m, p, nc, koff
        logical :: uniq
        type(index_t), allocatable :: new_idx(:)
        type(index_t) :: ix
        if (readonly_block(db, stat)) return
        if (txn_block(db, stat)) return
        uniq = .false.
        if (present(unique)) uniq = unique
        nc = size(col_names)
        ti = db_table_index(db, table_name)
        if (ti == 0) then
            if (present(stat)) stat = SQR_NOT_FOUND
            return
        end if
        associate (t => db%tables(ti))
            if (nc < 1) then
                if (present(stat)) stat = SQR_INVALID
                return
            end if
            if (index_for_columns(t, col_names) > 0) then
                if (present(stat)) stat = SQR_DUP
                return
            end if
            ix%ncols = nc
            allocate(ix%columns(nc), ix%col_idx(nc), ix%key_off(nc))
            koff = 1
            members: do m = 1, nc
                ix%columns(m) = col_names(m)
                ix%col_idx(m) = col_index(t, col_names(m))
                if (ix%col_idx(m) == 0) then
                    if (present(stat)) stat = SQR_NOT_FOUND
                    return
                end if
                if (t%cols(ix%col_idx(m))%dtype == DT_TEXT) then
                    if (present(stat)) stat = SQR_INVALID
                    return
                end if
                dup_member: do p = 1, m - 1
                    if (ix%col_idx(p) == ix%col_idx(m)) then
                        if (present(stat)) stat = SQR_INVALID
                        return
                    end if
                end do dup_member
                ix%key_off(m) = koff
                koff = koff + t%cols(ix%col_idx(m))%csize
            end do members
            ix%key_size = koff - 1
            ix%nentries = 0
            ix%unique   = uniq

            allocate(new_idx(t%nindices + 1))
            new_idx(1:t%nindices) = t%indices(1:t%nindices)
            new_idx(t%nindices + 1) = ix
            call move_alloc(new_idx, t%indices)
            t%nindices = t%nindices + 1

            call rebuild_index(db, ti, t%nindices, rs)
            if (rs /= SQR_OK) then
                call drop_last_index(db, ti)
                if (present(stat)) stat = rs
                return
            end if

            ! A unique index must not be built over data that already has
            ! duplicate live keys; tear it back down and report SQR_DUP.
            if (uniq) then
                call has_dup_live_keys(db, ti, t%nindices, uniq, rs)
                if (rs /= SQR_OK) then
                    call drop_last_index(db, ti)
                    if (present(stat)) stat = rs
                    return
                end if
                if (uniq) then   ! reused as the "violation found" out-flag
                    call drop_last_index(db, ti)
                    if (present(stat)) stat = SQR_DUP
                    return
                end if
            end if

            call write_schema(db, t, rs)
            if (rs /= SQR_OK) then
                if (present(stat)) stat = rs
                return
            end if
            if (present(stat)) stat = SQR_OK
        end associate
    end subroutine

    ! Tear down the most recently appended (still in-memory, not yet
    ! schema-persisted) index of table ti: close + delete its file and
    ! shrink the index array. Used to roll back a failed create.
    subroutine drop_last_index(db, ti)
        type(db_t), intent(inout) :: db
        integer,    intent(in)    :: ti
        type(index_t), allocatable :: keep(:)
        integer :: s, u, ios
        associate (t => db%tables(ti))
            s = t%nindices
            ! File is deleted next — close without flushing meta.
            if (t%indices(s)%bt%unit /= -1) then
                close(t%indices(s)%bt%unit)
                t%indices(s)%bt%unit = -1
            end if
            open(newunit=u, file=index_path(db, t%name, s), status='old', iostat=ios)
            if (ios == 0) close(u, status='delete')
            allocate(keep(s - 1))
            keep(1:s-1) = t%indices(1:s-1)
            call move_alloc(keep, t%indices)
            t%nindices = s - 1
        end associate
    end subroutine

    ! First live row whose indexed key equals `key` (B+-tree lower-bound
    ! seek, then forward over the equal-key run skipping dead rows).
    ! row_id = 0 / SQR_NOT_FOUND if none.
    subroutine index_find(db, ti, j, key, row_id, stat)
        type(db_t),       intent(inout) :: db
        integer,          intent(in)    :: ti, j
        character(len=*), intent(in)    :: key
        integer(int32),   intent(out)   :: row_id
        integer,          intent(out)   :: stat
        integer :: bs, ios
        integer(int32) :: rid
        logical :: ok
        character(len=:), allocatable :: ckey, rbuf
        type(kc_ctx_t) :: cx
        type(bt_cursor_t) :: cur
        row_id = 0
        stat   = SQR_NOT_FOUND
        associate (t => db%tables(ti), ix => db%tables(ti)%indices(j))
            allocate(character(len=ix%key_size) :: ckey)
            allocate(character(len=t%record_size) :: rbuf)
            cx = make_kc_ctx(t, ix)
            call bt_seek(ix%bt, key, bt_key_cmp, cx, cur, bs)
            if (bs /= BT_OK) then
                stat = SQR_ERR
                return
            end if
            scan: do
                call bt_next(ix%bt, cur, ckey, rid, ok, bs)
                if (bs /= BT_OK) then
                    stat = SQR_ERR
                    return
                end if
                if (.not. ok) exit scan
                if (key_cmp_ix(t, ix, ckey, key) /= 0) exit scan
                read(t%unit, rec=rid, iostat=ios) rbuf
                call io_check(ios)
                if (ios == 0 .and. row_status(rbuf) == ROW_ALIVE) then
                    row_id = rid
                    stat = SQR_OK
                    return
                end if
            end do scan
        end associate
    end subroutine

    module subroutine db_find_by_int(db, table_name, col_name, key, row_id, stat)
        class(db_t),       intent(inout)        :: db
        character(len=*), intent(in)           :: table_name
        character(len=*), intent(in)           :: col_name
        integer(int32),   intent(in)           :: key
        integer(int32),   intent(out)          :: row_id
        integer,          intent(out), optional :: stat
        integer :: ti, j, rs
        character(len=4) :: kbuf
        row_id = 0
        ti = db_table_index(db, table_name)
        if (ti == 0) then
            if (present(stat)) stat = SQR_NOT_FOUND
            return
        end if
        j = index_index(db%tables(ti), col_name)
        if (j == 0) then
            if (present(stat)) stat = SQR_NOT_FOUND
            return
        end if
        if (leading_dtype(db%tables(ti), db%tables(ti)%indices(j)) /= DT_INT) then
            if (present(stat)) stat = SQR_NOT_FOUND   ! wrong overload for this index
            return
        end if
        kbuf = transfer(key, kbuf)
        call index_find(db, ti, j, kbuf, row_id, rs)
        if (present(stat)) stat = rs
    end subroutine

    ! Exact bit-for-bit equality — see the interface comment in sqr.f90.
    module subroutine db_find_by_real(db, table_name, col_name, key, row_id, stat)
        class(db_t),       intent(inout)        :: db
        character(len=*), intent(in)           :: table_name
        character(len=*), intent(in)           :: col_name
        real(real64),     intent(in)           :: key
        integer(int32),   intent(out)          :: row_id
        integer,          intent(out), optional :: stat
        integer :: ti, j, rs
        character(len=8) :: kbuf
        row_id = 0
        ti = db_table_index(db, table_name)
        if (ti == 0) then
            if (present(stat)) stat = SQR_NOT_FOUND
            return
        end if
        j = index_index(db%tables(ti), col_name)
        if (j == 0) then
            if (present(stat)) stat = SQR_NOT_FOUND
            return
        end if
        if (leading_dtype(db%tables(ti), db%tables(ti)%indices(j)) /= DT_REAL) then
            if (present(stat)) stat = SQR_NOT_FOUND   ! wrong overload for this index
            return
        end if
        ! A NaN key is never stored (rejected on write) and key_cmp's </>
        ! comparison would treat it as equal to every stored real, so it could
        ! return an unrelated row. It can match nothing — say so.
        if (ieee_is_nan(key)) then
            if (present(stat)) stat = SQR_NOT_FOUND
            return
        end if
        kbuf = transfer(key, kbuf)
        call index_find(db, ti, j, kbuf, row_id, rs)
        if (present(stat)) stat = rs
    end subroutine

    module subroutine db_find_by_char(db, table_name, col_name, key, row_id, stat)
        class(db_t),       intent(inout)        :: db
        character(len=*), intent(in)           :: table_name
        character(len=*), intent(in)           :: col_name
        character(len=*), intent(in)           :: key
        integer(int32),   intent(out)          :: row_id
        integer,          intent(out), optional :: stat
        integer :: ti, j, rs, ks, nc
        character(len=:), allocatable :: kbuf
        row_id = 0
        ti = db_table_index(db, table_name)
        if (ti == 0) then
            if (present(stat)) stat = SQR_NOT_FOUND
            return
        end if
        j = index_index(db%tables(ti), col_name)
        if (j == 0) then
            if (present(stat)) stat = SQR_NOT_FOUND
            return
        end if
        if (leading_dtype(db%tables(ti), db%tables(ti)%indices(j)) /= DT_CHAR) then
            if (present(stat)) stat = SQR_NOT_FOUND   ! wrong overload for this index
            return
        end if
        ks = db%tables(ti)%indices(j)%key_size
        ! A key longer than the column can hold could never have been stored
        ! (row_set_char would truncate it), so it matches nothing — say so
        ! rather than silently truncating the search key to ks and matching a
        ! shorter stored value. Trailing blanks are insignificant padding.
        if (len_trim(key) > ks) then
            if (present(stat)) stat = SQR_NOT_FOUND
            return
        end if
        nc = min(ks, len(key))
        allocate(character(len=ks) :: kbuf)
        kbuf              = repeat(char(0), ks)
        kbuf(1:nc) = key(1:nc)
        call index_find(db, ti, j, kbuf, row_id, rs)
        if (present(stat)) stat = rs
    end subroutine

    ! ===== Ordered cursor / range queries =====

    ! Resolve (table, column) to an index that can order/range by col_name:
    ! an exact single-column index if one exists (unchanged behaviour), else
    ! the first index whose LEADING member is col_name. A composite index's
    ! B+-tree key order is primarily by its leading member, so a range or scan
    ! over that member is a prefix scan needing no redundant single-column
    ! index (review item 5.2). SQR_NOT_FOUND if neither exists.
    subroutine find_leading_index(db, table_name, col_name, ti, j, stat)
        type(db_t),       intent(in)  :: db
        character(len=*), intent(in)  :: table_name, col_name
        integer,          intent(out) :: ti, j
        integer,          intent(out) :: stat
        integer :: k
        j  = 0
        ti = db_table_index(db, table_name)
        if (ti == 0) then
            stat = SQR_NOT_FOUND
            return
        end if
        j = index_index(db%tables(ti), col_name)   ! exact single-column first
        if (j == 0) then
            associate (t => db%tables(ti))
                scan: do k = 1, t%nindices
                    if (.not. idx_live(t%indices(k))) cycle scan
                    if (trim(t%indices(k)%columns(1)) == trim(col_name)) then
                        j = k
                        exit scan
                    end if
                end do scan
            end associate
        end if
        stat = merge(SQR_OK, SQR_NOT_FOUND, j /= 0)
    end subroutine

    pure function leading_dtype(t, ix) result(dt)
        type(table_t), intent(in) :: t
        type(index_t), intent(in) :: ix
        integer :: dt
        dt = t%cols(ix%col_idx(1))%dtype
    end function

    ! Byte image of the minimum / maximum value of a key member of the given
    ! dtype and width, used to fill the trailing members of a leading-column
    ! range bound so the full-key band [lokey,hikey] spans "any value" for those
    ! members in the index's member-by-member order (key_cmp): signed int32,
    ! real64 (-inf / +inf; NaN is already excluded from the index), or
    ! byte-lexicographic DT_CHAR. Not pure (ieee_value is not guaranteed pure),
    ! but only ever called from the non-pure range setup.
    function member_min(dtype, width) result(bytes)
        integer, intent(in)  :: dtype, width
        character(len=width)  :: bytes
        integer(int32) :: iv
        real(real64)   :: rv
        select case (dtype)
        case (DT_INT)
            iv = -huge(0_int32) - 1_int32          ! int32 minimum
            bytes = transfer(iv, bytes)
        case (DT_REAL)
            rv = ieee_value(rv, ieee_negative_inf)
            bytes = transfer(rv, bytes)
        case default                               ! DT_CHAR
            bytes = repeat(char(0), width)
        end select
    end function

    function member_max(dtype, width) result(bytes)
        integer, intent(in)  :: dtype, width
        character(len=width)  :: bytes
        integer(int32) :: iv
        real(real64)   :: rv
        select case (dtype)
        case (DT_INT)
            iv = huge(0_int32)                     ! int32 maximum
            bytes = transfer(iv, bytes)
        case (DT_REAL)
            rv = ieee_value(rv, ieee_positive_inf)
            bytes = transfer(rv, bytes)
        case default                               ! DT_CHAR
            bytes = repeat(char(255), width)
        end select
    end function

    ! Compose full-width lo/hi key bounds for a range on the LEADING member of
    ! index ix: the caller's leading-member bytes go in slot 1; every trailing
    ! member is filled with its typed minimum (lo) / maximum (hi) so the full-key
    ! band [lokey,hikey] is exactly {rows : lo <= leading <= hi}, whatever the
    ! trailing members hold. Reuses the existing full-key cursor machinery.
    subroutine build_leading_bounds(t, ix, lobytes, hibytes, lokey, hikey)
        type(table_t),                 intent(in)  :: t
        type(index_t),                 intent(in)  :: ix
        character(len=*),              intent(in)  :: lobytes, hibytes
        character(len=:), allocatable, intent(out) :: lokey, hikey
        integer :: m, lo, hi, w, dt
        allocate(character(len=ix%key_size) :: lokey, hikey)
        w = t%cols(ix%col_idx(1))%csize            ! leading member at key offset 1
        lokey(1:w) = lobytes(1:w)
        hikey(1:w) = hibytes(1:w)
        fill: do m = 2, ix%ncols
            lo = ix%key_off(m)
            w  = t%cols(ix%col_idx(m))%csize
            hi = lo + w - 1
            dt = t%cols(ix%col_idx(m))%dtype
            lokey(lo:hi) = member_min(dt, w)
            hikey(lo:hi) = member_max(dt, w)
        end do fill
    end subroutine

    module subroutine db_open_cursor(db, table_name, col_name, cur, stat)
        class(db_t),        intent(inout)         :: db
        character(len=*),  intent(in)            :: table_name, col_name
        type(db_cursor_t), intent(out)           :: cur
        integer,           intent(out), optional :: stat
        integer :: ti, j, rs, bs
        call find_leading_index(db, table_name, col_name, ti, j, rs)
        if (rs /= SQR_OK) then
            if (present(stat)) stat = rs
            return
        end if
        cur%ti      = ti
        cur%j       = j
        cur%bounded = .false.
        cur%gen     = db%generation
        call bt_first(db%tables(ti)%indices(j)%bt, cur%bt, bs)
        cur%active = bs == BT_OK
        if (present(stat)) stat = sqr_of_bt(bs)
    end subroutine

    ! Shared opener for the typed db_find_range_* wrappers. lokey/hikey are
    ! the index key bytes for the inclusive [lo,hi] bounds; seeks to the first
    ! key >= lokey and records hikey so db_cursor_next stops once a yielded
    ! key orders after it. lo > hi simply yields nothing (the first key >= lo
    ! already orders after hi).
    subroutine open_range(db, ti, j, lokey, hikey, cur, stat)
        type(db_t),        intent(inout) :: db
        integer,           intent(in)    :: ti, j
        character(len=*),  intent(in)    :: lokey, hikey
        type(db_cursor_t), intent(out)   :: cur
        integer,           intent(out)   :: stat
        integer :: bs
        type(kc_ctx_t) :: cx
        cur%ti      = ti
        cur%j       = j
        cur%bounded = .true.
        cur%hikey   = hikey
        cur%gen     = db%generation
        cx = make_kc_ctx(db%tables(ti), db%tables(ti)%indices(j))
        call bt_seek(db%tables(ti)%indices(j)%bt, lokey, bt_key_cmp, cx, cur%bt, bs)
        cur%active = bs == BT_OK
        stat = sqr_of_bt(bs)
    end subroutine

    module subroutine db_find_range_int(db, table_name, col_name, lo, hi, cur, stat)
        class(db_t),        intent(inout)         :: db
        character(len=*),  intent(in)            :: table_name, col_name
        integer(int32),    intent(in)            :: lo, hi
        type(db_cursor_t), intent(out)           :: cur
        integer,           intent(out), optional :: stat
        integer :: ti, j, rs
        character(len=4) :: lb, hb
        character(len=:), allocatable :: lk, hk
        call find_leading_index(db, table_name, col_name, ti, j, rs)
        if (rs /= SQR_OK) then
            if (present(stat)) stat = rs
            return
        end if
        associate (t => db%tables(ti), ix => db%tables(ti)%indices(j))
            if (leading_dtype(t, ix) /= DT_INT) then   ! wrong overload for this index
                if (present(stat)) stat = SQR_NOT_FOUND
                return
            end if
            lb = transfer(lo, lb)
            hb = transfer(hi, hb)
            call build_leading_bounds(t, ix, lb, hb, lk, hk)
        end associate
        call open_range(db, ti, j, lk, hk, cur, rs)
        if (present(stat)) stat = rs
    end subroutine

    module subroutine db_find_range_real(db, table_name, col_name, lo, hi, cur, stat)
        class(db_t),        intent(inout)         :: db
        character(len=*),  intent(in)            :: table_name, col_name
        real(real64),      intent(in)            :: lo, hi
        type(db_cursor_t), intent(out)           :: cur
        integer,           intent(out), optional :: stat
        integer :: ti, j, rs
        character(len=8) :: lb, hb
        character(len=:), allocatable :: lk, hk
        call find_leading_index(db, table_name, col_name, ti, j, rs)
        if (rs /= SQR_OK) then
            if (present(stat)) stat = rs
            return
        end if
        associate (t => db%tables(ti), ix => db%tables(ti)%indices(j))
            if (leading_dtype(t, ix) /= DT_REAL) then
                if (present(stat)) stat = SQR_NOT_FOUND
                return
            end if
            ! A NaN bound is unordered against every stored real (key_cmp's </>
            ! both read false), so it cannot define a band. Reject it rather than
            ! seek on a nonsensical range.
            if (ieee_is_nan(lo) .or. ieee_is_nan(hi)) then
                if (present(stat)) stat = SQR_INVALID
                return
            end if
            lb = transfer(lo, lb)
            hb = transfer(hi, hb)
            call build_leading_bounds(t, ix, lb, hb, lk, hk)
        end associate
        call open_range(db, ti, j, lk, hk, cur, rs)
        if (present(stat)) stat = rs
    end subroutine

    module subroutine db_find_range_char(db, table_name, col_name, lo, hi, cur, stat)
        class(db_t),        intent(inout)         :: db
        character(len=*),  intent(in)            :: table_name, col_name
        character(len=*),  intent(in)            :: lo, hi
        type(db_cursor_t), intent(out)           :: cur
        integer,           intent(out), optional :: stat
        integer :: ti, j, rs, lw
        character(len=:), allocatable :: lb, hb, lk, hk
        call find_leading_index(db, table_name, col_name, ti, j, rs)
        if (rs /= SQR_OK) then
            if (present(stat)) stat = rs
            return
        end if
        associate (t => db%tables(ti), ix => db%tables(ti)%indices(j))
            if (leading_dtype(t, ix) /= DT_CHAR) then
                if (present(stat)) stat = SQR_NOT_FOUND
                return
            end if
            lw = t%cols(ix%col_idx(1))%csize       ! leading char member width
            ! A bound longer than the column would be silently truncated,
            ! narrowing or widening the band to something the caller did not ask
            ! for; reject it instead. Trailing blanks are insignificant padding.
            if (len_trim(lo) > lw .or. len_trim(hi) > lw) then
                if (present(stat)) stat = SQR_INVALID
                return
            end if
            allocate(character(len=lw) :: lb, hb)
            lb = repeat(char(0), lw)
            hb = repeat(char(0), lw)
            lb(1:min(lw, len(lo))) = lo(1:min(lw, len(lo)))
            hb(1:min(lw, len(hi))) = hi(1:min(lw, len(hi)))
            call build_leading_bounds(t, ix, lb, hb, lk, hk)
        end associate
        call open_range(db, ti, j, lk, hk, cur, rs)
        if (present(stat)) stat = rs
    end subroutine

    ! Pull the next live row at/after the cursor in ascending key order,
    ! skipping tombstoned rows (lazy delete leaves their index entries) and
    ! stopping at the band's upper bound. The same seek+forward idiom as
    ! index_find, but yielding one row per call rather than the first match.
    module subroutine db_cursor_next(db, cur, row_id, buf, ok, stat)
        class(db_t),        intent(inout)         :: db
        type(db_cursor_t), intent(inout)         :: cur
        integer(int32),    intent(out)           :: row_id
        character(len=*),  intent(out)           :: buf
        logical,           intent(out)           :: ok
        integer,           intent(out), optional :: stat
        integer :: bs, ios
        integer(int32) :: rid
        logical :: got
        character(len=:), allocatable :: ckey, rbuf
        row_id = 0
        ok     = .false.
        ! A mutating call (or close) since this cursor was opened may have
        ! shifted or freed table slots; cur%ti/cur%j would then address the
        ! wrong table or run off the array. Detect it instead of risking UB.
        if (.not. db%opened) then
            if (present(stat)) stat = SQR_INVALID
            return
        end if
        if (cur%gen /= db%generation) then
            cur%active = .false.
            if (present(stat)) stat = SQR_INVALID
            return
        end if
        if (.not. cur%active) then
            if (present(stat)) stat = SQR_OK
            return
        end if
        associate (t => db%tables(cur%ti), ix => db%tables(cur%ti)%indices(cur%j))
            allocate(character(len=ix%key_size)  :: ckey)
            allocate(character(len=t%record_size) :: rbuf)
            scan: do
                call bt_next(ix%bt, cur%bt, ckey, rid, got, bs)
                if (bs /= BT_OK) then
                    cur%active = .false.
                    if (present(stat)) stat = SQR_ERR
                    return
                end if
                if (.not. got) then
                    cur%active = .false.
                    if (present(stat)) stat = SQR_OK
                    return
                end if
                if (cur%bounded) then
                    if (key_cmp_ix(t, ix, ckey, cur%hikey) > 0) then
                        cur%active = .false.
                        if (present(stat)) stat = SQR_OK
                        return
                    end if
                end if
                read(t%unit, rec=rid, iostat=ios) rbuf
                call io_check(ios)
                if (ios /= 0) then
                    cur%active = .false.
                    if (present(stat)) stat = SQR_ERR
                    return
                end if
                if (row_status(rbuf) == ROW_ALIVE) then
                    row_id = rid
                    buf    = rbuf
                    ok     = .true.
                    if (present(stat)) stat = SQR_OK
                    return
                end if
            end do scan
        end associate
    end subroutine

    ! ===== Natural-key (by unique composite index) operations =====

    ! Resolve (table, member columns, key-bearing row buffer) to a live
    ! row_id via the matching UNIQUE index. SQR_NOT_FOUND if the table or a
    ! matching index is absent or no live row carries the key; SQR_INVALID
    ! if the matching index is not unique (a by-key op needs a single row).
    subroutine resolve_by_key(db, table_name, col_names, keyrow, ti, row_id, stat)
        type(db_t),       intent(inout) :: db
        character(len=*), intent(in)    :: table_name
        character(len=*), intent(in)    :: col_names(:)
        character(len=*), intent(in)    :: keyrow
        integer,          intent(out)   :: ti
        integer(int32),   intent(out)   :: row_id
        integer,          intent(out)   :: stat
        integer :: j
        character(len=:), allocatable :: key
        row_id = 0
        ti = db_table_index(db, table_name)
        if (ti == 0) then
            stat = SQR_NOT_FOUND
            return
        end if
        associate (t => db%tables(ti))
            j = index_for_columns(t, col_names)
            if (j == 0) then
                stat = SQR_NOT_FOUND
                return
            end if
            if (.not. t%indices(j)%unique) then
                stat = SQR_INVALID
                return
            end if
            allocate(character(len=t%indices(j)%key_size) :: key)
            call extract_key(t, t%indices(j), keyrow, key)
            call index_find(db, ti, j, key, row_id, stat)
        end associate
    end subroutine

    module subroutine db_get_by_key(db, table_name, col_names, keyrow, buf, stat, row_id)
        class(db_t),       intent(inout)         :: db
        character(len=*), intent(in)            :: table_name
        character(len=*), intent(in)            :: col_names(:)
        character(len=*), intent(in)            :: keyrow
        character(len=*), intent(out)           :: buf
        integer,          intent(out), optional :: stat
        integer(int32),   intent(out), optional :: row_id
        integer :: ti, rs
        integer(int32) :: rid
        if (present(row_id)) row_id = 0
        call resolve_by_key(db, table_name, col_names, keyrow, ti, rid, rs)
        if (rs /= SQR_OK) then
            if (present(stat)) stat = rs
            return
        end if
        call db_get(db, table_name, rid, buf, stat)
        if (present(row_id)) row_id = rid
    end subroutine

    module subroutine db_update_by_key(db, table_name, col_names, keyrow, newrow, stat)
        class(db_t),       intent(inout)         :: db
        character(len=*), intent(in)            :: table_name
        character(len=*), intent(in)            :: col_names(:)
        character(len=*), intent(in)            :: keyrow
        character(len=*), intent(in)            :: newrow
        integer,          intent(out), optional :: stat
        integer :: ti, rs
        integer(int32) :: rid
        if (readonly_block(db, stat)) return
        call resolve_by_key(db, table_name, col_names, keyrow, ti, rid, rs)
        if (rs /= SQR_OK) then
            if (present(stat)) stat = rs
            return
        end if
        call db_update(db, table_name, rid, newrow, stat)
    end subroutine

    module subroutine db_delete_by_key(db, table_name, col_names, keyrow, stat)
        class(db_t),       intent(inout)         :: db
        character(len=*), intent(in)            :: table_name
        character(len=*), intent(in)            :: col_names(:)
        character(len=*), intent(in)            :: keyrow
        integer,          intent(out), optional :: stat
        integer :: ti, rs
        integer(int32) :: rid
        if (readonly_block(db, stat)) return
        call resolve_by_key(db, table_name, col_names, keyrow, ti, rid, rs)
        if (rs /= SQR_OK) then
            if (present(stat)) stat = rs
            return
        end if
        call db_delete(db, table_name, rid, stat)
    end subroutine

end submodule sqr_index
