🧩Examples
Typescript
import axios from 'axios';
import { ethers } from 'ethers';
const abi = [
"updatePriceFeeds((uint256,bytes,bytes32[])[],(uint256,uint256,bytes32,bytes32,uint256),uint256[2],bytes)"
]
const API_BASE_URL = 'https://api.eoracle.network/api/v1/get_rate';
const EORACLE_CHAINS = {
HOLESKY: {
ADDRESS: '0x723BD409703EF60d6fB9F8d986eb90099A170fd0',
RPC_URL: 'https://some.holesky.rpc',
},
}
const SYMBOL = 'btc'; // replace with your desired symbol
interface LeafInput {
leafIndex: BigInt;
unhashedLeaf: string;
proof: string;
}
interface Checkpoint {
epoch:BigInt:
blockNumber:BigInt;
eventRoot:string;
blockHash:string;
blockRound:BigInt;
}
interface IProvableFeed{
leafInput: LeafInput;
checkpoint:Checkpoint;
signature:BigInt[];
bitmap:string;
}
async function fetchFeed(symbol: string): Promise<IProvableFeed> {
const response = await axios.get(`${API_BASE_URL}?symbol=${symbol}`, {
auth: {
username: 'your_api_key', // replace with your actual authentication token
password: ''
}
});
const quote : IProvableFeed = {
symbol: response.data.symbol,
rate: response.data.rate,
timestamp: response.data.timestamp,
leafInput : response.data.leafinput,
checkpoint : reponse.data.checkpoint
signature: response.data.signature,
bitmap: response.data.bitmap
}
return quote;
}
async function main() {
const provider = new ethers.JsonRpcProvider(EORACLE_CHAINS.HOLESKY.RPC_URL); // replace `HOLESKY` with your network
const consumer = new ethers.Wallet(
'0xYOUR_PRIVATE_KEY',
provider
);
const contract = new ethers.Contract(EORACLE_CHAINS.HOLESKY.ADDRESS, contractAbi, consumer); // replace `HOLESKY` with your network
const feed = await fetchFeed(SYMBOL);
console.log(`received symbol for ${feed.symbol}: ${feed.rate} at timestamp ${feed.timestamp}`);
try {
const tx = await contract.updatePriceFeeds(feed);
console.log(`verifying price for symbol ${SYMBOL}. please wait...`)
await tx.wait();
const rate = await contract.getLatestFeed(SYMBOL);
console.log(`verified price of ${SYMBOL} is ${rate[0]} with timestamp ${rate[1]}`);
} catch (error) {
console.error('Error updating feeds:', error);
}
}
main()
.then(() => process.exit(0))
.catch(error => {
console.error(error);
process.exit(1);
});
import requests
from web3 import Web3, AsyncWeb3
ABI = [
"updatePriceFeeds((uint256,bytes,bytes32[])[],(uint256,uint256,bytes32,bytes32,uint256),uint256[2],bytes)"
]
API_BASE_URL = 'https://api.eoracle.network/api/v1/get_rate'
EORACLE_CHAINS = {
"HOLESKY": {
"ADDRESS": '0x723BD409703EF60d6fB9F8d986eb90099A170fd0',
"RPC_URL": 'https://some.holesky.rpc',
}
}
SYMBOL = 'btc' # replace with your desired symbol
def fetch_feed(symbol: str):
response = requests.get(f"{API_BASE_URL}?symbol={symbol}", auth=('your_api_key', ''))
response_data = response.json()
quote = {
'symbol': response_data['symbol'],
'rate': response_data['rate'],
'timestamp': response_data['timestamp'],
'leafInput': response_data['leafinput'],
'checkpoint': response_data['checkpoint'],
'signature': response_data['signature'],
'bitmap': response_data['bitmap']
}
return quote
def main():
provider = Web3(Web3.HTTPProvider(EORACLE_CHAINS['HOLESKY']['RPC_URL']))
consumer = provider.eth.account.privateKeyToAccount('0xYOUR_PRIVATE_KEY')
contract = provider.eth.contract(address=EORACLE_CHAINS['HOLESKY']['ADDRESS'], abi=ABI)
feed = fetch_feed(SYMBOL)
print(f"received symbol for {feed['symbol']}: {feed['rate']} at timestamp {feed['timestamp']}")
try:
tx = contract.functions.updatePriceFeeds(feed).buildTransaction({
'chainId': 1,
'gas': 70000,
'gasPrice': provider.toWei('1', 'gwei'),
'nonce': provider.eth.getTransactionCount(consumer.address),
})
signed_tx = provider.eth.account.sign_transaction(tx, private_key='0xYOUR_PRIVATE_KEY')
tx_hash = provider.eth.sendRawTransaction(signed_tx.rawTransaction)
print(f"verifying price for symbol {SYMBOL}. please wait...")
receipt = provider.eth.waitForTransactionReceipt(tx_hash)
rate = contract.functions.getLatestFeed(SYMBOL).call()
print(f"verified price of {SYMBOL} is {rate[0]} with timestamp {rate[1]}")
except Exception as error:
print('Error updating feeds:', error)
if __name__ == "__main__":
main()
// example of using the socket.io eoracle API
import { io, Socket } from 'socket.io-client';
interface LeafInput {
leafIndex: BigInt;
unhashedLeaf: string;
proof: string;
}
interface Checkpoint {
epoch:BigInt:
blockNumber:BigInt;
eventRoot:string;
blockHash:string;
blockRound:BigInt;
}
interface IProvableFeed{
leafInput: LeafInput;
checkpoint:Checkpoint;
signature:BigInt[];
bitmap:string;
}
const serverUrl = 'https://api.eoracle.network';
// create a socket.io client instance
const socket: Socket = io(serverUrl);
// authenticate when the client connects
socket.on('connect', () => {
console.log('connected to server');
socket.emit('authenticate', { token: 'your_api_key' });
});
// subscribe to symbol rates when the client is authenticated
socket.on('authenticated', () => {
console.log('authenticated with server');
socket.emit('subscribe', { topic: 'feed', symbols: ['btc', 'eth'] });
})
// listen for symbol rate updates from the server
socket.on('feed', (feed: IProvableFeed) => {
console.log(`received symbol quote for ${feed.symbol}: ${feed.rate}`);
// TODO: implement your own logic here to handle the symbol rate update
});
// Handle disconnection
socket.on('disconnect', () => {
console.log('disconnected from server');
});
import socketio
# socket.IO connection URL
server_url = 'https://api.eoracle.network/api/v1/get_rate'
# create a Socket.IO client instance
sio = socketio.Client()
def on_connect():
print('connected to server')
sio.emit('authenticate', { 'token': 'your_api_key' })
# subscribe to a specific topic when the client is authenticated
def on_authenticated():
print('authenticated with server')
sio.emit('subscribe', { 'topic': 'feed', 'symbols': ['btc', 'eth'] })
sio.on('authenticated', on_authenticated)
sio.on('connect', on_connect)
# listen for updates on the specified topic
def on_feed(feed):
print(f"Received symbol feed for {feed['symbol']}: {feed['rate']}")
# TODO: implement your own logic here to handle the symbol rate update
sio.on('feed', on_feed)
# handle disconnection
def on_disconnect():
print('Disconnected from server')
sio.on('disconnect', on_disconnect)
# Connect to the server
sio.connect(server_url)
# Keep the script running
try:
sio.wait()
except KeyboardInterrupt:
print('Disconnecting...')
sio.disconnect()
Last updated