ServerDatabase
Implementing the ServerDatabase interface
ServerDatabase
The ServerDatabase interface connects Talon to your backend.
Interface Overview
/// Contract that connects Talon to a backend: one pull method, two push
/// methods, and a real-time subscription.
abstract class ServerDatabase {
/// Pulls changes from the server for [userId].
///
/// [clientId] identifies the calling client; the implementations shown on
/// this page use it to exclude the client's own messages.
/// [lastSyncedServerTimestamp] is nullable; the Supabase implementation on
/// this page applies no timestamp lower bound when it is null.
Future<List<Message>> getMessagesFromServer({
required int? lastSyncedServerTimestamp,
required String clientId,
required String userId,
});
/// Pushes a single change to the server.
///
/// The implementations on this page return `true` on success and `false`
/// when the send throws.
Future<bool> sendMessageToServer({required Message message});
/// Pushes a batch of changes (optional override).
///
/// This page states that the default implementation calls
/// [sendMessageToServer] for each message; it is shown without a body here.
/// The Supabase implementation returns the ids of the messages it sent.
Future<List<String>> sendMessagesToServer({required List<Message> messages});
/// Opens a real-time subscription and reports incoming messages through
/// [onMessagesReceived].
///
/// Implementations must filter out messages whose client id equals
/// [clientId] to avoid loops.
StreamSubscription subscribeToServerMessages({
required String clientId,
required String userId,
required int? lastSyncedServerTimestamp,
required void Function(List<Message>) onMessagesReceived,
});
}
Complete Implementation (Supabase)
import 'dart:async';
import 'package:supabase_flutter/supabase_flutter.dart';
import 'package:talon/talon.dart';
/// [ServerDatabase] implementation backed by a Supabase `messages` table.
///
/// All failures are treated as best-effort: they are logged and reported to
/// the caller through the return value (empty list / `false`) instead of
/// being thrown.
class SupabaseServerDatabase extends ServerDatabase {
  final SupabaseClient _supabase;

  SupabaseServerDatabase(this._supabase);

  /// Fetches messages for [userId] that were written by other clients.
  ///
  /// When [lastSyncedServerTimestamp] is non-null only rows with a strictly
  /// greater `server_timestamp` are returned; when it is null no lower bound
  /// is applied. Rows are ordered by `server_timestamp` ascending.
  ///
  /// Returns an empty list if the query throws.
  @override
  Future<List<Message>> getMessagesFromServer({
    required int? lastSyncedServerTimestamp,
    required String clientId,
    required String userId,
  }) async {
    try {
      var query = _supabase
          .from('messages')
          .select()
          .eq('user_id', userId)
          .neq('client_id', clientId); // Exclude own messages
      if (lastSyncedServerTimestamp != null) {
        query = query.gt('server_timestamp', lastSyncedServerTimestamp);
      }
      final response = await query.order('server_timestamp', ascending: true);
      return response.map<Message>((row) => _messageFromRow(row)).toList();
    } catch (e) {
      print('Error fetching from server: $e');
      return [];
    }
  }

  /// Inserts a single [message] into the `messages` table.
  ///
  /// Returns `true` on success and `false` if the insert throws.
  @override
  Future<bool> sendMessageToServer({required Message message}) async {
    try {
      await _supabase.from('messages').insert(_messageToRow(message));
      return true;
    } catch (e) {
      print('Error sending to server: $e');
      return false;
    }
  }

  /// Inserts [messages] with one batch insert and returns their ids.
  ///
  /// If the batch insert throws, the error is logged and the work is handed
  /// to the superclass implementation, which sends messages individually.
  @override
  Future<List<String>> sendMessagesToServer({
    required List<Message> messages,
  }) async {
    if (messages.isEmpty) return [];
    try {
      // Batch insert
      final rows = messages.map((m) => _messageToRow(m)).toList();
      await _supabase.from('messages').insert(rows);
      return messages.map((m) => m.id).toList();
    } catch (e) {
      // Log the cause (the sibling methods do) before falling back to
      // individual sends.
      print('Batch send failed, falling back to individual sends: $e');
      return super.sendMessagesToServer(messages: messages);
    }
  }

  /// Subscribes to the `messages` table for [userId] and forwards messages
  /// from other clients to [onMessagesReceived].
  ///
  /// Rows written by [clientId] are dropped, as are rows that are not newer
  /// than [lastSyncedServerTimestamp] (when it is non-null). Each message is
  /// delivered at most once per subscription.
  @override
  StreamSubscription subscribeToServerMessages({
    required String clientId,
    required String userId,
    required int? lastSyncedServerTimestamp,
    required void Function(List<Message>) onMessagesReceived,
  }) {
    // A Supabase stream emits the full current row set on every change, so
    // filtering only against the fixed timestamp captured above would hand the
    // same messages to the callback again on each emission. Remember what was
    // already delivered.
    final deliveredIds = <String>{};

    return _supabase
        .from('messages')
        .stream(primaryKey: ['id'])
        .eq('user_id', userId)
        .listen((rows) {
      final newMessages = <Message>[];
      for (final row in rows) {
        // Skip own messages.
        if (row['client_id'] == clientId) continue;
        // Skip already-synced messages.
        if (lastSyncedServerTimestamp != null) {
          final ts = row['server_timestamp'] as int?;
          if (ts == null || ts <= lastSyncedServerTimestamp) continue;
        }
        // Skip messages delivered by an earlier emission.
        if (deliveredIds.contains(row['id'])) continue;
        newMessages.add(_messageFromRow(row));
      }
      if (newMessages.isNotEmpty) {
        onMessagesReceived(newMessages);
        // Mark as delivered only after the callback returned normally, so a
        // throwing callback gets the same messages again on the next emission.
        deliveredIds.addAll(newMessages.map((m) => m.id));
      }
    });
  }

