Real-time data streams are the lifeblood of many modern applications, ranging from financial market tickers to IoT sensor networks. Processing these continuous streams efficiently requires software that can ingest raw data, parse it reliably, and pipe it into a persistent storage layer.
The socket-to-pg project is a lightweight Node.js microservice designed to do exactly this: it connects to a raw TCP socket, decodes an ASCII-based data stream, and continuously inserts the extracted metrics into a PostgreSQL database. Authored by @rpi1337, the application offers a clean, event-driven architecture using pure JavaScript.
Here is a deep dive into how it works.
The Challenge: Parsing Fragmented TCP Streams
When dealing with low-level TCP connections, data rarely arrives as neat, self-contained messages. Instead, it comes in chunks whose boundaries can arbitrarily split a single message in two.
The socket-to-pg application receives its incoming data over a TCP socket opened with Node’s native net module. The expected protocol is an ASCII stream of newline-separated messages, each carrying a timestamp (in nanoseconds since the UNIX epoch) and a numerical value enclosed in brackets, such as [1468009895549670789,397807].
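To make the fragmentation problem concrete, a healthy stretch of the stream is simply a sequence of such bracketed records separated by newlines (the values below are invented for illustration), yet a TCP chunk boundary may fall anywhere within it, even in the middle of a number:

```
[1468009895549670789,397807]
[1468009895550102311,401523]
```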
To reliably parse this, the application implements a custom SocketProcessor class that inherits from Node’s built-in EventEmitter. When the processor receives a chunk of data, it iterates through it character by character.
- It uses an inRecord flag that turns true when an opening bracket [ is detected.
- Incoming characters are pushed into a recordBuffer array.
- When a closing bracket ] is encountered, the buffered characters are joined together, and the buffer is cleared to prepare for the next message.
Once a fragment is successfully isolated, it must pass a regular expression validation check ([0-9][,]{1}[0-9]) to ensure the basic structure of numbers separated by a comma is met. Valid fragments are then emitted as a data event for the next stage of the pipeline.
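Below is a minimal sketch of that parsing loop. The processChunk method name is an assumption; the inRecord flag, recordBuffer array, regex check, and data event mirror the description above:

```javascript
const { EventEmitter } = require('events');

// Sketch of the character-by-character parser described above.
class SocketProcessor extends EventEmitter {
  constructor() {
    super();
    this.inRecord = false;   // true while we are between '[' and ']'
    this.recordBuffer = [];  // characters of the fragment being assembled
  }

  // Feed one TCP chunk into the parser (method name assumed for illustration)
  processChunk(chunk) {
    for (const char of chunk.toString('ascii')) {
      if (char === '[') {
        // Start of a new record: reset the buffer
        this.inRecord = true;
        this.recordBuffer = [];
      } else if (char === ']' && this.inRecord) {
        // End of a record: join the buffered characters and validate
        const fragment = this.recordBuffer.join('');
        this.recordBuffer = [];
        this.inRecord = false;
        if (/[0-9][,]{1}[0-9]/.test(fragment)) {
          this.emit('data', fragment);
        }
      } else if (this.inRecord) {
        this.recordBuffer.push(char);
      }
    }
  }
}

module.exports = SocketProcessor;
```

Because inRecord and recordBuffer live on the instance, a message split across two TCP chunks is reassembled transparently: the first call buffers the leading characters, and a later call completes and emits the fragment.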
Bridging the Gap: The Database Provider
Once the metrics are extracted from the raw socket stream, they need to be stored. For this, the application uses the popular pg library (version 8.7.1) and encapsulates its database logic within a DatabaseProvider class.
The database workflow is designed to be self-initializing:
- Connection: The provider connects to the PostgreSQL instance using the configuration passed to the client.
- Table Creation: At startup, it dynamically executes a CREATE TABLE query to ensure the target table is available. By default, this table is named socket_stream_values and requires timestamp bigint NOT NULL and value integer NOT NULL fields.
- Data Insertion: When the SocketProcessor emits a new piece of data, the main script splits the fragment by its comma to separate the timestamp from the value. The writeToTable method (sketched below) is then called, which safely constructs a parameterized query (with placeholders like $1, $2) using a helper method named _prepareInsertVariables. Because the database engine receives the incoming parameters strictly as values, this approach avoids potential SQL injection attacks.
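A condensed sketch of such a provider, built on pg’s Client, might look like the following. The init method name and the IF NOT EXISTS clause are assumptions; writeToTable, _prepareInsertVariables, the table name, and the column definitions follow the description above:

```javascript
const { Client } = require('pg');

class DatabaseProvider {
  constructor(config, tableName = 'socket_stream_values') {
    // config: host, port, database, user, password (typically from process.env)
    this.client = new Client(config);
    this.tableName = tableName;
  }

  // Connect, then make sure the target table exists (method name assumed)
  async init() {
    await this.client.connect();
    await this.client.query(
      `CREATE TABLE IF NOT EXISTS ${this.tableName} (
        timestamp bigint NOT NULL,
        value integer NOT NULL
      )`
    );
  }

  // Build the '$1, $2, ...' placeholder list for a parameterized INSERT
  _prepareInsertVariables(values) {
    return values.map((_, i) => `$${i + 1}`).join(', ');
  }

  // Insert one record; pg sends the values separately from the SQL text,
  // so they are never interpreted as SQL
  writeToTable(timestamp, value) {
    const values = [timestamp, value];
    const placeholders = this._prepareInsertVariables(values);
    return this.client.query(
      `INSERT INTO ${this.tableName} (timestamp, value) VALUES (${placeholders})`,
      values
    );
  }
}

module.exports = DatabaseProvider;
```

Wiring the two classes together in the main script then reduces to a handler along these lines:

```javascript
processor.on('data', (fragment) => {
  const [timestamp, value] = fragment.split(',');
  provider.writeToTable(timestamp, value);
});
```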
Configuration and Deployment
The project is structured to be portable and configurable via environment variables. It relies on the dotenv package to load parameters into process.env.
To deploy the script, a user must configure their .env file to include network parameters (SOCKET_HOST, SOCKET_PORT) as well as database credentials (DB_HOST, DB_PORT, DB_DATABASE, DB_USER, DB_PASSWORD, and optionally TABLE_NAME). Once the .env configuration is in place, the data pipeline is initialized by simply executing node index.js via the command line.
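For a local setup, the resulting .env file might look like this (all values below are placeholders rather than defaults shipped with the project):

```
SOCKET_HOST=127.0.0.1
SOCKET_PORT=9000
DB_HOST=localhost
DB_PORT=5432
DB_DATABASE=metrics
DB_USER=postgres
DB_PASSWORD=secret
TABLE_NAME=socket_stream_values
```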
Through this combination of robust stream parsing and clean class-based separation of concerns, socket-to-pg demonstrates an efficient, native approach to building a real-time data ingestion pipeline in Node.js.