ServerDatabase
ServerDatabase abstract class API reference
ServerDatabase
Abstract interface for server-side database operations. Implement this class to connect Talon to your backend (Supabase, Firebase, custom API, etc.).
Overview
ServerDatabase handles:
- Uploading local changes to the server
- Downloading changes from other clients
- Real-time subscriptions for live updates
getMessagesFromServer
Future<List<Message>> getMessagesFromServer({
required int? lastSyncedServerTimestamp,
required String clientId,
required String userId,
})
Fetch messages from the server that haven't been synced locally.
| Parameter | Description |
|---|---|
lastSyncedServerTimestamp | Only fetch messages newer than this |
clientId | Exclude messages from this client (they're local) |
userId | Only fetch this user's messages |
@override
Future<List<Message>> getMessagesFromServer({
required int? lastSyncedServerTimestamp,
required String clientId,
required String userId,
}) async {
final response = await supabase
.from('messages')
.select()
.eq('user_id', userId)
.neq('client_id', clientId)
.gt('server_timestamp', lastSyncedServerTimestamp ?? 0)
.order('server_timestamp', ascending: true);
return (response as List)
.map((row) => Message.fromMap(row))
.toList();
}
sendMessageToServer
Future<bool> sendMessageToServer({required Message message})
Send a single message to the server. Returns true if successful.
@override
Future<bool> sendMessageToServer({required Message message}) async {
try {
final data = message.toMap()
..remove('hasBeenApplied')
..remove('hasBeenSynced');
await supabase.from('messages').insert(data);
return true;
} catch (e) {
return false;
}
}
subscribeToServerMessages
StreamSubscription subscribeToServerMessages({
required String clientId,
required String userId,
required int? lastSyncedServerTimestamp,
required void Function(List<Message>) onMessagesReceived,
})
Subscribe to real-time server updates. Return a StreamSubscription that Talon can cancel.
@override
StreamSubscription subscribeToServerMessages({
required String clientId,
required String userId,
required int? lastSyncedServerTimestamp,
required void Function(List<Message>) onMessagesReceived,
}) {
return supabase
.from('messages')
.stream(primaryKey: ['id'])
.eq('user_id', userId)
.listen((List<Map<String, dynamic>> data) {
final messages = data
.where((row) => row['client_id'] != clientId)
.where((row) =>
(row['server_timestamp'] ?? 0) > (lastSyncedServerTimestamp ?? 0))
.map((row) => Message.fromMap(row))
.toList();
if (messages.isNotEmpty) {
onMessagesReceived(messages);
}
});
}
sendMessagesToServer
Future<List<String>> sendMessagesToServer({
required List<Message> messages,
})
Send multiple messages in a batch. Override for better performance.
Default implementation: Calls sendMessageToServer for each message sequentially.
Optimized implementation:
@override
Future<List<String>> sendMessagesToServer({
required List<Message> messages,
}) async {
if (messages.isEmpty) return [];
try {
final data = messages.map((m) {
final map = m.toMap();
map.remove('hasBeenApplied');
map.remove('hasBeenSynced');
return map;
}).toList();
await supabase.from('messages').insert(data);
return messages.map((m) => m.id).toList();
} catch (e) {
// Batch failed - fall back to individual sends
return super.sendMessagesToServer(messages: messages);
}
}
Supabase
import 'dart:async';
import 'package:supabase_flutter/supabase_flutter.dart';
import 'package:talon/talon.dart';
class SupabaseServerDatabase extends ServerDatabase {
final SupabaseClient _client;
SupabaseServerDatabase(this._client);
@override
Future<List<Message>> getMessagesFromServer({
required int? lastSyncedServerTimestamp,
required String clientId,
required String userId,
}) async {
final response = await _client
.from('messages')
.select()
.eq('user_id', userId)
.neq('client_id', clientId)
.gt('server_timestamp', lastSyncedServerTimestamp ?? 0)
.order('server_timestamp', ascending: true);
return (response as List)
.map((row) => Message.fromMap(row))
.toList();
}
@override
Future<bool> sendMessageToServer({required Message message}) async {
try {
final data = message.toMap()
..remove('hasBeenApplied')
..remove('hasBeenSynced');
await _client.from('messages').insert(data);
return true;
} catch (e) {
return false;
}
}
@override
Future<List<String>> sendMessagesToServer({
required List<Message> messages,
}) async {
if (messages.isEmpty) return [];
try {
final data = messages.map((m) {
final map = m.toMap();
map.remove('hasBeenApplied');
map.remove('hasBeenSynced');
return map;
}).toList();
await _client.from('messages').insert(data);
return messages.map((m) => m.id).toList();
} catch (e) {
return super.sendMessagesToServer(messages: messages);
}
}
@override
StreamSubscription subscribeToServerMessages({
required String clientId,
required String userId,
required int? lastSyncedServerTimestamp,
required void Function(List<Message>) onMessagesReceived,
}) {
return _client
.from('messages')
.stream(primaryKey: ['id'])
.eq('user_id', userId)
.listen((List<Map<String, dynamic>> data) {
final messages = data
.where((row) => row['client_id'] != clientId)
.where((row) =>
(row['server_timestamp'] ?? 0) >
(lastSyncedServerTimestamp ?? 0))
.map((row) => Message.fromMap(row))
.toList();
if (messages.isNotEmpty) {
onMessagesReceived(messages);
}
});
}
}
Firebase Firestore
import 'dart:async';
import 'package:cloud_firestore/cloud_firestore.dart';
import 'package:talon/talon.dart';
class FirebaseServerDatabase extends ServerDatabase {
final FirebaseFirestore _firestore;
FirebaseServerDatabase(this._firestore);
@override
Future<List<Message>> getMessagesFromServer({
required int? lastSyncedServerTimestamp,
required String clientId,
required String userId,
}) async {
var query = _firestore
.collection('messages')
.where('user_id', isEqualTo: userId)
.where('server_timestamp',
isGreaterThan: lastSyncedServerTimestamp ?? 0)
.orderBy('server_timestamp');
final snapshot = await query.get();
return snapshot.docs
.map((doc) => doc.data())
.where((data) => data['client_id'] != clientId)
.map((data) => Message.fromMap(data))
.toList();
}
@override
Future<bool> sendMessageToServer({required Message message}) async {
try {
final data = message.toMap()
..remove('hasBeenApplied')
..remove('hasBeenSynced')
..['server_timestamp'] = FieldValue.serverTimestamp();
await _firestore.collection('messages').doc(message.id).set(data);
return true;
} catch (e) {
return false;
}
}
@override
StreamSubscription subscribeToServerMessages({
required String clientId,
required String userId,
required int? lastSyncedServerTimestamp,
required void Function(List<Message>) onMessagesReceived,
}) {
return _firestore
.collection('messages')
.where('user_id', isEqualTo: userId)
.where('server_timestamp',
isGreaterThan: lastSyncedServerTimestamp ?? 0)
.snapshots()
.listen((snapshot) {
final messages = snapshot.docChanges
.where((change) => change.type == DocumentChangeType.added)
.map((change) => change.doc.data()!)
.where((data) => data['client_id'] != clientId)
.map((data) => Message.fromMap(data))
.toList();
if (messages.isNotEmpty) {
onMessagesReceived(messages);
}
});
}
}
Custom REST API
import 'dart:async';
import 'dart:convert';
import 'package:http/http.dart' as http;
import 'package:talon/talon.dart';
class RestServerDatabase extends ServerDatabase {
final String baseUrl;
final String authToken;
Timer? _pollingTimer;
StreamController<List<Message>>? _controller;
RestServerDatabase({
required this.baseUrl,
required this.authToken,
});
@override
Future<List<Message>> getMessagesFromServer({
required int? lastSyncedServerTimestamp,
required String clientId,
required String userId,
}) async {
final response = await http.get(
Uri.parse('$baseUrl/messages').replace(queryParameters: {
'user_id': userId,
'after': (lastSyncedServerTimestamp ?? 0).toString(),
'exclude_client': clientId,
}),
headers: {'Authorization': 'Bearer $authToken'},
);
if (response.statusCode != 200) return [];
final List<dynamic> data = json.decode(response.body);
return data.map((row) => Message.fromMap(row)).toList();
}
@override
Future<bool> sendMessageToServer({required Message message}) async {
final data = message.toMap()
..remove('hasBeenApplied')
..remove('hasBeenSynced');
final response = await http.post(
Uri.parse('$baseUrl/messages'),
headers: {
'Authorization': 'Bearer $authToken',
'Content-Type': 'application/json',
},
body: json.encode(data),
);
return response.statusCode == 201;
}
@override
StreamSubscription subscribeToServerMessages({
required String clientId,
required String userId,
required int? lastSyncedServerTimestamp,
required void Function(List<Message>) onMessagesReceived,
}) {
_controller = StreamController<List<Message>>();
int lastTs = lastSyncedServerTimestamp ?? 0;
// Poll every 5 seconds
_pollingTimer = Timer.periodic(Duration(seconds: 5), (_) async {
final messages = await getMessagesFromServer(
lastSyncedServerTimestamp: lastTs,
clientId: clientId,
userId: userId,
);
if (messages.isNotEmpty) {
lastTs = messages
.map((m) => m.serverTimestamp ?? 0)
.reduce((a, b) => a > b ? a : b);
_controller?.add(messages);
}
});
return _controller!.stream.listen(onMessagesReceived);
}
void dispose() {
_pollingTimer?.cancel();
_controller?.close();
}
}
Important Notes
-
Exclude Own Messages: Always filter out messages from the current
clientId- they're already local. -
Server Timestamp: The server should assign
server_timestamp. Use auto-incrementing IDs or database timestamps. -
Error Handling: Return
falsefromsendMessageToServeron failure. Talon will retry on next sync. -
Batching: Override
sendMessagesToServerfor significant performance improvements with batch inserts. -
Real-time: The subscription should filter messages client-side if your backend doesn't support server-side filtering.