! sqr_admin — table maintenance operations for the sqr module.
!
! Descendant of `sqr_base`: it inherits the storage/engine core (key compare
! and extraction, uniqueness and duplicate-key walks, the B+-tree bulk
! rebuild, schema I/O) by host association and carries no `use` of its own.
! These are the heavier whole-table operations that sit beside the per-row
! API in sqr_record and the lookup API in sqr_index: dropping a secondary
! index (db_drop_index_1/m), batched all-or-nothing insert with one packed
! reindex per index (db_insert_many), and the consistency checker that walks
! every index against the data file (db_verify, verify_one_index).

submodule (sqr:sqr_base) sqr_admin
    implicit none

contains

    ! ===== Drop index / batch insert / verify =====

    module subroutine db_drop_index_1(db, table_name, col_name, stat)
        class(db_t),       intent(inout)         :: db
        character(len=*), intent(in)            :: table_name
        character(len=*), intent(in)            :: col_name
        integer,          intent(out), optional :: stat
        character(len=len(col_name)) :: one(1)   ! named 1-elt array: no constructor temp
        one(1) = col_name
        call db_drop_index_m(db, table_name, one, stat)
    end subroutine

    module subroutine db_drop_index_m(db, table_name, col_names, stat)
        class(db_t),       intent(inout)         :: db
        character(len=*), intent(in)            :: table_name
        character(len=*), intent(in)            :: col_names(:)
        integer,          intent(out), optional :: stat
        integer :: ti, j, u, ios, rs
        if (readonly_block(db, stat)) return
        if (txn_block(db, stat)) return
        db%generation = db%generation + 1   ! structural change: invalidate cursors
        ti = db_table_index(db, table_name)
        if (ti == 0) then
            if (present(stat)) stat = SQR_NOT_FOUND
            return
        end if
        associate (t => db%tables(ti))
            j = index_for_columns(t, col_names)
            if (j == 0) then
                if (present(stat)) stat = SQR_NOT_FOUND
                return
            end if
            ! Tombstone the slot (ncols = 0) rather than removing it: the
            ! __i<slot> file names of surviving indices stay valid and a later
            ! db_create_index simply appends a fresh slot. Close the tree first.
            associate (ix => t%indices(j))
                if (ix%bt%unit /= -1) then
                    close(ix%bt%unit)        ! file is deleted below — no meta flush
                    ix%bt%unit = -1
                end if
                ix%ncols    = 0
                ix%key_size = 0
                ix%nentries = 0
                ix%unique   = .false.
                if (allocated(ix%columns)) deallocate(ix%columns)
                if (allocated(ix%col_idx)) deallocate(ix%col_idx)
                if (allocated(ix%key_off)) deallocate(ix%key_off)
            end associate
            ! Persist the tombstone BEFORE deleting the file (catalog-first
            ! discipline, as in db_drop_table): if write_schema fails the file
            ! and its still-live slot are intact; a crash after it leaves an
            ! orphaned file the dead slot ignores, not a live slot with no file.
            call write_schema(db, t, rs)
            if (rs /= SQR_OK) then
                if (present(stat)) stat = rs
                return
            end if
            open(newunit=u, file=index_path(db, t%name, j), status='old', iostat=ios)
            if (ios == 0) close(u, status='delete')
        end associate
        if (present(stat)) stat = SQR_OK
    end subroutine

    ! Auto-commit bracket: the whole batch is one implicit transaction, so a
    ! mid-batch I/O failure or a failed packed reindex rolls every row back —
    ! the table returns to its exact pre-call state rather than keeping the rows
    ! written so far.  A no-op when an explicit transaction is already in flight
    ! (that scope's commit/rollback then decides) or on a read-only handle (the
    ! core reports SQR_READONLY).
    module subroutine db_insert_many(db, table_name, bufs, row_ids, stat)
        class(db_t),      intent(inout)         :: db
        character(len=*), intent(in)            :: table_name
        character(len=*), intent(in)            :: bufs(:)
        integer(int32),   intent(out)           :: row_ids(:)
        integer,          intent(out), optional :: stat
        integer :: rs
        logical :: owns
        call ac_begin(db, owns, rs)
        if (rs == SQR_OK) call insert_many_core(db, table_name, bufs, row_ids, rs)
        call ac_end(db, owns, rs)
        if (rs /= SQR_OK) row_ids = 0   ! atomic: a rolled-back batch leaves no rows
        if (present(stat)) stat = rs
    end subroutine

    subroutine insert_many_core(db, table_name, bufs, row_ids, stat)
        class(db_t),      intent(inout)         :: db
        character(len=*), intent(in)            :: table_name
        character(len=*), intent(in)            :: bufs(:)
        integer(int32),   intent(out)           :: row_ids(:)
        integer,          intent(out), optional :: stat
        integer :: ti, n, k, j, ci, ios, rs
        integer(int32) :: rid
        character(len=:), allocatable :: wrows(:)
        n = size(bufs)
        row_ids = 0
        if (readonly_block(db, stat)) return
        db%generation = db%generation + 1   ! write: invalidate cursors
        ti = db_table_index(db, table_name)
        if (ti == 0) then
            if (present(stat)) stat = SQR_NOT_FOUND
            return
        end if
        if (size(row_ids) < n) then
            if (present(stat)) stat = SQR_INVALID
            return
        end if
        if (n == 0) then
            if (present(stat)) stat = SQR_OK
            return
        end if
        associate (t => db%tables(ti))
            ! Build the padded row images once (status alive, text descriptors
            ! zeroed), mirroring db_insert's per-row preparation.
            allocate(character(len=t%record_size) :: wrows(n))
            build: do k = 1, n
                wrows(k) = bufs(k)(1:min(len(bufs(k)), t%record_size))
                if (len(bufs(k)) < t%record_size) &
                    wrows(k)(len(bufs(k))+1:) = char(0)
                call row_set_status(wrows(k), ROW_ALIVE)
                do ci = 1, t%ncols
                    if (t%cols(ci)%dtype == DT_TEXT) &
                        call row_set_text_desc(wrows(k), t%cols(ci), 0_int64, 0_int32)
                end do
            end do build

            ! Validate the WHOLE batch before writing anything: reject NaN keys
            ! and, for unique indices, any key that collides with the existing
            ! index or with an earlier batch row. A failure here leaves the table
            ! untouched (all-or-nothing on validation). NULL-member rows are not
            ! indexed, so they neither carry a NaN key nor constrain uniqueness.
            validate: do j = 1, t%nindices
                if (.not. idx_live(t%indices(j))) cycle validate
                associate (ix => t%indices(j))
                    check_index: block
                        character(len=:), allocatable :: bkeys(:)
                        logical, allocatable :: bnull(:)
                        logical :: viol
                        integer :: p
                        allocate(character(len=ix%key_size) :: bkeys(n))
                        allocate(bnull(n))
                        keys_pass: do k = 1, n
                            bnull(k) = key_has_null(t, ix, wrows(k))
                            if (bnull(k)) cycle keys_pass
                            call extract_key(t, ix, wrows(k), bkeys(k))
                            if (key_has_nan(t, ix, bkeys(k))) then
                                if (present(stat)) stat = SQR_INVALID
                                return
                            end if
                        end do keys_pass
                        if (ix%unique) then
                            uniq_pass: do k = 1, n
                                if (bnull(k)) cycle uniq_pass
                                call unique_violation(db, ti, j, bkeys(k), 0_int32, viol, rs)
                                if (rs /= SQR_OK) then
                                    if (present(stat)) stat = rs
                                    return
                                end if
                                if (viol) then
                                    if (present(stat)) stat = SQR_DUP
                                    return
                                end if
                                ! Intra-batch: an earlier non-NULL row, same key.
                                batch_dup: do p = 1, k - 1
                                    if (bnull(p)) cycle batch_dup
                                    if (key_cmp_ix(t, ix, bkeys(p), bkeys(k)) == 0) then
                                        if (present(stat)) stat = SQR_DUP
                                        return
                                    end if
                                end do batch_dup
                            end do uniq_pass
                        end if
                    end block check_index
                end associate
            end do validate

            ! Every row appends at next_id (the data file's high-water), so the
            ! batch only ever grows the file: one EXTEND undo whose rollback
            ! truncates away every row this txn appended.  Logged once before the
            ! loop (jrnl_log_extend is idempotent per path anyway).
            if (db%jrnl%active) then
                call jrnl_log_extend(db, data_relpath(t%name), rs)
                if (rs /= SQR_OK) then
                    if (present(stat)) stat = rs
                    return
                end if
            end if

            ! Write every row.  Under the auto-commit bracket (or an explicit
            ! txn) the EXTEND above makes a mid-batch I/O failure roll the whole
            ! batch back; bare and un-journalled it degrades to the old weak
            ! guarantee (rows written so far stay).
            write_rows: do k = 1, n
                rid = t%next_id
                write(t%unit, rec=rid, iostat=ios) wrows(k)
                call io_check(ios)
                if (ios /= 0) then
                    if (present(stat)) stat = SQR_ERR
                    return
                end if
                row_ids(k)   = rid
                t%next_id    = t%next_id + 1
                t%live_count = t%live_count + 1
            end do write_rows

            ! Deferred index maintenance: one packed rebuild per live index over
            ! the now-complete data file, instead of a per-row tree insert.
            reindex: do j = 1, t%nindices
                if (.not. idx_live(t%indices(j))) cycle reindex
                call rebuild_index(db, ti, j, rs)
                if (rs /= SQR_OK) then
                    if (present(stat)) stat = rs
                    return
                end if
            end do reindex
        end associate
        if (present(stat)) stat = SQR_OK
    end subroutine

    module subroutine db_verify(db, table_name, stat, errmsg)
        class(db_t),       intent(inout)           :: db
        character(len=*), intent(in)              :: table_name
        integer,          intent(out),  optional  :: stat
        character(len=*), intent(inout), optional :: errmsg
        integer :: ti, j, ios, recount, vrs
        integer(int32) :: rid
        integer(int64) :: fsize
        character(len=:), allocatable :: rbuf
        character(len=128) :: detail
        vrs = SQR_OK
        detail = ''
        ti = db_table_index(db, table_name)
        if (ti == 0) then
            if (present(stat)) stat = SQR_NOT_FOUND
            if (present(errmsg)) errmsg = 'no such table: ' // trim(table_name)
            return
        end if
        verify: block
            associate (t => db%tables(ti))
                allocate(character(len=t%record_size) :: rbuf)
                ! (1) Live-row recount and data-file extent.
                recount = 0
                scan_rows: do rid = 1, t%next_id - 1
                    read(t%unit, rec=rid, iostat=ios) rbuf
                    call io_check(ios)
                    if (ios /= 0) then
                        vrs = SQR_ERR; detail = 'cannot read data record'
                        exit verify
                    end if
                    if (row_status(rbuf) == ROW_ALIVE) recount = recount + 1
                end do scan_rows
                if (recount /= t%live_count) then
                    vrs = SQR_INVALID; detail = 'live_count disagrees with row recount'
                    exit verify
                end if
                inquire(unit=t%unit, size=fsize)
                if (fsize < int(t%next_id - 1, int64) * int(t%record_size, int64)) then
                    vrs = SQR_INVALID; detail = 'data file shorter than next_id implies'
                    exit verify
                end if
                ! (2) Each live index agrees with the data: every entry over a
                ! live row carries that row's current key (no stale entry), and
                ! the count of such entries equals the live rows to be indexed.
                verify_idx: do j = 1, t%nindices
                    if (.not. idx_live(t%indices(j))) cycle verify_idx
                    call verify_one_index(db, ti, j, rbuf, vrs, detail)
                    if (vrs /= SQR_OK) exit verify
                end do verify_idx
            end associate
        end block verify
        if (present(stat))   stat = vrs
        if (present(errmsg) .and. vrs /= SQR_OK) errmsg = trim(detail)
    end subroutine

    ! Check one index against the table data: walk the tree, and for every entry
    ! pointing at a LIVE row confirm the row's extracted key equals the entry key
    ! (catches a stale entry left by an interrupted update or a crash-overwrite),
    ! and confirm the number of such matched entries equals the live rows that
    ! ought to be indexed (catches a missing entry). Unique indices additionally
    ! must have no duplicate live keys. rs is SQR_OK / SQR_INVALID / SQR_ERR.
    subroutine verify_one_index(db, ti, j, rbuf, rs, detail)
        type(db_t),       intent(inout) :: db
        integer,          intent(in)    :: ti, j
        character(len=*), intent(inout) :: rbuf
        integer,          intent(out)   :: rs
        character(len=*), intent(inout) :: detail
        integer :: bs, ios, expected, matched
        integer(int32) :: rid
        logical :: ok, dup
        character(len=:), allocatable :: ckey, rkey
        type(bt_cursor_t) :: cur
        rs = SQR_OK
        associate (t => db%tables(ti), ix => db%tables(ti)%indices(j))
            ! Live rows that should be indexed (no NULL member of this index).
            expected = 0
            count_live: do rid = 1, t%next_id - 1
                read(t%unit, rec=rid, iostat=ios) rbuf
                call io_check(ios)
                if (ios /= 0) then
                    rs = SQR_ERR; detail = 'cannot read data record'
                    return
                end if
                if (row_status(rbuf) /= ROW_ALIVE) cycle count_live
                if (.not. key_has_null(t, ix, rbuf)) expected = expected + 1
            end do count_live
            ! Walk the index; check live-row entries against the stored row.
            allocate(character(len=ix%key_size) :: ckey, rkey)
            call bt_first(ix%bt, cur, bs)
            if (bs /= BT_OK) then
                rs = SQR_ERR; detail = 'cannot read index'
                return
            end if
            matched = 0
            walk: do
                call bt_next(ix%bt, cur, ckey, rid, ok, bs)
                if (bs /= BT_OK) then
                    rs = SQR_ERR; detail = 'cannot read index'
                    return
                end if
                if (.not. ok) exit walk
                read(t%unit, rec=rid, iostat=ios) rbuf
                call io_check(ios)
                if (ios /= 0) then
                    rs = SQR_ERR; detail = 'cannot read data record'
                    return
                end if
                if (row_status(rbuf) /= ROW_ALIVE) cycle walk   ! lazy-delete leftover
                call extract_key(t, ix, rbuf, rkey)
                if (key_cmp_ix(t, ix, rkey, ckey) /= 0) then
                    rs = SQR_INVALID; detail = 'stale index entry: key disagrees with row'
                    return
                end if
                matched = matched + 1
            end do walk
            if (matched /= expected) then
                rs = SQR_INVALID; detail = 'index entry count disagrees with live rows'
                return
            end if
            if (ix%unique) then
                dup = .true.
                call has_dup_live_keys(db, ti, j, dup, ios)
                if (ios /= SQR_OK) then
                    rs = ios; detail = 'cannot walk index for duplicate check'
                    return
                end if
                if (dup) then
                    rs = SQR_INVALID; detail = 'duplicate live keys in a unique index'
                    return
                end if
            end if
        end associate
    end subroutine

end submodule sqr_admin
