OfflineDatabase
Implementing the OfflineDatabase interface
OfflineDatabase
The OfflineDatabase interface connects Talon to your local database.
Interface Overview
abstract class OfflineDatabase {
// Setup
Future<void> init();
// Apply changes
Future<bool> applyMessageToLocalDataTable(Message message);
Future<bool> applyMessageToLocalMessageTable(Message message);
// Conflict detection
Future<String?> getExistingTimestamp({
required String table,
required String row,
required String column,
});
// Sync tracking
Future<List<Message>> getUnsyncedMessages();
Future<void> markMessagesAsSynced(List<String> syncedMessageIds);
Future<int?> readLastSyncedServerTimestamp();
Future<void> saveLastSyncedServerTimestamp(int serverTimestamp);
// Provided by Talon (don't override)
Future<bool> shouldApplyMessage(Message message);
Future<bool> saveMessageFromServer(Message message);
Future<void> saveMessagesFromServer(List<Message> messages);
Future<bool> saveMessageFromLocalChange(Message message);
}
Complete Implementation (sqflite)
import 'package:sqflite/sqflite.dart';
import 'package:talon/talon.dart';
import 'package:shared_preferences/shared_preferences.dart';
class SqliteOfflineDatabase extends OfflineDatabase {
late final Database _db;
late final SharedPreferences _prefs;
SqliteOfflineDatabase();
@override
Future<void> init() async {
_prefs = await SharedPreferences.getInstance();
final dbPath = await getDatabasesPath();
_db = await openDatabase(
'$dbPath/app.db',
version: 1,
onCreate: (db, version) async {
// Create Talon messages table
await db.execute(TalonSchema.messagesTableSql);
// Create your data tables
await db.execute('''
CREATE TABLE IF NOT EXISTS todos (
id TEXT PRIMARY KEY,
name TEXT NOT NULL DEFAULT '',
is_done INTEGER NOT NULL DEFAULT 0,
created_at TEXT
)
''');
},
);
}
@override
Future<bool> applyMessageToLocalDataTable(Message message) async {
try {
// Check if row exists
final existing = await _db.query(
message.table,
where: 'id = ?',
whereArgs: [message.row],
);
if (existing.isEmpty) {
// Insert new row with this column
await _db.insert(message.table, {
'id': message.row,
message.column: message.value,
});
} else {
// Update existing row
await _db.update(
message.table,
{message.column: message.value},
where: 'id = ?',
whereArgs: [message.row],
);
}
return true;
} catch (e) {
print('Error applying message: $e');
return false;
}
}
@override
Future<bool> applyMessageToLocalMessageTable(Message message) async {
try {
// Use INSERT OR REPLACE to handle duplicates
await _db.insert(
'talon_messages',
message.toMap(),
conflictAlgorithm: ConflictAlgorithm.replace,
);
return true;
} catch (e) {
print('Error storing message: $e');
return false;
}
}
@override
Future<String?> getExistingTimestamp({
required String table,
required String row,
required String column,
}) async {
final result = await _db.rawQuery('''
SELECT local_timestamp FROM talon_messages
WHERE table_name = ? AND row = ? AND "column" = ?
ORDER BY local_timestamp DESC
LIMIT 1
''', [table, row, column]);
if (result.isEmpty) return null;
return result.first['local_timestamp'] as String?;
}
@override
Future<List<Message>> getUnsyncedMessages() async {
final rows = await _db.query(
'talon_messages',
where: 'hasBeenSynced = 0',
orderBy: 'local_timestamp ASC',
);
return rows.map((r) => Message.fromMap(r)).toList();
}
@override
Future<void> markMessagesAsSynced(List<String> syncedMessageIds) async {
if (syncedMessageIds.isEmpty) return;
final placeholders = syncedMessageIds.map((_) => '?').join(',');
await _db.rawUpdate(
'UPDATE talon_messages SET hasBeenSynced = 1 WHERE id IN ($placeholders)',
syncedMessageIds,
);
}
@override
Future<int?> readLastSyncedServerTimestamp() async {
return _prefs.getInt('talon_last_synced_timestamp');
}
@override
Future<void> saveLastSyncedServerTimestamp(int serverTimestamp) async {
await _prefs.setInt('talon_last_synced_timestamp', serverTimestamp);
}
}
init()
Called once before using the database.
@override
Future<void> init() async {
// 1. Open your database
_db = await openDatabase('app.db');
// 2. Create messages table
await _db.execute(TalonSchema.messagesTableSql);
// 3. Create your data tables
await _db.execute('CREATE TABLE IF NOT EXISTS todos (...)');
}
applyMessageToLocalDataTable()
Applies a change to your actual data.
@override
Future<bool> applyMessageToLocalDataTable(Message message) async {
// message.table = 'todos'
// message.row = 'todo-123'
// message.column = 'name'
// message.value = 'Buy milk'
await _db.update(
message.table,
{message.column: message.value},
where: 'id = ?',
whereArgs: [message.row],
);
return true;
}
Handle both INSERT and UPDATE cases. The row might not exist yet if this is a creation message.
applyMessageToLocalMessageTable()
Stores the message for tracking.
@override
Future<bool> applyMessageToLocalMessageTable(Message message) async {
await _db.insert(
'talon_messages',
message.toMap(),
conflictAlgorithm: ConflictAlgorithm.replace,
);
return true;
}
getExistingTimestamp()
Returns the HLC timestamp of the most recent message for a cell.
@override
Future<String?> getExistingTimestamp({
required String table,
required String row,
required String column,
}) async {
final result = await _db.rawQuery('''
SELECT local_timestamp FROM talon_messages
WHERE table_name = ? AND row = ? AND "column" = ?
ORDER BY local_timestamp DESC
LIMIT 1
''', [table, row, column]);
if (result.isEmpty) return null;
return result.first['local_timestamp'] as String?;
}
This is used by shouldApplyMessage() to determine if an incoming message should overwrite existing data.
getUnsyncedMessages()
Returns messages that need to be sent to the server.
@override
Future<List<Message>> getUnsyncedMessages() async {
final rows = await _db.query(
'talon_messages',
where: 'hasBeenSynced = 0',
orderBy: 'local_timestamp ASC', // Oldest first
);
return rows.map((r) => Message.fromMap(r)).toList();
}
markMessagesAsSynced()
Called after messages are successfully sent to the server.
@override
Future<void> markMessagesAsSynced(List<String> syncedMessageIds) async {
if (syncedMessageIds.isEmpty) return;
final placeholders = syncedMessageIds.map((_) => '?').join(',');
await _db.rawUpdate(
'UPDATE talon_messages SET hasBeenSynced = 1 WHERE id IN ($placeholders)',
syncedMessageIds,
);
}
readLastSyncedServerTimestamp() / saveLastSyncedServerTimestamp()
Track sync progress to enable incremental sync.
@override
Future<int?> readLastSyncedServerTimestamp() async {
return _prefs.getInt('talon_last_synced_timestamp');
}
@override
Future<void> saveLastSyncedServerTimestamp(int serverTimestamp) async {
await _prefs.setInt('talon_last_synced_timestamp', serverTimestamp);
}
shouldApplyMessage()
// Talon's implementation:
Future<bool> shouldApplyMessage(Message message) async {
final existingTimestamp = await getExistingTimestamp(
table: message.table,
row: message.row,
column: message.column,
);
if (existingTimestamp == null) return true;
return HLC.compareTimestamps(message.localTimestamp, existingTimestamp) > 0;
}
saveMessageFromServer()
// Talon's implementation:
Future<bool> saveMessageFromServer(Message message) async {
await applyMessageToLocalMessageTable(message);
if (await shouldApplyMessage(message)) {
await applyMessageToLocalDataTable(message);
}
return true;
}
Next Steps
- ServerDatabase - Implement server connection
- Database Schema - Schema details