Radu Gheorghe
Software Engineer

Vespa with Logstash Recipes

Vespa with Logstash? I thought Vespa isn’t the best search engine for logs. That’s correct, but:

  • Vespa is great at all the other search use-cases: enterprise search, recommendations, RAG, etc.
  • Logstash is a very powerful, easy-to-use ETL tool. Not just a logging tool, although it started that way. I’d argue it’s the easiest path from zero to production as far as pipelines are concerned.

To prove the last statement, we’ll go over a lot of scenarios in one blog post:

  1. We’ll get started by reading data from a CSV file, parsing it, and feeding it to Vespa.
  2. “But my data doesn’t live in CSV files”, you may say. Fine, we’ll read from a DB instead.
  3. “But I don’t need a whole new pipeline, I already use Kafka”. Fine, we’ll use Logstash as a Kafka consumer that feeds to Vespa.

And that’s not it! We’ll also cover data migration: whether it’s from one Vespa cluster to another or from a different technology (here, Elasticsearch).

Let’s get to it!

Recipe 1 of 5: Feeding CSV Files to Vespa

TL;DR: If you want this recipe already cooked and delivered to your door, clone the logstash-vespa-output-demo repository and do:

docker compose up # or podman compose up

Then check out the README to see what you did 🙂

To cook this recipe yourself, you need:

  1. Vespa running with an application that has the right schema. By “right schema”, I mean a schema that maps to the fields you’re interested in from the CSV file, plus the features you need for searching: inverted index, retrieval of original values, ranking, etc.
  2. Logstash installed with the Vespa Output Plugin.
  3. Logstash configuration that points to the CSV file, parses every line and sends it to Vespa as a new document.

Let’s ingest this simple CSV file with three fields describing blog posts: id, title and content.
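
For reference, the file could look something like this (the same two sample posts show up again in the database recipe later):

id,title,content
1,"Blog Post 1","This is the content of the first blog post."
2,"Awesome Blog Post 2","This is the content of the second blog post."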

Setting Up Vespa

If you have Docker or Podman locally, the command below will start a Vespa container. Alternatively, you can go to Vespa Cloud and follow the steps there to create a new application (cluster).

podman run --detach --name vespa-csv --hostname vespa-csv \  
  --publish 8080:8080 --publish 19071:19071 \  
  vespaengine/vespa

Where:

  • vespa-csv is the name and hostname we give to the container
  • 8080 is the port we’ll use for queries
  • 19071 is the port for configuration (i.e. to send the application package)

Application package

All the configuration goes into the application package. You’ll find a minimal application package here, containing:

  • schemas/post.sd: schema file with id, title and content fields defined (a minimal sketch follows this list).
  • services.xml: defines cluster layout and options like JVM heap size.
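
To give you an idea, a minimal schemas/post.sd could look like the sketch below; the actual file in the sample package may enable more features (e.g. for ranking):

schema post {
    document post {
        field id type string {
            indexing: summary | attribute
        }
        field title type string {
            indexing: summary | index
            index: enable-bm25
        }
        field content type string {
            indexing: summary | index
            index: enable-bm25
        }
    }
}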

Vespa CLI

Now you can deploy your application package. The simplest way is via the Vespa CLI. You can install it via Homebrew:

brew install vespa-cli

Or download the binary from Vespa Releases and put it somewhere in PATH.

Then, deploy the application package via:

vespa deploy /path/to/package
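
If you want to double-check that the container is up before feeding, the Vespa CLI can wait for the endpoint (an optional sanity check):

# wait up to 120 seconds for the container on port 8080 to come up
vespa status --wait 120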

Installing Logstash and the Vespa Output Plugin

First, download Logstash for your platform. I prefer unpacking the archive in one place for experiments, and packages/Docker for production because Logstash normally runs as a service. For Windows, NSSM can wrap a binary in a service.

To install the Logstash Vespa Output Plugin, you’ll need to navigate to Logstash’s home directory and do:

bin/logstash-plugin install logstash-output-vespa_feed

The Logstash home directory is where you unpacked the archive. For packages/Docker, that’s /usr/share/logstash/.
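
You can verify that the plugin is in place by listing the installed plugins:

bin/logstash-plugin list | grep vespa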

Configuring Logstash

Feel free to create a logstash.conf file with contents from this blog post and use it in one of three ways:

  1. Run Logstash with the config: bin/logstash -f /path/to/logstash.conf
  2. Reference logstash.conf in logstash.yml under path.config ($LOGSTASH_HOME/config or /etc/logstash). Then run bin/logstash.
  3. If you’re using packages, put logstash.conf in /etc/logstash/conf.d

Notes:

  • This blog post assumes one ETL pipeline, though you can also have multiple pipelines (see the sketch after these notes).
  • The config file name can be myawesomeconfig.conf, for all Logstash cares.
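
If you do go the multiple-pipelines route, each pipeline gets an entry in config/pipelines.yml (or /etc/logstash/pipelines.yml for packages), roughly like this sketch with a made-up pipeline ID:

- pipeline.id: csv-to-vespa
  path.config: "/path/to/logstash.conf"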

Input

Start with an input section: read from the CSV file, line by line, using the File Input plugin:

input {  
    file {  
        path => "/tmp/blog_posts.csv"  
        # read the file from the beginning  
        start_position => "beginning"  
    }  
}

Besides the path, point Logstash to the beginning of the file, otherwise it will tail it. Either way, Logstash remembers where it left off in a “sinceDB” file. To run multiple experiments and read the file every time, add sincedb_path => "/dev/null" to the list of options.

Filter

Now let’s parse the CSV file. We can add a lot of transformations here, but the workhorse for this case is the CSV filter.

filter {  
    csv {  
        # what does the CSV file look like?  
        separator => ","  
        quote_char => '"'

        # the first line is the header, so we'll skip it  
        skip_header => true

        # columns of the CSV file. Make sure you have these fields in the Vespa schema  
        columns => ["id", "title", "content"]  
    }

    # remove fields we don't need  
    # NOTE: the fields below are added by Logstash by default. You probably *need* this  
    # otherwise Vespa will reject documents complaining that e.g. @timestamp is unknown 
    mutate {  
        remove_field => ["@timestamp", "@version", "event", "host", "log", "message"]  
    }  
}

The mutate filter shows why Logstash was initially built for logs: it appends a lot of metadata to each line (timestamp, original file name, etc.), which would be useful if our lines were log lines.

Output

To write documents to Vespa, use the Vespa Output plugin installed earlier. You’ll need the Vespa endpoint, which is http://localhost:8080 with the container setup above.

For Vespa Cloud, it’s the endpoint of your application - you’ll see it in the UI when you click on your application name. You’ll also need to provide the certificates for mTLS in this case - see recipe 4 below for details on both creating and using them.

output {  
    vespa_feed {
        # Vespa endpoint
        vespa_url => "http://localhost:8080"

        # if you're using mTLS, provide the certificates generated via `vespa auth cert`  
        # client_cert => "/YOUR_HOME_DIRECTORY/.vespa/YOUR_TENANT_NAME.YOUR_APPLICATION_NAME.default/data-plane-public-cert.pem"  
        # client_key => "/YOUR_HOME_DIRECTORY/.vespa/YOUR_TENANT_NAME.YOUR_APPLICATION_NAME.default/data-plane-private-key.pem"

        # document type. This needs to match the document type in the Vespa schema  
        document_type => "post"

        # namespace. Soft delimitation of documents that can be used for visiting;
        # not relevant for searches  
        namespace => "post"  
    }  
}

Once data is in, you can query it with the Vespa CLI:

vespa query 'select * from sources * where true'

Or HTTP requests directly (replace http://localhost:8080 with your Vespa endpoint):

curl -XPOST -H "Content-Type: application/json" -d\  
  '{  "yql": "select * from sources * where true"}'\  
   'http://localhost:8080/search/' | jq .
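
With data flowing, you can also go beyond match-all queries. For example, a simple YQL filter on the title field of the post document type (assuming title is indexed, as in the schema sketch above):

vespa query 'select * from post where title contains "blog"'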

Next Steps

On the query side, check out the Query documentation.

For shipping more complex datasets, we have a few examples.

Logstash also has lots of inputs. Next, we’ll explore reading from a DB, Kafka, another Vespa cluster, and Elasticsearch.

Recipe 2 of 5: From Database to Vespa

TL;DR: You’ll need to download the JDBC driver for your database, and then use it in Logstash’s JDBC input. Remove unneeded fields and send resulting documents to Vespa as before. Use a timestamp column to track changes and a soft-deletes column to remove data.

We’ll use Postgres for this. If you don’t have it, you can run it in Docker/Podman as well. Feel free to use a different username and password 🙂 We’ll expose port 5432 for Logstash to use.

podman run --name postgres -e POSTGRES_USER=pguser -e POSTGRES_PASSWORD=pgpass -e POSTGRES_DB=blog -p 5432:5432 postgres

Then you can connect to the DB by using the psql binary from the container:

podman exec -it postgres psql -U pguser -d blog

Finally, we can create the table and insert the sample blog posts from the CSV file we used earlier:

CREATE TABLE blog_posts (  
    id INTEGER PRIMARY KEY,  
    title VARCHAR(255) NOT NULL,  
    content TEXT  
);  
INSERT INTO blog_posts (id, title, content)   
VALUES (1, 'Blog Post 1', 'This is the content of the first blog post.');  
INSERT INTO blog_posts (id, title, content)   
VALUES (2, 'Awesome Blog Post 2', 'This is the content of the second blog post.');

Now, we’re ready to send these rows to Vespa.

Getting the Right JDBC Driver

In order to use the JDBC Input Plugin to read from the database, we need a JDBC driver that works with our Postgres - available for download here.
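
For example, you could download it next to Logstash like this (the version below is just the one used in this post; check the download page for the current release):

curl -L -o /opt/logstash/postgresql-42.7.5.jar \
  https://jdbc.postgresql.org/download/postgresql-42.7.5.jar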

Configuring Logstash

Our input can look like this:

input {  
  jdbc {  
    # path to the Postgres JDBC driver we just downloaded  
    jdbc_driver_library => "/opt/logstash/postgresql-42.7.5.jar"  
    # for Postgres, this is the driver class. Other DBs will have similar class names  
    jdbc_driver_class => "org.postgresql.Driver"  
    # point it to the host and port (with our container, localhost:5432) and the DB name  
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/blog"  
    # credentials  
    jdbc_user => "YOUR_POSTGRES_USER"  
    jdbc_password => "YOUR_POSTGRES_PASSWORD"  
    # how often to run the query (cron-like syntax). This runs every minute  
    schedule => "* * * * *"  
    # SQL query to fetch the rows  
    statement => "SELECT * from blog_posts"  
  }  
}

Logstash will run a query every minute, fetch all rows from the table, and send them to Vespa. The JDBC input adds less metadata and we don’t need additional parsing, so the filter becomes:

filter {  
  mutate {  
    # remove unneeded fields  
    remove_field => ["@timestamp", "@version"]  
  }  
}

The output stays the same as before:

output {  
    # for debugging  
    #stdout { codec => rubydebug }  

    vespa_feed {  
        # replace with your Vespa endpoint and include mTLS certificates if needed
        vespa_url => "http://localhost:8080"

        document_type => "post"  
        namespace => "post"  
    }  
}

But sending all the rows over and over again to Vespa won’t fly beyond a PoC. So let’s only add new/updated rows.

Remembering Where We Left Off

To keep track of what was already fed to Vespa, we can use a timestamp column. For example:

ALTER TABLE blog_posts ADD COLUMN updated_at TIMESTAMP DEFAULT NOW();

On the Logstash side, change the select statement to query this updated_at column with the value of the last query time:

statement => "SELECT id,title,content from blog_posts WHERE updated_at > :sql_last_value"

Now any new rows will get a newer timestamp and will be fed to Vespa. Updated rows (provided you also update the updated_at column) will also be fetched and the newest version will be sent to Vespa.
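
To avoid relying on application code to bump updated_at, one option is a Postgres trigger along these lines (a sketch; the function and trigger names are made up):

CREATE OR REPLACE FUNCTION touch_updated_at() RETURNS trigger AS $$
BEGIN
  NEW.updated_at = NOW();
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER blog_posts_touch_updated_at
BEFORE UPDATE ON blog_posts
FOR EACH ROW EXECUTE FUNCTION touch_updated_at();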

This takes care of new rows and updates. What about deleted rows?

Tracking Deletes

We can do this with soft deletes: have a column tracking whether a document is deleted or not. Eventually, we can garbage-collect documents in the DB if we want to.

ALTER TABLE blog_posts ADD COLUMN is_deleted BOOLEAN DEFAULT FALSE;

Now we can delete documents by setting is_deleted to TRUE. Update the updated_at column as well.
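
For example, soft-deleting the second blog post would look like this:

UPDATE blog_posts SET is_deleted = TRUE, updated_at = NOW() WHERE id = 2;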

On the Logstash side, we need to fetch is_deleted, too:

statement => "SELECT id,title,content,is_deleted from blog_posts WHERE updated_at > :sql_last_value"

We can use this field to tell the Vespa Output whether to put or remove the document in question. First, we need to convert it from a boolean to a string that says put/remove:

filter {  
  if [is_deleted] {  
    mutate {  
      add_field => { "operation" => "remove" }  
    }  
  } else {  
    mutate {  
      add_field => { "operation" => "put" }  
    }  
  }  
  mutate {  
    # remove unneeded fields  
    remove_field => ["@timestamp", "@version", "is_deleted"]  
  }  
}

Note how we get rid of the is_deleted field, since we have the operation field now with all the info we need.

Now the output section can look like this:

output {  
    vespa_feed {  
        # replace with your Vespa endpoint and include mTLS certificates if needed
        vespa_url => "http://localhost:8080"

        document_type => "post"  
        namespace => "post"  

        # use the value from the "operation" field  
        operation => "%{operation}"  
        # but exclude this "operation" field from the Vespa document  
        remove_operation => true  
    }  
}

We used the operation field we just created to set a dynamic operation in our requests to Vespa (instead of the default put). And because we don’t care to store this operation field in Vespa, we remove it at the time of the output.

Logstash has a lot of cool features like this; it’s a well-rounded ETL tool. It even offers queues to buffer data until Vespa can absorb all the writes. But more complex pipelines use dedicated queues like Apache Kafka, so let’s look at how you can use Logstash to feed data from Kafka to Vespa.

Recipe 3 of 5: Logstash as a Kafka Consumer

If you don’t have Kafka running already, you can get started in four steps:

  1. Download Kafka and unpack it
  2. Start Zookeeper: bin/zookeeper-server-start.sh config/zookeeper.properties
  3. Start the Kafka broker: bin/kafka-server-start.sh config/server.properties
  4. Create the topic we’ll use for blog posts: bin/kafka-topics.sh --create --topic blog-posts --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Then, the Logstash config depends on how we expect messages to be formatted. If they’re in JSON, we can use the JSON codec to parse:

input {  
  kafka {  
    # host and port of a Kafka broker (with our local setup, localhost:9092)  
    bootstrap_servers => "localhost:9092"  
    # the topic created earlier  
    topics => ["blog-posts"]  
    # codec to parse messages  
    codec => "json"  
  }  
}

Then we remove unnecessary fields (as usual) and send resulting documents to Vespa:

filter {  
  mutate {  
    remove_field => ["@timestamp", "@version", "event"]  
  }  
}  
output {  
    vespa_feed {  
        # replace with your Vespa endpoint and include mTLS certificates if needed
        vespa_url => "http://localhost:8080"

        document_type => "post"  
        namespace => "post"  
    }  
}

Now if we write a JSON-formatted blog post to Kafka, it will go to Vespa:

echo '{"id":1, "title": "first blog", "content": "kakfa post one"}' | bin/kafka-console-producer.sh --topic blog-posts --bootstrap-server localhost:9092

Architecture Considerations

Logstash can commit offsets to Kafka when it writes messages to its own queue, or at a specific interval (the default). Because Logstash pipelines can be arbitrarily complex, it can’t commit offsets when messages get to Vespa. This implies:

  • In case of a Logstash crash, you might lose data from its memory queue that was already committed from Kafka.
  • Which means you should use persistent queues (see the sketch below). Even then, keep the queue small; otherwise Logstash can advance the Kafka offset too far just because it wrote to its own queue, not because it wrote to Vespa. If the queue is full, inputs block (i.e. stop consuming from Kafka) until there’s space in it again (i.e. data is written to Vespa).
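
A minimal persistent-queue setup in logstash.yml could look like this (the size is illustrative, tune it to your throughput):

queue.type: persisted
# keep it small, so the committed Kafka offsets stay close to what actually reached Vespa
queue.max_bytes: 64mb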

If you simply want to connect Kafka with Vespa, without further processing, you’ll be better off with Kafka Connect Vespa, which doesn’t have a queue and therefore avoids these challenges. But queues are helpful if you want additional processing, like adding timestamps, changing field names or contents, or other business logic. This is where Logstash shines.

Dealing with Failed Documents

Logstash has another advantage over many other ETL tools: the Vespa Output plugin supports dead letter queues. So your outputs in production should look like this:

vespa_feed {  
    # replace with your Vespa endpoint and include mTLS certificates if needed  
    vespa_url => "http://localhost:8080"

    document_type => "post"  
    namespace => "post"

    # turn on the dead letter queue  
    enable_dlq => true  
    # path to the dead letter queue  
    dlq_path => "/path/to/dead-letter-queue"  
}

Now, if messages can’t get to Vespa for whatever reason - schema doesn’t match, Vespa is down, exceeded retries, etc. - data will not be lost. You can read failed documents with Logstash’s dead letter queue input, change them if needed, and send them again.
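
Reading failed documents back could look roughly like the sketch below, assuming the DLQ directory layout written by the plugin matches what Logstash’s dead_letter_queue input expects for your pipeline ID:

input {
  dead_letter_queue {
    # base path configured via dlq_path in the output above
    path => "/path/to/dead-letter-queue"
    # ID of the pipeline that wrote the DLQ ("main" is the default)
    pipeline_id => "main"
    # remember which entries were already processed
    commit_offsets => true
  }
}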

Speaking of Logstash’s advantages, did you know it can also read data from Vespa? We’ll look into it next.

Recipe 4 of 5: Migrating Data to a New Vespa Cluster

Say you want to migrate your local Vespa to Vespa Cloud. How would you migrate both your data and your application?

For the application, you have a guide here, but let’s list the 5 steps so you don’t context-switch:

  1. Sign up for Vespa Cloud and follow the steps to create a new application.
  2. Point Vespa CLI to the new application:
    vespa config set target cloud  
    # names are in your Vespa Cloud dashboard:  
    vespa config set application YOUR_TENANT_NAME.YOUR_APPLICATION_NAME 
    
  3. Authenticate and generate the certificates for mTLS API access:
    cd /path/to/your/application/package  
    vespa auth login # acknowledge from a browser  
    vespa auth cert # check the output for the path of certificates
    
  4. Make sure services.xml requires mTLS client authentication. You should uncomment this bit from container -> clients:
    <client id="mtls" permissions="read,write">  
      <certificate file="security/clients.pem"/>  
    </client>
    
  5. Deploy your application:
    vespa deploy
    

Now you should have the same application in Vespa Cloud as the one you have locally. You can query it to check that it works, but there’s no data:

vespa query "select * from sources * where true"

Let’s use Logstash to migrate data. First, we need the Vespa Input Plugin:

bin/logstash-plugin install logstash-input-vespa

To read all data, point the plugin to your Vespa endpoint and the cluster name from services.xml:

input {  
  vespa {  
    vespa_url => "http://localhost:8080"  
    cluster => "blog"  
  }  
}

Documents come back wrapped in a fields element. Move all fields to the root of the document before sending them over to Vespa Cloud:

filter {  
  # move the document from "fields" to the root  
  ruby {  
    code => '  
      fields = event.get("fields")  
      if fields  
        fields.each {|k, v| event.set(k, v) }  
        event.remove("fields")  
      end  
    '  
  }  
  # remove unneeded fields  
  mutate {  
    remove_field => ["@timestamp", "@version"]  
  }  
}

Finally, send data to Vespa Cloud. You’ll need the endpoint of your application (click on your application name from the Vespa Cloud dashboard) and the path to the certificate and key generated with vespa auth cert (check the output of that command):

output {  
    vespa_feed {  
        # copy the mTLS endpoint from Vespa Cloud, in the page of your application  
        vespa_url => "https://VESPA_ENDPOINT_GOES_HERE:443"  
        document_type => "post"  
        namespace => "post"

        # certificates generated via `vespa auth cert`  
        client_cert => "/YOUR_HOME_DIRECTORY/.vespa/YOUR_TENANT_NAME.YOUR_APPLICATION_NAME.default/data-plane-public-cert.pem"  
        client_key => "/YOUR_HOME_DIRECTORY/.vespa/YOUR_TENANT_NAME.YOUR_APPLICATION_NAME.default/data-plane-private-key.pem"  
    }  
}

When you start Logstash, it will visit all the documents and send them to Vespa Cloud, exiting when done. Then, this query should return documents:

vespa query "select * from sources * where true"

You’d use a similar procedure to move data from a different data store to Vespa. The next recipe is with Elasticsearch: if you heard of Logstash before, it was probably in the context of Elasticsearch 🙂

Recipe 5 of 5: Migrating from Elasticsearch to Vespa

Say you have Elasticsearch running (if you don’t know how but have Docker/Podman, try this docker-compose gist) and two sample blog posts:

PUT blog/_doc/1  
{  
  "title": "First",  
  "content": "First blog post from ES"  
}

PUT blog/_doc/2  
{  
  "title": "Second",  
  "content": "Second blog post from ES"  
}

To transfer them to Vespa, we’ll first use the Elasticsearch input plugin:

input {  
  elasticsearch {  
    hosts => ["http://localhost:9200"]  
    index => "blog"  
    docinfo => true  
    docinfo_fields => ["_id"]  
    docinfo_target => "@metadata"  
  }  
}

In this particular case, the ID isn’t part of the document itself; it’s the “docinfo” field _id, which we put into the (default) @metadata field of our Logstash event. Output plugins ignore @metadata by default, so we copy the ID into the main document and remove @timestamp and @version:

filter {  
  # move the id from @metadata to the root  
  mutate {  
    add_field => { "id" => "%{[@metadata][_id]}" }  
  }  
  # remove unneeded fields  
  mutate {  
    remove_field => ["@timestamp", "@version"]  
  }  
}

In the end, we write everything to Vespa:

output {  
    vespa_feed {  
        # replace with your Vespa endpoint and include mTLS certificates if needed
        vespa_url => "http://localhost:8080"
        
        document_type => "post"  
        namespace => "post"  
    }  
}

Migrating data from Elasticsearch to Vespa is quite easy, but migrating the schema and queries might not be. Watch this space: guides on how to do that are coming soon 🙂

