HK — where I grew up, a place famous for low barriers to entry. Photo by Jason Wong on Unsplash

Ingestion of data is over. But not as you know it

Data ingestion is over because I do not believe the manner in which data ingestion services are delivered to data teams is something that will persist. But don’t worry, you’ll still be moving data.

--

About me

I’m Hugo Lu — I started my career working in M&A in London before moving into data engineering at JUUL and then at Codat, a London-based fintech. I’m now CEO at Orchestra, a tool that helps Data Teams release data into Production reliably and efficiently.

The process of ingesting data is both necessary and complicated. It’s an almost undeniable fact that valuable data lives in disparate sources. In logical terms, you might consider that statement a premise. A statement of undeniable fact. A statement you would describe as “true.” A statement that holds firm in almost all circumstances, both current and foreseeable.

For this to be false, we would need software applications that individually hold all the data required for anything useful whatsoever. This requirement is extremely demanding. The very existence, and success, of software built for specific purposes should suffice to prove the validity of this premise, at least for the near-term future.

What does this mean for data?

Isolated datasets from these types of systems rarely carry intrinsic value. By contrast, there is significant value in aggregating isolated datasets together for individuals and companies alike.

It follows that both individuals and companies will benefit from services that make use of aggregated datasets. What fascinates me is the debate around which tool should fundamentally be used to service this need. It is an intrinsically use-case-specific problem. Examples abound. From bank statements that aggregate transactional data from different suppliers to data warehouses that offer data teams an aggregated source of truth for company-specific data, it feels as if the need for data in the same place, and therefore for data movement services, is all but undeniable.

So what’s the future of data ingestion? Why is it over?

I write that data ingestion is over because I do not believe the manner in which data ingestion services are delivered to data teams is something that will persist. The needs of companies (rather than individuals), and therefore of data teams, are what the rest of this article will focus on.

How are Data Ingestion Services delivered?

From an architectural perspective, there are three ways data ingestion services are delivered:

  1. Data is aggregated in a business context via data aggregation microservices. These are pieces of software that contain logic to move data from different sources to different “sinks”.
  2. Scripts that ingest data are included in a “monolithic app” that also handles components like scheduling, data transformation, and basic functionality like making HTTP calls.
  3. Data is placed onto a queue and a consumer (typically a Lambda function or an event hub) processes events in real-time and pushes the processed events into partitions in data lakes or data warehouses (see the sketch after this list).
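
To make (3) concrete, here is a minimal sketch of what such a consumer might look like, assuming an SQS-triggered AWS Lambda and a hypothetical S3 bucket standing in for the data lake. It is illustrative only, not a production design.

import json
from datetime import datetime, timezone

import boto3

s3 = boto3.client("s3")
BUCKET = "my-data-lake"  # hypothetical bucket name, for illustration only


def handler(event, context):
    """Consume a batch of SQS events and land them in date-partitioned lake paths."""
    now = datetime.now(timezone.utc)
    partition = f"events/year={now:%Y}/month={now:%m}/day={now:%d}"

    for record in event.get("Records", []):
        payload = json.loads(record["body"])
        key = f"{partition}/{record['messageId']}.json"
        s3.put_object(Bucket=BUCKET, Key=key, Body=json.dumps(payload).encode("utf-8"))

    return {"processed": len(event.get("Records", []))}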

There are two ways these microservices are delivered to Data Teams:

  1. Data Teams “deliver it themselves”. They build these microservices themselves. This is common for (3) and for (2).
  2. Data Teams purchase access to third party microservices. This is most common for (1) and increasingly (2). Fivetran, Airbyte Cloud, Hevo Data, Stitch etc. are all examples of this.

Why is this over?

This is over because the code required to do (2) is completely commoditized. A helpful definition here is that of Perfect Competition:

the situation prevailing in a market in which buyers and sellers are so numerous and well informed that all elements of monopoly are absent and the market price of a commodity is beyond the control of individual buyers and sellers.

Or in English / normal-people terms:

  • There are lots of providers providing exactly the same thing (data movement software)
  • There are many buyers and sellers, which means it is impossible to sustain any level of supernormal profit. If a typical company makes, for argument’s sake, 20% profit on revenue, and the raw cost of delivering a data ingestion microservice for a month is $80, then it should be selling for $100 (the arithmetic is spelled out after this list)
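
That $100 figure just comes from treating the 20% margin as a share of revenue, so the price is the cost divided by one minus the margin. A two-line sketch:

cost = 80            # monthly cost of delivering the ingestion microservice
margin = 0.20        # profit as a share of revenue
price = cost / (1 - margin)
print(price)         # 100.0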

What’s been interesting is that people are still willing to pay for data ingestion services despite the fact that the code to ingest data has been available for a very long time. Airbyte and Singer are well-known open-source frameworks that can be used to write connectors. However, there are additional features of data movement software people are willing to pay for:

  1. Support
  2. Deployment
  3. UI
  4. Security requirements
  5. An Account Manager
  6. Quality of Service / SLA

This means these items (1–6; non-exhaustive) are what data movement software providers now need to compete on, and they should be the only things determining differentials in price. If you aren’t getting a bit more of 1–6 but you’re paying more than you would elsewhere, you’re doing it wrong.

Why now?

It has never been easier to use AI to generate code, and it is easy to deploy that code as a software product. The barriers to building your own Fivetran or your own Hightouch are therefore very, very low. To take an example, I asked ChatGPT (a general-purpose LLM, not fine-tuned on anything, with no special prompting) the following:

Write me a python class that can connect to Salesforce's API. Add a method that fetches the salesforce accounts object

import requests

class SalesforceAPIConnector:
    def __init__(self, client_id, client_secret, username, password, security_token, sandbox=False):
        self.client_id = client_id
        self.client_secret = client_secret
        self.username = username
        self.password = password
        self.security_token = security_token
        self.sandbox = sandbox
        self.access_token = None
        self.instance_url = None
        self.authenticate()

    def authenticate(self):
        domain = 'test' if self.sandbox else 'login'
        auth_url = f'https://{domain}.salesforce.com/services/oauth2/token'
        data = {
            'grant_type': 'password',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'username': self.username,
            'password': f'{self.password}{self.security_token}'
        }

        response = requests.post(auth_url, data=data)
        auth_data = response.json()

        self.access_token = auth_data['access_token']
        self.instance_url = auth_data['instance_url']

    def fetch_accounts(self):
        if not self.access_token or not self.instance_url:
            raise Exception("Authentication failed. Call 'authenticate()' first.")

        query = "SELECT Id, Name, Industry, Phone FROM Account LIMIT 10"
        headers = {
            'Authorization': f'Bearer {self.access_token}'
        }
        url = f'{self.instance_url}/services/data/v52.0/query/?q={query}'

        response = requests.get(url, headers=headers)
        response_data = response.json()

        accounts = response_data.get('records', [])
        return accounts

# Usage
if __name__ == "__main__":
    client_id = "your_client_id"
    client_secret = "your_client_secret"
    username = "your_salesforce_username"
    password = "your_salesforce_password"
    security_token = "your_salesforce_security_token"

    connector = SalesforceAPIConnector(client_id, client_secret, username, password, security_token, sandbox=True)
    accounts = connector.fetch_accounts()

    print("Fetched Accounts:")
    for account in accounts:
        print("Name:", account["Name"])
        print("Industry:", account["Industry"])
        print("Phone:", account["Phone"])
        print("-" * 30)

Not bad. But no paging, only 10 results.

Add a method to the above that gets all accounts i.e. without the limit but is able to fetch 100 accounts at once, page through the results, and return them all as one list

class SalesforceAPIConnector:

    ...

    def fetch_all_accounts(self):
        if not self.access_token or not self.instance_url:
            raise Exception("Authentication failed. Call 'authenticate()' first.")

        headers = {
            'Authorization': f'Bearer {self.access_token}'
        }
        query = "SELECT Id, Name, Industry, Phone FROM Account"

        accounts = []
        next_records_url = f'/services/data/v52.0/query/?q={query}'

        # Follow nextRecordsUrl until Salesforce has no more pages to return
        while next_records_url:
            response = requests.get(f'{self.instance_url}{next_records_url}', headers=headers)
            response_data = response.json()
            accounts.extend(response_data.get('records', []))
            next_records_url = response_data.get('nextRecordsUrl')

        return accounts

Or in JavaScript:

const axios = require('axios');

class SalesforceAPIConnector {
    constructor(clientId, clientSecret, username, password, securityToken, sandbox = false) {
        this.clientId = clientId;
        this.clientSecret = clientSecret;
        this.username = username;
        this.password = password;
        this.securityToken = securityToken;
        this.sandbox = sandbox;
        this.accessToken = null;
        this.instanceUrl = null;
        this.authenticate();
    }

    async authenticate() {
        const domain = this.sandbox ? 'test' : 'login';
        const authUrl = `https://${domain}.salesforce.com/services/oauth2/token`;
        const data = new URLSearchParams();
        data.append('grant_type', 'password');
        data.append('client_id', this.clientId);
        data.append('client_secret', this.clientSecret);
        data.append('username', this.username);
        data.append('password', `${this.password}${this.securityToken}`);

        const response = await axios.post(authUrl, data);
        const authData = response.data;

        this.accessToken = authData.access_token;
        this.instanceUrl = authData.instance_url;
    }

    async fetchAllAccounts() {
        if (!this.accessToken || !this.instanceUrl) {
            throw new Error("Authentication failed. Call 'authenticate()' first.");
        }

        let accounts = [];
        let nextRecordsUrl = `/services/data/v52.0/queryAll/?q=SELECT+Id,Name,Industry,Phone+FROM+Account`;
        const headers = {
            'Authorization': `Bearer ${this.accessToken}`
        };

        while (nextRecordsUrl) {
            const url = `${this.instanceUrl}${nextRecordsUrl}`;
            const response = await axios.get(url, { headers });
            const responseData = response.data;
            accounts = accounts.concat(responseData.records || []);
            nextRecordsUrl = responseData.nextRecordsUrl;
        }

        return accounts;
    }
}

// Usage
(async () => {
    const clientId = "your_client_id";
    const clientSecret = "your_client_secret";
    const username = "your_salesforce_username";
    const password = "your_salesforce_password";
    const securityToken = "your_salesforce_security_token";

    const connector = new SalesforceAPIConnector(clientId, clientSecret, username, password, securityToken, true);
    // authenticate() in the constructor is fire-and-forget, so await it explicitly before querying
    await connector.authenticate();
    const allAccounts = await connector.fetchAllAccounts();

    console.log("Fetched Accounts:");
    for (const account of allAccounts) {
        console.log("Name:", account.Name);
        console.log("Industry:", account.Industry);
        console.log("Phone:", account.Phone);
        console.log("-".repeat(30));
    }
})();

What this means for Data Engineers

Generating ELT code is very straightforward. This was done using an off-the-shelf, FREE, general-purpose LLM. This could be made better by using something like Code Llama (the code-specialised version of Llama 2) and fine-tuning it on a code base to generate production-grade code.
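
As a flavour of what that might look like, here is a minimal sketch that prompts an open code model via the Hugging Face transformers library. The model ID and prompt are assumptions for illustration, and the fine-tuning step itself is not shown.

from transformers import pipeline

# Assumed model ID, for illustration only; in practice you would fine-tune
# a model like this on your own connector code base first.
generator = pipeline("text-generation", model="codellama/CodeLlama-7b-Instruct-hf")

prompt = (
    "Write a Python class that connects to the HubSpot API and fetches all "
    "contacts, paging through the results."
)

result = generator(prompt, max_new_tokens=512, do_sample=False)
print(result[0]["generated_text"])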

It’s hard to see why you wouldn’t do this for yourself, at least if you were thinking about building your own ELT application to sell to other people. It might still be a bit overkill if you, like me, are a data engineer who just wants to move some data from A to B. That said, I recently built something like this for a talk on “How to build your own Reverse ELT App”, and I realized there was no real reason not to build some ELT too. You can see the repo here.

So, in terms of what this means for Data Engineers, if it’s not that we should just build all our own apps, then what is it? There’s this awesome interview from Seth Levine with guest Paige Bailey of Google DeepMind. They talk a little about how generative AI will allow people to move into the business of building and commercializing software at an unprecedented rate over the coming years, because even basic models do a pretty good job at basic stuff like the above.

And this is the point about data engineering; data ingestion in particular.

It’s basic. The code is commoditized because it’s all over the internet and people have been using it and sharing it for many years. I’ve added a few more examples at the bottom of the article to demonstrate how easily ChatGPT can generate code.

These are extremely interesting times for engineers who enjoy building things. There is no reason you could not build your own Fivetran or Hightouch (or better yet, combine them like I did in the repo above). Make friends with someone who knows DevOps and you could have yourself a serious side-hustle.
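
To give a flavour of the “combine them” idea, here is a minimal sketch with an extract leg and a push leg, assuming the SalesforceAPIConnector from earlier and a hypothetical REST endpoint as the reverse-ETL destination. It is an illustration of the shape of such an app, not the code in the repo above.

import requests

# Hypothetical destination for the "reverse" leg; purely illustrative
DESTINATION_URL = "https://api.example.com/v1/contacts"


def extract_accounts(connector):
    """Extract leg: pull all accounts out of Salesforce using the connector above."""
    return connector.fetch_all_accounts()


def push_accounts(accounts, api_key):
    """Push leg: send each record to a downstream API (in a full reverse-ETL setup
    this would typically read from the warehouse rather than the source directly)."""
    headers = {"Authorization": f"Bearer {api_key}"}
    for account in accounts:
        payload = {
            "name": account.get("Name"),
            "industry": account.get("Industry"),
            "phone": account.get("Phone"),
        }
        response = requests.post(DESTINATION_URL, json=payload, headers=headers)
        response.raise_for_status()


# Usage (assuming `connector` is an authenticated SalesforceAPIConnector)
# accounts = extract_accounts(connector)
# push_accounts(accounts, api_key="your_destination_api_key")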

Conclusion

Data ingestion is indeed over as we know it — but not in terms of how we implement it architecturally or how we even acquire these services. What I think is over is the hegemony of a few big players in data ingestion tooling.

At Codat, someone in the team had written a pretty awesome version of Fivetran in JavaScript. It was an API that had logic to fetch data from different APIs and store it in different places. It took them a long time to write, and we were glad to eventually deprecate some of it and start using Fivetran.

If that same situation happens in 10 years we’ll say the same thing. Apart from one word:

someone in the team had written a pretty awesome version of [………] in JavaScript. It was an API that had logic to fetch data from different APIs and store it in different places. It took them a long time to write, and we were glad to eventually deprecate some of it and start using [………]

Because there won’t just be a few vendors of data ingestion tooling. There’ll be hundreds if not thousands, and they’ll all do exactly the same thing. Sure, there will always be companies that cater to the enterprise. But for something as simple as data ingestion, where support and value-add services are generally of little concern as long as the data arrives, the market seems ripe for a bit of perfect competition.

Appendix

Code generated by ChatGPT to push the Salesforce data to Azure Data Lake Storage, with credentials pulled from AWS Secrets Manager:

import json

import boto3

class AWSSecretsManagerClient:
    def __init__(self, region_name):
        self.region_name = region_name
        self.secrets_manager_client = boto3.client('secretsmanager', region_name=self.region_name)

    def get_secret(self, secret_name):
        response = self.secrets_manager_client.get_secret_value(SecretId=secret_name)
        if 'SecretString' in response:
            return response['SecretString']
        else:
            raise Exception("Secret value not found.")

# Usage (assumes the SalesforceAPIConnector above and a DataLakeClient class are defined or imported)
if __name__ == "__main__":
    # AWS Secrets Manager client setup
    aws_region_name = "your_aws_region_name"
    aws_secret_name = "your_aws_secret_name"
    aws_secrets_client = AWSSecretsManagerClient(aws_region_name)

    # Retrieve Salesforce and Azure Data Lake Storage credentials from AWS Secrets Manager
    aws_secrets = aws_secrets_client.get_secret(aws_secret_name)
    aws_secrets = json.loads(aws_secrets)

    client_id = aws_secrets["salesforce_client_id"]
    client_secret = aws_secrets["salesforce_client_secret"]
    username = aws_secrets["salesforce_username"]
    password = aws_secrets["salesforce_password"]
    security_token = aws_secrets["salesforce_security_token"]

    adls_account_name = aws_secrets["adls_account_name"]
    adls_account_key = aws_secrets["adls_account_key"]
    adls_file_system_name = aws_secrets["adls_file_system_name"]

    # Salesforce API connector setup
    connector = SalesforceAPIConnector(client_id, client_secret, username, password, security_token, sandbox=True)
    accounts = connector.fetch_all_accounts()

    # Azure Data Lake Storage client setup
    data_lake_client = DataLakeClient(adls_account_name, adls_account_key, adls_file_system_name)

    # Push Salesforce accounts data to Azure Data Lake Storage
    partition_path = "salesforce/accounts"
    data_lake_client.push_data_to_lake(partition_path, accounts)

    print("Data pushed to Data Lake successfully.")

--

Hugo Lu - I write about how to be good at Data engineering and do the coolest data stuff. I am the CEO @ Orchestra, a data release pipeline management platform