  /// Converts a database row to a [Message].
  ///
  /// Messages read from the server are marked as synced but not yet applied.
  /// A missing `data_type` becomes the empty string.
  Message _messageFromRow(Map<String, dynamic> row) {
    return Message(
      id: row['id'] as String,
      table: row['table_name'] as String,
      row: row['row'] as String,
      column: row['column'] as String,
      dataType: row['data_type'] as String? ?? '',
      value: row['value'] as String,
      serverTimestamp: row['server_timestamp'] as int?,
      localTimestamp: row['local_timestamp'] as String,
      userId: row['user_id'] as String,
      clientId: row['client_id'] as String,
      hasBeenApplied: false,
      hasBeenSynced: true,
    );
  }

  /// Converts a [Message] to a database row.
  ///
  /// `server_timestamp` is left out because it is auto-generated; the
  /// `hasBeenApplied` / `hasBeenSynced` flags are not part of the row.
  Map<String, dynamic> _messageToRow(Message message) {
    return {
      'id': message.id,
      'table_name': message.table,
      'row': message.row,
      'column': message.column,
      'data_type': message.dataType,
      'value': message.value,
      'local_timestamp': message.localTimestamp,
      'user_id': message.userId,
      'client_id': message.clientId,
      // server_timestamp is auto-generated
    };
  }
}
getMessagesFromServer()
Fetches messages from the server for incremental sync.
Key requirements:
- Filter by `userId` (only this user's data)
- Exclude `clientId` (don't fetch own messages)
- Filter by `lastSyncedServerTimestamp` (only new messages)
/// Skeleton for the incremental pull: the body is left for you to implement
/// against your backend, following the query described below.
@override
Future<List<Message>> getMessagesFromServer({
required int? lastSyncedServerTimestamp,
required String clientId,
required String userId,
}) async {
// SQL equivalent:
// SELECT * FROM messages
// WHERE user_id = :userId
// AND client_id != :clientId
// AND server_timestamp > :lastSyncedServerTimestamp
// ORDER BY server_timestamp ASC
//
// lastSyncedServerTimestamp is nullable: the Supabase implementation on this
// page drops the server_timestamp condition when it is null.
}
sendMessageToServer()
Sends a single message to the server.
/// Posts one [message] to the `/messages` endpoint.
///
/// Returns `true` when the request completes and `false` when it throws.
@override
Future<bool> sendMessageToServer({required Message message}) async {
  try {
    // hasBeenApplied and hasBeenSynced are local bookkeeping, so they are
    // deliberately left out of the payload.
    final payload = {
      'id': message.id,
      'table_name': message.table,
      'row': message.row,
      'column': message.column,
      'data_type': message.dataType,
      'value': message.value,
      'local_timestamp': message.localTimestamp,
      'user_id': message.userId,
      'client_id': message.clientId,
    };
    await api.post('/messages', body: payload);
  } catch (_) {
    return false;
  }
  return true;
}
sendMessagesToServer()
Batch send for efficiency. The default implementation calls sendMessageToServer() for each message.
/// Illustrative batch send: `messages.map(...)` is a placeholder for your own
/// message-to-payload conversion, not literal code.
///
/// Returns the ids of the messages when the batch request completes.
@override
Future<List<String>> sendMessagesToServer({
required List<Message> messages,
}) async {
// Option 1: Override for batch insert (more efficient) - one request for
// the whole list.
try {
await api.post('/messages/batch', body: messages.map(...).toList());
return messages.map((m) => m.id).toList();
} catch (e) {
// Option 2: Fall back to individual sends - the default implementation
// calls sendMessageToServer() for each message.
return super.sendMessagesToServer(messages: messages);
}
}
subscribeToServerMessages()
Real-time subscription for live updates.
/// Skeleton for the real-time subscription: the body is left for you to
/// implement with your backend's listener and must return its
/// StreamSubscription.
@override
StreamSubscription subscribeToServerMessages({
required String clientId,
required String userId,
required int? lastSyncedServerTimestamp,
required void Function(List<Message>) onMessagesReceived,
}) {
// Set up your real-time listener
// Filter: user_id = userId, client_id != clientId
// (the Supabase implementation on this page also skips rows whose
// server_timestamp is not greater than lastSyncedServerTimestamp when that
// value is non-null)
// Call onMessagesReceived when new messages arrive
}
The subscription must filter out messages from the current client to avoid loops.
Error Handling
/// Inserts [message] and reports the outcome as a bool, distinguishing the
/// failure cause only in the log output.
///
/// Returns `true` on success and `false` for any caught error.
/// `SocketException` comes from `dart:io`, which must be imported for this
/// snippet to compile.
@override
Future<bool> sendMessageToServer({required Message message}) async {
  try {
    // Build the row with the same helper the complete implementation uses.
    await _supabase.from('messages').insert(_messageToRow(message));
    return true;
  } on PostgrestException catch (e) {
    // Handle database errors
    print('Database error: ${e.message}');
    return false;
  } on SocketException catch (e) {
    // Handle network errors
    print('Network error: $e');
    return false;
  } catch (e) {
    // Handle other errors
    print('Unknown error: $e');
    return false;
  }
}
Next Steps
- Database Schema - Server-side schema
- Supabase Example - Full Supabase setup