Building a Real-Time Data Pipeline: Streaming TCP Socket Data to PostgreSQL with Node.js

Real-time data streams are the lifeblood of many modern applications, ranging from financial market tickers to IoT sensor networks. Processing these continuous streams efficiently requires software that can ingest raw data, parse it reliably, and pipe it into a persistent storage layer.

The socket-to-pg project is a lightweight Node.js microservice designed to do exactly this: it connects to a raw TCP socket, decodes an ASCII-based data stream, and continuously inserts the extracted metrics into a PostgreSQL database. Authored by @rpi1337, the application offers a clean, event-driven architecture using pure JavaScript.

Here is a deep dive into how it works.

The Challenge: Parsing Fragmented TCP Streams

When dealing with low-level TCP connections, data rarely arrives as neat, self-contained messages. Instead, it comes in chunks whose boundaries can fall anywhere, arbitrarily splitting a single message across two reads.

The socket-to-pg application receives its incoming data over a TCP socket opened with Node’s native net module. The expected protocol is an ASCII stream of newline-separated messages, each carrying a timestamp (in nanoseconds since the UNIX epoch) and a numerical value enclosed in brackets, such as [1468009895549670789,397807].
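
For context, here is a minimal sketch of consuming such a feed with the net module (the host and port are placeholders, and this is not the project’s actual index.js); it also illustrates why chunk boundaries matter:

  // Minimal sketch: connect to the upstream TCP feed and log raw chunks.
  const net = require('net');

  const socket = net.connect({ host: '127.0.0.1', port: 9000 }, () => {
    console.log('connected to upstream feed');
  });

  socket.on('data', (chunk) => {
    // A single chunk may contain several messages, or only part of one,
    // e.g. "[1468009895549670789,397807]\n[14680098955497" ...
    console.log('raw chunk:', chunk.toString('ascii'));
  });

  socket.on('error', (err) => console.error('socket error:', err));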

To reliably parse this, the application implements a custom SocketProcessor class that inherits from Node’s built-in EventEmitter. When the processor receives a chunk of data, it walks through the chunk character by character:

  • It uses an inRecord flag that turns true when an opening bracket [ is detected.
  • Incoming characters are pushed into a recordBuffer array.
  • When a closing bracket ] is encountered, the buffered characters are joined together, and the buffer is cleared to prepare for the next message.

Once a fragment is successfully isolated, it must pass a regular expression validation check ([0-9][,]{1}[0-9]) to ensure the basic structure of numbers separated by a comma is met. Valid fragments are then emitted as a data event for the next stage of the pipeline.
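
This parsing logic can be summarized in a simplified sketch; the actual SocketProcessor in the repository may differ in naming and internal details:

  const { EventEmitter } = require('events');

  // Simplified sketch of the bracket-based parser described above.
  class SocketProcessor extends EventEmitter {
    constructor() {
      super();
      this.inRecord = false;
      this.recordBuffer = [];
    }

    // Called with each raw chunk arriving on the TCP socket.
    processChunk(chunk) {
      for (const char of chunk.toString('ascii')) {
        if (char === '[') {
          this.inRecord = true;           // start of a new record
          this.recordBuffer = [];
        } else if (char === ']' && this.inRecord) {
          const fragment = this.recordBuffer.join('');
          this.recordBuffer = [];
          this.inRecord = false;
          // Basic structure check: digits separated by a comma.
          if (/[0-9][,]{1}[0-9]/.test(fragment)) {
            this.emit('data', fragment);  // e.g. "1468009895549670789,397807"
          }
        } else if (this.inRecord) {
          this.recordBuffer.push(char);   // accumulate characters inside [...]
        }
      }
    }
  }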

Bridging the Gap: The Database Provider

Once the metrics are extracted from the raw socket stream, they need to be stored. For this, the application uses the popular pg library (version 8.7.1) and encapsulates its database logic within a DatabaseProvider class.

The database workflow is designed to be self-initializing:

  1. Connection: The provider connects to the PostgreSQL instance using configurations passed to the client.
  2. Table Creation: At startup, it dynamically executes a CREATE TABLE query to ensure the target table is available. By default, this table is named socket_stream_values and has two columns: timestamp bigint NOT NULL and value integer NOT NULL.
  3. Data Insertion: When the SocketProcessor emits a new piece of data, the main script splits the fragment on its comma to separate the timestamp from the value. The writeToTable method is then called, which safely constructs a parameterized query (using placeholders like $1, $2) via a helper method named _prepareInsertVariables. Because the incoming values are passed as query parameters rather than concatenated into the SQL text, the database engine treats them strictly as data, guarding against SQL injection attacks (see the sketch below).
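
A rough sketch of this provider flow, assuming the table layout and method names described above (the actual class in the repository may be organized differently):

  const { Client } = require('pg');

  // Rough sketch: connect, ensure the table exists, then do parameterized inserts.
  class DatabaseProvider {
    constructor(config, tableName = 'socket_stream_values') {
      this.client = new Client(config);
      this.tableName = tableName;   // comes from configuration, not user input
    }

    async connect() {
      await this.client.connect();
      // Create the target table on startup if it does not exist yet.
      await this.client.query(
        'CREATE TABLE IF NOT EXISTS ' + this.tableName +
        ' (timestamp bigint NOT NULL, value integer NOT NULL)'
      );
    }

    // Parameterized insert ($1, $2): values are never interpolated into the SQL text.
    async writeToTable(timestamp, value) {
      await this.client.query(
        'INSERT INTO ' + this.tableName + ' (timestamp, value) VALUES ($1, $2)',
        [timestamp, value]
      );
    }
  }

  // Wiring sketch: split each emitted fragment on the comma and insert it.
  // processor.on('data', (fragment) => {
  //   const [timestamp, value] = fragment.split(',');
  //   provider.writeToTable(timestamp, parseInt(value, 10));
  // });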

Configuration and Deployment

The project is structured to be portable and configurable via environment variables. It relies on the dotenv package to load parameters into process.env.

To deploy the script, a user must configure their .env file to include network parameters (SOCKET_HOST, SOCKET_PORT) as well as database credentials (DB_HOST, DB_PORT, DB_DATABASE, DB_USER, DB_PASSWORD, and optionally TABLE_NAME). Once the .env configuration is in place, the data pipeline is initialized by simply executing node index.js via the command line.
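
A sample .env file (all values are placeholders) covering the variables listed above:

  # Network parameters for the upstream TCP feed
  SOCKET_HOST=127.0.0.1
  SOCKET_PORT=9000

  # PostgreSQL connection settings
  DB_HOST=localhost
  DB_PORT=5432
  DB_DATABASE=metrics
  DB_USER=postgres
  DB_PASSWORD=secret

  # Optional: target table name (defaults to socket_stream_values)
  TABLE_NAME=socket_stream_values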

Through this combination of robust stream parsing and clean class-based separation of concerns, socket-to-pg demonstrates an efficient, native approach to building a real-time data ingestion pipeline in Node.js.

https://github.com/arpad1337/socket-to-pg
