ServerDatabase

Implementing the ServerDatabase interface

ServerDatabase

The ServerDatabase interface connects Talon to your backend.

Interface Overview

/// Contract that connects Talon to a backend storing sync messages.
abstract class ServerDatabase {
  /// Pulls changes from the server.
  ///
  /// Implementations should return only messages that belong to [userId],
  /// were not written by [clientId], and have a server timestamp greater
  /// than [lastSyncedServerTimestamp]. When [lastSyncedServerTimestamp] is
  /// `null` no timestamp filter applies.
  Future<List<Message>> getMessagesFromServer({
    required int? lastSyncedServerTimestamp,
    required String clientId,
    required String userId,
  });

  /// Pushes a single change.
  ///
  /// Completes with `true` when [message] was stored and `false` when sending
  /// failed.
  Future<bool> sendMessageToServer({required Message message});

  /// Pushes a batch of changes and completes with the ids of the messages
  /// that were sent.
  ///
  /// Overriding is optional: the default implementation calls
  /// [sendMessageToServer] for each message.
  Future<List<String>> sendMessagesToServer({required List<Message> messages});

  /// Opens a real-time subscription.
  ///
  /// [onMessagesReceived] is invoked with new messages for [userId] that were
  /// not written by [clientId]. The caller owns the returned subscription.
  StreamSubscription subscribeToServerMessages({
    required String clientId,
    required String userId,
    required int? lastSyncedServerTimestamp,
    required void Function(List<Message>) onMessagesReceived,
  });
}

Complete Implementation (Supabase)

import 'dart:async';
import 'package:supabase_flutter/supabase_flutter.dart';
import 'package:talon/talon.dart';

/// [ServerDatabase] implementation backed by a Supabase `messages` table.
///
/// Every operation is best-effort: failures are logged with `print` and
/// reported through the return value instead of being rethrown.
class SupabaseServerDatabase extends ServerDatabase {
  final SupabaseClient _supabase;

  SupabaseServerDatabase(this._supabase);

  /// Fetches messages for [userId] that were written by other clients.
  ///
  /// When [lastSyncedServerTimestamp] is non-null only rows with a greater
  /// `server_timestamp` are returned; otherwise every matching row is
  /// returned. Rows are ordered by `server_timestamp` ascending.
  ///
  /// Returns an empty list if the query fails.
  @override
  Future<List<Message>> getMessagesFromServer({
    required int? lastSyncedServerTimestamp,
    required String clientId,
    required String userId,
  }) async {
    try {
      var query = _supabase
          .from('messages')
          .select()
          .eq('user_id', userId)
          .neq('client_id', clientId); // Exclude own messages

      if (lastSyncedServerTimestamp != null) {
        query = query.gt('server_timestamp', lastSyncedServerTimestamp);
      }

      final response = await query.order('server_timestamp', ascending: true);

      return response.map<Message>((row) => _messageFromRow(row)).toList();
    } catch (e) {
      print('Error fetching from server: $e');
      return [];
    }
  }

  /// Inserts [message] into the `messages` table.
  ///
  /// Returns `true` on success and `false` if the insert throws.
  @override
  Future<bool> sendMessageToServer({required Message message}) async {
    try {
      await _supabase.from('messages').insert(_messageToRow(message));
      return true;
    } catch (e) {
      print('Error sending to server: $e');
      return false;
    }
  }

  /// Inserts all [messages] with a single batch insert.
  ///
  /// Returns the ids of every message when the batch succeeds. If the batch
  /// insert throws, the error is logged and the messages are handed to the
  /// inherited implementation, which sends them individually.
  @override
  Future<List<String>> sendMessagesToServer({
    required List<Message> messages,
  }) async {
    if (messages.isEmpty) return [];

    try {
      // Batch insert
      final rows = messages.map((m) => _messageToRow(m)).toList();
      await _supabase.from('messages').insert(rows);
      return messages.map((m) => m.id).toList();
    } catch (e) {
      // Log like the other methods do, then fall back to individual sends.
      print('Error sending batch to server: $e');
      return super.sendMessagesToServer(messages: messages);
    }
  }

  /// Subscribes to the `messages` table for rows belonging to [userId].
  ///
  /// [onMessagesReceived] is called with rows that were written by another
  /// client and, when [lastSyncedServerTimestamp] is non-null, carry a greater
  /// `server_timestamp`. A row that has already been delivered through this
  /// subscription is not delivered again. Stream errors are logged.
  @override
  StreamSubscription subscribeToServerMessages({
    required String clientId,
    required String userId,
    required int? lastSyncedServerTimestamp,
    required void Function(List<Message>) onMessagesReceived,
  }) {
    // The Supabase stream re-emits its whole result set on every change, and
    // [lastSyncedServerTimestamp] is fixed for the life of the subscription,
    // so without this set every earlier row would be re-delivered each time.
    final deliveredIds = <String>{};

    return _supabase
        .from('messages')
        .stream(primaryKey: ['id'])
        .eq('user_id', userId)
        .listen(
          (rows) {
            // Filter out own messages and already-synced messages
            final newMessages = rows
                .where((row) => row['client_id'] != clientId)
                .where((row) {
                  if (lastSyncedServerTimestamp == null) return true;
                  final ts = row['server_timestamp'] as int?;
                  return ts != null && ts > lastSyncedServerTimestamp;
                })
                .where((row) {
                  // A row without a server timestamp is passed through
                  // unmarked so that a later, stamped version of it is
                  // still delivered.
                  if (row['server_timestamp'] == null) return true;
                  // Set.add returns false for ids delivered earlier.
                  return deliveredIds.add(row['id'] as String);
                })
                .map<Message>((row) => _messageFromRow(row))
                .toList();

            if (newMessages.isNotEmpty) {
              onMessagesReceived(newMessages);
            }
          },
          onError: (Object error) {
            print('Error in server subscription: $error');
          },
        );
  }

  /// Converts a database row to a [Message].
  ///
  /// A missing `data_type` becomes an empty string. The result is flagged as
  /// synced but not yet applied.
  Message _messageFromRow(Map<String, dynamic> row) {
    return Message(
      id: row['id'] as String,
      table: row['table_name'] as String,
      row: row['row'] as String,
      column: row['column'] as String,
      dataType: row['data_type'] as String? ?? '',
      value: row['value'] as String,
      serverTimestamp: row['server_timestamp'] as int?,
      localTimestamp: row['local_timestamp'] as String,
      userId: row['user_id'] as String,
      clientId: row['client_id'] as String,
      hasBeenApplied: false,
      hasBeenSynced: true,
    );
  }

  /// Converts a [Message] to a database row.
  ///
  /// `server_timestamp` is left out because it is auto-generated; the local
  /// `hasBeenApplied` / `hasBeenSynced` flags are not sent either.
  Map<String, dynamic> _messageToRow(Message message) {
    return {
      'id': message.id,
      'table_name': message.table,
      'row': message.row,
      'column': message.column,
      'data_type': message.dataType,
      'value': message.value,
      'local_timestamp': message.localTimestamp,
      'user_id': message.userId,
      'client_id': message.clientId,
      // server_timestamp is auto-generated
    };
  }
}

Method Details

getMessagesFromServer()

Fetches messages from the server for incremental sync.

Key requirements:

  • Filter by userId (only this user's data)
  • Exclude clientId (don't fetch own messages)
  • Filter by lastSyncedServerTimestamp (only messages with a greater server_timestamp); when it is null, skip this filter and fetch everything
/// Fetches the messages this client has not seen yet.
///
/// Skeleton only: the body describes the query to run and must be replaced
/// with a real implementation that returns the matching messages.
@override
Future<List<Message>> getMessagesFromServer({
  required int? lastSyncedServerTimestamp,
  required String clientId,
  required String userId,
}) async {
  // SQL equivalent:
  // SELECT * FROM messages
  // WHERE user_id = :userId
  //   AND client_id != :clientId
  //   AND server_timestamp > :lastSyncedServerTimestamp
  //       (omit this condition when lastSyncedServerTimestamp is null,
  //        otherwise the comparison against NULL matches no rows)
  // ORDER BY server_timestamp ASC
}

sendMessageToServer()

Sends a single message to the server.

/// Posts [message] to the `/messages` endpoint.
///
/// Completes with `true` once the request has finished and with `false` if
/// anything throws along the way.
@override
Future<bool> sendMessageToServer({required Message message}) async {
  try {
    // hasBeenApplied and hasBeenSynced are local-only, so they are left out
    // of the payload on purpose.
    final payload = <String, dynamic>{
      'id': message.id,
      'table_name': message.table,
      'row': message.row,
      'column': message.column,
      'data_type': message.dataType,
      'value': message.value,
      'local_timestamp': message.localTimestamp,
      'user_id': message.userId,
      'client_id': message.clientId,
    };
    await api.post('/messages', body: payload);
    return true;
  } catch (_) {
    return false;
  }
}

sendMessagesToServer()

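Sends several messages in a single request for efficiency and returns the IDs of the messages that were sent. Overriding this method is optional: the default implementation calls sendMessageToServer() for each message.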

/// Sends [messages] as one batch, falling back to one-by-one sends.
///
/// Returns the ids of all [messages] when the batch request succeeds. If it
/// throws, the result of the inherited implementation is returned instead.
///
/// Sketch only: `messages.map(...)` stands for the per-message conversion to
/// a request payload and has to be filled in.
@override
Future<List<String>> sendMessagesToServer({
  required List<Message> messages,
}) async {
  // Option 1: Override for batch insert (more efficient)
  try {
    await api.post('/messages/batch', body: messages.map(...).toList());
    return messages.map((m) => m.id).toList();
  } catch (e) {
    // Option 2: Fall back to individual sends
    return super.sendMessagesToServer(messages: messages);
  }
}

subscribeToServerMessages()

Real-time subscription for live updates.

/// Opens a real-time listener and forwards incoming messages.
///
/// Skeleton only: the body lists what an implementation has to do and must
/// be replaced with code that returns the listener's [StreamSubscription].
@override
StreamSubscription subscribeToServerMessages({
  required String clientId,
  required String userId,
  required int? lastSyncedServerTimestamp,
  required void Function(List<Message>) onMessagesReceived,
}) {
  // Set up your real-time listener
  // Filter: user_id = userId, client_id != clientId
  //   (messages written by this client must be excluded to avoid loops)
  // Call onMessagesReceived when new messages arrive
}

The subscription must filter out messages from the current client to avoid loops.

Error Handling

/// Inserts [message] into the `messages` table, logging failures by type.
///
/// Returns `true` after a successful insert and `false` after any error.
/// Every error is logged and none is rethrown.
///
/// `SocketException` is declared in `dart:io`, so this method needs
/// `import 'dart:io';` in the file that contains it.
@override
Future<bool> sendMessageToServer({required Message message}) async {
  try {
    // Convert the message with the class's own row helper.
    await _supabase.from('messages').insert(_messageToRow(message));
    return true;
  } on PostgrestException catch (e) {
    // Handle database errors
    print('Database error: ${e.message}');
    return false;
  } on SocketException catch (e) {
    // Handle network errors
    print('Network error: $e');
    return false;
  } catch (e) {
    // Handle other errors
    print('Unknown error: $e');
    return false;
  }
}

Backend Examples

See complete implementations for:

Next Steps