# Moose / Streaming / From Your Code Documentation – Python
## Included Files
1. moose/streaming/from-your-code/from-your-code.mdx
## Publish Data
Source: moose/streaming/from-your-code/from-your-code.mdx
Write data to streams from applications, APIs, or external sources
# Publishing Data to Streams
## Overview
Publishing data to streams allows you to write data from various sources into your Kafka/Redpanda topics. This is the first step in building real-time data pipelines.
## Publishing Methods
### Using REST APIs
The most common way to publish data is through Moose's built-in ingestion APIs. Each API sits in front of a stream and publishes the payload of every request it receives to that stream:
```py filename="PublishViaAPI.py" copy
from moose_lib import IngestPipeline, IngestPipelineConfig
# When you create an IngestPipeline with ingest_api: True, Moose automatically creates an API endpoint
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
ingest_api=True, # Creates POST /ingest/raw_data endpoint
stream=True,
table=True
))
# You can then publish data via HTTP POST requests
response = requests.post('/ingest/raw_data', json={
'id': '123',
'value': 42
})
```
See the [OpenAPI documentation](/stack/open-api) to learn more about how to generate type-safe client SDKs in your language of choice for all of your Moose APIs.
### Direct Stream Publishing
You can publish directly to a stream from your Moose code using the stream's `send` method.
This is useful when emitting events from workflows or other backend logic.
`send` accepts a single record or a list of records.
If your `Stream` is configured with a `schema_config` of kind `"JSON"`,
Moose automatically produces messages in the Confluent envelope (a `0x00` magic byte, the schema ID, then the JSON payload; see the sketch after the example below).
No code changes are needed beyond setting `schema_config`. See the [Schema Registry guide](/moose/streaming/schema-registry).
```py filename="DirectPublish.py" copy
from moose_lib import Stream, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime
class UserEvent(BaseModel):
id: Key[str]
user_id: str
timestamp: datetime
event_type: str
# Create a stream (optionally pass StreamConfig with destination/table settings)
events = Stream[UserEvent]("user_events", StreamConfig())
# Publish a single record
events.send(UserEvent(
id="evt_1",
user_id="user_123",
timestamp=datetime.now(),
event_type="click"
))
# Publish multiple records
events.send([
UserEvent(id="evt_2", user_id="user_456", timestamp=datetime.now(), event_type="view"),
UserEvent(id="evt_3", user_id="user_789", timestamp=datetime.now(), event_type="signup"),
])
```
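You don't build this envelope yourself, but as a reference, here is a minimal sketch of the standard Confluent wire format that Moose produces in this mode (`schema_id=42` is a made-up placeholder; real IDs are assigned by the Schema Registry):

```py filename="ConfluentEnvelope.py" copy
import json
import struct

def confluent_json_envelope(schema_id: int, record: dict) -> bytes:
    # Confluent wire format: a 0x00 magic byte, the schema ID as a
    # 4-byte big-endian integer, then the serialized JSON payload
    return b"\x00" + struct.pack(">I", schema_id) + json.dumps(record).encode("utf-8")

# schema_id=42 is a placeholder for illustration only
payload = confluent_json_envelope(42, {"id": "evt_1", "event_type": "click"})
assert payload[0] == 0x00
```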
Moose builds the Kafka topic name from your stream name,
optional namespace, and optional version (dots become underscores).
For example, a stream named `events` with version `1.2.0` becomes `events_1_2_0`
(or `my_ns.events_1_2_0` when the namespace is `"my_ns"`).
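If you need to reproduce this convention, for example when configuring an external producer, a hypothetical helper along these lines works (`topic_name` is not part of `moose_lib`, just an illustration of the rule described above):

```py filename="TopicName.py" copy
def topic_name(stream_name: str, version: str | None = None, namespace: str | None = None) -> str:
    # Append the version with dots replaced by underscores,
    # then prefix the namespace, if either is set
    name = stream_name
    if version:
        name = f"{name}_{version.replace('.', '_')}"
    if namespace:
        name = f"{namespace}.{name}"
    return name

assert topic_name("events", version="1.2.0") == "events_1_2_0"
assert topic_name("events", version="1.2.0", namespace="my_ns") == "my_ns.events_1_2_0"
```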
### Using the Kafka/Redpanda Client from External Applications
You can also publish to streams from external applications using Kafka/Redpanda clients:
```py filename="ExternalPublish.py" copy
from kafka import KafkaProducer
from datetime import datetime
producer = KafkaProducer(
bootstrap_servers=['localhost:19092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Publish to the stream topic
producer.send('user-events', { # Stream name becomes the topic name
'id': 'event-123',
'user_id': 'user-456',
'timestamp': datetime.now().isoformat(),
'event_type': 'page_view'
})
```
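If you want per-message delivery confirmation instead of a final `flush()`, `send` in `kafka-python` returns a future you can block on. A minimal sketch, reusing the `producer` from the example above:

```py filename="ConfirmDelivery.py" copy
# send() returns a future; .get() blocks until the broker acknowledges
# the write, or raises on failure/timeout
future = producer.send('user_events', {'id': 'event-124', 'event_type': 'click'})
metadata = future.get(timeout=10)
print(f"Written to {metadata.topic} partition {metadata.partition} at offset {metadata.offset}")
```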
#### Locating Redpanda Connection Details
When running Moose locally in your dev environment, you can find the connection details for your Redpanda cluster in the `moose.config.toml` file at the root of your project:
```toml filename="moose.config.toml" copy
[redpanda_config]
broker = "localhost:19092"
message_timeout_ms = 1000
retention_ms = 30000
replication_factor = 1
```
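Rather than hard-coding the broker address, you can read it from this file at runtime. A minimal sketch using the standard library's `tomllib` (Python 3.11+), assuming the script runs from the project root:

```py filename="ReadBroker.py" copy
import tomllib

# Load the broker address from moose.config.toml
with open("moose.config.toml", "rb") as f:
    config = tomllib.load(f)

broker = config["redpanda_config"]["broker"]  # e.g. "localhost:19092"
```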