OfflineDatabase

OfflineDatabase abstract class API reference

OfflineDatabase

Abstract interface for local database operations. Implement this class to connect Talon to your local database (sqflite, Drift, Hive, etc.).

Overview

OfflineDatabase handles:

  • Storing messages for sync tracking
  • Applying changes to your data tables
  • Conflict resolution using HLC timestamps
  • Tracking sync progress

Abstract Methods to Implement

init

Future<void> init()

Initialize your database. Create tables here, including the messages table.

@override
Future<void> init() async {
  _db = await openDatabase(
    join(await getDatabasesPath(), 'app.db'),
    version: 1,
    onCreate: (db, version) async {
      await db.execute(TalonSchema.messagesTableSql);
      await db.execute('''
        CREATE TABLE todos (
          id TEXT PRIMARY KEY,
          name TEXT NOT NULL,
          is_done INTEGER NOT NULL DEFAULT 0
        )
      ''');
    },
  );
}

applyMessageToLocalDataTable

Future<bool> applyMessageToLocalDataTable(Message message)

Apply a change to your actual data table. This is where you UPDATE or INSERT the value.

@override
Future<bool> applyMessageToLocalDataTable(Message message) async {
  // Check if row exists
  final existing = await _db.query(
    message.table,
    where: 'id = ?',
    whereArgs: [message.row],
  );

  if (existing.isEmpty) {
    // Row doesn't exist yet - create it
    await _db.insert(message.table, {
      'id': message.row,
      message.column: message.typedValue,
    });
  } else {
    // Update existing row
    await _db.update(
      message.table,
      {message.column: message.typedValue},
      where: 'id = ?',
      whereArgs: [message.row],
    );
  }

  return true;
}

applyMessageToLocalMessageTable

Future<bool> applyMessageToLocalMessageTable(Message message)

Store a message in the messages tracking table.

@override
Future<bool> applyMessageToLocalMessageTable(Message message) async {
  await _db.insert(
    'talon_messages',
    message.toMap(),
    conflictAlgorithm: ConflictAlgorithm.replace,
  );
  return true;
}

getExistingTimestamp

Future<String?> getExistingTimestamp({
  required String table,
  required String row,
  required String column,
})

Get the most recent HLC timestamp for a specific cell. Used for conflict resolution.

@override
Future<String?> getExistingTimestamp({
  required String table,
  required String row,
  required String column,
}) async {
  final result = await _db.query(
    'talon_messages',
    columns: ['local_timestamp'],
    where: 'table_name = ? AND row = ? AND "column" = ?',
    whereArgs: [table, row, column],
    orderBy: 'local_timestamp DESC',
    limit: 1,
  );

  if (result.isEmpty) return null;
  return result.first['local_timestamp'] as String?;
}

saveLastSyncedServerTimestamp

Future<void> saveLastSyncedServerTimestamp(int serverTimestamp)

Store the highest server timestamp received. Used for incremental sync.

@override
Future<void> saveLastSyncedServerTimestamp(int serverTimestamp) async {
  await _db.insert(
    'talon_metadata',
    {'key': 'lastServerTimestamp', 'value': serverTimestamp.toString()},
    conflictAlgorithm: ConflictAlgorithm.replace,
  );
}

readLastSyncedServerTimestamp

Future<int?> readLastSyncedServerTimestamp()

Read the last synced server timestamp.

@override
Future<int?> readLastSyncedServerTimestamp() async {
  final result = await _db.query(
    'talon_metadata',
    where: 'key = ?',
    whereArgs: ['lastServerTimestamp'],
  );

  if (result.isEmpty) return null;
  return int.tryParse(result.first['value'] as String);
}

getUnsyncedMessages

Future<List<Message>> getUnsyncedMessages()

Get all messages that haven't been synced to the server yet.

@override
Future<List<Message>> getUnsyncedMessages() async {
  final result = await _db.query(
    'talon_messages',
    where: 'hasBeenSynced = ?',
    whereArgs: [0],
    orderBy: 'local_timestamp ASC',
  );

  return result.map((row) => Message.fromMap(row)).toList();
}

markMessagesAsSynced

Future<void> markMessagesAsSynced(List<String> syncedMessageIds)

Mark messages as successfully synced to the server.

@override
Future<void> markMessagesAsSynced(List<String> syncedMessageIds) async {
  if (syncedMessageIds.isEmpty) return;

  final placeholders = List.filled(syncedMessageIds.length, '?').join(',');
  await _db.rawUpdate(
    'UPDATE talon_messages SET hasBeenSynced = 1 WHERE id IN ($placeholders)',
    syncedMessageIds,
  );
}

