Transactions
Transactions are how you modify data within Crux.
They consist of a series of Transaction Operations to be performed.
If the transaction contains pre-conditions, all pre-conditions must pass, or the entire transaction is aborted. Pre-conditions are checked at the query node during indexing, not when the transaction is submitted.
In Java, a transaction is performed by calling .submitTx on an ICruxIngestAPI with a Transaction object.
The Transaction object can be created either by using Transaction.Builder directly or by using a Consumer.
If using the Consumer approach, we recommend importing crux.api.tx.Transaction.buildTx statically for brevity.
import static crux.api.tx.Transaction.buildTx;

// Using Transaction.Builder directly:
Transaction.builder()
    .put(document)
    .build();

// Using a Consumer:
buildTx(tx -> {
    tx.put(document);
});

// To run the transaction:
node.submitTx(transaction);

// To run a transaction directly:
node.submitTx(buildTx(tx -> {
    tx.put(document);
}));
In Clojure, a transaction is performed by calling crux.api/submit-tx (aliased as crux in the examples) on a node with a list of transaction operations.
(crux/submit-tx node
                [
                 ;; Operations
                 ])
Operations
There are five transaction (write) operations:
Operation | Purpose | Pre-condition? |
---|---|---|
Put | Writes a version of a document | |
Delete | Deletes a specific document | |
Match | Checks the document state against the given document | ✓ |
Evict | Evicts a document entirely, including all historical versions | |
Function | Runs a transaction function | ✓ |
In Java, you can add individual operations to the Transaction.Builder instance with their respective methods.
In Clojure, Transaction Operations are vectors which have their associated keyword as their first value.
Put
Puts a Document into Crux.
If a document already exists with the same id, a new version of this document will be created at the supplied valid time.
See Valid Time for more details.
node.submitTx(buildTx(tx -> {
    tx.put(document1); (1)
    tx.put(document2, validTime1); (2)
    tx.put(document3, validTime2, endValidTime); (3)
}));
1 | Putting a document as of now. |
2 | Putting a document with a specific valid time |
3 | Putting a document with a valid time and end valid time |
(crux/submit-tx node [[:crux.tx/put
                       {:crux.db/id :dbpedia.resource/Pablo-Picasso :first-name :Pablo} (1)
                       #inst "2018-05-18T09:20:27.966-00:00" (2)
                       #inst "2018-05-19T08:31:15.966-00:00"]]) (3)
1 | Document to add |
2 | (optional) valid time |
3 | (optional) end valid time |
Delete
Deletes a Document. See Valid Time for details on how this interacts with multiple versions of the document.
node.submitTx(buildTx(tx -> {
    tx.delete(documentId1); (1)
    tx.delete(documentId2, validTime1); (2)
    tx.delete(documentId3, validTime2, endValidTime); (3)
}));
1 | Deleting as of now |
2 | Deleting with a specific valid time |
3 | Deleting with a valid time and end valid time |
(crux/submit-tx node [[:crux.tx/delete
                       :dbpedia.resource/Pablo-Picasso (1)
                       #inst "2018-05-18T09:20:27.966-00:00" (2)
                       #inst "2018-05-19T08:31:15.966-00:00"]]) (3)
1 | Document ID to be deleted |
2 | (optional) valid time |
3 | (optional) end valid time |
Match
Match checks the state of an entity. If the entity doesn't match the provided document, the whole transaction is aborted and none of its operations are applied.
node.submitTx(buildTx(tx -> {
    tx.match(document1); (1)
    tx.match(document2, validTime1); (2)
    tx.matchNotExists(documentId1); (3)
    tx.matchNotExists(documentId2, validTime2); (4)
    tx.put(document3); (5)
}));
1 | Passes if document1 is exactly present now |
2 | Passes if document2 is exactly present at validTime1 |
3 | Passes if there is no document with the id documentId1 present now |
4 | Passes if there is no document with the id documentId2 present at validTime2 |
5 | Operation(s) to apply if all preconditions are met |
(crux/submit-tx node [[:crux.tx/match
                       :dbpedia.resource/Pablo-Picasso (1)
                       {:crux.db/id :dbpedia.resource/Pablo-Picasso :first-name :Pablo} (2)
                       #inst "2018-05-18T09:21:31.846-00:00"] (3)
                      [:crux.tx/delete :dbpedia.resource/Pablo-Picasso]]) (4)
1 | ID to be matched (for an entity which may or may not exist) |
2 | A specific document revision (or nil) |
3 | (optional) valid time |
4 | Operation(s) to perform if the document is matched |
If the document supplied is nil, the match only passes if there does not exist a Document with the given ID.
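The all-or-nothing behaviour of Match can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: MatchModel, Op, and their methods are hypothetical names invented for illustration, documents are plain maps, valid time is ignored, and none of this reflects how Crux itself is implemented.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Crux code): a transaction is a list of operations that is
// applied only if every match pre-condition passes.
final class MatchModel {
    // Hypothetical operation type: a MATCH compares state, a PUT writes it.
    static final class Op {
        final boolean isMatch;
        final String id;
        final Map<String, Object> doc; // null in a match means "must not exist"

        private Op(boolean isMatch, String id, Map<String, Object> doc) {
            this.isMatch = isMatch;
            this.id = id;
            this.doc = doc;
        }

        static Op match(String id, Map<String, Object> expected) {
            return new Op(true, id, expected);
        }

        static Op put(String id, Map<String, Object> doc) {
            return new Op(false, id, doc);
        }
    }

    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    Map<String, Object> entity(String id) {
        return docs.get(id);
    }

    // Returns true if the transaction was applied, false if it was aborted.
    boolean apply(List<Op> tx) {
        // Work on a copy so that an aborted transaction leaves no trace.
        Map<String, Map<String, Object>> working = new HashMap<>(docs);
        for (Op op : tx) {
            if (op.isMatch) {
                Map<String, Object> current = working.get(op.id);
                boolean matches = (op.doc == null) ? current == null : op.doc.equals(current);
                if (!matches) {
                    return false; // one failed pre-condition aborts everything
                }
            } else {
                working.put(op.id, op.doc);
            }
        }
        docs.clear();
        docs.putAll(working);
        return true;
    }
}
```

In this model, a transaction that puts one document and then fails a match on another leaves the store exactly as it was, including the put that came before the failed match.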
Evict
Evicts a document from Crux. Historical versions of the document will no longer be available.
node.submitTx(buildTx(tx -> {
    tx.evict(documentId);
}));
(crux/submit-tx node [[:crux.tx/evict :dbpedia.resource/Pablo-Picasso]])
Evict is primarily used for GDPR Right to Erasure compliance.
It is important to note that Evict is the only operation which will have effects on the results returned when querying against an earlier Transaction Time.
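The difference between Delete and Evict when querying an earlier Transaction Time can be sketched with a toy history store. The class and method names here (EvictModel, entityAsOf, and so on) are hypothetical, documents are plain strings, and the model tracks transaction time only. It is an illustration of the idea, not a description of Crux internals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model (not Crux code): each id keeps a history of versions keyed by a
// monotonically increasing transaction time.
final class EvictModel {
    // id -> (transaction time -> document, or null as a deletion marker)
    private final Map<String, TreeMap<Long, String>> history = new HashMap<>();
    private long clock = 0;

    long put(String id, String doc) {
        return record(id, doc);
    }

    // Delete appends a marker; earlier versions stay in the history.
    long delete(String id) {
        return record(id, null);
    }

    // Evict drops every version, so earlier transaction times lose the document too.
    void evict(String id) {
        history.remove(id);
    }

    // The document as it was known at the given transaction time, or null.
    String entityAsOf(String id, long txTime) {
        TreeMap<Long, String> versions = history.get(id);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.floorEntry(txTime);
        return entry == null ? null : entry.getValue();
    }

    private long record(String id, String doc) {
        long txTime = ++clock;
        history.computeIfAbsent(id, k -> new TreeMap<Long, String>()).put(txTime, doc);
        return txTime;
    }
}
```

After a delete, asking for the entity as of the earlier transaction time still returns the old version. After an evict, the same question returns nothing.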
Transaction Functions
Transaction functions are user-supplied functions that run on the individual Crux nodes when a transaction is being ingested.
Transaction functions can be used, for example, to safely check the current database state before applying a transaction, for integrity checks, or to patch an entity.
Anatomy
(fn [ctx eid] (1)
  (let [db (crux.api/db ctx) (2)
        entity (crux.api/entity db eid)]
    [[:crux.tx/put (update entity :age inc)]])) (3)
1 | Transaction functions are passed a context parameter and any number of other parameters. |
2 | The context parameter can be used to obtain a database value using db or open-db. |
3 | Transaction functions should return a list of transaction operations or false. |
If a list of transaction operations is returned, these are indexed as part of the transaction.
If false is returned, or an exception is thrown, the whole transaction will roll back.
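The contract described above (return operations to apply them, return false or throw to roll back) can be modelled in a few lines of plain Java. This is a hedged sketch: TxFnModel, Put, and invoke are hypothetical names, entities are reduced to an id and an age, and a null return stands in for false. Real transaction functions are Clojure functions run by Crux itself.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Toy model (not Crux code) of the transaction function contract.
final class TxFnModel {
    // A put operation produced by a function: id -> new age.
    static final class Put {
        final String id;
        final int age;

        Put(String id, int age) {
            this.id = id;
            this.age = age;
        }
    }

    private final Map<String, Integer> ages = new HashMap<>();

    void put(String id, int age) {
        ages.put(id, age);
    }

    Integer age(String id) {
        return ages.get(id);
    }

    // Runs fn against a read-only view of the data. Returns true if the
    // returned operations were applied, false if the transaction rolled back.
    boolean invoke(BiFunction<Map<String, Integer>, String, List<Put>> fn, String eid) {
        List<Put> ops;
        try {
            ops = fn.apply(Collections.unmodifiableMap(ages), eid);
        } catch (RuntimeException e) {
            return false; // an exception rolls the transaction back
        }
        if (ops == null) {
            return false; // null plays the role of `false` here
        }
        for (Put op : ops) {
            ages.put(op.id, op.age);
        }
        return true;
    }

    // Counterpart of the increment-age example, except that it refuses
    // (returns null) when the entity is missing.
    static List<Put> incrementAge(Map<String, Integer> db, String eid) {
        Integer current = db.get(eid);
        if (current == null) {
            return null;
        }
        return Collections.singletonList(new Put(eid, current + 1));
    }
}
```

Calling invoke(TxFnModel::incrementAge, "ivan") on a store where "ivan" has age 40 leaves the age at 41, while the same call for an unknown id changes nothing.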
Creating / Updating
Transaction functions are created/updated by submitting a document to Crux with the desired function.
In Java, you create a function document with CruxDocument.createFunction.
It takes the ID for the function as well as a string consisting of the Clojure function to run.
TransactionInstant ti = node.submitTx(buildTx(tx -> {
    tx.put(CruxDocument.createFunction("incAge",
        "(fn [ctx eid] (let [db (crux.api/db ctx) entity (crux.api/entity db eid)] [[:crux.tx/put (update entity :age inc)]]))"));
}));
In Clojure, the document should use the :crux.db/fn key.
(crux/submit-tx node [[:crux.tx/put {:crux.db/id :increment-age
                                     :crux.db/fn '(fn [ctx eid] (1)
                                                    (let [db (crux.api/db ctx)
                                                          entity (crux.api/entity db eid)]
                                                      [[:crux.tx/put (update entity :age inc)]]))}]])
1 | Note that the function itself is quoted |
Documents
A CruxDocument is created with an ID that must be of a valid type.
The instance itself is immutable: adding or removing data with plus or minus yields a new instance of CruxDocument.
Similarly to Transactions, you can use CruxDocument.Builder directly or use the Consumer approach.
CruxDocument.builder("pablo-picasso")
    .put("name", "Pablo")
    .put("lastName", "Picasso")
    .build();

build("pablo-picasso", doc -> {
    doc.put("name", "Pablo");
    doc.put("lastName", "Picasso");
});

// You can also chain creating new instances of the document,
// but this will be slow for larger documents.
CruxDocument.create("pablo-picasso")
    .plus("name", "Pablo")
    .plus("lastName", "Picasso");
In Clojure, a document is a map from keywords to values.
{:crux.db/id :dbpedia.resource/Pablo-Picasso
 :name "Pablo"
 :last-name "Picasso"}
All documents must contain the :crux.db/id key.
Persistence of Clojure metadata is not supported, although it is not currently rejected either. We strongly recommend removing all metadata from each document before submitting it to Crux, to avoid potential equality checking issues.
For operations containing documents, the id and the document are hashed, and the operation and hash are submitted to the tx-topic in the event log. The document itself is submitted to the doc-topic, using its content hash as key. In Kafka, the doc-topic is compacted, which enables later eviction of documents.
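The split between the two topics can be sketched as follows. Everything in this block is an assumption made for illustration: ContentHashSketch is a hypothetical class, documents are treated as already-serialized strings, the two topics are plain in-memory collections, and SHA-256 is used only because it ships with every JDK (the hash function Crux actually uses is not stated here).

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Crux code): the log records hashes only, while document
// bodies live in a separate store keyed by their content hash.
final class ContentHashSketch {
    final List<String> txTopic = new ArrayList<>();       // operations that reference hashes
    final Map<String, String> docTopic = new HashMap<>(); // content hash -> document body

    static String sha256Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be available on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    // Returns the content hash used as the key in the document store.
    String put(String id, String serializedDoc) {
        String contentHash = sha256Hex(serializedDoc);
        txTopic.add("put " + sha256Hex(id) + " " + contentHash);
        docTopic.put(contentHash, serializedDoc);
        return contentHash;
    }

    // Removing the body leaves the log intact; it still holds only the hash.
    void evictContent(String contentHash) {
        docTopic.remove(contentHash);
    }
}
```

Because the log never contains document bodies in this model, removing an entry from the document store is enough to make the content unrecoverable, which is the property that a compacted doc-topic provides.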
Valid IDs
In Java, the following types of document IDs are allowed:
Type | Example |
---|---|
Keyword | |
String | |
Long | |
UUID | |
URI | |
URL | |
IPersistentMap | |
In Clojure, the following types of :crux.db/id are allowed:
Type | Example |
---|---|
Keyword | |
String | |
Integers/Longs | |
UUID | |
URI | |
URL | |
Maps | |
The #crux/id reader literal will take URI/URL strings and attempt to coerce them into valid IDs.
URIs and URLs are interpreted using Java classes (java.net.URI and java.net.URL respectively) and therefore you can also use these directly.
Valid Times
When an optional valid time is omitted from a transaction operation, the Transaction Time will be used as valid time.
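A minimal sketch of this defaulting rule, for a single entity, is shown here. ValidTimeModel and its methods are hypothetical names, a null valid time stands for "omitted", and the model ignores end valid time and transaction-time history. It only illustrates which instant a version ends up at.

```java
import java.time.Instant;
import java.util.Map;
import java.util.TreeMap;

// Toy model (not Crux code): versions of one entity keyed by valid time.
final class ValidTimeModel {
    private final TreeMap<Instant, String> versions = new TreeMap<>();

    // validTime may be null, in which case the transaction time is used.
    // Returns the valid time the version was actually stored under.
    Instant put(String doc, Instant validTime, Instant txTime) {
        Instant effective = (validTime != null) ? validTime : txTime;
        versions.put(effective, doc);
        return effective;
    }

    // Latest version whose valid time is at or before the requested instant.
    String entityAt(Instant validTime) {
        Map.Entry<Instant, String> entry = versions.floorEntry(validTime);
        return entry == null ? null : entry.getValue();
    }
}
```

A put with an explicit valid time in the past is visible from that past instant onwards, whereas a put without one only becomes visible from the transaction time.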
Awaiting Transactions
After a transaction is submitted, it needs to be indexed before it is visible in Crux DB snapshots.
The return value from submitting the transaction can be used to wait for the transaction to be indexed.
This return value holds both the ID of the transaction and the Transaction Time.
In Java, you receive a TransactionInstant from the submitTx call.
TransactionInstant ti = node.submitTx(buildTx(tx -> {
    tx.put(CruxDocument.create("Ivan"));
}));

// This will be null because the transaction won't have been indexed yet
assertNull(node.db().entity("Ivan"));

// Here we will wait until it has been indexed
node.awaitTx(ti, Duration.ofSeconds(5));

// And now our new document will be in the DB snapshot
assertNotNull(node.db().entity("Ivan"));
In Clojure, you receive a map from submit-tx containing :crux.tx/tx-id and :crux.tx/tx-time:
(let [tx (crux/submit-tx node [[:crux.tx/put {:crux.db/id :ivan}]])]
  ;; The transaction won't have indexed yet so :ivan won't exist in a snapshot
  (crux/entity (crux/db node) :ivan) ;; => nil
  ;; Wait for the transaction to be indexed
  (crux/await-tx node tx)
  ;; Now :ivan will exist in a snapshot
  (crux/entity (crux/db node) :ivan)) ;; => {:crux.db/id :ivan}
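The submit-then-await pattern can be modelled with standard Java concurrency utilities. This is a sketch under stated assumptions: AwaitModel is a hypothetical class, a single background thread plays the role of the indexer, and a Future stands in for the value returned from submitting a transaction. It does not use or describe the Crux API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model (not Crux code): writes are indexed asynchronously, so a reader
// must wait for a submission before relying on it being visible.
final class AwaitModel {
    private final Map<String, String> index = new ConcurrentHashMap<>();

    // A single daemon thread plays the role of the indexer.
    private final ExecutorService indexer = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-indexer");
        t.setDaemon(true);
        return t;
    });

    // Returns immediately with a handle; the write becomes visible later.
    Future<?> submit(String id, String doc) {
        return indexer.submit(() -> { index.put(id, doc); });
    }

    // Blocks until the submission has been indexed or the timeout elapses.
    void await(Future<?> tx, long timeoutMillis) {
        try {
            tx.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("transaction was not indexed in time", e);
        }
    }

    String entity(String id) {
        return index.get(id);
    }
}
```

Reading an entity straight after submit may or may not see the write in this model. Only a read after await is guaranteed to see it.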
Speculative transactions
You can submit speculative transactions to Crux, to see what the results of your queries would be if a new transaction were to be applied. This is particularly useful for forecasting/projections or further integrity checks, without persisting the changes or affecting other users of the database.
You’ll receive a new database value, against which you can make queries and entity requests as you would any normal database value. Only you will see the effect of these transactions - they’re not submitted to the cluster, and they’re not visible to any other database value in your application.
You submit these transactions to an instance of ICruxDatasource using withTx:
TransactionInstant ti = node.submitTx(buildTx(tx -> {
    tx.put(CruxDocument.create("Ivan"));
}));

node.awaitTx(ti, Duration.ofSeconds(5));

ICruxDatasource db = node.db();

assertNotNull(db.entity("Ivan"));
assertNull(db.entity("Petr"));

ICruxDatasource speculativeDb = db.withTx(buildTx(tx -> {
    tx.put(CruxDocument.create("Petr"));
}));

// Petr is in our speculative db
assertNotNull(speculativeDb.entity("Ivan"));
assertNotNull(speculativeDb.entity("Petr"));

// We haven't impacted our original db
assertNotNull(db.entity("Ivan"));
assertNull(db.entity("Petr"));

// Nor have we impacted our node
assertNotNull(node.db().entity("Ivan"));
assertNull(node.db().entity("Petr"));
You submit these transactions to a database value using with-tx:
(let [real-tx (crux/submit-tx node [[:crux.tx/put {:crux.db/id :ivan, :name "Ivan"}]])
      _ (crux/await-tx node real-tx)
      all-names '{:find [?name], :where [[?e :name ?name]]}
      db (crux/db node)]
  (crux/q db all-names) ; => #{["Ivan"]}
  (let [speculative-db (crux/with-tx db
                         [[:crux.tx/put {:crux.db/id :petr, :name "Petr"}]])]
    (crux/q speculative-db all-names) ; => #{["Petr"] ["Ivan"]}
    )
  ;; we haven't impacted the original db value, nor the node
  (crux/q db all-names) ; => #{["Ivan"]}
  (crux/q (crux/db node) all-names) ; => #{["Ivan"]}
  )
The entities submitted by the speculative Put take their valid time (if not explicitly specified) from the valid time of the db from which they were forked.
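The idea of forking a database value without touching the original can be shown with an immutable snapshot type. ForkableDb is a hypothetical class written for this illustration, documents are plain strings, and valid time is left out entirely. It is a sketch of the concept, not of how Crux implements withTx.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy model (not Crux code): an immutable database value that can be forked.
final class ForkableDb {
    private final Map<String, String> docs;

    ForkableDb(Map<String, String> docs) {
        // Defensive copy so later changes to the argument cannot leak in.
        this.docs = Collections.unmodifiableMap(new HashMap<String, String>(docs));
    }

    String entity(String id) {
        return docs.get(id);
    }

    // Returns a new value with the extra documents applied.
    // The value this is called on is left untouched.
    ForkableDb withTx(Map<String, String> puts) {
        Map<String, String> forked = new HashMap<String, String>(docs);
        forked.putAll(puts);
        return new ForkableDb(forked);
    }
}
```

Forking a value that contains only "ivan" with a put for "petr" yields a second value that sees both, while the original still reports no "petr".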