Vespa with Logstash Recipes
Vespa with Logstash? I thought Vespa isn’t the best search engine for logs. That’s correct, but:
- Vespa is great at all the other search use-cases: enterprise search, recommendations, RAG, etc.
- Logstash is a very powerful, easy-to-use ETL tool. Not just a logging tool, although it started that way. I’d argue it’s the easiest path from zero to production as far as pipelines are concerned.
To prove the last statement, we’ll go over a lot of scenarios in one blog post:
- We’ll get started by reading data from a CSV file, parsing it, and feeding it to Vespa.
- “But my data doesn’t live in CSV files”, you may say. Fine, we’ll read from a DB instead.
- “But I don’t need a whole new pipeline, I already use Kafka”. Fine, we’ll use Logstash as a Kafka consumer that feeds to Vespa.
And that’s not all! We’ll also cover data migration: whether it’s from one Vespa cluster to another or from a different technology (here, Elasticsearch).
Let’s get to it!
Recipe 1 of 5: Feeding CSV Files to Vespa
TL;DR: If you want this recipe already cooked and delivered to your door, clone the logstash-vespa-output-demo repository and do:
docker compose up # or podman compose up
Then check out the README to see what you did 🙂
To cook this recipe yourself, you need:
- Vespa running with an application that has the right schema. By “right schema”, I mean a schema that maps to the fields you’re interested in from the CSV file, plus the features you need for searching: inverted index, retrieval of original values, ranking, etc. (a minimal sketch follows below).
- Logstash installed with the Vespa Output Plugin.
- Logstash configuration that points to the CSV file, parses every line and sends it to Vespa as a new document.
Let’s ingest this simple CSV file with three fields describing blog posts: id, title and content.
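To make the “right schema” requirement concrete, here is a minimal sketch of what schemas/post.sd could look like for these three fields. Treat it as an illustration of one reasonable starting point; the application package linked below is the authoritative version.
schema post {
    document post {
        # document ID, stored for retrieval and kept as an attribute for fast access
        field id type string {
            indexing: summary | attribute
        }
        # full-text searchable fields
        field title type string {
            indexing: summary | index
        }
        field content type string {
            indexing: summary | index
        }
    }
}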
Setting Up Vespa
If you have Docker or Podman locally, the command below will start a Vespa container. Alternatively, you can go to Vespa Cloud and follow the steps there to create a new application (cluster).
podman run --detach --name vespa-csv --hostname vespa-csv \
--publish 8080:8080 --publish 19071:19071 \
vespaengine/vespa
Where:
- vespa-csv is the name and hostname we give to the container
- 8080 is the port we’ll use for queries
- 19071 is the port for configuration (i.e. to send the application package)
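Before deploying the application package (next section), you can optionally check that the config server is up. The health endpoint below is Vespa’s standard state API on the config port:
# wait for an "up" status before deploying the application package
curl -s http://localhost:19071/state/v1/health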
Application package
All the configuration goes into the application package. You’ll find a minimal application package here, containing:
- schemas/post.sd: schema file with id, title and content fields defined.
- services.xml: defines the cluster layout and options like the JVM heap size.
Vespa CLI
Now you can deploy your application package. The simplest way is via the Vespa CLI. You can install it via Homebrew:
brew install vespa-cli
Or download the binary from Vespa Releases and put it somewhere in your PATH.
Then, deploy the application package via:
vespa deploy /path/to/package
Installing Logstash and the Vespa Output Plugin
First, download Logstash for your platform. I prefer unpacking the archive somewhere for experiments, and packages/Docker for production, because Logstash normally runs as a service there. On Windows, NSSM can wrap a binary in a service.
To install the Logstash Vespa Output Plugin, you’ll need to navigate to Logstash’s home directory and do:
bin/logstash-plugin install logstash-output-vespa_feed
The Logstash home directory is where you unpacked the archive. For packages/Docker, that’s /usr/share/logstash/.
Configuring Logstash
Feel free to create a logstash.conf file with contents from this blog post and use it in one of three ways:
- Run Logstash with the config:
bin/logstash -f /path/to/logstash.conf
- Reference logstash.conf in logstash.yml under path.config ($LOGSTASH_HOME/config or /etc/logstash), then run bin/logstash (see the logstash.yml sketch below).
- If you’re using packages, put logstash.conf in /etc/logstash/conf.d.
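For the second option, the relevant bit of logstash.yml is a single setting (a sketch; point it to wherever your config actually lives):
# logstash.yml
path.config: "/path/to/logstash.conf"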
Notes:
- This blog post assumes one ETL pipeline, though you can also have multiple pipelines (see the pipelines.yml sketch below).
- The config file name can be myawesomeconfig.conf, for all Logstash cares.
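If you do go for multiple pipelines, they are declared in pipelines.yml, one entry per pipeline. A sketch with made-up pipeline names and paths:
# pipelines.yml
- pipeline.id: csv-to-vespa
  path.config: "/path/to/csv_to_vespa.conf"
- pipeline.id: kafka-to-vespa
  path.config: "/path/to/kafka_to_vespa.conf"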
Input
Start with an input section: read from the CSV file, line by line, using the File Input plugin:
input {
file {
path => "/tmp/blog_posts.csv"
# read the file from the beginning
start_position => "beginning"
}
}
Besides the path, point Logstash to the beginning of the file, otherwise it will tail it. Either way, Logstash remembers where it left off in a “sinceDB” file. To run multiple experiments and read the file every time, add sincedb_path => "/dev/null" to the list of options.
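For example, an input section that re-reads the whole file on every run might look like this (same file as above, just with the extra option):
input {
  file {
    path => "/tmp/blog_posts.csv"
    start_position => "beginning"
    # don't remember the read position between runs, so every run re-reads the file
    sincedb_path => "/dev/null"
  }
}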
Filter
Now let’s parse the CSV file. We can add a lot of transformations here, but the workhorse for this case is the CSV filter.
filter {
csv {
# what does the CSV file look like?
separator => ","
quote_char => '"'
# the first line is the header, so we'll skip it
skip_header => true
# columns of the CSV file. Make sure you have these fields in the Vespa schema
columns => ["id", "title", "content"]
}
# remove fields we don't need
# NOTE: the fields below are added by Logstash by default. You probably *need* this
# otherwise Vespa will reject documents complaining that e.g. @timestamp is unknown
mutate {
remove_field => ["@timestamp", "@version", "event", "host", "log", "message"]
}
}
The mutate filter shows why Logstash was originally built for logs: it appends a lot of metadata to each event, which would be useful if our lines were logs: timestamp, original file name, etc.
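If you’re curious exactly which fields your events carry before deciding what to remove, you can temporarily add a debug output that pretty-prints each event to the console (separate from the real Vespa output in the next section):
output {
  # pretty-print each event, including the metadata fields Logstash adds by default
  stdout { codec => rubydebug }
}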
Output
To write documents to Vespa, use the Vespa Output plugin installed earlier. You’ll need the Vespa endpoint, which is http://localhost:8080 with the container setup above.
For Vespa Cloud, it’s the endpoint of your application - you’ll see it in the UI when you click on your application name. You’ll also need to provide the certificates for mTLS in this case - see recipe 4 below for details on both creating and using them.
output {
vespa_feed {
# Vespa endpoint
vespa_url => "http://localhost:8080"
# if you're using mTLS, provide the certificates generated via `vespa auth cert`
# client_cert => "/YOUR_HOME_DIRECTORY/.vespa/YOUR_TENANT_NAME.YOUR_APPLICATION_NAME.default/data-plane-public-cert.pem"
# client_key => "/YOUR_HOME_DIRECTORY/.vespa/YOUR_TENANT_NAME.YOUR_APPLICATION_NAME.default/data-plane-private-key.pem"
# document type. This needs to match the document type in the Vespa schema
document_type => "post"
# namespace. Soft delimitation of documents that can be used for visiting;
# not relevant for searches
namespace => "post"
}
}
Once data is in, you can query it. You can use the Vespa CLI:
vespa query 'select * from sources * where true'
Or HTTP requests directly (replace http://localhost:8080 with your Vespa endpoint):
curl -XPOST -H "Content-Type: application/json" -d\
'{ "yql": "select * from sources * where true"}'\
'http://localhost:8080/search/' | jq .
Next Steps
On the query side, check out the Query documentation.
For shipping more complex datasets, we have a few examples:
- Is the CSV file non-standard? For example, do weird quoting rules break your CSV parsing? You can use grok or dissect instead of the CSV filter. Here’s an example logstash.conf.
- Custom transformation logic? The ruby filter can help, though lots of tasks (renaming fields, parsing timestamps, and much more) can be done with specialized filters. Here’s an example using both ruby and specialized filters.
- Entries from your CSV files span multiple lines? Add the multiline codec to the file input. Here’s an example.
Logstash also has lots of inputs. Next, we’ll explore reading from a DB, Kafka, another Vespa cluster, and Elasticsearch.
Recipe 2 of 5: From Database to Vespa
TL;DR: You’ll need to download the JDBC driver for your database, and then use it in Logstash’s JDBC input. Remove unneeded fields and send resulting documents to Vespa as before. Use a timestamp column to track changes and a soft-deletes column to remove data.
We’ll use Postgres for this. If you don’t have it, you can run it in Docker/Podman as well. Feel free to use a different username and password 🙂 We’ll expose port 5432 for Logstash to use.
podman run --name postgres -e POSTGRES_USER=pguser -e POSTGRES_PASSWORD=pgpass -e POSTGRES_DB=blog -p 5432:5432 postgres
Then you can connect to the DB by using the psql binary from the container:
podman exec -it postgres psql -U pguser -d blog
Finally, we can create the table and insert the sample blog posts from the CSV file we used earlier:
CREATE TABLE blog_posts (
id INTEGER PRIMARY KEY,
title VARCHAR(255) NOT NULL,
content TEXT
);
INSERT INTO blog_posts (id, title, content)
VALUES (1, 'Blog Post 1', 'This is the content of the first blog post.');
INSERT INTO blog_posts (id, title, content)
VALUES (2, 'Awesome Blog Post 2', 'This is the content of the second blog post.');
Now, we’re ready to send these rows to Vespa.
Getting the Right JDBC Driver
In order to use the JDBC Input Plugin to read from the database, we need a JDBC driver that works with our Postgres - available for download here.
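For example, you could fetch it from Maven Central and put it somewhere Logstash can read. The version and target path below are just the ones referenced in the config that follows, so adjust them to your setup:
# download the Postgres JDBC driver (adjust version and path as needed)
curl -L -o /opt/logstash/postgresql-42.7.5.jar \
  https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.5/postgresql-42.7.5.jar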
Configuring Logstash
Our input can look like this:
input {
jdbc {
# path to the Postgres JDBC driver we just downloaded
jdbc_driver_library => "/opt/logstash/postgresql-42.7.5.jar"
# for Postgres, this is the driver class. Other DBs will have similar class names
jdbc_driver_class => "org.postgresql.Driver"
# point it to the host and port (with our container, localhost:5432) and the DB name
jdbc_connection_string => "jdbc:postgresql://localhost:5432/blog"
# credentials
jdbc_user => "YOUR_POSTGRES_USER"
jdbc_password => "YOUR_POSTGRES_PASSWORD"
# how often to run the query (cron-like syntax). This runs every minute
schedule => "* * * * *"
# SQL query to fetch the rows
statement => "SELECT * from blog_posts"
}
}
Logstash will run a query every minute, fetch all rows from the table, and send them to Vespa. The JDBC input adds less metadata and we don’t need additional parsing, so the filter becomes:
filter {
mutate {
# remove unneeded fields
remove_field => ["@timestamp", "@version"]
}
}
The output stays the same as before:
output {
# for debugging
#stdout { codec => rubydebug }
vespa_feed {
# replace with your Vespa endpoint and include mTLS certificates if needed
vespa_url => "http://localhost:8080"
document_type => "post"
namespace => "post"
}
}
But sending all the rows over and over again to Vespa won’t fly beyond a PoC. So let’s only add new/updated rows.
Remembering Where We Left Off
To keep track of what was already fed to Vespa, we can use a timestamp column. For example:
ALTER TABLE blog_posts ADD COLUMN updated_at TIMESTAMP DEFAULT NOW();
On the Logstash side, change the select statement to filter on this updated_at column using the value of the last query time:
statement => "SELECT id,title,content from blog_posts WHERE updated_at > :sql_last_value"
Now any new rows will get a newer timestamp and will be fed to Vespa. Updated rows (provided you also update the updated_at column) will also be fetched and the newest version will be sent to Vespa.
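For example, if you edit a row by hand, touch updated_at in the same statement so the change gets picked up on the next run:
UPDATE blog_posts
SET title = 'Blog Post 1 (edited)', updated_at = NOW()
WHERE id = 1;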
This takes care of new rows and updates. What about deleted rows?
Tracking Deletes
We can do this with soft deletes: have a column tracking whether a document is deleted or not. Eventually, we can garbage-collect documents in the DB if we want to.
ALTER TABLE blog_posts ADD COLUMN is_deleted BOOLEAN DEFAULT FALSE;
Now we can delete documents by setting is_deleted to TRUE. Update the updated_at column as well.
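For example, soft-deleting the second blog post looks like this:
UPDATE blog_posts
SET is_deleted = TRUE, updated_at = NOW()
WHERE id = 2;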
On the Logstash side, we need to fetch is_deleted, too:
statement => "SELECT id,title,content,is_deleted from blog_posts WHERE updated_at > :sql_last_value"
We can use this field to tell the Vespa Output whether to put or remove the document in question. First, we need to convert it from a boolean to a string that says put/remove:
filter {
if [is_deleted] {
mutate {
add_field => { "operation" => "remove" }
}
} else {
mutate {
add_field => { "operation" => "put" }
}
}
mutate {
# remove unneeded fields
remove_field => ["@timestamp", "@version", "is_deleted"]
}
}
Note how we get rid of the is_deleted field, since we have the operation field now with all the info we need.
Now the output section can look like this:
output {
vespa_feed {
# replace with your Vespa endpoint and include mTLS certificates if needed
vespa_url => "http://localhost:8080"
document_type => "post"
namespace => "post"
# use the value from the "operation" field
operation => "%{operation}"
# but exclude this "operation" field from the Vespa document
remove_operation => true
}
}
We used the operation field we just created to set a dynamic operation in our requests to Vespa (instead of the default put). And because we don’t care to store this operation field in Vespa, we remove it at the time of the output.
Logstash has a lot of cool features like this; it’s a well-rounded ETL tool. It even offers queues to buffer data until Vespa can write everything. But more complex pipelines use dedicated queues like Apache Kafka, so let’s look at how you can use Logstash to feed data from Kafka to Vespa.
Recipe 3 of 5: Logstash as a Kafka Consumer
If you don’t have Kafka running already, you can get started in four steps:
- Download Kafka and unpack it
- Start Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
- Start the Kafka broker:
bin/kafka-server-start.sh config/server.properties
- Create the topic we’ll use for blog posts:
bin/kafka-topics.sh --create --topic blog-posts --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Then, the Logstash config depends on how we expect messages to be formatted. If they’re in JSON, we can use the JSON codec to parse:
input {
kafka {
# host and port of a Kafka broker (with our container, localhost:9092)
bootstrap_servers => "localhost:9092"
# the topic created earlier
topics => ["blog-posts"]
# codec to parse messages
codec => "json"
}
}
Then we remove unnecessary fields (as usual) and send resulting documents to Vespa:
filter {
mutate {
remove_field => ["@timestamp", "@version", "event"]
}
}
output {
vespa_feed {
# replace with your Vespa endpoint and include mTLS certificates if needed
vespa_url => "http://localhost:8080"
document_type => "post"
namespace => "post"
}
}
Now if we write a JSON-formatted blog post to Kafka, it will go to Vespa:
echo '{"id":1, "title": "first blog", "content": "kakfa post one"}' | bin/kafka-console-producer.sh --topic blog-posts --bootstrap-server localhost:9092
Architecture Considerations
Logstash can commit offsets to Kafka when it writes messages to its own queue, or at a specific interval (the default). Because Logstash pipelines can be arbitrarily complex, it can’t commit offsets when messages get to Vespa. This implies:
- In case of a Logstash crash, you might lose data from its memory queue whose offsets were already committed to Kafka.
- Which means you should use persistent queues. But even then, the queue size should be small, otherwise Logstash can move the Kafka offset too far ahead just because it wrote to its own queue, not because it wrote to Vespa. If the queue is full, inputs block (i.e. stop consuming from Kafka) until there’s space again (i.e. data is written to Vespa). A minimal logstash.yml sketch follows this list.
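Enabling a small persistent queue is two settings in logstash.yml; the size cap below is an arbitrary example, tune it to your throughput:
# logstash.yml
queue.type: persisted
# keep the queue small so Kafka offsets don't run too far ahead of what's in Vespa
queue.max_bytes: 64mb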
If you simply want to connect Kafka with Vespa, without further processing, you’ll be better off with Kafka Connect Vespa, which doesn’t have a queue and therefore avoids these challenges. But queues are helpful if you want additional processing, like adding timestamps, changing field names or contents, or other business logic. This is where Logstash shines.
Dealing with Failed Documents
Logstash has another advantage over many alternatives: the Vespa Output plugin supports dead letter queues. So your outputs in production should look like this:
vespa_feed {
# replace with your Vespa endpoint and include mTLS certificates if needed
vespa_url => "http://localhost:8080"
document_type => "post"
namespace => "post"
# turn on dead letter queue
enable_dlq => true
# path to the dead letter queue
dlq_path => "/path/to/dead-letter-queue"
}
Now, if messages can’t get to Vespa for whatever reason - schema doesn’t match, Vespa is down, exceeded retries, etc. - data will not be lost. You can read failed documents with Logstash’s dead letter queue input, change them if needed, and send them again.
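Such a re-processing pipeline could start with the dead_letter_queue input pointed at the same directory. A sketch, assuming the default layout (you may need to adjust the path and related options to match how the DLQ was written):
input {
  dead_letter_queue {
    # same directory as dlq_path in the output above
    path => "/path/to/dead-letter-queue"
    # remember which entries were already re-processed
    commit_offsets => true
  }
}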
Speaking of Logstash’s advantages, did you know it can also read data from Vespa? We’ll look into it next.
Recipe 4 of 5: Migrating Data to a New Vespa Cluster
Say you want to migrate your local Vespa to Vespa Cloud. How would you migrate both your data and your application?
For the application, you have a guide here, but let’s list the 5 steps so you don’t context-switch:
- Sign up for Vespa Cloud and follow the steps to create a new application.
- Point Vespa CLI to the new application:
vespa config set target cloud
# names are in your Vespa Cloud dashboard:
vespa config set application YOUR_TENANT_NAME.YOUR_APPLICATION_NAME
- Authenticate and generate the certificates for mTLS API access:
cd /path/to/your/application/package
vespa auth login  # acknowledge from a browser
vespa auth cert   # check the output for the path of certificates
- Make sure services.xml requires mTLS client authentication. You should uncomment this bit under container -> clients:
<client id="mtls" permissions="read,write">
    <certificate file="security/clients.pem"/>
</client>
- Deploy your application:
vespa deploy
Now you should have the same application in Vespa Cloud as the one you have locally. You can query it to check that it works, but there’s no data:
vespa query "select * from sources * where true"
Let’s use Logstash to migrate data. First, we need the Vespa Input Plugin:
bin/logstash-plugin install logstash-input-vespa
To read all data, point the plugin to your Vespa endpoint and the cluster name from services.xml:
input {
vespa {
vespa_url => "http://localhost:8080"
cluster => "blog"
}
}
Documents come back in a fields element. Move all fields to the root of the event before sending documents over to Vespa Cloud:
filter {
# move the document from "fields" to the root
ruby {
code => '
fields = event.get("fields")
if fields
fields.each {|k, v| event.set(k, v) }
event.remove("fields")
end
'
}
# remove unneeded fields
mutate {
remove_field => ["@timestamp", "@version"]
}
}
Finally, send data to Vespa Cloud. You’ll need the endpoint of your application (click on your application name from the Vespa Cloud dashboard) and the path to the certificate and key generated with vespa auth cert (check the output of that command):
output {
vespa_feed {
# copy the mTLS endpoint from Vespa Cloud, in the page of your application
vespa_url => "https://VESPA_ENDPOINT_GOES_HERE:8080"
document_type => "post"
namespace => "post"
# certificates generated via `vespa auth cert`
client_cert => "/YOUR_HOME_DIRECTORY/.vespa/YOUR_TENANT_NAME.YOUR_APPLICATION_NAME.default/data-plane-public-cert.pem"
client_key => "/YOUR_HOME_DIRECTORY/.vespa/YOUR_TENANT_NAME.YOUR_APPLICATION_NAME.default/data-plane-private-key.pem"
}
}
When you start Logstash, it will visit all the documents and send them to Vespa Cloud, exiting when done. Then, this query should return documents:
vespa query "select * from sources * where true"
You’d use a similar procedure to move data from a different data store to Vespa. The next recipe is with Elasticsearch: if you heard of Logstash before, it was probably in the context of Elasticsearch 🙂
Recipe 5 of 5: Migrating from Elasticsearch to Vespa
Say you have Elasticsearch running (if you don’t know how but have Docker/Podman, try this docker-compose gist) and two sample blog posts:
PUT blog/_doc/1
{
"title": "First",
"content": "First blog post from ES"
}
PUT blog/_doc/2
{
"title": "Second",
"content": "Second blog post from ES"
}
To transfer them to Vespa, we’ll first use the Elasticsearch input plugin:
input {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "blog"
docinfo => true
docinfo_fields => ["_id"]
docinfo_target => "@metadata"
}
}
In this particular case, the ID isn’t part of the document itself; it’s the “docinfo” field _id. We’ll put _id into the (default) @metadata field of our Logstash event. This @metadata field is ignored by output plugins by default. We’ll copy it into the main document and remove @timestamp and @version:
filter {
# move the id from @metadata to the root
mutate {
add_field => { "id" => "%{[@metadata][_id]}" }
}
# remove unneeded fields
mutate {
remove_field => ["@timestamp", "@version"]
}
}
In the end, we write everything to Vespa:
output {
vespa_feed {
# replace with your Vespa endpoint and include mTLS certificates if needed
vespa_url => "http://localhost:8080"
document_type => "post"
namespace => "post"
}
}
Migrating data from Elasticsearch to Vespa is quite easy, but migrating schemas and queries might not be. Watch this space: guides on how to do that are coming soon 🙂