Built-in Methods (Don't Override)

shouldApplyMessage

Future<bool> shouldApplyMessage(Message message)

Determines if a message should be applied based on HLC comparison. Uses getExistingTimestamp internally.

Returns true if:

  • No existing message for this cell, OR
  • New message has a higher HLC timestamp

saveMessageFromServer

Future<bool> saveMessageFromServer(Message message)

Process a message from the server:

  1. Always saves to message table
  2. Applies to data table only if it wins conflict resolution

saveMessagesFromServer

Future<void> saveMessagesFromServer(List<Message> messages)

Process multiple server messages. Updates the last synced timestamp after all messages are processed.

saveMessageFromLocalChange

Future<bool> saveMessageFromLocalChange(Message message)

Process a local change:

  1. Applies to data table
  2. Saves to message table for sync tracking

Complete Implementation Example

import 'package:sqflite/sqflite.dart';
import 'package:talon/talon.dart';

class SqliteOfflineDatabase extends OfflineDatabase {
  late Database _db;

  @override
  Future<void> init() async {
    _db = await openDatabase(
      join(await getDatabasesPath(), 'app.db'),
      version: 1,
      onCreate: (db, version) async {
        // Create messages table
        await db.execute(TalonSchema.messagesTableSql);

        // Create metadata table for sync tracking
        await db.execute('''
          CREATE TABLE talon_metadata (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL
          )
        ''');

        // Create your data tables
        await db.execute('''
          CREATE TABLE todos (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            is_done INTEGER NOT NULL DEFAULT 0,
            created_at TEXT,
            updated_at TEXT
          )
        ''');
      },
    );
  }

  @override
  Future<bool> applyMessageToLocalDataTable(Message message) async {
    final existing = await _db.query(
      message.table,
      where: 'id = ?',
      whereArgs: [message.row],
    );

    if (existing.isEmpty) {
      await _db.insert(message.table, {
        'id': message.row,
        message.column: message.typedValue,
      });
    } else {
      await _db.update(
        message.table,
        {message.column: message.typedValue},
        where: 'id = ?',
        whereArgs: [message.row],
      );
    }
    return true;
  }

  @override
  Future<bool> applyMessageToLocalMessageTable(Message message) async {
    await _db.insert(
      'talon_messages',
      message.toMap(),
      conflictAlgorithm: ConflictAlgorithm.replace,
    );
    return true;
  }

  @override
  Future<String?> getExistingTimestamp({
    required String table,
    required String row,
    required String column,
  }) async {
    final result = await _db.query(
      'talon_messages',
      columns: ['local_timestamp'],
      where: 'table_name = ? AND row = ? AND "column" = ?',
      whereArgs: [table, row, column],
      orderBy: 'local_timestamp DESC',
      limit: 1,
    );
    if (result.isEmpty) return null;
    return result.first['local_timestamp'] as String?;
  }

  @override
  Future<void> saveLastSyncedServerTimestamp(int serverTimestamp) async {
    await _db.insert(
      'talon_metadata',
      {'key': 'lastServerTimestamp', 'value': serverTimestamp.toString()},
      conflictAlgorithm: ConflictAlgorithm.replace,
    );
  }

  @override
  Future<int?> readLastSyncedServerTimestamp() async {
    final result = await _db.query(
      'talon_metadata',
      where: 'key = ?',
      whereArgs: ['lastServerTimestamp'],
    );
    if (result.isEmpty) return null;
    return int.tryParse(result.first['value'] as String);
  }

  @override
  Future<List<Message>> getUnsyncedMessages() async {
    final result = await _db.query(
      'talon_messages',
      where: 'hasBeenSynced = ?',
      whereArgs: [0],
      orderBy: 'local_timestamp ASC',
    );
    return result.map((row) => Message.fromMap(row)).toList();
  }

  @override
  Future<void> markMessagesAsSynced(List<String> syncedMessageIds) async {
    if (syncedMessageIds.isEmpty) return;
    final placeholders = List.filled(syncedMessageIds.length, '?').join(',');
    await _db.rawUpdate(
      'UPDATE talon_messages SET hasBeenSynced = 1 WHERE id IN ($placeholders)',
      syncedMessageIds,
    );
  }
}

Important Notes

  1. Conflict Resolution: You don't implement conflict resolution - just implement getExistingTimestamp and Talon handles the rest.

  2. Column Quoting: SQLite requires "column" to be quoted since it's a reserved word.

  3. Message Storage: Always store messages in the message table, even if they lose conflict resolution. This maintains sync history.

  4. Error Handling: If applyMessageToLocalDataTable fails (e.g., table doesn't exist), the message is still saved for tracking.