Channable Delta API
The Channable Delta API allows you to send item updates to Channable at any time, as a complement to our importers that download item data at scheduled times. Updates consist of deltas that can create, modify, or delete items in your project.
How deltas work
This section gives an overview of the modifications you can make, how and when they are applied, and when they will disappear.
Channable will periodically import your feed, as configured in the Setup → Setup import screen and the Setup → Schedule settings screen. The API can be used to subsequently modify this feed, without Channable having to download the feed again.
Each item in the feed has a unique item identifier that you choose, as configured in the Setup → Project settings screen. Each item also has fields with particular values, such as color or price. The API can be used to add new items to the feed, modify existing items by their unique item identifier, and delete existing items by their unique item identifier.
To adjust the feed of your project, you can submit deltas by calling the API. An API call includes a batch of one or more deltas. Each batch is accompanied by a unique batch identifier that can be used to later poll the status of the processing of the batch. The status of a batch also includes processing errors. Batch identifiers only need to be unique per Channable project. Batches in different projects may have the same batch identifier.
Delta application
Projects in Channable are scheduled to run periodically. A run proceeds in the following phases:
- Download, where the project’s imports download the latest available data. For file-based importers, this means that we download a feed from your server.
- Import, where we join or append the data from all imports in the project into a single view of your items.
- Export, where we export feeds, generate ads, or push updates to marketplaces.
Deltas are applied between the Import phase and the Export phase, or when the project is idle. You can continuously push updates to the Delta API. These deltas will be applied shortly after submission when the project is not running, i.e. not currently in Download, Import or Export phase. In cases where the project is running, the application of deltas will be delayed until either at the end of the Import phase, or once the project is idle again.
Note that an import might render deltas obsolete. In this case, deltas submitted right before a feed download would be rejected. To prevent this from happening, it is recommended that your server serves an ETag header and respects If-None-Match. With those headers, we will not download the feed again for the next 10 days, as long as the file on the server does not change. Besides your server sending those headers, this also needs to be enabled in Channable. To enable this feature, check the “optimize bandwidth” box in the import settings. See also the help center article about this feature.
Keep in mind that although deltas will allow you to skip importing all items every run, exporting feeds still requires exporting all items, so the export phase may be the limiting factor in how frequently your project can run.
Delta ordering
It is possible that a delta arrives during a feed download. In this case there is an ambiguity about whether the delta should be applied to the feed currently being downloaded, or whether the feed currently being downloaded should override the changes made by the delta.
It is also possible for two batches of deltas to arrive simultaneously, or in a different order than they were sent. For example, packet loss might cause an http request to be delayed. In this case there is an ambiguity about which deltas should be applied first.
To resolve such ambiguities, each delta must include a timestamp. A delta is eligible to be applied to a feed, when the timestamp of the delta is greater than the timestamp associated with the feed. This includes deltas whose timestamp lies in the future at the time of the import run. During the import run, we process all eligible deltas. We process deltas in order of ascending timestamp, regardless of the order in which we received them.
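The eligibility and ordering rule above can be sketched in a few lines of Python. This is an illustration of the rule only, not Channable's implementation; the function name `eligible_deltas_in_order` and the dictionary layout are assumptions made for the example.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List


def eligible_deltas_in_order(
    deltas: List[Dict[str, Any]],
    feed_timestamp: datetime,
) -> List[Dict[str, Any]]:
    """Keep deltas strictly newer than the feed, sorted by ascending timestamp."""
    eligible = [d for d in deltas if d["timestamp"] > feed_timestamp]
    return sorted(eligible, key=lambda d: d["timestamp"])


feed_time = datetime(2020, 9, 15, 12, 0, tzinfo=timezone.utc)
# Deltas listed in arrival order, which differs from timestamp order.
received = [
    {"timestamp": datetime(2020, 9, 15, 13, 0, tzinfo=timezone.utc), "stock": "9"},
    {"timestamp": datetime(2020, 9, 15, 12, 30, tzinfo=timezone.utc), "stock": "10"},
    {"timestamp": datetime(2020, 9, 15, 11, 0, tzinfo=timezone.utc), "stock": "13"},
]
ordered = eligible_deltas_in_order(received, feed_time)
```

Here the delta timestamped 11:00 is older than the feed and is dropped, and the remaining two are applied in timestamp order, so the last value applied is the one with the most recent timestamp.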
We determine the timestamp associated with the feed from the request made during the most recent download. The timestamp is decided as follows:
- The value of the Last-Modified http response header, if present.
- Otherwise, the value of the Date http response header minus the Age header, if both are present.
- Otherwise, the value of the Date http response header.
- Otherwise, the instant at which we started importing your feed.
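If you want to predict which timestamp will be associated with your feed, the fallback chain above can be approximated as follows. This is a sketch under assumptions: the function name `feed_timestamp` is hypothetical, header names are looked up with the exact capitalization shown, and the Age header is assumed to hold a whole number of seconds.

```python
from datetime import datetime, timedelta, timezone
from email.utils import parsedate_to_datetime
from typing import Mapping


def feed_timestamp(headers: Mapping[str, str], import_started: datetime) -> datetime:
    """Approximate the feed timestamp from the response headers of a download."""
    if "Last-Modified" in headers:
        return parsedate_to_datetime(headers["Last-Modified"])
    if "Date" in headers:
        date = parsedate_to_datetime(headers["Date"])
        if "Age" in headers:
            # Assumption: Age is an integer number of seconds.
            return date - timedelta(seconds=int(headers["Age"]))
        return date
    # No usable headers: fall back to the instant the import started.
    return import_started
```

A delta is then eligible when its timestamp is greater than the value this function returns for the most recent download.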
We recommend that you use the time at which a change occurred as the timestamp for the delta that represents that change. The timestamp can differ among deltas in the same batch.
See the timeline in the appendix for a detailed example of how multiple deltas and feeds interact.
Retention
We store delta batches for 14 days (336 hours) after we receive them. During this time the deltas will be applied, if they are eligible. You can query the status of the batch during those 14 days. After that, the batch expires, and the status endpoint will return 410 or 404 for the batch.
The effect of deltas that have been applied lasts until a more recent feed or delta supersedes it. The effect of a delta lasts even when the batch that contained the delta expires. However, if a delta was not applied within 14 days, then it will be lost after that period. A delta might not be applied because of an error (for example, it tries to update a non-existing item), or because it was already obsolete by the time we tried to apply it (for example, because a more recent feed download existed).
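As a small worked example of the retention window, the following sketch computes when a batch expires from the time it was received. The helper names are hypothetical, and treating the exact 336-hour mark as already expired is an assumption.

```python
from datetime import datetime, timedelta, timezone

# 14 days (336 hours), as stated in the retention rules.
RETENTION = timedelta(hours=336)


def batch_expires_at(received: datetime) -> datetime:
    """Return the instant at which a batch received at `received` expires."""
    return received + RETENTION


def is_expired(received: datetime, now: datetime) -> bool:
    # Assumption: the batch counts as expired at exactly the 336-hour mark.
    return now >= batch_expires_at(received)


received_at = datetime(2020, 9, 23, 15, 22, 24, tzinfo=timezone.utc)
expires_at = batch_expires_at(received_at)
```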
Nested fields
// Good update for nested field.
{
"shipping": [
{"country": "NL", "price": "4.99 EUR"},
{"country": "UK", "price": "4.99 GBP"}
]
}
// Bad update for nested field.
{ "shipping.price": ["4.99 EUR", "4.99 GBP"] }
Channable allows for nested fields. In Channable, these fields are displayed with a dot in their name, such as shipping.price. Keep in mind that when updating nested fields with the API, they must be given as nested objects, not as fields with a dot in their names. Because dots are used to refer to nested fields in Channable, dots are currently not allowed in field names.
Updates overwrite the value of a top-level field. If you want to make a change to a nested object, you must send the entire new nested object, not just the nested field that changed. If you want to make a change to a list, you must send the entire new list.
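Before submitting a delta, you may want to check on your side that no field name contains a dot. The sketch below is a hypothetical client-side check (the name `find_dotted_keys` is not part of the API); it walks nested objects and lists and returns every offending key. Whether the API also checks keys inside nested objects is not stated here, so checking them is an assumption on the cautious side.

```python
from typing import Any, List


def find_dotted_keys(value: Any) -> List[str]:
    """Return all object keys that contain a dot, searching nested values too."""
    found: List[str] = []
    if isinstance(value, dict):
        for key, child in value.items():
            if "." in key:
                found.append(key)
            found.extend(find_dotted_keys(child))
    elif isinstance(value, list):
        for child in value:
            found.extend(find_dotted_keys(child))
    return found


good_fields = {
    "shipping": [
        {"country": "NL", "price": "4.99 EUR"},
        {"country": "UK", "price": "4.99 GBP"},
    ]
}
bad_fields = {"shipping.price": ["4.99 EUR", "4.99 GBP"]}
```

An empty result means the fields are safe to send in this respect; a non-empty result lists the names to restructure as nested objects.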
For details about updating fields in general, see the API reference.
Status and maintenance
The health of the Delta API is reported at www.channablestatus.com. Upcoming maintenance is also announced there. To be notified of updates, subscribe to the atom feed or RSS feed.
Testing
The easiest way to test your API integration is to create a test project in Channable. Deltas are applied on a per-project basis, so you can safely push to a test project without affecting a main project. Pushing deltas to inactive projects is possible, but they will only be applied after you manually click “run now” on any of the project’s imports under “setup > setup import”.
Beware that “run now” will also trigger a feed download, which might supersede the deltas. To prevent the deltas from being obsoleted by a more recent feed, either serve the feed with appropriate ETags to skip the download, or submit the deltas with a timestamp that is later than the time at which you click “run now”.
There is no dedicated server, endpoint, or authentication token for testing, so you can be sure that if your code works in testing, it will behave the same in production.
Connecting
Address
Property | Value |
---|---|
API Version | 1.0 |
Protocol | https |
Hostname | api.channable.com |
Base URL | https://api.channable.com/delta/v1 |
Authentication
Authentication is done by including a bearer token with every request. To authenticate, create a token in Channable under Your name → Channable API. To each API call, add an HTTP header of the following form, substituting your token for $TOKEN:
Authorization: Bearer $TOKEN
Beware that only one token can exist for a company at a time.
Rate limits
HTTP/1.1 201 Created
Server: nginx
Date: Thu, 03 Sep 2020 16:24:02 GMT
Connection: close
Cache-Control: private
RateLimit-Remaining: 95
RateLimit-Restore-Rate-Hz: 2.0
An example response served after submitting a batch of deltas.
HTTP/1.1 429 Too Many Requests
Server: nginx
Date: Thu, 03 Sep 2020 16:24:02 GMT
Connection: keep-alive
Cache-Control: private
Retry-After: 1
X-Retry-After-Seconds: 1
RateLimit-Remaining: 0
RateLimit-Restore-Rate-Hz: 2.0
An example response served when you exceed the rate limit.
The API is subject to leaky bucket rate limiting with the parameters below. This means that you can make one request every 0.5 seconds, bursts of 100 requests every 50 seconds, or anything in between.
Sustained request rate | Bursts |
---|---|
2 requests per second | 100 requests |
The rate limit is enforced per company id, and every endpoint counts towards the limit. You can view your company id in Channable under Your name → Channable API.
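To get a feel for how the sustained rate and burst size interact, here is a small client-side simulation of a leaky bucket with the parameters from the table. It is a model for reasoning about your own request pattern, not the server's implementation; the class name `LeakyBucket` and its fields are assumptions.

```python
from dataclasses import dataclass


@dataclass
class LeakyBucket:
    rate_hz: float = 2.0      # sustained request rate
    burst: float = 100.0      # maximum number of stored-up requests
    remaining: float = 100.0  # requests currently available
    last_second: float = 0.0  # time of the previous attempt, in seconds

    def try_request(self, now_second: float) -> bool:
        """Return True if a request at `now_second` fits within the limit."""
        elapsed = now_second - self.last_second
        self.remaining = min(self.burst, self.remaining + self.rate_hz * elapsed)
        self.last_second = now_second
        if self.remaining < 1.0:
            # Rejected attempts do not consume capacity.
            return False
        self.remaining -= 1.0
        return True


bucket = LeakyBucket()
burst_results = [bucket.try_request(0.0) for _ in range(101)]
```

In this model the first 100 requests at the same instant succeed, the 101st is rejected, and a new request becomes possible 0.5 seconds later.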
When your request rate exceeds the limit, the API will respond with HTTP status 429 Too Many Requests. A 429 response does not itself count towards the rate limit. Every 429 response includes a Retry-After header with the number of seconds to wait before making a new request.
Responses furthermore include a RateLimit-Remaining header that indicates the number of remaining calls. This number replenishes at the sustained request rate of 2 Hz up to the maximum of 100. The request itself consumes one of those, so the maximum observable value is 99. When the number of remaining calls is close to zero, it is a good idea to slow down your request rate.
Finally, responses include a RateLimit-Restore-Rate-Hz header that indicates the sustained request rate in requests per second. Responses with a 5xx status code might not include the RateLimit-Remaining and RateLimit-Restore-Rate-Hz headers.
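One possible way to act on these headers is sketched below: wait as instructed after a 429, and otherwise slow down once the remaining budget gets low. The function name `seconds_to_wait` and the `slow_down_below` threshold of 10 are assumptions chosen for illustration, and header names are looked up with the exact capitalization shown above.

```python
from typing import Mapping


def seconds_to_wait(
    status_code: int,
    headers: Mapping[str, str],
    slow_down_below: int = 10,
) -> float:
    """Suggest how long to pause before the next request."""
    if status_code == 429:
        # Every 429 response carries Retry-After; default to 1 second if absent.
        return float(headers.get("Retry-After", "1"))
    remaining = headers.get("RateLimit-Remaining")
    rate_hz = headers.get("RateLimit-Restore-Rate-Hz")
    if remaining is None or rate_hz is None:
        # 5xx responses might not include the rate limit headers.
        return 0.0
    if int(remaining) < slow_down_below:
        # Wait long enough for one call to be restored.
        return 1.0 / float(rate_hz)
    return 0.0
```

With the example responses above, a 201 with 95 calls remaining needs no pause, while the 429 response asks for a one-second wait.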
Robustly calling the API
When performing API calls, several things may go wrong. For instance, you may encounter a network problem, or the service may be temporarily unavailable. Because of the possibility for error, it is highly recommended that your API calls are subjected to retries and timeouts.
Delta submissions are idempotent: submitting the same batch twice has the same effect as submitting it just once, as long as both submissions include the same batch identifier. Therefore, it is safe to retry delta submissions in case something goes wrong, as long as the requests, including the request body, are identical.
For retries, we recommend using exponential backoff with jitter. If you use Python, Opnieuw provides a convenient way to do so. Transient errors, such as a 503 response, should resolve within a minute or so. Make sure that generating the batch identifier and obtaining the current time (if your deltas do not have an inherent timestamp) happen outside of the scope of the retry.
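If you prefer not to depend on a library, exponential backoff with jitter can be sketched with the standard library alone. The variant below uses "full jitter" (a uniformly random delay between zero and an exponentially growing cap). The function names, the base of 0.5 seconds, the cap of 30 seconds, and the choice of exceptions to retry on are all assumptions for this example.

```python
import random
import time
from typing import Callable, List, Optional, Sequence, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delays(
    max_retries: int,
    base_seconds: float = 0.5,
    cap_seconds: float = 30.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return one random delay per retry, with an exponentially growing cap."""
    rng = rng or random.Random()
    return [
        rng.uniform(0.0, min(cap_seconds, base_seconds * 2 ** attempt))
        for attempt in range(max_retries)
    ]


def call_with_retries(
    call: Callable[[], T],
    delays: Sequence[float],
    sleep: Callable[[float], object] = time.sleep,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Try `call`, sleeping between failed attempts; the last failure propagates."""
    for delay in delays:
        try:
            return call()
        except retry_on:
            sleep(delay)
    return call()
```

Generate the batch identifier and any timestamps before calling `call_with_retries`, and capture them in `call`, so that every attempt sends an identical request.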
API reference
Documentation for the API is also available in OpenAPI format. A machine-readable JSON Schema for the request and response bodies is available to the right. Use the tabs to switch between examples and schema.
Submit a batch of deltas
Create one or more new deltas, supplied in the request body.
A delta consists of an operation, a timestamp, and a (possibly partial) item, given as fields.
Each delta identifies items by their unique item identifier. The unique item identifier must be given as a field with each delta, like any other field. It must be a string value and cannot be nested. To configure which of the fields is the unique item identifier, go to Setup → Setup import in Channable.
There are three delta operations:
An insert delta creates a new item. If an item with the same unique item identifier already exists, an error is reported for this delta instead.
An update delta changes fields of an existing item. If no item with the same unique item identifier exists, an error is reported for this delta instead.
A delete delta deletes an item. If an item with the given unique item identifier does not exist, this delta has no effect. Fields other than the unique item identifier field are ignored.
Aforementioned “error reporting” happens through the batch status endpoint. An error being reported for one delta does not inhibit the success of other deltas in the same batch. The response status is independent of whether the deltas in the batch can be applied successfully. A 201 response means that we accepted the batch, but individual deltas may contain errors.
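The semantics of the three operations can be modelled locally, which can be handy for unit-testing the code that generates your deltas. The sketch below is a simplified model and not Channable's implementation: the function name `apply_deltas` is hypothetical, the deltas are assumed to be sorted by timestamp already, and only the two error codes shown are modelled.

```python
from typing import Any, Dict, List, Optional


def apply_deltas(
    items: Dict[str, Dict[str, Any]],
    deltas: List[Dict[str, Any]],
    id_field: str,
) -> List[Optional[str]]:
    """Apply deltas to `items` in place; return an error code or None per delta."""
    results: List[Optional[str]] = []
    for delta in deltas:
        fields = delta["fields"]
        item_id = fields[id_field]
        operation = delta["operation"]
        error: Optional[str] = None
        if operation == "insert":
            if item_id in items:
                error = "item_exists"
            else:
                items[item_id] = dict(fields)
        elif operation == "update":
            if item_id not in items:
                error = "item_does_not_exist"
            else:
                # Overwrites top-level fields only; nested values are replaced whole.
                items[item_id].update(fields)
        elif operation == "delete":
            # Deleting a missing item has no effect and is not an error.
            items.pop(item_id, None)
        else:
            raise ValueError(f"Unknown operation: {operation!r}")
        results.append(error)
    return results
```

Note that an error for one delta does not stop the remaining deltas in the list from being applied, mirroring the behavior described above.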
If something went wrong processing the request (such as a syntax error), the response body contains a human-readable description of what went wrong. Because deltas include a timestamp, submitting a batch is an idempotent operation, so it is safe to retry the request.
Request
// Example request body:
// Set the "stock" field for item with EAN 9780321751041 to 20.
[
{
"operation": "update",
"timestamp": "2020-09-15T13:37:53.678607+00:00",
"fields": {
"ean": "9780321751041",
"stock": "20"
}
}
]
// Example request body:
// Delete the item with EAN 9780321751041.
[
{
"operation": "delete",
"timestamp": "2020-09-15T13:37:53.678609Z",
"fields": {
"ean": "9780321751041"
}
}
]
// Example request body:
// Create the item with ASIN B002HJ377A.
[
{
"operation": "insert",
"timestamp": "2020-09-15T13:37:53.678609+00:00",
"fields": {
"asin": "B002HJ377A",
"model_number": "56158",
"title": "Three Wolf Moon Shirt",
"stock": "122",
"material": "cotton",
"available_sizes": [
"S",
"M",
"L",
"XL",
"XXL"
],
"price": "18.95 USD"
}
}
]
// Example request body:
// Update various fields of various items.
[
{
"operation": "update",
"timestamp": "2020-09-15T13:37:53+00:00",
"fields": {
"asin": "B002HJ377A",
"title": "Three Wolf Moon T-Shirt",
"price": "20.95 USD"
}
},
{
"operation": "update",
"timestamp": "2020-09-15T13:37:54+00:00",
"fields": {
"asin": "B07NFTJ8WF",
"stock": "0"
}
}
]
// JSON Schema for a request body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"items": {
"properties": {
"fields": {
"type": "object"
},
"operation": {
"enum": [
"insert",
"update",
"delete"
]
},
"timestamp": {
"format": "date-time",
"type": "string"
}
},
"required": [
"operation",
"timestamp",
"fields"
],
"type": "object"
},
"type": "array"
}
Method | PUT |
Endpoint | /projects/{projectId}/batches/{batchId} |
Full URL | https://api.channable.com/delta/v1/projects/{projectId}/batches/{batchId} |
projectId parameter
Identifier of the Channable project to push to.
To find the id of a project in Channable, visit its dashboard page.
The URL of the dashboard page is of the form https://app.channable.com/companies/{companyId}/projects/{projectId}/dashboard.
The project id is an unsigned 32-bit integer.
batchId parameter
You must include a unique batch identifier in the URL, which you can use to query the status of the batch later. The batch identifier must be a UUID. A UUID is a sequence of 32 hexadecimal digits. The format is not case sensitive, and hyphens are optional. Hyphens should not be percent-encoded. Examples of valid UUIDs:
39c2941a-8124-4cae-9033-f5968e8bf405
39C2941A-8124-4CAE-9033-F5968E8BF405
39c2941a81244cae9033f5968e8bf405
When you submit a batch with a batch id that already exists, we ignore the submission and return 201 Created. This ensures that you can safely retry a request, as long as the batch id and request body remain unchanged.
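In Python, the standard `uuid` module can both generate batch identifiers and parse the accepted spellings shown above. The helper names below are hypothetical. Note that `uuid.UUID` also accepts a few spellings (such as braces or a `urn:uuid:` prefix) that this documentation does not list, so it is a convenience for normalizing your own identifiers rather than a strict validator.

```python
import uuid


def new_batch_id() -> str:
    """Generate a random batch identifier (UUID version 4)."""
    return str(uuid.uuid4())


def normalize_batch_id(text: str) -> str:
    """Return the lowercase, hyphenated form; raises ValueError if not a UUID."""
    return str(uuid.UUID(text))


canonical = normalize_batch_id("39C2941A-8124-4CAE-9033-F5968E8BF405")
```

Generate the identifier once per batch and reuse it for every retry of that batch.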
Request body
The request body contains a single batch of deltas encoded as a json array. The json should be encoded in UTF-8. The maximum size of the request body is 30 MB (30×10⁶ bytes).
A single delta is a json object with the following keys:
- operation: One of insert, update, delete.
- timestamp: An ISO-8601-formatted timestamp that indicates when the change occurred. The timestamp must include a UTC offset, so it unambiguously refers to a single instant. The offset can be given in +HH:MM format, or as a Z suffix as a shorthand for +00:00. Timestamps determine the order in which deltas get applied. See also the ordering section.
- fields: A json object that represents the item. Must include at least the field configured as unique item identifier.
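A request body that satisfies these rules can be built with the standard library, as in the sketch below. The helper names `make_delta` and `encode_batch` are hypothetical, and the checks are client-side conveniences: they reject timestamps without a UTC offset and bodies above the documented size limit before anything is sent.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, List

MAX_BODY_BYTES = 30 * 10**6  # documented maximum request body size


def make_delta(operation: str, timestamp: datetime, fields: Dict[str, Any]) -> Dict[str, Any]:
    """Build one delta object, refusing ambiguous (offset-less) timestamps."""
    if operation not in ("insert", "update", "delete"):
        raise ValueError(f"Unknown operation: {operation!r}")
    if timestamp.utcoffset() is None:
        raise ValueError("Timestamp must include a UTC offset.")
    return {
        "operation": operation,
        "timestamp": timestamp.isoformat(),
        "fields": fields,
    }


def encode_batch(deltas: List[Dict[str, Any]]) -> bytes:
    """Serialize a batch as a UTF-8 encoded json array and check its size."""
    body = json.dumps(deltas).encode("utf-8")
    if len(body) > MAX_BODY_BYTES:
        raise ValueError("Batch exceeds the maximum request body size.")
    return body


delta = make_delta(
    "update",
    datetime(2020, 9, 15, 13, 37, 53, 678607, tzinfo=timezone.utc),
    {"ean": "9780321751041", "stock": "20"},
)
body = encode_batch([delta])
```

If a batch turns out to be too large, splitting it into several batches with distinct batch identifiers is one way to stay under the limit.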
Response
// JSON Schema for a 201 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"type": "object"
}
// JSON Schema for a 400 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"type": "object"
}
// JSON Schema for a 401 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"type": "object"
}
// JSON Schema for a 404 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"type": "object"
}
// JSON Schema for a 429 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"type": "object"
}
201 Created: The batch has been created in response to this request, or in response to a previous request with the same batch identifier and deltas. Note that this does not mean that all deltas can be applied successfully. The status endpoint reports the status per delta.
400 Bad Request: There was a syntax error in the request body. None of the deltas have been stored. The response body includes a human-readable description of the error.
401 Unauthorized: Your request does not contain a valid Authorization header, or the included token cannot be used to access this endpoint. The body contains a human-readable error message. See also the authentication section.
404 Not Found: You may not access this project, or there is no project with this project identifier.
409 Conflict: This code is returned in two cases:
- A batch with the same project id and batch id was uploaded before, but with different deltas. Please make sure to generate unique batch ids for each set of deltas sent to this endpoint.
- A concurrent request for this project id and batch id is already in progress, so this request is rejected. To ensure that the batch was received correctly, retry the request at a later time, and do not make concurrent requests for the same batch identifier. (Concurrent requests for different batches are fine.) See also the Robustly calling the API section.
The response body of the request can be used to differentiate between the two cases.
410 Gone: A batch for this project id and batch id was created before but expired, so this request is rejected. Please do not reuse batch identifiers.
413 Request Entity Too Large: The request body exceeds the maximum size of 30×10⁶ bytes.
429 Too Many Requests: Your request rate is too high. See also the rate limits section.
500 Internal Server Error: We encountered a problem on our side. Retry your request with exponential backoff. If the problem persists, please contact us.
503 Service Unavailable: The service is unavailable at this time. This can happen occasionally and should not last longer than a minute or so. Retry your request with exponential backoff. If the situation persists, check www.channablestatus.com.
List previously submitted batches
This endpoint lists all stored batches. The response includes the time the batch was received and the id of the batch. The batch identifiers are formatted with all lowercase characters and including dashes, regardless of the format in which they were originally submitted. Times are formatted as ISO-8601 timestamps including time zone offset.
Request
Method | GET |
Endpoint | /projects/{projectId}/batches |
Full URL | https://api.channable.com/delta/v1/projects/{projectId}/batches |
projectId parameter
Identifier of the Channable project.
See also projectId in the submission endpoint.
Response
// Example 200 response body:
// A list with no batches
[]
// Example 200 response body:
// A list with one batch
[
{
"received": "2020-09-23T15:22:24.288513Z",
"batch_id": "5c7d422f-3fec-498d-8cf0-c6906adbe1d9"
}
]
// JSON Schema for a 200 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"items": {
"properties": {
"batch_id": {
"format": "uuid",
"type": "string"
},
"received": {
"format": "date-time",
"type": "string"
}
},
"required": [
"received",
"batch_id"
],
"type": "object"
},
"type": "array"
}
// JSON Schema for a 401 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"type": "object"
}
// JSON Schema for a 404 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"type": "object"
}
// JSON Schema for a 429 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"type": "object"
}
200 OK: The list of batches has been generated.
401 Unauthorized: Your request does not contain a valid Authorization header, or the included token cannot be used to access this endpoint. The body contains a human-readable error message. See also the authentication section.
404 Not Found: You may not access this project, or there is no project with this project identifier.
429 Too Many Requests: Your request rate is too high. See also the rate limits section.
500 Internal Server Error: We encountered a problem on our side. Retry your request with exponential backoff. If the problem persists, please contact us.
503 Service Unavailable: The service is unavailable at this time. This can happen occasionally and should not last longer than a minute or so. Retry your request with exponential backoff. If the situation persists, check www.channablestatus.com.
Get the status of deltas in a batch
This endpoint returns a status report for a previously created batch of deltas. It returns a list with one element for each delta in the batch, in the order that they were submitted. For example, if you submitted a batch with 100 deltas, then the response will be a list of 100 statuses. The status at index 0 is the status for the first delta, etc.
The status of a batch remains available for 14 days (336 hours) after we received the batch. For older batches, this endpoint returns 404. See also Retention.
The status of a delta is pending until the first time that the import phase runs after receiving the delta. At that point, the status changes to either applied, obsolete or error. Deltas that are older than the latest base feed become obsolete, as they would never be applied. A delta that was previously applied successfully can still fail to apply in a later import run, and vice versa. For example, an insert delta may apply successfully at first, and fail to apply on top of a newer feed later, because the feed already contains an item with the same id.
A delta status is a json object with the following keys:
- status: One of pending, applied, obsolete, error.
- processed: The time at which the delta was last processed. Only included for applied, obsolete and error statuses.
- error: An object that contains the keys code, with a stable machine-readable error code, message, that holds a human-readable error message, and in some cases, item_id, which indicates the affected item. Only included for error statuses.
Please note that fields might be added to the status in the future. No fields will be removed without prior communication.
The following error codes exist:
Code | Message |
---|---|
internal_error | Something went wrong. |
conflicting_types_in_array_field | One of the fields in this delta is an array, and not all elements of the array have the same type. |
delta_payload_type_mismatch | The field … was expected to have a value of the type as configured in the import settings, while a value of different type was encountered. |
dot_in_field_name | One or more fields in this delta have a dot (.) in their names. |
empty_unique_item_id | The unique item identifier (field ‘…’, as configured in the project settings) of this delta is empty. |
project_has_no_unique_id | The project has no unique ID per item configured. |
fields_are_not_json_object | The fields for this delta are not given as a JSON object. |
item_exists | This delta would create an item, but this item already exists. |
item_does_not_exist | This delta would update an item, but this item does not exist. |
missing_unique_item_id | The unique item identifier (field ‘…’, as configured in the project settings) of this delta is missing. |
non_string_unique_item_id | The unique item identifier (field ‘…’, as configured in the project settings) of this delta is not a string. |
null_value_for_field | The values for one or more of the fields of this delta are given as null, instead of a non-null value. |
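Once you have fetched a status report, a short helper can condense it into counts per status plus the positions of failed deltas. The sketch below works on the parsed json array; the function name `summarize_statuses` is hypothetical, and it relies only on the status and error.code keys described above.

```python
from collections import Counter
from typing import Any, Dict, List, Tuple


def summarize_statuses(
    statuses: List[Dict[str, Any]],
) -> Tuple[Dict[str, int], List[Tuple[int, str]]]:
    """Return counts per status and (index, error code) pairs for failed deltas."""
    counts = dict(Counter(status["status"] for status in statuses))
    errors = [
        (index, status["error"]["code"])
        for index, status in enumerate(statuses)
        if status["status"] == "error"
    ]
    return counts, errors


report = [
    {"status": "applied", "processed": "2020-10-01T13:31:29.944515Z"},
    {"status": "pending"},
    {
        "status": "error",
        "processed": "2020-10-01T13:31:29.944515Z",
        "error": {
            "code": "item_does_not_exist",
            "item_id": "1",
            "message": "This delta would update an item, but this item does not exist.",
        },
    },
]
counts, errors = summarize_statuses(report)
```

Because statuses are returned in submission order, each index in the error list points back to the delta at the same position in the batch you submitted.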
Request
Method | GET |
Endpoint | /projects/{projectId}/batches/{batchId}/status |
Full URL | https://api.channable.com/delta/v1/projects/{projectId}/batches/{batchId}/status |
projectId parameter
Identifier of the Channable project.
See also projectId in the submission endpoint.
batchId parameter
The batch identifier.
This must match the id that you included in the request that created the batch. See also batchId in the submission endpoint.
Response
// Example 200 response body:
// A batch with two deltas that are pending application
[
{
"status": "pending"
},
{
"status": "pending"
}
]
// Example 200 response body:
// A batch with a single delta that has been applied
[
{
"status": "applied",
"processed": "2020-10-01T13:31:29.944515Z"
}
]
// Example 200 response body:
// A batch with a single delta that could not be applied
[
{
"status": "error",
"processed": "2020-10-01T13:31:29.944515Z",
"error": {
"code": "item_does_not_exist",
"item_id": "1",
"message": "This delta would update an item, but this item does not exist."
}
}
]
// JSON Schema for a 200 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"items": {
"properties": {
"error": {
"properties": {
"code": {
"type": "string"
},
"message": {
"type": "string"
}
},
"required": [
"code",
"message"
],
"type": "object"
},
"processed": {
"format": "date-time",
"type": "string"
},
"status": {
"enum": [
"pending",
"applied",
"obsolete",
"error"
]
}
},
"required": [
"status"
],
"type": "object"
},
"type": "array"
}
// JSON Schema for a 401 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"type": "object"
}
// JSON Schema for a 404 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"type": "object"
}
// JSON Schema for a 409 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"type": "object"
}
// JSON Schema for a 410 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"type": "object"
}
// JSON Schema for a 429 response body.
// See also https://json-schema.org/draft/2019-09/json-schema-validation.html
{
"type": "object"
}
200 OK: The status report has been generated.
401 Unauthorized: Your request does not contain a valid Authorization header, or the included token cannot be used to access this endpoint. The body contains a human-readable error message. See also the authentication section.
404 Not Found: You may not access this project, or there is no project with this project identifier, or there is no batch with this batch identifier, or the batch with this batch identifier has expired.
409 Conflict: This batch is still being uploaded, so the status is not known yet. Please check back after the PUT request completes. See also the Robustly calling the API section.
410 Gone: A batch for this project id and batch id was created before but expired, so this request is rejected. When a batch expires we no longer track the individual deltas from the batch.
429 Too Many Requests: Your request rate is too high. See also the rate limits section.
500 Internal Server Error: We encountered a problem on our side. Retry your request with exponential backoff. If the problem persists, please contact us.
503 Service Unavailable: The service is unavailable at this time. This can happen occasionally and should not last longer than a minute or so. Retry your request with exponential backoff. If the situation persists, check www.channablestatus.com.
Appendix
Example timeline
// Feed retrieved at D1 (a tsv file):
"item_id" "stock" "price"
"1" "12" "5.99 EUR"
// Feed retrieved at D2 (a tsv file):
"item_id" "stock" "price"
"1" "5" "5.99 EUR"
// Delta fields received at B1:
{"item_id": "1", "stock": "11"}
// Delta fields received at B2:
{"item_id": "1", "stock": "10"}
// Delta fields received at B3:
{"item_id": "1", "stock": "9"}
Below is an example timeline that demonstrates how feed downloads and multiple delta batches interact. In this example we focus on the stock field of a single item. We download a feed from the same url at two different times, labeled D1 and D2. Three times we receive a batch of deltas, labeled B1, B2, and B3. The labels are there to clarify the graph; they do not correspond to identifiers used in the API. Let’s say each batch in this example only contains a single delta: an update for the stock field of the single item we focus on.
Let’s go over this example, one step at a time, starting with download D1 and batch B1.
- At t0, we start downloading a feed (D1). The stock field of the item is set to 12 in this feed. All items encountered in the feed will be dated to t0, hence the stock: 12 value at that time.
- At t1, we finish receiving delta batch B1, which contains the update stock: 11, timestamped before t1, but after t0. The status of deltas in B1 is now pending: we received the batch, but none of the deltas have been applied yet.
- At t2, we finish the download, and start the import phase (P1). The import sees both the stock: 12 from D1, and the stock: 11 from B1, and applies the stock: 11 delta on top of the stock: 12 from the feed.
- At t3, the import finishes. The result stock: 11 is now visible in the item overview in Channable, and will be used in subsequent export runs. The status of the deltas in B1 changes from pending to applied.
After this, your stock decrements from 11 to 10 to 9, so you send updates stock: 10 in batch B2, and stock: 9 in batch B3. However, pushing B2 fails at first, perhaps due to a network problem, so you retry the request, and it comes in after B3.
- At t4, we finish receiving B3, and the stock: 9 update in it. The status of deltas in B3 is now pending.
- At t5, we finish receiving B2. The stock: 10 update in there was timestamped with an earlier time than the stock: 9 update from B3. The status of deltas in B2 is now pending.
- At t5 we start a new import run (P2), which applies the deltas from B1, B2, and B3 to the feed downloaded at D1. The stock: 9 delta has the most recent timestamp, so this value is used.
- At t6 the import finishes. The status of deltas in batches B2 and B3 changes from pending to applied.
Finally, we download and import the feed again:
- At t7 we initiate download D2, so all items in that feed will be dated to t7. This feed contains stock: 5.
- At t8 we finish download D2 and start import P3. None of the deltas in B1, B2, and B3, are eligible to be applied, because their timestamp is less than t7. Therefore, when the import finishes at t9, stock: 5 is the result.
Python example
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional
from uuid import UUID
import time

import requests
import opnieuw


class Operation(Enum):
    insert = 'insert'
    update = 'update'
    delete = 'delete'


@dataclass(frozen=True)
class Delta:
    operation: Operation
    timestamp: datetime
    fields: Dict[str, Any]

    def __post_init__(self) -> None:
        # A Python peculiarity is that 'datetime' can represent both an instant,
        # or a local datetime. We only want instants, not ambiguous local times.
        assert self.timestamp.tzinfo is not None

    def as_json(self) -> Dict[str, Any]:
        return {
            'operation': self.operation.value,
            'timestamp': self.timestamp.isoformat(),
            'fields': self.fields,
        }


@dataclass
class Limiter:
    """
    A limiter for use with leaky bucket rate limiting.

    The limiter ensures that we never exceed the rate limit when all calls go
    through this limiter. When there are other external systems making calls,
    the limiter is still effective, but it cannot guarantee that it will not
    hit a rate limit any more.
    """
    # The reset rate (equal to the sustained request rate), in Hertz.
    leak_rate_hz: float

    # Time at which we made the previous call, as reported by a monotonic clock,
    # in seconds since some arbitrary epoch.
    previous_call_second: Optional[float] = None

    # The number of remaining calls after making the previous call.
    previous_call_calls_left: Optional[int] = None

    def get_wait_time_at(self, now_second: float) -> timedelta:
        """
        Return how long to wait before making the next call.
        """
        assert self.leak_rate_hz > 0.0, 'Leak rate must be positive.'

        previous_call_second = self.previous_call_second
        if previous_call_second is None:
            previous_call_second = now_second

        # Treat an unknown number of remaining calls as zero.
        calls_left: float = self.previous_call_calls_left or 0

        seconds_elapsed = now_second - previous_call_second
        calls_left += self.leak_rate_hz * seconds_elapsed

        # When we have 0 calls left, we would need to wait 1/leak_rate to have
        # gained one call. When we have more calls left, we also wait, but a bit
        # less. When we have 1 call left, wait for half the time, when we have 9
        # calls left, wait for 10%, etc. The closer we get to zero, the harder
        # we slam on the brakes.
        wait_seconds = (1.0 / self.leak_rate_hz) / (1.0 + calls_left)
        return timedelta(seconds=wait_seconds)

    def sleep_before_call(self) -> None:
        """
        Sleep before making the next call to ensure we don't hit the rate limit.
        """
        now_second = time.monotonic()
        delay = self.get_wait_time_at(now_second)
        self.previous_call_second = now_second
        time.sleep(delay.total_seconds())


@dataclass
class DeltaApi:
    auth_token: str
    project_id: int

    # This is the leaky bucket sustained request rate parameter, in Hertz.
    rate_limit_rate_hz: float = 2.0

    # A bit of state required to compute the wait time.
    _last_call_second: float = field(default_factory=time.monotonic)
    _rate_limit_calls_remaining: int = 100

    def wait_before_call(self) -> None:
        """
        Sleep to avoid exceeding the rate limit. Sleep a very short time when we
        are far away from the limit (about 5 ms, with a burst of 100 and rate of
        2 Hz), and sleep enough to avoid a 429 when we reached the limit.
        """
        now_second = time.monotonic()
        seconds_since_last_call = now_second - self._last_call_second
        calls_remaining = (
            self._rate_limit_calls_remaining +
            self.rate_limit_rate_hz * seconds_since_last_call
        )
        sleep_seconds = (1.0 / self.rate_limit_rate_hz) / max(1.0, calls_remaining)
        time.sleep(sleep_seconds)
        self._last_call_second = now_second

    # Retry on network errors and on the HTTP errors raised below. Which
    # exceptions to retry on is a choice made for this example.
    @opnieuw.retry(
        retry_on_exceptions=(requests.exceptions.RequestException,),
        max_calls_total=6,
        retry_window_after_first_call_in_seconds=120,
    )
    def submit_batch(
        self,
        batch_id: UUID,
        deltas: List[Delta],
    ) -> requests.Response:
        self.wait_before_call()

        base_url = 'https://api.channable.com/delta/v1'
        connect_timeout_seconds = 5.0
        read_timeout_seconds = 30.0
        headers = {
            'Authorization': f'Bearer {self.auth_token}',
        }
        response = requests.put(
            f'{base_url}/projects/{self.project_id}/batches/{batch_id}',
            timeout=(connect_timeout_seconds, read_timeout_seconds),
            headers=headers,
            json=[delta.as_json() for delta in deltas],
        )

        # Store the calls remaining, if the response reports them.
        remaining = response.headers.get('RateLimit-Remaining')
        if remaining is not None:
            self._rate_limit_calls_remaining = int(remaining)

        # Raise on 429 and 5xx so the retry decorator tries again. Other
        # responses are returned for the caller to inspect.
        if response.status_code == 429 or response.status_code >= 500:
            response.raise_for_status()
        return response
To the right is a Python example that calls the Delta API, taking into account rate limiting and error handling.
Changelog
- 2022-01-30: The batches/{batchId}/status endpoint now returns an item_id for all errors of type item_exists and item_does_not_exist. This field holds the affected item id. This should help with debugging.
- 2020-12-03: The batches/{batchId}/status endpoint now returns a json array, no longer wrapped in an object. This brings the behavior in line with the documentation.