DSNodeServer
HTTP server wrapper for DSNode that handles incoming network requests.
    from language_pipes.distributed_state_network import DSNodeServer
Class Definition
    class DSNodeServer:
        config: DSNodeConfig
        node: DSNode
        http_server: ThreadingHTTPServer
        running: bool
        thread: threading.Thread
Constructor
Parameters:
- config (DSNodeConfig): Node configuration
- disconnect_callback (Optional[Callable]): Callback for disconnect events
- update_callback (Optional[Callable]): Callback for state update events
- receive_callback (Optional[Callable]): Callback for data transferred from one node to another
Static Methods
start() -> DSNodeServer
Creates and starts a new DSNodeServer instance backed by a threaded HTTP server whose request handler is based on BaseHTTPRequestHandler.
    server = DSNodeServer.start(config)
Parameters:
- config (DSNodeConfig): Node configuration
- disconnect_callback (Optional[Callable]): Callback for disconnect events (no parameters)
- update_callback (Optional[Callable]): Callback for state update events (no parameters)
- receive_callback (Optional[Callable]): Callback for data transfer between nodes
Returns:
DSNodeServer: Running server instance
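The start/stop lifecycle described above can be illustrated with the standard library alone. The following is a minimal sketch, not the actual DSNodeServer implementation: `SketchServer` and `PingHandler` are hypothetical names, and the only things taken from this page are the use of ThreadingHTTPServer, BaseHTTPRequestHandler, a background thread, and a `running` flag.

```python
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class PingHandler(BaseHTTPRequestHandler):
    """Hypothetical handler: answers every GET with a fixed body."""

    def do_GET(self):
        body = b"pong"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        # Silence the default per-request logging to stderr.
        pass


class SketchServer:
    """Stand-in for the start/stop lifecycle; not the real DSNodeServer."""

    def __init__(self, port: int) -> None:
        # Port 0 asks the OS for any free port.
        self.http_server = ThreadingHTTPServer(("127.0.0.1", port), PingHandler)
        self.thread = threading.Thread(
            target=self.http_server.serve_forever, daemon=True
        )
        self.running = False

    @staticmethod
    def start(port: int = 0) -> "SketchServer":
        # Build the instance, then serve requests on a background thread.
        server = SketchServer(port)
        server.thread.start()
        server.running = True
        return server

    def stop(self) -> None:
        self.http_server.shutdown()      # ask serve_forever() to return
        self.http_server.server_close()  # release the listening socket
        self.thread.join()
        self.running = False
```

Running the HTTP loop on its own thread lets `start()` return immediately with a live instance, which matches the "Running server instance" return value described above.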
Example with bootstrap:
    # Bootstrap node (first node in network)
    bootstrap_config = DSNodeConfig(
        node_id="bootstrap",
        port=8000,
        bootstrap_nodes=[]
    )
    bootstrap = DSNodeServer.start(bootstrap_config)

    # Connector node (joins existing network)
    connector_config = DSNodeConfig(
        node_id="connector",
        port=8001,
        bootstrap_nodes=[Endpoint("127.0.0.1", 8000)]
    )
    connector = DSNodeServer.start(connector_config)
generate_key() -> str
Generates a new hexadecimal-encoded AES-128 key for network encryption. All nodes in the same network must share the same AES key.
- Output length: 32 hex characters (16 bytes)
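The length arithmetic (16 bytes, two hex characters per byte, 32 characters in total) can be checked with a short standard-library sketch. This is an assumption about how such a key could be produced, not a description of how generate_key() works internally; `generate_hex_key` is a hypothetical helper.

```python
import secrets


def generate_hex_key(num_bytes: int = 16) -> str:
    """Return a random key as hexadecimal text (two hex characters per byte)."""
    return secrets.token_hex(num_bytes)


key = generate_hex_key()
raw = bytes.fromhex(key)  # decode back to raw bytes, as a cipher would need
```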
Parameters:
- None
Example:
    DSNodeServer.generate_key()
Instance Methods
stop() -> None
Gracefully shuts down the server and cleans up resources.
Example:
    server.stop()
update_data() -> None
Updates a key-value pair in the node’s state and broadcasts the update to all peers.
    node.update_data("status", "active")
Parameters:
- key (str): State key to update
- val (str): New value for the key
read_data() -> Optional[str]
Reads a value from a specific node’s state.
Parameters:
- node_id (str): ID of the node to read from
- key (str): Key to retrieve
Returns:
Optional[str]: The value if the key exists, None otherwise
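The update_data() and read_data() semantics can be pictured as one key-value dictionary per node ID. The sketch below is a purely local model under that assumption: `StateTable` and `apply_remote` are hypothetical names, and the broadcast to peers is not modelled.

```python
from typing import Dict, Optional


class StateTable:
    """Hypothetical in-memory model: one key-value dict per node ID."""

    def __init__(self, own_id: str) -> None:
        self.own_id = own_id
        self.state: Dict[str, Dict[str, str]] = {own_id: {}}

    def update_data(self, key: str, val: str) -> None:
        # Writes only this node's own entry; a real node would also
        # broadcast the change to its peers.
        self.state[self.own_id][key] = val

    def apply_remote(self, node_id: str, data: Dict[str, str]) -> None:
        # Assumed hook: store a copy of the state received from another node.
        self.state[node_id] = dict(data)

    def read_data(self, node_id: str, key: str) -> Optional[str]:
        # Unknown nodes and unknown keys both yield None.
        return self.state.get(node_id, {}).get(key)


table = StateTable("bootstrap")
table.update_data("status", "active")
table.apply_remote("connector", {"status": "idle"})
```

Returning None for both an unknown node and an unknown key keeps the caller's check to a single `is None` test, which fits the Optional[str] return type documented above.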
peers() -> List[str]
Returns a list of all connected peer node IDs.
Parameters:
- None
Returns:
List[str]: List of node IDs
send_to_node() -> None
Sends data to another node.
    node.send_to_node('node-1', b'foo bar')
Parameters:
- node_id (str): ID of the node to send data to
- data (bytes): Data to send to the node
Returns:
- None
is_shut_down() -> bool
Returns whether the server is currently shut down.
    node.is_shut_down()
Parameters:
- None
Returns:
- bool: True if the server is shut down
node_id() -> str
Returns the node_id of this node instance.
    node.node_id()
Parameters:
- None
Returns:
- str: The node ID of the current node
set_receive_cb() -> None
Sets the receive callback. This function is called whenever the node receives a data packet.
    def receive(data: bytes):
        pass
    node.set_receive_cb(receive)
Parameters:
- cb (Callable[[bytes], None]): A function that is given the bytes of the data packet
Returns:
- None
set_update_cb() -> None
Sets the update callback. This function is called whenever a new update is made on the server.
    def update_cb():
        pass
    node.set_update_cb(update_cb)
Parameters:
- cb (Callable[[], None]): A function that receives no arguments
Returns:
- None
set_disconnect_cb() -> None
Sets the disconnect callback. This function is run whenever a node is disconnected from the network.
    def disconnect():
        pass
    node.set_disconnect_cb(disconnect)
Parameters:
- cb (Callable[[], None]): A function that takes no arguments
Returns:
- None
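The three setters differ only in the callback signature they expect: the receive callback takes the packet bytes, while the update and disconnect callbacks take no arguments. The sketch below shows that contract with a hypothetical `CallbackHolder` class. The `on_data`, `on_update` and `on_disconnect` methods are assumed stand-ins for whatever internal events trigger the callbacks, and are not part of the documented API.

```python
from typing import Callable, List, Optional


class CallbackHolder:
    """Hypothetical holder mirroring the three documented setters."""

    def __init__(self) -> None:
        self.receive_cb: Optional[Callable[[bytes], None]] = None
        self.update_cb: Optional[Callable[[], None]] = None
        self.disconnect_cb: Optional[Callable[[], None]] = None

    def set_receive_cb(self, cb: Callable[[bytes], None]) -> None:
        self.receive_cb = cb

    def set_update_cb(self, cb: Callable[[], None]) -> None:
        self.update_cb = cb

    def set_disconnect_cb(self, cb: Callable[[], None]) -> None:
        self.disconnect_cb = cb

    # Assumed internal hooks: each invokes its callback only if one is set.
    def on_data(self, data: bytes) -> None:
        if self.receive_cb is not None:
            self.receive_cb(data)

    def on_update(self) -> None:
        if self.update_cb is not None:
            self.update_cb()

    def on_disconnect(self) -> None:
        if self.disconnect_cb is not None:
            self.disconnect_cb()


received: List[bytes] = []
events: List[str] = []

holder = CallbackHolder()
holder.on_data(b"ignored")  # no callback registered yet, so nothing happens
holder.set_receive_cb(received.append)
holder.set_update_cb(lambda: events.append("update"))
holder.set_disconnect_cb(lambda: events.append("disconnect"))

holder.on_data(b"foo bar")
holder.on_update()
holder.on_disconnect()
```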
Network Protocol
The server communicates over HTTP, consistent with the ThreadingHTTPServer in the class definition, with the following characteristics:
- Encryption: All packets can be encrypted with AES
- Authentication: ECDSA signatures for message verification
Message Types
The server handles five types of messages:
- HELLO (1): Node introduction and public key exchange
- PEERS (2): Request/response for peer list
- UPDATE (3): State synchronization
- PING (4): Connection health check
- DATA (5): Data transfer packet
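The numeric codes listed above map naturally onto an integer enumeration. The sketch below is one possible representation, assuming the codes are used as plain integers. `MessageType` and `describe` are hypothetical names, not identifiers from the library.

```python
from enum import IntEnum


class MessageType(IntEnum):
    """Hypothetical enum using the codes from the list above."""

    HELLO = 1   # node introduction and public key exchange
    PEERS = 2   # request/response for peer list
    UPDATE = 3  # state synchronization
    PING = 4    # connection health check
    DATA = 5    # data transfer packet


def describe(code: int) -> str:
    """Return the message name for a code, or "UNKNOWN" if unrecognised."""
    try:
        return MessageType(code).name
    except ValueError:
        return "UNKNOWN"
```

An IntEnum keeps the values comparable with raw integers, so a handler can dispatch on a received code without a separate lookup table.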