feat: Re-wrote the worker in Rust
Build Docker images / worker-amd64 (push) Successful in 3m39s
Build Docker images / static-amd64 (push) Successful in 1m10s
Build Docker images / django-arm64 (push) Successful in 2m59s
Build Docker images / worker-arm64 (push) Successful in 2m26s
Build Docker images / static-arm64 (push) Successful in 32s
Build Docker images / django-amd64 (push) Successful in 13m12s
pytest / acls (push) Successful in 3m30s

... this was mainly for fun, buuuut, that will stay like that~

Signed-off-by: prettysunflower <me@prettysunflower.moe>
This commit is contained in:
2025-10-11 17:59:06 -04:00
parent 2bac9a2533
commit 9ac391cd53
15 changed files with 2909 additions and 190 deletions
+1 -5
View File
@@ -38,11 +38,7 @@ def setup(request):
kakigoori_worker = DockerContainer(
str(docker_image),
env={
"RABBITMQ_HOST": rabbitmq_host,
"RABBITMQ_PORT": rabbitmq.port,
"RABBITMQ_USER": rabbitmq.username,
"RABBITMQ_PASSWORD": rabbitmq.password,
"RABBITMQ_VHOST": rabbitmq.vhost,
"RABBITMQ_ADDRESS": f"amqp://{rabbitmq_host}:{rabbitmq.port}/{rabbitmq.vhost}",
},
network=network,
)
+1
View File
@@ -0,0 +1 @@
target/
+8
View File
@@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
+8
View File
@@ -0,0 +1,8 @@
<component name="ProjectDictionaryState">
<dictionary name="project">
<words>
<w>avif</w>
<w>kakigoori</w>
</words>
</dictionary>
</component>
+8
View File
@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/worker.iml" filepath="$PROJECT_DIR$/.idea/worker.iml" />
</modules>
</component>
</project>
+6
View File
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>
+11
View File
@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="EMPTY_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
+2584
View File
File diff suppressed because it is too large Load Diff
+13
View File
@@ -0,0 +1,13 @@
[package]
name = "kakigoori-worker"
version = "0.1.0"
edition = "2024"
[dependencies]
lapin = "4.0.0-rc.1"
futures-lite = "2.6.1"
tokio = { version = "1.47.1", features = ["rt", "macros"] }
uuid = {version = "1.18.1", features = ["v4"]}
serde_json = "1.0.145"
serde = { version = "1.0.228", features = ["derive"] }
base64 = "0.22.1"
+14 -18
View File
@@ -1,24 +1,20 @@
FROM alpine:3.22
FROM rust:1.90.0-alpine3.22 AS builder
RUN apk add musl-dev
WORKDIR /kakigooriworker
COPY . .
RUN cargo build --release
FROM cgr.dev/chainguard/static:latest
COPY --from=git.prettysunflower.moe/prettysunflower/avif:libavif-1.3.0-libaom-3.13.1-svtav1psyex-3.0.2B /usr/bin/avifenc /usr/bin/avifenc
RUN apk add curl libwebp libwebp-tools
RUN adduser -D kakigoori
RUN mkdir /kakigoori
RUN chown kakigoori:kakigoori /kakigoori
USER kakigoori
RUN curl -LsSf https://astral.sh/uv/install.sh | sh
ENV PATH="/home/kakigoori/.local/bin/:$PATH"
WORKDIR /kakigoori
COPY main.py .
COPY pyproject.toml .
COPY --from=builder /kakigooriworker/target/release/kakigoori-worker .
COPY --from=git.prettysunflower.moe/prettysunflower/webp:libwebp-1.6.0-2 /usr/bin/cwebp /usr/bin/
COPY --from=git.prettysunflower.moe/prettysunflower/avif:libavif-1.3.0-libaom-3.13.1-svtav1psyex-3.0.2b-2 /usr/bin/avifenc /usr/bin/
RUN uv sync
ENV PYTHONUNBUFFERED=1
CMD ["uv", "run", "main.py"]
CMD ["/kakigoori/kakigoori-worker"]
-158
View File
@@ -1,158 +0,0 @@
import base64
import functools
import json
import subprocess
import os
import pika
import re
def processing_function(func):
@functools.wraps(func)
def callback(self, ch, method, properties, body):
args = json.loads(body.decode("utf-8"))
original_file = base64.b64decode(args["original_file"])
variant_id = args["variant_id"]
print(f"Processing variant {variant_id}")
input_file = f"{variant_id}_original"
output_file = f"{variant_id}_processed"
with open(input_file, "wb") as original_image:
original_image.write(original_file)
print("Converting image...")
try:
func(self, input_file, output_file)
except Exception as e:
print(e)
os.remove(input_file)
try:
os.remove(output_file)
except:
pass
ch.basic_ack(delivery_tag=method.delivery_tag)
return
print("Image converted!")
with open(output_file, "rb") as f:
image = f.read()
message = {
"variant_id": variant_id,
"variant_file": base64.b64encode(image).decode("utf-8"),
}
print("Sending converted file...")
self.channel.basic_publish(
exchange="",
routing_key="process_variant",
body=json.dumps(message).encode("utf-8"),
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
)
print("File sent!")
os.remove(input_file)
os.remove(output_file)
ch.basic_ack(delivery_tag=method.delivery_tag)
return callback
class Worker:
def __init__(self):
connection_parameters = self.make_rabbitmq_parameters()
self.connection = pika.BlockingConnection(connection_parameters)
self.channel = self.connection.channel()
self.channel.queue_declare(queue="kakigoori_avif", durable=True)
self.channel.queue_declare(queue="kakigoori_webp", durable=True)
self.channel.queue_declare(queue="process_variant", durable=True)
self.channel.basic_qos(prefetch_count=1)
worker_file_types = re.split(
r"[,;-_ /]", os.environ.get("WORKER_FILE_TYPES", "").lower()
)
if not any(worker_file_types):
worker_file_types = ["avif", "webp"]
print(worker_file_types)
if "avif" in worker_file_types:
self.channel.basic_consume(
queue="kakigoori_avif", on_message_callback=self.avif_processing
)
if "webp" in worker_file_types:
self.channel.basic_consume(
queue="kakigoori_webp", on_message_callback=self.webp_processing
)
print("Ready to accept requests")
def make_rabbitmq_parameters(self):
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST")
if not RABBITMQ_HOST:
raise ValueError("RABBITMQ_HOST environment variable is not set")
RABBITMQ_PORT = os.environ.get("RABBITMQ_PORT")
if not RABBITMQ_PORT:
RABBITMQ_PORT = 5672
connection_parameters = pika.ConnectionParameters(
host=RABBITMQ_HOST, port=RABBITMQ_PORT
)
if os.environ.get("RABBITMQ_USER") and os.environ.get("RABBITMQ_PASSWORD"):
RABBITMQ_USER = os.environ.get("RABBITMQ_USER")
RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD")
connection_parameters.credentials = pika.PlainCredentials(
username=RABBITMQ_USER, password=RABBITMQ_PASSWORD
)
if os.environ.get("RABBITMQ_VHOST"):
connection_parameters.virtual_host = os.environ.get("RABBITMQ_VHOST")
return connection_parameters
@processing_function
def avif_processing(self, input_file, output_file):
subprocess.run(
[
*"/usr/bin/avifenc -c aom -s 4 -j 8 -d 10 -y 444 -q 50 -a end-usage=q -a cq-level=35 -a tune=iq".split(
" "
),
input_file,
output_file,
],
check=True,
)
@processing_function
def webp_processing(self, input_file, output_file):
subprocess.run(
[
"cwebp",
"-metadata",
"icc",
input_file,
"-o",
output_file,
]
)
worker = Worker()
try:
worker.channel.start_consuming()
except Exception as e:
worker.channel.stop_consuming()
print("Exiting...")
worker.connection.close()
-9
View File
@@ -1,9 +0,0 @@
[project]
name = "kakigoori-worker"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"pika>=1.3.2",
]
+54
View File
@@ -0,0 +1,54 @@
use std::io;
use std::process::{Command, Output};
pub trait FileProcessor {
fn process(&self, input_file: &String, output_file: &String) -> io::Result<Output>;
}
pub struct AVIF {}
impl FileProcessor for AVIF {
fn process(&self, input_file: &String, output_file: &String) -> io::Result<Output> {
Command::new("/usr/bin/avifenc")
.args([
"-c",
"aom",
"-s",
"4",
"-j",
"8",
"-d",
"10",
"-y",
"444",
"-q",
"50",
"-a",
"end-usage=q",
"-a",
"cq-level=35",
"-a",
"tune=iq",
input_file.as_str(),
output_file.as_str(),
])
.output()
}
}
pub struct WebP {}
impl FileProcessor for WebP {
fn process(&self, input_file: &String, output_file: &String) -> io::Result<Output> {
Command::new("/usr/bin/cwebp")
.args([
"/usr/bin/webp",
"-q",
"75",
input_file.as_str(),
"-metadata",
"icc",
"-o",
output_file.as_str(),
])
.output()
}
}
+35
View File
@@ -0,0 +1,35 @@
pub mod json_messages {
use serde::{Deserialize, Serialize};
mod base64 {
use base64::{Engine as _, engine::general_purpose::STANDARD};
use serde::{Deserialize, Serialize};
use serde::{Deserializer, Serializer};
pub fn serialize<S: Serializer>(v: &Vec<u8>, s: S) -> Result<S::Ok, S::Error> {
let base64 = STANDARD.encode(v);
String::serialize(&base64, s)
}
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
let base64 = String::deserialize(d)?;
STANDARD
.decode(base64.as_bytes())
.map_err(|e| serde::de::Error::custom(e))
}
}
#[derive(Deserialize)]
pub struct TaskRequest {
#[serde(with = "base64")]
pub original_file: Vec<u8>,
pub variant_id: String,
}
#[derive(Serialize)]
pub struct TaskResponse {
#[serde(with = "base64")]
pub variant_file: Vec<u8>,
pub variant_id: String,
}
}
+166
View File
@@ -0,0 +1,166 @@
mod file_processors;
mod json_messages;
use crate::json_messages::json_messages::{TaskRequest, TaskResponse};
use futures_lite::stream::StreamExt;
use lapin::message::Delivery;
use lapin::options::{
BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions,
};
use lapin::{
BasicProperties, Channel, Connection, ConnectionProperties, Consumer, types::FieldTable,
};
use std::fs::File;
use std::io::{Read, Write};
use std::{fs, io};
fn generate_consumer_tag(channel: &Channel) -> String {
format!("ctag{}.{}", channel.id(), uuid::Uuid::new_v4().to_string())
}
async fn handle_task(
delivery: &Delivery,
process_variant_channel: &Channel,
task_function: &impl file_processors::FileProcessor,
) -> Result<(), io::Error> {
let task_request: TaskRequest = serde_json::from_slice(delivery.data.as_slice())?;
println!("New task! {}", &task_request.variant_id);
let input_file_path = format!("/tmp/{}_input", &task_request.variant_id);
let output_file_path = format!("/tmp/{}_output", &task_request.variant_id);
let mut input_file = File::create(&input_file_path)?;
input_file.write_all(&task_request.original_file)?;
let output = task_function.process(&input_file_path, &output_file_path)?;
if !output.status.success() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("process returned error, status {:?}", output.status.code()),
));
}
println!("Task {} succeeded! Sending...", &task_request.variant_id);
let mut contents = Vec::new();
let mut output_file = File::open(&output_file_path)?;
output_file.read_to_end(&mut contents)?;
fs::remove_file(&input_file_path)?;
fs::remove_file(&output_file_path)?;
let task_response = TaskResponse {
variant_file: contents,
variant_id: task_request.variant_id,
};
process_variant_channel
.basic_publish(
"".into(),
"process_variant".into(),
BasicPublishOptions::default(),
serde_json::to_vec(&task_response)?.as_slice(),
BasicProperties::default(),
)
.await
.map_err(|_| io::Error::from(io::ErrorKind::Other))?;
Ok(())
}
async fn handle_queue(
consumer: &mut Consumer,
process_variant_channel: Channel,
task_function: &impl file_processors::FileProcessor,
) -> Result<(), lapin::Error> {
while let Some(delivery) = consumer.next().await {
println!("Received message!");
let delivery = delivery.map_err(|_| io::Error::from(io::ErrorKind::Other))?;
let task_result = handle_task(&delivery, &process_variant_channel, task_function).await;
match task_result {
Ok(_) => (),
Err(e) => eprintln!("Error handling task: {}", e),
}
delivery.ack(BasicAckOptions::default()).await?;
}
Ok(())
}
async fn handle_file_type(
channel: Channel,
channel_process_variant: Channel,
queue: &str,
task_function: impl file_processors::FileProcessor,
) -> Result<Result<(), lapin::Error>, lapin::Error> {
let mut queue_declare_options = QueueDeclareOptions::default();
queue_declare_options.durable = true;
channel
.queue_declare(queue.into(), queue_declare_options, FieldTable::default())
.await?;
channel_process_variant
.queue_declare(
"process_variant".into(),
queue_declare_options,
FieldTable::default(),
)
.await?;
let mut consumer = channel
.basic_consume(
queue.into(),
generate_consumer_tag(&channel).into(),
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
Ok(handle_queue(&mut consumer, channel_process_variant, &task_function).await)
}
async fn connect_rabbit_mq() -> lapin::Result<Connection> {
let addr =
std::env::var("RABBITMQ_ADDRESS").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
let conn = Connection::connect(
&addr,
ConnectionProperties::default().with_connection_name("kakigoori-worker".into()),
)
.await?;
let avif_task = tokio::spawn(handle_file_type(
conn.create_channel().await?,
conn.create_channel().await?,
"kakigoori_avif",
file_processors::AVIF {},
));
let webp_task = tokio::spawn(handle_file_type(
conn.create_channel().await?,
conn.create_channel().await?,
"kakigoori_webp",
file_processors::WebP {},
));
println!("Waiting for messages...");
for task in [avif_task, webp_task] {
task.await.map_err(std::io::Error::from)???;
}
Ok(conn)
}
#[tokio::main]
async fn main() -> io::Result<()> {
connect_rabbit_mq().await.expect("TODO: panic message");
Ok(())
}